From 4d368a37e229a7acbf86bc80f79e63a7201c406d Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Mon, 25 Nov 2019 11:07:36 +0100 Subject: [PATCH] lib/model, lib/protocol: Add contexts sending indexes and download-progress (#6176) --- lib/model/fakeconns_test.go | 12 +++++----- lib/model/model.go | 10 ++++---- lib/model/progressemitter.go | 8 +++---- lib/model/progressemitter_test.go | 3 ++- lib/model/requests_test.go | 38 +++++++++++++++---------------- lib/protocol/protocol.go | 18 +++++++-------- lib/protocol/protocol_test.go | 10 ++++---- lib/protocol/wireformat.go | 8 +++---- 8 files changed, 55 insertions(+), 52 deletions(-) diff --git a/lib/model/fakeconns_test.go b/lib/model/fakeconns_test.go index b19939b5c..e0ece9130 100644 --- a/lib/model/fakeconns_test.go +++ b/lib/model/fakeconns_test.go @@ -32,7 +32,7 @@ type fakeConnection struct { fileData map[string][]byte folder string model *model - indexFn func(string, []protocol.FileInfo) + indexFn func(context.Context, string, []protocol.FileInfo) requestFn func(ctx context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) closeFn func(error) mut sync.Mutex @@ -64,20 +64,20 @@ func (f *fakeConnection) Option(string) string { return "" } -func (f *fakeConnection) Index(folder string, fs []protocol.FileInfo) error { +func (f *fakeConnection) Index(ctx context.Context, folder string, fs []protocol.FileInfo) error { f.mut.Lock() defer f.mut.Unlock() if f.indexFn != nil { - f.indexFn(folder, fs) + f.indexFn(ctx, folder, fs) } return nil } -func (f *fakeConnection) IndexUpdate(folder string, fs []protocol.FileInfo) error { +func (f *fakeConnection) IndexUpdate(ctx context.Context, folder string, fs []protocol.FileInfo) error { f.mut.Lock() defer f.mut.Unlock() if f.indexFn != nil { - f.indexFn(folder, fs) + f.indexFn(ctx, folder, fs) } return nil } @@ -109,7 +109,7 @@ func (f *fakeConnection) Statistics() protocol.Statistics { return protocol.Statistics{} } -func (f *fakeConnection) DownloadProgress(folder string, updates []protocol.FileDownloadProgressUpdate) { +func (f *fakeConnection) DownloadProgress(_ context.Context, folder string, updates []protocol.FileDownloadProgressUpdate) { f.downloadProgressMessages = append(f.downloadProgressMessages, downloadProgressMessage{ folder: folder, updates: updates, diff --git a/lib/model/model.go b/lib/model/model.go index 10a3f69cf..36843fcb6 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -1990,7 +1990,7 @@ func (s *indexSender) serve(ctx context.Context) { defer l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.dev, s.conn, err) // We need to send one index, regardless of whether there is something to send or not - err = s.sendIndexTo() + err = s.sendIndexTo(ctx) // Subscribe to LocalIndexUpdated (we have new information to send) and // DeviceDisconnected (it might be us who disconnected, so we should @@ -2028,7 +2028,7 @@ func (s *indexSender) serve(ctx context.Context) { continue } - err = s.sendIndexTo() + err = s.sendIndexTo(ctx) // Wait a short amount of time before entering the next loop. If there // are continuous changes happening to the local index, this gives us @@ -2046,16 +2046,16 @@ func (s *indexSender) Complete() bool { return true } // sendIndexTo sends file infos with a sequence number higher than prevSequence and // returns the highest sent sequence number. -func (s *indexSender) sendIndexTo() error { +func (s *indexSender) sendIndexTo(ctx context.Context) error { initial := s.prevSequence == 0 batch := newFileInfoBatch(nil) batch.flushFn = func(fs []protocol.FileInfo) error { l.Debugf("%v: Sending %d files (<%d bytes)", s, len(batch.infos), batch.size) if initial { initial = false - return s.conn.Index(s.folder, fs) + return s.conn.Index(ctx, s.folder, fs) } - return s.conn.IndexUpdate(s.folder, fs) + return s.conn.IndexUpdate(ctx, s.folder, fs) } var err error diff --git a/lib/model/progressemitter.go b/lib/model/progressemitter.go index 6c2800676..5c4ce1c47 100644 --- a/lib/model/progressemitter.go +++ b/lib/model/progressemitter.go @@ -85,7 +85,7 @@ func (t *ProgressEmitter) serve(ctx context.Context) { lastCount = newCount t.sendDownloadProgressEventLocked() if len(t.connections) > 0 { - t.sendDownloadProgressMessagesLocked() + t.sendDownloadProgressMessagesLocked(ctx) } } else { l.Debugln("progress emitter: nothing new") @@ -114,7 +114,7 @@ func (t *ProgressEmitter) sendDownloadProgressEventLocked() { l.Debugf("progress emitter: emitting %#v", output) } -func (t *ProgressEmitter) sendDownloadProgressMessagesLocked() { +func (t *ProgressEmitter) sendDownloadProgressMessagesLocked(ctx context.Context) { for id, conn := range t.connections { for _, folder := range t.foldersByConns[id] { pullers, ok := t.registry[folder] @@ -145,7 +145,7 @@ func (t *ProgressEmitter) sendDownloadProgressMessagesLocked() { updates := state.update(folder, activePullers) if len(updates) > 0 { - conn.DownloadProgress(folder, updates) + conn.DownloadProgress(ctx, folder, updates) } } } @@ -311,7 +311,7 @@ func (t *ProgressEmitter) clearLocked() { } for _, folder := range state.folders() { if updates := state.cleanup(folder); len(updates) > 0 { - conn.DownloadProgress(folder, updates) + conn.DownloadProgress(context.Background(), folder, updates) } } } diff --git a/lib/model/progressemitter_test.go b/lib/model/progressemitter_test.go index 3ed9f47e3..4ecca5aba 100644 --- a/lib/model/progressemitter_test.go +++ b/lib/model/progressemitter_test.go @@ -7,6 +7,7 @@ package model import ( + "context" "fmt" "os" "path/filepath" @@ -461,5 +462,5 @@ func TestSendDownloadProgressMessages(t *testing.T) { func sendMsgs(p *ProgressEmitter) { p.mut.Lock() defer p.mut.Unlock() - p.sendDownloadProgressMessagesLocked() + p.sendDownloadProgressMessagesLocked(context.Background()) } diff --git a/lib/model/requests_test.go b/lib/model/requests_test.go index 38c95bd03..f20b0856f 100644 --- a/lib/model/requests_test.go +++ b/lib/model/requests_test.go @@ -38,7 +38,7 @@ func TestRequestSimple(t *testing.T) { // the expected test file. done := make(chan struct{}) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { select { case <-done: t.Error("More than one index update sent") @@ -80,7 +80,7 @@ func TestSymlinkTraversalRead(t *testing.T) { // the expected test file. done := make(chan struct{}) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { select { case <-done: t.Error("More than one index update sent") @@ -125,7 +125,7 @@ func TestSymlinkTraversalWrite(t *testing.T) { badReq := make(chan string, 1) badIdx := make(chan string, 1) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { for _, f := range fs { if f.Name == "symlink" { done <- struct{}{} @@ -183,7 +183,7 @@ func TestRequestCreateTmpSymlink(t *testing.T) { goodIdx := make(chan struct{}) name := fs.TempName("testlink") fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { for _, f := range fs { if f.Name == name { if f.IsInvalid() { @@ -240,7 +240,7 @@ func TestRequestVersioningSymlinkAttack(t *testing.T) { // the expected test file. idx := make(chan int) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { idx <- len(fs) } fc.mut.Unlock() @@ -339,7 +339,7 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) { done := make(chan struct{}) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { expected := map[string]struct{}{invIgn: {}, ign: {}, ignExisting: {}} for _, f := range fs { if _, ok := expected[f.Name]; !ok { @@ -375,7 +375,7 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) { // The indexes will normally arrive in one update, but it is possible // that they arrive in separate ones. fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { for _, f := range fs { if _, ok := expected[f.Name]; !ok { t.Errorf("Unexpected file %v was updated in index", f.Name) @@ -434,7 +434,7 @@ func TestIssue4841(t *testing.T) { received := make(chan []protocol.FileInfo) fc.mut.Lock() - fc.indexFn = func(_ string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, _ string, fs []protocol.FileInfo) { received <- fs } fc.mut.Unlock() @@ -483,7 +483,7 @@ func TestRescanIfHaveInvalidContent(t *testing.T) { received := make(chan []protocol.FileInfo) fc.mut.Lock() - fc.indexFn = func(_ string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, _ string, fs []protocol.FileInfo) { received <- fs } fc.mut.Unlock() @@ -550,7 +550,7 @@ func TestParentDeletion(t *testing.T) { fc.addFile(parent, 0777, protocol.FileInfoTypeDirectory, nil) fc.addFile(child, 0777, protocol.FileInfoTypeDirectory, nil) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { received <- fs } fc.mut.Unlock() @@ -623,7 +623,7 @@ func TestRequestSymlinkWindows(t *testing.T) { received := make(chan []protocol.FileInfo) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { select { case <-received: t.Error("More than one index update sent") @@ -693,7 +693,7 @@ func TestRequestRemoteRenameChanged(t *testing.T) { received := make(chan []protocol.FileInfo) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { select { case <-received: t.Error("More than one index update sent") @@ -733,7 +733,7 @@ func TestRequestRemoteRenameChanged(t *testing.T) { bFinalVersion := bIntermediateVersion.Copy().Update(fc.id.Short()) done := make(chan struct{}) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { select { case <-done: t.Error("Received more index updates than expected") @@ -834,7 +834,7 @@ func TestRequestRemoteRenameConflict(t *testing.T) { recv := make(chan int) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { recv <- len(fs) } fc.mut.Unlock() @@ -924,7 +924,7 @@ func TestRequestDeleteChanged(t *testing.T) { done := make(chan struct{}) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { select { case <-done: t.Error("More than one index update sent") @@ -947,7 +947,7 @@ func TestRequestDeleteChanged(t *testing.T) { } fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { select { case <-done: t.Error("More than one index update sent") @@ -1069,7 +1069,7 @@ func TestIgnoreDeleteUnignore(t *testing.T) { done := make(chan struct{}) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { basicCheck(fs) close(done) } @@ -1088,7 +1088,7 @@ func TestIgnoreDeleteUnignore(t *testing.T) { done = make(chan struct{}) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { basicCheck(fs) f := fs[0] if !f.IsInvalid() { @@ -1110,7 +1110,7 @@ func TestIgnoreDeleteUnignore(t *testing.T) { done = make(chan struct{}) fc.mut.Lock() - fc.indexFn = func(folder string, fs []protocol.FileInfo) { + fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) { basicCheck(fs) f := fs[0] if f.IsInvalid() { diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index d08fdbeb1..1f7e9a071 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -135,11 +135,11 @@ type Connection interface { Close(err error) ID() DeviceID Name() string - Index(folder string, files []FileInfo) error - IndexUpdate(folder string, files []FileInfo) error + Index(ctx context.Context, folder string, files []FileInfo) error + IndexUpdate(ctx context.Context, folder string, files []FileInfo) error Request(ctx context.Context, folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) ClusterConfig(config ClusterConfig) - DownloadProgress(folder string, updates []FileDownloadProgressUpdate) + DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate) Statistics() Statistics Closed() bool } @@ -249,14 +249,14 @@ func (c *rawConnection) Name() string { } // Index writes the list of file information to the connected peer device -func (c *rawConnection) Index(folder string, idx []FileInfo) error { +func (c *rawConnection) Index(ctx context.Context, folder string, idx []FileInfo) error { select { case <-c.closed: return ErrClosed default: } c.idxMut.Lock() - c.send(context.TODO(), &Index{ + c.send(ctx, &Index{ Folder: folder, Files: idx, }, nil) @@ -265,14 +265,14 @@ func (c *rawConnection) Index(folder string, idx []FileInfo) error { } // IndexUpdate writes the list of file information to the connected peer device as an update -func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error { +func (c *rawConnection) IndexUpdate(ctx context.Context, folder string, idx []FileInfo) error { select { case <-c.closed: return ErrClosed default: } c.idxMut.Lock() - c.send(context.TODO(), &IndexUpdate{ + c.send(ctx, &IndexUpdate{ Folder: folder, Files: idx, }, nil) @@ -340,8 +340,8 @@ func (c *rawConnection) Closed() bool { } // DownloadProgress sends the progress updates for the files that are currently being downloaded. -func (c *rawConnection) DownloadProgress(folder string, updates []FileDownloadProgressUpdate) { - c.send(context.TODO(), &DownloadProgress{ +func (c *rawConnection) DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate) { + c.send(ctx, &DownloadProgress{ Folder: folder, Updates: updates, }, nil) diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go index 3e0482710..5b4ebd9f5 100644 --- a/lib/protocol/protocol_test.go +++ b/lib/protocol/protocol_test.go @@ -75,10 +75,12 @@ func TestClose(t *testing.T) { t.Error("Ping should not return true") } - c0.Index("default", nil) - c0.Index("default", nil) + ctx := context.Background() - if _, err := c0.Request(context.Background(), "default", "foo", 0, 0, nil, 0, false); err == nil { + c0.Index(ctx, "default", nil) + c0.Index(ctx, "default", nil) + + if _, err := c0.Request(ctx, "default", "foo", 0, 0, nil, 0, false); err == nil { t.Error("Request should return an error") } } @@ -152,7 +154,7 @@ func TestCloseRace(t *testing.T) { c0.ClusterConfig(ClusterConfig{}) c1.ClusterConfig(ClusterConfig{}) - c1.Index("default", nil) + c1.Index(context.Background(), "default", nil) select { case <-indexReceived: case <-time.After(time.Second): diff --git a/lib/protocol/wireformat.go b/lib/protocol/wireformat.go index 7da80a12a..e32d0b0d0 100644 --- a/lib/protocol/wireformat.go +++ b/lib/protocol/wireformat.go @@ -13,7 +13,7 @@ type wireFormatConnection struct { Connection } -func (c wireFormatConnection) Index(folder string, fs []FileInfo) error { +func (c wireFormatConnection) Index(ctx context.Context, folder string, fs []FileInfo) error { var myFs = make([]FileInfo, len(fs)) copy(myFs, fs) @@ -21,10 +21,10 @@ func (c wireFormatConnection) Index(folder string, fs []FileInfo) error { myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name)) } - return c.Connection.Index(folder, myFs) + return c.Connection.Index(ctx, folder, myFs) } -func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo) error { +func (c wireFormatConnection) IndexUpdate(ctx context.Context, folder string, fs []FileInfo) error { var myFs = make([]FileInfo, len(fs)) copy(myFs, fs) @@ -32,7 +32,7 @@ func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo) error { myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name)) } - return c.Connection.IndexUpdate(folder, myFs) + return c.Connection.IndexUpdate(ctx, folder, myFs) } func (c wireFormatConnection) Request(ctx context.Context, folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {