diff --git a/lib/model/model.go b/lib/model/model.go index b3dd341d8..7ff25a259 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -1041,6 +1041,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon m.pmut.RLock() conn, ok := m.conn[deviceID] + closed := m.closed[deviceID] hello := m.helloMessages[deviceID] m.pmut.RUnlock() if !ok { @@ -1172,7 +1173,19 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon } } - go sendIndexes(conn, folder.ID, fs, startSequence, dropSymlinks) + // The token isn't tracked as the service stops when the connection + // terminates and is automatically removed from supervisor (by + // implementing suture.IsCompletable). + m.Add(&indexSender{ + conn: conn, + connClosed: closed, + folder: folder.ID, + fset: fs, + prevSequence: startSequence, + dropSymlinks: dropSymlinks, + stop: make(chan struct{}), + stopped: make(chan struct{}), + }) } m.pmut.Lock() @@ -1886,15 +1899,28 @@ func (m *model) deviceWasSeen(deviceID protocol.DeviceID) { m.deviceStatRef(deviceID).WasSeen() } -func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, prevSequence int64, dropSymlinks bool) { - deviceID := conn.ID() +type indexSender struct { + conn protocol.Connection + folder string + dev string + fset *db.FileSet + prevSequence int64 + dropSymlinks bool + connClosed chan struct{} + stop chan struct{} + stopped chan struct{} +} + +func (s *indexSender) Serve() { + defer close(s.stopped) + var err error - l.Debugf("Starting sendIndexes for %s to %s at %s (slv=%d)", folder, deviceID, conn, prevSequence) - defer l.Debugf("Exiting sendIndexes for %s to %s at %s: %v", folder, deviceID, conn, err) + l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence) + defer l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.dev, s.conn, err) // We need to send one index, regardless of whether there is something to send or not - prevSequence, err = sendIndexTo(prevSequence, conn, folder, fs, dropSymlinks) + err = s.sendIndexTo() // Subscribe to LocalIndexUpdated (we have new information to send) and // DeviceDisconnected (it might be us who disconnected, so we should @@ -1902,22 +1928,37 @@ func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, prevSe sub := events.Default.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected) defer events.Default.Unsubscribe(sub) + evChan := sub.C() + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for err == nil { - if conn.Closed() { - // Our work is done. + select { + case <-s.stop: return + case <-s.connClosed: + return + default: } // While we have sent a sequence at least equal to the one // currently in the database, wait for the local index to update. The // local index may update for other folders than the one we are // sending for. - if fs.Sequence(protocol.LocalDeviceID) <= prevSequence { - sub.Poll(time.Minute) + if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence { + select { + case <-s.stop: + return + case <-s.connClosed: + return + case <-evChan: + case <-ticker.C: + } + continue } - prevSequence, err = sendIndexTo(prevSequence, conn, folder, fs, dropSymlinks) + err = s.sendIndexTo() // Wait a short amount of time before entering the next loop. If there // are continuous changes happening to the local index, this gives us @@ -1926,31 +1967,42 @@ func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, prevSe } } +func (s *indexSender) Stop() { + close(s.stop) + <-s.stopped +} + +// Complete implements the suture.IsCompletable interface. When Serve terminates +// before Stop is called, the supervisor will check for this method and if it +// returns true removes the service instead of restarting it. Here it always +// returns true, as indexSender only terminates when a connection is +// closed/has failed, in which case retrying doesn't help. +func (s *indexSender) Complete() bool { return true } + // sendIndexTo sends file infos with a sequence number higher than prevSequence and // returns the highest sent sequence number. -func sendIndexTo(prevSequence int64, conn protocol.Connection, folder string, fs *db.FileSet, dropSymlinks bool) (int64, error) { - deviceID := conn.ID() - initial := prevSequence == 0 +func (s *indexSender) sendIndexTo() error { + initial := s.prevSequence == 0 batch := newFileInfoBatch(nil) batch.flushFn = func(fs []protocol.FileInfo) error { - l.Debugf("Sending indexes for %s to %s at %s: %d files (<%d bytes)", folder, deviceID, conn, len(batch.infos), batch.size) + l.Debugf("Sending indexes for %s to %s at %s: %d files (<%d bytes)", s.folder, s.dev, s.conn, len(batch.infos), batch.size) if initial { initial = false - return conn.Index(folder, fs) + return s.conn.Index(s.folder, fs) } - return conn.IndexUpdate(folder, fs) + return s.conn.IndexUpdate(s.folder, fs) } var err error var f protocol.FileInfo - fs.WithHaveSequence(prevSequence+1, func(fi db.FileIntf) bool { + s.fset.WithHaveSequence(s.prevSequence+1, func(fi db.FileIntf) bool { if err = batch.flushIfFull(); err != nil { return false } if shouldDebug() { - if fi.SequenceNo() < prevSequence+1 { - panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", prevSequence+1)) + if fi.SequenceNo() < s.prevSequence+1 { + panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1)) } if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence { panic(fmt.Sprintln("non-increasing sequence, current:", fi.SequenceNo(), "<= previous:", f.Sequence)) @@ -1969,7 +2021,7 @@ func sendIndexTo(prevSequence int64, conn protocol.Connection, folder string, fs } f.LocalFlags = 0 // never sent externally - if dropSymlinks && f.IsSymlink() { + if s.dropSymlinks && f.IsSymlink() { // Do not send index entries with symlinks to clients that can't // handle it. Fixes issue #3802. Once both sides are upgraded, a // rescan (i.e., change) of the symlink is required for it to @@ -1981,17 +2033,18 @@ func sendIndexTo(prevSequence int64, conn protocol.Connection, folder string, fs return true }) if err != nil { - return prevSequence, err + return err } err = batch.flush() // True if there was nothing to be sent if f.Sequence == 0 { - return prevSequence, err + return err } - return f.Sequence, err + s.prevSequence = f.Sequence + return err } func (m *model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {