diff --git a/lib/config/optionsconfiguration.go b/lib/config/optionsconfiguration.go index 4d3f08f32..101e00999 100644 --- a/lib/config/optionsconfiguration.go +++ b/lib/config/optionsconfiguration.go @@ -10,6 +10,7 @@ import ( "fmt" "runtime" + "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/util" ) @@ -60,6 +61,7 @@ type OptionsConfiguration struct { StunKeepaliveMinS int `xml:"stunKeepaliveMinS" json:"stunKeepaliveMinS" default:"20"` // 0 for off RawStunServers []string `xml:"stunServer" json:"stunServers" default:"default"` DatabaseTuning Tuning `xml:"databaseTuning" json:"databaseTuning" restart:"true"` + RawMaxCIRequestKiB int `xml:"maxConcurrentIncomingRequestKiB" json:"maxConcurrentIncomingRequestKiB"` DeprecatedUPnPEnabled bool `xml:"upnpEnabled,omitempty" json:"-"` DeprecatedUPnPLeaseM int `xml:"upnpLeaseMinutes,omitempty" json:"-"` @@ -175,3 +177,26 @@ func (opts OptionsConfiguration) MaxFolderConcurrency() int { // of writing is two, 95-percentile at 12 folders.) return 4 // https://xkcd.com/221/ } + +func (opts OptionsConfiguration) MaxConcurrentIncomingRequestKiB() int { + // Negative is disabled, which in limiter land is spelled zero + if opts.RawMaxCIRequestKiB < 0 { + return 0 + } + + if opts.RawMaxFolderConcurrency == 0 { + // The default is 256 MiB + return 256 * 1024 // KiB + } + + // We can't really do less than a couple of concurrent blocks or we'll + // pretty much stall completely. Check that an explicit value is large + // enough. + const minAllowed = 2 * protocol.MaxBlockSize / 1024 + if opts.RawMaxCIRequestKiB < minAllowed { + return minAllowed + } + + // Roll with it. + return opts.RawMaxCIRequestKiB +} diff --git a/lib/model/bytesemaphore.go b/lib/model/bytesemaphore.go index 20ccd6fcb..2ee4c56a9 100644 --- a/lib/model/bytesemaphore.go +++ b/lib/model/bytesemaphore.go @@ -18,6 +18,9 @@ type byteSemaphore struct { } func newByteSemaphore(max int) *byteSemaphore { + if max < 0 { + max = 0 + } s := byteSemaphore{ max: max, available: max, @@ -56,6 +59,9 @@ func (s *byteSemaphore) give(bytes int) { } func (s *byteSemaphore) setCapacity(cap int) { + if cap < 0 { + cap = 0 + } s.mut.Lock() diff := cap - s.max s.max = cap diff --git a/lib/model/folder.go b/lib/model/folder.go index 5fe3cf4bc..3d8b0ef7e 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -33,15 +33,12 @@ import ( "github.com/thejerf/suture" ) -// folderIOLimiter limits the number of concurrent I/O heavy operations, -// such as scans and pulls. A limit of zero means no limit. -var folderIOLimiter = newByteSemaphore(0) - type folder struct { suture.Service stateTracker config.FolderConfiguration *stats.FolderStatisticsReference + ioLimiter *byteSemaphore localFlags uint32 @@ -79,11 +76,12 @@ type puller interface { pull() bool // true when successfull and should not be retried } -func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger) folder { +func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *byteSemaphore) folder { return folder{ stateTracker: newStateTracker(cfg.ID, evLogger), FolderConfiguration: cfg, FolderStatisticsReference: stats.NewFolderStatisticsReference(model.db, cfg.ID), + ioLimiter: ioLimiter, model: model, shortID: model.shortID, @@ -303,8 +301,8 @@ func (f *folder) pull() bool { f.setState(FolderSyncWaiting) defer f.setState(FolderIdle) - folderIOLimiter.take(1) - defer folderIOLimiter.give(1) + f.ioLimiter.take(1) + defer f.ioLimiter.give(1) return f.puller.pull() } @@ -342,8 +340,8 @@ func (f *folder) scanSubdirs(subDirs []string) error { f.setError(nil) f.setState(FolderScanWaiting) - folderIOLimiter.take(1) - defer folderIOLimiter.give(1) + f.ioLimiter.take(1) + defer f.ioLimiter.give(1) for i := range subDirs { sub := osutil.NativeFilename(subDirs[i]) diff --git a/lib/model/folder_recvonly.go b/lib/model/folder_recvonly.go index 1bc40f980..2639afab2 100644 --- a/lib/model/folder_recvonly.go +++ b/lib/model/folder_recvonly.go @@ -57,8 +57,8 @@ type receiveOnlyFolder struct { *sendReceiveFolder } -func newReceiveOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger) service { - sr := newSendReceiveFolder(model, fset, ignores, cfg, ver, fs, evLogger).(*sendReceiveFolder) +func newReceiveOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service { + sr := newSendReceiveFolder(model, fset, ignores, cfg, ver, fs, evLogger, ioLimiter).(*sendReceiveFolder) sr.localFlags = protocol.FlagLocalReceiveOnly // gets propagated to the scanner, and set on locally changed files return &receiveOnlyFolder{sr} } diff --git a/lib/model/folder_sendonly.go b/lib/model/folder_sendonly.go index 1696217ae..f66afea63 100644 --- a/lib/model/folder_sendonly.go +++ b/lib/model/folder_sendonly.go @@ -25,9 +25,9 @@ type sendOnlyFolder struct { folder } -func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, _ versioner.Versioner, _ fs.Filesystem, evLogger events.Logger) service { +func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, _ versioner.Versioner, _ fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service { f := &sendOnlyFolder{ - folder: newFolder(model, fset, ignores, cfg, evLogger), + folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter), } f.folder.puller = f f.folder.Service = util.AsService(f.serve, f.String()) diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index 1920acf11..67417f3c6 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -109,9 +109,9 @@ type sendReceiveFolder struct { pullErrorsMut sync.Mutex } -func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger) service { +func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem, evLogger events.Logger, ioLimiter *byteSemaphore) service { f := &sendReceiveFolder{ - folder: newFolder(model, fset, ignores, cfg, evLogger), + folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter), fs: fs, versioner: ver, queue: newJobQueue(), diff --git a/lib/model/model.go b/lib/model/model.go index dcffba06f..a7f6222d8 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -121,6 +121,12 @@ type model struct { protectedFiles []string evLogger events.Logger + // globalRequestLimiter limits the amount of data in concurrent incoming requests + globalRequestLimiter *byteSemaphore + // folderIOLimiter limits the number of concurrent I/O heavy operations, + // such as scans and pulls. A limit of zero means no limit. + folderIOLimiter *byteSemaphore + clientName string clientVersion string @@ -145,7 +151,7 @@ type model struct { foldersRunning int32 // for testing only } -type folderFactory func(*model, *db.FileSet, *ignore.Matcher, config.FolderConfiguration, versioner.Versioner, fs.Filesystem, events.Logger) service +type folderFactory func(*model, *db.FileSet, *ignore.Matcher, config.FolderConfiguration, versioner.Versioner, fs.Filesystem, events.Logger, *byteSemaphore) service var ( folderFactories = make(map[config.FolderType]folderFactory) @@ -177,38 +183,39 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio }, PassThroughPanics: true, }), - cfg: cfg, - db: ldb, - finder: db.NewBlockFinder(ldb), - progressEmitter: NewProgressEmitter(cfg, evLogger), - id: id, - shortID: id.Short(), - cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles, - protectedFiles: protectedFiles, - evLogger: evLogger, - clientName: clientName, - clientVersion: clientVersion, - folderCfgs: make(map[string]config.FolderConfiguration), - folderFiles: make(map[string]*db.FileSet), - deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference), - folderIgnores: make(map[string]*ignore.Matcher), - folderRunners: make(map[string]service), - folderRunnerTokens: make(map[string][]suture.ServiceToken), - folderVersioners: make(map[string]versioner.Versioner), - conn: make(map[protocol.DeviceID]connections.Connection), - connRequestLimiters: make(map[protocol.DeviceID]*byteSemaphore), - closed: make(map[protocol.DeviceID]chan struct{}), - helloMessages: make(map[protocol.DeviceID]protocol.HelloResult), - deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState), - remotePausedFolders: make(map[protocol.DeviceID][]string), - fmut: sync.NewRWMutex(), - pmut: sync.NewRWMutex(), + cfg: cfg, + db: ldb, + finder: db.NewBlockFinder(ldb), + progressEmitter: NewProgressEmitter(cfg, evLogger), + id: id, + shortID: id.Short(), + cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles, + protectedFiles: protectedFiles, + evLogger: evLogger, + globalRequestLimiter: newByteSemaphore(1024 * cfg.Options().MaxConcurrentIncomingRequestKiB()), + folderIOLimiter: newByteSemaphore(cfg.Options().MaxFolderConcurrency()), + clientName: clientName, + clientVersion: clientVersion, + folderCfgs: make(map[string]config.FolderConfiguration), + folderFiles: make(map[string]*db.FileSet), + deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference), + folderIgnores: make(map[string]*ignore.Matcher), + folderRunners: make(map[string]service), + folderRunnerTokens: make(map[string][]suture.ServiceToken), + folderVersioners: make(map[string]versioner.Versioner), + conn: make(map[protocol.DeviceID]connections.Connection), + connRequestLimiters: make(map[protocol.DeviceID]*byteSemaphore), + closed: make(map[protocol.DeviceID]chan struct{}), + helloMessages: make(map[protocol.DeviceID]protocol.HelloResult), + deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState), + remotePausedFolders: make(map[protocol.DeviceID][]string), + fmut: sync.NewRWMutex(), + pmut: sync.NewRWMutex(), } for devID := range cfg.Devices() { m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID.String()) } m.Add(m.progressEmitter) - folderIOLimiter.setCapacity(cfg.Options().MaxFolderConcurrency()) return m } @@ -340,7 +347,7 @@ func (m *model) startFolderLocked(cfg config.FolderConfiguration) { ignores := m.folderIgnores[folder] - p := folderFactory(m, fset, ignores, cfg, ver, ffs, m.evLogger) + p := folderFactory(m, fset, ignores, cfg, ver, ffs, m.evLogger, m.folderIOLimiter) m.folderRunners[folder] = p @@ -1500,12 +1507,10 @@ func (m *model) Request(deviceID protocol.DeviceID, folder, name string, size in limiter := m.connRequestLimiters[deviceID] m.pmut.RUnlock() - if limiter != nil { - limiter.take(int(size)) - } + // The requestResponse releases the bytes to the buffer pool and the + // limiters when its Close method is called. + res := newLimitedRequestResponse(int(size), limiter, m.globalRequestLimiter) - // The requestResponse releases the bytes to the limiter when its Close method is called. - res := newRequestResponse(int(size)) defer func() { // Close it ourselves if it isn't returned due to an error if err != nil { @@ -1513,13 +1518,6 @@ func (m *model) Request(deviceID protocol.DeviceID, folder, name string, size in } }() - if limiter != nil { - go func() { - res.Wait() - limiter.give(int(size)) - }() - } - // Only check temp files if the flag is set, and if we are set to advertise // the temp indexes. if fromTemporary && !folderCfg.DisableTempIndexes { @@ -1563,6 +1561,32 @@ func (m *model) Request(deviceID protocol.DeviceID, folder, name string, size in return res, nil } +// newLimitedRequestResponse takes size bytes from the limiters in order, +// skipping nil limiters, then returns a requestResponse of the given size. +// When the requestResponse is closed the limiters are given back the bytes, +// in reverse order. +func newLimitedRequestResponse(size int, limiters ...*byteSemaphore) *requestResponse { + for _, limiter := range limiters { + if limiter != nil { + limiter.take(size) + } + } + + res := newRequestResponse(size) + + go func() { + res.Wait() + for i := range limiters { + limiter := limiters[len(limiters)-1-i] + if limiter != nil { + limiter.give(size) + } + } + }() + + return res +} + func (m *model) recheckFile(deviceID protocol.DeviceID, folderFs fs.Filesystem, folder, name string, size int32, offset int64, hash []byte) { cf, ok := m.CurrentFolderFile(folder, name) if !ok { @@ -2483,7 +2507,8 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { } m.fmut.Unlock() - folderIOLimiter.setCapacity(to.Options.MaxFolderConcurrency()) + m.globalRequestLimiter.setCapacity(1024 * to.Options.MaxConcurrentIncomingRequestKiB()) + m.folderIOLimiter.setCapacity(to.Options.MaxFolderConcurrency()) // Some options don't require restart as those components handle it fine // by themselves. Compare the options structs containing only the diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 21b3fbb9b..62d9e1ff6 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -3451,3 +3451,31 @@ func TestDeviceWasSeen(t *testing.T) { t.Error("device should have been seen now") } } + +func TestNewLimitedRequestResponse(t *testing.T) { + l0 := newByteSemaphore(0) + l1 := newByteSemaphore(1024) + l2 := (*byteSemaphore)(nil) + + // Should take 500 bytes from any non-unlimited non-nil limiters. + res := newLimitedRequestResponse(500, l0, l1, l2) + + if l1.available != 1024-500 { + t.Error("should have taken bytes from limited limiter") + } + + // Closing the result should return the bytes. + res.Close() + + // Try to take 1024 bytes to make sure the bytes were returned. + done := make(chan struct{}) + go func() { + l1.take(1024) + close(done) + }() + select { + case <-done: + case <-time.After(time.Second): + t.Error("Bytes weren't returned in a timely fashion") + } +}