You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
860 lines
18 KiB
860 lines
18 KiB
// Copyright (c) 2012, Suryandaru Triandana <[email protected]> |
|
// All rights reserved. |
|
// |
|
// Use of this source code is governed by a BSD-style license that can be |
|
// found in the LICENSE file. |
|
|
|
package leveldb |
|
|
|
import ( |
|
"sync" |
|
"time" |
|
|
|
"github.com/syndtr/goleveldb/leveldb/errors" |
|
"github.com/syndtr/goleveldb/leveldb/opt" |
|
"github.com/syndtr/goleveldb/leveldb/storage" |
|
) |
|
|
|
var ( |
|
errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting") |
|
) |
|
|
|
type cStat struct { |
|
duration time.Duration |
|
read int64 |
|
write int64 |
|
} |
|
|
|
func (p *cStat) add(n *cStatStaging) { |
|
p.duration += n.duration |
|
p.read += n.read |
|
p.write += n.write |
|
} |
|
|
|
func (p *cStat) get() (duration time.Duration, read, write int64) { |
|
return p.duration, p.read, p.write |
|
} |
|
|
|
type cStatStaging struct { |
|
start time.Time |
|
duration time.Duration |
|
on bool |
|
read int64 |
|
write int64 |
|
} |
|
|
|
func (p *cStatStaging) startTimer() { |
|
if !p.on { |
|
p.start = time.Now() |
|
p.on = true |
|
} |
|
} |
|
|
|
func (p *cStatStaging) stopTimer() { |
|
if p.on { |
|
p.duration += time.Since(p.start) |
|
p.on = false |
|
} |
|
} |
|
|
|
type cStats struct { |
|
lk sync.Mutex |
|
stats []cStat |
|
} |
|
|
|
func (p *cStats) addStat(level int, n *cStatStaging) { |
|
p.lk.Lock() |
|
if level >= len(p.stats) { |
|
newStats := make([]cStat, level+1) |
|
copy(newStats, p.stats) |
|
p.stats = newStats |
|
} |
|
p.stats[level].add(n) |
|
p.lk.Unlock() |
|
} |
|
|
|
func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) { |
|
p.lk.Lock() |
|
defer p.lk.Unlock() |
|
if level < len(p.stats) { |
|
return p.stats[level].get() |
|
} |
|
return |
|
} |
|
|
|
func (db *DB) compactionError() { |
|
var err error |
|
noerr: |
|
// No error. |
|
for { |
|
select { |
|
case err = <-db.compErrSetC: |
|
switch { |
|
case err == nil: |
|
case err == ErrReadOnly, errors.IsCorrupted(err): |
|
goto hasperr |
|
default: |
|
goto haserr |
|
} |
|
case <-db.closeC: |
|
return |
|
} |
|
} |
|
haserr: |
|
// Transient error. |
|
for { |
|
select { |
|
case db.compErrC <- err: |
|
case err = <-db.compErrSetC: |
|
switch { |
|
case err == nil: |
|
goto noerr |
|
case err == ErrReadOnly, errors.IsCorrupted(err): |
|
goto hasperr |
|
default: |
|
} |
|
case <-db.closeC: |
|
return |
|
} |
|
} |
|
hasperr: |
|
// Persistent error. |
|
for { |
|
select { |
|
case db.compErrC <- err: |
|
case db.compPerErrC <- err: |
|
case db.writeLockC <- struct{}{}: |
|
// Hold write lock, so that write won't pass-through. |
|
db.compWriteLocking = true |
|
case <-db.closeC: |
|
if db.compWriteLocking { |
|
// We should release the lock or Close will hang. |
|
<-db.writeLockC |
|
} |
|
return |
|
} |
|
} |
|
} |
|
|
|
type compactionTransactCounter int |
|
|
|
func (cnt *compactionTransactCounter) incr() { |
|
*cnt++ |
|
} |
|
|
|
type compactionTransactInterface interface { |
|
run(cnt *compactionTransactCounter) error |
|
revert() error |
|
} |
|
|
|
func (db *DB) compactionTransact(name string, t compactionTransactInterface) { |
|
defer func() { |
|
if x := recover(); x != nil { |
|
if x == errCompactionTransactExiting { |
|
if err := t.revert(); err != nil { |
|
db.logf("%s revert error %q", name, err) |
|
} |
|
} |
|
panic(x) |
|
} |
|
}() |
|
|
|
const ( |
|
backoffMin = 1 * time.Second |
|
backoffMax = 8 * time.Second |
|
backoffMul = 2 * time.Second |
|
) |
|
var ( |
|
backoff = backoffMin |
|
backoffT = time.NewTimer(backoff) |
|
lastCnt = compactionTransactCounter(0) |
|
|
|
disableBackoff = db.s.o.GetDisableCompactionBackoff() |
|
) |
|
for n := 0; ; n++ { |
|
// Check whether the DB is closed. |
|
if db.isClosed() { |
|
db.logf("%s exiting", name) |
|
db.compactionExitTransact() |
|
} else if n > 0 { |
|
db.logf("%s retrying N·%d", name, n) |
|
} |
|
|
|
// Execute. |
|
cnt := compactionTransactCounter(0) |
|
err := t.run(&cnt) |
|
if err != nil { |
|
db.logf("%s error I·%d %q", name, cnt, err) |
|
} |
|
|
|
// Set compaction error status. |
|
select { |
|
case db.compErrSetC <- err: |
|
case perr := <-db.compPerErrC: |
|
if err != nil { |
|
db.logf("%s exiting (persistent error %q)", name, perr) |
|
db.compactionExitTransact() |
|
} |
|
case <-db.closeC: |
|
db.logf("%s exiting", name) |
|
db.compactionExitTransact() |
|
} |
|
if err == nil { |
|
return |
|
} |
|
if errors.IsCorrupted(err) { |
|
db.logf("%s exiting (corruption detected)", name) |
|
db.compactionExitTransact() |
|
} |
|
|
|
if !disableBackoff { |
|
// Reset backoff duration if counter is advancing. |
|
if cnt > lastCnt { |
|
backoff = backoffMin |
|
lastCnt = cnt |
|
} |
|
|
|
// Backoff. |
|
backoffT.Reset(backoff) |
|
if backoff < backoffMax { |
|
backoff *= backoffMul |
|
if backoff > backoffMax { |
|
backoff = backoffMax |
|
} |
|
} |
|
select { |
|
case <-backoffT.C: |
|
case <-db.closeC: |
|
db.logf("%s exiting", name) |
|
db.compactionExitTransact() |
|
} |
|
} |
|
} |
|
} |
|
|
|
type compactionTransactFunc struct { |
|
runFunc func(cnt *compactionTransactCounter) error |
|
revertFunc func() error |
|
} |
|
|
|
func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error { |
|
return t.runFunc(cnt) |
|
} |
|
|
|
func (t *compactionTransactFunc) revert() error { |
|
if t.revertFunc != nil { |
|
return t.revertFunc() |
|
} |
|
return nil |
|
} |
|
|
|
func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) { |
|
db.compactionTransact(name, &compactionTransactFunc{run, revert}) |
|
} |
|
|
|
func (db *DB) compactionExitTransact() { |
|
panic(errCompactionTransactExiting) |
|
} |
|
|
|
func (db *DB) compactionCommit(name string, rec *sessionRecord) { |
|
db.compCommitLk.Lock() |
|
defer db.compCommitLk.Unlock() // Defer is necessary. |
|
db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error { |
|
return db.s.commit(rec) |
|
}, nil) |
|
} |
|
|
|
func (db *DB) memCompaction() { |
|
mdb := db.getFrozenMem() |
|
if mdb == nil { |
|
return |
|
} |
|
defer mdb.decref() |
|
|
|
db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size())) |
|
|
|
// Don't compact empty memdb. |
|
if mdb.Len() == 0 { |
|
db.logf("memdb@flush skipping") |
|
// drop frozen memdb |
|
db.dropFrozenMem() |
|
return |
|
} |
|
|
|
// Pause table compaction. |
|
resumeC := make(chan struct{}) |
|
select { |
|
case db.tcompPauseC <- (chan<- struct{})(resumeC): |
|
case <-db.compPerErrC: |
|
close(resumeC) |
|
resumeC = nil |
|
case <-db.closeC: |
|
db.compactionExitTransact() |
|
} |
|
|
|
var ( |
|
rec = &sessionRecord{} |
|
stats = &cStatStaging{} |
|
flushLevel int |
|
) |
|
|
|
// Generate tables. |
|
db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) { |
|
stats.startTimer() |
|
flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel) |
|
stats.stopTimer() |
|
return |
|
}, func() error { |
|
for _, r := range rec.addedTables { |
|
db.logf("memdb@flush revert @%d", r.num) |
|
if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil { |
|
return err |
|
} |
|
} |
|
return nil |
|
}) |
|
|
|
rec.setJournalNum(db.journalFd.Num) |
|
rec.setSeqNum(db.frozenSeq) |
|
|
|
// Commit. |
|
stats.startTimer() |
|
db.compactionCommit("memdb", rec) |
|
stats.stopTimer() |
|
|
|
db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration) |
|
|
|
for _, r := range rec.addedTables { |
|
stats.write += r.size |
|
} |
|
db.compStats.addStat(flushLevel, stats) |
|
|
|
// Drop frozen memdb. |
|
db.dropFrozenMem() |
|
|
|
// Resume table compaction. |
|
if resumeC != nil { |
|
select { |
|
case <-resumeC: |
|
close(resumeC) |
|
case <-db.closeC: |
|
db.compactionExitTransact() |
|
} |
|
} |
|
|
|
// Trigger table compaction. |
|
db.compTrigger(db.tcompCmdC) |
|
} |
|
|
|
type tableCompactionBuilder struct { |
|
db *DB |
|
s *session |
|
c *compaction |
|
rec *sessionRecord |
|
stat0, stat1 *cStatStaging |
|
|
|
snapHasLastUkey bool |
|
snapLastUkey []byte |
|
snapLastSeq uint64 |
|
snapIter int |
|
snapKerrCnt int |
|
snapDropCnt int |
|
|
|
kerrCnt int |
|
dropCnt int |
|
|
|
minSeq uint64 |
|
strict bool |
|
tableSize int |
|
|
|
tw *tWriter |
|
} |
|
|
|
func (b *tableCompactionBuilder) appendKV(key, value []byte) error { |
|
// Create new table if not already. |
|
if b.tw == nil { |
|
// Check for pause event. |
|
if b.db != nil { |
|
select { |
|
case ch := <-b.db.tcompPauseC: |
|
b.db.pauseCompaction(ch) |
|
case <-b.db.closeC: |
|
b.db.compactionExitTransact() |
|
default: |
|
} |
|
} |
|
|
|
// Create new table. |
|
var err error |
|
b.tw, err = b.s.tops.create() |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
|
|
// Write key/value into table. |
|
return b.tw.append(key, value) |
|
} |
|
|
|
func (b *tableCompactionBuilder) needFlush() bool { |
|
return b.tw.tw.BytesLen() >= b.tableSize |
|
} |
|
|
|
func (b *tableCompactionBuilder) flush() error { |
|
t, err := b.tw.finish() |
|
if err != nil { |
|
return err |
|
} |
|
b.rec.addTableFile(b.c.sourceLevel+1, t) |
|
b.stat1.write += t.size |
|
b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax) |
|
b.tw = nil |
|
return nil |
|
} |
|
|
|
func (b *tableCompactionBuilder) cleanup() { |
|
if b.tw != nil { |
|
b.tw.drop() |
|
b.tw = nil |
|
} |
|
} |
|
|
|
func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error { |
|
snapResumed := b.snapIter > 0 |
|
hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary. |
|
lastUkey := append([]byte{}, b.snapLastUkey...) |
|
lastSeq := b.snapLastSeq |
|
b.kerrCnt = b.snapKerrCnt |
|
b.dropCnt = b.snapDropCnt |
|
// Restore compaction state. |
|
b.c.restore() |
|
|
|
defer b.cleanup() |
|
|
|
b.stat1.startTimer() |
|
defer b.stat1.stopTimer() |
|
|
|
iter := b.c.newIterator() |
|
defer iter.Release() |
|
for i := 0; iter.Next(); i++ { |
|
// Incr transact counter. |
|
cnt.incr() |
|
|
|
// Skip until last state. |
|
if i < b.snapIter { |
|
continue |
|
} |
|
|
|
resumed := false |
|
if snapResumed { |
|
resumed = true |
|
snapResumed = false |
|
} |
|
|
|
ikey := iter.Key() |
|
ukey, seq, kt, kerr := parseInternalKey(ikey) |
|
|
|
if kerr == nil { |
|
shouldStop := !resumed && b.c.shouldStopBefore(ikey) |
|
|
|
if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 { |
|
// First occurrence of this user key. |
|
|
|
// Only rotate tables if ukey doesn't hop across. |
|
if b.tw != nil && (shouldStop || b.needFlush()) { |
|
if err := b.flush(); err != nil { |
|
return err |
|
} |
|
|
|
// Creates snapshot of the state. |
|
b.c.save() |
|
b.snapHasLastUkey = hasLastUkey |
|
b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...) |
|
b.snapLastSeq = lastSeq |
|
b.snapIter = i |
|
b.snapKerrCnt = b.kerrCnt |
|
b.snapDropCnt = b.dropCnt |
|
} |
|
|
|
hasLastUkey = true |
|
lastUkey = append(lastUkey[:0], ukey...) |
|
lastSeq = keyMaxSeq |
|
} |
|
|
|
switch { |
|
case lastSeq <= b.minSeq: |
|
// Dropped because newer entry for same user key exist |
|
fallthrough // (A) |
|
case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey): |
|
// For this user key: |
|
// (1) there is no data in higher levels |
|
// (2) data in lower levels will have larger seq numbers |
|
// (3) data in layers that are being compacted here and have |
|
// smaller seq numbers will be dropped in the next |
|
// few iterations of this loop (by rule (A) above). |
|
// Therefore this deletion marker is obsolete and can be dropped. |
|
lastSeq = seq |
|
b.dropCnt++ |
|
continue |
|
default: |
|
lastSeq = seq |
|
} |
|
} else { |
|
if b.strict { |
|
return kerr |
|
} |
|
|
|
// Don't drop corrupted keys. |
|
hasLastUkey = false |
|
lastUkey = lastUkey[:0] |
|
lastSeq = keyMaxSeq |
|
b.kerrCnt++ |
|
} |
|
|
|
if err := b.appendKV(ikey, iter.Value()); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
if err := iter.Error(); err != nil { |
|
return err |
|
} |
|
|
|
// Finish last table. |
|
if b.tw != nil && !b.tw.empty() { |
|
return b.flush() |
|
} |
|
return nil |
|
} |
|
|
|
func (b *tableCompactionBuilder) revert() error { |
|
for _, at := range b.rec.addedTables { |
|
b.s.logf("table@build revert @%d", at.num) |
|
if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil { |
|
return err |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (db *DB) tableCompaction(c *compaction, noTrivial bool) { |
|
defer c.release() |
|
|
|
rec := &sessionRecord{} |
|
rec.addCompPtr(c.sourceLevel, c.imax) |
|
|
|
if !noTrivial && c.trivial() { |
|
t := c.levels[0][0] |
|
db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1) |
|
rec.delTable(c.sourceLevel, t.fd.Num) |
|
rec.addTableFile(c.sourceLevel+1, t) |
|
db.compactionCommit("table-move", rec) |
|
return |
|
} |
|
|
|
var stats [2]cStatStaging |
|
for i, tables := range c.levels { |
|
for _, t := range tables { |
|
stats[i].read += t.size |
|
// Insert deleted tables into record |
|
rec.delTable(c.sourceLevel+i, t.fd.Num) |
|
} |
|
} |
|
sourceSize := int(stats[0].read + stats[1].read) |
|
minSeq := db.minSeq() |
|
db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq) |
|
|
|
b := &tableCompactionBuilder{ |
|
db: db, |
|
s: db.s, |
|
c: c, |
|
rec: rec, |
|
stat1: &stats[1], |
|
minSeq: minSeq, |
|
strict: db.s.o.GetStrict(opt.StrictCompaction), |
|
tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1), |
|
} |
|
db.compactionTransact("table@build", b) |
|
|
|
// Commit. |
|
stats[1].startTimer() |
|
db.compactionCommit("table", rec) |
|
stats[1].stopTimer() |
|
|
|
resultSize := int(stats[1].write) |
|
db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration) |
|
|
|
// Save compaction stats |
|
for i := range stats { |
|
db.compStats.addStat(c.sourceLevel+1, &stats[i]) |
|
} |
|
} |
|
|
|
func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error { |
|
db.logf("table@compaction range L%d %q:%q", level, umin, umax) |
|
if level >= 0 { |
|
if c := db.s.getCompactionRange(level, umin, umax, true); c != nil { |
|
db.tableCompaction(c, true) |
|
} |
|
} else { |
|
// Retry until nothing to compact. |
|
for { |
|
compacted := false |
|
|
|
// Scan for maximum level with overlapped tables. |
|
v := db.s.version() |
|
m := 1 |
|
for i := m; i < len(v.levels); i++ { |
|
tables := v.levels[i] |
|
if tables.overlaps(db.s.icmp, umin, umax, false) { |
|
m = i |
|
} |
|
} |
|
v.release() |
|
|
|
for level := 0; level < m; level++ { |
|
if c := db.s.getCompactionRange(level, umin, umax, false); c != nil { |
|
db.tableCompaction(c, true) |
|
compacted = true |
|
} |
|
} |
|
|
|
if !compacted { |
|
break |
|
} |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (db *DB) tableAutoCompaction() { |
|
if c := db.s.pickCompaction(); c != nil { |
|
db.tableCompaction(c, false) |
|
} |
|
} |
|
|
|
func (db *DB) tableNeedCompaction() bool { |
|
v := db.s.version() |
|
defer v.release() |
|
return v.needCompaction() |
|
} |
|
|
|
// resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted. |
|
func (db *DB) resumeWrite() bool { |
|
v := db.s.version() |
|
defer v.release() |
|
if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() { |
|
return true |
|
} |
|
return false |
|
} |
|
|
|
func (db *DB) pauseCompaction(ch chan<- struct{}) { |
|
select { |
|
case ch <- struct{}{}: |
|
case <-db.closeC: |
|
db.compactionExitTransact() |
|
} |
|
} |
|
|
|
type cCmd interface { |
|
ack(err error) |
|
} |
|
|
|
type cAuto struct { |
|
// Note for table compaction, an empty ackC represents it's a compaction waiting command. |
|
ackC chan<- error |
|
} |
|
|
|
func (r cAuto) ack(err error) { |
|
if r.ackC != nil { |
|
defer func() { |
|
recover() |
|
}() |
|
r.ackC <- err |
|
} |
|
} |
|
|
|
type cRange struct { |
|
level int |
|
min, max []byte |
|
ackC chan<- error |
|
} |
|
|
|
func (r cRange) ack(err error) { |
|
if r.ackC != nil { |
|
defer func() { |
|
recover() |
|
}() |
|
r.ackC <- err |
|
} |
|
} |
|
|
|
// This will trigger auto compaction but will not wait for it. |
|
func (db *DB) compTrigger(compC chan<- cCmd) { |
|
select { |
|
case compC <- cAuto{}: |
|
default: |
|
} |
|
} |
|
|
|
// This will trigger auto compaction and/or wait for all compaction to be done. |
|
func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) { |
|
ch := make(chan error) |
|
defer close(ch) |
|
// Send cmd. |
|
select { |
|
case compC <- cAuto{ch}: |
|
case err = <-db.compErrC: |
|
return |
|
case <-db.closeC: |
|
return ErrClosed |
|
} |
|
// Wait cmd. |
|
select { |
|
case err = <-ch: |
|
case err = <-db.compErrC: |
|
case <-db.closeC: |
|
return ErrClosed |
|
} |
|
return err |
|
} |
|
|
|
// Send range compaction request. |
|
func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) { |
|
ch := make(chan error) |
|
defer close(ch) |
|
// Send cmd. |
|
select { |
|
case compC <- cRange{level, min, max, ch}: |
|
case err := <-db.compErrC: |
|
return err |
|
case <-db.closeC: |
|
return ErrClosed |
|
} |
|
// Wait cmd. |
|
select { |
|
case err = <-ch: |
|
case err = <-db.compErrC: |
|
case <-db.closeC: |
|
return ErrClosed |
|
} |
|
return err |
|
} |
|
|
|
func (db *DB) mCompaction() { |
|
var x cCmd |
|
|
|
defer func() { |
|
if x := recover(); x != nil { |
|
if x != errCompactionTransactExiting { |
|
panic(x) |
|
} |
|
} |
|
if x != nil { |
|
x.ack(ErrClosed) |
|
} |
|
db.closeW.Done() |
|
}() |
|
|
|
for { |
|
select { |
|
case x = <-db.mcompCmdC: |
|
switch x.(type) { |
|
case cAuto: |
|
db.memCompaction() |
|
x.ack(nil) |
|
x = nil |
|
default: |
|
panic("leveldb: unknown command") |
|
} |
|
case <-db.closeC: |
|
return |
|
} |
|
} |
|
} |
|
|
|
func (db *DB) tCompaction() { |
|
var ( |
|
x cCmd |
|
ackQ, waitQ []cCmd |
|
) |
|
|
|
defer func() { |
|
if x := recover(); x != nil { |
|
if x != errCompactionTransactExiting { |
|
panic(x) |
|
} |
|
} |
|
for i := range ackQ { |
|
ackQ[i].ack(ErrClosed) |
|
ackQ[i] = nil |
|
} |
|
for i := range waitQ { |
|
waitQ[i].ack(ErrClosed) |
|
waitQ[i] = nil |
|
} |
|
if x != nil { |
|
x.ack(ErrClosed) |
|
} |
|
db.closeW.Done() |
|
}() |
|
|
|
for { |
|
if db.tableNeedCompaction() { |
|
select { |
|
case x = <-db.tcompCmdC: |
|
case ch := <-db.tcompPauseC: |
|
db.pauseCompaction(ch) |
|
continue |
|
case <-db.closeC: |
|
return |
|
default: |
|
} |
|
// Resume write operation as soon as possible. |
|
if len(waitQ) > 0 && db.resumeWrite() { |
|
for i := range waitQ { |
|
waitQ[i].ack(nil) |
|
waitQ[i] = nil |
|
} |
|
waitQ = waitQ[:0] |
|
} |
|
} else { |
|
for i := range ackQ { |
|
ackQ[i].ack(nil) |
|
ackQ[i] = nil |
|
} |
|
ackQ = ackQ[:0] |
|
for i := range waitQ { |
|
waitQ[i].ack(nil) |
|
waitQ[i] = nil |
|
} |
|
waitQ = waitQ[:0] |
|
select { |
|
case x = <-db.tcompCmdC: |
|
case ch := <-db.tcompPauseC: |
|
db.pauseCompaction(ch) |
|
continue |
|
case <-db.closeC: |
|
return |
|
} |
|
} |
|
if x != nil { |
|
switch cmd := x.(type) { |
|
case cAuto: |
|
if cmd.ackC != nil { |
|
waitQ = append(waitQ, x) |
|
} else { |
|
ackQ = append(ackQ, x) |
|
} |
|
case cRange: |
|
x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max)) |
|
default: |
|
panic("leveldb: unknown command") |
|
} |
|
x = nil |
|
} |
|
db.tableAutoCompaction() |
|
} |
|
}
|
|
|