diff --git a/lib/db/set.go b/lib/db/set.go index afaa36494..71c1a19fd 100644 --- a/lib/db/set.go +++ b/lib/db/set.go @@ -194,9 +194,9 @@ func (s *FileSet) WithHaveTruncated(device protocol.DeviceID, fn Iterator) { s.db.withHave([]byte(s.folder), device[:], nil, true, nativeFileIterator(fn)) } -func (s *FileSet) WithPrefixedHaveTruncated(device protocol.DeviceID, prefix string, fn Iterator) { - l.Debugf("%s WithPrefixedHaveTruncated(%v)", s.folder, device) - s.db.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn)) +func (s *FileSet) WithPrefixedHave(device protocol.DeviceID, prefix string, fn Iterator) { + l.Debugf("%s WithPrefixedHave(%v)", s.folder, device) + s.db.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), false, nativeFileIterator(fn)) } func (s *FileSet) WithGlobal(fn Iterator) { l.Debugf("%s WithGlobal()", s.folder) diff --git a/lib/fs/walkfs.go b/lib/fs/walkfs.go index 005c7e262..d4fa391f6 100644 --- a/lib/fs/walkfs.go +++ b/lib/fs/walkfs.go @@ -10,7 +10,10 @@ package fs -import "path/filepath" +import ( + "path/filepath" + "sort" +) // WalkFunc is the type of the function called for each file or directory // visited by Walk. The path argument contains the argument to Walk as a @@ -54,6 +57,7 @@ func (f *walkFilesystem) walk(path string, info FileInfo, walkFn WalkFunc) error if err != nil { return walkFn(path, info, err) } + sort.Strings(names) for _, name := range names { filename := filepath.Join(path, name) diff --git a/lib/model/model.go b/lib/model/model.go index 304f33e43..b14809ea8 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -1395,14 +1395,21 @@ func (m *Model) CurrentGlobalFile(folder string, file string) (protocol.FileInfo return fs.GetGlobal(file) } -type cFiler struct { - m *Model - r string +type haveWalker struct { + fset *db.FileSet } -// Implements scanner.CurrentFiler -func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) { - return cf.m.CurrentFolderFile(cf.r, file) +func (h haveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) { + ctxChan := ctx.Done() + h.fset.WithPrefixedHave(protocol.LocalDeviceID, prefix, func(fi db.FileIntf) bool { + f := fi.(protocol.FileInfo) + select { + case out <- f: + case <-ctxChan: + return false + } + return true + }) } // Connection returns the current connection for device, and a boolean wether a connection was found. 
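Note on the model.go hunk above: the old CurrentFiler lookup is replaced by a streaming haveWalker whose Walk(prefix, ctx, out) method feeds local file infos from the db, in alphabetical order, into a channel and aborts when the context is cancelled. Below is a minimal sketch of a type satisfying that same contract, for orientation only; the sliceHaveWalker helper and its sample data are hypothetical and not part of this patch.

package main

import (
	"context"
	"fmt"
	"sort"
	"strings"

	"github.com/syncthing/syncthing/lib/protocol"
)

// sliceHaveWalker approximates FileSet.WithPrefixedHave over an in-memory,
// name-sorted slice: emit every entry under prefix, stop early if ctx is done.
type sliceHaveWalker []protocol.FileInfo

func (s sliceHaveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) {
	for _, f := range s {
		if prefix != "" && !strings.HasPrefix(f.Name, prefix) {
			continue
		}
		select {
		case out <- f:
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	have := sliceHaveWalker{{Name: "a"}, {Name: "a/b"}, {Name: "c"}}
	sort.Slice(have, func(i, j int) bool { return have[i].Name < have[j].Name })

	out := make(chan protocol.FileInfo)
	go func() {
		defer close(out)
		have.Walk("a", context.Background(), out)
	}()
	for f := range out {
		fmt.Println(f.Name) // prints "a" and "a/b"
	}
}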
@@ -1955,13 +1962,14 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su runner.setState(FolderScanning) - fchan := scanner.Walk(ctx, scanner.Config{ + haveWalker := haveWalker{fset} + rchan := scanner.Walk(ctx, scanner.Config{ Folder: folderCfg.ID, Subs: subDirs, Matcher: ignores, BlockSize: protocol.BlockSize, TempLifetime: time.Duration(m.cfg.Options().KeepTemporariesH) * time.Hour, - CurrentFiler: cFiler{m, folder}, + Have: haveWalker, Filesystem: mtimefs, IgnorePerms: folderCfg.IgnorePerms, AutoNormalize: folderCfg.AutoNormalize, @@ -1978,6 +1986,17 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles) batchSizeBytes := 0 changes := 0 + checkBatch := func() error { + if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes { + if err := runner.CheckHealth(); err != nil { + return err + } + m.updateLocalsFromScanning(folder, batch) + batch = batch[:0] + batchSizeBytes = 0 + } + return nil + } // Schedule a pull after scanning, but only if we actually detected any // changes. @@ -1987,98 +2006,49 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su } }() - for f := range fchan { - if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes { - if err := runner.CheckHealth(); err != nil { + var delDirStack []protocol.FileInfo + for r := range rchan { + if err := checkBatch(); err != nil { + l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) + return err + } + + // Append deleted dirs from stack if the current file isn't a child, + // which means all children were already processed. + for len(delDirStack) != 0 && !strings.HasPrefix(r.New.Name, delDirStack[len(delDirStack)-1].Name+string(fs.PathSeparator)) { + lastDelDir := delDirStack[len(delDirStack)-1] + batch = append(batch, lastDelDir) + batchSizeBytes += lastDelDir.ProtoSize() + changes++ + if err := checkBatch(); err != nil { l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) return err } - m.updateLocalsFromScanning(folder, batch) - batch = batch[:0] - batchSizeBytes = 0 + delDirStack = delDirStack[:len(delDirStack)-1] } - batch = append(batch, f) - batchSizeBytes += f.ProtoSize() + // Delay appending deleted dirs until all its children are processed + if r.Old.IsDirectory() && (r.New.Deleted || !r.New.IsDirectory()) { + delDirStack = append(delDirStack, r.New) + continue + } + + l.Debugln("Appending", r) + batch = append(batch, r.New) + batchSizeBytes += r.New.ProtoSize() changes++ } - if err := runner.CheckHealth(); err != nil { - l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) - return err - } else if len(batch) > 0 { - m.updateLocalsFromScanning(folder, batch) - } - - if len(subDirs) == 0 { - // If we have no specific subdirectories to traverse, set it to one - // empty prefix so we traverse the entire folder contents once. - subDirs = []string{""} - } - - // Do a scan of the database for each prefix, to check for deleted and - // ignored files. 
- batch = batch[:0] - batchSizeBytes = 0 - for _, sub := range subDirs { - var iterError error - - fset.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool { - f := fi.(db.FileInfoTruncated) - if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes { - if err := runner.CheckHealth(); err != nil { - iterError = err - return false - } - m.updateLocalsFromScanning(folder, batch) - batch = batch[:0] - batchSizeBytes = 0 - } - - switch { - case !f.IsInvalid() && ignores.Match(f.Name).IsIgnored(): - // File was valid at last pass but has been ignored. Set invalid bit. - l.Debugln("setting invalid bit on ignored", f) - nf := f.ConvertToInvalidFileInfo(m.id.Short()) - batch = append(batch, nf) - batchSizeBytes += nf.ProtoSize() - changes++ - - case !f.IsInvalid() && !f.IsDeleted(): - // The file is valid and not deleted. Lets check if it's - // still here. - - if _, err := mtimefs.Lstat(f.Name); err != nil { - // We don't specifically verify that the error is - // fs.IsNotExist because there is a corner case when a - // directory is suddenly transformed into a file. When that - // happens, files that were in the directory (that is now a - // file) are deleted but will return a confusing error ("not a - // directory") when we try to Lstat() them. - - nf := protocol.FileInfo{ - Name: f.Name, - Type: f.Type, - Size: 0, - ModifiedS: f.ModifiedS, - ModifiedNs: f.ModifiedNs, - ModifiedBy: m.id.Short(), - Deleted: true, - Version: f.Version.Update(m.shortID), - } - - batch = append(batch, nf) - batchSizeBytes += nf.ProtoSize() - changes++ - } - } - return true - }) - - if iterError != nil { - l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), iterError) - return iterError + // Append remaining deleted dirs. 
+ for i := len(delDirStack) - 1; i >= 0; i-- { + if err := checkBatch(); err != nil { + l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err) + return err } + + batch = append(batch, delDirStack[i]) + batchSizeBytes += delDirStack[i].ProtoSize() + changes++ } if err := runner.CheckHealth(); err != nil { diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 309210c97..be3ac0c50 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -2808,6 +2808,185 @@ func TestNoRequestsFromPausedDevices(t *testing.T) { } } +// TestIssue2571 tests replacing a directory with content with a symlink +func TestIssue2571(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Symlinks aren't supported by fs and scanner on windows") + } + err := defaultFs.MkdirAll("replaceDir", 0755) + if err != nil { + t.Fatal(err) + } + defer func() { + defaultFs.RemoveAll("replaceDir") + }() + + testFs := fs.NewFilesystem(fs.FilesystemTypeBasic, filepath.Join(defaultFs.URI(), "replaceDir")) + + for _, dir := range []string{"toLink", "linkTarget"} { + err := testFs.MkdirAll(dir, 0775) + if err != nil { + t.Fatal(err) + } + fd, err := testFs.Create(filepath.Join(dir, "a")) + if err != nil { + t.Fatal(err) + } + fd.Close() + } + + dbi := db.OpenMemory() + m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil) + m.AddFolder(defaultFolderConfig) + m.StartFolder("default") + m.ServeBackground() + defer m.Stop() + m.ScanFolder("default") + + if err = testFs.RemoveAll("toLink"); err != nil { + t.Fatal(err) + } + + if err := osutil.DebugSymlinkForTestsOnly(filepath.Join(testFs.URI(), "linkTarget"), filepath.Join(testFs.URI(), "toLink")); err != nil { + t.Fatal(err) + } + + m.ScanFolder("default") + + if dir, ok := m.CurrentFolderFile("default", filepath.Join("replaceDir", "toLink")); !ok { + t.Fatalf("Dir missing in db") + } else if !dir.IsSymlink() { + t.Errorf("Dir wasn't changed to symlink") + } + if file, ok := m.CurrentFolderFile("default", filepath.Join("replaceDir", "toLink", "a")); !ok { + t.Fatalf("File missing in db") + } else if !file.Deleted { + t.Errorf("File below symlink has not been marked as deleted") + } +} + +// TestIssue4573 tests that contents of an unavailable dir aren't marked deleted +func TestIssue4573(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Can't make the dir inaccessible on windows") + } + + err := defaultFs.MkdirAll("inaccessible", 0755) + if err != nil { + t.Fatal(err) + } + defer func() { + defaultFs.Chmod("inaccessible", 0777) + defaultFs.RemoveAll("inaccessible") + }() + + file := filepath.Join("inaccessible", "a") + fd, err := defaultFs.Create(file) + if err != nil { + t.Fatal(err) + } + fd.Close() + + dbi := db.OpenMemory() + m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil) + m.AddFolder(defaultFolderConfig) + m.StartFolder("default") + m.ServeBackground() + defer m.Stop() + m.ScanFolder("default") + + err = defaultFs.Chmod("inaccessible", 0000) + if err != nil { + t.Fatal(err) + } + + m.ScanFolder("default") + + if file, ok := m.CurrentFolderFile("default", file); !ok { + t.Fatalf("File missing in db") + } else if file.Deleted { + t.Errorf("Inaccessible file has been marked as deleted.") + } +} + +// TestInternalScan checks whether various fs operations are correctly represented +// in the db after scanning. 
+func TestInternalScan(t *testing.T) { + err := defaultFs.MkdirAll("internalScan", 0755) + if err != nil { + t.Fatal(err) + } + defer func() { + defaultFs.RemoveAll("internalScan") + }() + + testFs := fs.NewFilesystem(fs.FilesystemTypeBasic, filepath.Join(defaultFs.URI(), "internalScan")) + + testCases := map[string]func(protocol.FileInfo) bool{ + "removeDir": func(f protocol.FileInfo) bool { + return !f.Deleted + }, + "dirToFile": func(f protocol.FileInfo) bool { + return f.Deleted || f.IsDirectory() + }, + } + + baseDirs := []string{"dirToFile", "removeDir"} + for _, dir := range baseDirs { + sub := filepath.Join(dir, "subDir") + for _, dir := range []string{dir, sub} { + err := testFs.MkdirAll(dir, 0775) + if err != nil { + t.Fatalf("%v: %v", dir, err) + } + } + testCases[sub] = func(f protocol.FileInfo) bool { + return !f.Deleted + } + for _, dir := range []string{dir, sub} { + file := filepath.Join(dir, "a") + fd, err := testFs.Create(file) + if err != nil { + t.Fatal(err) + } + fd.Close() + testCases[file] = func(f protocol.FileInfo) bool { + return !f.Deleted + } + } + } + + dbi := db.OpenMemory() + m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil) + m.AddFolder(defaultFolderConfig) + m.StartFolder("default") + m.ServeBackground() + defer m.Stop() + m.ScanFolder("default") + + for _, dir := range baseDirs { + if err = testFs.RemoveAll(dir); err != nil { + t.Fatal(err) + } + } + + fd, err := testFs.Create("dirToFile") + if err != nil { + t.Fatal(err) + } + fd.Close() + + m.ScanFolder("default") + + for path, cond := range testCases { + if f, ok := m.CurrentFolderFile("default", filepath.Join("internalScan", path)); !ok { + t.Fatalf("%v missing in db", path) + } else if cond(f) { + t.Errorf("Incorrect db entry for %v", path) + } + } +} + func TestCustomMarkerName(t *testing.T) { ldb := db.OpenMemory() set := db.NewFileSet("default", defaultFs, ldb) diff --git a/lib/protocol/bep_extensions.go b/lib/protocol/bep_extensions.go index 6c339f124..2bd2e0e0d 100644 --- a/lib/protocol/bep_extensions.go +++ b/lib/protocol/bep_extensions.go @@ -117,6 +117,10 @@ func (f FileInfo) WinsConflict(other FileInfo) bool { return f.Version.Compare(other.Version) == ConcurrentGreater } +func (f FileInfo) IsEmpty() bool { + return f.Version.Counters == nil +} + func (f *FileInfo) Invalidate(invalidatedBy ShortID) { f.Invalid = true f.ModifiedBy = invalidatedBy @@ -124,6 +128,23 @@ func (f *FileInfo) Invalidate(invalidatedBy ShortID) { f.Sequence = 0 } +func (f *FileInfo) InvalidatedCopy(invalidatedBy ShortID) FileInfo { + copy := *f + copy.Invalidate(invalidatedBy) + return copy +} + +func (f *FileInfo) DeletedCopy(deletedBy ShortID) FileInfo { + copy := *f + copy.Size = 0 + copy.ModifiedBy = deletedBy + copy.Deleted = true + copy.Version = f.Version.Update(deletedBy) + copy.Sequence = 0 + copy.Blocks = nil + return copy +} + func (b BlockInfo) String() string { return fmt.Sprintf("Block{%d/%d/%d/%x}", b.Offset, b.Size, b.WeakHash, b.Hash) } diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go index f19710ba9..6f2249eab 100644 --- a/lib/protocol/protocol_test.go +++ b/lib/protocol/protocol_test.go @@ -24,6 +24,10 @@ var ( quickCfg = &quick.Config{} ) +const ( + fileSize = 1 << 40 +) + func TestPing(t *testing.T) { ar, aw := io.Pipe() br, bw := io.Pipe() @@ -243,12 +247,6 @@ func TestMarshalledIndexMessageSize(t *testing.T) { return } - const ( - maxMessageSize = MaxMessageLen - fileSize = 1 << 40 - blockSize = BlockSize - ) - f := FileInfo{ Name: "a 
normal length file name withoout any weird stuff.txt", Type: FileInfoTypeFile, @@ -256,12 +254,12 @@ func TestMarshalledIndexMessageSize(t *testing.T) { Permissions: 0666, ModifiedS: time.Now().Unix(), Version: Vector{Counters: []Counter{{ID: 1 << 60, Value: 1}, {ID: 2 << 60, Value: 1}}}, - Blocks: make([]BlockInfo, fileSize/blockSize), + Blocks: make([]BlockInfo, fileSize/BlockSize), } - for i := 0; i < fileSize/blockSize; i++ { - f.Blocks[i].Offset = int64(i) * blockSize - f.Blocks[i].Size = blockSize + for i := 0; i < fileSize/BlockSize; i++ { + f.Blocks[i].Offset = int64(i) * BlockSize + f.Blocks[i].Size = BlockSize f.Blocks[i].Hash = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 1, 2, 3, 4, 5, 6, 7, 8, 9, 30, 1, 2} } @@ -271,8 +269,8 @@ func TestMarshalledIndexMessageSize(t *testing.T) { } msgSize := idx.ProtoSize() - if msgSize > maxMessageSize { - t.Errorf("Message size %d bytes is larger than max %d", msgSize, maxMessageSize) + if msgSize > MaxMessageLen { + t.Errorf("Message size %d bytes is larger than max %d", msgSize, MaxMessageLen) } } @@ -400,3 +398,31 @@ func TestCheckConsistency(t *testing.T) { } } } + +func TestCopyFileInfo(t *testing.T) { + f := FileInfo{ + Name: "a normal length file name withoout any weird stuff.txt", + Type: FileInfoTypeFile, + Size: fileSize, + Permissions: 0666, + ModifiedS: time.Now().Unix(), + Version: Vector{Counters: []Counter{{ID: 1 << 60, Value: 1}, {ID: 2 << 60, Value: 1}}}, + Blocks: make([]BlockInfo, fileSize/BlockSize), + } + + del := f.DeletedCopy(LocalDeviceID.Short()) + if f.Deleted { + t.Errorf("Source file info was deleted on copy") + } + if !del.Deleted { + t.Errorf("Returned file info was not deleted on copy") + } + + inv := f.InvalidatedCopy(LocalDeviceID.Short()) + if f.Invalid { + t.Errorf("Source file info was invalid on copy") + } + if !inv.Invalid { + t.Errorf("Returned file info was not invalid on copy") + } +} diff --git a/lib/scanner/blockqueue.go b/lib/scanner/blockqueue.go index e1ef4be1f..e354423eb 100644 --- a/lib/scanner/blockqueue.go +++ b/lib/scanner/blockqueue.go @@ -65,15 +65,15 @@ type parallelHasher struct { fs fs.Filesystem blockSize int workers int - outbox chan<- protocol.FileInfo - inbox <-chan protocol.FileInfo + outbox chan<- ScanResult + inbox <-chan ScanResult counter Counter done chan<- struct{} useWeakHashes bool wg sync.WaitGroup } -func newParallelHasher(ctx context.Context, fs fs.Filesystem, blockSize, workers int, outbox chan<- protocol.FileInfo, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}, useWeakHashes bool) { +func newParallelHasher(ctx context.Context, fs fs.Filesystem, blockSize, workers int, outbox chan<- ScanResult, inbox <-chan ScanResult, counter Counter, done chan<- struct{}, useWeakHashes bool) { ph := ¶llelHasher{ fs: fs, blockSize: blockSize, @@ -104,25 +104,25 @@ func (ph *parallelHasher) hashFiles(ctx context.Context) { return } - if f.IsDirectory() || f.IsDeleted() { + if f.New.IsDirectory() || f.New.IsDeleted() { panic("Bug. Asked to hash a directory or a deleted file.") } - blocks, err := HashFile(ctx, ph.fs, f.Name, ph.blockSize, ph.counter, ph.useWeakHashes) + blocks, err := HashFile(ctx, ph.fs, f.New.Name, ph.blockSize, ph.counter, ph.useWeakHashes) if err != nil { - l.Debugln("hash error:", f.Name, err) + l.Debugln("hash error:", f.New.Name, err) continue } - f.Blocks = blocks + f.New.Blocks = blocks // The size we saw when initially deciding to hash the file // might not have been the size it actually had when we hashed // it. 
Update the size from the block list. - f.Size = 0 + f.New.Size = 0 for _, b := range blocks { - f.Size += int64(b.Size) + f.New.Size += int64(b.Size) } select { diff --git a/lib/scanner/walk.go b/lib/scanner/walk.go index 06d138dfe..5112f1e87 100644 --- a/lib/scanner/walk.go +++ b/lib/scanner/walk.go @@ -8,7 +8,9 @@ package scanner import ( "context" + "errors" "runtime" + "strings" "sync/atomic" "time" "unicode/utf8" @@ -48,8 +50,8 @@ type Config struct { Matcher *ignore.Matcher // Number of hours to keep temporary files for TempLifetime time.Duration - // If CurrentFiler is not nil, it is queried for the current file before rescanning. - CurrentFiler CurrentFiler + // Walks over file infos as present in the db before the scan alphabetically. + Have haveWalker // The Filesystem provides an abstraction on top of the actual filesystem. Filesystem fs.Filesystem // If IgnorePerms is true, changes to permission bits will not be @@ -70,16 +72,28 @@ type Config struct { UseWeakHashes bool } -type CurrentFiler interface { - // CurrentFile returns the file as seen at last scan. - CurrentFile(name string) (protocol.FileInfo, bool) +type haveWalker interface { + // Walk passes all local file infos from the db which start with prefix + // to out and aborts early if ctx is cancelled. + Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) } -func Walk(ctx context.Context, cfg Config) chan protocol.FileInfo { +type fsWalkResult struct { + path string + info fs.FileInfo + err error +} + +type ScanResult struct { + New protocol.FileInfo + Old protocol.FileInfo +} + +func Walk(ctx context.Context, cfg Config) chan ScanResult { w := walker{cfg} - if w.CurrentFiler == nil { - w.CurrentFiler = noCurrentFiler{} + if w.Have == nil { + w.Have = noHaveWalker{} } if w.Filesystem == nil { panic("no filesystem specified") @@ -97,25 +111,19 @@ type walker struct { // Walk returns the list of files found in the local folder by scanning the // file system. Files are blockwise hashed. -func (w *walker) walk(ctx context.Context) chan protocol.FileInfo { +func (w *walker) walk(ctx context.Context) chan ScanResult { l.Debugln("Walk", w.Subs, w.BlockSize, w.Matcher) - toHashChan := make(chan protocol.FileInfo) - finishedChan := make(chan protocol.FileInfo) + haveChan := make(chan protocol.FileInfo) + haveCtx, haveCancel := context.WithCancel(ctx) + go w.dbWalkerRoutine(haveCtx, haveChan) - // A routine which walks the filesystem tree, and sends files which have - // been modified to the counter routine. - go func() { - hashFiles := w.walkAndHashFiles(ctx, toHashChan, finishedChan) - if len(w.Subs) == 0 { - w.Filesystem.Walk(".", hashFiles) - } else { - for _, sub := range w.Subs { - w.Filesystem.Walk(sub, hashFiles) - } - } - close(toHashChan) - }() + fsChan := make(chan fsWalkResult) + go w.fsWalkerRoutine(ctx, fsChan, haveCancel) + + toHashChan := make(chan ScanResult) + finishedChan := make(chan ScanResult) + go w.processWalkResults(ctx, fsChan, haveChan, toHashChan, finishedChan) // We're not required to emit scan progress events, just kick off hashers, // and feed inputs directly from the walker. @@ -139,15 +147,15 @@ func (w *walker) walk(ctx context.Context) chan protocol.FileInfo { // Parallel hasher is stopped by this routine when we close the channel over // which it receives the files we ask it to hash. 
go func() { - var filesToHash []protocol.FileInfo + var filesToHash []ScanResult var total int64 = 1 for file := range toHashChan { filesToHash = append(filesToHash, file) - total += file.Size + total += file.New.Size } - realToHashChan := make(chan protocol.FileInfo) + realToHashChan := make(chan ScanResult) done := make(chan struct{}) progress := newByteCounter() @@ -183,7 +191,7 @@ func (w *walker) walk(ctx context.Context) chan protocol.FileInfo { loop: for _, file := range filesToHash { - l.Debugln("real to hash:", file.Name) + l.Debugln("real to hash:", file.New.Name) select { case realToHashChan <- file: case <-ctx.Done(): @@ -196,15 +204,49 @@ func (w *walker) walk(ctx context.Context) chan protocol.FileInfo { return finishedChan } -func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protocol.FileInfo) fs.WalkFunc { - now := time.Now() - return func(path string, info fs.FileInfo, err error) error { +// dbWalkerRoutine walks the db and sends back file infos to be compared to scan results. +func (w *walker) dbWalkerRoutine(ctx context.Context, haveChan chan<- protocol.FileInfo) { + defer close(haveChan) + + if len(w.Subs) == 0 { + w.Have.Walk("", ctx, haveChan) + return + } + + for _, sub := range w.Subs { select { case <-ctx.Done(): - return ctx.Err() + return default: } + w.Have.Walk(sub, ctx, haveChan) + } +} +// fsWalkerRoutine walks the filesystem tree and sends back file infos and potential +// errors at paths that need to be processed. +func (w *walker) fsWalkerRoutine(ctx context.Context, fsChan chan<- fsWalkResult, haveCancel context.CancelFunc) { + defer close(fsChan) + + walkFn := w.createFSWalkFn(ctx, fsChan) + if len(w.Subs) == 0 { + if err := w.Filesystem.Walk(".", walkFn); err != nil { + haveCancel() + } + return + } + + for _, sub := range w.Subs { + if err := w.Filesystem.Walk(sub, walkFn); err != nil { + haveCancel() + break + } + } +} + +func (w *walker) createFSWalkFn(ctx context.Context, fsChan chan<- fsWalkResult) fs.WalkFunc { + now := time.Now() + return func(path string, info fs.FileInfo, err error) error { // Return value used when we are returning early and don't want to // process the item. For directories, this means do-not-descend. var skip error // nil @@ -213,21 +255,14 @@ func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protoco skip = fs.SkipDir } - if err != nil { - l.Debugln("error:", path, info, err) - return skip - } - if path == "." 
{ + if err != nil { + fsWalkError(ctx, fsChan, path, err) + return skip + } return nil } - info, err = w.Filesystem.Lstat(path) - // An error here would be weird as we've already gotten to this point, but act on it nonetheless - if err != nil { - return skip - } - if fs.IsTemporary(path) { l.Debugln("temporary:", path) if info.IsRegular() && info.ModTime().Add(w.TempLifetime).Before(now) { @@ -238,48 +273,177 @@ func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protoco } if fs.IsInternal(path) { - l.Debugln("ignored (internal):", path) + l.Debugln("skip walking (internal):", path) return skip } if w.Matcher.Match(path).IsIgnored() { - l.Debugln("ignored (patterns):", path) + l.Debugln("skip walking (patterns):", path) + return skip + } + + if err != nil { + if sendErr := fsWalkError(ctx, fsChan, path, err); sendErr != nil { + return sendErr + } return skip } if !utf8.ValidString(path) { + if err := fsWalkError(ctx, fsChan, path, errors.New("path isn't a valid utf8 string")); err != nil { + return err + } l.Warnf("File name %q is not in UTF8 encoding; skipping.", path) return skip } path, shouldSkip := w.normalizePath(path, info) if shouldSkip { + if err := fsWalkError(ctx, fsChan, path, errors.New("failed to normalize path")); err != nil { + return err + } return skip } - switch { - case info.IsSymlink(): - if err := w.walkSymlink(ctx, path, dchan); err != nil { - return err - } - if info.IsDir() { - // under no circumstances shall we descend into a symlink - return fs.SkipDir - } - return nil + select { + case fsChan <- fsWalkResult{ + path: path, + info: info, + err: nil, + }: + case <-ctx.Done(): + return ctx.Err() + } - case info.IsDir(): - err = w.walkDir(ctx, path, info, dchan) - - case info.IsRegular(): - err = w.walkRegular(ctx, path, info, fchan) + // under no circumstances shall we descend into a symlink + if info.IsSymlink() && info.IsDir() { + l.Debugln("skip walking (symlinked directory):", path) + return skip } return err } } -func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, fchan chan protocol.FileInfo) error { +func fsWalkError(ctx context.Context, dst chan<- fsWalkResult, path string, err error) error { + select { + case dst <- fsWalkResult{ + path: path, + info: nil, + err: err, + }: + case <-ctx.Done(): + return ctx.Err() + } + + return nil +} + +func (w *walker) processWalkResults(ctx context.Context, fsChan <-chan fsWalkResult, haveChan <-chan protocol.FileInfo, toHashChan, finishedChan chan<- ScanResult) { + ctxChan := ctx.Done() + fsRes, fsChanOpen := <-fsChan + currDBFile, haveChanOpen := <-haveChan + for fsChanOpen { + if haveChanOpen { + // File infos below an error walking the filesystem tree + // may be marked as ignored but should not be deleted. + if fsRes.err != nil && (strings.HasPrefix(currDBFile.Name, fsRes.path+string(fs.PathSeparator)) || fsRes.path == ".") { + w.checkIgnoredAndInvalidate(currDBFile, finishedChan, ctxChan) + currDBFile, haveChanOpen = <-haveChan + continue + } + // Delete file infos that were not encountered when + // walking the filesystem tree, except on error (see + // above) or if they are ignored. 
+ if currDBFile.Name < fsRes.path { + w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan) + currDBFile, haveChanOpen = <-haveChan + continue + } + } + + var oldFile protocol.FileInfo + if haveChanOpen && currDBFile.Name == fsRes.path { + oldFile = currDBFile + currDBFile, haveChanOpen = <-haveChan + } + + if fsRes.err != nil { + if fs.IsNotExist(fsRes.err) && !oldFile.IsEmpty() && !oldFile.Deleted { + select { + case finishedChan <- ScanResult{ + New: oldFile.DeletedCopy(w.ShortID), + Old: oldFile, + }: + case <-ctx.Done(): + return + } + } + fsRes, fsChanOpen = <-fsChan + continue + } + + switch { + case fsRes.info.IsDir(): + w.walkDir(ctx, fsRes.path, fsRes.info, oldFile, finishedChan) + + case fsRes.info.IsSymlink(): + w.walkSymlink(ctx, fsRes.path, oldFile, finishedChan) + + case fsRes.info.IsRegular(): + w.walkRegular(ctx, fsRes.path, fsRes.info, oldFile, toHashChan) + } + + fsRes, fsChanOpen = <-fsChan + } + + // Filesystem tree walking finished, if there is anything left in the + // db, mark it as deleted, except when it's ignored. + if haveChanOpen { + w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan) + for currDBFile = range haveChan { + w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan) + } + } + + close(toHashChan) +} + +func (w *walker) checkIgnoredAndDelete(f protocol.FileInfo, finishedChan chan<- ScanResult, done <-chan struct{}) { + if w.checkIgnoredAndInvalidate(f, finishedChan, done) { + return + } + + if !f.Invalid && !f.Deleted { + select { + case finishedChan <- ScanResult{ + New: f.DeletedCopy(w.ShortID), + Old: f, + }: + case <-done: + } + } +} + +func (w *walker) checkIgnoredAndInvalidate(f protocol.FileInfo, finishedChan chan<- ScanResult, done <-chan struct{}) bool { + if !w.Matcher.Match(f.Name).IsIgnored() { + return false + } + + if !f.Invalid { + select { + case finishedChan <- ScanResult{ + New: f.InvalidatedCopy(w.ShortID), + Old: f, + }: + case <-done: + } + } + + return true +} + +func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, cf protocol.FileInfo, toHashChan chan<- ScanResult) { curMode := uint32(info.Mode()) if runtime.GOOS == "windows" && osutil.IsWindowsExecutable(relPath) { curMode |= 0111 @@ -294,40 +458,38 @@ func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileIn // - was not a symlink (since it's a file now) // - was not invalid (since it looks valid now) // - has the same size as previously - cf, ok := w.CurrentFiler.CurrentFile(relPath) - permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, curMode) - if ok && permUnchanged && !cf.IsDeleted() && cf.ModTime().Equal(info.ModTime()) && !cf.IsDirectory() && - !cf.IsSymlink() && !cf.IsInvalid() && cf.Size == info.Size() { - return nil - } - - if ok { + if !cf.IsEmpty() { + permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, curMode) + if permUnchanged && !cf.IsDeleted() && cf.ModTime().Equal(info.ModTime()) && !cf.IsDirectory() && + !cf.IsSymlink() && !cf.IsInvalid() && cf.Size == info.Size() { + return + } l.Debugln("rescan:", cf, info.ModTime().Unix(), info.Mode()&fs.ModePerm) } - f := protocol.FileInfo{ - Name: relPath, - Type: protocol.FileInfoTypeFile, - Version: cf.Version.Update(w.ShortID), - Permissions: curMode & uint32(maskModePerm), - NoPermissions: w.IgnorePerms, - ModifiedS: info.ModTime().Unix(), - ModifiedNs: int32(info.ModTime().Nanosecond()), - ModifiedBy: w.ShortID, - Size: info.Size(), + f := ScanResult{ + New: 
protocol.FileInfo{ + Name: relPath, + Type: protocol.FileInfoTypeFile, + Version: cf.Version.Update(w.ShortID), + Permissions: curMode & uint32(maskModePerm), + NoPermissions: w.IgnorePerms, + ModifiedS: info.ModTime().Unix(), + ModifiedNs: int32(info.ModTime().Nanosecond()), + ModifiedBy: w.ShortID, + Size: info.Size(), + }, + Old: cf, } l.Debugln("to hash:", relPath, f) select { - case fchan <- f: + case toHashChan <- f: case <-ctx.Done(): - return ctx.Err() } - - return nil } -func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, dchan chan protocol.FileInfo) error { +func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, cf protocol.FileInfo, finishedChan chan<- ScanResult) { // A directory is "unchanged", if it // - exists // - has the same permissions as previously, unless we are ignoring permissions @@ -335,40 +497,41 @@ func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, // - was a directory previously (not a file or something else) // - was not a symlink (since it's a directory now) // - was not invalid (since it looks valid now) - cf, ok := w.CurrentFiler.CurrentFile(relPath) - permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, uint32(info.Mode())) - if ok && permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() { - return nil + if !cf.IsEmpty() { + permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, uint32(info.Mode())) + if permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() { + return + } } - f := protocol.FileInfo{ - Name: relPath, - Type: protocol.FileInfoTypeDirectory, - Version: cf.Version.Update(w.ShortID), - Permissions: uint32(info.Mode() & maskModePerm), - NoPermissions: w.IgnorePerms, - ModifiedS: info.ModTime().Unix(), - ModifiedNs: int32(info.ModTime().Nanosecond()), - ModifiedBy: w.ShortID, + f := ScanResult{ + New: protocol.FileInfo{ + Name: relPath, + Type: protocol.FileInfoTypeDirectory, + Version: cf.Version.Update(w.ShortID), + Permissions: uint32(info.Mode() & maskModePerm), + NoPermissions: w.IgnorePerms, + ModifiedS: info.ModTime().Unix(), + ModifiedNs: int32(info.ModTime().Nanosecond()), + ModifiedBy: w.ShortID, + }, + Old: cf, } l.Debugln("dir:", relPath, f) select { - case dchan <- f: + case finishedChan <- f: case <-ctx.Done(): - return ctx.Err() } - - return nil } // walkSymlink returns nil or an error, if the error is of the nature that // it should stop the entire walk. -func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan protocol.FileInfo) error { +func (w *walker) walkSymlink(ctx context.Context, relPath string, cf protocol.FileInfo, finishedChan chan<- ScanResult) { // Symlinks are not supported on Windows. We ignore instead of returning // an error. 
if runtime.GOOS == "windows" { - return nil + return } // We always rehash symlinks as they have no modtime or @@ -379,7 +542,7 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan pro target, err := w.Filesystem.ReadSymlink(relPath) if err != nil { l.Debugln("readlink error:", relPath, err) - return nil + return } // A symlink is "unchanged", if @@ -388,28 +551,27 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan pro // - it was a symlink // - it wasn't invalid // - the target was the same - cf, ok := w.CurrentFiler.CurrentFile(relPath) - if ok && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && cf.SymlinkTarget == target { - return nil + if !cf.IsEmpty() && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && cf.SymlinkTarget == target { + return } - f := protocol.FileInfo{ - Name: relPath, - Type: protocol.FileInfoTypeSymlink, - Version: cf.Version.Update(w.ShortID), - NoPermissions: true, // Symlinks don't have permissions of their own - SymlinkTarget: target, + f := ScanResult{ + New: protocol.FileInfo{ + Name: relPath, + Type: protocol.FileInfoTypeSymlink, + Version: cf.Version.Update(w.ShortID), + NoPermissions: true, // Symlinks don't have permissions of their own + SymlinkTarget: target, + }, + Old: cf, } l.Debugln("symlink changedb:", relPath, f) select { - case dchan <- f: + case finishedChan <- f: case <-ctx.Done(): - return ctx.Err() } - - return nil } // normalizePath returns the normalized relative path (possibly after fixing @@ -532,10 +694,6 @@ func (c *byteCounter) Close() { close(c.stop) } -// A no-op CurrentFiler +type noHaveWalker struct{} -type noCurrentFiler struct{} - -func (noCurrentFiler) CurrentFile(name string) (protocol.FileInfo, bool) { - return protocol.FileInfo{}, false -} +func (noHaveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) {} diff --git a/lib/scanner/walk_test.go b/lib/scanner/walk_test.go index e9121a6b4..a3100e1a1 100644 --- a/lib/scanner/walk_test.go +++ b/lib/scanner/walk_test.go @@ -69,7 +69,7 @@ func TestWalkSub(t *testing.T) { Matcher: ignores, Hashers: 2, }) - var files []protocol.FileInfo + var files []ScanResult for f := range fchan { files = append(files, f) } @@ -80,10 +80,10 @@ func TestWalkSub(t *testing.T) { if len(files) != 2 { t.Fatalf("Incorrect length %d != 2", len(files)) } - if files[0].Name != "dir2" { + if files[0].New.Name != "dir2" { t.Errorf("Incorrect file %v != dir2", files[0]) } - if files[1].Name != filepath.Join("dir2", "cfile") { + if files[1].New.Name != filepath.Join("dir2", "cfile") { t.Errorf("Incorrect file %v != dir2/cfile", files[1]) } } @@ -103,7 +103,7 @@ func TestWalk(t *testing.T) { Hashers: 2, }) - var tmp []protocol.FileInfo + var tmp []ScanResult for f := range fchan { tmp = append(tmp, f) } @@ -251,9 +251,9 @@ func TestNormalization(t *testing.T) { } func TestIssue1507(t *testing.T) { - w := &walker{} - c := make(chan protocol.FileInfo, 100) - fn := w.walkAndHashFiles(context.TODO(), c, c) + w := &walker{Config{Matcher: ignore.New(fs.NewFilesystem(fs.FilesystemTypeBasic, "."))}} + c := make(chan fsWalkResult, 100) + fn := w.createFSWalkFn(context.TODO(), c) fn("", nil, protocol.ErrClosed) } @@ -274,15 +274,14 @@ func TestWalkSymlinkUnix(t *testing.T) { // Scan it files, _ := walkDir(fs.NewFilesystem(fs.FilesystemTypeBasic, "_symlinks"), path) - // Verify that we got one symlink and with the correct attributes if len(files) != 1 { t.Errorf("expected 1 symlink, not %d", len(files)) } - if 
len(files[0].Blocks) != 0 { - t.Errorf("expected zero blocks for symlink, not %d", len(files[0].Blocks)) + if len(files[0].New.Blocks) != 0 { + t.Errorf("expected zero blocks for symlink, not %d", len(files[0].New.Blocks)) } - if files[0].SymlinkTarget != "../testdata" { - t.Errorf("expected symlink to have target destination, not %q", files[0].SymlinkTarget) + if files[0].New.SymlinkTarget != "../testdata" { + t.Errorf("expected symlink to have target destination, not %q", files[0].New.SymlinkTarget) } } } @@ -342,7 +341,7 @@ func TestWalkRootSymlink(t *testing.T) { } } -func walkDir(fs fs.Filesystem, dir string) ([]protocol.FileInfo, error) { +func walkDir(fs fs.Filesystem, dir string) ([]ScanResult, error) { fchan := Walk(context.TODO(), Config{ Filesystem: fs, Subs: []string{dir}, @@ -351,7 +350,7 @@ func walkDir(fs fs.Filesystem, dir string) ([]protocol.FileInfo, error) { Hashers: 2, }) - var tmp []protocol.FileInfo + var tmp []ScanResult for f := range fchan { tmp = append(tmp, f) } @@ -360,14 +359,14 @@ func walkDir(fs fs.Filesystem, dir string) ([]protocol.FileInfo, error) { return tmp, nil } -type fileList []protocol.FileInfo +type fileList []ScanResult func (l fileList) Len() int { return len(l) } func (l fileList) Less(a, b int) bool { - return l[a].Name < l[b].Name + return l[a].New.Name < l[b].New.Name } func (l fileList) Swap(a, b int) { @@ -377,12 +376,12 @@ func (l fileList) Swap(a, b int) { func (l fileList) testfiles() testfileList { testfiles := make(testfileList, len(l)) for i, f := range l { - if len(f.Blocks) > 1 { + if len(f.New.Blocks) > 1 { panic("simple test case stuff only supports a single block per file") } - testfiles[i] = testfile{name: f.Name, length: f.FileSize()} - if len(f.Blocks) == 1 { - testfiles[i].hash = fmt.Sprintf("%x", f.Blocks[0].Hash) + testfiles[i] = testfile{name: f.New.Name, length: f.New.FileSize()} + if len(f.New.Blocks) == 1 { + testfiles[i].hash = fmt.Sprintf("%x", f.New.Blocks[0].Hash) } } return testfiles @@ -465,13 +464,13 @@ func TestStopWalk(t *testing.T) { for { f := <-fchan t.Log("Scanned", f) - if f.IsDirectory() { - if len(f.Name) == 0 || f.Permissions == 0 { + if f.New.IsDirectory() { + if len(f.New.Name) == 0 || f.New.Permissions == 0 { t.Error("Bad directory entry", f) } dirs++ } else { - if len(f.Name) == 0 || len(f.Blocks) == 0 || f.Permissions == 0 { + if len(f.New.Name) == 0 || len(f.New.Blocks) == 0 || f.New.Permissions == 0 { t.Error("Bad file entry", f) } files++ @@ -529,3 +528,69 @@ func verify(r io.Reader, blocksize int, blocks []protocol.BlockInfo) error { return nil } + +// The following (randomish) scenario produced an error uncovered by integration tests +func TestWalkIntegration(t *testing.T) { + tmpDir, err := ioutil.TempDir(".", "_request-") + if err != nil { + panic("Failed to create temporary testing dir") + } + defer os.RemoveAll(tmpDir) + + fs := fs.NewFilesystem(fs.FilesystemTypeBasic, tmpDir) + fs.Mkdir("a", 0777) + toDel := filepath.Join("a", "b") + for _, f := range []string{"b", toDel} { + fi, err := fs.Create(f) + if err != nil { + panic(err) + } + fi.Close() + } + + conf := Config{ + Filesystem: fs, + BlockSize: 128 * 1024, + Hashers: 2, + } + + rchan := Walk(context.TODO(), conf) + + var res []ScanResult + for r := range rchan { + res = append(res, r) + } + sort.Sort(fileList(res)) + thw := make([]protocol.FileInfo, 0, len(res)) + for _, r := range res { + thw = append(thw, r.New) + } + conf.Have = testHaveWalker(thw) + + if err = fs.Remove(toDel); err != nil { + panic(err) + } + + rchan = 
Walk(context.TODO(), conf) + + for r := range rchan { + if r.New.Name != toDel { + t.Fatalf("Received unexpected result %v", r) + } + } +} + +type testHaveWalker []protocol.FileInfo + +func (thw testHaveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) { + if prefix != "" { + panic("cannot walk with prefix") + } + for _, f := range thw { + select { + case out <- f: + case <-ctx.Done(): + return + } + } +}
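The heart of the scanner rewrite above is processWalkResults: because both the db have-stream and the filesystem walk (now that walkfs sorts directory entries) produce names in alphabetical order, a single linear merge pairs them up: db entries with no matching path become deletions, paths with no db entry are new, and matches are compared or rehashed. The sketch below shows only that merge idea on plain sorted string slices; the classify helper and its inputs are hypothetical, and the real code additionally handles walk errors, ignore patterns and context cancellation on channels of protocol.FileInfo and fsWalkResult.

package main

import "fmt"

// classify does a two-pointer merge over two name-sorted slices, in the same
// spirit as processWalkResults merging the db stream against the fs stream.
func classify(haveSorted, fsSorted []string) (deleted, added, both []string) {
	i, j := 0, 0
	for i < len(haveSorted) && j < len(fsSorted) {
		switch {
		case haveSorted[i] < fsSorted[j]:
			// In the db but not on disk: emit a deletion.
			deleted = append(deleted, haveSorted[i])
			i++
		case haveSorted[i] > fsSorted[j]:
			// On disk but not in the db: a brand new entry.
			added = append(added, fsSorted[j])
			j++
		default:
			// Present in both: compare metadata, maybe rehash.
			both = append(both, haveSorted[i])
			i++
			j++
		}
	}
	deleted = append(deleted, haveSorted[i:]...)
	added = append(added, fsSorted[j:]...)
	return deleted, added, both
}

func main() {
	have := []string{"a", "a/b", "c"}
	scanned := []string{"a", "b", "c"}
	del, add, same := classify(have, scanned)
	fmt.Println("deleted:", del, "added:", add, "compare:", same)
}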