From 6efe521e4446456de329ca1d113608c47a8cb176 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 3 Nov 2014 22:00:11 -0600 Subject: [PATCH] Update goleveldb --- Godeps/Godeps.json | 2 +- .../src/github.com/calmh/logger/README.md | 1 + .../github.com/syndtr/goleveldb/leveldb/db.go | 31 ++-- .../syndtr/goleveldb/leveldb/db_compaction.go | 27 +++- .../syndtr/goleveldb/leveldb/db_iter.go | 3 +- .../syndtr/goleveldb/leveldb/db_snapshot.go | 66 ++++---- .../syndtr/goleveldb/leveldb/db_test.go | 150 +++++++++++++++++- .../syndtr/goleveldb/leveldb/db_write.go | 12 +- .../syndtr/goleveldb/leveldb/opt/options.go | 2 +- .../syndtr/goleveldb/leveldb/storage_test.go | 17 +- .../syndtr/goleveldb/leveldb/table.go | 54 +------ .../goleveldb/leveldb/table/block_test.go | 27 ++-- .../syndtr/goleveldb/leveldb/table/reader.go | 140 ++++++++-------- .../goleveldb/leveldb/util/buffer_pool.go | 3 + 14 files changed, 334 insertions(+), 201 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 08db2b4ed..5b8b13c5b 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -56,7 +56,7 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "0d8857b7ec571b0a6c9677d8e6c0a4ceeabd1d71" + "Rev": "cd2b8f743192883ab9fbc5f070ebda1dc90f3732" }, { "ImportPath": "github.com/vitrun/qart/coding", diff --git a/Godeps/_workspace/src/github.com/calmh/logger/README.md b/Godeps/_workspace/src/github.com/calmh/logger/README.md index badb2027c..1abbc9336 100644 --- a/Godeps/_workspace/src/github.com/calmh/logger/README.md +++ b/Godeps/_workspace/src/github.com/calmh/logger/README.md @@ -12,3 +12,4 @@ License ------- MIT + diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go index 979d0ac4a..2f4c92e8d 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go @@ -7,6 +7,7 @@ package leveldb import ( + "container/list" "errors" "fmt" "io" @@ -46,7 +47,7 @@ type DB struct { // Snapshot. snapsMu sync.Mutex - snapsRoot snapshotElement + snapsList *list.List // Stats. aliveSnaps, aliveIters int32 @@ -85,6 +86,8 @@ func openDB(s *session) (*DB, error) { seq: s.stSeq, // MemDB memPool: make(chan *memdb.DB, 1), + // Snapshot + snapsList: list.New(), // Write writeC: make(chan *Batch), writeMergedC: make(chan bool), @@ -103,7 +106,6 @@ func openDB(s *session) (*DB, error) { // Close closeC: make(chan struct{}), } - db.initSnapshot() if err := db.recoverJournal(); err != nil { return nil, err @@ -609,7 +611,9 @@ func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { return } - return db.get(key, db.getSeq(), ro) + se := db.acquireSnapshot() + defer db.releaseSnapshot(se) + return db.get(key, se.seq, ro) } // NewIterator returns an iterator for the latest snapshot of the @@ -633,9 +637,11 @@ func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Itera return iterator.NewEmptyIterator(err) } - snap := db.newSnapshot() - defer snap.Release() - return snap.NewIterator(slice, ro) + se := db.acquireSnapshot() + defer db.releaseSnapshot(se) + // Iterator holds 'version' lock, 'version' is immutable so snapshot + // can be released after iterator created. + return db.newIterator(se.seq, slice, ro) } // GetSnapshot returns a latest snapshot of the underlying DB. A snapshot @@ -655,7 +661,7 @@ func (db *DB) GetSnapshot() (*Snapshot, error) { // // Property names: // leveldb.num-files-at-level{n} -// Returns the number of filer at level 'n'. +// Returns the number of files at level 'n'. // leveldb.stats // Returns statistics of the underlying DB. // leveldb.sstables @@ -685,11 +691,12 @@ func (db *DB) GetProperty(name string) (value string, err error) { v := db.s.version() defer v.release() + numFilesPrefix := "num-files-at-level" switch { - case strings.HasPrefix(p, "num-files-at-level"): + case strings.HasPrefix(p, numFilesPrefix): var level uint var rest string - n, _ := fmt.Scanf("%d%s", &level, &rest) + n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest) if n != 1 || level >= kNumLevels { err = errors.New("leveldb: GetProperty: invalid property: " + name) } else { @@ -796,12 +803,13 @@ func (db *DB) Close() error { default: } + // Signal all goroutines. close(db.closeC) - // Wait for the close WaitGroup. + // Wait for all gorotines to exit. db.closeW.Wait() - // Close journal. + // Lock writer and closes journal. db.writeLockC <- struct{}{} if db.journal != nil { db.journal.Close() @@ -827,7 +835,6 @@ func (db *DB) Close() error { db.journalWriter = nil db.journalFile = nil db.frozenJournalFile = nil - db.snapsRoot = snapshotElement{} db.closer = nil return err diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go index 4c9032084..e0ae721e4 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -538,6 +538,9 @@ type cIdle struct { } func (r cIdle) ack(err error) { + defer func() { + recover() + }() r.ackC <- err } @@ -548,27 +551,33 @@ type cRange struct { } func (r cRange) ack(err error) { - defer func() { - recover() - }() if r.ackC != nil { + defer func() { + recover() + }() r.ackC <- err } } -func (db *DB) compSendIdle(compC chan<- cCmd) error { +func (db *DB) compSendIdle(compC chan<- cCmd) (err error) { ch := make(chan error) defer close(ch) // Send cmd. select { case compC <- cIdle{ch}: - case err := <-db.compErrC: - return err + case err = <-db.compErrC: + return case _, _ = <-db.closeC: return ErrClosed } // Wait cmd. - return <-ch + select { + case err = <-ch: + case err = <-db.compErrC: + case _, _ = <-db.closeC: + return ErrClosed + } + return err } func (db *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err error) { @@ -584,8 +593,10 @@ func (db *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err } // Wait cmd. select { - case err = <-db.compErrC: case err = <-ch: + case err = <-db.compErrC: + case _, _ = <-db.closeC: + return ErrClosed } return err } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go index 49c44059b..5c36a1937 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go @@ -48,8 +48,7 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It i = append(i, fmi) } i = append(i, ti...) - strict := db.s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator) - mi := iterator.NewMergedIterator(i, db.s.icmp, strict) + mi := iterator.NewMergedIterator(i, db.s.icmp, true) mi.SetReleaser(&versionReleaser{v: v}) return mi } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go index e8f50e3b0..e5679d800 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go @@ -7,6 +7,7 @@ package leveldb import ( + "container/list" "runtime" "sync" "sync/atomic" @@ -19,51 +20,41 @@ import ( type snapshotElement struct { seq uint64 ref int - // Next and previous pointers in the doubly-linked list of elements. - next, prev *snapshotElement -} - -// Initialize the snapshot. -func (db *DB) initSnapshot() { - db.snapsRoot.next = &db.snapsRoot - db.snapsRoot.prev = &db.snapsRoot + e *list.Element } // Acquires a snapshot, based on latest sequence. func (db *DB) acquireSnapshot() *snapshotElement { db.snapsMu.Lock() + defer db.snapsMu.Unlock() + seq := db.getSeq() - elem := db.snapsRoot.prev - if elem == &db.snapsRoot || elem.seq != seq { - at := db.snapsRoot.prev - next := at.next - elem = &snapshotElement{ - seq: seq, - prev: at, - next: next, + + if e := db.snapsList.Back(); e != nil { + se := e.Value.(*snapshotElement) + if se.seq == seq { + se.ref++ + return se + } else if seq < se.seq { + panic("leveldb: sequence number is not increasing") } - at.next = elem - next.prev = elem } - elem.ref++ - db.snapsMu.Unlock() - return elem + se := &snapshotElement{seq: seq, ref: 1} + se.e = db.snapsList.PushBack(se) + return se } // Releases given snapshot element. -func (db *DB) releaseSnapshot(elem *snapshotElement) { - if !db.isClosed() { - db.snapsMu.Lock() - elem.ref-- - if elem.ref == 0 { - elem.prev.next = elem.next - elem.next.prev = elem.prev - elem.next = nil - elem.prev = nil - } else if elem.ref < 0 { - panic("leveldb: Snapshot: negative element reference") - } - db.snapsMu.Unlock() +func (db *DB) releaseSnapshot(se *snapshotElement) { + db.snapsMu.Lock() + defer db.snapsMu.Unlock() + + se.ref-- + if se.ref == 0 { + db.snapsList.Remove(se.e) + se.e = nil + } else if se.ref < 0 { + panic("leveldb: Snapshot: negative element reference") } } @@ -71,10 +62,11 @@ func (db *DB) releaseSnapshot(elem *snapshotElement) { func (db *DB) minSeq() uint64 { db.snapsMu.Lock() defer db.snapsMu.Unlock() - elem := db.snapsRoot.next - if elem != &db.snapsRoot { - return elem.seq + + if e := db.snapsList.Front(); e != nil { + return e.Value.(*snapshotElement).seq } + return db.getSeq() } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go index 88da205ff..aa2c8013d 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go @@ -7,6 +7,9 @@ package leveldb import ( + "container/list" + crand "crypto/rand" + "encoding/binary" "fmt" "math/rand" "os" @@ -1126,8 +1129,7 @@ func TestDb_Snapshot(t *testing.T) { } func TestDb_SnapshotList(t *testing.T) { - db := &DB{} - db.initSnapshot() + db := &DB{snapsList: list.New()} e0a := db.acquireSnapshot() e0b := db.acquireSnapshot() db.seq = 1 @@ -1983,7 +1985,6 @@ func TestDb_GoleveldbIssue74(t *testing.T) { t.Fatalf("#%d %d != %d", i, k, n) } } - t.Logf("writer done after %d iterations", i) }() go func() { var i int @@ -2022,3 +2023,146 @@ func TestDb_GoleveldbIssue74(t *testing.T) { }() wg.Wait() } + +func TestDb_GetProperties(t *testing.T) { + h := newDbHarness(t) + defer h.close() + + _, err := h.db.GetProperty("leveldb.num-files-at-level") + if err == nil { + t.Error("GetProperty() failed to detect missing level") + } + + _, err = h.db.GetProperty("leveldb.num-files-at-level0") + if err != nil { + t.Error("got unexpected error", err) + } + + _, err = h.db.GetProperty("leveldb.num-files-at-level0x") + if err == nil { + t.Error("GetProperty() failed to detect invalid level") + } +} + +func TestDb_GoleveldbIssue72and83(t *testing.T) { + h := newDbHarnessWopt(t, &opt.Options{ + WriteBuffer: 1 * opt.MiB, + CachedOpenFiles: 3, + }) + defer h.close() + + const n, wn, dur = 10000, 100, 30 * time.Second + + runtime.GOMAXPROCS(runtime.NumCPU()) + + randomData := func(prefix byte, i int) []byte { + data := make([]byte, 1+4+32+64+32) + _, err := crand.Reader.Read(data[1 : len(data)-4]) + if err != nil { + panic(err) + } + data[0] = prefix + binary.LittleEndian.PutUint32(data[len(data)-4:], uint32(i)) + return data + } + + keys := make([][]byte, n) + for i := range keys { + keys[i] = randomData(1, 0) + } + + until := time.Now().Add(dur) + wg := new(sync.WaitGroup) + wg.Add(3) + var done uint32 + go func() { + i := 0 + defer func() { + t.Logf("WRITER DONE #%d", i) + wg.Done() + }() + + b := new(Batch) + for ; i < wn && atomic.LoadUint32(&done) == 0; i++ { + b.Reset() + for _, k1 := range keys { + k2 := randomData(2, i) + b.Put(k2, randomData(42, i)) + b.Put(k1, k2) + } + if err := h.db.Write(b, h.wo); err != nil { + atomic.StoreUint32(&done, 1) + t.Fatalf("WRITER #%d db.Write: %v", i, err) + } + } + }() + go func() { + var i int + defer func() { + t.Logf("READER0 DONE #%d", i) + atomic.StoreUint32(&done, 1) + wg.Done() + }() + for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { + snap := h.getSnapshot() + seq := snap.elem.seq + if seq == 0 { + snap.Release() + continue + } + iter := snap.NewIterator(util.BytesPrefix([]byte{1}), nil) + writei := int(snap.elem.seq/(n*2) - 1) + var k int + for ; iter.Next(); k++ { + k1 := iter.Key() + k2 := iter.Value() + kwritei := int(binary.LittleEndian.Uint32(k2[len(k2)-4:])) + if writei != kwritei { + t.Fatalf("READER0 #%d.%d W#%d invalid write iteration num: %d", i, k, writei, kwritei) + } + if _, err := snap.Get(k2, nil); err != nil { + t.Fatalf("READER0 #%d.%d W#%d snap.Get: %v\nk1: %x\n -> k2: %x", i, k, writei, err, k1, k2) + } + } + if err := iter.Error(); err != nil { + t.Fatalf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, err) + } + iter.Release() + snap.Release() + if k > 0 && k != n { + t.Fatalf("READER0 #%d W#%d short read, got=%d want=%d", i, writei, k, n) + } + } + }() + go func() { + var i int + defer func() { + t.Logf("READER1 DONE #%d", i) + atomic.StoreUint32(&done, 1) + wg.Done() + }() + for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { + iter := h.db.NewIterator(nil, nil) + seq := iter.(*dbIter).seq + if seq == 0 { + iter.Release() + continue + } + writei := int(seq/(n*2) - 1) + var k int + for ok := iter.Last(); ok; ok = iter.Prev() { + k++ + } + if err := iter.Error(); err != nil { + t.Fatalf("READER1 #%d.%d W#%d db.Iterator: %v", i, k, writei, err) + } + iter.Release() + if m := (writei+1)*n + n; k != m { + t.Fatalf("READER1 #%d W#%d short read, got=%d want=%d", i, writei, k, m) + } + } + }() + + wg.Wait() + +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go index 82725a9ee..939d9c3b6 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -127,21 +127,24 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { b.init(wo.GetSync()) // The write happen synchronously. -retry: select { case db.writeC <- b: if <-db.writeMergedC { return <-db.writeAckC } - goto retry case db.writeLockC <- struct{}{}: case _, _ = <-db.closeC: return ErrClosed } merged := 0 + danglingMerge := false defer func() { - <-db.writeLockC + if danglingMerge { + db.writeMergedC <- false + } else { + <-db.writeLockC + } for i := 0; i < merged; i++ { db.writeAckC <- err } @@ -170,7 +173,7 @@ drain: db.writeMergedC <- true merged++ } else { - db.writeMergedC <- false + danglingMerge = true break drain } default: @@ -262,6 +265,7 @@ func (db *DB) CompactRange(r util.Range) error { return err } + // Lock writer. select { case db.writeLockC <- struct{}{}: case _, _ = <-db.closeC: diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go index 2a375ba60..126e2a36a 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go @@ -92,7 +92,7 @@ const ( // DefaultStrict is the default strict flags. Specify any strict flags // will override default strict flags as whole (i.e. not OR'ed). - DefaultStrict = StrictJournalChecksum | StrictBlockChecksum + DefaultStrict = StrictJournalChecksum | StrictIterator | StrictBlockChecksum // NoStrict disables all strict flags. Override default strict flags. NoStrict = ^StrictAll diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage_test.go index 27e76d707..d6628b299 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage_test.go @@ -28,11 +28,12 @@ var ( ) var ( - tsFSEnv = os.Getenv("GOLEVELDB_USEFS") - tsKeepFS = tsFSEnv == "2" - tsFS = tsKeepFS || tsFSEnv == "" || tsFSEnv == "1" - tsMU = &sync.Mutex{} - tsNum = 0 + tsFSEnv = os.Getenv("GOLEVELDB_USEFS") + tsTempdir = os.Getenv("GOLEVELDB_TEMPDIR") + tsKeepFS = tsFSEnv == "2" + tsFS = tsKeepFS || tsFSEnv == "" || tsFSEnv == "1" + tsMU = &sync.Mutex{} + tsNum = 0 ) type tsLock struct { @@ -413,7 +414,11 @@ func newTestStorage(t *testing.T) *testStorage { num := tsNum tsNum++ tsMU.Unlock() - path := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num)) + tempdir := tsTempdir + if tempdir == "" { + tempdir = os.TempDir() + } + path := filepath.Join(tempdir, fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num)) if _, err := os.Stat(path); err != nil { stor, err = storage.OpenFile(path) if err != nil { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go index 87c4e155a..a1b04d827 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go @@ -7,9 +7,7 @@ package leveldb import ( - "fmt" "sort" - "sync" "sync/atomic" "github.com/syndtr/goleveldb/leveldb/cache" @@ -278,8 +276,6 @@ type tOps struct { cache cache.Cache cacheNS cache.Namespace bpool *util.BufferPool - mu sync.Mutex - closed bool } // Creates an empty table and returns table writer. @@ -326,42 +322,9 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { return } -type trWrapper struct { - *table.Reader - t *tOps - ref int -} - -func (w *trWrapper) Release() { - if w.ref != 0 && !w.t.closed { - panic(fmt.Sprintf("BUG: invalid ref %d, refer to issue #72", w.ref)) - } - w.Reader.Release() -} - -type trCacheHandleWrapper struct { - cache.Handle - t *tOps - released bool -} - -func (w *trCacheHandleWrapper) Release() { - w.t.mu.Lock() - defer w.t.mu.Unlock() - - if !w.released { - w.released = true - w.Value().(*trWrapper).ref-- - } - w.Handle.Release() -} - // Opens table. It returns a cache handle, which should // be released after use. func (t *tOps) open(f *tFile) (ch cache.Handle, err error) { - t.mu.Lock() - defer t.mu.Unlock() - num := f.file.Num() ch = t.cacheNS.Get(num, func() (charge int, value interface{}) { var r storage.Reader @@ -374,13 +337,11 @@ func (t *tOps) open(f *tFile) (ch cache.Handle, err error) { if bc := t.s.o.GetBlockCache(); bc != nil { bcacheNS = bc.GetNamespace(num) } - return 1, &trWrapper{table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o), t, 0} + return 1, table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o) }) if ch == nil && err == nil { err = ErrClosed } - ch.Value().(*trWrapper).ref++ - ch = &trCacheHandleWrapper{ch, t, false} return } @@ -392,7 +353,7 @@ func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []b return nil, nil, err } defer ch.Release() - return ch.Value().(*trWrapper).Find(key, ro) + return ch.Value().(*table.Reader).Find(key, ro) } // Returns approximate offset of the given key. @@ -402,7 +363,7 @@ func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) { return } defer ch.Release() - offset_, err := ch.Value().(*trWrapper).OffsetOf(key) + offset_, err := ch.Value().(*table.Reader).OffsetOf(key) return uint64(offset_), err } @@ -412,7 +373,7 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite if err != nil { return iterator.NewEmptyIterator(err) } - iter := ch.Value().(*trWrapper).NewIterator(slice, ro) + iter := ch.Value().(*table.Reader).NewIterator(slice, ro) iter.SetReleaser(ch) return iter } @@ -420,9 +381,6 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite // Removes table from persistent storage. It waits until // no one use the the table. func (t *tOps) remove(f *tFile) { - t.mu.Lock() - defer t.mu.Unlock() - num := f.file.Num() t.cacheNS.Delete(num, func(exist, pending bool) { if !pending { @@ -441,10 +399,6 @@ func (t *tOps) remove(f *tFile) { // Closes the table ops instance. It will close all tables, // regadless still used or not. func (t *tOps) close() { - t.mu.Lock() - defer t.mu.Unlock() - - t.closed = true t.cache.Zap() t.bpool.Close() } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go index f3b53c093..e583d9a81 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go @@ -19,13 +19,18 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -func (b *block) TestNewIterator(slice *util.Range) iterator.Iterator { - return b.newIterator(slice, false, nil) +type blockTesting struct { + tr *Reader + b *block +} + +func (t *blockTesting) TestNewIterator(slice *util.Range) iterator.Iterator { + return t.tr.newBlockIter(t.b, nil, slice, false) } var _ = testutil.Defer(func() { Describe("Block", func() { - Build := func(kv *testutil.KeyValue, restartInterval int) *block { + Build := func(kv *testutil.KeyValue, restartInterval int) *blockTesting { // Building the block. bw := &blockWriter{ restartInterval: restartInterval, @@ -39,11 +44,13 @@ var _ = testutil.Defer(func() { // Opening the block. data := bw.buf.Bytes() restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:])) - return &block{ - tr: &Reader{cmp: comparer.DefaultComparer}, - data: data, - restartsLen: restartsLen, - restartsOffset: len(data) - (restartsLen+1)*4, + return &blockTesting{ + tr: &Reader{cmp: comparer.DefaultComparer}, + b: &block{ + data: data, + restartsLen: restartsLen, + restartsOffset: len(data) - (restartsLen+1)*4, + }, } } @@ -102,11 +109,11 @@ var _ = testutil.Defer(func() { for restartInterval := 1; restartInterval <= 5; restartInterval++ { Describe(fmt.Sprintf("with restart interval of %d", restartInterval), func() { // Make block. - br := Build(kv, restartInterval) + bt := Build(kv, restartInterval) Test := func(r *util.Range) func(done Done) { return func(done Done) { - iter := br.newIterator(r, false, nil) + iter := bt.TestNewIterator(r) Expect(iter.Error()).ShouldNot(HaveOccurred()) t := testutil.IteratorTesting{ diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go index ab62c44ed..5efd70b00 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -39,7 +39,7 @@ func max(x, y int) int { } type block struct { - tr *Reader + bpool *util.BufferPool data []byte restartsLen int restartsOffset int @@ -47,14 +47,14 @@ type block struct { checksum bool } -func (b *block) seek(rstart, rlimit int, key []byte) (index, offset int, err error) { +func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) { index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) offset += 1 // shared always zero, since this is a restart point v1, n1 := binary.Uvarint(b.data[offset:]) // key length _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length m := offset + n1 + n2 - return b.tr.cmp.Compare(b.data[m:m+int(v1)], key) > 0 + return cmp.Compare(b.data[m:m+int(v1)], key) > 0 }) + rstart - 1 if index < rstart { // The smallest key is greater-than key sought. @@ -96,48 +96,9 @@ func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) return } -func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releaser) *blockIter { - bi := &blockIter{ - block: b, - cache: cache, - // Valid key should never be nil. - key: make([]byte, 0), - dir: dirSOI, - riStart: 0, - riLimit: b.restartsLen, - offsetStart: 0, - offsetRealStart: 0, - offsetLimit: b.restartsOffset, - } - if slice != nil { - if slice.Start != nil { - if bi.Seek(slice.Start) { - bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset) - bi.offsetStart = b.restartOffset(bi.riStart) - bi.offsetRealStart = bi.prevOffset - } else { - bi.riStart = b.restartsLen - bi.offsetStart = b.restartsOffset - bi.offsetRealStart = b.restartsOffset - } - } - if slice.Limit != nil { - if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) { - bi.offsetLimit = bi.prevOffset - bi.riLimit = bi.restartIndex + 1 - } - } - bi.reset() - if bi.offsetStart > bi.offsetLimit { - bi.sErr(errors.New("leveldb/table: Reader: invalid slice range")) - } - } - return bi -} - func (b *block) Release() { - b.tr.bpool.Put(b.data) - b.tr = nil + b.bpool.Put(b.data) + b.bpool = nil b.data = nil } @@ -152,10 +113,12 @@ const ( ) type blockIter struct { - block *block - cache, releaser util.Releaser - key, value []byte - offset int + tr *Reader + block *block + blockReleaser util.Releaser + releaser util.Releaser + key, value []byte + offset int // Previous offset, only filled by Next. prevOffset int prevNode []int @@ -252,7 +215,7 @@ func (i *blockIter) Seek(key []byte) bool { return false } - ri, offset, err := i.block.seek(i.riStart, i.riLimit, key) + ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key) if err != nil { i.sErr(err) return false @@ -263,7 +226,7 @@ func (i *blockIter) Seek(key []byte) bool { i.dir = dirForward } for i.Next() { - if i.block.tr.cmp.Compare(i.key, key) >= 0 { + if i.tr.cmp.Compare(i.key, key) >= 0 { return true } } @@ -440,15 +403,16 @@ func (i *blockIter) Value() []byte { func (i *blockIter) Release() { if i.dir != dirReleased { + i.tr = nil i.block = nil i.prevNode = nil i.prevKeys = nil i.key = nil i.value = nil i.dir = dirReleased - if i.cache != nil { - i.cache.Release() - i.cache = nil + if i.blockReleaser != nil { + i.blockReleaser.Release() + i.blockReleaser = nil } if i.releaser != nil { i.releaser.Release() @@ -476,21 +440,21 @@ func (i *blockIter) Error() error { } type filterBlock struct { - tr *Reader + bpool *util.BufferPool data []byte oOffset int baseLg uint filtersNum int } -func (b *filterBlock) contains(offset uint64, key []byte) bool { +func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool { i := int(offset >> b.baseLg) if i < b.filtersNum { o := b.data[b.oOffset+i*4:] n := int(binary.LittleEndian.Uint32(o)) m := int(binary.LittleEndian.Uint32(o[4:])) if n < m && m <= b.oOffset { - return b.tr.filter.Contains(b.data[n:m], key) + return filter.Contains(b.data[n:m], key) } else if n == m { return false } @@ -499,13 +463,14 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool { } func (b *filterBlock) Release() { - b.tr.bpool.Put(b.data) - b.tr = nil + b.bpool.Put(b.data) + b.bpool = nil b.data = nil } type indexIter struct { *blockIter + tr *Reader slice *util.Range // Options checksum bool @@ -526,7 +491,7 @@ func (i *indexIter) Get() iterator.Iterator { if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) { slice = i.slice } - return i.blockIter.block.tr.getDataIterErr(dataBH, slice, i.checksum, i.fillCache) + return i.tr.getDataIterErr(dataBH, slice, i.checksum, i.fillCache) } // Reader is a table reader. @@ -594,7 +559,7 @@ func (r *Reader) readBlock(bh blockHandle, checksum bool) (*block, error) { } restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:])) b := &block{ - tr: r, + bpool: r.bpool, data: data, restartsLen: restartsLen, restartsOffset: len(data) - (restartsLen+1)*4, @@ -655,7 +620,7 @@ func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) { return nil, errors.New("leveldb/table: Reader: invalid filter block (invalid offset)") } b := &filterBlock{ - tr: r, + bpool: r.bpool, data: data, oOffset: oOffset, baseLg: uint(data[n-1]), @@ -713,7 +678,7 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi if err != nil { return iterator.NewEmptyIterator(err) } - return b.newIterator(slice, false, rel) + return r.newBlockIter(b, rel, slice, false) } func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator { @@ -727,6 +692,46 @@ func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, checksum, return r.getDataIter(dataBH, slice, checksum, fillCache) } +func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter { + bi := &blockIter{ + tr: r, + block: b, + blockReleaser: bReleaser, + // Valid key should never be nil. + key: make([]byte, 0), + dir: dirSOI, + riStart: 0, + riLimit: b.restartsLen, + offsetStart: 0, + offsetRealStart: 0, + offsetLimit: b.restartsOffset, + } + if slice != nil { + if slice.Start != nil { + if bi.Seek(slice.Start) { + bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset) + bi.offsetStart = b.restartOffset(bi.riStart) + bi.offsetRealStart = bi.prevOffset + } else { + bi.riStart = b.restartsLen + bi.offsetStart = b.restartsOffset + bi.offsetRealStart = b.restartsOffset + } + } + if slice.Limit != nil { + if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) { + bi.offsetLimit = bi.prevOffset + bi.riLimit = bi.restartIndex + 1 + } + } + bi.reset() + if bi.offsetStart > bi.offsetLimit { + bi.sErr(errors.New("leveldb/table: Reader: invalid slice range")) + } + } + return bi +} + // NewIterator creates an iterator from the table. // // Slice allows slicing the iterator to only contains keys in the given @@ -752,7 +757,8 @@ func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It return iterator.NewEmptyIterator(err) } index := &indexIter{ - blockIter: indexBlock.newIterator(slice, true, rel), + blockIter: r.newBlockIter(indexBlock, rel, slice, true), + tr: r, slice: slice, checksum: ro.GetStrict(opt.StrictBlockChecksum), fillCache: !ro.GetDontFillCache(), @@ -781,7 +787,7 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err } defer rel.Release() - index := indexBlock.newIterator(nil, true, nil) + index := r.newBlockIter(indexBlock, nil, nil, true) defer index.Release() if !index.Seek(key) { err = index.Error() @@ -798,7 +804,7 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err if r.filter != nil { filterBlock, rel, ferr := r.getFilterBlock(true) if ferr == nil { - if !filterBlock.contains(dataBH.offset, key) { + if !filterBlock.contains(r.filter, dataBH.offset, key) { rel.Release() return nil, nil, ErrNotFound } @@ -866,7 +872,7 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { } defer rel.Release() - index := indexBlock.newIterator(nil, true, nil) + index := r.newBlockIter(indexBlock, nil, nil, true) defer index.Release() if index.Seek(key) { dataBH, n := decodeBlockHandle(index.Value()) @@ -956,7 +962,7 @@ func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.Buf } // Set data end. r.dataEnd = int64(metaBH.offset) - metaIter := metaBlock.newIterator(nil, false, nil) + metaIter := r.newBlockIter(metaBlock, nil, nil, false) for metaIter.Next() { key := string(metaIter.Key()) if !strings.HasPrefix(key, "filter.") { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go index aea39dca8..d9509d2af 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go @@ -156,6 +156,9 @@ func (p *BufferPool) Put(b []byte) { atomic.AddUint32(&p.put, 1) pool := p.pool[p.poolNum(cap(b))] + defer func() { + recover() + }() select { case pool <- b: default: