Performance: improve need computation

This commit is contained in:
Jakob Borg 2014-01-23 22:20:15 +01:00
parent fc2ebc6cad
commit ea5ef28c5a
1 changed files with 107 additions and 52 deletions

View File

@ -259,6 +259,11 @@ func (m *Model) NeedFiles() (files []File, bytes int) {
// Index is called when a new node is connected and we receive their full index. // Index is called when a new node is connected and we receive their full index.
// Implements the protocol.Model interface. // Implements the protocol.Model interface.
func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
var files = make([]File, len(fs))
for i := range fs {
files[i] = fileFromFileInfo(fs[i])
}
m.imut.Lock() m.imut.Lock()
defer m.imut.Unlock() defer m.imut.Unlock()
@ -267,7 +272,7 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
} }
repo := make(map[string]File) repo := make(map[string]File)
for _, f := range fs { for _, f := range files {
m.indexUpdate(repo, f) m.indexUpdate(repo, f)
} }
@ -276,17 +281,22 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
m.rmut.Unlock() m.rmut.Unlock()
m.recomputeGlobal() m.recomputeGlobal()
m.recomputeNeed() m.recomputeNeedForFiles(files)
} }
// IndexUpdate is called for incremental updates to connected nodes' indexes. // IndexUpdate is called for incremental updates to connected nodes' indexes.
// Implements the protocol.Model interface. // Implements the protocol.Model interface.
func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
var files = make([]File, len(fs))
for i := range fs {
files[i] = fileFromFileInfo(fs[i])
}
m.imut.Lock() m.imut.Lock()
defer m.imut.Unlock() defer m.imut.Unlock()
if m.trace["net"] { if m.trace["net"] {
log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs)) log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(files))
} }
m.rmut.Lock() m.rmut.Lock()
@ -297,16 +307,16 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
return return
} }
for _, f := range fs { for _, f := range files {
m.indexUpdate(repo, f) m.indexUpdate(repo, f)
} }
m.rmut.Unlock() m.rmut.Unlock()
m.recomputeGlobal() m.recomputeGlobal()
m.recomputeNeed() m.recomputeNeedForFiles(files)
} }
func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) { func (m *Model) indexUpdate(repo map[string]File, f File) {
if m.trace["idx"] { if m.trace["idx"] {
var flagComment string var flagComment string
if f.Flags&protocol.FlagDeleted != 0 { if f.Flags&protocol.FlagDeleted != 0 {
@ -320,7 +330,7 @@ func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) {
return return
} }
repo[f.Name] = fileFromFileInfo(f) repo[f.Name] = f
} }
// Close removes the peer from the model and closes the underlying connection if possible. // Close removes the peer from the model and closes the underlying connection if possible.
@ -344,7 +354,7 @@ func (m *Model) Close(node string, err error) {
m.pmut.Unlock() m.pmut.Unlock()
m.recomputeGlobal() m.recomputeGlobal()
m.recomputeNeed() m.recomputeNeedForGlobal()
} }
// Request returns the specified data segment by reading it from local disk. // Request returns the specified data segment by reading it from local disk.
@ -422,7 +432,7 @@ func (m *Model) ReplaceLocal(fs []File) {
m.lmut.Unlock() m.lmut.Unlock()
m.recomputeGlobal() m.recomputeGlobal()
m.recomputeNeed() m.recomputeNeedForGlobal()
m.umut.Lock() m.umut.Lock()
m.updatedLocal = time.Now().Unix() m.updatedLocal = time.Now().Unix()
@ -443,7 +453,7 @@ func (m *Model) SeedLocal(fs []protocol.FileInfo) {
m.lmut.Unlock() m.lmut.Unlock()
m.recomputeGlobal() m.recomputeGlobal()
m.recomputeNeed() m.recomputeNeedForGlobal()
} }
// ConnectedTo returns true if we are connected to the named node. // ConnectedTo returns true if we are connected to the named node.
@ -644,6 +654,25 @@ func (m *Model) updateLocal(f File) {
} }
} }
/*
XXX: Not done, needs elegant handling of availability
func (m *Model) recomputeGlobalFor(files []File) bool {
m.gmut.Lock()
defer m.gmut.Unlock()
var updated bool
for _, f := range files {
if gf, ok := m.global[f.Name]; !ok || f.NewerThan(gf) {
m.global[f.Name] = f
updated = true
// Fix availability
}
}
return updated
}
*/
func (m *Model) recomputeGlobal() { func (m *Model) recomputeGlobal() {
var newGlobal = make(map[string]File) var newGlobal = make(map[string]File)
@ -702,54 +731,20 @@ func (m *Model) recomputeGlobal() {
} }
} }
func (m *Model) recomputeNeed() { type addOrder struct {
type addOrder struct { n string
n string remote []Block
remote []Block fm *fileMonitor
fm *fileMonitor }
}
func (m *Model) recomputeNeedForGlobal() {
var toDelete []File var toDelete []File
var toAdd []addOrder var toAdd []addOrder
m.gmut.RLock() m.gmut.RLock()
for n, gf := range m.global { for _, gf := range m.global {
m.lmut.RLock() toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
lf, ok := m.local[n]
m.lmut.RUnlock()
if !ok || gf.NewerThan(lf) {
if gf.Flags&protocol.FlagInvalid != 0 {
// Never attempt to sync invalid files
continue
}
if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
// Don't want to delete files, so forget this need
continue
}
if gf.Flags&protocol.FlagDeleted != 0 && !ok {
// Don't have the file, so don't need to delete it
continue
}
if m.trace["need"] {
log.Printf("NEED: lf:%v gf:%v", lf, gf)
}
if gf.Flags&protocol.FlagDeleted != 0 {
toDelete = append(toDelete, gf)
} else {
local, remote := BlockDiff(lf.Blocks, gf.Blocks)
fm := fileMonitor{
name: n,
path: path.Clean(path.Join(m.dir, n)),
global: gf,
model: m,
localBlocks: local,
}
toAdd = append(toAdd, addOrder{n, remote, &fm})
}
}
} }
m.gmut.RUnlock() m.gmut.RUnlock()
@ -762,6 +757,66 @@ func (m *Model) recomputeNeed() {
} }
} }
func (m *Model) recomputeNeedForFiles(files []File) {
var toDelete []File
var toAdd []addOrder
m.gmut.RLock()
for _, gf := range files {
toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
}
m.gmut.RUnlock()
for _, ao := range toAdd {
m.fq.Add(ao.n, ao.remote, ao.fm)
}
for _, gf := range toDelete {
m.dq <- gf
}
}
func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File) ([]addOrder, []File) {
m.lmut.RLock()
lf, ok := m.local[gf.Name]
m.lmut.RUnlock()
if !ok || gf.NewerThan(lf) {
if gf.Flags&protocol.FlagInvalid != 0 {
// Never attempt to sync invalid files
return toAdd, toDelete
}
if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
// Don't want to delete files, so forget this need
return toAdd, toDelete
}
if gf.Flags&protocol.FlagDeleted != 0 && !ok {
// Don't have the file, so don't need to delete it
return toAdd, toDelete
}
if m.trace["need"] {
log.Printf("NEED: lf:%v gf:%v", lf, gf)
}
if gf.Flags&protocol.FlagDeleted != 0 {
toDelete = append(toDelete, gf)
} else {
local, remote := BlockDiff(lf.Blocks, gf.Blocks)
fm := fileMonitor{
name: gf.Name,
path: path.Clean(path.Join(m.dir, gf.Name)),
global: gf,
model: m,
localBlocks: local,
}
toAdd = append(toAdd, addOrder{gf.Name, remote, &fm})
}
}
return toAdd, toDelete
}
func (m *Model) WhoHas(name string) []string { func (m *Model) WhoHas(name string) []string {
var remote []string var remote []string