From ca3ae64bbf019ee68731dab8796f6a22c83509e9 Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Fri, 15 Feb 2019 00:15:13 +0100 Subject: [PATCH] lib/db: Flush batch based on size and refactor (fixes #5531) (#5536) Flush the batch when exceeding a certain size, instead of when reaching a number of batched operations. Move batch to lowlevel to be able to use it in NamespacedKV. Increase the leveldb memory buffer from 4 to 16 MiB. --- lib/db/blockmap_test.go | 4 ++-- lib/db/instance.go | 6 +++--- lib/db/lowlevel.go | 29 +++++++++++++++++++++++++- lib/db/namespaced.go | 16 +++------------ lib/db/schemaupdater.go | 12 +++++------ lib/db/transactions.go | 45 +++++++++++++---------------------------- 6 files changed, 55 insertions(+), 57 deletions(-) diff --git a/lib/db/blockmap_test.go b/lib/db/blockmap_test.go index f930a251d..98a289191 100644 --- a/lib/db/blockmap_test.go +++ b/lib/db/blockmap_test.go @@ -73,7 +73,7 @@ func addToBlockMap(db *instance, folder []byte, fs []protocol.FileInfo) { name := []byte(f.Name) for i, block := range f.Blocks { binary.BigEndian.PutUint32(blockBuf, uint32(i)) - keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) + keyBuf = t.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) t.Put(keyBuf, blockBuf) } } @@ -89,7 +89,7 @@ func discardFromBlockMap(db *instance, folder []byte, fs []protocol.FileInfo) { if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() { name := []byte(ef.Name) for _, block := range ef.Blocks { - keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) + keyBuf = t.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) t.Delete(keyBuf) } } diff --git a/lib/db/instance.go b/lib/db/instance.go index 1cc678eac..d7c7bbbe5 100644 --- a/lib/db/instance.go +++ b/lib/db/instance.go @@ -80,7 +80,7 @@ func (db *instance) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta if ok { if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() { for _, block := range ef.Blocks { - keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) + keyBuf = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) t.Delete(keyBuf) } } @@ -100,7 +100,7 @@ func (db *instance) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta l.Debugf("insert (local); folder=%q %v", folder, f) t.Put(dk, mustMarshal(&f)) - gk = t.db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name)) + gk = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name)) keyBuf, _ = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta) keyBuf = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence) @@ -110,7 +110,7 @@ func (db *instance) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta if !f.IsDirectory() && !f.IsDeleted() && !f.IsInvalid() { for i, block := range f.Blocks { binary.BigEndian.PutUint32(blockBuf, uint32(i)) - keyBuf = t.db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) + keyBuf = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name) t.Put(keyBuf, blockBuf) } } diff --git a/lib/db/lowlevel.go b/lib/db/lowlevel.go index c114557af..69f1758f1 100644 --- a/lib/db/lowlevel.go +++ b/lib/db/lowlevel.go @@ -19,7 +19,8 @@ import ( const ( dbMaxOpenFiles = 100 - dbWriteBuffer = 4 << 20 + dbWriteBuffer = 16 << 20 + dbFlushBatch = dbWriteBuffer / 4 // Some leeway for any leveldb in-memory optimizations ) // Lowlevel is the lowest level database interface. It has a very simple @@ -127,3 +128,29 @@ func leveldbIsCorrupted(err error) bool { return false } + +type batch struct { + *leveldb.Batch + db *Lowlevel +} + +func (db *Lowlevel) newBatch() *batch { + return &batch{ + Batch: new(leveldb.Batch), + db: db, + } +} + +// checkFlush flushes and resets the batch if its size exceeds dbFlushBatch. +func (b *batch) checkFlush() { + if len(b.Dump()) > dbFlushBatch { + b.flush() + b.Reset() + } +} + +func (b *batch) flush() { + if err := b.db.Write(b.Batch, nil); err != nil { + panic(err) + } +} diff --git a/lib/db/namespaced.go b/lib/db/namespaced.go index fd6432726..5cbe720bb 100644 --- a/lib/db/namespaced.go +++ b/lib/db/namespaced.go @@ -10,7 +10,6 @@ import ( "encoding/binary" "time" - "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" ) @@ -39,21 +38,12 @@ func NewNamespacedKV(db *Lowlevel, prefix string) *NamespacedKV { func (n *NamespacedKV) Reset() { it := n.db.NewIterator(util.BytesPrefix(n.prefix), nil) defer it.Release() - batch := new(leveldb.Batch) + batch := n.db.newBatch() for it.Next() { batch.Delete(it.Key()) - if batch.Len() > batchFlushSize { - if err := n.db.Write(batch, nil); err != nil { - panic(err) - } - batch.Reset() - } - } - if batch.Len() > 0 { - if err := n.db.Write(batch, nil); err != nil { - panic(err) - } + batch.checkFlush() } + batch.flush() } // PutInt64 stores a new int64. Any existing value (even if of another type) diff --git a/lib/db/schemaupdater.go b/lib/db/schemaupdater.go index acb6b8920..ccc65254d 100644 --- a/lib/db/schemaupdater.go +++ b/lib/db/schemaupdater.go @@ -104,18 +104,18 @@ func (db *schemaUpdater) updateSchema0to1() { var gk, buf []byte for dbi.Next() { + t.checkFlush() + folder, ok := db.keyer.FolderFromDeviceFileKey(dbi.Key()) if !ok { // not having the folder in the index is bad; delete and continue t.Delete(dbi.Key()) - t.checkFlush() continue } device, ok := db.keyer.DeviceFromDeviceFileKey(dbi.Key()) if !ok { // not having the device in the index is bad; delete and continue t.Delete(dbi.Key()) - t.checkFlush() continue } name := db.keyer.NameFromDeviceFileKey(dbi.Key()) @@ -128,7 +128,6 @@ func (db *schemaUpdater) updateSchema0to1() { gk = db.keyer.GenerateGlobalVersionKey(gk, folder, name) buf = t.removeFromGlobal(gk, buf, folder, device, nil, nil) t.Delete(dbi.Key()) - t.checkFlush() continue } @@ -149,7 +148,6 @@ func (db *schemaUpdater) updateSchema0to1() { panic("can't happen: " + err.Error()) } t.Put(dbi.Key(), bs) - t.checkFlush() symlinkConv++ } @@ -210,7 +208,7 @@ func (db *schemaUpdater) updateSchema2to3() { if !need(f, ok, v) { return true } - nk = t.db.keyer.GenerateNeedFileKey(nk, folder, []byte(f.FileName())) + nk = t.keyer.GenerateNeedFileKey(nk, folder, []byte(f.FileName())) t.Put(nk, nil) t.checkFlush() return true @@ -282,7 +280,7 @@ func (db *schemaUpdater) updateSchema6to7() { svl, err := t.Get(gk, nil) if err != nil { // If there is no global list, we hardly need it. - t.Delete(t.db.keyer.GenerateNeedFileKey(nk, folder, name)) + t.Delete(t.keyer.GenerateNeedFileKey(nk, folder, name)) return true } var fl VersionList @@ -293,7 +291,7 @@ func (db *schemaUpdater) updateSchema6to7() { return true } if localFV, haveLocalFV := fl.Get(protocol.LocalDeviceID[:]); !need(global, haveLocalFV, localFV.Version) { - t.Delete(t.db.keyer.GenerateNeedFileKey(nk, folder, name)) + t.Delete(t.keyer.GenerateNeedFileKey(nk, folder, name)) } return true }) diff --git a/lib/db/transactions.go b/lib/db/transactions.go index 2cd38bd5c..ebbe63636 100644 --- a/lib/db/transactions.go +++ b/lib/db/transactions.go @@ -12,13 +12,10 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -// Flush batches to disk when they contain this many records. -const batchFlushSize = 64 - // A readOnlyTransaction represents a database snapshot. type readOnlyTransaction struct { *leveldb.Snapshot - db *instance + keyer keyer } func (db *instance) newReadOnlyTransaction() readOnlyTransaction { @@ -28,7 +25,7 @@ func (db *instance) newReadOnlyTransaction() readOnlyTransaction { } return readOnlyTransaction{ Snapshot: snap, - db: db, + keyer: db.keyer, } } @@ -37,7 +34,7 @@ func (t readOnlyTransaction) close() { } func (t readOnlyTransaction) getFile(folder, device, file []byte) (protocol.FileInfo, bool) { - return t.getFileByKey(t.db.keyer.GenerateDeviceFileKey(nil, folder, device, file)) + return t.getFileByKey(t.keyer.GenerateDeviceFileKey(nil, folder, device, file)) } func (t readOnlyTransaction) getFileByKey(key []byte) (protocol.FileInfo, bool) { @@ -65,7 +62,7 @@ func (t readOnlyTransaction) getFileTrunc(key []byte, trunc bool) (FileIntf, boo } func (t readOnlyTransaction) getGlobal(keyBuf, folder, file []byte, truncate bool) ([]byte, FileIntf, bool) { - keyBuf = t.db.keyer.GenerateGlobalVersionKey(keyBuf, folder, file) + keyBuf = t.keyer.GenerateGlobalVersionKey(keyBuf, folder, file) bs, err := t.Get(keyBuf, nil) if err != nil { @@ -77,7 +74,7 @@ func (t readOnlyTransaction) getGlobal(keyBuf, folder, file []byte, truncate boo return keyBuf, nil, false } - keyBuf = t.db.keyer.GenerateDeviceFileKey(keyBuf, folder, vl.Versions[0].Device, file) + keyBuf = t.keyer.GenerateDeviceFileKey(keyBuf, folder, vl.Versions[0].Device, file) if fi, ok := t.getFileTrunc(keyBuf, truncate); ok { return keyBuf, fi, true } @@ -90,14 +87,13 @@ func (t readOnlyTransaction) getGlobal(keyBuf, folder, file []byte, truncate boo // batch size. type readWriteTransaction struct { readOnlyTransaction - *leveldb.Batch + *batch } func (db *instance) newReadWriteTransaction() readWriteTransaction { - t := db.newReadOnlyTransaction() return readWriteTransaction{ - readOnlyTransaction: t, - Batch: new(leveldb.Batch), + readOnlyTransaction: db.newReadOnlyTransaction(), + batch: db.newBatch(), } } @@ -106,19 +102,6 @@ func (t readWriteTransaction) close() { t.readOnlyTransaction.close() } -func (t readWriteTransaction) checkFlush() { - if t.Batch.Len() > batchFlushSize { - t.flush() - t.Batch.Reset() - } -} - -func (t readWriteTransaction) flush() { - if err := t.db.Write(t.Batch, nil); err != nil { - panic(err) - } -} - // updateGlobal adds this device+version to the version list for the given // file. If the device is already present in the list, the version is updated. // If the file does not have an entry in the global list, it is created. @@ -142,7 +125,7 @@ func (t readWriteTransaction) updateGlobal(gk, keyBuf, folder, device []byte, fi // Inserted a new newest version global = file } else { - keyBuf = t.db.keyer.GenerateDeviceFileKey(keyBuf, folder, fl.Versions[0].Device, name) + keyBuf = t.keyer.GenerateDeviceFileKey(keyBuf, folder, fl.Versions[0].Device, name) if new, ok := t.getFileByKey(keyBuf); ok { global = new } else { @@ -167,7 +150,7 @@ func (t readWriteTransaction) updateGlobal(gk, keyBuf, folder, device []byte, fi // The previous newest version is now at index 1 oldGlobalFV = fl.Versions[1] } - keyBuf = t.db.keyer.GenerateDeviceFileKey(keyBuf, folder, oldGlobalFV.Device, name) + keyBuf = t.keyer.GenerateDeviceFileKey(keyBuf, folder, oldGlobalFV.Device, name) if oldFile, ok := t.getFileByKey(keyBuf); ok { // A failure to get the file here is surprising and our // global size data will be incorrect until a restart... @@ -187,7 +170,7 @@ func (t readWriteTransaction) updateGlobal(gk, keyBuf, folder, device []byte, fi // device according to the version list and global FileInfo given and updates // the db accordingly. func (t readWriteTransaction) updateLocalNeed(keyBuf, folder, name []byte, fl VersionList, global protocol.FileInfo) []byte { - keyBuf = t.db.keyer.GenerateNeedFileKey(keyBuf, folder, name) + keyBuf = t.keyer.GenerateNeedFileKey(keyBuf, folder, name) hasNeeded, _ := t.Has(keyBuf, nil) if localFV, haveLocalFV := fl.Get(protocol.LocalDeviceID[:]); need(global, haveLocalFV, localFV.Version) { if !hasNeeded { @@ -246,21 +229,21 @@ func (t readWriteTransaction) removeFromGlobal(gk, keyBuf, folder, device []byte if removedAt == 0 { // A failure to get the file here is surprising and our // global size data will be incorrect until a restart... - keyBuf = t.db.keyer.GenerateDeviceFileKey(keyBuf, folder, device, file) + keyBuf = t.keyer.GenerateDeviceFileKey(keyBuf, folder, device, file) if f, ok := t.getFileByKey(keyBuf); ok { meta.removeFile(protocol.GlobalDeviceID, f) } } if len(fl.Versions) == 0 { - keyBuf = t.db.keyer.GenerateNeedFileKey(keyBuf, folder, file) + keyBuf = t.keyer.GenerateNeedFileKey(keyBuf, folder, file) t.Delete(keyBuf) t.Delete(gk) return keyBuf } if removedAt == 0 { - keyBuf = t.db.keyer.GenerateDeviceFileKey(keyBuf, folder, fl.Versions[0].Device, file) + keyBuf = t.keyer.GenerateDeviceFileKey(keyBuf, folder, fl.Versions[0].Device, file) global, ok := t.getFileByKey(keyBuf) if !ok { panic("This file must exist in the db")