parent
f04d054b5a
commit
1b59960ff9
|
@ -446,6 +446,9 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
|
||||||
m := NewModel(defaultCfgWrapper, protocol.LocalDeviceID, "syncthing", "dev", db, nil)
|
m := NewModel(defaultCfgWrapper, protocol.LocalDeviceID, "syncthing", "dev", db, nil)
|
||||||
m.AddFolder(defaultFolderConfig)
|
m.AddFolder(defaultFolderConfig)
|
||||||
|
|
||||||
|
// Set up our evet subscription early
|
||||||
|
s := events.Default.Subscribe(events.ItemFinished)
|
||||||
|
|
||||||
f := setUpSendReceiveFolder(m)
|
f := setUpSendReceiveFolder(m)
|
||||||
|
|
||||||
// queue.Done should be called by the finisher routine
|
// queue.Done should be called by the finisher routine
|
||||||
|
@ -470,15 +473,16 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
|
||||||
// Receive a block at puller, to indicate that at least a single copier
|
// Receive a block at puller, to indicate that at least a single copier
|
||||||
// loop has been performed.
|
// loop has been performed.
|
||||||
toPull := <-pullChan
|
toPull := <-pullChan
|
||||||
// Wait until copier is trying to pass something down to the puller again
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
// Close the file
|
|
||||||
toPull.sharedPullerState.fail("test", os.ErrNotExist)
|
|
||||||
// Unblock copier
|
|
||||||
<-pullChan
|
|
||||||
|
|
||||||
s := events.Default.Subscribe(events.ItemFinished)
|
// Close the file, causing errors on further access
|
||||||
timeout = time.Second
|
toPull.sharedPullerState.fail("test", os.ErrNotExist)
|
||||||
|
|
||||||
|
// Unblock copier
|
||||||
|
go func() {
|
||||||
|
for range pullChan {
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case state := <-finisherBufferChan:
|
case state := <-finisherBufferChan:
|
||||||
// At this point the file should still be registered with both the job
|
// At this point the file should still be registered with both the job
|
||||||
|
@ -490,12 +494,13 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
|
||||||
// Pass the file down the real finisher, and give it time to consume
|
// Pass the file down the real finisher, and give it time to consume
|
||||||
finisherChan <- state
|
finisherChan <- state
|
||||||
|
|
||||||
if ev, err := s.Poll(timeout); err != nil {
|
t0 := time.Now()
|
||||||
|
if ev, err := s.Poll(time.Minute); err != nil {
|
||||||
t.Fatal("Got error waiting for ItemFinished event:", err)
|
t.Fatal("Got error waiting for ItemFinished event:", err)
|
||||||
} else if n := ev.Data.(map[string]interface{})["item"]; n != state.file.Name {
|
} else if n := ev.Data.(map[string]interface{})["item"]; n != state.file.Name {
|
||||||
t.Fatal("Got ItemFinished event for wrong file:", n)
|
t.Fatal("Got ItemFinished event for wrong file:", n)
|
||||||
}
|
}
|
||||||
time.Sleep(100 * time.Millisecond)
|
t.Log("event took", time.Since(t0))
|
||||||
|
|
||||||
state.mut.Lock()
|
state.mut.Lock()
|
||||||
stateFd := state.fd
|
stateFd := state.fd
|
||||||
|
@ -510,11 +515,15 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
|
||||||
|
|
||||||
// Doing it again should have no effect
|
// Doing it again should have no effect
|
||||||
finisherChan <- state
|
finisherChan <- state
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
if _, err := s.Poll(time.Second); err != events.ErrTimeout {
|
||||||
|
t.Fatal("Expected timeout, not another event", err)
|
||||||
|
}
|
||||||
|
|
||||||
if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 {
|
if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 {
|
||||||
t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued())
|
t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued())
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
t.Fatal("Didn't get anything to the finisher")
|
t.Fatal("Didn't get anything to the finisher")
|
||||||
}
|
}
|
||||||
|
@ -528,6 +537,9 @@ func TestDeregisterOnFailInPull(t *testing.T) {
|
||||||
m := NewModel(defaultCfgWrapper, protocol.LocalDeviceID, "syncthing", "dev", db, nil)
|
m := NewModel(defaultCfgWrapper, protocol.LocalDeviceID, "syncthing", "dev", db, nil)
|
||||||
m.AddFolder(defaultFolderConfig)
|
m.AddFolder(defaultFolderConfig)
|
||||||
|
|
||||||
|
// Set up our evet subscription early
|
||||||
|
s := events.Default.Subscribe(events.ItemFinished)
|
||||||
|
|
||||||
f := setUpSendReceiveFolder(m)
|
f := setUpSendReceiveFolder(m)
|
||||||
|
|
||||||
// queue.Done should be called by the finisher routine
|
// queue.Done should be called by the finisher routine
|
||||||
|
@ -552,7 +564,6 @@ func TestDeregisterOnFailInPull(t *testing.T) {
|
||||||
|
|
||||||
// Receive at finisher, we should error out as puller has nowhere to pull
|
// Receive at finisher, we should error out as puller has nowhere to pull
|
||||||
// from.
|
// from.
|
||||||
s := events.Default.Subscribe(events.ItemFinished)
|
|
||||||
timeout = time.Second
|
timeout = time.Second
|
||||||
select {
|
select {
|
||||||
case state := <-finisherBufferChan:
|
case state := <-finisherBufferChan:
|
||||||
|
@ -565,12 +576,13 @@ func TestDeregisterOnFailInPull(t *testing.T) {
|
||||||
// Pass the file down the real finisher, and give it time to consume
|
// Pass the file down the real finisher, and give it time to consume
|
||||||
finisherChan <- state
|
finisherChan <- state
|
||||||
|
|
||||||
if ev, err := s.Poll(timeout); err != nil {
|
t0 := time.Now()
|
||||||
|
if ev, err := s.Poll(time.Minute); err != nil {
|
||||||
t.Fatal("Got error waiting for ItemFinished event:", err)
|
t.Fatal("Got error waiting for ItemFinished event:", err)
|
||||||
} else if n := ev.Data.(map[string]interface{})["item"]; n != state.file.Name {
|
} else if n := ev.Data.(map[string]interface{})["item"]; n != state.file.Name {
|
||||||
t.Fatal("Got ItemFinished event for wrong file:", n)
|
t.Fatal("Got ItemFinished event for wrong file:", n)
|
||||||
}
|
}
|
||||||
time.Sleep(100 * time.Millisecond)
|
t.Log("event took", time.Since(t0))
|
||||||
|
|
||||||
state.mut.Lock()
|
state.mut.Lock()
|
||||||
stateFd := state.fd
|
stateFd := state.fd
|
||||||
|
@ -585,7 +597,10 @@ func TestDeregisterOnFailInPull(t *testing.T) {
|
||||||
|
|
||||||
// Doing it again should have no effect
|
// Doing it again should have no effect
|
||||||
finisherChan <- state
|
finisherChan <- state
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
if _, err := s.Poll(time.Second); err != events.ErrTimeout {
|
||||||
|
t.Fatal("Expected timeout, not another event", err)
|
||||||
|
}
|
||||||
|
|
||||||
if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 {
|
if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 {
|
||||||
t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued())
|
t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued())
|
||||||
|
|
Loading…
Reference in New Issue