Update goleveldb

Jakob Borg 2014-11-03 22:00:11 -06:00
parent bccd21ac14
commit 6efe521e44
14 changed files with 334 additions and 201 deletions

Godeps/Godeps.json (generated)

@@ -56,7 +56,7 @@
 		},
 		{
 			"ImportPath": "github.com/syndtr/goleveldb/leveldb",
-			"Rev": "0d8857b7ec571b0a6c9677d8e6c0a4ceeabd1d71"
+			"Rev": "cd2b8f743192883ab9fbc5f070ebda1dc90f3732"
 		},
 		{
 			"ImportPath": "github.com/vitrun/qart/coding",

Godeps/_workspace/src/github.com/syndtr/goleveldb/README.md

@@ -12,3 +12,4 @@ License
 -------
 MIT

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go

@@ -7,6 +7,7 @@
 package leveldb
 
 import (
+	"container/list"
 	"errors"
 	"fmt"
 	"io"
@@ -46,7 +47,7 @@ type DB struct {
 	// Snapshot.
 	snapsMu   sync.Mutex
-	snapsRoot snapshotElement
+	snapsList *list.List
 
 	// Stats.
 	aliveSnaps, aliveIters int32
@@ -85,6 +86,8 @@ func openDB(s *session) (*DB, error) {
 		seq: s.stSeq,
 		// MemDB
 		memPool: make(chan *memdb.DB, 1),
+		// Snapshot
+		snapsList: list.New(),
 		// Write
 		writeC:       make(chan *Batch),
 		writeMergedC: make(chan bool),
@@ -103,7 +106,6 @@ func openDB(s *session) (*DB, error) {
 		// Close
 		closeC: make(chan struct{}),
 	}
-	db.initSnapshot()
 
 	if err := db.recoverJournal(); err != nil {
 		return nil, err
@@ -609,7 +611,9 @@ func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
 		return
 	}
 
-	return db.get(key, db.getSeq(), ro)
+	se := db.acquireSnapshot()
+	defer db.releaseSnapshot(se)
+	return db.get(key, se.seq, ro)
 }
 
 // NewIterator returns an iterator for the latest snapshot of the
@@ -633,9 +637,11 @@ func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Itera
 		return iterator.NewEmptyIterator(err)
 	}
 
-	snap := db.newSnapshot()
-	defer snap.Release()
-	return snap.NewIterator(slice, ro)
+	se := db.acquireSnapshot()
+	defer db.releaseSnapshot(se)
+	// Iterator holds 'version' lock, 'version' is immutable so snapshot
+	// can be released after iterator created.
+	return db.newIterator(se.seq, slice, ro)
 }
 
 // GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
@@ -655,7 +661,7 @@ func (db *DB) GetSnapshot() (*Snapshot, error) {
 //
 // Property names:
 //	leveldb.num-files-at-level{n}
-//		Returns the number of filer at level 'n'.
+//		Returns the number of files at level 'n'.
 //	leveldb.stats
 //		Returns statistics of the underlying DB.
 //	leveldb.sstables
@@ -685,11 +691,12 @@ func (db *DB) GetProperty(name string) (value string, err error) {
 	v := db.s.version()
 	defer v.release()
 
+	numFilesPrefix := "num-files-at-level"
 	switch {
-	case strings.HasPrefix(p, "num-files-at-level"):
+	case strings.HasPrefix(p, numFilesPrefix):
 		var level uint
 		var rest string
-		n, _ := fmt.Scanf("%d%s", &level, &rest)
+		n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
 		if n != 1 || level >= kNumLevels {
 			err = errors.New("leveldb: GetProperty: invalid property: " + name)
 		} else {
@@ -796,12 +803,13 @@ func (db *DB) Close() error {
 	default:
 	}
 
+	// Signal all goroutines.
 	close(db.closeC)
 
-	// Wait for the close WaitGroup.
+	// Wait for all gorotines to exit.
 	db.closeW.Wait()
 
-	// Close journal.
+	// Lock writer and closes journal.
 	db.writeLockC <- struct{}{}
 	if db.journal != nil {
 		db.journal.Close()
@@ -827,7 +835,6 @@ func (db *DB) Close() error {
 	db.journalWriter = nil
 	db.journalFile = nil
 	db.frozenJournalFile = nil
-	db.snapsRoot = snapshotElement{}
 	db.closer = nil
 
 	return err
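
The GetProperty change above fixes a real bug: fmt.Scanf reads from standard input, whereas the level number has to be parsed out of the property name itself. A standalone sketch (not part of the commit; it merely mirrors the numFilesPrefix parsing the hunk introduces) of how the fmt.Sscanf call accepts or rejects level suffixes:

    package main

    import "fmt"

    func main() {
        numFilesPrefix := "num-files-at-level"
        for _, p := range []string{
            "num-files-at-level0",  // valid: n == 1
            "num-files-at-level0x", // invalid: trailing garbage, n == 2
            "num-files-at-level",   // invalid: missing level, n == 0
        } {
            var level uint
            var rest string
            n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
            fmt.Printf("%-24q n=%d level=%d rest=%q\n", p, n, level, rest)
        }
    }

Only a bare numeric suffix yields n == 1, which is exactly what the n != 1 guard in the diff checks; the new TestDb_GetProperties below exercises these three cases.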

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go

@@ -538,6 +538,9 @@ type cIdle struct {
 }
 
 func (r cIdle) ack(err error) {
+	defer func() {
+		recover()
+	}()
 	r.ackC <- err
 }
@@ -548,27 +551,33 @@ type cRange struct {
 }
 
 func (r cRange) ack(err error) {
-	if r.ackC != nil {
-		defer func() {
-			recover()
-		}()
+	defer func() {
+		recover()
+	}()
+	if r.ackC != nil {
 		r.ackC <- err
 	}
 }
 
-func (db *DB) compSendIdle(compC chan<- cCmd) error {
+func (db *DB) compSendIdle(compC chan<- cCmd) (err error) {
 	ch := make(chan error)
 	defer close(ch)
 	// Send cmd.
 	select {
 	case compC <- cIdle{ch}:
-	case err := <-db.compErrC:
-		return err
+	case err = <-db.compErrC:
+		return
 	case _, _ = <-db.closeC:
 		return ErrClosed
 	}
 	// Wait cmd.
-	return <-ch
+	select {
+	case err = <-ch:
+	case err = <-db.compErrC:
+	case _, _ = <-db.closeC:
+		return ErrClosed
+	}
+	return err
 }
 
 func (db *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
@@ -584,8 +593,10 @@ func (db *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
 	}
 	// Wait cmd.
 	select {
-	case err = <-db.compErrC:
 	case err = <-ch:
+	case err = <-db.compErrC:
+	case _, _ = <-db.closeC:
+		return ErrClosed
 	}
 	return err
 }
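
Both ack methods now defer a recover() before sending. The point, as far as the diff shows, is that the requester may close its ack channel after giving up (for example on shutdown), and a send on a closed channel panics. A toy illustration of the pattern, using a hypothetical ack channel rather than the real cCmd plumbing:

    package main

    import "fmt"

    // ack mirrors the shape of the cIdle/cRange change: the deferred
    // recover absorbs the "send on closed channel" panic that occurs
    // when the requester has already closed its ack channel.
    func ack(ch chan error, err error) {
        defer func() {
            recover()
        }()
        ch <- err
    }

    func main() {
        ch := make(chan error)
        close(ch) // requester gave up and closed its side
        ack(ch, nil)
        fmt.Println("ack survived the closed channel")
    }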

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go

@@ -48,8 +48,7 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It
 		i = append(i, fmi)
 	}
 	i = append(i, ti...)
-	strict := db.s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator)
-	mi := iterator.NewMergedIterator(i, db.s.icmp, strict)
+	mi := iterator.NewMergedIterator(i, db.s.icmp, true)
 	mi.SetReleaser(&versionReleaser{v: v})
 	return mi
 }

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go

@@ -7,6 +7,7 @@
 package leveldb
 
 import (
+	"container/list"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -19,62 +20,53 @@ import (
 type snapshotElement struct {
 	seq uint64
 	ref int
-	// Next and previous pointers in the doubly-linked list of elements.
-	next, prev *snapshotElement
-}
-
-// Initialize the snapshot.
-func (db *DB) initSnapshot() {
-	db.snapsRoot.next = &db.snapsRoot
-	db.snapsRoot.prev = &db.snapsRoot
+	e   *list.Element
 }
 
 // Acquires a snapshot, based on latest sequence.
 func (db *DB) acquireSnapshot() *snapshotElement {
 	db.snapsMu.Lock()
+	defer db.snapsMu.Unlock()
+
 	seq := db.getSeq()
-	elem := db.snapsRoot.prev
-	if elem == &db.snapsRoot || elem.seq != seq {
-		at := db.snapsRoot.prev
-		next := at.next
-		elem = &snapshotElement{
-			seq:  seq,
-			prev: at,
-			next: next,
+
+	if e := db.snapsList.Back(); e != nil {
+		se := e.Value.(*snapshotElement)
+		if se.seq == seq {
+			se.ref++
+			return se
+		} else if seq < se.seq {
+			panic("leveldb: sequence number is not increasing")
 		}
-		at.next = elem
-		next.prev = elem
 	}
-	elem.ref++
-	db.snapsMu.Unlock()
-	return elem
+
+	se := &snapshotElement{seq: seq, ref: 1}
+	se.e = db.snapsList.PushBack(se)
+	return se
 }
 
 // Releases given snapshot element.
-func (db *DB) releaseSnapshot(elem *snapshotElement) {
-	if !db.isClosed() {
-		db.snapsMu.Lock()
-		elem.ref--
-		if elem.ref == 0 {
-			elem.prev.next = elem.next
-			elem.next.prev = elem.prev
-			elem.next = nil
-			elem.prev = nil
-		} else if elem.ref < 0 {
-			panic("leveldb: Snapshot: negative element reference")
-		}
-		db.snapsMu.Unlock()
+func (db *DB) releaseSnapshot(se *snapshotElement) {
+	db.snapsMu.Lock()
+	defer db.snapsMu.Unlock()
+
+	se.ref--
+	if se.ref == 0 {
+		db.snapsList.Remove(se.e)
+		se.e = nil
+	} else if se.ref < 0 {
+		panic("leveldb: Snapshot: negative element reference")
 	}
 }
 
 // Gets minimum sequence that not being snapshoted.
 func (db *DB) minSeq() uint64 {
 	db.snapsMu.Lock()
 	defer db.snapsMu.Unlock()
-	elem := db.snapsRoot.next
-	if elem != &db.snapsRoot {
-		return elem.seq
+
+	if e := db.snapsList.Front(); e != nil {
+		return e.Value.(*snapshotElement).seq
 	}
 	return db.getSeq()
 }
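
The hand-rolled doubly-linked ring is replaced with container/list, which keeps the same invariant with less code: snapshots are appended in increasing sequence order, so the oldest live snapshot (the bound below which compaction must preserve old versions) is always the front element. A self-contained sketch with illustrative types, not the real DB:

    package main

    import (
        "container/list"
        "fmt"
    )

    // elem stands in for snapshotElement: ordered by seq, refcounted.
    type elem struct {
        seq uint64
        ref int
    }

    func main() {
        l := list.New()
        var handles []*list.Element
        for _, seq := range []uint64{1, 5, 9} {
            handles = append(handles, l.PushBack(&elem{seq: seq, ref: 1}))
        }
        // minSeq: the front element is the oldest snapshot still alive.
        fmt.Println(l.Front().Value.(*elem).seq) // 1

        // Releasing the oldest snapshot advances the minimum in O(1).
        l.Remove(handles[0])
        fmt.Println(l.Front().Value.(*elem).seq) // 5
    }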

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go

@@ -7,6 +7,9 @@
 package leveldb
 
 import (
+	"container/list"
+	crand "crypto/rand"
+	"encoding/binary"
 	"fmt"
 	"math/rand"
 	"os"
@@ -1126,8 +1129,7 @@ func TestDb_Snapshot(t *testing.T) {
 }
 
 func TestDb_SnapshotList(t *testing.T) {
-	db := &DB{}
-	db.initSnapshot()
+	db := &DB{snapsList: list.New()}
 	e0a := db.acquireSnapshot()
 	e0b := db.acquireSnapshot()
 	db.seq = 1
@@ -1983,7 +1985,6 @@ func TestDb_GoleveldbIssue74(t *testing.T) {
 				t.Fatalf("#%d %d != %d", i, k, n)
 			}
 		}
-		t.Logf("writer done after %d iterations", i)
 	}()
 	go func() {
 		var i int
@@ -2022,3 +2023,146 @@ func TestDb_GoleveldbIssue74(t *testing.T) {
 	}()
 	wg.Wait()
 }
+
+func TestDb_GetProperties(t *testing.T) {
+	h := newDbHarness(t)
+	defer h.close()
+
+	_, err := h.db.GetProperty("leveldb.num-files-at-level")
+	if err == nil {
+		t.Error("GetProperty() failed to detect missing level")
+	}
+
+	_, err = h.db.GetProperty("leveldb.num-files-at-level0")
+	if err != nil {
+		t.Error("got unexpected error", err)
+	}
+
+	_, err = h.db.GetProperty("leveldb.num-files-at-level0x")
+	if err == nil {
+		t.Error("GetProperty() failed to detect invalid level")
+	}
+}
+
+func TestDb_GoleveldbIssue72and83(t *testing.T) {
+	h := newDbHarnessWopt(t, &opt.Options{
+		WriteBuffer:     1 * opt.MiB,
+		CachedOpenFiles: 3,
+	})
+	defer h.close()
+
+	const n, wn, dur = 10000, 100, 30 * time.Second
+
+	runtime.GOMAXPROCS(runtime.NumCPU())
+
+	randomData := func(prefix byte, i int) []byte {
+		data := make([]byte, 1+4+32+64+32)
+		_, err := crand.Reader.Read(data[1 : len(data)-4])
+		if err != nil {
+			panic(err)
+		}
+		data[0] = prefix
+		binary.LittleEndian.PutUint32(data[len(data)-4:], uint32(i))
+		return data
+	}
+
+	keys := make([][]byte, n)
+	for i := range keys {
+		keys[i] = randomData(1, 0)
+	}
+
+	until := time.Now().Add(dur)
+	wg := new(sync.WaitGroup)
+	wg.Add(3)
+	var done uint32
+	go func() {
+		i := 0
+		defer func() {
+			t.Logf("WRITER DONE #%d", i)
+			wg.Done()
+		}()
+
+		b := new(Batch)
+		for ; i < wn && atomic.LoadUint32(&done) == 0; i++ {
+			b.Reset()
+			for _, k1 := range keys {
+				k2 := randomData(2, i)
+				b.Put(k2, randomData(42, i))
+				b.Put(k1, k2)
+			}
+			if err := h.db.Write(b, h.wo); err != nil {
+				atomic.StoreUint32(&done, 1)
+				t.Fatalf("WRITER #%d db.Write: %v", i, err)
+			}
+		}
+	}()
+	go func() {
+		var i int
+		defer func() {
+			t.Logf("READER0 DONE #%d", i)
+			atomic.StoreUint32(&done, 1)
+			wg.Done()
+		}()
+		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
+			snap := h.getSnapshot()
+			seq := snap.elem.seq
+			if seq == 0 {
+				snap.Release()
+				continue
+			}
+			iter := snap.NewIterator(util.BytesPrefix([]byte{1}), nil)
+			writei := int(snap.elem.seq/(n*2) - 1)
+			var k int
+			for ; iter.Next(); k++ {
+				k1 := iter.Key()
+				k2 := iter.Value()
+				kwritei := int(binary.LittleEndian.Uint32(k2[len(k2)-4:]))
+				if writei != kwritei {
+					t.Fatalf("READER0 #%d.%d W#%d invalid write iteration num: %d", i, k, writei, kwritei)
+				}
+				if _, err := snap.Get(k2, nil); err != nil {
+					t.Fatalf("READER0 #%d.%d W#%d snap.Get: %v\nk1: %x\n -> k2: %x", i, k, writei, err, k1, k2)
+				}
+			}
+			if err := iter.Error(); err != nil {
+				t.Fatalf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, writei, err)
+			}
+			iter.Release()
+			snap.Release()
+			if k > 0 && k != n {
+				t.Fatalf("READER0 #%d W#%d short read, got=%d want=%d", i, writei, k, n)
+			}
+		}
+	}()
+	go func() {
+		var i int
+		defer func() {
+			t.Logf("READER1 DONE #%d", i)
+			atomic.StoreUint32(&done, 1)
+			wg.Done()
+		}()
+		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
+			iter := h.db.NewIterator(nil, nil)
+			seq := iter.(*dbIter).seq
+			if seq == 0 {
+				iter.Release()
+				continue
+			}
+			writei := int(seq/(n*2) - 1)
+			var k int
+			for ok := iter.Last(); ok; ok = iter.Prev() {
+				k++
+			}
+			if err := iter.Error(); err != nil {
+				t.Fatalf("READER1 #%d.%d W#%d db.Iterator: %v", i, k, writei, err)
+			}
+			iter.Release()
+			if m := (writei+1)*n + n; k != m {
+				t.Fatalf("READER1 #%d W#%d short read, got=%d want=%d", i, writei, k, m)
+			}
+		}
+	}()
+
+	wg.Wait()
+}

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go

@@ -127,21 +127,24 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
 	b.init(wo.GetSync())
 
 	// The write happen synchronously.
+retry:
 	select {
 	case db.writeC <- b:
 		if <-db.writeMergedC {
 			return <-db.writeAckC
 		}
+		goto retry
 	case db.writeLockC <- struct{}{}:
 	case _, _ = <-db.closeC:
 		return ErrClosed
 	}
 
 	merged := 0
+	danglingMerge := false
 	defer func() {
+		if danglingMerge {
+			db.writeMergedC <- false
+		} else {
 			<-db.writeLockC
+		}
 		for i := 0; i < merged; i++ {
 			db.writeAckC <- err
 		}
@@ -170,7 +173,7 @@ drain:
 				db.writeMergedC <- true
 				merged++
 			} else {
-				db.writeMergedC <- false
+				danglingMerge = true
 				break drain
 			}
 		default:
@@ -262,6 +265,7 @@ func (db *DB) CompactRange(r util.Range) error {
 		return err
 	}
 
+	// Lock writer.
 	select {
 	case db.writeLockC <- struct{}{}:
 	case _, _ = <-db.closeC:
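
The retry label closes a gap in the merged-write handshake: a writer could hand its batch to the current leader via writeC, be refused the merge, and previously fell out of the select without its batch ever being written. Now it loops and competes again. A toy model of that handshake (the channels are stand-ins that merely share names with the DB fields, not the real implementation):

    package main

    import "fmt"

    func main() {
        writeC := make(chan int)
        writeMergedC := make(chan bool)
        writeLockC := make(chan struct{}, 1)
        writeLockC <- struct{}{} // a leader currently holds the write lock

        // Fake leader: accepts one offered batch, declines to merge it,
        // then releases the write lock and exits.
        go func() {
            <-writeC
            writeMergedC <- false
            <-writeLockC
        }()

    retry:
        select {
        case writeC <- 42: // offer the batch to the current leader
            if <-writeMergedC {
                fmt.Println("merged into leader's batch")
                return
            }
            goto retry // leader declined; compete again
        case writeLockC <- struct{}{}: // no leader: become one
            fmt.Println("became leader, writing alone")
        }
    }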

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go

@@ -92,7 +92,7 @@ const (
 	// DefaultStrict is the default strict flags. Specify any strict flags
 	// will override default strict flags as whole (i.e. not OR'ed).
-	DefaultStrict = StrictJournalChecksum | StrictBlockChecksum
+	DefaultStrict = StrictJournalChecksum | StrictIterator | StrictBlockChecksum
 
 	// NoStrict disables all strict flags. Override default strict flags.
 	NoStrict = ^StrictAll

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage_test.go

@@ -29,6 +29,7 @@ var (
 var (
 	tsFSEnv   = os.Getenv("GOLEVELDB_USEFS")
+	tsTempdir = os.Getenv("GOLEVELDB_TEMPDIR")
 	tsKeepFS  = tsFSEnv == "2"
 	tsFS      = tsKeepFS || tsFSEnv == "" || tsFSEnv == "1"
 	tsMU      = &sync.Mutex{}
@@ -413,7 +414,11 @@ func newTestStorage(t *testing.T) *testStorage {
 		num := tsNum
 		tsNum++
 		tsMU.Unlock()
-		path := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num))
+		tempdir := tsTempdir
+		if tempdir == "" {
+			tempdir = os.TempDir()
+		}
+		path := filepath.Join(tempdir, fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num))
 		if _, err := os.Stat(path); err != nil {
 			stor, err = storage.OpenFile(path)
 			if err != nil {
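
With the new variable, the test suite's scratch directory can presumably be redirected away from the default os.TempDir(), e.g. onto a ramdisk (the path here is purely illustrative):

    GOLEVELDB_TEMPDIR=/mnt/ramdisk go test github.com/syndtr/goleveldb/leveldb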

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go

@@ -7,9 +7,7 @@
 package leveldb
 
 import (
-	"fmt"
 	"sort"
-	"sync"
 	"sync/atomic"
 
 	"github.com/syndtr/goleveldb/leveldb/cache"
@@ -278,8 +276,6 @@ type tOps struct {
 	cache   cache.Cache
 	cacheNS cache.Namespace
 	bpool   *util.BufferPool
-	mu      sync.Mutex
-	closed  bool
 }
 
 // Creates an empty table and returns table writer.
@@ -326,42 +322,9 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
 	return
 }
 
-type trWrapper struct {
-	*table.Reader
-	t   *tOps
-	ref int
-}
-
-func (w *trWrapper) Release() {
-	if w.ref != 0 && !w.t.closed {
-		panic(fmt.Sprintf("BUG: invalid ref %d, refer to issue #72", w.ref))
-	}
-	w.Reader.Release()
-}
-
-type trCacheHandleWrapper struct {
-	cache.Handle
-	t        *tOps
-	released bool
-}
-
-func (w *trCacheHandleWrapper) Release() {
-	w.t.mu.Lock()
-	defer w.t.mu.Unlock()
-
-	if !w.released {
-		w.released = true
-		w.Value().(*trWrapper).ref--
-	}
-	w.Handle.Release()
-}
-
 // Opens table. It returns a cache handle, which should
 // be released after use.
 func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-
 	num := f.file.Num()
 	ch = t.cacheNS.Get(num, func() (charge int, value interface{}) {
 		var r storage.Reader
@@ -374,13 +337,11 @@ func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
 		if bc := t.s.o.GetBlockCache(); bc != nil {
 			bcacheNS = bc.GetNamespace(num)
 		}
-		return 1, &trWrapper{table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o), t, 0}
+		return 1, table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o)
 	})
 	if ch == nil && err == nil {
 		err = ErrClosed
 	}
-	ch.Value().(*trWrapper).ref++
-	ch = &trCacheHandleWrapper{ch, t, false}
 	return
 }
@@ -392,7 +353,7 @@ func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []b
 		return nil, nil, err
 	}
 	defer ch.Release()
-	return ch.Value().(*trWrapper).Find(key, ro)
+	return ch.Value().(*table.Reader).Find(key, ro)
 }
 
 // Returns approximate offset of the given key.
@@ -402,7 +363,7 @@ func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) {
 		return
 	}
 	defer ch.Release()
-	offset_, err := ch.Value().(*trWrapper).OffsetOf(key)
+	offset_, err := ch.Value().(*table.Reader).OffsetOf(key)
 	return uint64(offset_), err
 }
@@ -412,7 +373,7 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
 	if err != nil {
 		return iterator.NewEmptyIterator(err)
 	}
-	iter := ch.Value().(*trWrapper).NewIterator(slice, ro)
+	iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
 	iter.SetReleaser(ch)
 	return iter
 }
@@ -420,9 +381,6 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
 // Removes table from persistent storage. It waits until
 // no one use the the table.
 func (t *tOps) remove(f *tFile) {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-
 	num := f.file.Num()
 	t.cacheNS.Delete(num, func(exist, pending bool) {
 		if !pending {
@@ -441,10 +399,6 @@ func (t *tOps) remove(f *tFile) {
 // Closes the table ops instance. It will close all tables,
 // regadless still used or not.
 func (t *tOps) close() {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-
-	t.closed = true
 	t.cache.Zap()
 	t.bpool.Close()
 }
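
With the premature-release bug (issue #72) fixed at the cache layer, the wrapper types and their manual reference counting become dead weight; the cache handle itself is now the only lifetime token for a table.Reader. A reduced sketch of the resulting pattern, where Handle and Reader are stand-ins rather than the goleveldb types:

    package main

    import "fmt"

    // Handle stands in for cache.Handle: it owns one cached value and
    // must be released exactly once after use.
    type Handle struct{ value interface{} }

    func (h *Handle) Value() interface{} { return h.value }
    func (h *Handle) Release()           { /* return value to cache */ }

    // Reader stands in for table.Reader.
    type Reader struct{ name string }

    func (r *Reader) Find(key string) string { return r.name + ":" + key }

    func main() {
        // The cached value is the reader itself; no wrapper, no refcount.
        ch := &Handle{value: &Reader{name: "000123.ldb"}}
        defer ch.Release()
        fmt.Println(ch.Value().(*Reader).Find("k"))
    }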

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go

@@ -19,13 +19,18 @@ import (
 	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
-func (b *block) TestNewIterator(slice *util.Range) iterator.Iterator {
-	return b.newIterator(slice, false, nil)
+type blockTesting struct {
+	tr *Reader
+	b  *block
+}
+
+func (t *blockTesting) TestNewIterator(slice *util.Range) iterator.Iterator {
+	return t.tr.newBlockIter(t.b, nil, slice, false)
 }
 
 var _ = testutil.Defer(func() {
 	Describe("Block", func() {
-		Build := func(kv *testutil.KeyValue, restartInterval int) *block {
+		Build := func(kv *testutil.KeyValue, restartInterval int) *blockTesting {
 			// Building the block.
 			bw := &blockWriter{
 				restartInterval: restartInterval,
@@ -39,11 +44,13 @@ var _ = testutil.Defer(func() {
 			// Opening the block.
 			data := bw.buf.Bytes()
 			restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
-			return &block{
+			return &blockTesting{
 				tr: &Reader{cmp: comparer.DefaultComparer},
+				b: &block{
 					data:           data,
 					restartsLen:    restartsLen,
 					restartsOffset: len(data) - (restartsLen+1)*4,
+				},
 			}
 		}
@@ -102,11 +109,11 @@ var _ = testutil.Defer(func() {
 			for restartInterval := 1; restartInterval <= 5; restartInterval++ {
 				Describe(fmt.Sprintf("with restart interval of %d", restartInterval), func() {
 					// Make block.
-					br := Build(kv, restartInterval)
+					bt := Build(kv, restartInterval)
 
 					Test := func(r *util.Range) func(done Done) {
 						return func(done Done) {
-							iter := br.newIterator(r, false, nil)
+							iter := bt.TestNewIterator(r)
 							Expect(iter.Error()).ShouldNot(HaveOccurred())
 
 							t := testutil.IteratorTesting{

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go

@@ -39,7 +39,7 @@ func max(x, y int) int {
 }
 
 type block struct {
-	tr             *Reader
+	bpool          *util.BufferPool
 	data           []byte
 	restartsLen    int
 	restartsOffset int
@@ -47,14 +47,14 @@ type block struct {
 	checksum       bool
 }
 
-func (b *block) seek(rstart, rlimit int, key []byte) (index, offset int, err error) {
+func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
 	index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
 		offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
 		offset += 1                               // shared always zero, since this is a restart point
 		v1, n1 := binary.Uvarint(b.data[offset:]) // key length
 		_, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
 		m := offset + n1 + n2
-		return b.tr.cmp.Compare(b.data[m:m+int(v1)], key) > 0
+		return cmp.Compare(b.data[m:m+int(v1)], key) > 0
 	}) + rstart - 1
 
 	if index < rstart {
 		// The smallest key is greater-than key sought.
@@ -96,48 +96,9 @@ func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error)
 	return
 }
 
-func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releaser) *blockIter {
-	bi := &blockIter{
-		block: b,
-		cache: cache,
-		// Valid key should never be nil.
-		key:             make([]byte, 0),
-		dir:             dirSOI,
-		riStart:         0,
-		riLimit:         b.restartsLen,
-		offsetStart:     0,
-		offsetRealStart: 0,
-		offsetLimit:     b.restartsOffset,
-	}
-	if slice != nil {
-		if slice.Start != nil {
-			if bi.Seek(slice.Start) {
-				bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset)
-				bi.offsetStart = b.restartOffset(bi.riStart)
-				bi.offsetRealStart = bi.prevOffset
-			} else {
-				bi.riStart = b.restartsLen
-				bi.offsetStart = b.restartsOffset
-				bi.offsetRealStart = b.restartsOffset
-			}
-		}
-		if slice.Limit != nil {
-			if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) {
-				bi.offsetLimit = bi.prevOffset
-				bi.riLimit = bi.restartIndex + 1
-			}
-		}
-		bi.reset()
-		if bi.offsetStart > bi.offsetLimit {
-			bi.sErr(errors.New("leveldb/table: Reader: invalid slice range"))
-		}
-	}
-	return bi
-}
-
 func (b *block) Release() {
-	b.tr.bpool.Put(b.data)
-	b.tr = nil
+	b.bpool.Put(b.data)
+	b.bpool = nil
 	b.data = nil
 }
@@ -152,8 +113,10 @@ const (
 )
 
 type blockIter struct {
+	tr            *Reader
 	block         *block
-	cache, releaser util.Releaser
+	blockReleaser util.Releaser
+	releaser      util.Releaser
 	key, value    []byte
 	offset        int
 	// Previous offset, only filled by Next.
@@ -252,7 +215,7 @@ func (i *blockIter) Seek(key []byte) bool {
 		return false
 	}
 
-	ri, offset, err := i.block.seek(i.riStart, i.riLimit, key)
+	ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key)
 	if err != nil {
 		i.sErr(err)
 		return false
@@ -263,7 +226,7 @@ func (i *blockIter) Seek(key []byte) bool {
 		i.dir = dirForward
 	}
 	for i.Next() {
-		if i.block.tr.cmp.Compare(i.key, key) >= 0 {
+		if i.tr.cmp.Compare(i.key, key) >= 0 {
 			return true
 		}
 	}
@@ -440,15 +403,16 @@ func (i *blockIter) Value() []byte {
 
 func (i *blockIter) Release() {
 	if i.dir != dirReleased {
+		i.tr = nil
 		i.block = nil
 		i.prevNode = nil
 		i.prevKeys = nil
 		i.key = nil
 		i.value = nil
 		i.dir = dirReleased
-		if i.cache != nil {
-			i.cache.Release()
-			i.cache = nil
+		if i.blockReleaser != nil {
+			i.blockReleaser.Release()
+			i.blockReleaser = nil
 		}
 		if i.releaser != nil {
 			i.releaser.Release()
@@ -476,21 +440,21 @@ func (i *blockIter) Error() error {
 }
 
 type filterBlock struct {
-	tr         *Reader
+	bpool      *util.BufferPool
 	data       []byte
 	oOffset    int
 	baseLg     uint
 	filtersNum int
 }
 
-func (b *filterBlock) contains(offset uint64, key []byte) bool {
+func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool {
 	i := int(offset >> b.baseLg)
 	if i < b.filtersNum {
 		o := b.data[b.oOffset+i*4:]
 		n := int(binary.LittleEndian.Uint32(o))
 		m := int(binary.LittleEndian.Uint32(o[4:]))
 		if n < m && m <= b.oOffset {
-			return b.tr.filter.Contains(b.data[n:m], key)
+			return filter.Contains(b.data[n:m], key)
 		} else if n == m {
 			return false
 		}
@@ -499,13 +463,14 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool {
 }
 
 func (b *filterBlock) Release() {
-	b.tr.bpool.Put(b.data)
-	b.tr = nil
+	b.bpool.Put(b.data)
+	b.bpool = nil
 	b.data = nil
 }
 
 type indexIter struct {
 	*blockIter
+	tr    *Reader
 	slice *util.Range
 	// Options
 	checksum  bool
@@ -526,7 +491,7 @@ func (i *indexIter) Get() iterator.Iterator {
 	if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) {
 		slice = i.slice
 	}
-	return i.blockIter.block.tr.getDataIterErr(dataBH, slice, i.checksum, i.fillCache)
+	return i.tr.getDataIterErr(dataBH, slice, i.checksum, i.fillCache)
 }
 
 // Reader is a table reader.
@@ -594,7 +559,7 @@ func (r *Reader) readBlock(bh blockHandle, checksum bool) (*block, error) {
 	}
 	restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
 	b := &block{
-		tr:             r,
+		bpool:          r.bpool,
 		data:           data,
 		restartsLen:    restartsLen,
 		restartsOffset: len(data) - (restartsLen+1)*4,
@@ -655,7 +620,7 @@ func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
 		return nil, errors.New("leveldb/table: Reader: invalid filter block (invalid offset)")
 	}
 	b := &filterBlock{
-		tr:      r,
+		bpool:   r.bpool,
 		data:    data,
 		oOffset: oOffset,
 		baseLg:  uint(data[n-1]),
@@ -713,7 +678,7 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi
 	if err != nil {
 		return iterator.NewEmptyIterator(err)
 	}
-	return b.newIterator(slice, false, rel)
+	return r.newBlockIter(b, rel, slice, false)
 }
 
 func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator {
@@ -727,6 +692,46 @@ func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator {
 	return r.getDataIter(dataBH, slice, checksum, fillCache)
 }
 
+func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter {
+	bi := &blockIter{
+		tr:            r,
+		block:         b,
+		blockReleaser: bReleaser,
+		// Valid key should never be nil.
+		key:             make([]byte, 0),
+		dir:             dirSOI,
+		riStart:         0,
+		riLimit:         b.restartsLen,
+		offsetStart:     0,
+		offsetRealStart: 0,
+		offsetLimit:     b.restartsOffset,
+	}
+	if slice != nil {
+		if slice.Start != nil {
+			if bi.Seek(slice.Start) {
+				bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset)
+				bi.offsetStart = b.restartOffset(bi.riStart)
+				bi.offsetRealStart = bi.prevOffset
+			} else {
+				bi.riStart = b.restartsLen
+				bi.offsetStart = b.restartsOffset
+				bi.offsetRealStart = b.restartsOffset
+			}
+		}
+		if slice.Limit != nil {
+			if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) {
+				bi.offsetLimit = bi.prevOffset
+				bi.riLimit = bi.restartIndex + 1
+			}
+		}
+		bi.reset()
+		if bi.offsetStart > bi.offsetLimit {
+			bi.sErr(errors.New("leveldb/table: Reader: invalid slice range"))
+		}
+	}
+	return bi
+}
+
 // NewIterator creates an iterator from the table.
 //
 // Slice allows slicing the iterator to only contains keys in the given
@@ -752,7 +757,8 @@ func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It
 		return iterator.NewEmptyIterator(err)
 	}
 	index := &indexIter{
-		blockIter: indexBlock.newIterator(slice, true, rel),
+		blockIter: r.newBlockIter(indexBlock, rel, slice, true),
+		tr:        r,
 		slice:     slice,
 		checksum:  ro.GetStrict(opt.StrictBlockChecksum),
 		fillCache: !ro.GetDontFillCache(),
@@ -781,7 +787,7 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
 	}
 	defer rel.Release()
 
-	index := indexBlock.newIterator(nil, true, nil)
+	index := r.newBlockIter(indexBlock, nil, nil, true)
 	defer index.Release()
 	if !index.Seek(key) {
 		err = index.Error()
@@ -798,7 +804,7 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
 	if r.filter != nil {
 		filterBlock, rel, ferr := r.getFilterBlock(true)
 		if ferr == nil {
-			if !filterBlock.contains(dataBH.offset, key) {
+			if !filterBlock.contains(r.filter, dataBH.offset, key) {
 				rel.Release()
 				return nil, nil, ErrNotFound
 			}
@@ -866,7 +872,7 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
 	}
 	defer rel.Release()
 
-	index := indexBlock.newIterator(nil, true, nil)
+	index := r.newBlockIter(indexBlock, nil, nil, true)
 	defer index.Release()
 	if index.Seek(key) {
 		dataBH, n := decodeBlockHandle(index.Value())
@@ -956,7 +962,7 @@ func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.Buf
 	}
 
 	// Set data end.
 	r.dataEnd = int64(metaBH.offset)
-	metaIter := metaBlock.newIterator(nil, false, nil)
+	metaIter := r.newBlockIter(metaBlock, nil, nil, false)
 	for metaIter.Next() {
 		key := string(metaIter.Key())
 		if !strings.HasPrefix(key, "filter.") {
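
The common thread in this refactor is the removal of back-pointers: block and filterBlock no longer hold a *Reader, so a cached block cannot dangle a reference to its owner, and anything reader-specific (comparer, filter, buffer pool) is passed in explicitly or carried by the iterator. A minimal sketch of the block.seek signature change, with simplified types rather than the real restart-point layout:

    package main

    import (
        "fmt"
        "strings"
    )

    // block no longer knows who owns it; the comparer is a parameter.
    type block struct{ keys []string }

    func (b *block) seek(cmp func(a, b string) int, key string) int {
        for i, k := range b.keys {
            if cmp(k, key) >= 0 {
                return i
            }
        }
        return len(b.keys)
    }

    func main() {
        b := &block{keys: []string{"a", "c", "e"}}
        fmt.Println(b.seek(strings.Compare, "c")) // 1
    }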

Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go

@@ -156,6 +156,9 @@ func (p *BufferPool) Put(b []byte) {
 	atomic.AddUint32(&p.put, 1)
 
 	pool := p.pool[p.poolNum(cap(b))]
+	defer func() {
+		recover()
+	}()
 	select {
 	case pool <- b:
 	default:
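
This appears to be the same defer/recover guard as in the compaction acks above: if Put races with a Close that tears down the pool's channels, the recover absorbs the resulting send-on-closed-channel panic instead of crashing the caller.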