syncthing/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go
Jakob Borg 65aaa607ab Use Go 1.5 vendoring instead of Godeps
Change made by:

- running "gvt fetch" on each of the packages mentioned in
  Godeps/Godeps.json
- `rm -rf Godeps`
- tweaking the build scripts to not mention Godeps
- tweaking the build scripts to test `./lib/...`, `./cmd/...` explicitly
  (to avoid testing vendor)
- tweaking the build scripts to not juggle GOPATH for Godeps and instead
  set GO15VENDOREXPERIMENT.

This also results in some updated packages at the same time I bet.

Building with Go 1.3 and 1.4 still *works* but won't use our vendored
dependencies - the user needs to have the actual packages in their
GOPATH then, which they'll get with a normal "go get". Building with Go
1.6+ will get our vendored dependencies by default even when not using
our build script, which is nice.

By doing this we gain some freedom in that we can pick and choose
manually what to include in vendor, as it's not based on just dependency
analysis of our own code. This is also a risk as we might pick up
dependencies we are unaware of, as the build may work locally with those
packages present in GOPATH. On the other hand the build server will
detect this as it has no packages in its GOPATH beyond what is included
in the repo.

Recommended tool to manage dependencies is github.com/FiloSottile/gvt.
2016-03-05 21:21:24 +01:00

290 lines
7.6 KiB
Go

