From ff2cde469e3b00259cd9b5ee89a06400bf7efd73 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Wed, 5 Dec 2018 07:40:05 +0000 Subject: [PATCH] lib/model: Allow limiting number of concurrent scans (fixes #2760) (#4888) --- gui/default/index.html | 1 + .../syncthing/core/syncthingController.js | 2 +- lib/config/optionsconfiguration.go | 1 + lib/model/bytesemaphore.go | 30 ++++- lib/model/bytesemaphore_test.go | 113 ++++++++++++++++++ lib/model/folder.go | 7 ++ lib/model/folderstate.go | 3 + lib/model/model.go | 3 + 8 files changed, 154 insertions(+), 6 deletions(-) create mode 100644 lib/model/bytesemaphore_test.go diff --git a/gui/default/index.html b/gui/default/index.html index 107d948c4..b35755bac 100644 --- a/gui/default/index.html +++ b/gui/default/index.html @@ -313,6 +313,7 @@ + diff --git a/gui/default/syncthing/core/syncthingController.js b/gui/default/syncthing/core/syncthingController.js index 2af7544e2..6a9ba4646 100755 --- a/gui/default/syncthing/core/syncthingController.js +++ b/gui/default/syncthing/core/syncthingController.js @@ -769,7 +769,7 @@ angular.module('syncthing.core') if (status === 'stopped' || status === 'outofsync' || status === 'error') { return 'danger'; } - if (status === 'unshared') { + if (status === 'unshared' || status === 'scan-waiting') { return 'warning'; } diff --git a/lib/config/optionsconfiguration.go b/lib/config/optionsconfiguration.go index 7c70fc94a..297999487 100644 --- a/lib/config/optionsconfiguration.go +++ b/lib/config/optionsconfiguration.go @@ -51,6 +51,7 @@ type OptionsConfiguration struct { TrafficClass int `xml:"trafficClass" json:"trafficClass"` DefaultFolderPath string `xml:"defaultFolderPath" json:"defaultFolderPath" default:"~"` SetLowPriority bool `xml:"setLowPriority" json:"setLowPriority" default:"true"` + MaxConcurrentScans int `xml:"maxConcurrentScans" json:"maxConcurrentScans"` DeprecatedUPnPEnabled bool `xml:"upnpEnabled,omitempty" json:"-"` DeprecatedUPnPLeaseM int `xml:"upnpLeaseMinutes,omitempty" json:"-"` diff --git a/lib/model/bytesemaphore.go b/lib/model/bytesemaphore.go index 04a00e76f..20ccd6fcb 100644 --- a/lib/model/bytesemaphore.go +++ b/lib/model/bytesemaphore.go @@ -6,7 +6,9 @@ package model -import "sync" +import ( + "sync" +) type byteSemaphore struct { max int @@ -25,26 +27,44 @@ func newByteSemaphore(max int) *byteSemaphore { } func (s *byteSemaphore) take(bytes int) { + s.mut.Lock() if bytes > s.max { bytes = s.max } - s.mut.Lock() for bytes > s.available { s.cond.Wait() + if bytes > s.max { + bytes = s.max + } } s.available -= bytes s.mut.Unlock() } func (s *byteSemaphore) give(bytes int) { + s.mut.Lock() if bytes > s.max { bytes = s.max } - s.mut.Lock() if s.available+bytes > s.max { - panic("bug: can never give more than max") + s.available = s.max + } else { + s.available += bytes + } + s.cond.Broadcast() + s.mut.Unlock() +} + +func (s *byteSemaphore) setCapacity(cap int) { + s.mut.Lock() + diff := cap - s.max + s.max = cap + s.available += diff + if s.available < 0 { + s.available = 0 + } else if s.available > s.max { + s.available = s.max } - s.available += bytes s.cond.Broadcast() s.mut.Unlock() } diff --git a/lib/model/bytesemaphore_test.go b/lib/model/bytesemaphore_test.go new file mode 100644 index 000000000..1efaa0069 --- /dev/null +++ b/lib/model/bytesemaphore_test.go @@ -0,0 +1,113 @@ +// Copyright (C) 2018 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package model + +import "testing" + +func TestZeroByteSempahore(t *testing.T) { + // A semaphore with zero capacity is just a no-op. + + s := newByteSemaphore(0) + + // None of these should block or panic + s.take(123) + s.take(456) + s.give(1 << 30) +} + +func TestByteSempahoreCapChangeUp(t *testing.T) { + // Waiting takes should unblock when the capacity increases + + s := newByteSemaphore(100) + + s.take(75) + if s.available != 25 { + t.Error("bad state after take") + } + + gotit := make(chan struct{}) + go func() { + s.take(75) + close(gotit) + }() + + s.setCapacity(155) + <-gotit + if s.available != 5 { + t.Error("bad state after both takes") + } +} + +func TestByteSempahoreCapChangeDown1(t *testing.T) { + // Things should make sense when capacity is adjusted down + + s := newByteSemaphore(100) + + s.take(75) + if s.available != 25 { + t.Error("bad state after take") + } + + s.setCapacity(90) + if s.available != 15 { + t.Error("bad state after adjust") + } + + s.give(75) + if s.available != 90 { + t.Error("bad state after give") + } +} + +func TestByteSempahoreCapChangeDown2(t *testing.T) { + // Things should make sense when capacity is adjusted down, different case + + s := newByteSemaphore(100) + + s.take(75) + if s.available != 25 { + t.Error("bad state after take") + } + + s.setCapacity(10) + if s.available != 0 { + t.Error("bad state after adjust") + } + + s.give(75) + if s.available != 10 { + t.Error("bad state after give") + } +} + +func TestByteSempahoreGiveMore(t *testing.T) { + // We shouldn't end up with more available than we have capacity... + + s := newByteSemaphore(100) + + s.take(150) + if s.available != 0 { + t.Errorf("bad state after large take") + } + + s.give(150) + if s.available != 100 { + t.Errorf("bad state after large take + give") + } + + s.take(150) + s.setCapacity(125) + // available was zero before, we're increasing capacity by 25 + if s.available != 25 { + t.Errorf("bad state after setcap") + } + + s.give(150) + if s.available != 125 { + t.Errorf("bad state after large take + give with adjustment") + } +} diff --git a/lib/model/folder.go b/lib/model/folder.go index c44a7432c..ca2857d05 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -28,6 +28,9 @@ import ( "github.com/syncthing/syncthing/lib/watchaggregator" ) +// scanLimiter limits the number of concurrent scans. A limit of zero means no limit. +var scanLimiter = newByteSemaphore(0) + var errWatchNotStarted = errors.New("not started") type folder struct { @@ -284,6 +287,10 @@ func (f *folder) scanSubdirs(subDirs []string) error { f.model.fmut.RUnlock() mtimefs := fset.MtimeFS() + f.setState(FolderScanWaiting) + scanLimiter.take(1) + defer scanLimiter.give(1) + for i := range subDirs { sub := osutil.NativeFilename(subDirs[i]) diff --git a/lib/model/folderstate.go b/lib/model/folderstate.go index 991e90cda..cd504cb81 100644 --- a/lib/model/folderstate.go +++ b/lib/model/folderstate.go @@ -18,6 +18,7 @@ type folderState int const ( FolderIdle folderState = iota FolderScanning + FolderScanWaiting FolderSyncing FolderError ) @@ -28,6 +29,8 @@ func (s folderState) String() string { return "idle" case FolderScanning: return "scanning" + case FolderScanWaiting: + return "scan-waiting" case FolderSyncing: return "syncing" case FolderError: diff --git a/lib/model/model.go b/lib/model/model.go index 459ff2274..689b9cb6b 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -170,6 +170,7 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, clientName, clientVersi if cfg.Options().ProgressUpdateIntervalS > -1 { go m.progressEmitter.Serve() } + scanLimiter.setCapacity(cfg.Options().MaxConcurrentScans) cfg.Subscribe(m) return m @@ -2586,6 +2587,8 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool { } } + scanLimiter.setCapacity(to.Options.MaxConcurrentScans) + // Some options don't require restart as those components handle it fine // by themselves. Compare the options structs containing only the // attributes that require restart and act apprioriately.