2014-07-06 14:46:48 +02:00
|
|
|
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
|
|
|
// All rights reserved.
|
|
|
|
//
|
|
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
|
|
// found in the LICENSE file.
|
|
|
|
|
|
|
|
package leveldb
|
|
|
|
|
|
|
|
import (
|
2014-11-18 13:24:42 +01:00
|
|
|
"fmt"
|
2014-07-06 14:46:48 +02:00
|
|
|
"io"
|
|
|
|
"os"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
"github.com/syndtr/goleveldb/leveldb/errors"
|
2014-07-06 14:46:48 +02:00
|
|
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/journal"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/storage"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
|
|
)
|
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
// ErrManifestCorrupted describes a manifest record that lacks a required
// field or carries an unexpected value for it.
type ErrManifestCorrupted struct {
	Field  string // manifest field at fault, e.g. "comparer"
	Reason string // explanation, e.g. "missing"
}

// Error implements the error interface.
func (e *ErrManifestCorrupted) Error() string {
	const format = "leveldb: manifest corrupted (field '%s'): %s"
	return fmt.Sprintf(format, e.Field, e.Reason)
}
|
|
|
|
|
|
|
|
func newErrManifestCorrupted(f storage.File, field, reason string) error {
|
|
|
|
return errors.NewErrCorrupted(f, &ErrManifestCorrupted{field, reason})
|
|
|
|
}
|
|
|
|
|
2014-07-06 14:46:48 +02:00
|
|
|
// session represents a persistent database session.
type session struct {
	// Need 64-bit alignment.
	// NOTE(review): these uint64 fields are kept at the start of the struct,
	// presumably so they stay 8-byte aligned for 64-bit atomic access on
	// 32-bit platforms — confirm against callers before reordering them.
	stNextFileNum    uint64 // current unused file number
	stJournalNum     uint64 // current journal file number; need external synchronization
	stPrevJournalNum uint64 // prev journal file number; no longer used; for compatibility with older version of leveldb
	stSeqNum         uint64 // last mem compacted seq; need external synchronization
	stTempFileNum    uint64

	stor     storage.Storage // backing storage; set by newSession
	storLock util.Releaser   // storage lock taken by newSession, released by release
	o        *cachedOptions
	icmp     *iComparer
	tops     *tOps

	// Manifest journal state. close closes and nils these; recover sets
	// manifestFile to the manifest it replayed.
	manifest       *journal.Writer
	manifestWriter storage.Writer
	manifestFile   storage.File

	stCompPtrs []iKey   // compaction pointers; need external synchronization
	stVersion  *version // current version
	vmu        sync.Mutex // NOTE(review): not used in this section; presumably guards stVersion
}
|
|
|
|
|
2014-07-23 08:31:36 +02:00
|
|
|
// Creates new initialized session instance.
|
2014-07-06 14:46:48 +02:00
|
|
|
func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
|
|
|
|
if stor == nil {
|
|
|
|
return nil, os.ErrInvalid
|
|
|
|
}
|
|
|
|
storLock, err := stor.Lock()
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
s = &session{
|
2014-11-18 13:24:42 +01:00
|
|
|
stor: stor,
|
|
|
|
storLock: storLock,
|
|
|
|
stCompPtrs: make([]iKey, o.GetNumLevel()),
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
s.setOptions(o)
|
2014-12-29 12:22:58 +01:00
|
|
|
s.tops = newTableOps(s)
|
2014-11-18 13:24:42 +01:00
|
|
|
s.setVersion(newVersion(s))
|
|
|
|
s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
|
2014-07-06 14:46:48 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close session.
|
|
|
|
func (s *session) close() {
|
|
|
|
s.tops.close()
|
|
|
|
if s.manifest != nil {
|
|
|
|
s.manifest.Close()
|
|
|
|
}
|
|
|
|
if s.manifestWriter != nil {
|
|
|
|
s.manifestWriter.Close()
|
|
|
|
}
|
|
|
|
s.manifest = nil
|
|
|
|
s.manifestWriter = nil
|
|
|
|
s.manifestFile = nil
|
|
|
|
s.stVersion = nil
|
|
|
|
}
|
|
|
|
|
2014-07-23 08:31:36 +02:00
|
|
|
// Release session lock.
|
2014-07-06 14:46:48 +02:00
|
|
|
func (s *session) release() {
|
|
|
|
s.storLock.Release()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create a new database session; need external synchronization.
|
|
|
|
func (s *session) create() error {
|
|
|
|
// create manifest
|
|
|
|
return s.newManifest(nil, nil)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Recover a database session; need external synchronization.
|
|
|
|
func (s *session) recover() (err error) {
|
|
|
|
defer func() {
|
|
|
|
if os.IsNotExist(err) {
|
|
|
|
// Don't return os.ErrNotExist if the underlying storage contains
|
|
|
|
// other files that belong to LevelDB. So the DB won't get trashed.
|
|
|
|
if files, _ := s.stor.GetFiles(storage.TypeAll); len(files) > 0 {
|
2014-11-18 13:24:42 +01:00
|
|
|
err = &errors.ErrCorrupted{File: &storage.FileInfo{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}}
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
m, err := s.stor.GetManifest()
|
2014-07-06 14:46:48 +02:00
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
reader, err := m.Open()
|
2014-07-06 14:46:48 +02:00
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
defer reader.Close()
|
|
|
|
strict := s.o.GetStrict(opt.StrictManifest)
|
2014-11-18 13:24:42 +01:00
|
|
|
jr := journal.NewReader(reader, dropper{s, m}, strict, true)
|
2014-07-06 14:46:48 +02:00
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
staging := s.stVersion.newStaging()
|
|
|
|
rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
|
2014-07-06 14:46:48 +02:00
|
|
|
for {
|
|
|
|
var r io.Reader
|
|
|
|
r, err = jr.Next()
|
|
|
|
if err != nil {
|
|
|
|
if err == io.EOF {
|
|
|
|
err = nil
|
|
|
|
break
|
|
|
|
}
|
2014-11-18 13:24:42 +01:00
|
|
|
return errors.SetFile(err, m)
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
err = rec.decode(r)
|
|
|
|
if err == nil {
|
|
|
|
// save compact pointers
|
2014-11-18 13:24:42 +01:00
|
|
|
for _, r := range rec.compPtrs {
|
|
|
|
s.stCompPtrs[r.level] = iKey(r.ikey)
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
// commit record to version staging
|
|
|
|
staging.commit(rec)
|
|
|
|
} else {
|
2014-11-18 13:24:42 +01:00
|
|
|
err = errors.SetFile(err, m)
|
|
|
|
if strict || !errors.IsCorrupted(err) {
|
|
|
|
return
|
|
|
|
} else {
|
|
|
|
s.logf("manifest error: %v (skipped)", errors.SetFile(err, m))
|
|
|
|
}
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
2014-11-18 13:24:42 +01:00
|
|
|
rec.resetCompPtrs()
|
2014-07-06 14:46:48 +02:00
|
|
|
rec.resetAddedTables()
|
|
|
|
rec.resetDeletedTables()
|
|
|
|
}
|
|
|
|
|
|
|
|
switch {
|
|
|
|
case !rec.has(recComparer):
|
2014-11-18 13:24:42 +01:00
|
|
|
return newErrManifestCorrupted(m, "comparer", "missing")
|
2014-07-06 23:13:10 +02:00
|
|
|
case rec.comparer != s.icmp.uName():
|
2014-11-18 13:24:42 +01:00
|
|
|
return newErrManifestCorrupted(m, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
|
|
|
|
case !rec.has(recNextFileNum):
|
|
|
|
return newErrManifestCorrupted(m, "next-file-num", "missing")
|
2014-07-06 14:46:48 +02:00
|
|
|
case !rec.has(recJournalNum):
|
2014-11-18 13:24:42 +01:00
|
|
|
return newErrManifestCorrupted(m, "journal-file-num", "missing")
|
|
|
|
case !rec.has(recSeqNum):
|
|
|
|
return newErrManifestCorrupted(m, "seq-num", "missing")
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
s.manifestFile = m
|
2014-07-06 14:46:48 +02:00
|
|
|
s.setVersion(staging.finish())
|
2014-11-18 13:24:42 +01:00
|
|
|
s.setNextFileNum(rec.nextFileNum)
|
2014-07-06 14:46:48 +02:00
|
|
|
s.recordCommited(rec)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Commit session; need external synchronization.
|
|
|
|
func (s *session) commit(r *sessionRecord) (err error) {
|
2014-11-18 13:24:42 +01:00
|
|
|
v := s.version()
|
|
|
|
defer v.release()
|
|
|
|
|
2014-07-06 14:46:48 +02:00
|
|
|
// spawn new version based on current version
|
2014-11-18 13:24:42 +01:00
|
|
|
nv := v.spawn(r)
|
2014-07-06 14:46:48 +02:00
|
|
|
|
|
|
|
if s.manifest == nil {
|
|
|
|
// manifest journal writer not yet created, create one
|
|
|
|
err = s.newManifest(r, nv)
|
|
|
|
} else {
|
|
|
|
err = s.flushManifest(r)
|
|
|
|
}
|
|
|
|
|
|
|
|
// finally, apply new version if no error rise
|
|
|
|
if err == nil {
|
|
|
|
s.setVersion(nv)
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Pick a compaction based on current state; need external synchronization.
|
|
|
|
func (s *session) pickCompaction() *compaction {
|
2014-11-18 13:24:42 +01:00
|
|
|
v := s.version()
|
2014-07-06 14:46:48 +02:00
|
|
|
|
|
|
|
var level int
|
|
|
|
var t0 tFiles
|
|
|
|
if v.cScore >= 1 {
|
|
|
|
level = v.cLevel
|
2014-11-18 13:24:42 +01:00
|
|
|
cptr := s.stCompPtrs[level]
|
2014-07-23 08:31:36 +02:00
|
|
|
tables := v.tables[level]
|
|
|
|
for _, t := range tables {
|
|
|
|
if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
|
2014-07-06 14:46:48 +02:00
|
|
|
t0 = append(t0, t)
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(t0) == 0 {
|
2014-07-23 08:31:36 +02:00
|
|
|
t0 = append(t0, tables[0])
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if p := atomic.LoadPointer(&v.cSeek); p != nil {
|
|
|
|
ts := (*tSet)(p)
|
|
|
|
level = ts.level
|
|
|
|
t0 = append(t0, ts.table)
|
|
|
|
} else {
|
2014-11-18 13:24:42 +01:00
|
|
|
v.release()
|
2014-07-06 14:46:48 +02:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
return newCompaction(s, v, level, t0)
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Create compaction from given level and range; need external synchronization.
|
2014-07-23 08:31:36 +02:00
|
|
|
func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction {
|
2014-11-18 13:24:42 +01:00
|
|
|
v := s.version()
|
2014-07-06 14:46:48 +02:00
|
|
|
|
2014-07-23 08:31:36 +02:00
|
|
|
t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0)
|
2014-07-06 14:46:48 +02:00
|
|
|
if len(t0) == 0 {
|
2014-11-18 13:24:42 +01:00
|
|
|
v.release()
|
2014-07-06 14:46:48 +02:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Avoid compacting too much in one shot in case the range is large.
|
|
|
|
// But we cannot do this for level-0 since level-0 files can overlap
|
|
|
|
// and we must not pick one file and drop another older file if the
|
|
|
|
// two files overlap.
|
|
|
|
if level > 0 {
|
2014-11-18 13:24:42 +01:00
|
|
|
limit := uint64(v.s.o.GetCompactionSourceLimit(level))
|
2014-07-06 14:46:48 +02:00
|
|
|
total := uint64(0)
|
|
|
|
for i, t := range t0 {
|
|
|
|
total += t.size
|
|
|
|
if total >= limit {
|
|
|
|
s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
|
|
|
|
t0 = t0[:i+1]
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
return newCompaction(s, v, level, t0)
|
|
|
|
}
|
|
|
|
|
|
|
|
func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction {
|
|
|
|
c := &compaction{
|
|
|
|
s: s,
|
|
|
|
v: v,
|
|
|
|
level: level,
|
|
|
|
tables: [2]tFiles{t0, nil},
|
|
|
|
maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)),
|
|
|
|
tPtrs: make([]int, s.o.GetNumLevel()),
|
|
|
|
}
|
2014-07-06 14:46:48 +02:00
|
|
|
c.expand()
|
2014-11-18 13:24:42 +01:00
|
|
|
c.save()
|
2014-07-06 14:46:48 +02:00
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
2014-07-23 08:31:36 +02:00
|
|
|
// compaction represent a compaction state.
|
2014-07-06 14:46:48 +02:00
|
|
|
type compaction struct {
|
2014-07-23 08:31:36 +02:00
|
|
|
s *session
|
|
|
|
v *version
|
2014-07-06 14:46:48 +02:00
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
level int
|
|
|
|
tables [2]tFiles
|
|
|
|
maxGPOverlaps uint64
|
|
|
|
|
|
|
|
gp tFiles
|
|
|
|
gpi int
|
|
|
|
seenKey bool
|
|
|
|
gpOverlappedBytes uint64
|
|
|
|
imin, imax iKey
|
|
|
|
tPtrs []int
|
|
|
|
released bool
|
|
|
|
|
|
|
|
snapGPI int
|
|
|
|
snapSeenKey bool
|
|
|
|
snapGPOverlappedBytes uint64
|
|
|
|
snapTPtrs []int
|
|
|
|
}
|
2014-07-06 14:46:48 +02:00
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
func (c *compaction) save() {
|
|
|
|
c.snapGPI = c.gpi
|
|
|
|
c.snapSeenKey = c.seenKey
|
|
|
|
c.snapGPOverlappedBytes = c.gpOverlappedBytes
|
|
|
|
c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
|
|
|
|
}
|
2014-07-06 14:46:48 +02:00
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
func (c *compaction) restore() {
|
|
|
|
c.gpi = c.snapGPI
|
|
|
|
c.seenKey = c.snapSeenKey
|
|
|
|
c.gpOverlappedBytes = c.snapGPOverlappedBytes
|
|
|
|
c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *compaction) release() {
|
|
|
|
if !c.released {
|
|
|
|
c.released = true
|
|
|
|
c.v.release()
|
|
|
|
}
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Expand compacted tables; need external synchronization.
|
|
|
|
func (c *compaction) expand() {
|
2014-11-18 13:24:42 +01:00
|
|
|
limit := uint64(c.s.o.GetCompactionExpandLimit(c.level))
|
|
|
|
vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1]
|
2014-07-06 14:46:48 +02:00
|
|
|
|
|
|
|
t0, t1 := c.tables[0], c.tables[1]
|
2014-07-23 08:31:36 +02:00
|
|
|
imin, imax := t0.getRange(c.s.icmp)
|
2014-11-18 13:24:42 +01:00
|
|
|
// We expand t0 here just incase ukey hop across tables.
|
|
|
|
t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0)
|
|
|
|
if len(t0) != len(c.tables[0]) {
|
|
|
|
imin, imax = t0.getRange(c.s.icmp)
|
|
|
|
}
|
2014-07-23 08:31:36 +02:00
|
|
|
t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
|
|
|
|
// Get entire range covered by compaction.
|
|
|
|
amin, amax := append(t0, t1...).getRange(c.s.icmp)
|
2014-07-06 14:46:48 +02:00
|
|
|
|
|
|
|
// See if we can grow the number of inputs in "level" without
|
|
|
|
// changing the number of "level+1" files we pick up.
|
|
|
|
if len(t1) > 0 {
|
2014-11-18 13:24:42 +01:00
|
|
|
exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0)
|
|
|
|
if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
|
2014-07-23 08:31:36 +02:00
|
|
|
xmin, xmax := exp0.getRange(c.s.icmp)
|
|
|
|
exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
|
2014-07-06 14:46:48 +02:00
|
|
|
if len(exp1) == len(t1) {
|
2014-07-23 08:31:36 +02:00
|
|
|
c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
|
2014-11-18 13:24:42 +01:00
|
|
|
c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
|
2014-07-06 14:46:48 +02:00
|
|
|
len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
|
2014-07-23 08:31:36 +02:00
|
|
|
imin, imax = xmin, xmax
|
2014-07-06 14:46:48 +02:00
|
|
|
t0, t1 = exp0, exp1
|
2014-07-23 08:31:36 +02:00
|
|
|
amin, amax = append(t0, t1...).getRange(c.s.icmp)
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Compute the set of grandparent files that overlap this compaction
|
|
|
|
// (parent == level+1; grandparent == level+2)
|
2014-11-18 13:24:42 +01:00
|
|
|
if c.level+2 < c.s.o.GetNumLevel() {
|
|
|
|
c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
c.tables[0], c.tables[1] = t0, t1
|
2014-07-23 08:31:36 +02:00
|
|
|
c.imin, c.imax = imin, imax
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Check whether compaction is trivial.
|
|
|
|
func (c *compaction) trivial() bool {
|
2014-11-18 13:24:42 +01:00
|
|
|
return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
|
2014-07-23 08:31:36 +02:00
|
|
|
func (c *compaction) baseLevelForKey(ukey []byte) bool {
|
|
|
|
for level, tables := range c.v.tables[c.level+2:] {
|
|
|
|
for c.tPtrs[level] < len(tables) {
|
|
|
|
t := tables[c.tPtrs[level]]
|
|
|
|
if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
|
|
|
|
// We've advanced far enough.
|
|
|
|
if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
|
|
|
|
// Key falls in this file's range, so definitely not base level.
|
2014-07-06 14:46:48 +02:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
c.tPtrs[level]++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2014-07-23 08:31:36 +02:00
|
|
|
func (c *compaction) shouldStopBefore(ikey iKey) bool {
|
2014-11-18 13:24:42 +01:00
|
|
|
for ; c.gpi < len(c.gp); c.gpi++ {
|
|
|
|
gp := c.gp[c.gpi]
|
2014-07-23 08:31:36 +02:00
|
|
|
if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
|
2014-07-06 14:46:48 +02:00
|
|
|
break
|
|
|
|
}
|
|
|
|
if c.seenKey {
|
2014-11-18 13:24:42 +01:00
|
|
|
c.gpOverlappedBytes += gp.size
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
c.seenKey = true
|
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
if c.gpOverlappedBytes > c.maxGPOverlaps {
|
2014-07-23 08:31:36 +02:00
|
|
|
// Too much overlap for current output; start new output.
|
2014-11-18 13:24:42 +01:00
|
|
|
c.gpOverlappedBytes = 0
|
2014-07-06 14:46:48 +02:00
|
|
|
return true
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2014-07-23 08:31:36 +02:00
|
|
|
// Creates an iterator.
|
2014-07-06 14:46:48 +02:00
|
|
|
func (c *compaction) newIterator() iterator.Iterator {
|
2014-07-23 08:31:36 +02:00
|
|
|
// Creates iterator slice.
|
|
|
|
icap := len(c.tables)
|
2014-07-06 14:46:48 +02:00
|
|
|
if c.level == 0 {
|
2014-07-23 08:31:36 +02:00
|
|
|
// Special case for level-0
|
2014-07-06 14:46:48 +02:00
|
|
|
icap = len(c.tables[0]) + 1
|
|
|
|
}
|
|
|
|
its := make([]iterator.Iterator, 0, icap)
|
|
|
|
|
2014-07-23 08:31:36 +02:00
|
|
|
// Options.
|
2014-07-06 14:46:48 +02:00
|
|
|
ro := &opt.ReadOptions{
|
|
|
|
DontFillCache: true,
|
2014-11-18 13:24:42 +01:00
|
|
|
Strict: opt.StrictOverride,
|
|
|
|
}
|
|
|
|
strict := c.s.o.GetStrict(opt.StrictCompaction)
|
|
|
|
if strict {
|
|
|
|
ro.Strict |= opt.StrictReader
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
|
2014-07-23 08:31:36 +02:00
|
|
|
for i, tables := range c.tables {
|
|
|
|
if len(tables) == 0 {
|
2014-07-06 14:46:48 +02:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2014-07-23 08:31:36 +02:00
|
|
|
// Level-0 is not sorted and may overlaps each other.
|
|
|
|
if c.level+i == 0 {
|
|
|
|
for _, t := range tables {
|
|
|
|
its = append(its, c.s.tops.newIterator(t, nil, ro))
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|
|
|
|
} else {
|
2014-11-18 13:24:42 +01:00
|
|
|
it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
|
2014-07-06 14:46:48 +02:00
|
|
|
its = append(its, it)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-11-18 13:24:42 +01:00
|
|
|
return iterator.NewMergedIterator(its, c.s.icmp, strict)
|
2014-07-06 14:46:48 +02:00
|
|
|
}
|