// Copyright (c) 2016, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package leveldb
import (
"errors"
"sync"
"time"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
// errTransactionDone is the error reported by Transaction methods that are
// invoked after the transaction has been closed.
var errTransactionDone = errors.New("leveldb: transaction already closed")
// Transaction is the transaction handle.
//
// A Transaction is created by DB.OpenTransaction and is finished by calling
// either Commit or Discard. Its methods take lk before touching the fields
// below.
type Transaction struct {
// db is the database this transaction was opened on.
db *DB
// lk guards the transaction state: readers (Get, Has, NewIterator) take
// the read lock, mutating methods take the write lock.
lk sync.RWMutex
// seq is initialised from db.seq and incremented once per successful put.
seq uint64
// mem is the in-memory buffer that receives the transaction's writes.
mem *memDB
// tables collects the table files created by flush.
tables tFiles
// ikScratch is a reusable buffer for building internal keys in put.
ikScratch []byte
// rec records the table files added by flush; it is what Commit hands
// to the session.
rec sessionRecord
// stats accumulates flush timing and the number of bytes written.
stats cStatStaging
// closed is set by setDone once the transaction is committed or discarded.
closed bool
}
// Get looks up key as seen by the transaction and returns its value.
// ErrNotFound is returned when the DB does not contain the key, and
// errTransactionDone when the transaction has already been closed.
//
// The returned slice is a private copy, so the caller may modify it freely.
// The key argument may be modified once Get has returned.
func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) {
	tr.lk.RLock()
	defer tr.lk.RUnlock()

	if !tr.closed {
		return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro)
	}
	return nil, errTransactionDone
}
// Has reports whether the DB, as seen by the transaction, contains the
// given key. errTransactionDone is returned when the transaction has already
// been closed.
//
// The key argument may be modified once Has has returned.
func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
	tr.lk.RLock()
	defer tr.lk.RUnlock()

	if !tr.closed {
		return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro)
	}
	return false, errTransactionDone
}
// NewIterator returns an iterator over the latest snapshot of the
// transaction.
//
// A single iterator is not goroutine-safe, but several iterators may be used
// concurrently as long as each one is confined to its own goroutine. An
// iterator may also be used while the transaction is being written to; the
// key/value pairs it yields are guaranteed to be consistent.
//
// slice restricts the iterator to keys within the given range. A nil
// Range.Start is treated as a key before all keys in the DB, and a nil
// Range.Limit as a key after all keys in the DB.
//
// The iterator must be released after use by calling its Release method.
// If the transaction is already closed, an empty iterator carrying
// errTransactionDone is returned.
//
// Also read the Iterator documentation of the leveldb/iterator package.
func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
	tr.lk.RLock()
	defer tr.lk.RUnlock()

	if tr.closed {
		return iterator.NewEmptyIterator(errTransactionDone)
	}

	// Take a reference on the current memdb before handing it to the
	// iterator.
	mem := tr.mem
	mem.incref()
	return tr.db.newIterator(mem, tr.tables, tr.seq, slice, ro)
}
// flush writes the contents of the transaction's memdb, if any, into a new
// table file and registers that table with the transaction. It is a no-op
// when the memdb is empty. The caller is expected to hold tr.lk.
func (tr *Transaction) flush() error {
	if tr.mem.Len() == 0 {
		// Nothing buffered, nothing to write.
		return nil
	}

	// Build a table from the memdb contents, timing the operation.
	tr.stats.startTimer()
	it := tr.mem.NewIterator(nil)
	tf, count, err := tr.db.s.tops.createFrom(it)
	it.Release()
	tr.stats.stopTimer()
	if err != nil {
		return err
	}

	if tr.mem.getref() != 1 {
		// The memdb is referenced elsewhere (presumably by a live
		// iterator), so drop our reference and continue with a fresh one.
		tr.mem.decref()
		tr.mem = tr.db.mpoolGet(0)
		tr.mem.incref()
	} else {
		// We are the only holder; the memdb can be reused in place.
		tr.mem.Reset()
	}

	tr.tables = append(tr.tables, tf)
	tr.rec.addTableFile(0, tf)
	tr.stats.write += tf.size
	tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", tf.fd.Num, count, shortenb(int(tf.size)), tf.imin, tf.imax)
	return nil
}
// put stores a single record of the given key type in the transaction's
// memdb, flushing the memdb first when it lacks room for the record. The
// transaction sequence number advances only when the record was stored.
// The caller is expected to hold tr.lk for writing.
func (tr *Transaction) put(kt keyType, key, value []byte) error {
	// The record is stamped with the next sequence number.
	tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt)

	needed := len(tr.ikScratch) + len(value)
	if needed > tr.mem.Free() {
		if err := tr.flush(); err != nil {
			return err
		}
	}

	err := tr.mem.Put(tr.ikScratch, value)
	if err == nil {
		tr.seq++
	}
	return err
}
// Put sets the value for the given key, overwriting any previous value for
// that key; a DB is not a multi-map.
// Note that the transaction is not compacted until it is committed: writing
// the same key 10 times leaves 10 entries for that key in the transaction.
//
// The wo argument is not consulted by this method.
// The arguments may be modified once Put has returned.
func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error {
	tr.lk.Lock()
	defer tr.lk.Unlock()

	if !tr.closed {
		return tr.put(keyTypeVal, key, value)
	}
	return errTransactionDone
}
// Delete removes the value for the given key by recording a deletion entry
// in the transaction.
// Note that the transaction is not compacted until it is committed: writing
// the same key 10 times leaves 10 entries for that key in the transaction.
//
// The wo argument is not consulted by this method.
// The arguments may be modified once Delete has returned.
func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error {
	tr.lk.Lock()
	defer tr.lk.Unlock()

	if !tr.closed {
		return tr.put(keyTypeDel, key, nil)
	}
	return errTransactionDone
}
// Write applies the given batch to the transaction. The records of the
// batch are applied sequentially. A nil or empty batch is a no-op.
// Note that the transaction is not compacted until it is committed: writing
// the same key 10 times leaves 10 entries for that key in the transaction.
//
// The wo argument is not consulted by this method.
// The arguments may be modified once Write has returned.
func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
	if b == nil || b.Len() == 0 {
		return nil
	}

	tr.lk.Lock()
	defer tr.lk.Unlock()

	if tr.closed {
		return errTransactionDone
	}

	// Feed every decoded record of the batch through put.
	apply := func(_ int, kt keyType, key, value []byte) error {
		return tr.put(kt, key, value)
	}
	return b.decodeRec(apply)
}
// setDone marks the transaction as closed, detaches it from the DB, drops
// the transaction's reference on its memdb and finally releases the DB
// write lock by receiving from writeLockC. The caller is expected to hold
// tr.lk for writing.
func (tr *Transaction) setDone() {
	db := tr.db

	tr.closed = true
	db.tr = nil
	tr.mem.decref()

	// Release the write lock that was taken when the transaction was opened.
	<-db.writeLockC
}
// Commit commits the transaction.
//
// Any buffered writes are flushed to a table file, and the tables produced
// by the transaction are then committed to the session. The session commit
// is attempted up to three times, one second apart; if every attempt fails,
// or the DB is closed while waiting, the last commit error is returned.
// On success the DB sequence number is advanced to the transaction's and
// table compaction is triggered.
//
// The transaction is closed when Commit returns, whether or not it
// succeeded, unless the DB was not ok or the transaction was already closed
// (errTransactionDone).
//
// Other methods should not be called after transaction has been committed.
func (tr *Transaction) Commit() error {
	if err := tr.db.ok(); err != nil {
		return err
	}

	tr.lk.Lock()
	defer tr.lk.Unlock()
	if tr.closed {
		return errTransactionDone
	}
	// From here on the transaction is finished regardless of the outcome.
	defer tr.setDone()

	if err := tr.flush(); err != nil {
		tr.discard()
		return err
	}
	if len(tr.tables) == 0 {
		// Nothing was written; there is nothing to commit.
		return nil
	}

	// Committing transaction.
	tr.rec.setSeqNum(tr.seq)
	tr.db.compCommitLk.Lock()
	defer tr.db.compCommitLk.Unlock()

	// Keep the error of the most recent attempt so that exhausting the
	// retries is reported to the caller instead of being mistaken for
	// success.
	var err error
	for retry := 0; retry < 3; retry++ {
		if err = tr.db.s.commit(&tr.rec); err == nil {
			break
		}
		tr.db.logf("transaction@commit error R·%d %q", retry, err)
		select {
		case <-time.After(time.Second):
		case <-tr.db.closeC:
			tr.db.logf("transaction@commit exiting")
			return err
		}
	}
	if err != nil {
		// NOTE(review): the table files created by this transaction are
		// left on disk here, as on the closeC path above; confirm whether
		// they should be discarded.
		return err
	}

	// Success. Set db.seq.
	tr.db.setSeq(tr.seq)
	// Trigger table auto-compaction.
	tr.db.compTrigger(tr.db.tcompCmdC)
	return nil
}
// discard removes every table file created by the transaction from storage.
// When a file is removed successfully its file number is handed back to the
// session for reuse; removal errors are otherwise ignored.
func (tr *Transaction) discard() {
	stor := tr.db.s.stor
	for i := range tr.tables {
		fd := tr.tables[i].fd
		tr.db.logf("transaction@discard @%d", fd.Num)
		if err := stor.Remove(fd); err != nil {
			continue
		}
		tr.db.s.reuseFileNum(fd.Num)
	}
}
// Discard discards the transaction: the table files it created are removed
// and the transaction is closed. Calling Discard on a transaction that is
// already closed is a no-op.
//
// Other methods should not be called after transaction has been discarded.
func (tr *Transaction) Discard() {
	tr.lk.Lock()
	// Deferred, like in the other methods, so the lock is released even if
	// discarding panics.
	defer tr.lk.Unlock()

	if tr.closed {
		return
	}
	tr.discard()
	tr.setDone()
}
// OpenTransaction opens an atomic DB transaction. Only one transaction can be
// opened at a time. Writes will be blocked until the transaction is committed
// or discarded.
// The returned transaction handle is goroutine-safe.
//
// The transaction must be closed once done, either by committing or
// discarding the transaction.
// Closing the DB will discard an open transaction.
//
// An error is returned when the DB is not ok, when a persistent compaction
// error is pending, when the DB is closed (ErrClosed), or when the current
// memdb cannot be rotated.
func (db *DB) OpenTransaction() (*Transaction, error) {
	if err := db.ok(); err != nil {
		return nil, err
	}

	// The write happen synchronously: take the write lock, which is held
	// until the transaction is committed or discarded.
	select {
	case db.writeLockC <- struct{}{}:
	case err := <-db.compPerErrC:
		return nil, err
	case <-db.closeC:
		return nil, ErrClosed
	}

	if db.tr != nil {
		panic("leveldb: has open transaction")
	}

	// Flush current memdb.
	if db.mem != nil && db.mem.Len() != 0 {
		if _, err := db.rotateMem(0, true); err != nil {
			// No transaction is handed out, so nobody else would ever
			// release the write lock taken above; release it here.
			<-db.writeLockC
			return nil, err
		}
	}

	tr := &Transaction{
		db:  db,
		seq: db.seq,
		mem: db.mpoolGet(0),
	}
	tr.mem.incref()
	db.tr = tr
	return tr, nil
}