From 90d85fd0a2fddebc9bc2fafdcd3cc128467782d3 Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Thu, 21 Nov 2019 08:41:15 +0100 Subject: [PATCH] lib: Replace done channel with contexts in and add names to util services (#6166) --- lib/api/api.go | 7 ++-- lib/beacon/beacon.go | 13 +++---- lib/beacon/broadcast.go | 36 +++++++++---------- lib/beacon/multicast.go | 37 +++++++++----------- lib/connections/quic_listen.go | 8 ++--- lib/connections/relay_listen.go | 7 ++-- lib/connections/service.go | 13 +++---- lib/connections/tcp_listen.go | 7 ++-- lib/discover/global.go | 9 ++--- lib/discover/local.go | 14 ++++---- lib/events/events.go | 12 +++++-- lib/fs/basicfs_watch.go | 4 +-- lib/fs/basicfs_watch_test.go | 6 ++-- lib/model/folder.go | 16 +++------ lib/model/folder_sendonly.go | 2 +- lib/model/folder_sendrecv.go | 2 +- lib/model/folder_summary.go | 13 +++---- lib/model/model.go | 14 +++++--- lib/model/progressemitter.go | 7 ++-- lib/nat/registry.go | 7 ++-- lib/nat/service.go | 43 +++++++++++++---------- lib/relay/client/client.go | 9 ++--- lib/relay/client/dynamic.go | 13 +++---- lib/relay/client/static.go | 13 +++---- lib/stun/stun.go | 21 +++++------ lib/syncthing/auditservice.go | 12 +++++-- lib/syncthing/verboseservice.go | 11 ++++-- lib/ur/usage_report.go | 6 ++-- lib/util/utils.go | 48 +++++++++++--------------- lib/util/utils_test.go | 13 +++---- lib/versioner/staggered.go | 12 +++++-- lib/versioner/trashcan.go | 7 ++-- lib/watchaggregator/aggregator.go | 6 ++-- lib/watchaggregator/aggregator_test.go | 10 +++--- 34 files changed, 240 insertions(+), 218 deletions(-) diff --git a/lib/api/api.go b/lib/api/api.go index e7a4b83b7..cdd13db4e 100644 --- a/lib/api/api.go +++ b/lib/api/api.go @@ -8,6 +8,7 @@ package api import ( "bytes" + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -136,7 +137,7 @@ func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonNam configChanged: make(chan struct{}), startedOnce: make(chan struct{}), } - s.Service = util.AsService(s.serve) + s.Service = util.AsService(s.serve, s.String()) return s } @@ -207,7 +208,7 @@ func sendJSON(w http.ResponseWriter, jsonObject interface{}) { fmt.Fprintf(w, "%s\n", bs) } -func (s *service) serve(stop chan struct{}) { +func (s *service) serve(ctx context.Context) { listener, err := s.getListener(s.cfg.GUI()) if err != nil { select { @@ -381,7 +382,7 @@ func (s *service) serve(stop chan struct{}) { // Wait for stop, restart or error signals select { - case <-stop: + case <-ctx.Done(): // Shutting down permanently l.Debugln("shutting down (stop)") case <-s.configChanged: diff --git a/lib/beacon/beacon.go b/lib/beacon/beacon.go index b1296b219..d7cb5e04f 100644 --- a/lib/beacon/beacon.go +++ b/lib/beacon/beacon.go @@ -7,6 +7,7 @@ package beacon import ( + "context" "fmt" "net" "time" @@ -63,23 +64,23 @@ func newCast(name string) *cast { } } -func (c *cast) addReader(svc func(chan struct{}) error) { +func (c *cast) addReader(svc func(context.Context) error) { c.reader = c.createService(svc, "reader") c.Add(c.reader) } -func (c *cast) addWriter(svc func(stop chan struct{}) error) { +func (c *cast) addWriter(svc func(ctx context.Context) error) { c.writer = c.createService(svc, "writer") c.Add(c.writer) } -func (c *cast) createService(svc func(chan struct{}) error, suffix string) util.ServiceWithError { - return util.AsServiceWithError(func(stop chan struct{}) error { +func (c *cast) createService(svc func(context.Context) error, suffix string) util.ServiceWithError { + return util.AsServiceWithError(func(ctx context.Context) error { l.Debugln("Starting", c.name, suffix) - err := svc(stop) + err := svc(ctx) l.Debugf("Stopped %v %v: %v", c.name, suffix, err) return err - }) + }, fmt.Sprintf("%s/%s", c, suffix)) } func (c *cast) Stop() { diff --git a/lib/beacon/broadcast.go b/lib/beacon/broadcast.go index 969fd63ba..1ab0cddb3 100644 --- a/lib/beacon/broadcast.go +++ b/lib/beacon/broadcast.go @@ -7,34 +7,32 @@ package beacon import ( + "context" "net" "time" ) func NewBroadcast(port int) Interface { c := newCast("broadcastBeacon") - c.addReader(func(stop chan struct{}) error { - return readBroadcasts(c.outbox, port, stop) + c.addReader(func(ctx context.Context) error { + return readBroadcasts(ctx, c.outbox, port) }) - c.addWriter(func(stop chan struct{}) error { - return writeBroadcasts(c.inbox, port, stop) + c.addWriter(func(ctx context.Context) error { + return writeBroadcasts(ctx, c.inbox, port) }) return c } -func writeBroadcasts(inbox <-chan []byte, port int, stop chan struct{}) error { +func writeBroadcasts(ctx context.Context, inbox <-chan []byte, port int) error { conn, err := net.ListenUDP("udp4", nil) if err != nil { l.Debugln(err) return err } - done := make(chan struct{}) - defer close(done) + doneCtx, cancel := context.WithCancel(ctx) + defer cancel() go func() { - select { - case <-stop: - case <-done: - } + <-doneCtx.Done() conn.Close() }() @@ -42,7 +40,7 @@ func writeBroadcasts(inbox <-chan []byte, port int, stop chan struct{}) error { var bs []byte select { case bs = <-inbox: - case <-stop: + case <-doneCtx.Done(): return nil } @@ -99,19 +97,17 @@ func writeBroadcasts(inbox <-chan []byte, port int, stop chan struct{}) error { } } -func readBroadcasts(outbox chan<- recv, port int, stop chan struct{}) error { +func readBroadcasts(ctx context.Context, outbox chan<- recv, port int) error { conn, err := net.ListenUDP("udp4", &net.UDPAddr{Port: port}) if err != nil { l.Debugln(err) return err } - done := make(chan struct{}) - defer close(done) + + doneCtx, cancel := context.WithCancel(ctx) + defer cancel() go func() { - select { - case <-stop: - case <-done: - } + <-doneCtx.Done() conn.Close() }() @@ -129,7 +125,7 @@ func readBroadcasts(outbox chan<- recv, port int, stop chan struct{}) error { copy(c, bs) select { case outbox <- recv{c, addr}: - case <-stop: + case <-doneCtx.Done(): return nil default: l.Debugln("dropping message") diff --git a/lib/beacon/multicast.go b/lib/beacon/multicast.go index f3c1e7fd6..353955a2c 100644 --- a/lib/beacon/multicast.go +++ b/lib/beacon/multicast.go @@ -7,6 +7,7 @@ package beacon import ( + "context" "errors" "net" "time" @@ -16,16 +17,16 @@ import ( func NewMulticast(addr string) Interface { c := newCast("multicastBeacon") - c.addReader(func(stop chan struct{}) error { - return readMulticasts(c.outbox, addr, stop) + c.addReader(func(ctx context.Context) error { + return readMulticasts(ctx, c.outbox, addr) }) - c.addWriter(func(stop chan struct{}) error { - return writeMulticasts(c.inbox, addr, stop) + c.addWriter(func(ctx context.Context) error { + return writeMulticasts(ctx, c.inbox, addr) }) return c } -func writeMulticasts(inbox <-chan []byte, addr string, stop chan struct{}) error { +func writeMulticasts(ctx context.Context, inbox <-chan []byte, addr string) error { gaddr, err := net.ResolveUDPAddr("udp6", addr) if err != nil { l.Debugln(err) @@ -37,13 +38,10 @@ func writeMulticasts(inbox <-chan []byte, addr string, stop chan struct{}) error l.Debugln(err) return err } - done := make(chan struct{}) - defer close(done) + doneCtx, cancel := context.WithCancel(ctx) + defer cancel() go func() { - select { - case <-stop: - case <-done: - } + <-doneCtx.Done() conn.Close() }() @@ -57,7 +55,7 @@ func writeMulticasts(inbox <-chan []byte, addr string, stop chan struct{}) error var bs []byte select { case bs = <-inbox: - case <-stop: + case <-doneCtx.Done(): return nil } @@ -84,7 +82,7 @@ func writeMulticasts(inbox <-chan []byte, addr string, stop chan struct{}) error success++ select { - case <-stop: + case <-doneCtx.Done(): return nil default: } @@ -96,7 +94,7 @@ func writeMulticasts(inbox <-chan []byte, addr string, stop chan struct{}) error } } -func readMulticasts(outbox chan<- recv, addr string, stop chan struct{}) error { +func readMulticasts(ctx context.Context, outbox chan<- recv, addr string) error { gaddr, err := net.ResolveUDPAddr("udp6", addr) if err != nil { l.Debugln(err) @@ -108,13 +106,10 @@ func readMulticasts(outbox chan<- recv, addr string, stop chan struct{}) error { l.Debugln(err) return err } - done := make(chan struct{}) - defer close(done) + doneCtx, cancel := context.WithCancel(ctx) + defer cancel() go func() { - select { - case <-stop: - case <-done: - } + <-doneCtx.Done() conn.Close() }() @@ -144,7 +139,7 @@ func readMulticasts(outbox chan<- recv, addr string, stop chan struct{}) error { bs := make([]byte, 65536) for { select { - case <-stop: + case <-doneCtx.Done(): return nil default: } diff --git a/lib/connections/quic_listen.go b/lib/connections/quic_listen.go index 5c402d747..722d0f494 100644 --- a/lib/connections/quic_listen.go +++ b/lib/connections/quic_listen.go @@ -78,13 +78,9 @@ func (t *quicListener) OnExternalAddressChanged(address *stun.Host, via string) } } -func (t *quicListener) serve(stop chan struct{}) error { +func (t *quicListener) serve(ctx context.Context) error { network := strings.Replace(t.uri.Scheme, "quic", "udp", -1) - // Convert the stop channel into a context - ctx, cancel := context.WithCancel(context.Background()) - go func() { <-stop; cancel() }() - packetConn, err := net.ListenPacket(network, t.uri.Host) if err != nil { l.Infoln("Listen (BEP/quic):", err) @@ -205,7 +201,7 @@ func (f *quicListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls. conns: conns, factory: f, } - l.ServiceWithError = util.AsServiceWithError(l.serve) + l.ServiceWithError = util.AsServiceWithError(l.serve, l.String()) l.nat.Store(stun.NATUnknown) return l } diff --git a/lib/connections/relay_listen.go b/lib/connections/relay_listen.go index 5612336a7..81a105db4 100644 --- a/lib/connections/relay_listen.go +++ b/lib/connections/relay_listen.go @@ -7,6 +7,7 @@ package connections import ( + "context" "crypto/tls" "net/url" "sync" @@ -40,7 +41,7 @@ type relayListener struct { mut sync.RWMutex } -func (t *relayListener) serve(stop chan struct{}) error { +func (t *relayListener) serve(ctx context.Context) error { clnt, err := client.NewClient(t.uri, t.tlsCfg.Certificates, nil, 10*time.Second) if err != nil { l.Infoln("Listen (BEP/relay):", err) @@ -112,7 +113,7 @@ func (t *relayListener) serve(stop chan struct{}) error { t.notifyAddressesChanged(t) } - case <-stop: + case <-ctx.Done(): return nil } } @@ -178,7 +179,7 @@ func (f *relayListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls conns: conns, factory: f, } - t.ServiceWithError = util.AsServiceWithError(t.serve) + t.ServiceWithError = util.AsServiceWithError(t.serve, t.String()) return t } diff --git a/lib/connections/service.go b/lib/connections/service.go index 10231d5d2..e662aeab3 100644 --- a/lib/connections/service.go +++ b/lib/connections/service.go @@ -7,6 +7,7 @@ package connections import ( + "context" "crypto/tls" "errors" "fmt" @@ -185,18 +186,18 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t // the common handling regardless of whether the connection was // incoming or outgoing. - service.Add(util.AsService(service.connect)) - service.Add(util.AsService(service.handle)) + service.Add(util.AsService(service.connect, fmt.Sprintf("%s/connect", service))) + service.Add(util.AsService(service.handle, fmt.Sprintf("%s/handle", service))) service.Add(service.listenerSupervisor) return service } -func (s *service) handle(stop chan struct{}) { +func (s *service) handle(ctx context.Context) { var c internalConn for { select { - case <-stop: + case <-ctx.Done(): return case c = <-s.conns: } @@ -324,7 +325,7 @@ func (s *service) handle(stop chan struct{}) { } } -func (s *service) connect(stop chan struct{}) { +func (s *service) connect(ctx context.Context) { nextDial := make(map[string]time.Time) // Used as delay for the first few connection attempts, increases @@ -480,7 +481,7 @@ func (s *service) connect(stop chan struct{}) { select { case <-time.After(sleep): - case <-stop: + case <-ctx.Done(): return } } diff --git a/lib/connections/tcp_listen.go b/lib/connections/tcp_listen.go index d958358f6..351eaa908 100644 --- a/lib/connections/tcp_listen.go +++ b/lib/connections/tcp_listen.go @@ -7,6 +7,7 @@ package connections import ( + "context" "crypto/tls" "net" "net/url" @@ -42,7 +43,7 @@ type tcpListener struct { mut sync.RWMutex } -func (t *tcpListener) serve(stop chan struct{}) error { +func (t *tcpListener) serve(ctx context.Context) error { tcaddr, err := net.ResolveTCPAddr(t.uri.Scheme, t.uri.Host) if err != nil { l.Infoln("Listen (BEP/tcp):", err) @@ -76,7 +77,7 @@ func (t *tcpListener) serve(stop chan struct{}) error { listener.SetDeadline(time.Now().Add(time.Second)) conn, err := listener.Accept() select { - case <-stop: + case <-ctx.Done(): if err == nil { conn.Close() } @@ -183,7 +184,7 @@ func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.C natService: natService, factory: f, } - l.ServiceWithError = util.AsServiceWithError(l.serve) + l.ServiceWithError = util.AsServiceWithError(l.serve, l.String()) return l } diff --git a/lib/discover/global.go b/lib/discover/global.go index 2ce44864a..aa072e8e4 100644 --- a/lib/discover/global.go +++ b/lib/discover/global.go @@ -8,6 +8,7 @@ package discover import ( "bytes" + "context" "crypto/tls" "encoding/json" "errors" @@ -128,7 +129,7 @@ func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLo noLookup: opts.noLookup, evLogger: evLogger, } - cl.Service = util.AsService(cl.serve) + cl.Service = util.AsService(cl.serve, cl.String()) if !opts.noAnnounce { // If we are supposed to annonce, it's an error until we've done so. cl.setError(errors.New("not announced")) @@ -188,11 +189,11 @@ func (c *globalClient) String() string { return "global@" + c.server } -func (c *globalClient) serve(stop chan struct{}) { +func (c *globalClient) serve(ctx context.Context) { if c.noAnnounce { // We're configured to not do announcements, only lookups. To maintain // the same interface, we just pause here if Serve() is run. - <-stop + <-ctx.Done() return } @@ -212,7 +213,7 @@ func (c *globalClient) serve(stop chan struct{}) { case <-timer.C: c.sendAnnouncement(timer) - case <-stop: + case <-ctx.Done(): return } } diff --git a/lib/discover/local.go b/lib/discover/local.go index 0b2472f3f..7f7abc0ee 100644 --- a/lib/discover/local.go +++ b/lib/discover/local.go @@ -10,8 +10,10 @@ package discover import ( + "context" "encoding/binary" "encoding/hex" + "fmt" "io" "net" "net/url" @@ -81,9 +83,9 @@ func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, evLogge c.beacon = beacon.NewMulticast(addr) } c.Add(c.beacon) - c.Add(util.AsService(c.recvAnnouncements)) + c.Add(util.AsService(c.recvAnnouncements, fmt.Sprintf("%s/recv", c))) - c.Add(util.AsService(c.sendLocalAnnouncements)) + c.Add(util.AsService(c.sendLocalAnnouncements, fmt.Sprintf("%s/sendLocal", c))) return c, nil } @@ -135,7 +137,7 @@ func (c *localClient) announcementPkt(instanceID int64, msg []byte) ([]byte, boo return msg, true } -func (c *localClient) sendLocalAnnouncements(stop chan struct{}) { +func (c *localClient) sendLocalAnnouncements(ctx context.Context) { var msg []byte var ok bool instanceID := rand.Int63() @@ -147,18 +149,18 @@ func (c *localClient) sendLocalAnnouncements(stop chan struct{}) { select { case <-c.localBcastTick: case <-c.forcedBcastTick: - case <-stop: + case <-ctx.Done(): return } } } -func (c *localClient) recvAnnouncements(stop chan struct{}) { +func (c *localClient) recvAnnouncements(ctx context.Context) { b := c.beacon warnedAbout := make(map[string]bool) for { select { - case <-stop: + case <-ctx.Done(): return default: } diff --git a/lib/events/events.go b/lib/events/events.go index bd6f9a0bb..1e364cb89 100644 --- a/lib/events/events.go +++ b/lib/events/events.go @@ -8,8 +8,10 @@ package events import ( + "context" "encoding/json" "errors" + "fmt" "runtime" "time" @@ -258,7 +260,7 @@ func NewLogger() Logger { funcs: make(chan func()), toUnsubscribe: make(chan *subscription), } - l.Service = util.AsService(l.serve) + l.Service = util.AsService(l.serve, l.String()) // Make sure the timer is in the stopped state and hasn't fired anything // into the channel. if !l.timeout.Stop() { @@ -267,7 +269,7 @@ func NewLogger() Logger { return l } -func (l *logger) serve(stop chan struct{}) { +func (l *logger) serve(ctx context.Context) { loop: for { select { @@ -282,7 +284,7 @@ loop: case s := <-l.toUnsubscribe: l.unsubscribe(s) - case <-stop: + case <-ctx.Done(): break loop } } @@ -388,6 +390,10 @@ func (l *logger) unsubscribe(s *subscription) { close(s.events) } +func (l *logger) String() string { + return fmt.Sprintf("events.Logger/@%p", l) +} + // Poll returns an event from the subscription or an error if the poll times // out of the event channel is closed. Poll should not be called concurrently // from multiple goroutines for a single subscription. diff --git a/lib/fs/basicfs_watch.go b/lib/fs/basicfs_watch.go index 903123bce..e2437ec7c 100644 --- a/lib/fs/basicfs_watch.go +++ b/lib/fs/basicfs_watch.go @@ -55,12 +55,12 @@ func (f *BasicFilesystem) Watch(name string, ignore Matcher, ctx context.Context } errChan := make(chan error) - go f.watchLoop(name, roots, backendChan, outChan, errChan, ignore, ctx) + go f.watchLoop(ctx, name, roots, backendChan, outChan, errChan, ignore) return outChan, errChan, nil } -func (f *BasicFilesystem) watchLoop(name string, roots []string, backendChan chan notify.EventInfo, outChan chan<- Event, errChan chan<- error, ignore Matcher, ctx context.Context) { +func (f *BasicFilesystem) watchLoop(ctx context.Context, name string, roots []string, backendChan chan notify.EventInfo, outChan chan<- Event, errChan chan<- error, ignore Matcher) { for { // Detect channel overflow if len(backendChan) == backendBuffer { diff --git a/lib/fs/basicfs_watch_test.go b/lib/fs/basicfs_watch_test.go index 26b01aab3..cf2cdb702 100644 --- a/lib/fs/basicfs_watch_test.go +++ b/lib/fs/basicfs_watch_test.go @@ -178,7 +178,7 @@ func TestWatchWinRoot(t *testing.T) { } cancel() }() - fs.watchLoop(".", roots, backendChan, outChan, errChan, fakeMatcher{}, ctx) + fs.watchLoop(ctx, ".", roots, backendChan, outChan, errChan, fakeMatcher{}) }() // filepath.Dir as watch has a /... suffix @@ -219,7 +219,7 @@ func expectErrorForPath(t *testing.T, path string) { // testFs is Filesystem, but we need BasicFilesystem here fs := newBasicFilesystem(testDirAbs) - go fs.watchLoop(".", []string{testDirAbs}, backendChan, outChan, errChan, fakeMatcher{}, ctx) + go fs.watchLoop(ctx, ".", []string{testDirAbs}, backendChan, outChan, errChan, fakeMatcher{}) backendChan <- fakeEventInfo(path) @@ -244,7 +244,7 @@ func TestWatchSubpath(t *testing.T) { fs := newBasicFilesystem(testDirAbs) abs, _ := fs.rooted("sub") - go fs.watchLoop("sub", []string{testDirAbs}, backendChan, outChan, errChan, fakeMatcher{}, ctx) + go fs.watchLoop(ctx, "sub", []string{testDirAbs}, backendChan, outChan, errChan, fakeMatcher{}) backendChan <- fakeEventInfo(filepath.Join(abs, "file")) diff --git a/lib/model/folder.go b/lib/model/folder.go index 1055c62ce..109720955 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -49,7 +49,6 @@ type folder struct { fset *db.FileSet ignores *ignore.Matcher ctx context.Context - cancel context.CancelFunc scanInterval time.Duration scanTimer *time.Timer @@ -80,8 +79,6 @@ type puller interface { } func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger) folder { - ctx, cancel := context.WithCancel(context.Background()) - return folder{ stateTracker: newStateTracker(cfg.ID, evLogger), FolderConfiguration: cfg, @@ -91,8 +88,6 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf shortID: model.shortID, fset: fset, ignores: ignores, - ctx: ctx, - cancel: cancel, scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second, scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately. @@ -109,10 +104,12 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf } } -func (f *folder) serve(_ chan struct{}) { +func (f *folder) serve(ctx context.Context) { atomic.AddInt32(&f.model.foldersRunning, 1) defer atomic.AddInt32(&f.model.foldersRunning, -1) + f.ctx = ctx + l.Debugln(f, "starting") defer l.Debugln(f, "exiting") @@ -256,11 +253,6 @@ func (f *folder) Delay(next time.Duration) { f.scanDelay <- next } -func (f *folder) Stop() { - f.cancel() - f.Service.Stop() -} - // CheckHealth checks the folder for common errors, updates the folder state // and returns the current folder error, or nil if the folder is healthy. func (f *folder) CheckHealth() error { @@ -643,7 +635,7 @@ func (f *folder) monitorWatch(ctx context.Context) { failTimer.Reset(time.Minute) continue } - watchaggregator.Aggregate(eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger, aggrCtx) + watchaggregator.Aggregate(aggrCtx, eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger) l.Debugln("Started filesystem watcher for folder", f.Description()) case err = <-errChan: f.setWatchError(err) diff --git a/lib/model/folder_sendonly.go b/lib/model/folder_sendonly.go index 0bb7e7688..f56ce16a6 100644 --- a/lib/model/folder_sendonly.go +++ b/lib/model/folder_sendonly.go @@ -30,7 +30,7 @@ func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, folder: newFolder(model, fset, ignores, cfg, evLogger), } f.folder.puller = f - f.folder.Service = util.AsService(f.serve) + f.folder.Service = util.AsService(f.serve, f.String()) return f } diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index 7d3051c0c..e52e91073 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -118,7 +118,7 @@ func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matche pullErrorsMut: sync.NewMutex(), } f.folder.puller = f - f.folder.Service = util.AsService(f.serve) + f.folder.Service = util.AsService(f.serve, f.String()) if f.Copiers == 0 { f.Copiers = defaultCopiers diff --git a/lib/model/folder_summary.go b/lib/model/folder_summary.go index d4bda64e0..78addb555 100644 --- a/lib/model/folder_summary.go +++ b/lib/model/folder_summary.go @@ -7,6 +7,7 @@ package model import ( + "context" "fmt" "strings" "time" @@ -63,8 +64,8 @@ func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID, lastEventReqMut: sync.NewMutex(), } - service.Add(util.AsService(service.listenForUpdates)) - service.Add(util.AsService(service.calculateSummaries)) + service.Add(util.AsService(service.listenForUpdates, fmt.Sprintf("%s/listenForUpdates", service))) + service.Add(util.AsService(service.calculateSummaries, fmt.Sprintf("%s/calculateSummaries", service))) return service } @@ -145,7 +146,7 @@ func (c *folderSummaryService) OnEventRequest() { // listenForUpdates subscribes to the event bus and makes note of folders that // need their data recalculated. -func (c *folderSummaryService) listenForUpdates(stop chan struct{}) { +func (c *folderSummaryService) listenForUpdates(ctx context.Context) { sub := c.evLogger.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress) defer sub.Unsubscribe() @@ -155,7 +156,7 @@ func (c *folderSummaryService) listenForUpdates(stop chan struct{}) { select { case ev := <-sub.C(): c.processUpdate(ev) - case <-stop: + case <-ctx.Done(): return } } @@ -234,7 +235,7 @@ func (c *folderSummaryService) processUpdate(ev events.Event) { // calculateSummaries periodically recalculates folder summaries and // completion percentage, and sends the results on the event bus. -func (c *folderSummaryService) calculateSummaries(stop chan struct{}) { +func (c *folderSummaryService) calculateSummaries(ctx context.Context) { const pumpInterval = 2 * time.Second pump := time.NewTimer(pumpInterval) @@ -255,7 +256,7 @@ func (c *folderSummaryService) calculateSummaries(stop chan struct{}) { case folder := <-c.immediate: c.sendSummary(folder) - case <-stop: + case <-ctx.Done(): return } } diff --git a/lib/model/model.go b/lib/model/model.go index 4df65ea7f..522a2c7e7 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -1227,7 +1227,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon dropSymlinks: dropSymlinks, evLogger: m.evLogger, } - is.Service = util.AsService(is.serve) + is.Service = util.AsService(is.serve, is.String()) // The token isn't tracked as the service stops when the connection // terminates and is automatically removed from supervisor (by // implementing suture.IsCompletable). @@ -1970,7 +1970,7 @@ type indexSender struct { connClosed chan struct{} } -func (s *indexSender) serve(stop chan struct{}) { +func (s *indexSender) serve(ctx context.Context) { var err error l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence) @@ -1991,7 +1991,7 @@ func (s *indexSender) serve(stop chan struct{}) { for err == nil { select { - case <-stop: + case <-ctx.Done(): return case <-s.connClosed: return @@ -2004,7 +2004,7 @@ func (s *indexSender) serve(stop chan struct{}) { // sending for. if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence { select { - case <-stop: + case <-ctx.Done(): return case <-s.connClosed: return @@ -2037,7 +2037,7 @@ func (s *indexSender) sendIndexTo() error { initial := s.prevSequence == 0 batch := newFileInfoBatch(nil) batch.flushFn = func(fs []protocol.FileInfo) error { - l.Debugf("Sending indexes for %s to %s at %s: %d files (<%d bytes)", s.folder, s.dev, s.conn, len(batch.infos), batch.size) + l.Debugf("%v: Sending %d files (<%d bytes)", s, len(batch.infos), batch.size) if initial { initial = false return s.conn.Index(s.folder, fs) @@ -2099,6 +2099,10 @@ func (s *indexSender) sendIndexTo() error { return err } +func (s *indexSender) String() string { + return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.dev, s.conn) +} + func (m *model) requestGlobal(ctx context.Context, deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) { m.pmut.RLock() nc, ok := m.conn[deviceID] diff --git a/lib/model/progressemitter.go b/lib/model/progressemitter.go index fed22e259..6c2800676 100644 --- a/lib/model/progressemitter.go +++ b/lib/model/progressemitter.go @@ -7,6 +7,7 @@ package model import ( + "context" "fmt" "time" @@ -47,7 +48,7 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi evLogger: evLogger, mut: sync.NewMutex(), } - t.Service = util.AsService(t.serve) + t.Service = util.AsService(t.serve, t.String()) t.CommitConfiguration(config.Configuration{}, cfg.RawCopy()) cfg.Subscribe(t) @@ -57,12 +58,12 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi // serve starts the progress emitter which starts emitting DownloadProgress // events as the progress happens. -func (t *ProgressEmitter) serve(stop chan struct{}) { +func (t *ProgressEmitter) serve(ctx context.Context) { var lastUpdate time.Time var lastCount, newCount int for { select { - case <-stop: + case <-ctx.Done(): l.Debugln("progress emitter: stopping") return case <-t.timer.C: diff --git a/lib/nat/registry.go b/lib/nat/registry.go index 0c04e11aa..889884759 100644 --- a/lib/nat/registry.go +++ b/lib/nat/registry.go @@ -7,6 +7,7 @@ package nat import ( + "context" "sync" "time" ) @@ -19,7 +20,7 @@ func Register(provider DiscoverFunc) { providers = append(providers, provider) } -func discoverAll(renewal, timeout time.Duration, stop chan struct{}) map[string]Device { +func discoverAll(ctx context.Context, renewal, timeout time.Duration) map[string]Device { wg := &sync.WaitGroup{} wg.Add(len(providers)) @@ -32,7 +33,7 @@ func discoverAll(renewal, timeout time.Duration, stop chan struct{}) map[string] for _, dev := range f(renewal, timeout) { select { case c <- dev: - case <-stop: + case <-ctx.Done(): return } } @@ -50,7 +51,7 @@ func discoverAll(renewal, timeout time.Duration, stop chan struct{}) map[string] return } nats[dev.ID()] = dev - case <-stop: + case <-ctx.Done(): return } } diff --git a/lib/nat/service.go b/lib/nat/service.go index e54e25f58..6e2bc2bf1 100644 --- a/lib/nat/service.go +++ b/lib/nat/service.go @@ -7,6 +7,7 @@ package nat import ( + "context" "fmt" "hash/fnv" "math/rand" @@ -43,11 +44,11 @@ func NewService(id protocol.DeviceID, cfg config.Wrapper) *Service { timer: time.NewTimer(0), mut: sync.NewRWMutex(), } - s.Service = util.AsService(s.serve) + s.Service = util.AsService(s.serve, s.String()) return s } -func (s *Service) serve(stop chan struct{}) { +func (s *Service) serve(ctx context.Context) { announce := stdsync.Once{} s.mut.Lock() @@ -57,7 +58,7 @@ func (s *Service) serve(stop chan struct{}) { for { select { case <-s.timer.C: - if found := s.process(stop); found != -1 { + if found := s.process(ctx); found != -1 { announce.Do(func() { suffix := "s" if found == 1 { @@ -66,7 +67,7 @@ func (s *Service) serve(stop chan struct{}) { l.Infoln("Detected", found, "NAT service"+suffix) }) } - case <-stop: + case <-ctx.Done(): s.timer.Stop() s.mut.RLock() for _, mapping := range s.mappings { @@ -78,7 +79,7 @@ func (s *Service) serve(stop chan struct{}) { } } -func (s *Service) process(stop chan struct{}) int { +func (s *Service) process(ctx context.Context) int { // toRenew are mappings which are due for renewal // toUpdate are the remaining mappings, which will only be updated if one of // the old IGDs has gone away, or a new IGD has appeared, but only if we @@ -120,14 +121,14 @@ func (s *Service) process(stop chan struct{}) int { return -1 } - nats := discoverAll(time.Duration(s.cfg.Options().NATRenewalM)*time.Minute, time.Duration(s.cfg.Options().NATTimeoutS)*time.Second, stop) + nats := discoverAll(ctx, time.Duration(s.cfg.Options().NATRenewalM)*time.Minute, time.Duration(s.cfg.Options().NATTimeoutS)*time.Second) for _, mapping := range toRenew { - s.updateMapping(mapping, nats, true, stop) + s.updateMapping(ctx, mapping, nats, true) } for _, mapping := range toUpdate { - s.updateMapping(mapping, nats, false, stop) + s.updateMapping(ctx, mapping, nats, false) } return len(nats) @@ -177,17 +178,17 @@ func (s *Service) RemoveMapping(mapping *Mapping) { // acquire mappings for natds which the mapping was unaware of before. // Optionally takes renew flag which indicates whether or not we should renew // mappings with existing natds -func (s *Service) updateMapping(mapping *Mapping, nats map[string]Device, renew bool, stop chan struct{}) { +func (s *Service) updateMapping(ctx context.Context, mapping *Mapping, nats map[string]Device, renew bool) { var added, removed []Address renewalTime := time.Duration(s.cfg.Options().NATRenewalM) * time.Minute mapping.expires = time.Now().Add(renewalTime) - newAdded, newRemoved := s.verifyExistingMappings(mapping, nats, renew, stop) + newAdded, newRemoved := s.verifyExistingMappings(ctx, mapping, nats, renew) added = append(added, newAdded...) removed = append(removed, newRemoved...) - newAdded, newRemoved = s.acquireNewMappings(mapping, nats, stop) + newAdded, newRemoved = s.acquireNewMappings(ctx, mapping, nats) added = append(added, newAdded...) removed = append(removed, newRemoved...) @@ -196,14 +197,14 @@ func (s *Service) updateMapping(mapping *Mapping, nats map[string]Device, renew } } -func (s *Service) verifyExistingMappings(mapping *Mapping, nats map[string]Device, renew bool, stop chan struct{}) ([]Address, []Address) { +func (s *Service) verifyExistingMappings(ctx context.Context, mapping *Mapping, nats map[string]Device, renew bool) ([]Address, []Address) { var added, removed []Address leaseTime := time.Duration(s.cfg.Options().NATLeaseM) * time.Minute for id, address := range mapping.addressMap() { select { - case <-stop: + case <-ctx.Done(): return nil, nil default: } @@ -225,7 +226,7 @@ func (s *Service) verifyExistingMappings(mapping *Mapping, nats map[string]Devic l.Debugf("Renewing %s -> %s mapping on %s", mapping, address, id) - addr, err := s.tryNATDevice(nat, mapping.address.Port, address.Port, leaseTime, stop) + addr, err := s.tryNATDevice(ctx, nat, mapping.address.Port, address.Port, leaseTime) if err != nil { l.Debugf("Failed to renew %s -> mapping on %s", mapping, address, id) mapping.removeAddress(id) @@ -247,7 +248,7 @@ func (s *Service) verifyExistingMappings(mapping *Mapping, nats map[string]Devic return added, removed } -func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device, stop chan struct{}) ([]Address, []Address) { +func (s *Service) acquireNewMappings(ctx context.Context, mapping *Mapping, nats map[string]Device) ([]Address, []Address) { var added, removed []Address leaseTime := time.Duration(s.cfg.Options().NATLeaseM) * time.Minute @@ -255,7 +256,7 @@ func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device, s for id, nat := range nats { select { - case <-stop: + case <-ctx.Done(): return nil, nil default: } @@ -274,7 +275,7 @@ func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device, s l.Debugf("Acquiring %s mapping on %s", mapping, id) - addr, err := s.tryNATDevice(nat, mapping.address.Port, 0, leaseTime, stop) + addr, err := s.tryNATDevice(ctx, nat, mapping.address.Port, 0, leaseTime) if err != nil { l.Debugf("Failed to acquire %s mapping on %s", mapping, id) continue @@ -291,7 +292,7 @@ func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device, s // tryNATDevice tries to acquire a port mapping for the given internal address to // the given external port. If external port is 0, picks a pseudo-random port. -func (s *Service) tryNATDevice(natd Device, intPort, extPort int, leaseTime time.Duration, stop chan struct{}) (Address, error) { +func (s *Service) tryNATDevice(ctx context.Context, natd Device, intPort, extPort int, leaseTime time.Duration) (Address, error) { var err error var port int @@ -313,7 +314,7 @@ func (s *Service) tryNATDevice(natd Device, intPort, extPort int, leaseTime time for i := 0; i < 10; i++ { select { - case <-stop: + case <-ctx.Done(): return Address{}, nil default: } @@ -343,6 +344,10 @@ findIP: }, nil } +func (s *Service) String() string { + return fmt.Sprintf("nat.Service@%p", s) +} + func hash(input string) int64 { h := fnv.New64a() h.Write([]byte(input)) diff --git a/lib/relay/client/client.go b/lib/relay/client/client.go index 9ece6088a..887234236 100644 --- a/lib/relay/client/client.go +++ b/lib/relay/client/client.go @@ -3,6 +3,7 @@ package client import ( + "context" "crypto/tls" "fmt" "net/url" @@ -51,16 +52,16 @@ type commonClient struct { mut sync.RWMutex } -func newCommonClient(invitations chan protocol.SessionInvitation, serve func(chan struct{}) error) commonClient { +func newCommonClient(invitations chan protocol.SessionInvitation, serve func(context.Context) error, creator string) commonClient { c := commonClient{ invitations: invitations, mut: sync.NewRWMutex(), } - newServe := func(stop chan struct{}) error { + newServe := func(ctx context.Context) error { defer c.cleanup() - return serve(stop) + return serve(ctx) } - c.ServiceWithError = util.AsServiceWithError(newServe) + c.ServiceWithError = util.AsServiceWithError(newServe, creator) if c.invitations == nil { c.closeInvitationsOnFinish = true c.invitations = make(chan protocol.SessionInvitation) diff --git a/lib/relay/client/dynamic.go b/lib/relay/client/dynamic.go index f2ad0c401..25c67544e 100644 --- a/lib/relay/client/dynamic.go +++ b/lib/relay/client/dynamic.go @@ -3,6 +3,7 @@ package client import ( + "context" "crypto/tls" "encoding/json" "fmt" @@ -32,11 +33,11 @@ func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan pr certs: certs, timeout: timeout, } - c.commonClient = newCommonClient(invitations, c.serve) + c.commonClient = newCommonClient(invitations, c.serve, c.String()) return c } -func (c *dynamicClient) serve(stop chan struct{}) error { +func (c *dynamicClient) serve(ctx context.Context) error { uri := *c.pooladdr // Trim off the `dynamic+` prefix @@ -69,9 +70,9 @@ func (c *dynamicClient) serve(stop chan struct{}) error { addrs = append(addrs, ruri.String()) } - for _, addr := range relayAddressesOrder(addrs, stop) { + for _, addr := range relayAddressesOrder(ctx, addrs) { select { - case <-stop: + case <-ctx.Done(): l.Debugln(c, "stopping") return nil default: @@ -148,7 +149,7 @@ type dynamicAnnouncement struct { // the closest 50ms, and puts them in buckets of 50ms latency ranges. Then // shuffles each bucket, and returns all addresses starting with the ones from // the lowest latency bucket, ending with the highest latency buceket. -func relayAddressesOrder(input []string, stop chan struct{}) []string { +func relayAddressesOrder(ctx context.Context, input []string) []string { buckets := make(map[int][]string) for _, relay := range input { @@ -162,7 +163,7 @@ func relayAddressesOrder(input []string, stop chan struct{}) []string { buckets[id] = append(buckets[id], relay) select { - case <-stop: + case <-ctx.Done(): return nil default: } diff --git a/lib/relay/client/static.go b/lib/relay/client/static.go index c6cbdc7f7..df38c5ef8 100644 --- a/lib/relay/client/static.go +++ b/lib/relay/client/static.go @@ -3,6 +3,7 @@ package client import ( + "context" "crypto/tls" "fmt" "net" @@ -39,11 +40,11 @@ func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan pro messageTimeout: time.Minute * 2, connectTimeout: timeout, } - c.commonClient = newCommonClient(invitations, c.serve) + c.commonClient = newCommonClient(invitations, c.serve, c.String()) return c } -func (c *staticClient) serve(stop chan struct{}) error { +func (c *staticClient) serve(ctx context.Context) error { if err := c.connect(); err != nil { l.Infof("Could not connect to relay %s: %s", c.uri, err) return err @@ -72,7 +73,7 @@ func (c *staticClient) serve(stop chan struct{}) error { messages := make(chan interface{}) errors := make(chan error, 1) - go messageReader(c.conn, messages, errors, stop) + go messageReader(ctx, c.conn, messages, errors) timeout := time.NewTimer(c.messageTimeout) @@ -106,7 +107,7 @@ func (c *staticClient) serve(stop chan struct{}) error { return fmt.Errorf("protocol error: unexpected message %v", msg) } - case <-stop: + case <-ctx.Done(): l.Debugln(c, "stopping") return nil @@ -241,7 +242,7 @@ func performHandshakeAndValidation(conn *tls.Conn, uri *url.URL) error { return nil } -func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error, stop chan struct{}) { +func messageReader(ctx context.Context, conn net.Conn, messages chan<- interface{}, errors chan<- error) { for { msg, err := protocol.ReadMessage(conn) if err != nil { @@ -250,7 +251,7 @@ func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- err } select { case messages <- msg: - case <-stop: + case <-ctx.Done(): return } } diff --git a/lib/stun/stun.go b/lib/stun/stun.go index 87ffbd7a7..0b27f8093 100644 --- a/lib/stun/stun.go +++ b/lib/stun/stun.go @@ -7,6 +7,7 @@ package stun import ( + "context" "net" "sync/atomic" "time" @@ -104,7 +105,7 @@ func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Servi natType: NATUnknown, addr: nil, } - s.Service = util.AsService(s.serve) + s.Service = util.AsService(s.serve, s.String()) return s, otherDataConn } @@ -113,7 +114,7 @@ func (s *Service) Stop() { s.Service.Stop() } -func (s *Service) serve(stop chan struct{}) { +func (s *Service) serve(ctx context.Context) { for { disabled: s.setNATType(NATUnknown) @@ -121,7 +122,7 @@ func (s *Service) serve(stop chan struct{}) { if s.cfg.Options().IsStunDisabled() { select { - case <-stop: + case <-ctx.Done(): return case <-time.After(time.Second): continue @@ -134,12 +135,12 @@ func (s *Service) serve(stop chan struct{}) { // This blocks until we hit an exit condition or there are issues with the STUN server. // This returns a boolean signifying if a different STUN server should be tried (oppose to the whole thing // shutting down and this winding itself down. - if !s.runStunForServer(addr, stop) { + if !s.runStunForServer(ctx, addr) { // Check exit conditions. // Have we been asked to stop? select { - case <-stop: + case <-ctx.Done(): return default: } @@ -165,13 +166,13 @@ func (s *Service) serve(stop chan struct{}) { // Chillout for a while. select { case <-time.After(stunRetryInterval): - case <-stop: + case <-ctx.Done(): return } } } -func (s *Service) runStunForServer(addr string, stop chan struct{}) (tryNext bool) { +func (s *Service) runStunForServer(ctx context.Context, addr string) (tryNext bool) { l.Debugf("Running stun for %s via %s", s, addr) // Resolve the address, so that in case the server advertises two @@ -209,10 +210,10 @@ func (s *Service) runStunForServer(addr string, stop chan struct{}) (tryNext boo return false } - return s.stunKeepAlive(addr, extAddr, stop) + return s.stunKeepAlive(ctx, addr, extAddr) } -func (s *Service) stunKeepAlive(addr string, extAddr *Host, stop chan struct{}) (tryNext bool) { +func (s *Service) stunKeepAlive(ctx context.Context, addr string, extAddr *Host) (tryNext bool) { var err error nextSleep := time.Duration(s.cfg.Options().StunKeepaliveStartS) * time.Second @@ -255,7 +256,7 @@ func (s *Service) stunKeepAlive(addr string, extAddr *Host, stop chan struct{}) select { case <-time.After(sleepFor): - case <-stop: + case <-ctx.Done(): l.Debugf("%s stopping, aborting stun", s) return false } diff --git a/lib/syncthing/auditservice.go b/lib/syncthing/auditservice.go index 863a437e0..05c85190c 100644 --- a/lib/syncthing/auditservice.go +++ b/lib/syncthing/auditservice.go @@ -7,7 +7,9 @@ package syncthing import ( + "context" "encoding/json" + "fmt" "io" "github.com/thejerf/suture" @@ -29,19 +31,19 @@ func newAuditService(w io.Writer, evLogger events.Logger) *auditService { w: w, sub: evLogger.Subscribe(events.AllEvents), } - s.Service = util.AsService(s.serve) + s.Service = util.AsService(s.serve, s.String()) return s } // serve runs the audit service. -func (s *auditService) serve(stop chan struct{}) { +func (s *auditService) serve(ctx context.Context) { enc := json.NewEncoder(s.w) for { select { case ev := <-s.sub.C(): enc.Encode(ev) - case <-stop: + case <-ctx.Done(): return } } @@ -52,3 +54,7 @@ func (s *auditService) Stop() { s.Service.Stop() s.sub.Unsubscribe() } + +func (s *auditService) String() string { + return fmt.Sprintf("auditService@%p", s) +} diff --git a/lib/syncthing/verboseservice.go b/lib/syncthing/verboseservice.go index c0c43cd86..80c2a2368 100644 --- a/lib/syncthing/verboseservice.go +++ b/lib/syncthing/verboseservice.go @@ -7,6 +7,7 @@ package syncthing import ( + "context" "fmt" "github.com/thejerf/suture" @@ -26,12 +27,12 @@ func newVerboseService(evLogger events.Logger) *verboseService { s := &verboseService{ sub: evLogger.Subscribe(events.AllEvents), } - s.Service = util.AsService(s.serve) + s.Service = util.AsService(s.serve, s.String()) return s } // serve runs the verbose logging service. -func (s *verboseService) serve(stop chan struct{}) { +func (s *verboseService) serve(ctx context.Context) { for { select { case ev := <-s.sub.C(): @@ -39,7 +40,7 @@ func (s *verboseService) serve(stop chan struct{}) { if formatted != "" { l.Verboseln(formatted) } - case <-stop: + case <-ctx.Done(): return } } @@ -187,3 +188,7 @@ func (s *verboseService) formatEvent(ev events.Event) string { return fmt.Sprintf("%s %#v", ev.Type, ev) } + +func (s *verboseService) String() string { + return fmt.Sprintf("verboseService@%p", s) +} diff --git a/lib/ur/usage_report.go b/lib/ur/usage_report.go index 4ca53b5ca..a8d619541 100644 --- a/lib/ur/usage_report.go +++ b/lib/ur/usage_report.go @@ -57,7 +57,7 @@ func New(cfg config.Wrapper, m model.Model, connectionsService connections.Servi noUpgrade: noUpgrade, forceRun: make(chan struct{}, 1), // Buffered to prevent locking } - svc.Service = util.AsService(svc.serve) + svc.Service = util.AsService(svc.serve, svc.String()) cfg.Subscribe(svc) return svc } @@ -384,11 +384,11 @@ func (s *Service) sendUsageReport() error { return err } -func (s *Service) serve(stop chan struct{}) { +func (s *Service) serve(ctx context.Context) { t := time.NewTimer(time.Duration(s.cfg.Options().URInitialDelayS) * time.Second) for { select { - case <-stop: + case <-ctx.Done(): return case <-s.forceRun: t.Reset(0) diff --git a/lib/util/utils.go b/lib/util/utils.go index 81b999c31..010418f71 100644 --- a/lib/util/utils.go +++ b/lib/util/utils.go @@ -7,10 +7,10 @@ package util import ( + "context" "fmt" "net/url" "reflect" - "runtime" "strconv" "strings" @@ -178,11 +178,11 @@ func Address(network, host string) string { // AsService wraps the given function to implement suture.Service by calling // that function on serve and closing the passed channel when Stop is called. -func AsService(fn func(stop chan struct{})) suture.Service { - return asServiceWithError(func(stop chan struct{}) error { - fn(stop) +func AsService(fn func(ctx context.Context), creator string) suture.Service { + return asServiceWithError(func(ctx context.Context) error { + fn(ctx) return nil - }) + }, creator) } type ServiceWithError interface { @@ -194,25 +194,18 @@ type ServiceWithError interface { // AsServiceWithError does the same as AsService, except that it keeps track // of an error returned by the given function. -func AsServiceWithError(fn func(stop chan struct{}) error) ServiceWithError { - return asServiceWithError(fn) +func AsServiceWithError(fn func(ctx context.Context) error, creator string) ServiceWithError { + return asServiceWithError(fn, creator) } -// caller retrieves information about the creator of the service, i.e. the stack -// two levels up from itself. -func caller() string { - pc := make([]uintptr, 1) - _ = runtime.Callers(4, pc) - f, _ := runtime.CallersFrames(pc).Next() - return f.Function -} - -func asServiceWithError(fn func(stop chan struct{}) error) ServiceWithError { +func asServiceWithError(fn func(ctx context.Context) error, creator string) ServiceWithError { + ctx, cancel := context.WithCancel(context.Background()) s := &service{ - caller: caller(), serve: fn, - stop: make(chan struct{}), + ctx: ctx, + cancel: cancel, stopped: make(chan struct{}), + creator: creator, mut: sync.NewMutex(), } close(s.stopped) // not yet started, don't block on Stop() @@ -220,9 +213,10 @@ func asServiceWithError(fn func(stop chan struct{}) error) ServiceWithError { } type service struct { - caller string - serve func(stop chan struct{}) error - stop chan struct{} + creator string + serve func(ctx context.Context) error + ctx context.Context + cancel context.CancelFunc stopped chan struct{} err error mut sync.Mutex @@ -231,7 +225,7 @@ type service struct { func (s *service) Serve() { s.mut.Lock() select { - case <-s.stop: + case <-s.ctx.Done(): s.mut.Unlock() return default: @@ -247,16 +241,16 @@ func (s *service) Serve() { close(s.stopped) s.mut.Unlock() }() - err = s.serve(s.stop) + err = s.serve(s.ctx) } func (s *service) Stop() { s.mut.Lock() select { - case <-s.stop: + case <-s.ctx.Done(): panic(fmt.Sprintf("Stop called more than once on %v", s)) default: - close(s.stop) + s.cancel() } s.mut.Unlock() <-s.stopped @@ -275,5 +269,5 @@ func (s *service) SetError(err error) { } func (s *service) String() string { - return fmt.Sprintf("Service@%p created by %v", s, s.caller) + return fmt.Sprintf("Service@%p created by %v", s, s.creator) } diff --git a/lib/util/utils_test.go b/lib/util/utils_test.go index d623f9c2c..747bc9064 100644 --- a/lib/util/utils_test.go +++ b/lib/util/utils_test.go @@ -7,6 +7,7 @@ package util import ( + "context" "strings" "testing" ) @@ -227,17 +228,17 @@ func TestCopyMatching(t *testing.T) { } func TestUtilStopTwicePanic(t *testing.T) { - s := AsService(func(stop chan struct{}) { - <-stop - }) + name := "foo" + s := AsService(func(ctx context.Context) { + <-ctx.Done() + }, name) go s.Serve() s.Stop() defer func() { - expected := "lib/util.TestUtilStopTwicePanic" - if r := recover(); r == nil || !strings.Contains(r.(string), expected) { - t.Fatalf(`expected panic containing "%v", got "%v"`, expected, r) + if r := recover(); r == nil || !strings.Contains(r.(string), name) { + t.Fatalf(`expected panic containing "%v", got "%v"`, name, r) } }() s.Stop() diff --git a/lib/versioner/staggered.go b/lib/versioner/staggered.go index 3d708b05e..69a0e88f1 100644 --- a/lib/versioner/staggered.go +++ b/lib/versioner/staggered.go @@ -7,6 +7,8 @@ package versioner import ( + "context" + "fmt" "sort" "strconv" "time" @@ -65,13 +67,13 @@ func NewStaggered(folderID string, folderFs fs.Filesystem, params map[string]str }, mutex: sync.NewMutex(), } - s.Service = util.AsService(s.serve) + s.Service = util.AsService(s.serve, s.String()) l.Debugf("instantiated %#v", s) return s } -func (v *Staggered) serve(stop chan struct{}) { +func (v *Staggered) serve(ctx context.Context) { v.clean() if v.testCleanDone != nil { close(v.testCleanDone) @@ -83,7 +85,7 @@ func (v *Staggered) serve(stop chan struct{}) { select { case <-tck.C: v.clean() - case <-stop: + case <-ctx.Done(): return } } @@ -230,3 +232,7 @@ func (v *Staggered) GetVersions() (map[string][]FileVersion, error) { func (v *Staggered) Restore(filepath string, versionTime time.Time) error { return restoreFile(v.versionsFs, v.folderFs, filepath, versionTime, TagFilename) } + +func (v *Staggered) String() string { + return fmt.Sprintf("Staggered/@%p", v) +} diff --git a/lib/versioner/trashcan.go b/lib/versioner/trashcan.go index d1438a5b6..bd408135e 100644 --- a/lib/versioner/trashcan.go +++ b/lib/versioner/trashcan.go @@ -7,6 +7,7 @@ package versioner import ( + "context" "fmt" "strconv" "time" @@ -38,7 +39,7 @@ func NewTrashcan(folderID string, folderFs fs.Filesystem, params map[string]stri versionsFs: fsFromParams(folderFs, params), cleanoutDays: cleanoutDays, } - s.Service = util.AsService(s.serve) + s.Service = util.AsService(s.serve, s.String()) l.Debugf("instantiated %#v", s) return s @@ -52,7 +53,7 @@ func (t *Trashcan) Archive(filePath string) error { }) } -func (t *Trashcan) serve(stop chan struct{}) { +func (t *Trashcan) serve(ctx context.Context) { l.Debugln(t, "starting") defer l.Debugln(t, "stopping") @@ -62,7 +63,7 @@ func (t *Trashcan) serve(stop chan struct{}) { for { select { - case <-stop: + case <-ctx.Done(): return case <-timer.C: diff --git a/lib/watchaggregator/aggregator.go b/lib/watchaggregator/aggregator.go index ecc03ad4a..e69ed5257 100644 --- a/lib/watchaggregator/aggregator.go +++ b/lib/watchaggregator/aggregator.go @@ -109,7 +109,7 @@ type aggregator struct { ctx context.Context } -func newAggregator(folderCfg config.FolderConfiguration, ctx context.Context) *aggregator { +func newAggregator(ctx context.Context, folderCfg config.FolderConfiguration) *aggregator { a := &aggregator{ folderID: folderCfg.ID, folderCfgUpdate: make(chan config.FolderConfiguration), @@ -125,8 +125,8 @@ func newAggregator(folderCfg config.FolderConfiguration, ctx context.Context) *a return a } -func Aggregate(in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg config.Wrapper, evLogger events.Logger, ctx context.Context) { - a := newAggregator(folderCfg, ctx) +func Aggregate(ctx context.Context, in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg config.Wrapper, evLogger events.Logger) { + a := newAggregator(ctx, folderCfg) // Necessary for unit tests where the backend is mocked go a.mainLoop(in, out, cfg, evLogger) diff --git a/lib/watchaggregator/aggregator_test.go b/lib/watchaggregator/aggregator_test.go index c317642c8..4d860c90b 100644 --- a/lib/watchaggregator/aggregator_test.go +++ b/lib/watchaggregator/aggregator_test.go @@ -67,7 +67,7 @@ func TestAggregate(t *testing.T) { folderCfg.ID = "Aggregate" ctx, cancel := context.WithCancel(context.Background()) defer cancel() - a := newAggregator(folderCfg, ctx) + a := newAggregator(ctx, folderCfg) // checks whether maxFilesPerDir events in one dir are kept as is for i := 0; i < maxFilesPerDir; i++ { @@ -95,7 +95,7 @@ func TestAggregate(t *testing.T) { compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"}) // again test aggregation in "parent" but with event in subdirs - a = newAggregator(folderCfg, ctx) + a = newAggregator(ctx, folderCfg) for i := 0; i < maxFilesPerDir; i++ { a.newEvent(fs.Event{ Name: filepath.Join("parent", strconv.Itoa(i)), @@ -109,7 +109,7 @@ func TestAggregate(t *testing.T) { compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"}) // test aggregation in root - a = newAggregator(folderCfg, ctx) + a = newAggregator(ctx, folderCfg) for i := 0; i < maxFiles; i++ { a.newEvent(fs.Event{ Name: strconv.Itoa(i), @@ -132,7 +132,7 @@ func TestAggregate(t *testing.T) { }, inProgress) compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."}) - a = newAggregator(folderCfg, ctx) + a = newAggregator(ctx, folderCfg) filesPerDir := maxFilesPerDir / 2 dirs := make([]string, maxFiles/filesPerDir+1) for i := 0; i < maxFiles/filesPerDir+1; i++ { @@ -293,7 +293,7 @@ func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), e folderCfg := defaultFolderCfg.Copy() folderCfg.ID = name - a := newAggregator(folderCfg, ctx) + a := newAggregator(ctx, folderCfg) a.notifyTimeout = testNotifyTimeout startTime := time.Now()