From b1c74860e8204e601dc83d0a9fbe0b011d9728f4 Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Thu, 15 Aug 2019 16:29:37 +0200 Subject: [PATCH] all: Remove global events.Default (ref #4085) (#5886) --- cmd/stcli/main.go | 3 +- cmd/stfinddevice/main.go | 3 +- cmd/strelaysrv/main.go | 3 +- cmd/syncthing/main.go | 32 +++--- cmd/syncthing/monitor.go | 3 +- lib/api/api.go | 8 +- lib/api/api_auth.go | 10 +- lib/api/api_test.go | 10 +- lib/config/commit_test.go | 2 +- lib/config/config_test.go | 55 +++++---- lib/config/wrapper.go | 20 ++-- lib/connections/lan_test.go | 3 +- lib/connections/limiter_test.go | 3 +- lib/connections/service.go | 8 +- lib/discover/global.go | 8 +- lib/discover/global_test.go | 11 +- lib/discover/local.go | 6 +- lib/discover/local_test.go | 5 +- lib/events/debug.go | 4 +- lib/events/events.go | 149 ++++++++++++++++--------- lib/events/events_test.go | 28 ++--- lib/model/folder.go | 12 +- lib/model/folder_recvonly.go | 5 +- lib/model/folder_recvonly_test.go | 1 + lib/model/folder_sendonly.go | 5 +- lib/model/folder_sendrecv.go | 38 +++---- lib/model/folder_sendrecv_test.go | 69 ++++-------- lib/model/folder_summary.go | 12 +- lib/model/folderstate.go | 8 +- lib/model/model.go | 34 +++--- lib/model/model_test.go | 12 +- lib/model/progressemitter.go | 6 +- lib/model/progressemitter_test.go | 18 ++- lib/model/requests_test.go | 12 +- lib/model/testutils_test.go | 7 +- lib/rc/rc.go | 7 +- lib/scanner/walk.go | 4 +- lib/scanner/walk_test.go | 90 +++++++-------- lib/syncthing/auditservice.go | 8 +- lib/syncthing/auditservice_test.go | 17 ++- lib/syncthing/syncthing.go | 42 +++---- lib/syncthing/syncthing_test.go | 5 +- lib/syncthing/utils.go | 13 ++- lib/syncthing/verboseservice.go | 8 +- lib/watchaggregator/aggregator.go | 10 +- lib/watchaggregator/aggregator_test.go | 24 ++-- 46 files changed, 467 insertions(+), 374 deletions(-) diff --git a/cmd/stcli/main.go b/cmd/stcli/main.go index 920778479..50daf79c9 100644 --- a/cmd/stcli/main.go +++ b/cmd/stcli/main.go @@ -22,6 +22,7 @@ import ( "github.com/pkg/errors" "github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/locations" "github.com/syncthing/syncthing/lib/protocol" "github.com/urfave/cli" @@ -85,7 +86,7 @@ func main() { myID := protocol.NewDeviceID(cert.Certificate[0]) // Load the config - cfg, err := config.Load(locations.Get(locations.ConfigFile), myID) + cfg, err := config.Load(locations.Get(locations.ConfigFile), myID, events.NoopLogger) if err != nil { log.Fatalln(errors.Wrap(err, "loading config")) } diff --git a/cmd/stfinddevice/main.go b/cmd/stfinddevice/main.go index adfa7dbc3..7790ba0e6 100644 --- a/cmd/stfinddevice/main.go +++ b/cmd/stfinddevice/main.go @@ -17,6 +17,7 @@ import ( "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/discover" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" ) @@ -82,7 +83,7 @@ func checkServers(deviceID protocol.DeviceID, servers ...string) { } func checkServer(deviceID protocol.DeviceID, server string) checkResult { - disco, err := discover.NewGlobal(server, tls.Certificate{}, nil) + disco, err := discover.NewGlobal(server, tls.Certificate{}, nil, events.NoopLogger) if err != nil { return checkResult{error: err} } diff --git a/cmd/strelaysrv/main.go b/cmd/strelaysrv/main.go index 87d4f776b..c5b564698 100644 --- a/cmd/strelaysrv/main.go +++ b/cmd/strelaysrv/main.go @@ -20,6 +20,7 @@ import ( "syscall" "time" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/relay/protocol" "github.com/syncthing/syncthing/lib/tlsutil" @@ -194,7 +195,7 @@ func main() { log.Println("ID:", id) } - wrapper := config.Wrap("config", config.New(id)) + wrapper := config.Wrap("config", config.New(id), events.NoopLogger) wrapper.SetOptions(config.OptionsConfiguration{ NATLeaseM: natLease, NATRenewalM: natRenewal, diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 70c7711fa..2c54e7798 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -394,7 +394,7 @@ func main() { } func openGUI(myID protocol.DeviceID) error { - cfg, err := loadOrDefaultConfig(myID) + cfg, err := loadOrDefaultConfig(myID, events.NoopLogger) if err != nil { return err } @@ -437,7 +437,7 @@ func generate(generateDir string) error { l.Warnln("Config exists; will not overwrite.") return nil } - cfg, err := syncthing.DefaultConfig(cfgFile, myID, noDefaultFolder) + cfg, err := syncthing.DefaultConfig(cfgFile, myID, events.NoopLogger, noDefaultFolder) if err != nil { return err } @@ -471,7 +471,7 @@ func debugFacilities() string { } func checkUpgrade() upgrade.Release { - cfg, _ := loadOrDefaultConfig(protocol.EmptyDeviceID) + cfg, _ := loadOrDefaultConfig(protocol.EmptyDeviceID, events.NoopLogger) opts := cfg.Options() release, err := upgrade.LatestRelease(opts.ReleasesURL, build.Version, opts.UpgradeToPreReleases) if err != nil { @@ -512,7 +512,7 @@ func performUpgrade(release upgrade.Release) { } func upgradeViaRest() error { - cfg, _ := loadOrDefaultConfig(protocol.EmptyDeviceID) + cfg, _ := loadOrDefaultConfig(protocol.EmptyDeviceID, events.NoopLogger) u, err := url.Parse(cfg.GUI().URL()) if err != nil { return err @@ -566,7 +566,11 @@ func syncthingMain(runtimeOptions RuntimeOptions) { os.Exit(1) } - cfg, err := syncthing.LoadConfigAtStartup(locations.Get(locations.ConfigFile), cert, runtimeOptions.allowNewerConfig, noDefaultFolder) + evLogger := events.NewLogger() + go evLogger.Serve() + defer evLogger.Stop() + + cfg, err := syncthing.LoadConfigAtStartup(locations.Get(locations.ConfigFile), cert, evLogger, runtimeOptions.allowNewerConfig, noDefaultFolder) if err != nil { l.Warnln("Failed to initialize config:", err) os.Exit(exitError) @@ -594,7 +598,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) { appOpts.DeadlockTimeoutS = secs } - app := syncthing.New(cfg, ldb, cert, appOpts) + app := syncthing.New(cfg, ldb, evLogger, cert, appOpts) setupSignalHandling(app) @@ -639,7 +643,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) { if runtimeOptions.NoUpgrade { l.Infof("No automatic upgrades; STNOUPGRADE environment variable defined.") } else { - go autoUpgrade(cfg, app) + go autoUpgrade(cfg, app, evLogger) } } @@ -684,12 +688,12 @@ func setupSignalHandling(app *syncthing.App) { }() } -func loadOrDefaultConfig(myID protocol.DeviceID) (config.Wrapper, error) { +func loadOrDefaultConfig(myID protocol.DeviceID, evLogger events.Logger) (config.Wrapper, error) { cfgFile := locations.Get(locations.ConfigFile) - cfg, err := config.Load(cfgFile, myID) + cfg, err := config.Load(cfgFile, myID, evLogger) if err != nil { - cfg, err = syncthing.DefaultConfig(cfgFile, myID, noDefaultFolder) + cfg, err = syncthing.DefaultConfig(cfgFile, myID, evLogger, noDefaultFolder) } return cfg, err @@ -774,9 +778,9 @@ func standbyMonitor(app *syncthing.App) { } } -func autoUpgrade(cfg config.Wrapper, app *syncthing.App) { +func autoUpgrade(cfg config.Wrapper, app *syncthing.App, evLogger events.Logger) { timer := time.NewTimer(0) - sub := events.Default.Subscribe(events.DeviceConnected) + sub := evLogger.Subscribe(events.DeviceConnected) for { select { case event := <-sub.C(): @@ -798,7 +802,7 @@ func autoUpgrade(cfg config.Wrapper, app *syncthing.App) { rel, err := upgrade.LatestRelease(opts.ReleasesURL, build.Version, opts.UpgradeToPreReleases) if err == upgrade.ErrUpgradeUnsupported { - events.Default.Unsubscribe(sub) + sub.Unsubscribe() return } if err != nil { @@ -822,7 +826,7 @@ func autoUpgrade(cfg config.Wrapper, app *syncthing.App) { timer.Reset(checkInterval) continue } - events.Default.Unsubscribe(sub) + sub.Unsubscribe() l.Warnf("Automatically upgraded to version %q. Restarting in 1 minute.", rel.Tag) time.Sleep(time.Minute) app.Stop(syncthing.ExitUpgrade) diff --git a/cmd/syncthing/monitor.go b/cmd/syncthing/monitor.go index 7ff3b7a4d..e44e876e1 100644 --- a/cmd/syncthing/monitor.go +++ b/cmd/syncthing/monitor.go @@ -18,6 +18,7 @@ import ( "syscall" "time" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/locations" "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/protocol" @@ -450,7 +451,7 @@ func childEnv() []string { // panicUploadMaxWait uploading panics... func maybeReportPanics() { // Try to get a config to see if/where panics should be reported. - cfg, err := loadOrDefaultConfig(protocol.EmptyDeviceID) + cfg, err := loadOrDefaultConfig(protocol.EmptyDeviceID, events.NoopLogger) if err != nil { l.Warnln("Couldn't load config; not reporting crash") return diff --git a/lib/api/api.go b/lib/api/api.go index 6a32f1813..af7675a38 100644 --- a/lib/api/api.go +++ b/lib/api/api.go @@ -71,6 +71,7 @@ type service struct { model model.Model eventSubs map[events.EventType]events.BufferedSubscription eventSubsMut sync.Mutex + evLogger events.Logger discoverer discover.CachingMux connectionsService connections.Service fss model.FolderSummaryService @@ -105,7 +106,7 @@ type Service interface { WaitForStart() error } -func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, cpu Rater, contr Controller, noUpgrade bool) Service { +func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, evLogger events.Logger, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, cpu Rater, contr Controller, noUpgrade bool) Service { s := &service{ id: id, cfg: cfg, @@ -116,6 +117,7 @@ func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonNam DiskEventMask: diskSub, }, eventSubsMut: sync.NewMutex(), + evLogger: evLogger, discoverer: discoverer, connectionsService: connectionsService, fss: fss, @@ -315,7 +317,7 @@ func (s *service) serve(stop chan struct{}) { // Wrap everything in basic auth, if user/password is set. if guiCfg.IsAuthEnabled() { - handler = basicAuthAndSessionMiddleware("sessionid-"+s.id.String()[:5], guiCfg, s.cfg.LDAP(), handler) + handler = basicAuthAndSessionMiddleware("sessionid-"+s.id.String()[:5], guiCfg, s.cfg.LDAP(), handler, s.evLogger) } // Redirect to HTTPS if we are supposed to @@ -1215,7 +1217,7 @@ func (s *service) getEventSub(mask events.EventType) events.BufferedSubscription s.eventSubsMut.Lock() bufsub, ok := s.eventSubs[mask] if !ok { - evsub := events.Default.Subscribe(mask) + evsub := s.evLogger.Subscribe(mask) bufsub = events.NewBufferedSubscription(evsub, EventSubBufferSize) s.eventSubs[mask] = bufsub } diff --git a/lib/api/api_auth.go b/lib/api/api_auth.go index b9a313aba..9d4607918 100644 --- a/lib/api/api_auth.go +++ b/lib/api/api_auth.go @@ -28,14 +28,14 @@ var ( sessionsMut = sync.NewMutex() ) -func emitLoginAttempt(success bool, username string) { - events.Default.Log(events.LoginAttempt, map[string]interface{}{ +func emitLoginAttempt(success bool, username string, evLogger events.Logger) { + evLogger.Log(events.LoginAttempt, map[string]interface{}{ "success": success, "username": username, }) } -func basicAuthAndSessionMiddleware(cookieName string, guiCfg config.GUIConfiguration, ldapCfg config.LDAPConfiguration, next http.Handler) http.Handler { +func basicAuthAndSessionMiddleware(cookieName string, guiCfg config.GUIConfiguration, ldapCfg config.LDAPConfiguration, next http.Handler, evLogger events.Logger) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if guiCfg.IsValidAPIKey(r.Header.Get("X-API-Key")) { next.ServeHTTP(w, r) @@ -94,7 +94,7 @@ func basicAuthAndSessionMiddleware(cookieName string, guiCfg config.GUIConfigura } if !authOk { - emitLoginAttempt(false, username) + emitLoginAttempt(false, username, evLogger) error() return } @@ -109,7 +109,7 @@ func basicAuthAndSessionMiddleware(cookieName string, guiCfg config.GUIConfigura MaxAge: 0, }) - emitLoginAttempt(true, username) + emitLoginAttempt(true, username, evLogger) next.ServeHTTP(w, r) }) } diff --git a/lib/api/api_test.go b/lib/api/api_test.go index 6ac1ada2c..4a979addd 100644 --- a/lib/api/api_test.go +++ b/lib/api/api_test.go @@ -100,9 +100,9 @@ func TestStopAfterBrokenConfig(t *testing.T) { RawUseTLS: false, }, } - w := config.Wrap("/dev/null", cfg) + w := config.Wrap("/dev/null", cfg, events.NoopLogger) - srv := New(protocol.LocalDeviceID, w, "", "syncthing", nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, false).(*service) + srv := New(protocol.LocalDeviceID, w, "", "syncthing", nil, nil, nil, events.NoopLogger, nil, nil, nil, nil, nil, nil, nil, nil, false).(*service) defer os.Remove(token) srv.started = make(chan string) @@ -512,8 +512,8 @@ func startHTTP(cfg *mockedConfig) (string, error) { // Instantiate the API service urService := ur.New(cfg, m, connections, false) - summaryService := model.NewFolderSummaryService(cfg, m, protocol.LocalDeviceID) - svc := New(protocol.LocalDeviceID, cfg, assetDir, "syncthing", m, eventSub, diskEventSub, discoverer, connections, urService, summaryService, errorLog, systemLog, cpu, nil, false).(*service) + summaryService := model.NewFolderSummaryService(cfg, m, protocol.LocalDeviceID, events.NoopLogger) + svc := New(protocol.LocalDeviceID, cfg, assetDir, "syncthing", m, eventSub, diskEventSub, events.NoopLogger, discoverer, connections, urService, summaryService, errorLog, systemLog, cpu, nil, false).(*service) defer os.Remove(token) svc.started = addrChan @@ -979,7 +979,7 @@ func TestEventMasks(t *testing.T) { cfg := new(mockedConfig) defSub := new(mockedEventSub) diskSub := new(mockedEventSub) - svc := New(protocol.LocalDeviceID, cfg, "", "syncthing", nil, defSub, diskSub, nil, nil, nil, nil, nil, nil, nil, nil, false).(*service) + svc := New(protocol.LocalDeviceID, cfg, "", "syncthing", nil, defSub, diskSub, events.NoopLogger, nil, nil, nil, nil, nil, nil, nil, nil, false).(*service) defer os.Remove(token) if mask := svc.getEventMask(""); mask != DefaultEventMask { diff --git a/lib/config/commit_test.go b/lib/config/commit_test.go index e14ed2c66..7c2cf9296 100644 --- a/lib/config/commit_test.go +++ b/lib/config/commit_test.go @@ -44,7 +44,7 @@ func (validationError) String() string { func TestReplaceCommit(t *testing.T) { t.Skip("broken, fails randomly, #3834") - w := Wrap("/dev/null", Configuration{Version: 0}) + w := wrap("/dev/null", Configuration{Version: 0}) if w.RawCopy().Version != 0 { t.Fatal("Config incorrect") } diff --git a/lib/config/config_test.go b/lib/config/config_test.go index 0fe04dd77..82cf57b9a 100644 --- a/lib/config/config_test.go +++ b/lib/config/config_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/d4l3k/messagediff" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/protocol" @@ -86,7 +87,7 @@ func TestDefaultValues(t *testing.T) { func TestDeviceConfig(t *testing.T) { for i := OldestHandledVersion; i <= CurrentVersion; i++ { os.RemoveAll(filepath.Join("testdata", DefaultMarkerName)) - wr, err := Load(fmt.Sprintf("testdata/v%d.xml", i), device1) + wr, err := load(fmt.Sprintf("testdata/v%d.xml", i), device1) if err != nil { t.Fatal(err) } @@ -168,7 +169,7 @@ func TestDeviceConfig(t *testing.T) { } func TestNoListenAddresses(t *testing.T) { - cfg, err := Load("testdata/nolistenaddress.xml", device1) + cfg, err := load("testdata/nolistenaddress.xml", device1) if err != nil { t.Error(err) } @@ -225,7 +226,7 @@ func TestOverriddenValues(t *testing.T) { } os.Unsetenv("STNOUPGRADE") - cfg, err := Load("testdata/overridenvalues.xml", device1) + cfg, err := load("testdata/overridenvalues.xml", device1) if err != nil { t.Error(err) } @@ -270,7 +271,7 @@ func TestDeviceAddressesDynamic(t *testing.T) { }, } - cfg, err := Load("testdata/deviceaddressesdynamic.xml", device4) + cfg, err := load("testdata/deviceaddressesdynamic.xml", device4) if err != nil { t.Error(err) } @@ -319,7 +320,7 @@ func TestDeviceCompression(t *testing.T) { }, } - cfg, err := Load("testdata/devicecompression.xml", device4) + cfg, err := load("testdata/devicecompression.xml", device4) if err != nil { t.Error(err) } @@ -365,7 +366,7 @@ func TestDeviceAddressesStatic(t *testing.T) { }, } - cfg, err := Load("testdata/deviceaddressesstatic.xml", device4) + cfg, err := load("testdata/deviceaddressesstatic.xml", device4) if err != nil { t.Error(err) } @@ -377,7 +378,7 @@ func TestDeviceAddressesStatic(t *testing.T) { } func TestVersioningConfig(t *testing.T) { - cfg, err := Load("testdata/versioningconfig.xml", device4) + cfg, err := load("testdata/versioningconfig.xml", device4) if err != nil { t.Error(err) } @@ -404,7 +405,7 @@ func TestIssue1262(t *testing.T) { t.Skipf("path gets converted to absolute as part of the filesystem initialization on linux") } - cfg, err := Load("testdata/issue-1262.xml", device4) + cfg, err := load("testdata/issue-1262.xml", device4) if err != nil { t.Fatal(err) } @@ -418,7 +419,7 @@ func TestIssue1262(t *testing.T) { } func TestIssue1750(t *testing.T) { - cfg, err := Load("testdata/issue-1750.xml", device4) + cfg, err := load("testdata/issue-1750.xml", device4) if err != nil { t.Fatal(err) } @@ -520,7 +521,7 @@ func TestNewSaveLoad(t *testing.T) { } intCfg := New(device1) - cfg := Wrap(path, intCfg) + cfg := wrap(path, intCfg) // To make the equality pass later cfg.(*wrapper).cfg.XMLName.Local = "configuration" @@ -537,7 +538,7 @@ func TestNewSaveLoad(t *testing.T) { t.Error(path, "does not exist") } - cfg2, err := Load(path, device1) + cfg2, err := load(path, device1) if err != nil { t.Error(err) } @@ -564,7 +565,7 @@ func TestPrepare(t *testing.T) { } func TestCopy(t *testing.T) { - wrapper, err := Load("testdata/example.xml", device1) + wrapper, err := load("testdata/example.xml", device1) if err != nil { t.Fatal(err) } @@ -603,7 +604,7 @@ func TestCopy(t *testing.T) { } func TestPullOrder(t *testing.T) { - wrapper, err := Load("testdata/pullorder.xml", device1) + wrapper, err := load("testdata/pullorder.xml", device1) if err != nil { t.Fatal(err) } @@ -643,7 +644,7 @@ func TestPullOrder(t *testing.T) { if err != nil { t.Fatal(err) } - wrapper = Wrap("testdata/pullorder.xml", cfg) + wrapper = wrap("testdata/pullorder.xml", cfg) folders = wrapper.Folders() for _, tc := range expected { @@ -654,7 +655,7 @@ func TestPullOrder(t *testing.T) { } func TestLargeRescanInterval(t *testing.T) { - wrapper, err := Load("testdata/largeinterval.xml", device1) + wrapper, err := load("testdata/largeinterval.xml", device1) if err != nil { t.Fatal(err) } @@ -692,7 +693,7 @@ func TestGUIConfigURL(t *testing.T) { func TestDuplicateDevices(t *testing.T) { // Duplicate devices should be removed - wrapper, err := Load("testdata/dupdevices.xml", device1) + wrapper, err := load("testdata/dupdevices.xml", device1) if err != nil { t.Fatal(err) } @@ -710,7 +711,7 @@ func TestDuplicateDevices(t *testing.T) { func TestDuplicateFolders(t *testing.T) { // Duplicate folders are a loading error - _, err := Load("testdata/dupfolders.xml", device1) + _, err := load("testdata/dupfolders.xml", device1) if err == nil || !strings.Contains(err.Error(), errFolderIDDuplicate.Error()) { t.Fatal(`Expected error to mention "duplicate folder ID":`, err) } @@ -721,7 +722,7 @@ func TestEmptyFolderPaths(t *testing.T) { // get messed up by the prepare steps (e.g., become the current dir or // get a slash added so that it becomes the root directory or similar). - _, err := Load("testdata/nopath.xml", device1) + _, err := load("testdata/nopath.xml", device1) if err == nil || !strings.Contains(err.Error(), errFolderPathEmpty.Error()) { t.Fatal("Expected error due to empty folder path, got", err) } @@ -790,7 +791,7 @@ func TestIgnoredDevices(t *testing.T) { // Verify that ignored devices that are also present in the // configuration are not in fact ignored. - wrapper, err := Load("testdata/ignoreddevices.xml", device1) + wrapper, err := load("testdata/ignoreddevices.xml", device1) if err != nil { t.Fatal(err) } @@ -808,7 +809,7 @@ func TestIgnoredFolders(t *testing.T) { // configuration are not in fact ignored. // Also, verify that folders that are shared with a device are not ignored. - wrapper, err := Load("testdata/ignoredfolders.xml", device1) + wrapper, err := load("testdata/ignoredfolders.xml", device1) if err != nil { t.Fatal(err) } @@ -844,7 +845,7 @@ func TestIgnoredFolders(t *testing.T) { func TestGetDevice(t *testing.T) { // Verify that the Device() call does the right thing - wrapper, err := Load("testdata/ignoreddevices.xml", device1) + wrapper, err := load("testdata/ignoreddevices.xml", device1) if err != nil { t.Fatal(err) } @@ -871,7 +872,7 @@ func TestGetDevice(t *testing.T) { } func TestSharesRemovedOnDeviceRemoval(t *testing.T) { - wrapper, err := Load("testdata/example.xml", device1) + wrapper, err := load("testdata/example.xml", device1) if err != nil { t.Errorf("Failed: %s", err) } @@ -956,7 +957,7 @@ func TestIssue4219(t *testing.T) { t.Errorf("There should be three ignored folders, not %d", ignoredFolders) } - w := Wrap("/tmp/cfg", cfg) + w := wrap("/tmp/cfg", cfg) if !w.IgnoredFolder(device2, "t1") { t.Error("Folder device2 t1 should be ignored") } @@ -1145,3 +1146,11 @@ func defaultConfigAsMap() map[string]interface{} { } return tmp } + +func load(path string, myID protocol.DeviceID) (Wrapper, error) { + return Load(path, myID, events.NoopLogger) +} + +func wrap(path string, cfg Configuration) Wrapper { + return Wrap(path, cfg, events.NoopLogger) +} diff --git a/lib/config/wrapper.go b/lib/config/wrapper.go index 774016f89..1082b74b5 100644 --- a/lib/config/wrapper.go +++ b/lib/config/wrapper.go @@ -96,8 +96,9 @@ type Wrapper interface { } type wrapper struct { - cfg Configuration - path string + cfg Configuration + path string + evLogger events.Logger deviceMap map[protocol.DeviceID]DeviceConfiguration folderMap map[string]FolderConfiguration @@ -133,18 +134,19 @@ func (w *wrapper) StunServers() []string { // Wrap wraps an existing Configuration structure and ties it to a file on // disk. -func Wrap(path string, cfg Configuration) Wrapper { +func Wrap(path string, cfg Configuration, evLogger events.Logger) Wrapper { w := &wrapper{ - cfg: cfg, - path: path, - mut: sync.NewMutex(), + cfg: cfg, + path: path, + evLogger: evLogger, + mut: sync.NewMutex(), } return w } // Load loads an existing file on disk and returns a new configuration // wrapper. -func Load(path string, myID protocol.DeviceID) (Wrapper, error) { +func Load(path string, myID protocol.DeviceID, evLogger events.Logger) (Wrapper, error) { fd, err := os.Open(path) if err != nil { return nil, err @@ -156,7 +158,7 @@ func Load(path string, myID protocol.DeviceID) (Wrapper, error) { return nil, err } - return Wrap(path, cfg), nil + return Wrap(path, cfg, evLogger), nil } func (w *wrapper) ConfigPath() string { @@ -450,7 +452,7 @@ func (w *wrapper) Save() error { return err } - events.Default.Log(events.ConfigSaved, w.cfg) + w.evLogger.Log(events.ConfigSaved, w.cfg) return nil } diff --git a/lib/connections/lan_test.go b/lib/connections/lan_test.go index a9f388253..99f7d1310 100644 --- a/lib/connections/lan_test.go +++ b/lib/connections/lan_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/events" ) func TestIsLANHost(t *testing.T) { @@ -35,7 +36,7 @@ func TestIsLANHost(t *testing.T) { Options: config.OptionsConfiguration{ AlwaysLocalNets: []string{"10.20.30.0/24"}, }, - }) + }, events.NoopLogger) s := &service{cfg: cfg} for _, tc := range cases { diff --git a/lib/connections/limiter_test.go b/lib/connections/limiter_test.go index 22319f1f3..e7b3a0436 100644 --- a/lib/connections/limiter_test.go +++ b/lib/connections/limiter_test.go @@ -8,6 +8,7 @@ package connections import ( "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" "golang.org/x/time/rate" "math/rand" @@ -25,7 +26,7 @@ func init() { } func initConfig() config.Wrapper { - cfg := config.Wrap("/dev/null", config.New(device1)) + cfg := config.Wrap("/dev/null", config.New(device1), events.NoopLogger) dev1Conf = config.NewDeviceConfiguration(device1, "device1") dev2Conf = config.NewDeviceConfiguration(device2, "device2") dev3Conf = config.NewDeviceConfiguration(device3, "device3") diff --git a/lib/connections/service.go b/lib/connections/service.go index aa54b4893..7eff4c308 100644 --- a/lib/connections/service.go +++ b/lib/connections/service.go @@ -120,6 +120,7 @@ type service struct { limiter *limiter natService *nat.Service natServiceToken *suture.ServiceToken + evLogger events.Logger listenersMut sync.RWMutex listeners map[string]genericListener @@ -130,7 +131,7 @@ type service struct { connectionStatus map[string]ConnectionStatusEntry // address -> latest error/status } -func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *tls.Config, discoverer discover.Finder, bepProtocolName string, tlsDefaultCommonName string) Service { +func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *tls.Config, discoverer discover.Finder, bepProtocolName string, tlsDefaultCommonName string, evLogger events.Logger) Service { service := &service{ Supervisor: suture.New("connections.Service", suture.Spec{ Log: func(line string) { @@ -148,6 +149,7 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t tlsDefaultCommonName: tlsDefaultCommonName, limiter: newLimiter(cfg), natService: nat.NewService(myID, cfg), + evLogger: evLogger, listenersMut: sync.NewRWMutex(), listeners: make(map[string]genericListener), @@ -552,7 +554,7 @@ func (s *service) createListener(factory listenerFactory, uri *url.URL) bool { } func (s *service) logListenAddressesChangedEvent(l genericListener) { - events.Default.Log(events.ListenAddressesChanged, map[string]interface{}{ + s.evLogger.Log(events.ListenAddressesChanged, map[string]interface{}{ "address": l.URI(), "lan": l.LANAddresses(), "wan": l.WANAddresses(), @@ -579,7 +581,7 @@ func (s *service) CommitConfiguration(from, to config.Configuration) bool { s.listenersMut.Lock() seen := make(map[string]struct{}) - for _, addr := range config.Wrap("", to).ListenAddresses() { + for _, addr := range config.Wrap("", to, s.evLogger).ListenAddresses() { if addr == "" { // We can get an empty address if there is an empty listener // element in the config, indicating no listeners should be diff --git a/lib/discover/global.go b/lib/discover/global.go index 86738c393..2ce44864a 100644 --- a/lib/discover/global.go +++ b/lib/discover/global.go @@ -35,6 +35,7 @@ type globalClient struct { queryClient httpClient noAnnounce bool noLookup bool + evLogger events.Logger errorHolder } @@ -70,7 +71,7 @@ func (e lookupError) CacheFor() time.Duration { return e.cacheFor } -func NewGlobal(server string, cert tls.Certificate, addrList AddressLister) (FinderService, error) { +func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLogger events.Logger) (FinderService, error) { server, opts, err := parseOptions(server) if err != nil { return nil, err @@ -125,6 +126,7 @@ func NewGlobal(server string, cert tls.Certificate, addrList AddressLister) (Fin queryClient: queryClient, noAnnounce: opts.noAnnounce, noLookup: opts.noLookup, + evLogger: evLogger, } cl.Service = util.AsService(cl.serve) if !opts.noAnnounce { @@ -197,8 +199,8 @@ func (c *globalClient) serve(stop chan struct{}) { timer := time.NewTimer(0) defer timer.Stop() - eventSub := events.Default.Subscribe(events.ListenAddressesChanged) - defer events.Default.Unsubscribe(eventSub) + eventSub := c.evLogger.Subscribe(events.ListenAddressesChanged) + defer eventSub.Unsubscribe() for { select { diff --git a/lib/discover/global_test.go b/lib/discover/global_test.go index 0bc1e63b9..45621749d 100644 --- a/lib/discover/global_test.go +++ b/lib/discover/global_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/tlsutil" ) @@ -54,15 +55,15 @@ func TestGlobalOverHTTP(t *testing.T) { // is only allowed in combination with the "insecure" and "noannounce" // parameters. - if _, err := NewGlobal("http://192.0.2.42/", tls.Certificate{}, nil); err == nil { + if _, err := NewGlobal("http://192.0.2.42/", tls.Certificate{}, nil, events.NoopLogger); err == nil { t.Fatal("http is not allowed without insecure and noannounce") } - if _, err := NewGlobal("http://192.0.2.42/?insecure", tls.Certificate{}, nil); err == nil { + if _, err := NewGlobal("http://192.0.2.42/?insecure", tls.Certificate{}, nil, events.NoopLogger); err == nil { t.Fatal("http is not allowed without noannounce") } - if _, err := NewGlobal("http://192.0.2.42/?noannounce", tls.Certificate{}, nil); err == nil { + if _, err := NewGlobal("http://192.0.2.42/?noannounce", tls.Certificate{}, nil, events.NoopLogger); err == nil { t.Fatal("http is not allowed without insecure") } @@ -193,7 +194,7 @@ func TestGlobalAnnounce(t *testing.T) { go func() { _ = http.Serve(list, mux) }() url := "https://" + list.Addr().String() + "?insecure" - disco, err := NewGlobal(url, cert, new(fakeAddressLister)) + disco, err := NewGlobal(url, cert, new(fakeAddressLister), events.NoopLogger) if err != nil { t.Fatal(err) } @@ -217,7 +218,7 @@ func TestGlobalAnnounce(t *testing.T) { } func testLookup(url string) ([]string, error) { - disco, err := NewGlobal(url, tls.Certificate{}, nil) + disco, err := NewGlobal(url, tls.Certificate{}, nil, events.NoopLogger) if err != nil { return nil, err } diff --git a/lib/discover/local.go b/lib/discover/local.go index 7108ef18b..fa560b1df 100644 --- a/lib/discover/local.go +++ b/lib/discover/local.go @@ -30,6 +30,7 @@ type localClient struct { myID protocol.DeviceID addrList AddressLister name string + evLogger events.Logger beacon beacon.Interface localBcastStart time.Time @@ -46,13 +47,14 @@ const ( v13Magic = uint32(0x7D79BC40) // previous version ) -func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister) (FinderService, error) { +func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, evLogger events.Logger) (FinderService, error) { c := &localClient{ Supervisor: suture.New("local", suture.Spec{ PassThroughPanics: true, }), myID: id, addrList: addrList, + evLogger: evLogger, localBcastTick: time.NewTicker(BroadcastInterval).C, forcedBcastTick: make(chan time.Time), localBcastStart: time.Now(), @@ -272,7 +274,7 @@ func (c *localClient) registerDevice(src net.Addr, device Announce) bool { }) if isNewDevice { - events.Default.Log(events.DeviceDiscovered, map[string]interface{}{ + c.evLogger.Log(events.DeviceDiscovered, map[string]interface{}{ "device": device.ID.String(), "addrs": validAddresses, }) diff --git a/lib/discover/local_test.go b/lib/discover/local_test.go index 422e6ec9e..b6bf3c32f 100644 --- a/lib/discover/local_test.go +++ b/lib/discover/local_test.go @@ -11,11 +11,12 @@ import ( "net" "testing" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" ) func TestLocalInstanceID(t *testing.T) { - c, err := NewLocal(protocol.LocalDeviceID, ":0", &fakeAddressLister{}) + c, err := NewLocal(protocol.LocalDeviceID, ":0", &fakeAddressLister{}, events.NoopLogger) if err != nil { t.Fatal(err) } @@ -38,7 +39,7 @@ func TestLocalInstanceID(t *testing.T) { } func TestLocalInstanceIDShouldTriggerNew(t *testing.T) { - c, err := NewLocal(protocol.LocalDeviceID, ":0", &fakeAddressLister{}) + c, err := NewLocal(protocol.LocalDeviceID, ":0", &fakeAddressLister{}, events.NoopLogger) if err != nil { t.Fatal(err) } diff --git a/lib/events/debug.go b/lib/events/debug.go index bd3e9ca4b..307b98e3d 100644 --- a/lib/events/debug.go +++ b/lib/events/debug.go @@ -10,11 +10,11 @@ import ( "os" "strings" - "github.com/syncthing/syncthing/lib/logger" + liblogger "github.com/syncthing/syncthing/lib/logger" ) var ( - dl = logger.DefaultLogger.NewFacility("events", "Event generation and logging") + dl = liblogger.DefaultLogger.NewFacility("events", "Event generation and logging") ) func init() { diff --git a/lib/events/events.go b/lib/events/events.go index b954b6b50..bd6f9a0bb 100644 --- a/lib/events/events.go +++ b/lib/events/events.go @@ -13,7 +13,10 @@ import ( "runtime" "time" + "github.com/thejerf/suture" + "github.com/syncthing/syncthing/lib/sync" + "github.com/syncthing/syncthing/lib/util" ) type EventType int @@ -51,7 +54,10 @@ const ( AllEvents = (1 << iota) - 1 ) -var runningTests = false +var ( + runningTests = false + errNoop = errors.New("method of a noop object called") +) const eventLogTimeout = 15 * time.Millisecond @@ -199,13 +205,21 @@ func UnmarshalEventType(s string) EventType { const BufferSize = 64 -type Logger struct { - subs []*Subscription +type Logger interface { + suture.Service + Log(t EventType, data interface{}) + Subscribe(mask EventType) Subscription +} + +type logger struct { + suture.Service + subs []*subscription nextSubscriptionIDs []int nextGlobalID int timeout *time.Timer events chan Event funcs chan func() + toUnsubscribe chan *subscription stop chan struct{} } @@ -219,19 +233,17 @@ type Event struct { Data interface{} `json:"data"` } -type Subscription struct { - mask EventType - events chan Event - timeout *time.Timer +type Subscription interface { + C() <-chan Event + Poll(timeout time.Duration) (Event, error) + Unsubscribe() } -var Default = NewLogger() - -func init() { - // The default logger never stops. To ensure this we nil out the stop - // channel so any attempt to stop it will panic. - Default.stop = nil - go Default.Serve() +type subscription struct { + mask EventType + events chan Event + toUnsubscribe chan *subscription + timeout *time.Timer } var ( @@ -239,13 +251,14 @@ var ( ErrClosed = errors.New("closed") ) -func NewLogger() *Logger { - l := &Logger{ - timeout: time.NewTimer(time.Second), - events: make(chan Event, BufferSize), - funcs: make(chan func()), - stop: make(chan struct{}), +func NewLogger() Logger { + l := &logger{ + timeout: time.NewTimer(time.Second), + events: make(chan Event, BufferSize), + funcs: make(chan func()), + toUnsubscribe: make(chan *subscription), } + l.Service = util.AsService(l.serve) // Make sure the timer is in the stopped state and hasn't fired anything // into the channel. if !l.timeout.Stop() { @@ -254,7 +267,7 @@ func NewLogger() *Logger { return l } -func (l *Logger) Serve() { +func (l *logger) serve(stop chan struct{}) { loop: for { select { @@ -263,10 +276,13 @@ loop: l.sendEvent(e) case fn := <-l.funcs: - // Subscriptions etc are handled here. + // Subscriptions are handled here. fn() - case <-l.stop: + case s := <-l.toUnsubscribe: + l.unsubscribe(s) + + case <-stop: break loop } } @@ -279,11 +295,7 @@ loop: } } -func (l *Logger) Stop() { - close(l.stop) -} - -func (l *Logger) Log(t EventType, data interface{}) { +func (l *logger) Log(t EventType, data interface{}) { l.events <- Event{ Time: time.Now(), Type: t, @@ -292,7 +304,7 @@ func (l *Logger) Log(t EventType, data interface{}) { } } -func (l *Logger) sendEvent(e Event) { +func (l *logger) sendEvent(e Event) { l.nextGlobalID++ dl.Debugln("log", l.nextGlobalID, e.Type, e.Data) @@ -323,15 +335,16 @@ func (l *Logger) sendEvent(e Event) { } } -func (l *Logger) Subscribe(mask EventType) *Subscription { - res := make(chan *Subscription) +func (l *logger) Subscribe(mask EventType) Subscription { + res := make(chan Subscription) l.funcs <- func() { dl.Debugln("subscribe", mask) - s := &Subscription{ - mask: mask, - events: make(chan Event, BufferSize), - timeout: time.NewTimer(0), + s := &subscription{ + mask: mask, + events: make(chan Event, BufferSize), + toUnsubscribe: l.toUnsubscribe, + timeout: time.NewTimer(0), } // We need to create the timeout timer in the stopped, non-fired state so @@ -355,32 +368,30 @@ func (l *Logger) Subscribe(mask EventType) *Subscription { return <-res } -func (l *Logger) Unsubscribe(s *Subscription) { - l.funcs <- func() { - dl.Debugln("unsubscribe") - for i, ss := range l.subs { - if s == ss { - last := len(l.subs) - 1 +func (l *logger) unsubscribe(s *subscription) { + dl.Debugln("unsubscribe", s.mask) + for i, ss := range l.subs { + if s == ss { + last := len(l.subs) - 1 - l.subs[i] = l.subs[last] - l.subs[last] = nil - l.subs = l.subs[:last] + l.subs[i] = l.subs[last] + l.subs[last] = nil + l.subs = l.subs[:last] - l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last] - l.nextSubscriptionIDs[last] = 0 - l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last] + l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last] + l.nextSubscriptionIDs[last] = 0 + l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last] - break - } + break } - close(s.events) } + close(s.events) } // Poll returns an event from the subscription or an error if the poll times // out of the event channel is closed. Poll should not be called concurrently // from multiple goroutines for a single subscription. -func (s *Subscription) Poll(timeout time.Duration) (Event, error) { +func (s *subscription) Poll(timeout time.Duration) (Event, error) { dl.Debugln("poll", timeout) s.timeout.Reset(timeout) @@ -409,12 +420,16 @@ func (s *Subscription) Poll(timeout time.Duration) (Event, error) { } } -func (s *Subscription) C() <-chan Event { +func (s *subscription) C() <-chan Event { return s.events } +func (s *subscription) Unsubscribe() { + s.toUnsubscribe <- s +} + type bufferedSubscription struct { - sub *Subscription + sub Subscription buf []Event next int cur int // Current SubscriptionID @@ -426,7 +441,7 @@ type BufferedSubscription interface { Since(id int, into []Event, timeout time.Duration) []Event } -func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription { +func NewBufferedSubscription(s Subscription, size int) BufferedSubscription { bs := &bufferedSubscription{ sub: s, buf: make([]Event, size), @@ -489,3 +504,29 @@ func Error(err error) *string { str := err.Error() return &str } + +type noopLogger struct{} + +var NoopLogger Logger = &noopLogger{} + +func (*noopLogger) Serve() {} + +func (*noopLogger) Stop() {} + +func (*noopLogger) Log(t EventType, data interface{}) {} + +func (*noopLogger) Subscribe(mask EventType) Subscription { + return &noopSubscription{} +} + +type noopSubscription struct{} + +func (*noopSubscription) C() <-chan Event { + return nil +} + +func (*noopSubscription) Poll(timeout time.Duration) (Event, error) { + return Event{}, errNoop +} + +func (*noopSubscription) Unsubscribe() {} diff --git a/lib/events/events_test.go b/lib/events/events_test.go index 3848e973c..49f64e9a4 100644 --- a/lib/events/events_test.go +++ b/lib/events/events_test.go @@ -33,7 +33,7 @@ func TestSubscriber(t *testing.T) { go l.Serve() s := l.Subscribe(0) - defer l.Unsubscribe(s) + defer s.Unsubscribe() if s == nil { t.Fatal("Unexpected nil Subscription") } @@ -45,7 +45,7 @@ func TestTimeout(t *testing.T) { go l.Serve() s := l.Subscribe(0) - defer l.Unsubscribe(s) + defer s.Unsubscribe() _, err := s.Poll(timeout) if err != ErrTimeout { t.Fatal("Unexpected non-Timeout error:", err) @@ -59,7 +59,7 @@ func TestEventBeforeSubscribe(t *testing.T) { l.Log(DeviceConnected, "foo") s := l.Subscribe(0) - defer l.Unsubscribe(s) + defer s.Unsubscribe() _, err := s.Poll(timeout) if err != ErrTimeout { @@ -73,7 +73,7 @@ func TestEventAfterSubscribe(t *testing.T) { go l.Serve() s := l.Subscribe(AllEvents) - defer l.Unsubscribe(s) + defer s.Unsubscribe() l.Log(DeviceConnected, "foo") ev, err := s.Poll(timeout) @@ -100,7 +100,7 @@ func TestEventAfterSubscribeIgnoreMask(t *testing.T) { go l.Serve() s := l.Subscribe(DeviceDisconnected) - defer l.Unsubscribe(s) + defer s.Unsubscribe() l.Log(DeviceConnected, "foo") _, err := s.Poll(timeout) @@ -115,7 +115,7 @@ func TestBufferOverflow(t *testing.T) { go l.Serve() s := l.Subscribe(AllEvents) - defer l.Unsubscribe(s) + defer s.Unsubscribe() // The first BufferSize events will be logged pretty much // instantaneously. The next BufferSize events will each block for up to @@ -147,7 +147,7 @@ func TestUnsubscribe(t *testing.T) { t.Fatal("Unexpected error:", err) } - l.Unsubscribe(s) + s.Unsubscribe() l.Log(DeviceConnected, "foo") _, err = s.Poll(timeout) @@ -162,7 +162,7 @@ func TestGlobalIDs(t *testing.T) { go l.Serve() s := l.Subscribe(AllEvents) - defer l.Unsubscribe(s) + defer s.Unsubscribe() l.Log(DeviceConnected, "foo") l.Subscribe(AllEvents) l.Log(DeviceConnected, "bar") @@ -194,7 +194,7 @@ func TestSubscriptionIDs(t *testing.T) { go l.Serve() s := l.Subscribe(DeviceConnected) - defer l.Unsubscribe(s) + defer s.Unsubscribe() l.Log(DeviceDisconnected, "a") l.Log(DeviceConnected, "b") @@ -236,7 +236,7 @@ func TestBufferedSub(t *testing.T) { go l.Serve() s := l.Subscribe(AllEvents) - defer l.Unsubscribe(s) + defer s.Unsubscribe() bs := NewBufferedSubscription(s, 10*BufferSize) go func() { @@ -267,7 +267,7 @@ func BenchmarkBufferedSub(b *testing.B) { go l.Serve() s := l.Subscribe(AllEvents) - defer l.Unsubscribe(s) + defer s.Unsubscribe() bufferSize := BufferSize bs := NewBufferedSubscription(s, bufferSize) @@ -323,7 +323,7 @@ func TestSinceUsesSubscriptionId(t *testing.T) { go l.Serve() s := l.Subscribe(DeviceConnected) - defer l.Unsubscribe(s) + defer s.Unsubscribe() bs := NewBufferedSubscription(s, 10*BufferSize) l.Log(DeviceConnected, "a") // SubscriptionID = 1 @@ -390,7 +390,7 @@ func TestUnsubscribeContention(t *testing.T) { defer listenerWg.Done() s := l.Subscribe(AllEvents) - defer l.Unsubscribe(s) + defer s.Unsubscribe() for { select { @@ -449,7 +449,7 @@ func BenchmarkLogEvent(b *testing.B) { go l.Serve() s := l.Subscribe(AllEvents) - defer l.Unsubscribe(s) + defer s.Unsubscribe() NewBufferedSubscription(s, 1) // runs in the background for i := 0; i < b.N; i++ { diff --git a/lib/model/folder.go b/lib/model/folder.go index ef5ca2ad5..01d1fc1c1 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -77,11 +77,11 @@ type puller interface { pull() bool // true when successfull and should not be retried } -func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration) folder { +func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger) folder { ctx, cancel := context.WithCancel(context.Background()) return folder{ - stateTracker: newStateTracker(cfg.ID), + stateTracker: newStateTracker(cfg.ID, evLogger), FolderConfiguration: cfg, FolderStatisticsReference: stats.NewFolderStatisticsReference(model.db, cfg.ID), @@ -630,7 +630,7 @@ func (f *folder) monitorWatch(ctx context.Context) { failTimer.Reset(time.Minute) continue } - watchaggregator.Aggregate(eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, aggrCtx) + watchaggregator.Aggregate(eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger, aggrCtx) l.Debugln("Started filesystem watcher for folder", f.Description()) case err = <-errChan: f.setWatchError(err) @@ -669,7 +669,7 @@ func (f *folder) setWatchError(err error) { if err != nil { data["to"] = err.Error() } - events.Default.Log(events.FolderWatchStateChanged, data) + f.evLogger.Log(events.FolderWatchStateChanged, data) } if err == nil { return @@ -800,7 +800,7 @@ func (f *folder) updateLocals(fs []protocol.FileInfo) { filenames[i] = file.Name } - events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{ + f.evLogger.Log(events.LocalIndexUpdated, map[string]interface{}{ "folder": f.ID, "items": len(fs), "filenames": filenames, @@ -839,7 +839,7 @@ func (f *folder) emitDiskChangeEvents(fs []protocol.FileInfo, typeOfEvent events } // Two different events can be fired here based on what EventType is passed into function - events.Default.Log(typeOfEvent, map[string]string{ + f.evLogger.Log(typeOfEvent, map[string]string{ "folder": f.ID, "folderID": f.ID, // incorrect, deprecated, kept for historical compliance "label": f.Label, diff --git a/lib/model/folder_recvonly.go b/lib/model/folder_recvonly.go index 082a0b66d..7899e789b 100644 --- a/lib/model/folder_recvonly.go +++ b/lib/model/folder_recvonly.go @@ -12,6 +12,7 @@ import ( "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/db" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/ignore" "github.com/syncthing/syncthing/lib/protocol" @@ -56,8 +57,8 @@ type receiveOnlyFolder struct { *sendReceiveFolder } -func newReceiveOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem) service { - sr := newSendReceiveFolder(model, fset, ignores, cfg, ver, fs).(*sendReceiveFolder) +func newReceiveOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger) service { + sr := newSendReceiveFolder(model, fset, ignores, cfg, ver, fs, evLogger).(*sendReceiveFolder) sr.localFlags = protocol.FlagLocalReceiveOnly // gets propagated to the scanner, and set on locally changed files return &receiveOnlyFolder{sr} } diff --git a/lib/model/folder_recvonly_test.go b/lib/model/folder_recvonly_test.go index aea96ddd1..02cf2caa7 100644 --- a/lib/model/folder_recvonly_test.go +++ b/lib/model/folder_recvonly_test.go @@ -321,6 +321,7 @@ func setupROFolder() (*model, *sendOnlyFolder) { f := &sendOnlyFolder{ folder: folder{ + stateTracker: newStateTracker(fcfg.ID, m.evLogger), fset: m.folderFiles[fcfg.ID], FolderConfiguration: fcfg, }, diff --git a/lib/model/folder_sendonly.go b/lib/model/folder_sendonly.go index d199eec59..0bb7e7688 100644 --- a/lib/model/folder_sendonly.go +++ b/lib/model/folder_sendonly.go @@ -9,6 +9,7 @@ package model import ( "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/db" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/ignore" "github.com/syncthing/syncthing/lib/protocol" @@ -24,9 +25,9 @@ type sendOnlyFolder struct { folder } -func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, _ versioner.Versioner, _ fs.Filesystem) service { +func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, _ versioner.Versioner, _ fs.Filesystem, evLogger events.Logger) service { f := &sendOnlyFolder{ - folder: newFolder(model, fset, ignores, cfg), + folder: newFolder(model, fset, ignores, cfg, evLogger), } f.folder.puller = f f.folder.Service = util.AsService(f.serve) diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index 320541854..e11240303 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -108,9 +108,9 @@ type sendReceiveFolder struct { pullErrorsMut sync.Mutex } -func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem) service { +func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger) service { f := &sendReceiveFolder{ - folder: newFolder(model, fset, ignores, cfg), + folder: newFolder(model, fset, ignores, cfg, evLogger), fs: fs, versioner: ver, queue: newJobQueue(), @@ -211,7 +211,7 @@ func (f *sendReceiveFolder) pull() bool { // errors preventing us. Flag this with a warning and // wait a bit longer before retrying. if errors := f.Errors(); len(errors) > 0 { - events.Default.Log(events.FolderErrors, map[string]interface{}{ + f.evLogger.Log(events.FolderErrors, map[string]interface{}{ "folder": f.folderID, "errors": errors, }) @@ -544,7 +544,7 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan< f.resetPullError(file.Name) - events.Default.Log(events.ItemStarted, map[string]string{ + f.evLogger.Log(events.ItemStarted, map[string]string{ "folder": f.folderID, "item": file.Name, "type": "dir", @@ -552,7 +552,7 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan< }) defer func() { - events.Default.Log(events.ItemFinished, map[string]interface{}{ + f.evLogger.Log(events.ItemFinished, map[string]interface{}{ "folder": f.folderID, "item": file.Name, "error": events.Error(err), @@ -700,7 +700,7 @@ func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo, dbUpdateChan c f.resetPullError(file.Name) - events.Default.Log(events.ItemStarted, map[string]string{ + f.evLogger.Log(events.ItemStarted, map[string]string{ "folder": f.folderID, "item": file.Name, "type": "symlink", @@ -708,7 +708,7 @@ func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo, dbUpdateChan c }) defer func() { - events.Default.Log(events.ItemFinished, map[string]interface{}{ + f.evLogger.Log(events.ItemFinished, map[string]interface{}{ "folder": f.folderID, "item": file.Name, "error": events.Error(err), @@ -782,7 +782,7 @@ func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, dbUpdateChan chan< // care not declare another err. var err error - events.Default.Log(events.ItemStarted, map[string]string{ + f.evLogger.Log(events.ItemStarted, map[string]string{ "folder": f.folderID, "item": file.Name, "type": "dir", @@ -790,7 +790,7 @@ func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, dbUpdateChan chan< }) defer func() { - events.Default.Log(events.ItemFinished, map[string]interface{}{ + f.evLogger.Log(events.ItemFinished, map[string]interface{}{ "folder": f.folderID, "item": file.Name, "error": events.Error(err), @@ -822,7 +822,7 @@ func (f *sendReceiveFolder) deleteFileWithCurrent(file, cur protocol.FileInfo, h f.resetPullError(file.Name) - events.Default.Log(events.ItemStarted, map[string]string{ + f.evLogger.Log(events.ItemStarted, map[string]string{ "folder": f.folderID, "item": file.Name, "type": "file", @@ -833,7 +833,7 @@ func (f *sendReceiveFolder) deleteFileWithCurrent(file, cur protocol.FileInfo, h if err != nil { f.newPullError(file.Name, errors.Wrap(err, "delete file")) } - events.Default.Log(events.ItemFinished, map[string]interface{}{ + f.evLogger.Log(events.ItemFinished, map[string]interface{}{ "folder": f.folderID, "item": file.Name, "error": events.Error(err), @@ -897,13 +897,13 @@ func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, db // care not declare another err. var err error - events.Default.Log(events.ItemStarted, map[string]string{ + f.evLogger.Log(events.ItemStarted, map[string]string{ "folder": f.folderID, "item": source.Name, "type": "file", "action": "delete", }) - events.Default.Log(events.ItemStarted, map[string]string{ + f.evLogger.Log(events.ItemStarted, map[string]string{ "folder": f.folderID, "item": target.Name, "type": "file", @@ -911,14 +911,14 @@ func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, db }) defer func() { - events.Default.Log(events.ItemFinished, map[string]interface{}{ + f.evLogger.Log(events.ItemFinished, map[string]interface{}{ "folder": f.folderID, "item": source.Name, "error": events.Error(err), "type": "file", "action": "delete", }) - events.Default.Log(events.ItemFinished, map[string]interface{}{ + f.evLogger.Log(events.ItemFinished, map[string]interface{}{ "folder": f.folderID, "item": target.Name, "error": events.Error(err), @@ -1095,7 +1095,7 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c // Shuffle the blocks rand.Shuffle(blocks) - events.Default.Log(events.ItemStarted, map[string]string{ + f.evLogger.Log(events.ItemStarted, map[string]string{ "folder": f.folderID, "item": file.Name, "type": "file", @@ -1178,7 +1178,7 @@ func (f *sendReceiveFolder) shortcutFile(file, curFile protocol.FileInfo, dbUpda f.resetPullError(file.Name) - events.Default.Log(events.ItemStarted, map[string]string{ + f.evLogger.Log(events.ItemStarted, map[string]string{ "folder": f.folderID, "item": file.Name, "type": "file", @@ -1186,7 +1186,7 @@ func (f *sendReceiveFolder) shortcutFile(file, curFile protocol.FileInfo, dbUpda }) var err error - defer events.Default.Log(events.ItemFinished, map[string]interface{}{ + defer f.evLogger.Log(events.ItemFinished, map[string]interface{}{ "folder": f.folderID, "item": file.Name, "error": events.Error(err), @@ -1575,7 +1575,7 @@ func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState, dbUpda f.model.progressEmitter.Deregister(state) - events.Default.Log(events.ItemFinished, map[string]interface{}{ + f.evLogger.Log(events.ItemFinished, map[string]interface{}{ "folder": f.folderID, "item": state.file.Name, "error": events.Error(err), diff --git a/lib/model/folder_sendrecv_test.go b/lib/model/folder_sendrecv_test.go index 3e53e252b..2dbda2b43 100644 --- a/lib/model/folder_sendrecv_test.go +++ b/lib/model/folder_sendrecv_test.go @@ -96,7 +96,7 @@ func setupSendReceiveFolder(files ...protocol.FileInfo) (*model, *sendReceiveFol f := &sendReceiveFolder{ folder: folder{ - stateTracker: newStateTracker("default"), + stateTracker: newStateTracker("default", model.evLogger), model: model, fset: model.folderFiles[fcfg.ID], initialScanFinished: make(chan struct{}), @@ -121,6 +121,12 @@ func setupSendReceiveFolder(files ...protocol.FileInfo) (*model, *sendReceiveFol return model, f } +func cleanupSRFolder(f *sendReceiveFolder, m *model) { + m.evLogger.Stop() + os.Remove(m.cfg.ConfigPath()) + os.Remove(f.Filesystem().URI()) +} + // Layout of the files: (indexes from the above array) // 12345678 - Required file // 02005008 - Existing file (currently in the index) @@ -137,10 +143,7 @@ func TestHandleFile(t *testing.T) { requiredFile.Blocks = blocks[1:] m, f := setupSendReceiveFolder(existingFile) - defer func() { - os.Remove(m.cfg.ConfigPath()) - os.Remove(f.Filesystem().URI()) - }() + defer cleanupSRFolder(f, m) copyChan := make(chan copyBlocksState, 1) dbUpdateChan := make(chan dbUpdateJob, 1) @@ -183,10 +186,7 @@ func TestHandleFileWithTemp(t *testing.T) { requiredFile.Blocks = blocks[1:] m, f := setupSendReceiveFolder(existingFile) - defer func() { - os.Remove(m.cfg.ConfigPath()) - os.Remove(f.Filesystem().URI()) - }() + defer cleanupSRFolder(f, m) if _, err := prepareTmpFile(f.Filesystem()); err != nil { t.Fatal(err) @@ -236,10 +236,7 @@ func TestCopierFinder(t *testing.T) { requiredFile.Name = "file2" m, f := setupSendReceiveFolder(existingFile) - defer func() { - os.Remove(m.cfg.ConfigPath()) - os.Remove(f.Filesystem().URI()) - }() + defer cleanupSRFolder(f, m) if _, err := prepareTmpFile(f.Filesystem()); err != nil { t.Fatal(err) @@ -302,11 +299,8 @@ func TestCopierFinder(t *testing.T) { func TestWeakHash(t *testing.T) { // Setup the model/pull environment model, fo := setupSendReceiveFolder() + defer cleanupSRFolder(fo, model) ffs := fo.Filesystem() - defer func() { - os.Remove(model.cfg.ConfigPath()) - os.Remove(ffs.URI()) - }() tempFile := fs.TempName("weakhash") var shift int64 = 10 @@ -432,10 +426,7 @@ func TestCopierCleanup(t *testing.T) { // Create a file file := setupFile("test", []int{0}) m, f := setupSendReceiveFolder(file) - defer func() { - os.Remove(m.cfg.ConfigPath()) - os.Remove(f.Filesystem().URI()) - }() + defer cleanupSRFolder(f, m) file.Blocks = []protocol.BlockInfo{blocks[1]} file.Version = file.Version.Update(myID.Short()) @@ -468,13 +459,10 @@ func TestDeregisterOnFailInCopy(t *testing.T) { file := setupFile("filex", []int{0, 2, 0, 0, 5, 0, 0, 8}) m, f := setupSendReceiveFolder() - defer func() { - os.Remove(m.cfg.ConfigPath()) - os.Remove(f.Filesystem().URI()) - }() + defer cleanupSRFolder(f, m) // Set up our evet subscription early - s := events.Default.Subscribe(events.ItemFinished) + s := m.evLogger.Subscribe(events.ItemFinished) // queue.Done should be called by the finisher routine f.queue.Push("filex", 0, time.Time{}) @@ -558,13 +546,10 @@ func TestDeregisterOnFailInPull(t *testing.T) { file := setupFile("filex", []int{0, 2, 0, 0, 5, 0, 0, 8}) m, f := setupSendReceiveFolder() - defer func() { - os.Remove(m.cfg.ConfigPath()) - os.Remove(f.Filesystem().URI()) - }() + defer cleanupSRFolder(f, m) // Set up our evet subscription early - s := events.Default.Subscribe(events.ItemFinished) + s := m.evLogger.Subscribe(events.ItemFinished) // queue.Done should be called by the finisher routine f.queue.Push("filex", 0, time.Time{}) @@ -636,12 +621,9 @@ func TestDeregisterOnFailInPull(t *testing.T) { func TestIssue3164(t *testing.T) { m, f := setupSendReceiveFolder() + defer cleanupSRFolder(f, m) ffs := f.Filesystem() tmpDir := ffs.URI() - defer func() { - os.Remove(m.cfg.ConfigPath()) - os.Remove(tmpDir) - }() ignDir := filepath.Join("issue3164", "oktodelete") subDir := filepath.Join(ignDir, "foobar") @@ -728,11 +710,8 @@ func TestDiffEmpty(t *testing.T) { // in the db. func TestDeleteIgnorePerms(t *testing.T) { m, f := setupSendReceiveFolder() + defer cleanupSRFolder(f, m) ffs := f.Filesystem() - defer func() { - os.Remove(m.cfg.ConfigPath()) - os.Remove(ffs.URI()) - }() f.IgnorePerms = true name := "deleteIgnorePerms" @@ -778,7 +757,7 @@ func TestCopyOwner(t *testing.T) { // filesystem. m, f := setupSendReceiveFolder() - defer os.Remove(m.cfg.ConfigPath()) + defer cleanupSRFolder(f, m) f.folder.FolderConfiguration = config.NewFolderConfiguration(m.id, f.ID, f.Label, fs.FilesystemTypeFake, "/TestCopyOwner") f.folder.FolderConfiguration.CopyOwnershipFromParent = true @@ -867,11 +846,8 @@ func TestCopyOwner(t *testing.T) { // is replaced with a directory and versions are conflicting func TestSRConflictReplaceFileByDir(t *testing.T) { m, f := setupSendReceiveFolder() + defer cleanupSRFolder(f, m) ffs := f.Filesystem() - defer func() { - os.Remove(m.cfg.ConfigPath()) - os.Remove(ffs.URI()) - }() name := "foo" @@ -902,11 +878,8 @@ func TestSRConflictReplaceFileByDir(t *testing.T) { // is replaced with a link and versions are conflicting func TestSRConflictReplaceFileByLink(t *testing.T) { m, f := setupSendReceiveFolder() + defer cleanupSRFolder(f, m) ffs := f.Filesystem() - defer func() { - os.Remove(m.cfg.ConfigPath()) - os.Remove(ffs.URI()) - }() name := "foo" diff --git a/lib/model/folder_summary.go b/lib/model/folder_summary.go index d9dd07f55..d4bda64e0 100644 --- a/lib/model/folder_summary.go +++ b/lib/model/folder_summary.go @@ -36,6 +36,7 @@ type folderSummaryService struct { cfg config.Wrapper model Model id protocol.DeviceID + evLogger events.Logger immediate chan string // For keeping track of folders to recalculate for @@ -47,7 +48,7 @@ type folderSummaryService struct { lastEventReqMut sync.Mutex } -func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID) FolderSummaryService { +func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID, evLogger events.Logger) FolderSummaryService { service := &folderSummaryService{ Supervisor: suture.New("folderSummaryService", suture.Spec{ PassThroughPanics: true, @@ -55,6 +56,7 @@ func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID) cfg: cfg, model: m, id: id, + evLogger: evLogger, immediate: make(chan string), folders: make(map[string]struct{}), foldersMut: sync.NewMutex(), @@ -144,8 +146,8 @@ func (c *folderSummaryService) OnEventRequest() { // listenForUpdates subscribes to the event bus and makes note of folders that // need their data recalculated. func (c *folderSummaryService) listenForUpdates(stop chan struct{}) { - sub := events.Default.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress) - defer events.Default.Unsubscribe(sub) + sub := c.evLogger.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress) + defer sub.Unsubscribe() for { // This loop needs to be fast so we don't miss too many events. @@ -291,7 +293,7 @@ func (c *folderSummaryService) sendSummary(folder string) { if err != nil { return } - events.Default.Log(events.FolderSummary, map[string]interface{}{ + c.evLogger.Log(events.FolderSummary, map[string]interface{}{ "folder": folder, "summary": data, }) @@ -311,6 +313,6 @@ func (c *folderSummaryService) sendSummary(folder string) { comp := c.model.Completion(devCfg.DeviceID, folder).Map() comp["folder"] = folder comp["device"] = devCfg.DeviceID.String() - events.Default.Log(events.FolderCompletion, comp) + c.evLogger.Log(events.FolderCompletion, comp) } } diff --git a/lib/model/folderstate.go b/lib/model/folderstate.go index cd504cb81..d11a72ea0 100644 --- a/lib/model/folderstate.go +++ b/lib/model/folderstate.go @@ -42,6 +42,7 @@ func (s folderState) String() string { type stateTracker struct { folderID string + evLogger events.Logger mut sync.Mutex current folderState @@ -49,9 +50,10 @@ type stateTracker struct { changed time.Time } -func newStateTracker(id string) stateTracker { +func newStateTracker(id string, evLogger events.Logger) stateTracker { return stateTracker{ folderID: id, + evLogger: evLogger, mut: sync.NewMutex(), } } @@ -83,7 +85,7 @@ func (s *stateTracker) setState(newState folderState) { s.current = newState s.changed = time.Now() - events.Default.Log(events.StateChanged, eventData) + s.evLogger.Log(events.StateChanged, eventData) } s.mut.Unlock() } @@ -124,5 +126,5 @@ func (s *stateTracker) setError(err error) { s.err = err s.changed = time.Now() - events.Default.Log(events.StateChanged, eventData) + s.evLogger.Log(events.StateChanged, eventData) } diff --git a/lib/model/model.go b/lib/model/model.go index 1e71c1c13..b63a13e1b 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -128,6 +128,7 @@ type model struct { shortID protocol.ShortID cacheIgnoredFiles bool protectedFiles []string + evLogger events.Logger clientName string clientVersion string @@ -152,7 +153,7 @@ type model struct { foldersRunning int32 // for testing only } -type folderFactory func(*model, *db.FileSet, *ignore.Matcher, config.FolderConfiguration, versioner.Versioner, fs.Filesystem) service +type folderFactory func(*model, *db.FileSet, *ignore.Matcher, config.FolderConfiguration, versioner.Versioner, fs.Filesystem, events.Logger) service var ( folderFactories = make(map[config.FolderType]folderFactory) @@ -175,7 +176,7 @@ var ( // NewModel creates and starts a new model. The model starts in read-only mode, // where it sends index information to connected peers and responds to requests // for file data without altering the local folder in any way. -func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersion string, ldb *db.Lowlevel, protectedFiles []string) Model { +func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersion string, ldb *db.Lowlevel, protectedFiles []string, evLogger events.Logger) Model { m := &model{ Supervisor: suture.New("model", suture.Spec{ Log: func(line string) { @@ -186,11 +187,12 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio cfg: cfg, db: ldb, finder: db.NewBlockFinder(ldb), - progressEmitter: NewProgressEmitter(cfg), + progressEmitter: NewProgressEmitter(cfg, evLogger), id: id, shortID: id.Short(), cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles, protectedFiles: protectedFiles, + evLogger: evLogger, clientName: clientName, clientVersion: clientVersion, folderCfgs: make(map[string]config.FolderConfiguration), @@ -310,7 +312,7 @@ func (m *model) startFolderLocked(cfg config.FolderConfiguration) { ffs.Hide(".stversions") ffs.Hide(".stignore") - p := folderFactory(m, fset, m.folderIgnores[folder], cfg, ver, ffs) + p := folderFactory(m, fset, m.folderIgnores[folder], cfg, ver, ffs, m.evLogger) m.folderRunners[folder] = p @@ -1023,7 +1025,7 @@ func (m *model) handleIndex(deviceID protocol.DeviceID, folder string, fs []prot } files.Update(deviceID, fs) - events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{ + m.evLogger.Log(events.RemoteIndexUpdated, map[string]interface{}{ "device": deviceID.String(), "folder": folder, "items": len(fs), @@ -1077,7 +1079,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon } m.cfg.AddOrUpdatePendingFolder(folder.ID, folder.Label, deviceID) changed = true - events.Default.Log(events.FolderRejected, map[string]string{ + m.evLogger.Log(events.FolderRejected, map[string]string{ "folder": folder.ID, "folderLabel": folder.Label, "device": deviceID.String(), @@ -1180,6 +1182,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon fset: fs, prevSequence: startSequence, dropSymlinks: dropSymlinks, + evLogger: m.evLogger, } is.Service = util.AsService(is.serve) // The token isn't tracked as the service stops when the connection @@ -1432,7 +1435,7 @@ func (m *model) Closed(conn protocol.Connection, err error) { delete(m.closed, device) l.Infof("Connection to %s at %s closed: %v", device, conn.Name(), err) - events.Default.Log(events.DeviceDisconnected, map[string]string{ + m.evLogger.Log(events.DeviceDisconnected, map[string]string{ "id": device.String(), "error": err.Error(), }) @@ -1773,7 +1776,7 @@ func (m *model) OnHello(remoteID protocol.DeviceID, addr net.Addr, hello protoco if !ok { m.cfg.AddOrUpdatePendingDevice(remoteID, hello.DeviceName, addr.String()) _ = m.cfg.Save() // best effort - events.Default.Log(events.DeviceRejected, map[string]string{ + m.evLogger.Log(events.DeviceRejected, map[string]string{ "name": hello.DeviceName, "device": remoteID.String(), "address": addr.String(), @@ -1859,7 +1862,7 @@ func (m *model) AddConnection(conn connections.Connection, hello protocol.HelloR event["addr"] = addr.String() } - events.Default.Log(events.DeviceConnected, event) + m.evLogger.Log(events.DeviceConnected, event) l.Infof(`Device %s client is "%s %s" named "%s" at %s`, deviceID, hello.ClientName, hello.ClientVersion, hello.DeviceName, conn) @@ -1894,7 +1897,7 @@ func (m *model) DownloadProgress(device protocol.DeviceID, folder string, update downloads.Update(folder, updates) state := downloads.GetBlockCounts(folder) - events.Default.Log(events.RemoteDownloadProgress, map[string]interface{}{ + m.evLogger.Log(events.RemoteDownloadProgress, map[string]interface{}{ "device": device.String(), "folder": folder, "state": state, @@ -1926,6 +1929,7 @@ type indexSender struct { fset *db.FileSet prevSequence int64 dropSymlinks bool + evLogger events.Logger connClosed chan struct{} } @@ -1941,8 +1945,8 @@ func (s *indexSender) serve(stop chan struct{}) { // Subscribe to LocalIndexUpdated (we have new information to send) and // DeviceDisconnected (it might be us who disconnected, so we should // exit). - sub := events.Default.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected) - defer events.Default.Unsubscribe(sub) + sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected) + defer sub.Unsubscribe() evChan := sub.C() ticker := time.NewTicker(time.Minute) @@ -2531,7 +2535,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { if toCfg.Paused { eventType = events.FolderPaused } - events.Default.Log(eventType, map[string]string{"id": toCfg.ID, "label": toCfg.Label}) + m.evLogger.Log(eventType, map[string]string{"id": toCfg.ID, "label": toCfg.Label}) } } @@ -2559,9 +2563,9 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { if toCfg.Paused { l.Infoln("Pausing", deviceID) m.closeConn(deviceID, errDevicePaused) - events.Default.Log(events.DevicePaused, map[string]string{"device": deviceID.String()}) + m.evLogger.Log(events.DevicePaused, map[string]string{"device": deviceID.String()}) } else { - events.Default.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()}) + m.evLogger.Log(events.DeviceResumed, map[string]string{"device": deviceID.String()}) } } diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 1689df458..9adf0bf87 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -110,7 +110,7 @@ func createTmpWrapper(cfg config.Configuration) config.Wrapper { if err != nil { panic(err) } - wrapper := config.Wrap(tmpFile.Name(), cfg) + wrapper := config.Wrap(tmpFile.Name(), cfg, events.NoopLogger) tmpFile.Close() return wrapper } @@ -303,7 +303,7 @@ func TestDeviceRename(t *testing.T) { DeviceID: device1, }, } - cfg := config.Wrap("testdata/tmpconfig.xml", rawCfg) + cfg := config.Wrap("testdata/tmpconfig.xml", rawCfg, events.NoopLogger) db := db.OpenMemory() m := newModel(cfg, myID, "syncthing", "dev", db, nil) @@ -339,7 +339,7 @@ func TestDeviceRename(t *testing.T) { t.Errorf("Device name got overwritten") } - cfgw, err := config.Load("testdata/tmpconfig.xml", myID) + cfgw, err := config.Load("testdata/tmpconfig.xml", myID, events.NoopLogger) if err != nil { t.Error(err) return @@ -3358,12 +3358,12 @@ func TestModTimeWindow(t *testing.T) { } func TestDevicePause(t *testing.T) { - sub := events.Default.Subscribe(events.DevicePaused) - defer events.Default.Unsubscribe(sub) - m, _, fcfg := setupModelWithConnection() defer cleanupModelAndRemoveDir(m, fcfg.Filesystem().URI()) + sub := m.evLogger.Subscribe(events.DevicePaused) + defer sub.Unsubscribe() + m.pmut.RLock() closed := m.closed[device1] m.pmut.RUnlock() diff --git a/lib/model/progressemitter.go b/lib/model/progressemitter.go index 27c0f4ae9..fed22e259 100644 --- a/lib/model/progressemitter.go +++ b/lib/model/progressemitter.go @@ -29,6 +29,7 @@ type ProgressEmitter struct { connections map[protocol.DeviceID]protocol.Connection foldersByConns map[protocol.DeviceID][]string disabled bool + evLogger events.Logger mut sync.Mutex timer *time.Timer @@ -36,13 +37,14 @@ type ProgressEmitter struct { // NewProgressEmitter creates a new progress emitter which emits // DownloadProgress events every interval. -func NewProgressEmitter(cfg config.Wrapper) *ProgressEmitter { +func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmitter { t := &ProgressEmitter{ registry: make(map[string]map[string]*sharedPullerState), timer: time.NewTimer(time.Millisecond), sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState), connections: make(map[protocol.DeviceID]protocol.Connection), foldersByConns: make(map[protocol.DeviceID][]string), + evLogger: evLogger, mut: sync.NewMutex(), } t.Service = util.AsService(t.serve) @@ -107,7 +109,7 @@ func (t *ProgressEmitter) sendDownloadProgressEventLocked() { output[folder][name] = puller.Progress() } } - events.Default.Log(events.DownloadProgress, output) + t.evLogger.Log(events.DownloadProgress, output) l.Debugf("progress emitter: emitting %#v", output) } diff --git a/lib/model/progressemitter_test.go b/lib/model/progressemitter_test.go index 55df08032..ca3140e53 100644 --- a/lib/model/progressemitter_test.go +++ b/lib/model/progressemitter_test.go @@ -30,7 +30,7 @@ func caller(skip int) string { return fmt.Sprintf("%s:%d", filepath.Base(file), line) } -func expectEvent(w *events.Subscription, t *testing.T, size int) { +func expectEvent(w events.Subscription, t *testing.T, size int) { event, err := w.Poll(timeout) if err != nil { t.Fatal("Unexpected error:", err, "at", caller(1)) @@ -44,7 +44,7 @@ func expectEvent(w *events.Subscription, t *testing.T, size int) { } } -func expectTimeout(w *events.Subscription, t *testing.T) { +func expectTimeout(w events.Subscription, t *testing.T) { _, err := w.Poll(timeout) if err != events.ErrTimeout { t.Fatal("Unexpected non-Timeout error:", err, "at", caller(1)) @@ -52,7 +52,11 @@ func expectTimeout(w *events.Subscription, t *testing.T) { } func TestProgressEmitter(t *testing.T) { - w := events.Default.Subscribe(events.DownloadProgress) + evLogger := events.NewLogger() + go evLogger.Serve() + defer evLogger.Stop() + + w := evLogger.Subscribe(events.DownloadProgress) c := createTmpWrapper(config.Configuration{}) defer os.Remove(c.ConfigPath()) @@ -60,7 +64,7 @@ func TestProgressEmitter(t *testing.T) { ProgressUpdateIntervalS: 0, }) - p := NewProgressEmitter(c) + p := NewProgressEmitter(c, evLogger) go p.Serve() p.interval = 0 @@ -112,7 +116,11 @@ func TestSendDownloadProgressMessages(t *testing.T) { fc := &fakeConnection{} - p := NewProgressEmitter(c) + evLogger := events.NewLogger() + go evLogger.Serve() + defer evLogger.Stop() + + p := NewProgressEmitter(c, evLogger) p.temporaryIndexSubscribe(fc, []string{"folder", "folder2"}) p.registry["folder"] = make(map[string]*sharedPullerState) p.registry["folder2"] = make(map[string]*sharedPullerState) diff --git a/lib/model/requests_test.go b/lib/model/requests_test.go index 6ffe8fc08..f71829582 100644 --- a/lib/model/requests_test.go +++ b/lib/model/requests_test.go @@ -350,8 +350,8 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) { } fc.mut.Unlock() - sub := events.Default.Subscribe(events.FolderErrors) - defer events.Default.Unsubscribe(sub) + sub := m.evLogger.Subscribe(events.FolderErrors) + defer sub.Unsubscribe() fc.sendIndexUpdate() @@ -640,8 +640,8 @@ func TestRequestSymlinkWindows(t *testing.T) { t.Fatalf("timed out before pull was finished") } - sub := events.Default.Subscribe(events.StateChanged | events.LocalIndexUpdated) - defer events.Default.Unsubscribe(sub) + sub := m.evLogger.Subscribe(events.StateChanged | events.LocalIndexUpdated) + defer sub.Unsubscribe() m.ScanFolder("default") @@ -978,8 +978,8 @@ func TestNeedFolderFiles(t *testing.T) { tmpDir := tfs.URI() defer cleanupModelAndRemoveDir(m, tmpDir) - sub := events.Default.Subscribe(events.RemoteIndexUpdated) - defer events.Default.Unsubscribe(sub) + sub := m.evLogger.Subscribe(events.RemoteIndexUpdated) + defer sub.Unsubscribe() errPreventSync := errors.New("you aren't getting any of this") fc.mut.Lock() diff --git a/lib/model/testutils_test.go b/lib/model/testutils_test.go index 3c0b42260..49e128dfc 100644 --- a/lib/model/testutils_test.go +++ b/lib/model/testutils_test.go @@ -13,6 +13,7 @@ import ( "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/db" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/protocol" ) @@ -117,12 +118,16 @@ func setupModel(w config.Wrapper) *model { } func newModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersion string, ldb *db.Lowlevel, protectedFiles []string) *model { - return NewModel(cfg, id, clientName, clientVersion, ldb, protectedFiles).(*model) + evLogger := events.NewLogger() + m := NewModel(cfg, id, clientName, clientVersion, ldb, protectedFiles, evLogger).(*model) + go evLogger.Serve() + return m } func cleanupModel(m *model) { m.Stop() m.db.Close() + m.evLogger.Stop() os.Remove(m.cfg.ConfigPath()) } diff --git a/lib/rc/rc.go b/lib/rc/rc.go index 21004fef5..b87c0fcbb 100644 --- a/lib/rc/rc.go +++ b/lib/rc/rc.go @@ -26,6 +26,7 @@ import ( "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/dialer" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/sync" ) @@ -455,7 +456,7 @@ func (p *Process) eventLoop() { default: } - events, err := p.Events(since) + evs, err := p.Events(since) if err != nil { if time.Since(start) < 5*time.Second { // The API has probably not started yet, lets give it some time. @@ -473,7 +474,7 @@ func (p *Process) eventLoop() { continue } - for _, ev := range events { + for _, ev := range evs { if ev.ID != since+1 { l.Warnln("Event ID jumped", since, "to", ev.ID) } @@ -493,7 +494,7 @@ func (p *Process) eventLoop() { p.id = id home := data["home"].(string) - w, err := config.Load(filepath.Join(home, "config.xml"), protocol.LocalDeviceID) + w, err := config.Load(filepath.Join(home, "config.xml"), protocol.LocalDeviceID, events.NoopLogger) if err != nil { log.Println("eventLoop: Starting:", err) continue diff --git a/lib/scanner/walk.go b/lib/scanner/walk.go index 92be4c6a6..3be171306 100644 --- a/lib/scanner/walk.go +++ b/lib/scanner/walk.go @@ -56,6 +56,8 @@ type Config struct { LocalFlags uint32 // Modification time is to be considered unchanged if the difference is lower. ModTimeWindow time.Duration + // Event logger to which the scan progress events are sent + EvLogger events.Logger } type CurrentFiler interface { @@ -168,7 +170,7 @@ func (w *walker) walk(ctx context.Context) chan ScanResult { current := progress.Total() rate := progress.Rate() l.Debugf("Walk %s %s current progress %d/%d at %.01f MiB/s (%d%%)", w.Folder, w.Subs, current, total, rate/1024/1024, current*100/total) - events.Default.Log(events.FolderScanProgress, map[string]interface{}{ + w.EvLogger.Log(events.FolderScanProgress, map[string]interface{}{ "folder": w.Folder, "current": current, "total": total, diff --git a/lib/scanner/walk_test.go b/lib/scanner/walk_test.go index 928e8ede1..4b2bd26b1 100644 --- a/lib/scanner/walk_test.go +++ b/lib/scanner/walk_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/d4l3k/messagediff" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/ignore" "github.com/syncthing/syncthing/lib/osutil" @@ -66,12 +67,10 @@ func TestWalkSub(t *testing.T) { t.Fatal(err) } - fchan := Walk(context.TODO(), Config{ - Filesystem: testFs, - Subs: []string{"dir2"}, - Matcher: ignores, - Hashers: 2, - }) + cfg := testConfig() + cfg.Subs = []string{"dir2"} + cfg.Matcher = ignores + fchan := Walk(context.TODO(), cfg) var files []protocol.FileInfo for f := range fchan { if f.Err != nil { @@ -102,11 +101,9 @@ func TestWalk(t *testing.T) { } t.Log(ignores) - fchan := Walk(context.TODO(), Config{ - Filesystem: testFs, - Matcher: ignores, - Hashers: 2, - }) + cfg := testConfig() + cfg.Matcher = ignores + fchan := Walk(context.TODO(), cfg) var tmp []protocol.FileInfo for f := range fchan { @@ -466,15 +463,14 @@ func TestWalkReceiveOnly(t *testing.T) { } func walkDir(fs fs.Filesystem, dir string, cfiler CurrentFiler, matcher *ignore.Matcher, localFlags uint32) []protocol.FileInfo { - fchan := Walk(context.TODO(), Config{ - Filesystem: fs, - Subs: []string{dir}, - AutoNormalize: true, - Hashers: 2, - CurrentFiler: cfiler, - Matcher: matcher, - LocalFlags: localFlags, - }) + cfg := testConfig() + cfg.Filesystem = fs + cfg.Subs = []string{dir} + cfg.AutoNormalize = true + cfg.CurrentFiler = cfiler + cfg.Matcher = matcher + cfg.LocalFlags = localFlags + fchan := Walk(context.TODO(), cfg) var tmp []protocol.FileInfo for f := range fchan { @@ -576,11 +572,11 @@ func TestStopWalk(t *testing.T) { const numHashers = 4 ctx, cancel := context.WithCancel(context.Background()) - fchan := Walk(ctx, Config{ - Filesystem: fs, - Hashers: numHashers, - ProgressTickIntervalS: -1, // Don't attempt to build the full list of files before starting to scan... - }) + cfg := testConfig() + cfg.Filesystem = fs + cfg.Hashers = numHashers + cfg.ProgressTickIntervalS = -1 // Don't attempt to build the full list of files before starting to scan... + fchan := Walk(ctx, cfg) // Receive a few entries to make sure the walker is up and running, // scanning both files and dirs. Do some quick sanity tests on the @@ -705,21 +701,17 @@ func TestIssue4841(t *testing.T) { } fd.Close() - fchan := Walk(context.TODO(), Config{ - Filesystem: fs, - Subs: nil, - AutoNormalize: true, - Hashers: 2, - CurrentFiler: fakeCurrentFiler{ - "foo": { - Name: "foo", - Type: protocol.FileInfoTypeFile, - LocalFlags: protocol.FlagLocalIgnored, - Version: protocol.Vector{}.Update(1), - }, - }, - ShortID: protocol.LocalDeviceID.Short(), - }) + cfg := testConfig() + cfg.Filesystem = fs + cfg.AutoNormalize = true + cfg.CurrentFiler = fakeCurrentFiler{"foo": { + Name: "foo", + Type: protocol.FileInfoTypeFile, + LocalFlags: protocol.FlagLocalIgnored, + Version: protocol.Vector{}.Update(1), + }} + cfg.ShortID = protocol.LocalDeviceID.Short() + fchan := Walk(context.TODO(), cfg) var files []protocol.FileInfo for f := range fchan { @@ -745,11 +737,9 @@ func TestNotExistingError(t *testing.T) { t.Fatalf("Lstat returned error %v, while nothing should exist there.", err) } - fchan := Walk(context.TODO(), Config{ - Filesystem: testFs, - Subs: []string{sub}, - Hashers: 2, - }) + cfg := testConfig() + cfg.Subs = []string{sub} + fchan := Walk(context.TODO(), cfg) for f := range fchan { t.Fatalf("Expected no result from scan, got %v", f) } @@ -793,3 +783,13 @@ func (fcf fakeCurrentFiler) CurrentFile(name string) (protocol.FileInfo, bool) { f, ok := fcf[name] return f, ok } + +func testConfig() Config { + evLogger := events.NewLogger() + go evLogger.Serve() + return Config{ + Filesystem: testFs, + Hashers: 2, + EvLogger: evLogger, + } +} diff --git a/lib/syncthing/auditservice.go b/lib/syncthing/auditservice.go index 456381a20..863a437e0 100644 --- a/lib/syncthing/auditservice.go +++ b/lib/syncthing/auditservice.go @@ -21,13 +21,13 @@ import ( type auditService struct { suture.Service w io.Writer // audit destination - sub *events.Subscription + sub events.Subscription } -func newAuditService(w io.Writer) *auditService { +func newAuditService(w io.Writer, evLogger events.Logger) *auditService { s := &auditService{ w: w, - sub: events.Default.Subscribe(events.AllEvents), + sub: evLogger.Subscribe(events.AllEvents), } s.Service = util.AsService(s.serve) return s @@ -50,5 +50,5 @@ func (s *auditService) serve(stop chan struct{}) { // Stop stops the audit service. func (s *auditService) Stop() { s.Service.Stop() - events.Default.Unsubscribe(s.sub) + s.sub.Unsubscribe() } diff --git a/lib/syncthing/auditservice_test.go b/lib/syncthing/auditservice_test.go index b2df61ab4..13b67bd24 100644 --- a/lib/syncthing/auditservice_test.go +++ b/lib/syncthing/auditservice_test.go @@ -17,15 +17,22 @@ import ( func TestAuditService(t *testing.T) { buf := new(bytes.Buffer) + evLogger := events.NewLogger() + go evLogger.Serve() + defer evLogger.Stop() + sub := evLogger.Subscribe(events.AllEvents) + defer sub.Unsubscribe() - // Event sent before construction, will not be logged - events.Default.Log(events.ConfigSaved, "the first event") + // Event sent before start, will not be logged + evLogger.Log(events.ConfigSaved, "the first event") + // Make sure the event goes through before creating the service + <-sub.C() - service := newAuditService(buf) + service := newAuditService(buf, evLogger) go service.Serve() // Event that should end up in the audit log - events.Default.Log(events.ConfigSaved, "the second event") + evLogger.Log(events.ConfigSaved, "the second event") // We need to give the events time to arrive, since the channels are buffered etc. time.Sleep(10 * time.Millisecond) @@ -33,7 +40,7 @@ func TestAuditService(t *testing.T) { service.Stop() // This event should not be logged, since we have stopped. - events.Default.Log(events.ConfigSaved, "the third event") + evLogger.Log(events.ConfigSaved, "the third event") result := buf.String() t.Log(result) diff --git a/lib/syncthing/syncthing.go b/lib/syncthing/syncthing.go index 120672396..08a2cc9dd 100644 --- a/lib/syncthing/syncthing.go +++ b/lib/syncthing/syncthing.go @@ -68,6 +68,7 @@ type App struct { mainService *suture.Supervisor cfg config.Wrapper ll *db.Lowlevel + evLogger events.Logger cert tls.Certificate opts Options exitStatus ExitStatus @@ -78,14 +79,15 @@ type App struct { stopped chan struct{} } -func New(cfg config.Wrapper, ll *db.Lowlevel, cert tls.Certificate, opts Options) *App { +func New(cfg config.Wrapper, ll *db.Lowlevel, evLogger events.Logger, cert tls.Certificate, opts Options) *App { return &App{ - cfg: cfg, - ll: ll, - opts: opts, - cert: cert, - stop: make(chan struct{}), - stopped: make(chan struct{}), + cfg: cfg, + ll: ll, + evLogger: evLogger, + opts: opts, + cert: cert, + stop: make(chan struct{}), + stopped: make(chan struct{}), } } @@ -120,11 +122,11 @@ func (a *App) startup() error { a.mainService.ServeBackground() if a.opts.AuditWriter != nil { - a.mainService.Add(newAuditService(a.opts.AuditWriter)) + a.mainService.Add(newAuditService(a.opts.AuditWriter, a.evLogger)) } if a.opts.Verbose { - a.mainService.Add(newVerboseService()) + a.mainService.Add(newVerboseService(a.evLogger)) } errors := logger.NewRecorder(l, logger.LevelWarn, maxSystemErrors, 0) @@ -133,8 +135,8 @@ func (a *App) startup() error { // Event subscription for the API; must start early to catch the early // events. The LocalChangeDetected event might overwhelm the event // receiver in some situations so we will not subscribe to it here. - defaultSub := events.NewBufferedSubscription(events.Default.Subscribe(api.DefaultEventMask), api.EventSubBufferSize) - diskSub := events.NewBufferedSubscription(events.Default.Subscribe(api.DiskEventMask), api.EventSubBufferSize) + defaultSub := events.NewBufferedSubscription(a.evLogger.Subscribe(api.DefaultEventMask), api.EventSubBufferSize) + diskSub := events.NewBufferedSubscription(a.evLogger.Subscribe(api.DiskEventMask), api.EventSubBufferSize) // Attempt to increase the limit on number of open files to the maximum // allowed, in case we have many peers. We don't really care enough to @@ -153,7 +155,7 @@ func (a *App) startup() error { // Emit the Starting event, now that we know who we are. - events.Default.Log(events.Starting, map[string]string{ + a.evLogger.Log(events.Starting, map[string]string{ "home": locations.GetBaseDir(locations.ConfigBaseDir), "myID": a.myID.String(), }) @@ -228,7 +230,7 @@ func (a *App) startup() error { miscDB.PutString("prevVersion", build.Version) } - m := model.NewModel(a.cfg, a.myID, "syncthing", build.Version, a.ll, protectedFiles) + m := model.NewModel(a.cfg, a.myID, "syncthing", build.Version, a.ll, protectedFiles, a.evLogger) if a.opts.DeadlockTimeoutS > 0 { m.StartDeadlockDetector(time.Duration(a.opts.DeadlockTimeoutS) * time.Second) @@ -265,13 +267,13 @@ func (a *App) startup() error { // Start connection management - connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, cachedDiscovery, bepProtocolName, tlsDefaultCommonName) + connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, cachedDiscovery, bepProtocolName, tlsDefaultCommonName, a.evLogger) a.mainService.Add(connectionsService) if a.cfg.Options().GlobalAnnEnabled { for _, srv := range a.cfg.GlobalDiscoveryServers() { l.Infoln("Using discovery server", srv) - gd, err := discover.NewGlobal(srv, a.cert, connectionsService) + gd, err := discover.NewGlobal(srv, a.cert, connectionsService, a.evLogger) if err != nil { l.Warnln("Global discovery:", err) continue @@ -286,14 +288,14 @@ func (a *App) startup() error { if a.cfg.Options().LocalAnnEnabled { // v4 broadcasts - bcd, err := discover.NewLocal(a.myID, fmt.Sprintf(":%d", a.cfg.Options().LocalAnnPort), connectionsService) + bcd, err := discover.NewLocal(a.myID, fmt.Sprintf(":%d", a.cfg.Options().LocalAnnPort), connectionsService, a.evLogger) if err != nil { l.Warnln("IPv4 local discovery:", err) } else { cachedDiscovery.Add(bcd, 0, 0) } // v6 multicasts - mcd, err := discover.NewLocal(a.myID, a.cfg.Options().LocalAnnMCAddr, connectionsService) + mcd, err := discover.NewLocal(a.myID, a.cfg.Options().LocalAnnMCAddr, connectionsService, a.evLogger) if err != nil { l.Warnln("IPv6 local discovery:", err) } else { @@ -342,7 +344,7 @@ func (a *App) startup() error { l.Warnln("Syncthing should not run as a privileged or system user. Please consider using a normal user account.") } - events.Default.Log(events.StartupComplete, map[string]string{ + a.evLogger.Log(events.StartupComplete, map[string]string{ "myID": a.myID.String(), }) @@ -426,10 +428,10 @@ func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscri cpu := newCPUService() a.mainService.Add(cpu) - summaryService := model.NewFolderSummaryService(a.cfg, m, a.myID) + summaryService := model.NewFolderSummaryService(a.cfg, m, a.myID, a.evLogger) a.mainService.Add(summaryService) - apiSvc := api.New(a.myID, a.cfg, a.opts.AssetDir, tlsDefaultCommonName, m, defaultSub, diskSub, discoverer, connectionsService, urService, summaryService, errors, systemLog, cpu, &controller{a}, a.opts.NoUpgrade) + apiSvc := api.New(a.myID, a.cfg, a.opts.AssetDir, tlsDefaultCommonName, m, defaultSub, diskSub, a.evLogger, discoverer, connectionsService, urService, summaryService, errors, systemLog, cpu, &controller{a}, a.opts.NoUpgrade) a.mainService.Add(apiSvc) if err := apiSvc.WaitForStart(); err != nil { diff --git a/lib/syncthing/syncthing_test.go b/lib/syncthing/syncthing_test.go index f40e1f78d..bc1ca5f6a 100644 --- a/lib/syncthing/syncthing_test.go +++ b/lib/syncthing/syncthing_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" ) @@ -19,7 +20,7 @@ func TestShortIDCheck(t *testing.T) { {DeviceID: protocol.DeviceID{8, 16, 24, 32, 40, 48, 56, 0, 0}}, {DeviceID: protocol.DeviceID{8, 16, 24, 32, 40, 48, 56, 1, 1}}, // first 56 bits same, differ in the first 64 bits }, - }) + }, events.NoopLogger) if err := checkShortIDs(cfg); err != nil { t.Error("Unexpected error:", err) @@ -30,7 +31,7 @@ func TestShortIDCheck(t *testing.T) { {DeviceID: protocol.DeviceID{8, 16, 24, 32, 40, 48, 56, 64, 0}}, {DeviceID: protocol.DeviceID{8, 16, 24, 32, 40, 48, 56, 64, 1}}, // first 64 bits same }, - }) + }, events.NoopLogger) if err := checkShortIDs(cfg); err == nil { t.Error("Should have gotten an error") diff --git a/lib/syncthing/utils.go b/lib/syncthing/utils.go index 3e7a0628e..1eaea087c 100644 --- a/lib/syncthing/utils.go +++ b/lib/syncthing/utils.go @@ -17,6 +17,7 @@ import ( "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/db" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/locations" "github.com/syncthing/syncthing/lib/protocol" @@ -39,7 +40,7 @@ func LoadOrGenerateCertificate(certFile, keyFile string) (tls.Certificate, error return cert, nil } -func DefaultConfig(path string, myID protocol.DeviceID, noDefaultFolder bool) (config.Wrapper, error) { +func DefaultConfig(path string, myID protocol.DeviceID, evLogger events.Logger, noDefaultFolder bool) (config.Wrapper, error) { newCfg, err := config.NewWithFreePorts(myID) if err != nil { return nil, err @@ -47,23 +48,23 @@ func DefaultConfig(path string, myID protocol.DeviceID, noDefaultFolder bool) (c if noDefaultFolder { l.Infoln("We will skip creation of a default folder on first start") - return config.Wrap(path, newCfg), nil + return config.Wrap(path, newCfg, evLogger), nil } newCfg.Folders = append(newCfg.Folders, config.NewFolderConfiguration(myID, "default", "Default Folder", fs.FilesystemTypeBasic, locations.Get(locations.DefFolder))) l.Infoln("Default folder created and/or linked to new config") - return config.Wrap(path, newCfg), nil + return config.Wrap(path, newCfg, evLogger), nil } // LoadConfigAtStartup loads an existing config. If it doesn't yet exist, it // creates a default one, without the default folder if noDefaultFolder is ture. // Otherwise it checks the version, and archives and upgrades the config if // necessary or returns an error, if the version isn't compatible. -func LoadConfigAtStartup(path string, cert tls.Certificate, allowNewerConfig, noDefaultFolder bool) (config.Wrapper, error) { +func LoadConfigAtStartup(path string, cert tls.Certificate, evLogger events.Logger, allowNewerConfig, noDefaultFolder bool) (config.Wrapper, error) { myID := protocol.NewDeviceID(cert.Certificate[0]) - cfg, err := config.Load(path, myID) + cfg, err := config.Load(path, myID, evLogger) if fs.IsNotExist(err) { - cfg, err = DefaultConfig(path, myID, noDefaultFolder) + cfg, err = DefaultConfig(path, myID, evLogger, noDefaultFolder) if err != nil { return nil, errors.Wrap(err, "failed to generate default config") } diff --git a/lib/syncthing/verboseservice.go b/lib/syncthing/verboseservice.go index c056bba5e..c0c43cd86 100644 --- a/lib/syncthing/verboseservice.go +++ b/lib/syncthing/verboseservice.go @@ -19,12 +19,12 @@ import ( // verbose format to the console using INFO level. type verboseService struct { suture.Service - sub *events.Subscription + sub events.Subscription } -func newVerboseService() *verboseService { +func newVerboseService(evLogger events.Logger) *verboseService { s := &verboseService{ - sub: events.Default.Subscribe(events.AllEvents), + sub: evLogger.Subscribe(events.AllEvents), } s.Service = util.AsService(s.serve) return s @@ -48,7 +48,7 @@ func (s *verboseService) serve(stop chan struct{}) { // Stop stops the verbose logging service. func (s *verboseService) Stop() { s.Service.Stop() - events.Default.Unsubscribe(s.sub) + s.sub.Unsubscribe() } diff --git a/lib/watchaggregator/aggregator.go b/lib/watchaggregator/aggregator.go index 9175625d6..ecc03ad4a 100644 --- a/lib/watchaggregator/aggregator.go +++ b/lib/watchaggregator/aggregator.go @@ -125,19 +125,19 @@ func newAggregator(folderCfg config.FolderConfiguration, ctx context.Context) *a return a } -func Aggregate(in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg config.Wrapper, ctx context.Context) { +func Aggregate(in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg config.Wrapper, evLogger events.Logger, ctx context.Context) { a := newAggregator(folderCfg, ctx) // Necessary for unit tests where the backend is mocked - go a.mainLoop(in, out, cfg) + go a.mainLoop(in, out, cfg, evLogger) } -func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg config.Wrapper) { +func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg config.Wrapper, evLogger events.Logger) { a.notifyTimer = time.NewTimer(a.notifyDelay) defer a.notifyTimer.Stop() - inProgressItemSubscription := events.Default.Subscribe(events.ItemStarted | events.ItemFinished) - defer events.Default.Unsubscribe(inProgressItemSubscription) + inProgressItemSubscription := evLogger.Subscribe(events.ItemStarted | events.ItemFinished) + defer inProgressItemSubscription.Unsubscribe() cfg.Subscribe(a) defer cfg.Unsubscribe(a) diff --git a/lib/watchaggregator/aggregator_test.go b/lib/watchaggregator/aggregator_test.go index 30df4353e..c317642c8 100644 --- a/lib/watchaggregator/aggregator_test.go +++ b/lib/watchaggregator/aggregator_test.go @@ -47,7 +47,7 @@ var ( } defaultCfg = config.Wrap("", config.Configuration{ Folders: []config.FolderConfiguration{defaultFolderCfg}, - }) + }, events.NoopLogger) ) // Represents possibly multiple (different event types) expected paths from @@ -151,14 +151,17 @@ func TestAggregate(t *testing.T) { // TestInProgress checks that ignoring files currently edited by Syncthing works func TestInProgress(t *testing.T) { + evLogger := events.NewLogger() + go evLogger.Serve() + defer evLogger.Stop() testCase := func(c chan<- fs.Event) { - events.Default.Log(events.ItemStarted, map[string]string{ + evLogger.Log(events.ItemStarted, map[string]string{ "item": "inprogress", }) sleepMs(100) c <- fs.Event{Name: "inprogress", Type: fs.NonRemove} sleepMs(1000) - events.Default.Log(events.ItemFinished, map[string]interface{}{ + evLogger.Log(events.ItemFinished, map[string]interface{}{ "item": "inprogress", }) sleepMs(100) @@ -170,7 +173,7 @@ func TestInProgress(t *testing.T) { {[][]string{{"notinprogress"}}, 2000, 3500}, } - testScenario(t, "InProgress", testCase, expectedBatches) + testScenario(t, "InProgress", testCase, expectedBatches, evLogger) } // TestDelay checks that recurring changes to the same path are delayed @@ -208,7 +211,7 @@ func TestDelay(t *testing.T) { {[][]string{{delayed}, {delAfter}}, 3600, 7000}, } - testScenario(t, "Delay", testCase, expectedBatches) + testScenario(t, "Delay", testCase, expectedBatches, nil) } // TestNoDelay checks that no delay occurs if there are no non-remove events @@ -225,7 +228,7 @@ func TestNoDelay(t *testing.T) { {[][]string{{mixed}, {del}}, 500, 2000}, } - testScenario(t, "NoDelay", testCase, expectedBatches) + testScenario(t, "NoDelay", testCase, expectedBatches, nil) } func getEventPaths(dir *eventDir, dirPath string, a *aggregator) []string { @@ -277,8 +280,13 @@ func compareBatchToExpectedDirect(t *testing.T, batch []string, expectedPaths [] } } -func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), expectedBatches []expectedBatch) { +func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), expectedBatches []expectedBatch, evLogger events.Logger) { t.Helper() + + if evLogger == nil { + evLogger = events.NoopLogger + } + ctx, cancel := context.WithCancel(context.Background()) eventChan := make(chan fs.Event) watchChan := make(chan []string) @@ -289,7 +297,7 @@ func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), e a.notifyTimeout = testNotifyTimeout startTime := time.Now() - go a.mainLoop(eventChan, watchChan, defaultCfg) + go a.mainLoop(eventChan, watchChan, defaultCfg, evLogger) sleepMs(20)