You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
271 lines
5.6 KiB
271 lines
5.6 KiB
// Copyright (c) 2012, Suryandaru Triandana <[email protected]> |
|
// All rights reserved. |
|
// |
|
// Use of this source code is governed by a BSD-style license that can be |
|
// found in the LICENSE file. |
|
|
|
package leveldb |
|
|
|
import ( |
|
"fmt" |
|
"sync/atomic" |
|
|
|
"github.com/syndtr/goleveldb/leveldb/journal" |
|
"github.com/syndtr/goleveldb/leveldb/storage" |
|
) |
|
|
|
// Logging. |
|
|
|
type dropper struct { |
|
s *session |
|
fd storage.FileDesc |
|
} |
|
|
|
func (d dropper) Drop(err error) { |
|
if e, ok := err.(*journal.ErrCorrupted); ok { |
|
d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason) |
|
} else { |
|
d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err) |
|
} |
|
} |
|
|
|
func (s *session) log(v ...interface{}) { s.stor.Log(fmt.Sprint(v...)) } |
|
func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) } |
|
|
|
// File utils. |
|
|
|
func (s *session) newTemp() storage.FileDesc { |
|
num := atomic.AddInt64(&s.stTempFileNum, 1) - 1 |
|
return storage.FileDesc{storage.TypeTemp, num} |
|
} |
|
|
|
func (s *session) addFileRef(fd storage.FileDesc, ref int) int { |
|
ref += s.fileRef[fd.Num] |
|
if ref > 0 { |
|
s.fileRef[fd.Num] = ref |
|
} else if ref == 0 { |
|
delete(s.fileRef, fd.Num) |
|
} else { |
|
panic(fmt.Sprintf("negative ref: %v", fd)) |
|
} |
|
return ref |
|
} |
|
|
|
// Session state. |
|
|
|
// Get current version. This will incr version ref, must call |
|
// version.release (exactly once) after use. |
|
func (s *session) version() *version { |
|
s.vmu.Lock() |
|
defer s.vmu.Unlock() |
|
s.stVersion.incref() |
|
return s.stVersion |
|
} |
|
|
|
func (s *session) tLen(level int) int { |
|
s.vmu.Lock() |
|
defer s.vmu.Unlock() |
|
return s.stVersion.tLen(level) |
|
} |
|
|
|
// Set current version to v. |
|
func (s *session) setVersion(v *version) { |
|
s.vmu.Lock() |
|
defer s.vmu.Unlock() |
|
// Hold by session. It is important to call this first before releasing |
|
// current version, otherwise the still used files might get released. |
|
v.incref() |
|
if s.stVersion != nil { |
|
// Release current version. |
|
s.stVersion.releaseNB() |
|
} |
|
s.stVersion = v |
|
} |
|
|
|
// Get current unused file number. |
|
func (s *session) nextFileNum() int64 { |
|
return atomic.LoadInt64(&s.stNextFileNum) |
|
} |
|
|
|
// Set current unused file number to num. |
|
func (s *session) setNextFileNum(num int64) { |
|
atomic.StoreInt64(&s.stNextFileNum, num) |
|
} |
|
|
|
// Mark file number as used. |
|
func (s *session) markFileNum(num int64) { |
|
nextFileNum := num + 1 |
|
for { |
|
old, x := s.stNextFileNum, nextFileNum |
|
if old > x { |
|
x = old |
|
} |
|
if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { |
|
break |
|
} |
|
} |
|
} |
|
|
|
// Allocate a file number. |
|
func (s *session) allocFileNum() int64 { |
|
return atomic.AddInt64(&s.stNextFileNum, 1) - 1 |
|
} |
|
|
|
// Reuse given file number. |
|
func (s *session) reuseFileNum(num int64) { |
|
for { |
|
old, x := s.stNextFileNum, num |
|
if old != x+1 { |
|
x = old |
|
} |
|
if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { |
|
break |
|
} |
|
} |
|
} |
|
|
|
// Set compaction ptr at given level; need external synchronization. |
|
func (s *session) setCompPtr(level int, ik internalKey) { |
|
if level >= len(s.stCompPtrs) { |
|
newCompPtrs := make([]internalKey, level+1) |
|
copy(newCompPtrs, s.stCompPtrs) |
|
s.stCompPtrs = newCompPtrs |
|
} |
|
s.stCompPtrs[level] = append(internalKey{}, ik...) |
|
} |
|
|
|
// Get compaction ptr at given level; need external synchronization. |
|
func (s *session) getCompPtr(level int) internalKey { |
|
if level >= len(s.stCompPtrs) { |
|
return nil |
|
} |
|
return s.stCompPtrs[level] |
|
} |
|
|
|
// Manifest related utils. |
|
|
|
// Fill given session record obj with current states; need external |
|
// synchronization. |
|
func (s *session) fillRecord(r *sessionRecord, snapshot bool) { |
|
r.setNextFileNum(s.nextFileNum()) |
|
|
|
if snapshot { |
|
if !r.has(recJournalNum) { |
|
r.setJournalNum(s.stJournalNum) |
|
} |
|
|
|
if !r.has(recSeqNum) { |
|
r.setSeqNum(s.stSeqNum) |
|
} |
|
|
|
for level, ik := range s.stCompPtrs { |
|
if ik != nil { |
|
r.addCompPtr(level, ik) |
|
} |
|
} |
|
|
|
r.setComparer(s.icmp.uName()) |
|
} |
|
} |
|
|
|
// Mark if record has been committed, this will update session state; |
|
// need external synchronization. |
|
func (s *session) recordCommited(rec *sessionRecord) { |
|
if rec.has(recJournalNum) { |
|
s.stJournalNum = rec.journalNum |
|
} |
|
|
|
if rec.has(recPrevJournalNum) { |
|
s.stPrevJournalNum = rec.prevJournalNum |
|
} |
|
|
|
if rec.has(recSeqNum) { |
|
s.stSeqNum = rec.seqNum |
|
} |
|
|
|
for _, r := range rec.compPtrs { |
|
s.setCompPtr(r.level, internalKey(r.ikey)) |
|
} |
|
} |
|
|
|
// Create a new manifest file; need external synchronization. |
|
func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { |
|
fd := storage.FileDesc{storage.TypeManifest, s.allocFileNum()} |
|
writer, err := s.stor.Create(fd) |
|
if err != nil { |
|
return |
|
} |
|
jw := journal.NewWriter(writer) |
|
|
|
if v == nil { |
|
v = s.version() |
|
defer v.release() |
|
} |
|
if rec == nil { |
|
rec = &sessionRecord{} |
|
} |
|
s.fillRecord(rec, true) |
|
v.fillRecord(rec) |
|
|
|
defer func() { |
|
if err == nil { |
|
s.recordCommited(rec) |
|
if s.manifest != nil { |
|
s.manifest.Close() |
|
} |
|
if s.manifestWriter != nil { |
|
s.manifestWriter.Close() |
|
} |
|
if !s.manifestFd.Zero() { |
|
s.stor.Remove(s.manifestFd) |
|
} |
|
s.manifestFd = fd |
|
s.manifestWriter = writer |
|
s.manifest = jw |
|
} else { |
|
writer.Close() |
|
s.stor.Remove(fd) |
|
s.reuseFileNum(fd.Num) |
|
} |
|
}() |
|
|
|
w, err := jw.Next() |
|
if err != nil { |
|
return |
|
} |
|
err = rec.encode(w) |
|
if err != nil { |
|
return |
|
} |
|
err = jw.Flush() |
|
if err != nil { |
|
return |
|
} |
|
err = s.stor.SetMeta(fd) |
|
return |
|
} |
|
|
|
// Flush record to disk. |
|
func (s *session) flushManifest(rec *sessionRecord) (err error) { |
|
s.fillRecord(rec, false) |
|
w, err := s.manifest.Next() |
|
if err != nil { |
|
return |
|
} |
|
err = rec.encode(w) |
|
if err != nil { |
|
return |
|
} |
|
err = s.manifest.Flush() |
|
if err != nil { |
|
return |
|
} |
|
if !s.o.GetNoSync() { |
|
err = s.manifestWriter.Sync() |
|
if err != nil { |
|
return |
|
} |
|
} |
|
s.recordCommited(rec) |
|
return |
|
}
|
|
|