diff --git a/lib/model/model.go b/lib/model/model.go index 730c4b706..bcd6e56b5 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -222,16 +222,8 @@ func (m *model) Stop() { for id := range devs { ids = append(ids, id) } - m.pmut.RLock() - closed := make([]chan struct{}, 0, len(m.closed)) - for _, c := range m.closed { - closed = append(closed, c) - } - m.pmut.RUnlock() - m.closeConns(ids, errStopped) - for _, c := range closed { - <-c - } + w := m.closeConns(ids, errStopped) + w.Wait() } // StartDeadlockDetector starts a deadlock detector on the models locks which @@ -416,12 +408,16 @@ func (m *model) tearDownFolderLocked(cfg config.FolderConfiguration, err error) // Close connections to affected devices // Must happen before stopping the folder service to abort ongoing // transmissions and thus allow timely service termination. - m.closeConns(cfg.DeviceIDs(), err) + w := m.closeConns(cfg.DeviceIDs(), err) for _, id := range tokens { m.RemoveAndWait(id, 0) } + // Wait for connections to stop to ensure that no more calls to methods + // expecting this folder to exist happen (e.g. .IndexUpdate). + w.Wait() + m.fmut.Lock() // Clean up our config maps @@ -1440,23 +1436,39 @@ func (m *model) Closed(conn protocol.Connection, err error) { close(closed) } -// closeConns will close the underlying connection for given devices -func (m *model) closeConns(devs []protocol.DeviceID, err error) { +// closeConns will close the underlying connection for given devices and return +// a waiter that will return once all the connections are finished closing. +func (m *model) closeConns(devs []protocol.DeviceID, err error) config.Waiter { conns := make([]connections.Connection, 0, len(devs)) + closed := make([]chan struct{}, 0, len(devs)) m.pmut.Lock() for _, dev := range devs { if conn, ok := m.conn[dev]; ok { conns = append(conns, conn) + closed = append(closed, m.closed[dev]) } } m.pmut.Unlock() for _, conn := range conns { conn.Close(err) } + return &channelWaiter{chans: closed} } -func (m *model) closeConn(dev protocol.DeviceID, err error) { - m.closeConns([]protocol.DeviceID{dev}, err) +// closeConn closes the underlying connection for the given device and returns +// a waiter that will return once the connection is finished closing. +func (m *model) closeConn(dev protocol.DeviceID, err error) config.Waiter { + return m.closeConns([]protocol.DeviceID{dev}, err) +} + +type channelWaiter struct { + chans []chan struct{} +} + +func (w *channelWaiter) Wait() { + for _, c := range w.chans { + <-c + } } // Implements protocol.RequestResponse @@ -2537,7 +2549,6 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { if toCfg.Paused { l.Infoln("Pausing", deviceID) - m.closeConn(deviceID, errDevicePaused) events.Default.Log(events.DevicePaused, map[string]string{"device": deviceID.String()}) } else { events.Default.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()}) diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index 66dbb54ce..e756ded21 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -187,6 +187,7 @@ type rawConnection struct { closeBox chan asyncMessage clusterConfigBox chan *ClusterConfig dispatcherLoopStopped chan struct{} + preventSends chan struct{} closed chan struct{} closeOnce sync.Once sendCloseOnce sync.Once @@ -240,6 +241,7 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv closeBox: make(chan asyncMessage), clusterConfigBox: make(chan *ClusterConfig), dispatcherLoopStopped: make(chan struct{}), + preventSends: make(chan struct{}), closed: make(chan struct{}), compression: compress, } @@ -662,12 +664,13 @@ func (c *rawConnection) send(msg message, done chan struct{}) bool { select { case c.outbox <- asyncMessage{msg, done}: return true + case <-c.preventSends: case <-c.closed: - if done != nil { - close(done) - } - return false } + if done != nil { + close(done) + } + return false } func (c *rawConnection) writerLoop() {