syncthing/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go

818 lines
19 KiB
Go
Raw Normal View History

2014-07-06 14:46:48 +02:00
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package leveldb
import (
"errors"
"fmt"
"io"
"os"
"runtime"
"strings"
"sync"
"time"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/table"
"github.com/syndtr/goleveldb/leveldb/util"
)
// DB is a LevelDB database.
type DB struct {
	// Need 64-bit alignment.
	// seq is manipulated with sync/atomic; keeping it as the first field
	// guarantees the 64-bit alignment atomics require on 32-bit platforms.
	seq uint64

	// Session.
	s *session

	// MemDB.
	memMu             sync.RWMutex // guards mem/frozenMem
	memPool           *util.Pool   // recycles memdb instances
	mem, frozenMem    *memDB       // active memdb and the one being flushed
	journal           *journal.Writer
	journalWriter     storage.Writer
	journalFile       storage.File
	frozenJournalFile storage.File // journal that backs frozenMem
	frozenSeq         uint64

	// Snapshot.
	snapsMu   sync.Mutex
	snapsRoot snapshotElement // sentinel of the snapshot list

	// Write.
	writeC       chan *Batch
	writeMergedC chan bool
	writeLockC   chan struct{} // capacity-1 channel used as the write mutex
	writeAckC    chan error
	journalC     chan *Batch
	journalAckC  chan error

	// Compaction.
	tcompCmdC     chan cCmd
	tcompPauseC   chan chan<- struct{}
	tcompTriggerC chan struct{}
	mcompCmdC     chan cCmd
	mcompTriggerC chan struct{}
	compErrC      chan error
	compErrSetC   chan error
	compStats     [kNumLevels]cStats

	// Close.
	closeW sync.WaitGroup // waits for the compaction/journal goroutines
	closeC chan struct{}  // closed to signal shutdown
	closed uint32         // set atomically; see setClosed
	closer io.Closer      // optional storage to close on Close (set by OpenFile)
}
// openDB builds a DB on top of an already-recovered (or freshly created)
// session: it replays the journal, removes obsolete files and starts the
// background compaction and journal-writer goroutines.
func openDB(s *session) (*DB, error) {
	s.log("db@open opening")
	start := time.Now()
	db := &DB{
		s: s,
		// Initial sequence
		seq: s.stSeq,
		// MemDB
		memPool: util.NewPool(1),
		// Write
		writeC:       make(chan *Batch),
		writeMergedC: make(chan bool),
		writeLockC:   make(chan struct{}, 1),
		writeAckC:    make(chan error),
		journalC:     make(chan *Batch),
		journalAckC:  make(chan error),
		// Compaction
		tcompCmdC:     make(chan cCmd),
		tcompPauseC:   make(chan chan<- struct{}),
		tcompTriggerC: make(chan struct{}, 1),
		mcompCmdC:     make(chan cCmd),
		mcompTriggerC: make(chan struct{}, 1),
		compErrC:      make(chan error),
		compErrSetC:   make(chan error),
		// Close
		closeC: make(chan struct{}),
	}
	db.initSnapshot()

	// Replay any pending journal entries into the memdb/tables.
	if err := db.recoverJournal(); err != nil {
		return nil, err
	}

	// Remove any obsolete files.
	if err := db.checkAndCleanFiles(); err != nil {
		// Close journal.
		if db.journal != nil {
			db.journal.Close()
			db.journalWriter.Close()
		}
		return nil, err
	}

	// Don't include compaction error goroutine into wait group.
	go db.compactionError()
	// These three are waited on by Close via closeW.
	db.closeW.Add(3)
	go db.tCompaction()
	go db.mCompaction()
	go db.jWriter()

	s.logf("db@open done T·%v", time.Since(start))
	runtime.SetFinalizer(db, (*DB).Close)
	return db, nil
}
// Open opens or creates a DB for the given storage.
// The DB will be created if not exist, unless ErrorIfMissing is true.
// Also, if ErrorIfExist is true and the DB exists, Open will return an
// os.ErrExist error.
//
// Open will return an error with type of ErrCorrupted if corruption
// detected in the DB. Corrupted DB can be recovered with Recover
// function.
//
// The returned DB instance is goroutine-safe.
// The DB must be closed after use, by calling Close method.
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
	s, err := newSession(stor, o)
	if err != nil {
		return
	}
	// Tear the session down again if anything below fails; on success
	// openDB takes ownership of it.
	defer func() {
		if err != nil {
			s.close()
			s.release()
		}
	}()

	err = s.recover()
	if err != nil {
		// A missing manifest is only fatal when ErrorIfMissing is set;
		// otherwise create a fresh DB.
		if !os.IsNotExist(err) || s.o.GetErrorIfMissing() {
			return
		}
		err = s.create()
		if err != nil {
			return
		}
	} else if s.o.GetErrorIfExist() {
		err = os.ErrExist
		return
	}

	return openDB(s)
}
// OpenFile opens or creates a DB for the given path.
// The DB will be created if not exist, unless ErrorIfMissing is true.
// Also, if ErrorIfExist is true and the DB exist OpenFile will returns
// os.ErrExist error.
//
// OpenFile uses standard file-system backed storage implementation as
// desribed in the leveldb/storage package.
//
// OpenFile will return an error with type of ErrCorrupted if corruption
// detected in the DB. Corrupted DB can be recovered with Recover
// function.
//
2014-07-23 08:31:36 +02:00
// The returned DB instance is goroutine-safe.
2014-07-06 14:46:48 +02:00
// The DB must be closed after use, by calling Close method.
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path)
if err != nil {
return
}
db, err = Open(stor, o)
if err != nil {
stor.Close()
} else {
db.closer = stor
}
return
}
// Recover recovers and opens a DB with missing or corrupted manifest files
// for the given storage. It will ignore any manifest files, valid or not.
// The DB must already exist or it will return an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
// The returned DB instance is goroutine-safe.
// The DB must be closed after use, by calling Close method.
func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
	s, err := newSession(stor, o)
	if err != nil {
		return
	}
	// Release the session if recovery fails; openDB takes ownership on
	// success.
	defer func() {
		if err != nil {
			s.close()
			s.release()
		}
	}()

	// Rebuild the manifest from the table files themselves.
	err = recoverTable(s, o)
	if err != nil {
		return
	}
	return openDB(s)
}
// RecoverFile recovers and opens a DB with missing or corrupted manifest files
// for the given path. It will ignore any manifest files, valid or not.
// The DB must already exist or it will returns an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
// RecoverFile uses standard file-system backed storage implementation as desribed
// in the leveldb/storage package.
//
2014-07-23 08:31:36 +02:00
// The returned DB instance is goroutine-safe.
2014-07-06 14:46:48 +02:00
// The DB must be closed after use, by calling Close method.
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path)
if err != nil {
return
}
db, err = Recover(stor, o)
if err != nil {
stor.Close()
} else {
db.closer = stor
}
return
}
// recoverTable rebuilds the session's manifest purely from the table files
// on disk: every table is scanned, partially-corrupted tables are rewritten
// keeping only their valid entries, and all surviving tables are registered
// at level 0 of a fresh manifest.
func recoverTable(s *session, o *opt.Options) error {
	// Get all tables and sort it by file number.
	tableFiles_, err := s.getFiles(storage.TypeTable)
	if err != nil {
		return err
	}
	tableFiles := files(tableFiles_)
	tableFiles.sort()

	var mSeq uint64              // max sequence number seen across all tables
	var good, corrupted int      // totals across all tables
	rec := new(sessionRecord)    // manifest record being assembled
	// +5 presumably covers the per-block trailer — TODO confirm against
	// the table format.
	bpool := util.NewBufferPool(o.GetBlockSize() + 5)

	// buildTable writes the (valid) entries produced by iter into a fresh
	// temporary table file and returns it along with its size. On error
	// the temporary file is removed and tmp is nil.
	buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) {
		tmp = s.newTemp()
		writer, err := tmp.Create()
		if err != nil {
			return
		}
		defer func() {
			writer.Close()
			if err != nil {
				tmp.Remove()
				tmp = nil
			}
		}()

		// Copy entries.
		tw := table.NewWriter(writer, o)
		for iter.Next() {
			key := iter.Key()
			if validIkey(key) {
				err = tw.Append(key, iter.Value())
				if err != nil {
					return
				}
			}
		}
		err = iter.Error()
		if err != nil {
			return
		}
		err = tw.Close()
		if err != nil {
			return
		}
		err = writer.Sync()
		if err != nil {
			return
		}
		size = int64(tw.BytesLen())
		return
	}

	// recoverTable scans one table file, counting good/corrupted entries
	// and tracking the key range; if any corruption is found but good
	// entries exist, the table is rebuilt in place via buildTable.
	recoverTable := func(file storage.File) error {
		s.logf("table@recovery recovering @%d", file.Num())
		reader, err := file.Open()
		if err != nil {
			return err
		}
		defer reader.Close()

		// Get file size.
		size, err := reader.Seek(0, 2)
		if err != nil {
			return err
		}

		var tSeq uint64                      // max sequence in this table
		var tgood, tcorrupted, blockerr int  // per-table counters
		var imin, imax []byte                // smallest/largest internal key seen
		tr := table.NewReader(reader, size, nil, bpool, o)
		iter := tr.NewIterator(nil, nil)
		// Corrupted blocks are counted rather than aborting the scan.
		iter.(iterator.ErrorCallbackSetter).SetErrorCallback(func(err error) {
			s.logf("table@recovery found error @%d %q", file.Num(), err)
			blockerr++
		})

		// Scan the table.
		for iter.Next() {
			key := iter.Key()
			_, seq, _, ok := parseIkey(key)
			if !ok {
				tcorrupted++
				continue
			}
			tgood++
			if seq > tSeq {
				tSeq = seq
			}
			// Keys arrive in sorted order, so the first valid key is the
			// minimum and the last one seen is the maximum.
			if imin == nil {
				imin = append([]byte{}, key...)
			}
			imax = append(imax[:0], key...)
		}
		if err := iter.Error(); err != nil {
			iter.Release()
			return err
		}
		iter.Release()

		if tgood > 0 {
			if tcorrupted > 0 || blockerr > 0 {
				// Rebuild the table.
				s.logf("table@recovery rebuilding @%d", file.Num())
				iter := tr.NewIterator(nil, nil)
				tmp, newSize, err := buildTable(iter)
				iter.Release()
				if err != nil {
					return err
				}
				reader.Close()
				if err := file.Replace(tmp); err != nil {
					return err
				}
				size = newSize
			}
			if tSeq > mSeq {
				mSeq = tSeq
			}
			// Add table to level 0.
			rec.addTable(0, file.Num(), uint64(size), imin, imax)
			s.logf("table@recovery recovered @%d N·%d C·%d B·%d S·%d Q·%d", file.Num(), tgood, tcorrupted, blockerr, size, tSeq)
		} else {
			s.logf("table@recovery unrecoverable @%d C·%d B·%d S·%d", file.Num(), tcorrupted, blockerr, size)
		}

		good += tgood
		corrupted += tcorrupted
		return nil
	}

	// Recover all tables.
	if len(tableFiles) > 0 {
		s.logf("table@recovery F·%d", len(tableFiles))
		// Mark file number as used.
		s.markFileNum(tableFiles[len(tableFiles)-1].Num())
		for _, file := range tableFiles {
			if err := recoverTable(file); err != nil {
				return err
			}
		}
		s.logf("table@recovery recovered F·%d N·%d C·%d Q·%d", len(tableFiles), good, corrupted, mSeq)
	}

	// Set sequence number.
	rec.setSeq(mSeq + 1)

	// Create new manifest.
	if err := s.create(); err != nil {
		return err
	}

	// Commit.
	return s.commit(rec)
}
2014-07-23 08:31:36 +02:00
// recoverJournal replays every journal file written since the last manifest
// commit into a memdb, flushing to tables as the write buffer fills, then
// starts a fresh journal and commits the new state. Obsolete journal files
// are removed only after the data they contained is safely committed.
func (db *DB) recoverJournal() error {
	// Get all journals and sort them by file number.
	journalFiles_, err := db.s.getFiles(storage.TypeJournal)
	if err != nil {
		return err
	}
	journalFiles := files(journalFiles_)
	journalFiles.sort()

	// Discard older journal. Keep everything from stJournalNum on, plus
	// stPrevJournalNum if it appears before that point.
	prev := -1
	for i, file := range journalFiles {
		if file.Num() >= db.s.stJournalNum {
			if prev >= 0 {
				// Slot the previous journal directly before the current one.
				i--
				journalFiles[i] = journalFiles[prev]
			}
			journalFiles = journalFiles[i:]
			break
		} else if file.Num() == db.s.stPrevJournalNum {
			prev = i
		}
	}

	var jr *journal.Reader
	var of storage.File // previously replayed journal, pending removal
	var mem *memdb.DB
	batch := new(Batch)
	cm := newCMem(db.s)
	buf := new(util.Buffer)
	// Options.
	strict := db.s.o.GetStrict(opt.StrictJournal)
	checksum := db.s.o.GetStrict(opt.StrictJournalChecksum)
	writeBuffer := db.s.o.GetWriteBuffer()

	// recoverJournal replays a single journal file into mem. The previous
	// journal (of) is committed and removed first, so a crash mid-replay
	// never loses already-replayed data.
	recoverJournal := func(file storage.File) error {
		db.logf("journal@recovery recovering @%d", file.Num())
		reader, err := file.Open()
		if err != nil {
			return err
		}
		defer reader.Close()

		// Create/reset journal reader instance.
		if jr == nil {
			jr = journal.NewReader(reader, dropper{db.s, file}, strict, checksum)
		} else {
			jr.Reset(reader, dropper{db.s, file}, strict, checksum)
		}

		// Flush memdb and remove obsolete journal file.
		if of != nil {
			if mem.Len() > 0 {
				if err := cm.flush(mem, 0); err != nil {
					return err
				}
			}
			if err := cm.commit(file.Num(), db.seq); err != nil {
				return err
			}
			cm.reset()
			of.Remove()
			of = nil
		}

		// Replay journal to memdb.
		mem.Reset()
		for {
			r, err := jr.Next()
			if err != nil {
				if err == io.EOF {
					break
				}
				return err
			}

			buf.Reset()
			if _, err := buf.ReadFrom(r); err != nil {
				// A truncated record (e.g. crash mid-write) is skipped
				// rather than treated as fatal.
				if err == io.ErrUnexpectedEOF {
					continue
				} else {
					return err
				}
			}
			if err := batch.decode(buf.Bytes()); err != nil {
				return err
			}
			if err := batch.memReplay(mem); err != nil {
				return err
			}

			// Save sequence number.
			db.seq = batch.seq + uint64(batch.len())

			// Flush it if large enough.
			if mem.Size() >= writeBuffer {
				if err := cm.flush(mem, 0); err != nil {
					return err
				}
				mem.Reset()
			}
		}

		of = file
		return nil
	}

	// Recover all journals.
	if len(journalFiles) > 0 {
		db.logf("journal@recovery F·%d", len(journalFiles))
		// Mark file number as used.
		db.s.markFileNum(journalFiles[len(journalFiles)-1].Num())
		mem = memdb.New(db.s.icmp, writeBuffer)
		for _, file := range journalFiles {
			if err := recoverJournal(file); err != nil {
				return err
			}
		}

		// Flush the last journal.
		if mem.Len() > 0 {
			if err := cm.flush(mem, 0); err != nil {
				return err
			}
		}
	}

	// Create a new journal.
	if _, err := db.newMem(0); err != nil {
		return err
	}

	// Commit.
	if err := cm.commit(db.journalFile.Num(), db.seq); err != nil {
		// Close journal.
		if db.journal != nil {
			db.journal.Close()
			db.journalWriter.Close()
		}
		return err
	}

	// Remove the last obsolete journal file.
	if of != nil {
		of.Remove()
	}

	return nil
}
2014-07-23 08:31:36 +02:00
// get looks key up at the given sequence number: first in the active and
// frozen memdbs, then in the current table version. The returned value is
// a fresh copy. Returns ErrNotFound if the key is absent or deleted.
func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
	ikey := newIKey(key, seq, tSeek)

	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		// Deferred in a loop: both decrefs run at function return, which
		// is acceptable since the function returns shortly after.
		defer m.decref()

		mk, mv, me := m.db.Find(ikey)
		if me == nil {
			ukey, _, t, ok := parseIkey(mk)
			// Find returns the entry at-or-after ikey; only a match on the
			// user key counts.
			if ok && db.s.icmp.uCompare(ukey, key) == 0 {
				if t == tDel {
					// A tombstone in a newer layer hides older values.
					return nil, ErrNotFound
				}
				return append([]byte{}, mv...), nil
			}
		} else if me != ErrNotFound {
			return nil, me
		}
	}

	// Fall through to the on-disk tables.
	v := db.s.version()
	value, cSched, err := v.get(ikey, ro)
	v.release()
	if cSched {
		// Trigger table compaction.
		db.compTrigger(db.tcompTriggerC)
	}
	return
}
// Get gets the value for the given key. It returns ErrNotFound if the
// DB does not contain the key.
//
2014-08-15 09:16:30 +02:00
// The returned slice is its own copy, it is safe to modify the contents
// of the returned slice.
// It is safe to modify the contents of the argument after Get returns.
2014-07-23 08:31:36 +02:00
func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
err = db.ok()
2014-07-06 14:46:48 +02:00
if err != nil {
return
}
2014-07-23 08:31:36 +02:00
return db.get(key, db.getSeq(), ro)
2014-07-06 14:46:48 +02:00
}
// NewIterator returns an iterator for the latest snapshot of the
// uderlying DB.
// The returned iterator is not goroutine-safe, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be
// consistent.
//
// Slice allows slicing the iterator to only contains keys in the given
// range. A nil Range.Start is treated as a key before all keys in the
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
// The iterator must be released after use, by calling Release method.
//
// Also read Iterator documentation of the leveldb/iterator package.
2014-07-23 08:31:36 +02:00
func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
if err := db.ok(); err != nil {
2014-07-06 14:46:48 +02:00
return iterator.NewEmptyIterator(err)
}
2014-07-23 08:31:36 +02:00
snap := db.newSnapshot()
defer snap.Release()
return snap.NewIterator(slice, ro)
2014-07-06 14:46:48 +02:00
}
// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
// is a frozen snapshot of a DB state at a particular point in time. The
// content of snapshot are guaranteed to be consistent.
//
// The snapshot must be released after use, by calling Release method.
2014-07-23 08:31:36 +02:00
func (db *DB) GetSnapshot() (*Snapshot, error) {
if err := db.ok(); err != nil {
2014-07-06 14:46:48 +02:00
return nil, err
}
2014-07-23 08:31:36 +02:00
return db.newSnapshot(), nil
2014-07-06 14:46:48 +02:00
}
// GetProperty returns value of the given property name.
//
// Property names:
// leveldb.num-files-at-level{n}
// Returns the number of filer at level 'n'.
// leveldb.stats
// Returns statistics of the underlying DB.
// leveldb.sstables
// Returns sstables list for each level.
2014-08-29 12:36:45 +02:00
// leveldb.blockpool
// Returns block pool stats.
2014-07-23 08:31:36 +02:00
func (db *DB) GetProperty(name string) (value string, err error) {
err = db.ok()
2014-07-06 14:46:48 +02:00
if err != nil {
return
}
const prefix = "leveldb."
if !strings.HasPrefix(name, prefix) {
return "", errors.New("leveldb: GetProperty: unknown property: " + name)
}
p := name[len(prefix):]
2014-07-23 08:31:36 +02:00
v := db.s.version()
2014-07-06 14:46:48 +02:00
defer v.release()
switch {
case strings.HasPrefix(p, "num-files-at-level"):
var level uint
var rest string
n, _ := fmt.Scanf("%d%s", &level, &rest)
if n != 1 || level >= kNumLevels {
err = errors.New("leveldb: GetProperty: invalid property: " + name)
} else {
value = fmt.Sprint(v.tLen(int(level)))
}
case p == "stats":
value = "Compactions\n" +
" Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" +
"-------+------------+---------------+---------------+---------------+---------------\n"
2014-07-23 08:31:36 +02:00
for level, tables := range v.tables {
duration, read, write := db.compStats[level].get()
if len(tables) == 0 && duration == 0 {
2014-07-06 14:46:48 +02:00
continue
}
value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
2014-07-23 08:31:36 +02:00
level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
2014-07-06 14:46:48 +02:00
float64(read)/1048576.0, float64(write)/1048576.0)
}
case p == "sstables":
2014-07-23 08:31:36 +02:00
for level, tables := range v.tables {
2014-07-06 14:46:48 +02:00
value += fmt.Sprintf("--- level %d ---\n", level)
2014-07-23 08:31:36 +02:00
for _, t := range tables {
value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.imin, t.imax)
2014-07-06 14:46:48 +02:00
}
}
2014-08-29 12:36:45 +02:00
case p == "blockpool":
value = fmt.Sprintf("%v", db.s.tops.bpool)
case p == "cachedblock":
if bc := db.s.o.GetBlockCache(); bc != nil {
value = fmt.Sprintf("%d", bc.Size())
} else {
value = "<nil>"
}
case p == "openedtables":
value = fmt.Sprintf("%d", db.s.tops.cache.Size())
2014-07-06 14:46:48 +02:00
default:
err = errors.New("leveldb: GetProperty: unknown property: " + name)
}
return
}
2014-07-06 23:13:10 +02:00
// SizeOf calculates approximate sizes of the given key ranges.
2014-07-06 14:46:48 +02:00
// The length of the returned sizes are equal with the length of the given
// ranges. The returned sizes measure storage space usage, so if the user
// data compresses by a factor of ten, the returned sizes will be one-tenth
// the size of the corresponding user data size.
// The results may not include the sizes of recently written data.
2014-07-23 08:31:36 +02:00
func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
if err := db.ok(); err != nil {
2014-07-06 14:46:48 +02:00
return nil, err
}
2014-07-23 08:31:36 +02:00
v := db.s.version()
2014-07-06 14:46:48 +02:00
defer v.release()
sizes := make(Sizes, 0, len(ranges))
for _, r := range ranges {
2014-07-23 08:31:36 +02:00
imin := newIKey(r.Start, kMaxSeq, tSeek)
imax := newIKey(r.Limit, kMaxSeq, tSeek)
start, err := v.offsetOf(imin)
2014-07-06 14:46:48 +02:00
if err != nil {
return nil, err
}
2014-07-23 08:31:36 +02:00
limit, err := v.offsetOf(imax)
2014-07-06 14:46:48 +02:00
if err != nil {
return nil, err
}
var size uint64
if limit >= start {
size = limit - start
}
sizes = append(sizes, size)
}
return sizes, nil
}
2014-07-23 08:31:36 +02:00
// Close closes the DB. This will also releases any outstanding snapshot and
// abort any in-flight compaction.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed.
func (db *DB) Close() error {
	// setClosed flips the closed flag exactly once; later calls bail out.
	if !db.setClosed() {
		return ErrClosed
	}

	start := time.Now()
	db.log("db@close closing")

	// Clear the finalizer.
	runtime.SetFinalizer(db, nil)

	// Get compaction error.
	var err error
	select {
	case err = <-db.compErrC:
	default:
	}

	// Signal all background goroutines to stop.
	close(db.closeC)

	// Wait for the close WaitGroup.
	db.closeW.Wait()

	// Close journal. Acquiring the write lock ensures no writer is still
	// using the journal.
	db.writeLockC <- struct{}{}
	if db.journal != nil {
		db.journal.Close()
		db.journalWriter.Close()
	}

	// Close session.
	db.s.close()
	db.logf("db@close done T·%v", time.Since(start))
	db.s.release()

	// Close the storage last if we own it (set by OpenFile/RecoverFile);
	// keep the first error encountered.
	if db.closer != nil {
		if err1 := db.closer.Close(); err == nil {
			err = err1
		}
	}

	// NIL'ing pointers.
	db.s = nil
	db.mem = nil
	db.frozenMem = nil
	db.journal = nil
	db.journalWriter = nil
	db.journalFile = nil
	db.frozenJournalFile = nil
	db.snapsRoot = snapshotElement{}
	db.closer = nil

	return err
}