From 68c1b2dd472f18c3c37bb188fdb106df55dea927 Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Wed, 14 Feb 2018 08:59:46 +0100 Subject: [PATCH] all: Revert simultaneously walk fs and db on scan (fixes #4756) (#4757) This reverts commit 6d3f9d5154b4c6b39f8083255cc9ed36b5c51146. --- lib/db/set.go | 6 +- lib/fs/walkfs.go | 6 +- lib/model/model.go | 152 ++++++++----- lib/model/model_test.go | 179 --------------- lib/protocol/bep_extensions.go | 21 -- lib/protocol/protocol_test.go | 50 +---- lib/scanner/blockqueue.go | 18 +- lib/scanner/walk.go | 398 ++++++++++----------------------- lib/scanner/walk_test.go | 111 ++------- 9 files changed, 261 insertions(+), 680 deletions(-) diff --git a/lib/db/set.go b/lib/db/set.go index 71c1a19fd..afaa36494 100644 --- a/lib/db/set.go +++ b/lib/db/set.go @@ -194,9 +194,9 @@ func (s *FileSet) WithHaveTruncated(device protocol.DeviceID, fn Iterator) { s.db.withHave([]byte(s.folder), device[:], nil, true, nativeFileIterator(fn)) } -func (s *FileSet) WithPrefixedHave(device protocol.DeviceID, prefix string, fn Iterator) { - l.Debugf("%s WithPrefixedHave(%v)", s.folder, device) - s.db.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), false, nativeFileIterator(fn)) +func (s *FileSet) WithPrefixedHaveTruncated(device protocol.DeviceID, prefix string, fn Iterator) { + l.Debugf("%s WithPrefixedHaveTruncated(%v)", s.folder, device) + s.db.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn)) } func (s *FileSet) WithGlobal(fn Iterator) { l.Debugf("%s WithGlobal()", s.folder) diff --git a/lib/fs/walkfs.go b/lib/fs/walkfs.go index d4fa391f6..005c7e262 100644 --- a/lib/fs/walkfs.go +++ b/lib/fs/walkfs.go @@ -10,10 +10,7 @@ package fs -import ( - "path/filepath" - "sort" -) +import "path/filepath" // WalkFunc is the type of the function called for each file or directory // visited by Walk. The path argument contains the argument to Walk as a @@ -57,7 +54,6 @@ func (f *walkFilesystem) walk(path string, info FileInfo, walkFn WalkFunc) error if err != nil { return walkFn(path, info, err) } - sort.Strings(names) for _, name := range names { filename := filepath.Join(path, name) diff --git a/lib/model/model.go b/lib/model/model.go index b14809ea8..304f33e43 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -1395,21 +1395,14 @@ func (m *Model) CurrentGlobalFile(folder string, file string) (protocol.FileInfo return fs.GetGlobal(file) } -type haveWalker struct { - fset *db.FileSet +type cFiler struct { + m *Model + r string } -func (h haveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) { - ctxChan := ctx.Done() - h.fset.WithPrefixedHave(protocol.LocalDeviceID, prefix, func(fi db.FileIntf) bool { - f := fi.(protocol.FileInfo) - select { - case out <- f: - case <-ctxChan: - return false - } - return true - }) +// Implements scanner.CurrentFiler +func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) { + return cf.m.CurrentFolderFile(cf.r, file) } // Connection returns the current connection for device, and a boolean wether a connection was found. @@ -1962,14 +1955,13 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su runner.setState(FolderScanning) - haveWalker := haveWalker{fset} - rchan := scanner.Walk(ctx, scanner.Config{ + fchan := scanner.Walk(ctx, scanner.Config{ Folder: folderCfg.ID, Subs: subDirs, Matcher: ignores, BlockSize: protocol.BlockSize, TempLifetime: time.Duration(m.cfg.Options().KeepTemporariesH) * time.Hour, - Have: haveWalker, + CurrentFiler: cFiler{m, folder}, Filesystem: mtimefs, IgnorePerms: folderCfg.IgnorePerms, AutoNormalize: folderCfg.AutoNormalize, @@ -1986,17 +1978,6 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles) batchSizeBytes := 0 changes := 0 - checkBatch := func() error { - if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes { - if err := runner.CheckHealth(); err != nil { - return err - } - m.updateLocalsFromScanning(folder, batch) - batch = batch[:0] - batchSizeBytes = 0 - } - return nil - } // Schedule a pull after scanning, but only if we actually detected any // changes. @@ -2006,49 +1987,98 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su } }() - var delDirStack []protocol.FileInfo - for r := range rchan { - if err := checkBatch(); err != nil { - l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) - return err - } - - // Append deleted dirs from stack if the current file isn't a child, - // which means all children were already processed. - for len(delDirStack) != 0 && !strings.HasPrefix(r.New.Name, delDirStack[len(delDirStack)-1].Name+string(fs.PathSeparator)) { - lastDelDir := delDirStack[len(delDirStack)-1] - batch = append(batch, lastDelDir) - batchSizeBytes += lastDelDir.ProtoSize() - changes++ - if err := checkBatch(); err != nil { + for f := range fchan { + if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes { + if err := runner.CheckHealth(); err != nil { l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) return err } - delDirStack = delDirStack[:len(delDirStack)-1] + m.updateLocalsFromScanning(folder, batch) + batch = batch[:0] + batchSizeBytes = 0 } - // Delay appending deleted dirs until all its children are processed - if r.Old.IsDirectory() && (r.New.Deleted || !r.New.IsDirectory()) { - delDirStack = append(delDirStack, r.New) - continue - } - - l.Debugln("Appending", r) - batch = append(batch, r.New) - batchSizeBytes += r.New.ProtoSize() + batch = append(batch, f) + batchSizeBytes += f.ProtoSize() changes++ } - // Append remaining deleted dirs. - for i := len(delDirStack) - 1; i >= 0; i-- { - if err := checkBatch(); err != nil { - l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) - return err - } + if err := runner.CheckHealth(); err != nil { + l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) + return err + } else if len(batch) > 0 { + m.updateLocalsFromScanning(folder, batch) + } - batch = append(batch, delDirStack[i]) - batchSizeBytes += delDirStack[i].ProtoSize() - changes++ + if len(subDirs) == 0 { + // If we have no specific subdirectories to traverse, set it to one + // empty prefix so we traverse the entire folder contents once. + subDirs = []string{""} + } + + // Do a scan of the database for each prefix, to check for deleted and + // ignored files. + batch = batch[:0] + batchSizeBytes = 0 + for _, sub := range subDirs { + var iterError error + + fset.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool { + f := fi.(db.FileInfoTruncated) + if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes { + if err := runner.CheckHealth(); err != nil { + iterError = err + return false + } + m.updateLocalsFromScanning(folder, batch) + batch = batch[:0] + batchSizeBytes = 0 + } + + switch { + case !f.IsInvalid() && ignores.Match(f.Name).IsIgnored(): + // File was valid at last pass but has been ignored. Set invalid bit. + l.Debugln("setting invalid bit on ignored", f) + nf := f.ConvertToInvalidFileInfo(m.id.Short()) + batch = append(batch, nf) + batchSizeBytes += nf.ProtoSize() + changes++ + + case !f.IsInvalid() && !f.IsDeleted(): + // The file is valid and not deleted. Lets check if it's + // still here. + + if _, err := mtimefs.Lstat(f.Name); err != nil { + // We don't specifically verify that the error is + // fs.IsNotExist because there is a corner case when a + // directory is suddenly transformed into a file. When that + // happens, files that were in the directory (that is now a + // file) are deleted but will return a confusing error ("not a + // directory") when we try to Lstat() them. + + nf := protocol.FileInfo{ + Name: f.Name, + Type: f.Type, + Size: 0, + ModifiedS: f.ModifiedS, + ModifiedNs: f.ModifiedNs, + ModifiedBy: m.id.Short(), + Deleted: true, + Version: f.Version.Update(m.shortID), + } + + batch = append(batch, nf) + batchSizeBytes += nf.ProtoSize() + changes++ + } + } + return true + }) + + if iterError != nil { + l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), iterError) + return iterError + } } if err := runner.CheckHealth(); err != nil { diff --git a/lib/model/model_test.go b/lib/model/model_test.go index be3ac0c50..309210c97 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -2808,185 +2808,6 @@ func TestNoRequestsFromPausedDevices(t *testing.T) { } } -// TestIssue2571 tests replacing a directory with content with a symlink -func TestIssue2571(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Symlinks aren't supported by fs and scanner on windows") - } - err := defaultFs.MkdirAll("replaceDir", 0755) - if err != nil { - t.Fatal(err) - } - defer func() { - defaultFs.RemoveAll("replaceDir") - }() - - testFs := fs.NewFilesystem(fs.FilesystemTypeBasic, filepath.Join(defaultFs.URI(), "replaceDir")) - - for _, dir := range []string{"toLink", "linkTarget"} { - err := testFs.MkdirAll(dir, 0775) - if err != nil { - t.Fatal(err) - } - fd, err := testFs.Create(filepath.Join(dir, "a")) - if err != nil { - t.Fatal(err) - } - fd.Close() - } - - dbi := db.OpenMemory() - m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil) - m.AddFolder(defaultFolderConfig) - m.StartFolder("default") - m.ServeBackground() - defer m.Stop() - m.ScanFolder("default") - - if err = testFs.RemoveAll("toLink"); err != nil { - t.Fatal(err) - } - - if err := osutil.DebugSymlinkForTestsOnly(filepath.Join(testFs.URI(), "linkTarget"), filepath.Join(testFs.URI(), "toLink")); err != nil { - t.Fatal(err) - } - - m.ScanFolder("default") - - if dir, ok := m.CurrentFolderFile("default", filepath.Join("replaceDir", "toLink")); !ok { - t.Fatalf("Dir missing in db") - } else if !dir.IsSymlink() { - t.Errorf("Dir wasn't changed to symlink") - } - if file, ok := m.CurrentFolderFile("default", filepath.Join("replaceDir", "toLink", "a")); !ok { - t.Fatalf("File missing in db") - } else if !file.Deleted { - t.Errorf("File below symlink has not been marked as deleted") - } -} - -// TestIssue4573 tests that contents of an unavailable dir aren't marked deleted -func TestIssue4573(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Can't make the dir inaccessible on windows") - } - - err := defaultFs.MkdirAll("inaccessible", 0755) - if err != nil { - t.Fatal(err) - } - defer func() { - defaultFs.Chmod("inaccessible", 0777) - defaultFs.RemoveAll("inaccessible") - }() - - file := filepath.Join("inaccessible", "a") - fd, err := defaultFs.Create(file) - if err != nil { - t.Fatal(err) - } - fd.Close() - - dbi := db.OpenMemory() - m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil) - m.AddFolder(defaultFolderConfig) - m.StartFolder("default") - m.ServeBackground() - defer m.Stop() - m.ScanFolder("default") - - err = defaultFs.Chmod("inaccessible", 0000) - if err != nil { - t.Fatal(err) - } - - m.ScanFolder("default") - - if file, ok := m.CurrentFolderFile("default", file); !ok { - t.Fatalf("File missing in db") - } else if file.Deleted { - t.Errorf("Inaccessible file has been marked as deleted.") - } -} - -// TestInternalScan checks whether various fs operations are correctly represented -// in the db after scanning. -func TestInternalScan(t *testing.T) { - err := defaultFs.MkdirAll("internalScan", 0755) - if err != nil { - t.Fatal(err) - } - defer func() { - defaultFs.RemoveAll("internalScan") - }() - - testFs := fs.NewFilesystem(fs.FilesystemTypeBasic, filepath.Join(defaultFs.URI(), "internalScan")) - - testCases := map[string]func(protocol.FileInfo) bool{ - "removeDir": func(f protocol.FileInfo) bool { - return !f.Deleted - }, - "dirToFile": func(f protocol.FileInfo) bool { - return f.Deleted || f.IsDirectory() - }, - } - - baseDirs := []string{"dirToFile", "removeDir"} - for _, dir := range baseDirs { - sub := filepath.Join(dir, "subDir") - for _, dir := range []string{dir, sub} { - err := testFs.MkdirAll(dir, 0775) - if err != nil { - t.Fatalf("%v: %v", dir, err) - } - } - testCases[sub] = func(f protocol.FileInfo) bool { - return !f.Deleted - } - for _, dir := range []string{dir, sub} { - file := filepath.Join(dir, "a") - fd, err := testFs.Create(file) - if err != nil { - t.Fatal(err) - } - fd.Close() - testCases[file] = func(f protocol.FileInfo) bool { - return !f.Deleted - } - } - } - - dbi := db.OpenMemory() - m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil) - m.AddFolder(defaultFolderConfig) - m.StartFolder("default") - m.ServeBackground() - defer m.Stop() - m.ScanFolder("default") - - for _, dir := range baseDirs { - if err = testFs.RemoveAll(dir); err != nil { - t.Fatal(err) - } - } - - fd, err := testFs.Create("dirToFile") - if err != nil { - t.Fatal(err) - } - fd.Close() - - m.ScanFolder("default") - - for path, cond := range testCases { - if f, ok := m.CurrentFolderFile("default", filepath.Join("internalScan", path)); !ok { - t.Fatalf("%v missing in db", path) - } else if cond(f) { - t.Errorf("Incorrect db entry for %v", path) - } - } -} - func TestCustomMarkerName(t *testing.T) { ldb := db.OpenMemory() set := db.NewFileSet("default", defaultFs, ldb) diff --git a/lib/protocol/bep_extensions.go b/lib/protocol/bep_extensions.go index a3ed7d164..6dc93afdb 100644 --- a/lib/protocol/bep_extensions.go +++ b/lib/protocol/bep_extensions.go @@ -122,10 +122,6 @@ func (f FileInfo) WinsConflict(other FileInfo) bool { return f.Version.Compare(other.Version) == ConcurrentGreater } -func (f FileInfo) IsEmpty() bool { - return f.Version.Counters == nil -} - func (f *FileInfo) Invalidate(invalidatedBy ShortID) { f.Invalid = true f.ModifiedBy = invalidatedBy @@ -133,23 +129,6 @@ func (f *FileInfo) Invalidate(invalidatedBy ShortID) { f.Sequence = 0 } -func (f *FileInfo) InvalidatedCopy(invalidatedBy ShortID) FileInfo { - copy := *f - copy.Invalidate(invalidatedBy) - return copy -} - -func (f *FileInfo) DeletedCopy(deletedBy ShortID) FileInfo { - copy := *f - copy.Size = 0 - copy.ModifiedBy = deletedBy - copy.Deleted = true - copy.Version = f.Version.Update(deletedBy) - copy.Sequence = 0 - copy.Blocks = nil - return copy -} - func (b BlockInfo) String() string { return fmt.Sprintf("Block{%d/%d/%d/%x}", b.Offset, b.Size, b.WeakHash, b.Hash) } diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go index 6f2249eab..f19710ba9 100644 --- a/lib/protocol/protocol_test.go +++ b/lib/protocol/protocol_test.go @@ -24,10 +24,6 @@ var ( quickCfg = &quick.Config{} ) -const ( - fileSize = 1 << 40 -) - func TestPing(t *testing.T) { ar, aw := io.Pipe() br, bw := io.Pipe() @@ -247,6 +243,12 @@ func TestMarshalledIndexMessageSize(t *testing.T) { return } + const ( + maxMessageSize = MaxMessageLen + fileSize = 1 << 40 + blockSize = BlockSize + ) + f := FileInfo{ Name: "a normal length file name withoout any weird stuff.txt", Type: FileInfoTypeFile, @@ -254,12 +256,12 @@ func TestMarshalledIndexMessageSize(t *testing.T) { Permissions: 0666, ModifiedS: time.Now().Unix(), Version: Vector{Counters: []Counter{{ID: 1 << 60, Value: 1}, {ID: 2 << 60, Value: 1}}}, - Blocks: make([]BlockInfo, fileSize/BlockSize), + Blocks: make([]BlockInfo, fileSize/blockSize), } - for i := 0; i < fileSize/BlockSize; i++ { - f.Blocks[i].Offset = int64(i) * BlockSize - f.Blocks[i].Size = BlockSize + for i := 0; i < fileSize/blockSize; i++ { + f.Blocks[i].Offset = int64(i) * blockSize + f.Blocks[i].Size = blockSize f.Blocks[i].Hash = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 1, 2, 3, 4, 5, 6, 7, 8, 9, 30, 1, 2} } @@ -269,8 +271,8 @@ func TestMarshalledIndexMessageSize(t *testing.T) { } msgSize := idx.ProtoSize() - if msgSize > MaxMessageLen { - t.Errorf("Message size %d bytes is larger than max %d", msgSize, MaxMessageLen) + if msgSize > maxMessageSize { + t.Errorf("Message size %d bytes is larger than max %d", msgSize, maxMessageSize) } } @@ -398,31 +400,3 @@ func TestCheckConsistency(t *testing.T) { } } } - -func TestCopyFileInfo(t *testing.T) { - f := FileInfo{ - Name: "a normal length file name withoout any weird stuff.txt", - Type: FileInfoTypeFile, - Size: fileSize, - Permissions: 0666, - ModifiedS: time.Now().Unix(), - Version: Vector{Counters: []Counter{{ID: 1 << 60, Value: 1}, {ID: 2 << 60, Value: 1}}}, - Blocks: make([]BlockInfo, fileSize/BlockSize), - } - - del := f.DeletedCopy(LocalDeviceID.Short()) - if f.Deleted { - t.Errorf("Source file info was deleted on copy") - } - if !del.Deleted { - t.Errorf("Returned file info was not deleted on copy") - } - - inv := f.InvalidatedCopy(LocalDeviceID.Short()) - if f.Invalid { - t.Errorf("Source file info was invalid on copy") - } - if !inv.Invalid { - t.Errorf("Returned file info was not invalid on copy") - } -} diff --git a/lib/scanner/blockqueue.go b/lib/scanner/blockqueue.go index e354423eb..e1ef4be1f 100644 --- a/lib/scanner/blockqueue.go +++ b/lib/scanner/blockqueue.go @@ -65,15 +65,15 @@ type parallelHasher struct { fs fs.Filesystem blockSize int workers int - outbox chan<- ScanResult - inbox <-chan ScanResult + outbox chan<- protocol.FileInfo + inbox <-chan protocol.FileInfo counter Counter done chan<- struct{} useWeakHashes bool wg sync.WaitGroup } -func newParallelHasher(ctx context.Context, fs fs.Filesystem, blockSize, workers int, outbox chan<- ScanResult, inbox <-chan ScanResult, counter Counter, done chan<- struct{}, useWeakHashes bool) { +func newParallelHasher(ctx context.Context, fs fs.Filesystem, blockSize, workers int, outbox chan<- protocol.FileInfo, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}, useWeakHashes bool) { ph := ¶llelHasher{ fs: fs, blockSize: blockSize, @@ -104,25 +104,25 @@ func (ph *parallelHasher) hashFiles(ctx context.Context) { return } - if f.New.IsDirectory() || f.New.IsDeleted() { + if f.IsDirectory() || f.IsDeleted() { panic("Bug. Asked to hash a directory or a deleted file.") } - blocks, err := HashFile(ctx, ph.fs, f.New.Name, ph.blockSize, ph.counter, ph.useWeakHashes) + blocks, err := HashFile(ctx, ph.fs, f.Name, ph.blockSize, ph.counter, ph.useWeakHashes) if err != nil { - l.Debugln("hash error:", f.New.Name, err) + l.Debugln("hash error:", f.Name, err) continue } - f.New.Blocks = blocks + f.Blocks = blocks // The size we saw when initially deciding to hash the file // might not have been the size it actually had when we hashed // it. Update the size from the block list. - f.New.Size = 0 + f.Size = 0 for _, b := range blocks { - f.New.Size += int64(b.Size) + f.Size += int64(b.Size) } select { diff --git a/lib/scanner/walk.go b/lib/scanner/walk.go index 81dce7f59..06d138dfe 100644 --- a/lib/scanner/walk.go +++ b/lib/scanner/walk.go @@ -8,9 +8,7 @@ package scanner import ( "context" - "errors" "runtime" - "strings" "sync/atomic" "time" "unicode/utf8" @@ -50,8 +48,8 @@ type Config struct { Matcher *ignore.Matcher // Number of hours to keep temporary files for TempLifetime time.Duration - // Walks over file infos as present in the db before the scan alphabetically. - Have haveWalker + // If CurrentFiler is not nil, it is queried for the current file before rescanning. + CurrentFiler CurrentFiler // The Filesystem provides an abstraction on top of the actual filesystem. Filesystem fs.Filesystem // If IgnorePerms is true, changes to permission bits will not be @@ -72,28 +70,16 @@ type Config struct { UseWeakHashes bool } -type haveWalker interface { - // Walk passes all local file infos from the db which start with prefix - // to out and aborts early if ctx is cancelled. - Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) +type CurrentFiler interface { + // CurrentFile returns the file as seen at last scan. + CurrentFile(name string) (protocol.FileInfo, bool) } -type fsWalkResult struct { - path string - info fs.FileInfo - err error -} - -type ScanResult struct { - New protocol.FileInfo - Old protocol.FileInfo -} - -func Walk(ctx context.Context, cfg Config) chan ScanResult { +func Walk(ctx context.Context, cfg Config) chan protocol.FileInfo { w := walker{cfg} - if w.Have == nil { - w.Have = noHaveWalker{} + if w.CurrentFiler == nil { + w.CurrentFiler = noCurrentFiler{} } if w.Filesystem == nil { panic("no filesystem specified") @@ -111,19 +97,25 @@ type walker struct { // Walk returns the list of files found in the local folder by scanning the // file system. Files are blockwise hashed. -func (w *walker) walk(ctx context.Context) chan ScanResult { +func (w *walker) walk(ctx context.Context) chan protocol.FileInfo { l.Debugln("Walk", w.Subs, w.BlockSize, w.Matcher) - haveChan := make(chan protocol.FileInfo) - haveCtx, haveCancel := context.WithCancel(ctx) - go w.dbWalkerRoutine(haveCtx, haveChan) + toHashChan := make(chan protocol.FileInfo) + finishedChan := make(chan protocol.FileInfo) - fsChan := make(chan fsWalkResult) - go w.fsWalkerRoutine(ctx, fsChan, haveCancel) - - toHashChan := make(chan ScanResult) - finishedChan := make(chan ScanResult) - go w.processWalkResults(ctx, fsChan, haveChan, toHashChan, finishedChan) + // A routine which walks the filesystem tree, and sends files which have + // been modified to the counter routine. + go func() { + hashFiles := w.walkAndHashFiles(ctx, toHashChan, finishedChan) + if len(w.Subs) == 0 { + w.Filesystem.Walk(".", hashFiles) + } else { + for _, sub := range w.Subs { + w.Filesystem.Walk(sub, hashFiles) + } + } + close(toHashChan) + }() // We're not required to emit scan progress events, just kick off hashers, // and feed inputs directly from the walker. @@ -147,15 +139,15 @@ func (w *walker) walk(ctx context.Context) chan ScanResult { // Parallel hasher is stopped by this routine when we close the channel over // which it receives the files we ask it to hash. go func() { - var filesToHash []ScanResult + var filesToHash []protocol.FileInfo var total int64 = 1 for file := range toHashChan { filesToHash = append(filesToHash, file) - total += file.New.Size + total += file.Size } - realToHashChan := make(chan ScanResult) + realToHashChan := make(chan protocol.FileInfo) done := make(chan struct{}) progress := newByteCounter() @@ -191,7 +183,7 @@ func (w *walker) walk(ctx context.Context) chan ScanResult { loop: for _, file := range filesToHash { - l.Debugln("real to hash:", file.New.Name) + l.Debugln("real to hash:", file.Name) select { case realToHashChan <- file: case <-ctx.Done(): @@ -204,49 +196,15 @@ func (w *walker) walk(ctx context.Context) chan ScanResult { return finishedChan } -// dbWalkerRoutine walks the db and sends back file infos to be compared to scan results. -func (w *walker) dbWalkerRoutine(ctx context.Context, haveChan chan<- protocol.FileInfo) { - defer close(haveChan) - - if len(w.Subs) == 0 { - w.Have.Walk("", ctx, haveChan) - return - } - - for _, sub := range w.Subs { - select { - case <-ctx.Done(): - return - default: - } - w.Have.Walk(sub, ctx, haveChan) - } -} - -// fsWalkerRoutine walks the filesystem tree and sends back file infos and potential -// errors at paths that need to be processed. -func (w *walker) fsWalkerRoutine(ctx context.Context, fsChan chan<- fsWalkResult, haveCancel context.CancelFunc) { - defer close(fsChan) - - walkFn := w.createFSWalkFn(ctx, fsChan) - if len(w.Subs) == 0 { - if err := w.Filesystem.Walk(".", walkFn); err != nil { - haveCancel() - } - return - } - - for _, sub := range w.Subs { - if err := w.Filesystem.Walk(sub, walkFn); err != nil { - haveCancel() - break - } - } -} - -func (w *walker) createFSWalkFn(ctx context.Context, fsChan chan<- fsWalkResult) fs.WalkFunc { +func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protocol.FileInfo) fs.WalkFunc { now := time.Now() return func(path string, info fs.FileInfo, err error) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Return value used when we are returning early and don't want to // process the item. For directories, this means do-not-descend. var skip error // nil @@ -256,9 +214,7 @@ func (w *walker) createFSWalkFn(ctx context.Context, fsChan chan<- fsWalkResult) } if err != nil { - if sendErr := fsWalkError(ctx, fsChan, path, err); sendErr != nil { - return sendErr - } + l.Debugln("error:", path, info, err) return skip } @@ -266,6 +222,12 @@ func (w *walker) createFSWalkFn(ctx context.Context, fsChan chan<- fsWalkResult) return nil } + info, err = w.Filesystem.Lstat(path) + // An error here would be weird as we've already gotten to this point, but act on it nonetheless + if err != nil { + return skip + } + if fs.IsTemporary(path) { l.Debugln("temporary:", path) if info.IsRegular() && info.ModTime().Add(w.TempLifetime).Before(now) { @@ -276,170 +238,48 @@ func (w *walker) createFSWalkFn(ctx context.Context, fsChan chan<- fsWalkResult) } if fs.IsInternal(path) { - l.Debugln("skip walking (internal):", path) + l.Debugln("ignored (internal):", path) return skip } if w.Matcher.Match(path).IsIgnored() { - l.Debugln("skip walking (patterns):", path) + l.Debugln("ignored (patterns):", path) return skip } if !utf8.ValidString(path) { - if err := fsWalkError(ctx, fsChan, path, errors.New("path isn't a valid utf8 string")); err != nil { - return err - } l.Warnf("File name %q is not in UTF8 encoding; skipping.", path) return skip } path, shouldSkip := w.normalizePath(path, info) if shouldSkip { - if err := fsWalkError(ctx, fsChan, path, errors.New("failed to normalize path")); err != nil { + return skip + } + + switch { + case info.IsSymlink(): + if err := w.walkSymlink(ctx, path, dchan); err != nil { return err } - return skip - } + if info.IsDir() { + // under no circumstances shall we descend into a symlink + return fs.SkipDir + } + return nil - select { - case fsChan <- fsWalkResult{ - path: path, - info: info, - err: nil, - }: - case <-ctx.Done(): - return ctx.Err() - } + case info.IsDir(): + err = w.walkDir(ctx, path, info, dchan) - // under no circumstances shall we descend into a symlink - if info.IsSymlink() && info.IsDir() { - l.Debugln("skip walking (symlinked directory):", path) - return skip + case info.IsRegular(): + err = w.walkRegular(ctx, path, info, fchan) } return err } } -func fsWalkError(ctx context.Context, dst chan<- fsWalkResult, path string, err error) error { - select { - case dst <- fsWalkResult{ - path: path, - info: nil, - err: err, - }: - case <-ctx.Done(): - return ctx.Err() - } - - return nil -} - -func (w *walker) processWalkResults(ctx context.Context, fsChan <-chan fsWalkResult, haveChan <-chan protocol.FileInfo, toHashChan, finishedChan chan<- ScanResult) { - ctxChan := ctx.Done() - fsRes, fsChanOpen := <-fsChan - currDBFile, haveChanOpen := <-haveChan - for fsChanOpen { - if haveChanOpen { - // File infos below an error walking the filesystem tree - // may be marked as ignored but should not be deleted. - if fsRes.err != nil && (strings.HasPrefix(currDBFile.Name, fsRes.path+string(fs.PathSeparator)) || fsRes.path == ".") { - w.checkIgnoredAndInvalidate(currDBFile, finishedChan, ctxChan) - currDBFile, haveChanOpen = <-haveChan - continue - } - // Delete file infos that were not encountered when - // walking the filesystem tree, except on error (see - // above) or if they are ignored. - if currDBFile.Name < fsRes.path { - w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan) - currDBFile, haveChanOpen = <-haveChan - continue - } - } - - var oldFile protocol.FileInfo - if haveChanOpen && currDBFile.Name == fsRes.path { - oldFile = currDBFile - currDBFile, haveChanOpen = <-haveChan - } - - if fsRes.err != nil { - if fs.IsNotExist(fsRes.err) && !oldFile.IsEmpty() && !oldFile.Deleted { - select { - case finishedChan <- ScanResult{ - New: oldFile.DeletedCopy(w.ShortID), - Old: oldFile, - }: - case <-ctx.Done(): - return - } - } - fsRes, fsChanOpen = <-fsChan - continue - } - - switch { - case fsRes.info.IsDir(): - w.walkDir(ctx, fsRes.path, fsRes.info, oldFile, finishedChan) - - case fsRes.info.IsSymlink(): - w.walkSymlink(ctx, fsRes.path, oldFile, finishedChan) - - case fsRes.info.IsRegular(): - w.walkRegular(ctx, fsRes.path, fsRes.info, oldFile, toHashChan) - } - - fsRes, fsChanOpen = <-fsChan - } - - // Filesystem tree walking finished, if there is anything left in the - // db, mark it as deleted, except when it's ignored. - if haveChanOpen { - w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan) - for currDBFile = range haveChan { - w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan) - } - } - - close(toHashChan) -} - -func (w *walker) checkIgnoredAndDelete(f protocol.FileInfo, finishedChan chan<- ScanResult, done <-chan struct{}) { - if w.checkIgnoredAndInvalidate(f, finishedChan, done) { - return - } - - if !f.Invalid && !f.Deleted { - select { - case finishedChan <- ScanResult{ - New: f.DeletedCopy(w.ShortID), - Old: f, - }: - case <-done: - } - } -} - -func (w *walker) checkIgnoredAndInvalidate(f protocol.FileInfo, finishedChan chan<- ScanResult, done <-chan struct{}) bool { - if !w.Matcher.Match(f.Name).IsIgnored() { - return false - } - - if !f.Invalid { - select { - case finishedChan <- ScanResult{ - New: f.InvalidatedCopy(w.ShortID), - Old: f, - }: - case <-done: - } - } - - return true -} - -func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, cf protocol.FileInfo, toHashChan chan<- ScanResult) { +func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, fchan chan protocol.FileInfo) error { curMode := uint32(info.Mode()) if runtime.GOOS == "windows" && osutil.IsWindowsExecutable(relPath) { curMode |= 0111 @@ -454,38 +294,40 @@ func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileIn // - was not a symlink (since it's a file now) // - was not invalid (since it looks valid now) // - has the same size as previously - if !cf.IsEmpty() { - permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, curMode) - if permUnchanged && !cf.IsDeleted() && cf.ModTime().Equal(info.ModTime()) && !cf.IsDirectory() && - !cf.IsSymlink() && !cf.IsInvalid() && cf.Size == info.Size() { - return - } + cf, ok := w.CurrentFiler.CurrentFile(relPath) + permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, curMode) + if ok && permUnchanged && !cf.IsDeleted() && cf.ModTime().Equal(info.ModTime()) && !cf.IsDirectory() && + !cf.IsSymlink() && !cf.IsInvalid() && cf.Size == info.Size() { + return nil + } + + if ok { l.Debugln("rescan:", cf, info.ModTime().Unix(), info.Mode()&fs.ModePerm) } - f := ScanResult{ - New: protocol.FileInfo{ - Name: relPath, - Type: protocol.FileInfoTypeFile, - Version: cf.Version.Update(w.ShortID), - Permissions: curMode & uint32(maskModePerm), - NoPermissions: w.IgnorePerms, - ModifiedS: info.ModTime().Unix(), - ModifiedNs: int32(info.ModTime().Nanosecond()), - ModifiedBy: w.ShortID, - Size: info.Size(), - }, - Old: cf, + f := protocol.FileInfo{ + Name: relPath, + Type: protocol.FileInfoTypeFile, + Version: cf.Version.Update(w.ShortID), + Permissions: curMode & uint32(maskModePerm), + NoPermissions: w.IgnorePerms, + ModifiedS: info.ModTime().Unix(), + ModifiedNs: int32(info.ModTime().Nanosecond()), + ModifiedBy: w.ShortID, + Size: info.Size(), } l.Debugln("to hash:", relPath, f) select { - case toHashChan <- f: + case fchan <- f: case <-ctx.Done(): + return ctx.Err() } + + return nil } -func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, cf protocol.FileInfo, finishedChan chan<- ScanResult) { +func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, dchan chan protocol.FileInfo) error { // A directory is "unchanged", if it // - exists // - has the same permissions as previously, unless we are ignoring permissions @@ -493,41 +335,40 @@ func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, // - was a directory previously (not a file or something else) // - was not a symlink (since it's a directory now) // - was not invalid (since it looks valid now) - if !cf.IsEmpty() { - permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, uint32(info.Mode())) - if permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() { - return - } + cf, ok := w.CurrentFiler.CurrentFile(relPath) + permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, uint32(info.Mode())) + if ok && permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() { + return nil } - f := ScanResult{ - New: protocol.FileInfo{ - Name: relPath, - Type: protocol.FileInfoTypeDirectory, - Version: cf.Version.Update(w.ShortID), - Permissions: uint32(info.Mode() & maskModePerm), - NoPermissions: w.IgnorePerms, - ModifiedS: info.ModTime().Unix(), - ModifiedNs: int32(info.ModTime().Nanosecond()), - ModifiedBy: w.ShortID, - }, - Old: cf, + f := protocol.FileInfo{ + Name: relPath, + Type: protocol.FileInfoTypeDirectory, + Version: cf.Version.Update(w.ShortID), + Permissions: uint32(info.Mode() & maskModePerm), + NoPermissions: w.IgnorePerms, + ModifiedS: info.ModTime().Unix(), + ModifiedNs: int32(info.ModTime().Nanosecond()), + ModifiedBy: w.ShortID, } l.Debugln("dir:", relPath, f) select { - case finishedChan <- f: + case dchan <- f: case <-ctx.Done(): + return ctx.Err() } + + return nil } // walkSymlink returns nil or an error, if the error is of the nature that // it should stop the entire walk. -func (w *walker) walkSymlink(ctx context.Context, relPath string, cf protocol.FileInfo, finishedChan chan<- ScanResult) { +func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan protocol.FileInfo) error { // Symlinks are not supported on Windows. We ignore instead of returning // an error. if runtime.GOOS == "windows" { - return + return nil } // We always rehash symlinks as they have no modtime or @@ -538,7 +379,7 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, cf protocol.Fi target, err := w.Filesystem.ReadSymlink(relPath) if err != nil { l.Debugln("readlink error:", relPath, err) - return + return nil } // A symlink is "unchanged", if @@ -547,27 +388,28 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, cf protocol.Fi // - it was a symlink // - it wasn't invalid // - the target was the same - if !cf.IsEmpty() && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && cf.SymlinkTarget == target { - return + cf, ok := w.CurrentFiler.CurrentFile(relPath) + if ok && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && cf.SymlinkTarget == target { + return nil } - f := ScanResult{ - New: protocol.FileInfo{ - Name: relPath, - Type: protocol.FileInfoTypeSymlink, - Version: cf.Version.Update(w.ShortID), - NoPermissions: true, // Symlinks don't have permissions of their own - SymlinkTarget: target, - }, - Old: cf, + f := protocol.FileInfo{ + Name: relPath, + Type: protocol.FileInfoTypeSymlink, + Version: cf.Version.Update(w.ShortID), + NoPermissions: true, // Symlinks don't have permissions of their own + SymlinkTarget: target, } l.Debugln("symlink changedb:", relPath, f) select { - case finishedChan <- f: + case dchan <- f: case <-ctx.Done(): + return ctx.Err() } + + return nil } // normalizePath returns the normalized relative path (possibly after fixing @@ -690,6 +532,10 @@ func (c *byteCounter) Close() { close(c.stop) } -type noHaveWalker struct{} +// A no-op CurrentFiler -func (noHaveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) {} +type noCurrentFiler struct{} + +func (noCurrentFiler) CurrentFile(name string) (protocol.FileInfo, bool) { + return protocol.FileInfo{}, false +} diff --git a/lib/scanner/walk_test.go b/lib/scanner/walk_test.go index a3100e1a1..e9121a6b4 100644 --- a/lib/scanner/walk_test.go +++ b/lib/scanner/walk_test.go @@ -69,7 +69,7 @@ func TestWalkSub(t *testing.T) { Matcher: ignores, Hashers: 2, }) - var files []ScanResult + var files []protocol.FileInfo for f := range fchan { files = append(files, f) } @@ -80,10 +80,10 @@ func TestWalkSub(t *testing.T) { if len(files) != 2 { t.Fatalf("Incorrect length %d != 2", len(files)) } - if files[0].New.Name != "dir2" { + if files[0].Name != "dir2" { t.Errorf("Incorrect file %v != dir2", files[0]) } - if files[1].New.Name != filepath.Join("dir2", "cfile") { + if files[1].Name != filepath.Join("dir2", "cfile") { t.Errorf("Incorrect file %v != dir2/cfile", files[1]) } } @@ -103,7 +103,7 @@ func TestWalk(t *testing.T) { Hashers: 2, }) - var tmp []ScanResult + var tmp []protocol.FileInfo for f := range fchan { tmp = append(tmp, f) } @@ -251,9 +251,9 @@ func TestNormalization(t *testing.T) { } func TestIssue1507(t *testing.T) { - w := &walker{Config{Matcher: ignore.New(fs.NewFilesystem(fs.FilesystemTypeBasic, "."))}} - c := make(chan fsWalkResult, 100) - fn := w.createFSWalkFn(context.TODO(), c) + w := &walker{} + c := make(chan protocol.FileInfo, 100) + fn := w.walkAndHashFiles(context.TODO(), c, c) fn("", nil, protocol.ErrClosed) } @@ -274,14 +274,15 @@ func TestWalkSymlinkUnix(t *testing.T) { // Scan it files, _ := walkDir(fs.NewFilesystem(fs.FilesystemTypeBasic, "_symlinks"), path) + // Verify that we got one symlink and with the correct attributes if len(files) != 1 { t.Errorf("expected 1 symlink, not %d", len(files)) } - if len(files[0].New.Blocks) != 0 { - t.Errorf("expected zero blocks for symlink, not %d", len(files[0].New.Blocks)) + if len(files[0].Blocks) != 0 { + t.Errorf("expected zero blocks for symlink, not %d", len(files[0].Blocks)) } - if files[0].New.SymlinkTarget != "../testdata" { - t.Errorf("expected symlink to have target destination, not %q", files[0].New.SymlinkTarget) + if files[0].SymlinkTarget != "../testdata" { + t.Errorf("expected symlink to have target destination, not %q", files[0].SymlinkTarget) } } } @@ -341,7 +342,7 @@ func TestWalkRootSymlink(t *testing.T) { } } -func walkDir(fs fs.Filesystem, dir string) ([]ScanResult, error) { +func walkDir(fs fs.Filesystem, dir string) ([]protocol.FileInfo, error) { fchan := Walk(context.TODO(), Config{ Filesystem: fs, Subs: []string{dir}, @@ -350,7 +351,7 @@ func walkDir(fs fs.Filesystem, dir string) ([]ScanResult, error) { Hashers: 2, }) - var tmp []ScanResult + var tmp []protocol.FileInfo for f := range fchan { tmp = append(tmp, f) } @@ -359,14 +360,14 @@ func walkDir(fs fs.Filesystem, dir string) ([]ScanResult, error) { return tmp, nil } -type fileList []ScanResult +type fileList []protocol.FileInfo func (l fileList) Len() int { return len(l) } func (l fileList) Less(a, b int) bool { - return l[a].New.Name < l[b].New.Name + return l[a].Name < l[b].Name } func (l fileList) Swap(a, b int) { @@ -376,12 +377,12 @@ func (l fileList) Swap(a, b int) { func (l fileList) testfiles() testfileList { testfiles := make(testfileList, len(l)) for i, f := range l { - if len(f.New.Blocks) > 1 { + if len(f.Blocks) > 1 { panic("simple test case stuff only supports a single block per file") } - testfiles[i] = testfile{name: f.New.Name, length: f.New.FileSize()} - if len(f.New.Blocks) == 1 { - testfiles[i].hash = fmt.Sprintf("%x", f.New.Blocks[0].Hash) + testfiles[i] = testfile{name: f.Name, length: f.FileSize()} + if len(f.Blocks) == 1 { + testfiles[i].hash = fmt.Sprintf("%x", f.Blocks[0].Hash) } } return testfiles @@ -464,13 +465,13 @@ func TestStopWalk(t *testing.T) { for { f := <-fchan t.Log("Scanned", f) - if f.New.IsDirectory() { - if len(f.New.Name) == 0 || f.New.Permissions == 0 { + if f.IsDirectory() { + if len(f.Name) == 0 || f.Permissions == 0 { t.Error("Bad directory entry", f) } dirs++ } else { - if len(f.New.Name) == 0 || len(f.New.Blocks) == 0 || f.New.Permissions == 0 { + if len(f.Name) == 0 || len(f.Blocks) == 0 || f.Permissions == 0 { t.Error("Bad file entry", f) } files++ @@ -528,69 +529,3 @@ func verify(r io.Reader, blocksize int, blocks []protocol.BlockInfo) error { return nil } - -// The following (randomish) scenario produced an error uncovered by integration tests -func TestWalkIntegration(t *testing.T) { - tmpDir, err := ioutil.TempDir(".", "_request-") - if err != nil { - panic("Failed to create temporary testing dir") - } - defer os.RemoveAll(tmpDir) - - fs := fs.NewFilesystem(fs.FilesystemTypeBasic, tmpDir) - fs.Mkdir("a", 0777) - toDel := filepath.Join("a", "b") - for _, f := range []string{"b", toDel} { - fi, err := fs.Create(f) - if err != nil { - panic(err) - } - fi.Close() - } - - conf := Config{ - Filesystem: fs, - BlockSize: 128 * 1024, - Hashers: 2, - } - - rchan := Walk(context.TODO(), conf) - - var res []ScanResult - for r := range rchan { - res = append(res, r) - } - sort.Sort(fileList(res)) - thw := make([]protocol.FileInfo, 0, len(res)) - for _, r := range res { - thw = append(thw, r.New) - } - conf.Have = testHaveWalker(thw) - - if err = fs.Remove(toDel); err != nil { - panic(err) - } - - rchan = Walk(context.TODO(), conf) - - for r := range rchan { - if r.New.Name != toDel { - t.Fatalf("Received unexpected result %v", r) - } - } -} - -type testHaveWalker []protocol.FileInfo - -func (thw testHaveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) { - if prefix != "" { - panic("cannot walk with prefix") - } - for _, f := range thw { - select { - case out <- f: - case <-ctx.Done(): - return - } - } -}