syncthing/lib/model/requests_test.go

1465 lines
38 KiB
Go
Raw Normal View History

// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package model
import (
"bytes"
"context"
"errors"
"io"
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/rand"
)
func TestRequestSimple(t *testing.T) {
// Verify that the model performs a request and creates a file based on
// an incoming index update.
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
tfs := fcfg.Filesystem(nil)
defer cleanupModelAndRemoveDir(m, tfs.URI())
// We listen for incoming index updates and trigger when we see one for
// the expected test file.
done := make(chan struct{})
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
select {
case <-done:
t.Error("More than one index update sent")
default:
}
for _, f := range fs {
if f.Name == "testfile" {
close(done)
return nil
}
}
return nil
})
// Send an update for the test file, wait for it to sync and be reported back.
contents := []byte("test file contents\n")
fc.addFile("testfile", 0o644, protocol.FileInfoTypeFile, contents)
fc.addFile("testfile", 0o644, protocol.FileInfoTypeFile, contents)
fc.sendIndexUpdate()
select {
case <-done:
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
// Verify the contents
if err := equalContents(tfs, "testfile", contents); err != nil {
t.Error("File did not sync correctly:", err)
}
}
func TestSymlinkTraversalRead(t *testing.T) {
// Verify that a symlink can not be traversed for reading.
if build.IsWindows {
t.Skip("no symlink support on CI")
return
}
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem(nil).URI())
// We listen for incoming index updates and trigger when we see one for
// the expected test file.
done := make(chan struct{})
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
select {
case <-done:
t.Error("More than one index update sent")
default:
}
for _, f := range fs {
if f.Name == "symlink" {
close(done)
return nil
}
}
return nil
})
// Send an update for the symlink, wait for it to sync and be reported back.
contents := []byte("..")
fc.addFile("symlink", 0o644, protocol.FileInfoTypeSymlink, contents)
fc.sendIndexUpdate()
<-done
// Request a file by traversing the symlink
lib/protocol: Refactor interface (#9375) This is a refactor of the protocol/model interface to take the actual message as the parameter, instead of the broken-out fields: ```diff type Model interface { // An index was received from the peer device - Index(conn Connection, folder string, files []FileInfo) error + Index(conn Connection, idx *Index) error // An index update was received from the peer device - IndexUpdate(conn Connection, folder string, files []FileInfo) error + IndexUpdate(conn Connection, idxUp *IndexUpdate) error // A request was made by the peer device - Request(conn Connection, folder, name string, blockNo, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) + Request(conn Connection, req *Request) (RequestResponse, error) // A cluster configuration message was received - ClusterConfig(conn Connection, config ClusterConfig) error + ClusterConfig(conn Connection, config *ClusterConfig) error // The peer device closed the connection or an error occurred Closed(conn Connection, err error) // The peer device sent progress updates for the files it is currently downloading - DownloadProgress(conn Connection, folder string, updates []FileDownloadProgressUpdate) error + DownloadProgress(conn Connection, p *DownloadProgress) error } ``` (and changing the `ClusterConfig` to `*ClusterConfig` for symmetry; we'll be forced to use all pointers everywhere at some point anyway...) The reason for this is that I have another thing cooking which is a small troubleshooting change to check index consistency during transfer. This required adding a field or two to the index/indexupdate messages, and plumbing the extra parameters in umpteen changes is almost as big a diff as this is. I figured let's do it once and avoid having to do that in the future again... The rest of the diff falls out of the change above, much of it being in test code where we run these methods manually...
2024-01-31 08:18:27 +01:00
res, err := m.Request(device1Conn, &protocol.Request{Folder: "default", Name: "symlink/requests_test.go", Size: 10})
if err == nil || res != nil {
t.Error("Managed to traverse symlink")
}
}
func TestSymlinkTraversalWrite(t *testing.T) {
// Verify that a symlink can not be traversed for writing.
if build.IsWindows {
t.Skip("no symlink support on CI")
return
}
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem(nil).URI())
// We listen for incoming index updates and trigger when we see one for
// the expected names.
done := make(chan struct{}, 1)
badReq := make(chan string, 1)
badIdx := make(chan string, 1)
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
for _, f := range fs {
if f.Name == "symlink" {
done <- struct{}{}
return nil
}
if strings.HasPrefix(f.Name, "symlink") {
badIdx <- f.Name
return nil
}
}
return nil
})
fc.RequestCalls(func(ctx context.Context, folder, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
if name != "symlink" && strings.HasPrefix(name, "symlink") {
badReq <- name
}
return fc.fileData[name], nil
})
// Send an update for the symlink, wait for it to sync and be reported back.
contents := []byte("..")
fc.addFile("symlink", 0o644, protocol.FileInfoTypeSymlink, contents)
fc.sendIndexUpdate()
<-done
// Send an update for things behind the symlink, wait for requests for
// blocks for any of them to come back, or index entries. Hopefully none
// of that should happen.
contents = []byte("testdata testdata\n")
fc.addFile("symlink/testfile", 0o644, protocol.FileInfoTypeFile, contents)
fc.addFile("symlink/testdir", 0o644, protocol.FileInfoTypeDirectory, contents)
fc.addFile("symlink/testsyml", 0o644, protocol.FileInfoTypeSymlink, contents)
fc.sendIndexUpdate()
select {
case name := <-badReq:
t.Fatal("Should not have requested the data for", name)
case name := <-badIdx:
t.Fatal("Should not have sent the index entry for", name)
case <-time.After(3 * time.Second):
// Unfortunately not much else to trigger on here. The puller sleep
// interval is 1s so if we didn't get any requests within two
// iterations we should be fine.
}
}
func TestRequestCreateTmpSymlink(t *testing.T) {
// Test that an update for a temporary file is invalidated
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem(nil).URI())
// We listen for incoming index updates and trigger when we see one for
// the expected test file.
goodIdx := make(chan struct{})
name := fs.TempName("testlink")
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
for _, f := range fs {
if f.Name == name {
if f.IsInvalid() {
goodIdx <- struct{}{}
} else {
t.Error("Received index with non-invalid temporary file")
close(goodIdx)
}
return nil
}
}
return nil
})
// Send an update for the test file, wait for it to sync and be reported back.
fc.addFile(name, 0o644, protocol.FileInfoTypeSymlink, []byte(".."))
fc.sendIndexUpdate()
select {
case <-goodIdx:
case <-time.After(3 * time.Second):
t.Fatal("Timed out without index entry being sent")
}
}
func TestPullInvalidIgnoredSO(t *testing.T) {
2023-02-22 10:51:49 +01:00
t.Skip("flaky")
pullInvalidIgnored(t, config.FolderTypeSendOnly)
}
func TestPullInvalidIgnoredSR(t *testing.T) {
2023-02-22 10:51:49 +01:00
t.Skip("flaky")
pullInvalidIgnored(t, config.FolderTypeSendReceive)
}
// This test checks that (un-)ignored/invalid/deleted files are treated as expected.
func pullInvalidIgnored(t *testing.T, ft config.FolderType) {
w, wCancel := newConfigWrapper(defaultCfgWrapper.RawCopy())
defer wCancel()
fcfg := w.FolderList()[0]
fss := fcfg.Filesystem(nil)
fcfg.Type = ft
setFolder(t, w, fcfg)
m := setupModel(t, w)
defer cleanupModelAndRemoveDir(m, fss.URI())
folderIgnoresAlwaysReload(t, m, fcfg)
fc := addFakeConn(m, device1, fcfg.ID)
fc.folder = "default"
if err := m.SetIgnores("default", []string{"*ignored*"}); err != nil {
panic(err)
}
contents := []byte("test file contents\n")
otherContents := []byte("other test file contents\n")
invIgn := "invalid:ignored"
invDel := "invalid:deleted"
ign := "ignoredNonExisting"
ignExisting := "ignoredExisting"
fc.addFile(invIgn, 0o644, protocol.FileInfoTypeFile, contents)
fc.addFile(invDel, 0o644, protocol.FileInfoTypeFile, contents)
fc.deleteFile(invDel)
fc.addFile(ign, 0o644, protocol.FileInfoTypeFile, contents)
fc.addFile(ignExisting, 0o644, protocol.FileInfoTypeFile, contents)
writeFile(t, fss, ignExisting, otherContents)
done := make(chan struct{})
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
expected := map[string]struct{}{invIgn: {}, ign: {}, ignExisting: {}}
for _, f := range fs {
if _, ok := expected[f.Name]; !ok {
t.Errorf("Unexpected file %v was added to index", f.Name)
}
if !f.IsInvalid() {
t.Errorf("File %v wasn't marked as invalid", f.Name)
}
delete(expected, f.Name)
}
for name := range expected {
t.Errorf("File %v wasn't added to index", name)
}
close(done)
return nil
})
sub := m.evLogger.Subscribe(events.FolderErrors)
defer sub.Unsubscribe()
fc.sendIndexUpdate()
select {
case ev := <-sub.C():
t.Fatalf("Errors while scanning/pulling: %v", ev)
case <-time.After(5 * time.Second):
t.Fatalf("timed out before index was received")
case <-done:
}
done = make(chan struct{})
expected := map[string]struct{}{ign: {}, ignExisting: {}}
var expectedMut sync.Mutex
// The indexes will normally arrive in one update, but it is possible
// that they arrive in separate ones.
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
expectedMut.Lock()
for _, f := range fs {
_, ok := expected[f.Name]
if !ok {
t.Errorf("Unexpected file %v was updated in index", f.Name)
continue
}
if f.IsInvalid() {
t.Errorf("File %v is still marked as invalid", f.Name)
}
if f.Name == ign {
// The unignored deleted file should have an
// empty version, to make it not override
// existing global files.
if !f.Deleted {
t.Errorf("File %v was not marked as deleted", f.Name)
}
if len(f.Version.Counters) != 0 {
t.Errorf("File %v has version %v, expected empty", f.Name, f.Version)
}
} else {
// The unignored existing file should have a
// version with only a local counter, to make
// it conflict changed global files.
if f.Deleted {
t.Errorf("File %v is marked as deleted", f.Name)
}
if len(f.Version.Counters) != 1 || f.Version.Counter(myID.Short()) == 0 {
t.Errorf("File %v has version %v, expected one entry for ourselves", f.Name, f.Version)
}
}
delete(expected, f.Name)
}
if len(expected) == 0 {
close(done)
}
expectedMut.Unlock()
return nil
})
// Make sure pulling doesn't interfere, as index updates are racy and
// thus we cannot distinguish between scan and pull results.
fc.RequestCalls(func(ctx context.Context, folder, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
return nil, nil
})
if err := m.SetIgnores("default", []string{"*:ignored*"}); err != nil {
panic(err)
}
select {
case <-time.After(5 * time.Second):
expectedMut.Lock()
t.Fatal("timed out before receiving index updates for all existing files, missing", expected)
expectedMut.Unlock()
case <-done:
}
}
func TestIssue4841(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem(nil).URI())
received := make(chan []protocol.FileInfo)
fc.setIndexFn(func(_ context.Context, _ string, fs []protocol.FileInfo) error {
received <- fs
return nil
})
checkReceived := func(fs []protocol.FileInfo) protocol.FileInfo {
t.Helper()
if len(fs) != 1 {
t.Fatalf("Sent index with %d files, should be 1", len(fs))
}
if fs[0].Name != "foo" {
t.Fatalf(`Sent index with file %v, should be "foo"`, fs[0].Name)
}
return fs[0]
}
// Setup file from remote that was ignored locally
runner, _ := m.folderRunners.Get(defaultFolderConfig.ID)
folder := runner.(*sendReceiveFolder)
folder.updateLocals([]protocol.FileInfo{{
Name: "foo",
Type: protocol.FileInfoTypeFile,
LocalFlags: protocol.FlagLocalIgnored,
Version: protocol.Vector{}.Update(device1.Short()),
}})
checkReceived(<-received)
// Scan without ignore patterns with "foo" not existing locally
if err := m.ScanFolder("default"); err != nil {
t.Fatal("Failed scanning:", err)
}
select {
case <-time.After(10 * time.Second):
t.Fatal("timed out")
case r := <-received:
f := checkReceived(r)
if !f.Version.Equal(protocol.Vector{}) {
t.Errorf("Got Version == %v, expected empty version", f.Version)
}
}
}
func TestRescanIfHaveInvalidContent(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
tfs := fcfg.Filesystem(nil)
defer cleanupModelAndRemoveDir(m, tfs.URI())
payload := []byte("hello")
writeFile(t, tfs, "foo", payload)
received := make(chan []protocol.FileInfo)
fc.setIndexFn(func(_ context.Context, _ string, fs []protocol.FileInfo) error {
received <- fs
return nil
})
checkReceived := func(fs []protocol.FileInfo) protocol.FileInfo {
t.Helper()
if len(fs) != 1 {
t.Fatalf("Sent index with %d files, should be 1", len(fs))
}
if fs[0].Name != "foo" {
t.Fatalf(`Sent index with file %v, should be "foo"`, fs[0].Name)
}
return fs[0]
}
// Scan without ignore patterns with "foo" not existing locally
if err := m.ScanFolder("default"); err != nil {
t.Fatal("Failed scanning:", err)
}
f := checkReceived(<-received)
if f.Blocks[0].WeakHash != 103547413 {
t.Fatalf("unexpected weak hash: %d != 103547413", f.Blocks[0].WeakHash)
}
lib/protocol: Refactor interface (#9375) This is a refactor of the protocol/model interface to take the actual message as the parameter, instead of the broken-out fields: ```diff type Model interface { // An index was received from the peer device - Index(conn Connection, folder string, files []FileInfo) error + Index(conn Connection, idx *Index) error // An index update was received from the peer device - IndexUpdate(conn Connection, folder string, files []FileInfo) error + IndexUpdate(conn Connection, idxUp *IndexUpdate) error // A request was made by the peer device - Request(conn Connection, folder, name string, blockNo, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) + Request(conn Connection, req *Request) (RequestResponse, error) // A cluster configuration message was received - ClusterConfig(conn Connection, config ClusterConfig) error + ClusterConfig(conn Connection, config *ClusterConfig) error // The peer device closed the connection or an error occurred Closed(conn Connection, err error) // The peer device sent progress updates for the files it is currently downloading - DownloadProgress(conn Connection, folder string, updates []FileDownloadProgressUpdate) error + DownloadProgress(conn Connection, p *DownloadProgress) error } ``` (and changing the `ClusterConfig` to `*ClusterConfig` for symmetry; we'll be forced to use all pointers everywhere at some point anyway...) The reason for this is that I have another thing cooking which is a small troubleshooting change to check index consistency during transfer. This required adding a field or two to the index/indexupdate messages, and plumbing the extra parameters in umpteen changes is almost as big a diff as this is. I figured let's do it once and avoid having to do that in the future again... The rest of the diff falls out of the change above, much of it being in test code where we run these methods manually...
2024-01-31 08:18:27 +01:00
res, err := m.Request(device1Conn, &protocol.Request{Folder: "default", Name: "foo", Size: len(payload), Hash: f.Blocks[0].Hash, WeakHash: f.Blocks[0].WeakHash})
if err != nil {
t.Fatal(err)
}
buf := res.Data()
if !bytes.Equal(buf, payload) {
t.Errorf("%s != %s", buf, payload)
}
payload = []byte("bye")
buf = make([]byte, len(payload))
writeFile(t, tfs, "foo", payload)
lib/protocol: Refactor interface (#9375) This is a refactor of the protocol/model interface to take the actual message as the parameter, instead of the broken-out fields: ```diff type Model interface { // An index was received from the peer device - Index(conn Connection, folder string, files []FileInfo) error + Index(conn Connection, idx *Index) error // An index update was received from the peer device - IndexUpdate(conn Connection, folder string, files []FileInfo) error + IndexUpdate(conn Connection, idxUp *IndexUpdate) error // A request was made by the peer device - Request(conn Connection, folder, name string, blockNo, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) + Request(conn Connection, req *Request) (RequestResponse, error) // A cluster configuration message was received - ClusterConfig(conn Connection, config ClusterConfig) error + ClusterConfig(conn Connection, config *ClusterConfig) error // The peer device closed the connection or an error occurred Closed(conn Connection, err error) // The peer device sent progress updates for the files it is currently downloading - DownloadProgress(conn Connection, folder string, updates []FileDownloadProgressUpdate) error + DownloadProgress(conn Connection, p *DownloadProgress) error } ``` (and changing the `ClusterConfig` to `*ClusterConfig` for symmetry; we'll be forced to use all pointers everywhere at some point anyway...) The reason for this is that I have another thing cooking which is a small troubleshooting change to check index consistency during transfer. This required adding a field or two to the index/indexupdate messages, and plumbing the extra parameters in umpteen changes is almost as big a diff as this is. I figured let's do it once and avoid having to do that in the future again... The rest of the diff falls out of the change above, much of it being in test code where we run these methods manually...
2024-01-31 08:18:27 +01:00
_, err = m.Request(device1Conn, &protocol.Request{Folder: "default", Name: "foo", Size: len(payload), Hash: f.Blocks[0].Hash, WeakHash: f.Blocks[0].WeakHash})
if err == nil {
t.Fatalf("expected failure")
}
select {
case fs := <-received:
f := checkReceived(fs)
if f.Blocks[0].WeakHash != 41943361 {
t.Fatalf("unexpected weak hash: %d != 41943361", f.Blocks[0].WeakHash)
}
case <-time.After(time.Second):
t.Fatalf("timed out")
}
}
func TestParentDeletion(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
testFs := fcfg.Filesystem(nil)
defer cleanupModelAndRemoveDir(m, testFs.URI())
parent := "foo"
child := filepath.Join(parent, "bar")
received := make(chan []protocol.FileInfo)
fc.addFile(parent, 0o777, protocol.FileInfoTypeDirectory, nil)
fc.addFile(child, 0o777, protocol.FileInfoTypeDirectory, nil)
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
received <- fs
return nil
})
fc.sendIndexUpdate()
// Get back index from initial setup
select {
case fs := <-received:
if len(fs) != 2 {
t.Fatalf("Sent index with %d files, should be 2", len(fs))
}
case <-time.After(time.Second):
t.Fatalf("timed out")
}
// Delete parent dir
must(t, testFs.RemoveAll(parent))
// Scan only the child dir (not the parent)
if err := m.ScanFolderSubdirs("default", []string{child}); err != nil {
t.Fatal("Failed scanning:", err)
}
select {
case fs := <-received:
if len(fs) != 1 {
t.Fatalf("Sent index with %d files, should be 1", len(fs))
}
if fs[0].Name != child {
t.Fatalf(`Sent index with file "%v", should be "%v"`, fs[0].Name, child)
}
case <-time.After(time.Second):
t.Fatalf("timed out")
}
// Recreate the child dir on the remote
fc.updateFile(child, 0o777, protocol.FileInfoTypeDirectory, nil)
fc.sendIndexUpdate()
// Wait for the child dir to be recreated and sent to the remote
select {
case fs := <-received:
l.Debugln("sent:", fs)
found := false
for _, f := range fs {
if f.Name == child {
if f.Deleted {
t.Fatalf(`File "%v" is still deleted`, child)
}
found = true
}
}
if !found {
t.Fatalf(`File "%v" is missing in index`, child)
}
case <-time.After(5 * time.Second):
t.Fatalf("timed out")
}
}
// TestRequestSymlinkWindows checks that symlinks aren't marked as deleted on windows
// Issue: https://github.com/syncthing/syncthing/issues/5125
func TestRequestSymlinkWindows(t *testing.T) {
if !build.IsWindows {
t.Skip("windows specific test")
}
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
defer cleanupModelAndRemoveDir(m, fcfg.Filesystem(nil).URI())
received := make(chan []protocol.FileInfo)
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
select {
case <-received:
t.Error("More than one index update sent")
default:
}
received <- fs
return nil
})
fc.addFile("link", 0o644, protocol.FileInfoTypeSymlink, nil)
fc.sendIndexUpdate()
select {
case fs := <-received:
close(received)
// expected first index
if len(fs) != 1 {
t.Fatalf("Expected just one file in index, got %v", fs)
}
f := fs[0]
if f.Name != "link" {
t.Fatalf(`Got file info with path "%v", expected "link"`, f.Name)
}
if !f.IsInvalid() {
t.Errorf(`File info was not marked as invalid`)
}
case <-time.After(time.Second):
t.Fatalf("timed out before pull was finished")
}
sub := m.evLogger.Subscribe(events.StateChanged | events.LocalIndexUpdated)
defer sub.Unsubscribe()
m.ScanFolder("default")
for {
select {
case ev := <-sub.C():
switch data := ev.Data.(map[string]interface{}); {
case ev.Type == events.LocalIndexUpdated:
t.Fatalf("Local index was updated unexpectedly: %v", data)
case ev.Type == events.StateChanged:
if data["from"] == "scanning" {
return
}
}
case <-time.After(5 * time.Second):
t.Fatalf("Timed out before scan finished")
}
}
}
func equalContents(fs fs.Filesystem, path string, contents []byte) error {
fd, err := fs.Open(path)
if err != nil {
return err
}
2023-06-19 08:50:53 +02:00
defer fd.Close()
bs, err := io.ReadAll(fd)
if err != nil {
return err
}
if !bytes.Equal(bs, contents) {
return errors.New("incorrect data")
}
return nil
}
func TestRequestRemoteRenameChanged(t *testing.T) {
2023-02-22 10:51:49 +01:00
t.Skip("flaky")
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
tfs := fcfg.Filesystem(nil)
defer cleanupModel(m)
received := make(chan []protocol.FileInfo)
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
select {
case <-received:
t.Error("More than one index update sent")
default:
}
received <- fs
return nil
})
// setup
a := "a"
b := "b"
data := map[string][]byte{
a: []byte("aData"),
b: []byte("bData"),
}
for _, n := range [2]string{a, b} {
fc.addFile(n, 0o644, protocol.FileInfoTypeFile, data[n])
}
fc.sendIndexUpdate()
select {
case fs := <-received:
close(received)
if len(fs) != 2 {
t.Fatalf("Received index with %v indexes instead of 2", len(fs))
}
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
for _, n := range [2]string{a, b} {
must(t, equalContents(tfs, n, data[n]))
}
var gotA, gotB, gotConfl bool
done := make(chan struct{})
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
select {
case <-done:
t.Error("Received more index updates than expected")
return nil
default:
}
for _, f := range fs {
switch {
case f.Name == a:
if gotA {
t.Error("Got more than one index update for", f.Name)
}
gotA = true
case f.Name == b:
if gotB {
t.Error("Got more than one index update for", f.Name)
}
if f.Version.Counter(fc.id.Short()) == 0 {
// This index entry might be superseded
// by the final one or sent before it separately.
break
}
gotB = true
case strings.HasPrefix(f.Name, "b.sync-conflict-"):
if gotConfl {
t.Error("Got more than one index update for conflicts of", f.Name)
}
gotConfl = true
default:
t.Error("Got unexpected file in index update", f.Name)
}
}
if gotA && gotB && gotConfl {
close(done)
}
return nil
})
fd, err := tfs.OpenFile(b, fs.OptReadWrite, 0o644)
if err != nil {
t.Fatal(err)
}
otherData := []byte("otherData")
if _, err = fd.Write(otherData); err != nil {
t.Fatal(err)
}
fd.Close()
// rename
fc.deleteFile(a)
fc.updateFile(b, 0o644, protocol.FileInfoTypeFile, data[a])
// Make sure the remote file for b is newer and thus stays global -> local conflict
fc.mut.Lock()
for i := range fc.files {
if fc.files[i].Name == b {
fc.files[i].ModifiedS += 100
break
}
}
fc.mut.Unlock()
fc.sendIndexUpdate()
select {
case <-done:
case <-time.After(10 * time.Second):
t.Errorf("timed out without receiving all expected index updates")
}
// Check outcome
tfs.Walk(".", func(path string, info fs.FileInfo, err error) error {
switch {
case path == a:
t.Errorf(`File "a" was not removed`)
case path == b:
if err := equalContents(tfs, b, data[a]); err != nil {
t.Error(`File "b" has unexpected content (renamed from a on remote)`)
}
case strings.HasPrefix(path, b+".sync-conflict-"):
if err := equalContents(tfs, path, otherData); err != nil {
t.Error(`Sync conflict of "b" has unexptected content`)
}
case path == "." || strings.HasPrefix(path, ".stfolder"):
default:
t.Error("Found unexpected file", path)
}
return nil
})
}
func TestRequestRemoteRenameConflict(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
tfs := fcfg.Filesystem(nil)
defer cleanupModel(m)
recv := make(chan int)
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
recv <- len(fs)
return nil
})
// setup
a := "a"
b := "b"
data := map[string][]byte{
a: []byte("aData"),
b: []byte("bData"),
}
for _, n := range [2]string{a, b} {
fc.addFile(n, 0o644, protocol.FileInfoTypeFile, data[n])
}
fc.sendIndexUpdate()
select {
case i := <-recv:
if i != 2 {
t.Fatalf("received %v items in index, expected 1", i)
}
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
for _, n := range [2]string{a, b} {
must(t, equalContents(tfs, n, data[n]))
}
fd, err := tfs.OpenFile(b, fs.OptReadWrite, 0o644)
if err != nil {
t.Fatal(err)
}
otherData := []byte("otherData")
if _, err = fd.Write(otherData); err != nil {
t.Fatal(err)
}
fd.Close()
m.ScanFolders()
select {
case i := <-recv:
if i != 1 {
t.Fatalf("received %v items in index, expected 1", i)
}
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
// make sure the following rename is more recent (not concurrent)
time.Sleep(2 * time.Second)
// rename
fc.deleteFile(a)
fc.updateFile(b, 0o644, protocol.FileInfoTypeFile, data[a])
fc.sendIndexUpdate()
select {
case <-recv:
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
// Check outcome
foundB := false
foundBConfl := false
tfs.Walk(".", func(path string, info fs.FileInfo, err error) error {
switch {
case path == a:
t.Errorf(`File "a" was not removed`)
case path == b:
foundB = true
case strings.HasPrefix(path, b) && strings.Contains(path, ".sync-conflict-"):
foundBConfl = true
}
return nil
})
if !foundB {
t.Errorf(`File "b" was removed`)
}
if !foundBConfl {
t.Errorf(`No conflict file for "b" was created`)
}
}
func TestRequestDeleteChanged(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
tfs := fcfg.Filesystem(nil)
defer cleanupModelAndRemoveDir(m, tfs.URI())
done := make(chan struct{})
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
select {
case <-done:
t.Error("More than one index update sent")
default:
}
close(done)
return nil
})
// setup
a := "a"
data := []byte("aData")
fc.addFile(a, 0o644, protocol.FileInfoTypeFile, data)
fc.sendIndexUpdate()
select {
case <-done:
done = make(chan struct{})
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
select {
case <-done:
t.Error("More than one index update sent")
default:
}
close(done)
return nil
})
fd, err := tfs.OpenFile(a, fs.OptReadWrite, 0o644)
if err != nil {
t.Fatal(err)
}
otherData := []byte("otherData")
if _, err = fd.Write(otherData); err != nil {
t.Fatal(err)
}
fd.Close()
// rename
fc.deleteFile(a)
fc.sendIndexUpdate()
select {
case <-done:
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
// Check outcome
if _, err := tfs.Lstat(a); err != nil {
if fs.IsNotExist(err) {
t.Error(`Modified file "a" was removed`)
} else {
t.Error(`Error stating file "a":`, err)
}
}
}
func TestNeedFolderFiles(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
defer cleanupModel(m)
sub := m.evLogger.Subscribe(events.RemoteIndexUpdated)
defer sub.Unsubscribe()
errPreventSync := errors.New("you aren't getting any of this")
fc.RequestCalls(func(ctx context.Context, folder, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
return nil, errPreventSync
})
data := []byte("foo")
num := 20
for i := 0; i < num; i++ {
fc.addFile(strconv.Itoa(i), 0o644, protocol.FileInfoTypeFile, data)
}
fc.sendIndexUpdate()
select {
case <-sub.C():
case <-time.After(5 * time.Second):
t.Fatal("Timed out before receiving index")
}
progress, queued, rest, err := m.NeedFolderFiles(fcfg.ID, 1, 100)
must(t, err)
if got := len(progress) + len(queued) + len(rest); got != num {
t.Errorf("Got %v needed items, expected %v", got, num)
}
exp := 10
for page := 1; page < 3; page++ {
progress, queued, rest, err := m.NeedFolderFiles(fcfg.ID, page, exp)
must(t, err)
if got := len(progress) + len(queued) + len(rest); got != exp {
t.Errorf("Got %v needed items on page %v, expected %v", got, page, exp)
}
}
}
// TestIgnoreDeleteUnignore checks that the deletion of an ignored file is not
// propagated upon un-ignoring.
// https://github.com/syncthing/syncthing/issues/6038
func TestIgnoreDeleteUnignore(t *testing.T) {
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
m := setupModel(t, w)
fss := fcfg.Filesystem(nil)
defer cleanupModel(m)
folderIgnoresAlwaysReload(t, m, fcfg)
m.ScanFolders()
fc := addFakeConn(m, device1, fcfg.ID)
fc.folder = "default"
fc.mut.Lock()
fc.mut.Unlock()
file := "foobar"
contents := []byte("test file contents\n")
basicCheck := func(fs []protocol.FileInfo) {
t.Helper()
if len(fs) != 1 {
t.Fatal("expected a single index entry, got", len(fs))
} else if fs[0].Name != file {
t.Fatalf("expected a index entry for %v, got one for %v", file, fs[0].Name)
}
}
done := make(chan struct{})
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
basicCheck(fs)
close(done)
return nil
})
writeFile(t, fss, file, contents)
m.ScanFolders()
select {
case <-time.After(5 * time.Second):
t.Fatalf("timed out before index was received")
case <-done:
}
done = make(chan struct{})
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
basicCheck(fs)
f := fs[0]
if !f.IsInvalid() {
t.Errorf("Received non-invalid index update")
}
close(done)
return nil
})
if err := m.SetIgnores("default", []string{"foobar"}); err != nil {
panic(err)
}
select {
case <-time.After(5 * time.Second):
t.Fatal("timed out before receiving index update")
case <-done:
}
done = make(chan struct{})
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
basicCheck(fs)
f := fs[0]
if f.IsInvalid() {
t.Errorf("Received invalid index update")
}
if !f.Version.Equal(protocol.Vector{}) && f.Deleted {
t.Error("Received deleted index entry with non-empty version")
}
l.Infoln(f)
close(done)
return nil
})
if err := fss.Remove(file); err != nil {
t.Fatal(err)
}
if err := m.SetIgnores("default", []string{}); err != nil {
panic(err)
}
select {
case <-time.After(5 * time.Second):
t.Fatalf("timed out before index was received")
case <-done:
}
}
// TestRequestLastFileProgress checks that the last pulled file (here only) is registered
// as in progress.
func TestRequestLastFileProgress(t *testing.T) {
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
tfs := fcfg.Filesystem(nil)
defer cleanupModelAndRemoveDir(m, tfs.URI())
done := make(chan struct{})
fc.RequestCalls(func(ctx context.Context, folder, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
defer close(done)
progress, queued, rest, err := m.NeedFolderFiles(folder, 1, 10)
must(t, err)
if len(queued)+len(rest) != 0 {
t.Error(`There should not be any queued or "rest" items`)
}
if len(progress) != 1 {
t.Error("Expected exactly one item in progress.")
}
return fc.fileData[name], nil
})
contents := []byte("test file contents\n")
fc.addFile("testfile", 0o644, protocol.FileInfoTypeFile, contents)
fc.sendIndexUpdate()
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("Timed out before file was requested")
}
}
func TestRequestIndexSenderPause(t *testing.T) {
done := make(chan struct{})
defer close(done)
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
tfs := fcfg.Filesystem(nil)
defer cleanupModelAndRemoveDir(m, tfs.URI())
indexChan := make(chan []protocol.FileInfo)
fc.setIndexFn(func(ctx context.Context, folder string, fs []protocol.FileInfo) error {
select {
case indexChan <- fs:
case <-done:
case <-ctx.Done():
}
return nil
})
var seq int64 = 1
files := []protocol.FileInfo{{Name: "foo", Size: 10, Version: protocol.Vector{}.Update(myID.Short()), Sequence: seq}}
// Both devices connected, none paused
localIndexUpdate(m, fcfg.ID, files)
select {
case <-time.After(5 * time.Second):
t.Fatal("timed out before receiving index")
case <-indexChan:
}
// Remote paused
cc := basicClusterConfig(device1, myID, fcfg.ID)
cc.Folders[0].Paused = true
m.ClusterConfig(fc, cc)
seq++
files[0].Sequence = seq
files[0].Version = files[0].Version.Update(myID.Short())
localIndexUpdate(m, fcfg.ID, files)
// I don't see what to hook into to ensure an index update is not sent.
dur := 50 * time.Millisecond
if !testing.Short() {
dur = 2 * time.Second
}
select {
case <-time.After(dur):
case <-indexChan:
t.Error("Received index despite remote being paused")
}
// Remote unpaused
cc.Folders[0].Paused = false
m.ClusterConfig(fc, cc)
select {
case <-time.After(5 * time.Second):
t.Fatal("timed out before receiving index")
case <-indexChan:
}
// Local paused and resume
pauseFolder(t, m.cfg, fcfg.ID, true)
pauseFolder(t, m.cfg, fcfg.ID, false)
seq++
files[0].Sequence = seq
files[0].Version = files[0].Version.Update(myID.Short())
localIndexUpdate(m, fcfg.ID, files)
select {
case <-time.After(5 * time.Second):
t.Fatal("timed out before receiving index")
case <-indexChan:
}
// Local and remote paused, then first resume remote, then local
cc.Folders[0].Paused = true
m.ClusterConfig(fc, cc)
pauseFolder(t, m.cfg, fcfg.ID, true)
cc.Folders[0].Paused = false
m.ClusterConfig(fc, cc)
pauseFolder(t, m.cfg, fcfg.ID, false)
seq++
files[0].Sequence = seq
files[0].Version = files[0].Version.Update(myID.Short())
localIndexUpdate(m, fcfg.ID, files)
select {
case <-time.After(5 * time.Second):
t.Fatal("timed out before receiving index")
case <-indexChan:
}
// Folder removed on remote
lib/protocol: Refactor interface (#9375) This is a refactor of the protocol/model interface to take the actual message as the parameter, instead of the broken-out fields: ```diff type Model interface { // An index was received from the peer device - Index(conn Connection, folder string, files []FileInfo) error + Index(conn Connection, idx *Index) error // An index update was received from the peer device - IndexUpdate(conn Connection, folder string, files []FileInfo) error + IndexUpdate(conn Connection, idxUp *IndexUpdate) error // A request was made by the peer device - Request(conn Connection, folder, name string, blockNo, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) + Request(conn Connection, req *Request) (RequestResponse, error) // A cluster configuration message was received - ClusterConfig(conn Connection, config ClusterConfig) error + ClusterConfig(conn Connection, config *ClusterConfig) error // The peer device closed the connection or an error occurred Closed(conn Connection, err error) // The peer device sent progress updates for the files it is currently downloading - DownloadProgress(conn Connection, folder string, updates []FileDownloadProgressUpdate) error + DownloadProgress(conn Connection, p *DownloadProgress) error } ``` (and changing the `ClusterConfig` to `*ClusterConfig` for symmetry; we'll be forced to use all pointers everywhere at some point anyway...) The reason for this is that I have another thing cooking which is a small troubleshooting change to check index consistency during transfer. This required adding a field or two to the index/indexupdate messages, and plumbing the extra parameters in umpteen changes is almost as big a diff as this is. I figured let's do it once and avoid having to do that in the future again... The rest of the diff falls out of the change above, much of it being in test code where we run these methods manually...
2024-01-31 08:18:27 +01:00
cc = &protocol.ClusterConfig{}
m.ClusterConfig(fc, cc)
seq++
files[0].Sequence = seq
files[0].Version = files[0].Version.Update(myID.Short())
localIndexUpdate(m, fcfg.ID, files)
select {
case <-time.After(dur):
case <-indexChan:
t.Error("Received index despite remote not having the folder")
}
}
func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) {
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
tfs := fcfg.Filesystem(nil)
dir1 := "foo"
dir2 := "bar"
// Initialise db with an entry and then stop everything again
must(t, tfs.Mkdir(dir1, 0o777))
m := newModel(t, w, myID, nil)
defer cleanupModelAndRemoveDir(m, tfs.URI())
m.ServeBackground()
m.ScanFolders()
m.cancel()
<-m.stopped
// Add connection (sends incoming cluster config) before starting the new model
m = &testModel{
model: NewModel(m.cfg, m.id, m.db, m.protectedFiles, m.evLogger, protocol.NewKeyGenerator()).(*model),
evCancel: m.evCancel,
stopped: make(chan struct{}),
}
defer cleanupModel(m)
fc := addFakeConn(m, device1, fcfg.ID)
done := make(chan struct{})
defer close(done) // Must be the last thing to be deferred, thus first to run.
indexChan := make(chan []protocol.FileInfo, 1)
ccChan := make(chan protocol.ClusterConfig, 1)
fc.setIndexFn(func(_ context.Context, folder string, fs []protocol.FileInfo) error {
select {
case indexChan <- fs:
case <-done:
}
return nil
})
fc.ClusterConfigCalls(func(cc protocol.ClusterConfig) {
select {
case ccChan <- cc:
case <-done:
}
})
m.ServeBackground()
timeout := time.After(5 * time.Second)
// Check that cluster-config is resent after adding folders when starting model
select {
case <-timeout:
t.Fatal("timed out before receiving cluster-config")
case <-ccChan:
}
// Check that an index is sent for the newly added item
must(t, tfs.Mkdir(dir2, 0o777))
m.ScanFolders()
select {
case <-timeout:
t.Fatal("timed out before receiving index")
case <-indexChan:
}
}
func TestRequestReceiveEncrypted(t *testing.T) {
if testing.Short() {
t.Skip("skipping on short testing - scrypt is too slow")
}
w, fcfg, wCancel := newDefaultCfgWrapper()
defer wCancel()
tfs := fcfg.Filesystem(nil)
fcfg.Type = config.FolderTypeReceiveEncrypted
setFolder(t, w, fcfg)
encToken := protocol.PasswordToken(protocol.NewKeyGenerator(), fcfg.ID, "pw")
must(t, writeEncryptionToken(encToken, fcfg))
m := setupModel(t, w)
defer cleanupModelAndRemoveDir(m, tfs.URI())
files := genFiles(2)
files[1].LocalFlags = protocol.FlagLocalReceiveOnly
m.mut.RLock()
fset := m.folderFiles[fcfg.ID]
m.mut.RUnlock()
fset.Update(protocol.LocalDeviceID, files)
indexChan := make(chan []protocol.FileInfo, 10)
done := make(chan struct{})
defer close(done)
fc := newFakeConnection(device1, m)
fc.folder = fcfg.ID
fc.setIndexFn(func(_ context.Context, _ string, fs []protocol.FileInfo) error {
select {
case indexChan <- fs:
case <-done:
}
return nil
})
m.AddConnection(fc, protocol.Hello{})
lib/protocol: Refactor interface (#9375) This is a refactor of the protocol/model interface to take the actual message as the parameter, instead of the broken-out fields: ```diff type Model interface { // An index was received from the peer device - Index(conn Connection, folder string, files []FileInfo) error + Index(conn Connection, idx *Index) error // An index update was received from the peer device - IndexUpdate(conn Connection, folder string, files []FileInfo) error + IndexUpdate(conn Connection, idxUp *IndexUpdate) error // A request was made by the peer device - Request(conn Connection, folder, name string, blockNo, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) + Request(conn Connection, req *Request) (RequestResponse, error) // A cluster configuration message was received - ClusterConfig(conn Connection, config ClusterConfig) error + ClusterConfig(conn Connection, config *ClusterConfig) error // The peer device closed the connection or an error occurred Closed(conn Connection, err error) // The peer device sent progress updates for the files it is currently downloading - DownloadProgress(conn Connection, folder string, updates []FileDownloadProgressUpdate) error + DownloadProgress(conn Connection, p *DownloadProgress) error } ``` (and changing the `ClusterConfig` to `*ClusterConfig` for symmetry; we'll be forced to use all pointers everywhere at some point anyway...) The reason for this is that I have another thing cooking which is a small troubleshooting change to check index consistency during transfer. This required adding a field or two to the index/indexupdate messages, and plumbing the extra parameters in umpteen changes is almost as big a diff as this is. I figured let's do it once and avoid having to do that in the future again... The rest of the diff falls out of the change above, much of it being in test code where we run these methods manually...
2024-01-31 08:18:27 +01:00
m.ClusterConfig(fc, &protocol.ClusterConfig{
Folders: []protocol.Folder{
{
ID: "default",
Devices: []protocol.Device{
{
ID: myID,
EncryptionPasswordToken: encToken,
},
{ID: device1},
},
},
},
})
select {
case fs := <-indexChan:
if len(fs) != 1 {
t.Error("Expected index with one file, got", fs)
}
if got := fs[0].Name; got != files[0].Name {
t.Errorf("Expected file %v, got %v", got, files[0].Name)
}
case <-time.After(5 * time.Second):
t.Fatal("timed out before receiving index")
}
// Detects deletion, as we never really created the file on disk
// Shouldn't send anything because receive-encrypted
must(t, m.ScanFolder(fcfg.ID))
// One real file to be sent
name := "foo"
data := make([]byte, 2000)
rand.Read(data)
fc.addFile(name, 0o664, protocol.FileInfoTypeFile, data)
fc.sendIndexUpdate()
select {
case fs := <-indexChan:
if len(fs) != 1 {
t.Error("Expected index with one file, got", fs)
}
if got := fs[0].Name; got != name {
t.Errorf("Expected file %v, got %v", got, files[0].Name)
}
case <-time.After(5 * time.Second):
t.Fatal("timed out before receiving index")
}
// Simulate request from device that is untrusted too, i.e. with non-empty, but garbage hash
lib/protocol: Refactor interface (#9375) This is a refactor of the protocol/model interface to take the actual message as the parameter, instead of the broken-out fields: ```diff type Model interface { // An index was received from the peer device - Index(conn Connection, folder string, files []FileInfo) error + Index(conn Connection, idx *Index) error // An index update was received from the peer device - IndexUpdate(conn Connection, folder string, files []FileInfo) error + IndexUpdate(conn Connection, idxUp *IndexUpdate) error // A request was made by the peer device - Request(conn Connection, folder, name string, blockNo, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) + Request(conn Connection, req *Request) (RequestResponse, error) // A cluster configuration message was received - ClusterConfig(conn Connection, config ClusterConfig) error + ClusterConfig(conn Connection, config *ClusterConfig) error // The peer device closed the connection or an error occurred Closed(conn Connection, err error) // The peer device sent progress updates for the files it is currently downloading - DownloadProgress(conn Connection, folder string, updates []FileDownloadProgressUpdate) error + DownloadProgress(conn Connection, p *DownloadProgress) error } ``` (and changing the `ClusterConfig` to `*ClusterConfig` for symmetry; we'll be forced to use all pointers everywhere at some point anyway...) The reason for this is that I have another thing cooking which is a small troubleshooting change to check index consistency during transfer. This required adding a field or two to the index/indexupdate messages, and plumbing the extra parameters in umpteen changes is almost as big a diff as this is. I figured let's do it once and avoid having to do that in the future again... The rest of the diff falls out of the change above, much of it being in test code where we run these methods manually...
2024-01-31 08:18:27 +01:00
_, err := m.Request(fc, &protocol.Request{Folder: fcfg.ID, Name: name, Size: 1064, Hash: []byte("garbage")})
must(t, err)
changed, err := m.LocalChangedFolderFiles(fcfg.ID, 1, 10)
must(t, err)
if l := len(changed); l != 1 {
t.Errorf("Expected one locally changed file, got %v", l)
} else if changed[0].Name != files[0].Name {
t.Errorf("Expected %v, got %v", files[0].Name, changed[0].Name)
}
}
func TestRequestGlobalInvalidToValid(t *testing.T) {
done := make(chan struct{})
defer close(done)
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
defer wcfgCancel()
fcfg.Devices = append(fcfg.Devices, config.FolderDeviceConfiguration{DeviceID: device2})
waiter, err := m.cfg.Modify(func(cfg *config.Configuration) {
cfg.SetDevice(newDeviceConfiguration(cfg.Defaults.Device, device2, "device2"))
fcfg.Devices = append(fcfg.Devices, config.FolderDeviceConfiguration{DeviceID: device2})
cfg.SetFolder(fcfg)
})
must(t, err)
waiter.Wait()
conn := addFakeConn(m, device2, fcfg.ID)
tfs := fcfg.Filesystem(nil)
defer cleanupModelAndRemoveDir(m, tfs.URI())
indexChan := make(chan []protocol.FileInfo, 1)
fc.setIndexFn(func(ctx context.Context, folder string, fs []protocol.FileInfo) error {
select {
case indexChan <- fs:
case <-done:
case <-ctx.Done():
}
return nil
})
name := "foo"
// Setup device with valid file, do not send index yet
contents := []byte("test file contents\n")
fc.addFile(name, 0o644, protocol.FileInfoTypeFile, contents)
// Third device ignoring the same file
fc.mut.Lock()
file := fc.files[0]
fc.mut.Unlock()
file.SetIgnored()
lib/protocol: Refactor interface (#9375) This is a refactor of the protocol/model interface to take the actual message as the parameter, instead of the broken-out fields: ```diff type Model interface { // An index was received from the peer device - Index(conn Connection, folder string, files []FileInfo) error + Index(conn Connection, idx *Index) error // An index update was received from the peer device - IndexUpdate(conn Connection, folder string, files []FileInfo) error + IndexUpdate(conn Connection, idxUp *IndexUpdate) error // A request was made by the peer device - Request(conn Connection, folder, name string, blockNo, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error) + Request(conn Connection, req *Request) (RequestResponse, error) // A cluster configuration message was received - ClusterConfig(conn Connection, config ClusterConfig) error + ClusterConfig(conn Connection, config *ClusterConfig) error // The peer device closed the connection or an error occurred Closed(conn Connection, err error) // The peer device sent progress updates for the files it is currently downloading - DownloadProgress(conn Connection, folder string, updates []FileDownloadProgressUpdate) error + DownloadProgress(conn Connection, p *DownloadProgress) error } ``` (and changing the `ClusterConfig` to `*ClusterConfig` for symmetry; we'll be forced to use all pointers everywhere at some point anyway...) The reason for this is that I have another thing cooking which is a small troubleshooting change to check index consistency during transfer. This required adding a field or two to the index/indexupdate messages, and plumbing the extra parameters in umpteen changes is almost as big a diff as this is. I figured let's do it once and avoid having to do that in the future again... The rest of the diff falls out of the change above, much of it being in test code where we run these methods manually...
2024-01-31 08:18:27 +01:00
m.IndexUpdate(conn, &protocol.IndexUpdate{Folder: fcfg.ID, Files: []protocol.FileInfo{prepareFileInfoForIndex(file)}})
// Wait for the ignored file to be received and possible pulled
timeout := time.After(10 * time.Second)
globalUpdated := false
for {
select {
case <-timeout:
t.Fatalf("timed out (globalUpdated == %v)", globalUpdated)
default:
time.Sleep(10 * time.Millisecond)
}
if !globalUpdated {
_, ok, err := m.CurrentGlobalFile(fcfg.ID, name)
if err != nil {
t.Fatal(err)
}
if !ok {
continue
}
globalUpdated = true
}
snap, err := m.DBSnapshot(fcfg.ID)
if err != nil {
t.Fatal(err)
}
need := snap.NeedSize(protocol.LocalDeviceID)
snap.Release()
if need.Files == 0 {
break
}
}
// Send the valid file
fc.sendIndexUpdate()
gotInvalid := false
for {
select {
case <-timeout:
t.Fatal("timed out before receiving index")
case fs := <-indexChan:
if len(fs) != 1 {
t.Fatalf("Expected one file in index, got %v", len(fs))
}
if !fs[0].IsInvalid() {
return
}
if gotInvalid {
t.Fatal("Received two invalid index updates")
}
t.Log("got index with invalid file")
gotInvalid = true
}
}
}