From 5087d02fba86589a6d40666fd659d5c58a47cc78 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 24 Jul 2014 09:38:16 +0200 Subject: [PATCH] Faster puller loop --- model/blockqueue.go | 62 +++----------------- model/model.go | 6 +- model/puller.go | 138 ++++++++++++++++++++++++-------------------- 3 files changed, 89 insertions(+), 117 deletions(-) diff --git a/model/blockqueue.go b/model/blockqueue.go index d84d8bd9e..d4ae6c73d 100644 --- a/model/blockqueue.go +++ b/model/blockqueue.go @@ -4,11 +4,7 @@ package model -import ( - "sync" - - "github.com/calmh/syncthing/protocol" -) +import "github.com/calmh/syncthing/protocol" type bqAdd struct { file protocol.FileInfo @@ -25,27 +21,10 @@ type bqBlock struct { } type blockQueue struct { - inbox chan bqAdd - outbox chan bqBlock - queued []bqBlock - - mut sync.Mutex } -func newBlockQueue() *blockQueue { - q := &blockQueue{ - inbox: make(chan bqAdd), - outbox: make(chan bqBlock), - } - go q.run() - return q -} - -func (q *blockQueue) addBlock(a bqAdd) { - q.mut.Lock() - defer q.mut.Unlock() - +func (q *blockQueue) put(a bqAdd) { // If we already have it queued, return for _, b := range q.queued { if b.file.Name == a.file.Name { @@ -84,36 +63,11 @@ func (q *blockQueue) addBlock(a bqAdd) { } } -func (q *blockQueue) run() { - for { - if len(q.queued) == 0 { - q.addBlock(<-q.inbox) - } else { - q.mut.Lock() - next := q.queued[0] - q.mut.Unlock() - select { - case a := <-q.inbox: - q.addBlock(a) - case q.outbox <- next: - q.mut.Lock() - q.queued = q.queued[1:] - q.mut.Unlock() - } - } +func (q *blockQueue) get() (bqBlock, bool) { + if len(q.queued) == 0 { + return bqBlock{}, false } -} - -func (q *blockQueue) put(a bqAdd) { - q.inbox <- a -} - -func (q *blockQueue) get() bqBlock { - return <-q.outbox -} - -func (q *blockQueue) empty() bool { - q.mut.Lock() - defer q.mut.Unlock() - return len(q.queued) == 0 + b := q.queued[0] + q.queued = q.queued[1:] + return b, true } diff --git a/model/model.go b/model/model.go index 1e468904c..0b7d49480 100644 --- a/model/model.go +++ b/model/model.go @@ -584,8 +584,10 @@ func sendIndexes(conn protocol.Connection, repo string, fs *files.Set) { }() for err == nil { - if !initial && fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer { - time.Sleep(1 * time.Second) + if !initial { + time.Sleep(5 * time.Second) + } + if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer { continue } diff --git a/model/puller.go b/model/puller.go index 041b91d9e..af8d4bfb8 100644 --- a/model/puller.go +++ b/model/puller.go @@ -63,7 +63,8 @@ var errNoNode = errors.New("no available source node") type puller struct { cfg *config.Configuration repoCfg config.RepositoryConfiguration - bq *blockQueue + bq blockQueue + slots int model *Model oustandingPerNode activityMap openFiles map[string]openFile @@ -75,9 +76,9 @@ type puller struct { func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, cfg *config.Configuration) *puller { p := &puller{ - repoCfg: repoCfg, cfg: cfg, - bq: newBlockQueue(), + repoCfg: repoCfg, + slots: slots, model: model, oustandingPerNode: make(activityMap), openFiles: make(map[string]openFile), @@ -96,9 +97,6 @@ func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, if slots > 0 { // Read/write - for i := 0; i < slots; i++ { - p.requestSlots <- true - } if debug { l.Debugf("starting puller; repo %q dir %q slots %d", repoCfg.ID, repoCfg.Directory, slots) } @@ -114,57 +112,70 @@ func newPuller(repoCfg config.RepositoryConfiguration, model *Model, slots int, } func (p *puller) run() { - go func() { - // fill blocks queue when there are free slots - for { - <-p.requestSlots - b := p.bq.get() - if debug { - l.Debugf("filler: queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy)) - } - p.blocks <- b - } - }() - - timeout := time.Tick(5 * time.Second) changed := true scanintv := time.Duration(p.cfg.Options.RescanIntervalS) * time.Second lastscan := time.Now() var prevVer uint64 + var queued int + + // Load up the request slots + for i := 0; i < cap(p.requestSlots); i++ { + p.requestSlots <- true + } for { // Run the pulling loop as long as there are blocks to fetch - pull: - for { - select { - case res := <-p.requestResults: - p.model.setState(p.repoCfg.ID, RepoSyncing) - changed = true - p.requestSlots <- true - p.handleRequestResult(res) - case b := <-p.blocks: - p.model.setState(p.repoCfg.ID, RepoSyncing) - changed = true - if p.handleBlock(b) { - // Block was fully handled, free up the slot + prevVer, queued = p.queueNeededBlocks(prevVer) + if queued > 0 { + + pull: + for { + select { + case res := <-p.requestResults: + p.model.setState(p.repoCfg.ID, RepoSyncing) + changed = true p.requestSlots <- true - } + p.handleRequestResult(res) - case <-timeout: - if len(p.openFiles) == 0 && p.bq.empty() { - // Nothing more to do for the moment - break pull - } - if debug { - l.Debugf("%q: idle but have %d open files", p.repoCfg.ID, len(p.openFiles)) - i := 5 - for _, f := range p.openFiles { - l.Debugf(" %v", f) - i-- - if i == 0 { - break + case <-p.requestSlots: + b, ok := p.bq.get() + + if !ok { + if debug { + l.Debugf("%q: pulling loop needs more blocks", p.repoCfg.ID) } + prevVer, _ = p.queueNeededBlocks(prevVer) + b, ok = p.bq.get() + } + + if !ok && len(p.openFiles) == 0 { + // Nothing queued, nothing outstanding + if debug { + l.Debugf("%q: pulling loop done", p.repoCfg.ID) + } + break pull + } + + if !ok { + // Nothing queued, but there are still open files. + // Give the situation a moment to change. + if debug { + l.Debugf("%q: pulling loop paused", p.repoCfg.ID) + } + p.requestSlots <- true + time.Sleep(100 * time.Millisecond) + continue pull + } + + if debug { + l.Debugf("queueing %q / %q offset %d copy %d", p.repoCfg.ID, b.file.Name, b.block.Offset, len(b.copy)) + } + p.model.setState(p.repoCfg.ID, RepoSyncing) + changed = true + if p.handleBlock(b) { + // Block was fully handled, free up the slot + p.requestSlots <- true } } } @@ -192,19 +203,7 @@ func (p *puller) run() { lastscan = time.Now() } - if v := p.model.LocalVersion(p.repoCfg.ID); v != prevVer { - if debug { - l.Debugf("%q: checking for more needed blocks", p.repoCfg.ID) - } - // Queue more blocks to fetch, if any - if p.queueNeededBlocks() == 0 { - if debug { - l.Debugf("%q: no more needed blocks", p.repoCfg.ID) - } - // We've fetched all blocks we need - prevVer = v - } - } + time.Sleep(5 * time.Second) } } @@ -620,9 +619,21 @@ func (p *puller) handleEmptyBlock(b bqBlock) { delete(p.openFiles, f.Name) } -func (p *puller) queueNeededBlocks() int { +func (p *puller) queueNeededBlocks(prevVer uint64) (uint64, int) { + curVer := p.model.LocalVersion(p.repoCfg.ID) + if curVer == prevVer { + return curVer, 0 + } + + if debug { + l.Debugf("%q: checking for more needed blocks", p.repoCfg.ID) + } + queued := 0 for _, f := range p.model.NeedFilesRepo(p.repoCfg.ID) { + if _, ok := p.openFiles[f.Name]; ok { + continue + } lf := p.model.CurrentRepoFile(p.repoCfg.ID, f.Name) have, need := scanner.BlockDiff(lf.Blocks, f.Blocks) if debug { @@ -638,7 +649,12 @@ func (p *puller) queueNeededBlocks() int { if debug && queued > 0 { l.Debugf("%q: queued %d items", p.repoCfg.ID, queued) } - return queued + + if queued > 0 { + return prevVer, queued + } else { + return curVer, 0 + } } func (p *puller) closeFile(f protocol.FileInfo) {