syncthing/lib/watchaggregator/aggregator_test.go

322 lines
9.6 KiB
Go

// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package watchaggregator
import (
"context"
"os"
"path/filepath"
"runtime"
"strconv"
"testing"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
)
func TestMain(m *testing.M) {
maxFiles = 32
maxFilesPerDir = 8
ret := m.Run()
maxFiles = 512
maxFilesPerDir = 128
os.Exit(ret)
}
const (
testNotifyDelayS = 1
testNotifyTimeout = 2 * time.Second
timeoutWithinBatch = time.Second
)
var (
folderRoot = filepath.Clean("/home/someuser/syncthing")
defaultFolderCfg = config.FolderConfiguration{
FilesystemType: fs.FilesystemTypeBasic,
Path: folderRoot,
FSWatcherDelayS: testNotifyDelayS,
}
defaultCfg = config.Wrap("", config.Configuration{
Folders: []config.FolderConfiguration{defaultFolderCfg},
})
)
// Represents possibly multiple (different event types) expected paths from
// aggregation, that should be received back to back.
type expectedBatch struct {
paths [][]string
afterMs int
beforeMs int
}
// TestAggregate checks whether maxFilesPerDir+1 events in one dir are
// aggregated to parent dir
func TestAggregate(t *testing.T) {
inProgress := make(map[string]struct{})
folderCfg := defaultFolderCfg.Copy()
folderCfg.ID = "Aggregate"
ctx, _ := context.WithCancel(context.Background())
a := newAggregator(folderCfg, ctx)
// checks whether maxFilesPerDir events in one dir are kept as is
for i := 0; i < maxFilesPerDir; i++ {
a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, inProgress)
}
if l := len(getEventPaths(a.root, ".", a)); l != maxFilesPerDir {
t.Errorf("Unexpected number of events stored, got %v, expected %v", l, maxFilesPerDir)
}
// checks whether maxFilesPerDir+1 events in one dir are aggregated to parent dir
a.newEvent(fs.Event{filepath.Join("parent", "new"), fs.NonRemove}, inProgress)
compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
// checks that adding an event below "parent" does not change anything
a.newEvent(fs.Event{filepath.Join("parent", "extra"), fs.NonRemove}, inProgress)
compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
// again test aggregation in "parent" but with event in subdirs
a = newAggregator(folderCfg, ctx)
for i := 0; i < maxFilesPerDir; i++ {
a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, inProgress)
}
a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, inProgress)
compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
// test aggregation in root
a = newAggregator(folderCfg, ctx)
for i := 0; i < maxFiles; i++ {
a.newEvent(fs.Event{strconv.Itoa(i), fs.NonRemove}, inProgress)
}
if len(getEventPaths(a.root, ".", a)) != maxFiles {
t.Errorf("Unexpected number of events stored in root")
}
a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, inProgress)
compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
// checks that adding an event when "." is already stored is a noop
a.newEvent(fs.Event{"anythingelse", fs.NonRemove}, inProgress)
compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
a = newAggregator(folderCfg, ctx)
filesPerDir := maxFilesPerDir / 2
dirs := make([]string, maxFiles/filesPerDir+1)
for i := 0; i < maxFiles/filesPerDir+1; i++ {
dirs[i] = "dir" + strconv.Itoa(i)
}
for _, dir := range dirs {
for i := 0; i < filesPerDir; i++ {
a.newEvent(fs.Event{filepath.Join(dir, strconv.Itoa(i)), fs.NonRemove}, inProgress)
}
}
compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
}
// TestInProgress checks that ignoring files currently edited by Syncthing works
func TestInProgress(t *testing.T) {
testCase := func(c chan<- fs.Event) {
events.Default.Log(events.ItemStarted, map[string]string{
"item": "inprogress",
})
sleepMs(100)
c <- fs.Event{Name: "inprogress", Type: fs.NonRemove}
sleepMs(1000)
events.Default.Log(events.ItemFinished, map[string]interface{}{
"item": "inprogress",
})
sleepMs(100)
c <- fs.Event{Name: "notinprogress", Type: fs.NonRemove}
sleepMs(800)
}
expectedBatches := []expectedBatch{
{[][]string{{"notinprogress"}}, 2000, 3500},
}
testScenario(t, "InProgress", testCase, expectedBatches)
}
// TestDelay checks that recurring changes to the same path are delayed
// and different types separated and ordered correctly
func TestDelay(t *testing.T) {
file := filepath.Join("parent", "file")
delayed := "delayed"
del := "deleted"
delAfter := "deletedAfter"
both := filepath.Join("parent", "sub", "both")
testCase := func(c chan<- fs.Event) {
sleepMs(200)
c <- fs.Event{Name: file, Type: fs.NonRemove}
delay := time.Duration(300) * time.Millisecond
timer := time.NewTimer(delay)
<-timer.C
timer.Reset(delay)
c <- fs.Event{Name: delayed, Type: fs.NonRemove}
c <- fs.Event{Name: both, Type: fs.NonRemove}
c <- fs.Event{Name: both, Type: fs.Remove}
c <- fs.Event{Name: del, Type: fs.Remove}
for i := 0; i < 9; i++ {
<-timer.C
timer.Reset(delay)
c <- fs.Event{Name: delayed, Type: fs.NonRemove}
}
c <- fs.Event{Name: delAfter, Type: fs.Remove}
<-timer.C
}
// batches that we expect to receive with time interval in milliseconds
expectedBatches := []expectedBatch{
{[][]string{{file}}, 500, 2500},
{[][]string{{delayed}, {both}, {del}}, 2500, 4500},
{[][]string{{delayed}, {delAfter}}, 3600, 7000},
}
testScenario(t, "Delay", testCase, expectedBatches)
}
func getEventPaths(dir *eventDir, dirPath string, a *aggregator) []string {
var paths []string
for childName, childDir := range dir.dirs {
for _, path := range getEventPaths(childDir, filepath.Join(dirPath, childName), a) {
paths = append(paths, path)
}
}
for name := range dir.events {
paths = append(paths, filepath.Join(dirPath, name))
}
return paths
}
func sleepMs(ms int) {
time.Sleep(time.Duration(ms) * time.Millisecond)
}
func durationMs(ms int) time.Duration {
return time.Duration(ms) * time.Millisecond
}
func compareBatchToExpected(batch []string, expectedPaths []string) (missing []string, unexpected []string) {
for _, expected := range expectedPaths {
expected = filepath.Clean(expected)
found := false
for i, received := range batch {
if expected == received {
found = true
batch = append(batch[:i], batch[i+1:]...)
break
}
}
if !found {
missing = append(missing, expected)
}
}
for _, received := range batch {
unexpected = append(unexpected, received)
}
return missing, unexpected
}
func compareBatchToExpectedDirect(t *testing.T, batch []string, expectedPaths []string) {
t.Helper()
missing, unexpected := compareBatchToExpected(batch, expectedPaths)
for _, p := range missing {
t.Errorf("Did not receive event %s", p)
}
for _, p := range unexpected {
t.Errorf("Received unexpected event %s", p)
}
}
func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), expectedBatches []expectedBatch) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
eventChan := make(chan fs.Event)
watchChan := make(chan []string)
folderCfg := defaultFolderCfg.Copy()
folderCfg.ID = name
a := newAggregator(folderCfg, ctx)
a.notifyTimeout = testNotifyTimeout
startTime := time.Now()
go a.mainLoop(eventChan, watchChan, defaultCfg)
sleepMs(20)
go testCase(eventChan)
testAggregatorOutput(t, watchChan, expectedBatches, startTime)
cancel()
}
func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBatches []expectedBatch, startTime time.Time) {
t.Helper()
var received []string
var elapsedTime time.Duration
var batchIndex, innerIndex int
timeout := time.NewTimer(10 * time.Second)
for {
select {
case <-timeout.C:
t.Errorf("Timeout: Received only %d batches (%d expected)", batchIndex, len(expectedBatches))
return
case received = <-fsWatchChan:
}
if batchIndex >= len(expectedBatches) {
t.Errorf("Received batch %d, expected only %d", batchIndex+1, len(expectedBatches))
continue
}
if runtime.GOOS != "darwin" {
now := time.Since(startTime)
if innerIndex == 0 {
switch {
case now < durationMs(expectedBatches[batchIndex].afterMs):
t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, elapsedTime)
case now > durationMs(expectedBatches[batchIndex].beforeMs):
t.Errorf("Received batch %d after %v (too late)", batchIndex+1, elapsedTime)
}
} else if innerTime := now - elapsedTime; innerTime > timeoutWithinBatch {
t.Errorf("Receive part %d of batch %d after %v (too late)", innerIndex+1, batchIndex+1, innerTime)
}
elapsedTime = now
}
expected := expectedBatches[batchIndex].paths[innerIndex]
if len(received) != len(expected) {
t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected), batchIndex+1)
}
missing, unexpected := compareBatchToExpected(received, expected)
for _, p := range missing {
t.Errorf("Did not receive event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1)
}
for _, p := range unexpected {
t.Errorf("Received unexpected event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1)
}
if innerIndex == len(expectedBatches[batchIndex].paths)-1 {
if batchIndex == len(expectedBatches)-1 {
// received everything we expected to
return
}
innerIndex = 0
batchIndex++
} else {
innerIndex++
}
}
}