lib/watchaggregator: Don't delay mixed events only (#5094)
Also fix a minor bug in testing failure output.
This commit is contained in:
parent
2c9d96375b
commit
c55c0c8c28
|
@ -308,8 +308,8 @@ func (a *aggregator) actOnTimer(out chan<- []string) {
|
||||||
}
|
}
|
||||||
oldEvents := make(map[string]*aggregatedEvent, c)
|
oldEvents := make(map[string]*aggregatedEvent, c)
|
||||||
a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), true)
|
a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), true)
|
||||||
if a.notifyDelay != a.notifyTimeout && a.counts[fs.NonRemove]+a.counts[fs.Mixed] == 0 && a.counts[fs.Remove] != 0 {
|
if a.notifyDelay != a.notifyTimeout && a.counts[fs.NonRemove] == 0 && a.counts[fs.Remove]+a.counts[fs.Mixed] != 0 {
|
||||||
// Only deletion events remaining, no need to delay them additionally
|
// Only delayed events remaining, no need to delay them additionally
|
||||||
a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), false)
|
a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), false)
|
||||||
}
|
}
|
||||||
if len(oldEvents) == 0 {
|
if len(oldEvents) == 0 {
|
||||||
|
|
|
@ -183,6 +183,23 @@ func TestDelay(t *testing.T) {
|
||||||
testScenario(t, "Delay", testCase, expectedBatches)
|
testScenario(t, "Delay", testCase, expectedBatches)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestNoDelay checks that no delay occurs if there are no non-remove events
|
||||||
|
func TestNoDelay(t *testing.T) {
|
||||||
|
mixed := "foo"
|
||||||
|
del := "bar"
|
||||||
|
testCase := func(c chan<- fs.Event) {
|
||||||
|
c <- fs.Event{Name: mixed, Type: fs.NonRemove}
|
||||||
|
c <- fs.Event{Name: mixed, Type: fs.Remove}
|
||||||
|
c <- fs.Event{Name: del, Type: fs.Remove}
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedBatches := []expectedBatch{
|
||||||
|
{[][]string{{mixed}, {del}}, 500, 2000},
|
||||||
|
}
|
||||||
|
|
||||||
|
testScenario(t, "NoDelay", testCase, expectedBatches)
|
||||||
|
}
|
||||||
|
|
||||||
func getEventPaths(dir *eventDir, dirPath string, a *aggregator) []string {
|
func getEventPaths(dir *eventDir, dirPath string, a *aggregator) []string {
|
||||||
var paths []string
|
var paths []string
|
||||||
for childName, childDir := range dir.dirs {
|
for childName, childDir := range dir.dirs {
|
||||||
|
@ -283,10 +300,10 @@ func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBat
|
||||||
if innerIndex == 0 {
|
if innerIndex == 0 {
|
||||||
switch {
|
switch {
|
||||||
case now < durationMs(expectedBatches[batchIndex].afterMs):
|
case now < durationMs(expectedBatches[batchIndex].afterMs):
|
||||||
t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, elapsedTime)
|
t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, now)
|
||||||
|
|
||||||
case now > durationMs(expectedBatches[batchIndex].beforeMs):
|
case now > durationMs(expectedBatches[batchIndex].beforeMs):
|
||||||
t.Errorf("Received batch %d after %v (too late)", batchIndex+1, elapsedTime)
|
t.Errorf("Received batch %d after %v (too late)", batchIndex+1, now)
|
||||||
}
|
}
|
||||||
} else if innerTime := now - elapsedTime; innerTime > timeoutWithinBatch {
|
} else if innerTime := now - elapsedTime; innerTime > timeoutWithinBatch {
|
||||||
t.Errorf("Receive part %d of batch %d after %v (too late)", innerIndex+1, batchIndex+1, innerTime)
|
t.Errorf("Receive part %d of batch %d after %v (too late)", innerIndex+1, batchIndex+1, innerTime)
|
||||||
|
|
Loading…
Reference in New Issue