diff --git a/lib/model/folder_sendrecv_test.go b/lib/model/folder_sendrecv_test.go index 1b4fa37fd..b3eab1c9e 100644 --- a/lib/model/folder_sendrecv_test.go +++ b/lib/model/folder_sendrecv_test.go @@ -490,17 +490,21 @@ func TestDeregisterOnFailInCopy(t *testing.T) { t.Fatal("Expected file in progress") } - copyChan := make(chan copyBlocksState) pullChan := make(chan pullBlockState) finisherBufferChan := make(chan *sharedPullerState) finisherChan := make(chan *sharedPullerState) dbUpdateChan := make(chan dbUpdateJob, 1) - go f.copierRoutine(copyChan, pullChan, finisherBufferChan) + copyChan, copyWg := startCopier(f, pullChan, finisherBufferChan) go f.finisherRoutine(finisherChan, dbUpdateChan, make(chan string)) - defer close(copyChan) - defer close(pullChan) - defer close(finisherChan) + + defer func() { + close(copyChan) + copyWg.Wait() + close(pullChan) + close(finisherBufferChan) + close(finisherChan) + }() f.handleFile(file, copyChan, dbUpdateChan) @@ -508,15 +512,15 @@ func TestDeregisterOnFailInCopy(t *testing.T) { // loop has been performed. toPull := <-pullChan - // Close the file, causing errors on further access - toPull.sharedPullerState.fail(os.ErrNotExist) - // Unblock copier go func() { for range pullChan { } }() + // Close the file, causing errors on further access + toPull.sharedPullerState.fail(os.ErrNotExist) + select { case state := <-finisherBufferChan: // At this point the file should still be registered with both the job @@ -580,66 +584,90 @@ func TestDeregisterOnFailInPull(t *testing.T) { t.Fatal("Expected file in progress") } - copyChan := make(chan copyBlocksState) pullChan := make(chan pullBlockState) finisherBufferChan := make(chan *sharedPullerState) finisherChan := make(chan *sharedPullerState) dbUpdateChan := make(chan dbUpdateJob, 1) - go f.copierRoutine(copyChan, pullChan, finisherBufferChan) - go f.pullerRoutine(pullChan, finisherBufferChan) + copyChan, copyWg := startCopier(f, pullChan, finisherBufferChan) + pullWg := sync.NewWaitGroup() + pullWg.Add(1) + go func() { + f.pullerRoutine(pullChan, finisherBufferChan) + pullWg.Done() + }() go f.finisherRoutine(finisherChan, dbUpdateChan, make(chan string)) - defer close(copyChan) - defer close(pullChan) - defer close(finisherChan) + defer func() { + // Unblock copier and puller + go func() { + for range finisherBufferChan { + } + }() + close(copyChan) + copyWg.Wait() + close(pullChan) + pullWg.Wait() + close(finisherBufferChan) + close(finisherChan) + }() f.handleFile(file, copyChan, dbUpdateChan) // Receive at finisher, we should error out as puller has nowhere to pull // from. timeout = time.Second - select { - case state := <-finisherBufferChan: - // At this point the file should still be registered with both the job - // queue, and the progress emitter. Verify this. - if f.model.progressEmitter.lenRegistry() != 1 || f.queue.lenProgress() != 1 || f.queue.lenQueued() != 0 { - t.Fatal("Could not find file") + + // Both the puller and copier may send to the finisherBufferChan. + var state *sharedPullerState + after := time.After(5 * time.Second) + for { + select { + case state = <-finisherBufferChan: + case <-after: + t.Fatal("Didn't get failed state to the finisher") } - - // Pass the file down the real finisher, and give it time to consume - finisherChan <- state - - t0 := time.Now() - if ev, err := s.Poll(time.Minute); err != nil { - t.Fatal("Got error waiting for ItemFinished event:", err) - } else if n := ev.Data.(map[string]interface{})["item"]; n != state.file.Name { - t.Fatal("Got ItemFinished event for wrong file:", n) + if state.failed() != nil { + break } - t.Log("event took", time.Since(t0)) + } - state.mut.Lock() - stateWriter := state.writer - state.mut.Unlock() - if stateWriter != nil { - t.Fatal("File not closed?") - } + // At this point the file should still be registered with both the job + // queue, and the progress emitter. Verify this. + if f.model.progressEmitter.lenRegistry() != 1 || f.queue.lenProgress() != 1 || f.queue.lenQueued() != 0 { + t.Fatal("Could not find file") + } - if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 { - t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued()) - } + // Pass the file down the real finisher, and give it time to consume + finisherChan <- state - // Doing it again should have no effect - finisherChan <- state + t0 := time.Now() + if ev, err := s.Poll(time.Minute); err != nil { + t.Fatal("Got error waiting for ItemFinished event:", err) + } else if n := ev.Data.(map[string]interface{})["item"]; n != state.file.Name { + t.Fatal("Got ItemFinished event for wrong file:", n) + } + t.Log("event took", time.Since(t0)) - if _, err := s.Poll(time.Second); err != events.ErrTimeout { - t.Fatal("Expected timeout, not another event", err) - } + state.mut.Lock() + stateWriter := state.writer + state.mut.Unlock() + if stateWriter != nil { + t.Fatal("File not closed?") + } - if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 { - t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued()) - } - case <-time.After(5 * time.Second): - t.Fatal("Didn't get anything to the finisher") + if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 { + t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued()) + } + + // Doing it again should have no effect + finisherChan <- state + + if _, err := s.Poll(time.Second); err != events.ErrTimeout { + t.Fatal("Expected timeout, not another event", err) + } + + if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 { + t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued()) } } @@ -830,11 +858,14 @@ func TestCopyOwner(t *testing.T) { // comes the finisher is done. finisherChan := make(chan *sharedPullerState) - defer close(finisherChan) - copierChan := make(chan copyBlocksState) - defer close(copierChan) - go f.copierRoutine(copierChan, nil, finisherChan) + copierChan, copyWg := startCopier(f, nil, finisherChan) go f.finisherRoutine(finisherChan, dbUpdateChan, nil) + defer func() { + close(copierChan) + copyWg.Wait() + close(finisherChan) + }() + f.handleFile(file, copierChan, nil) <-dbUpdateChan @@ -993,3 +1024,14 @@ func cleanupSharedPullerState(s *sharedPullerState) { s.writer.fd.Close() s.writer.mut.Unlock() } + +func startCopier(f *sendReceiveFolder, pullChan chan<- pullBlockState, finisherChan chan<- *sharedPullerState) (chan copyBlocksState, sync.WaitGroup) { + copyChan := make(chan copyBlocksState) + wg := sync.NewWaitGroup() + wg.Add(1) + go func() { + f.copierRoutine(copyChan, pullChan, finisherChan) + wg.Done() + }() + return copyChan, wg +} diff --git a/lib/model/progressemitter_test.go b/lib/model/progressemitter_test.go index ca3140e53..3ed9f47e3 100644 --- a/lib/model/progressemitter_test.go +++ b/lib/model/progressemitter_test.go @@ -66,6 +66,7 @@ func TestProgressEmitter(t *testing.T) { p := NewProgressEmitter(c, evLogger) go p.Serve() + defer p.Stop() p.interval = 0 expectTimeout(w, t)