You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1135 lines
27 KiB
1135 lines
27 KiB
// Copyright (c) 2012, Suryandaru Triandana <[email protected]> |
|
// All rights reserved. |
|
// |
|
// Use of this source code is governed by a BSD-style license that can be |
|
// found in the LICENSE file. |
|
|
|
package table |
|
|
|
import ( |
|
"encoding/binary" |
|
"fmt" |
|
"io" |
|
"sort" |
|
"strings" |
|
"sync" |
|
|
|
"github.com/golang/snappy" |
|
|
|
"github.com/syndtr/goleveldb/leveldb/cache" |
|
"github.com/syndtr/goleveldb/leveldb/comparer" |
|
"github.com/syndtr/goleveldb/leveldb/errors" |
|
"github.com/syndtr/goleveldb/leveldb/filter" |
|
"github.com/syndtr/goleveldb/leveldb/iterator" |
|
"github.com/syndtr/goleveldb/leveldb/opt" |
|
"github.com/syndtr/goleveldb/leveldb/storage" |
|
"github.com/syndtr/goleveldb/leveldb/util" |
|
) |
|
|
|
// Reader errors. |
|
var ( |
|
ErrNotFound = errors.ErrNotFound |
|
ErrReaderReleased = errors.New("leveldb/table: reader released") |
|
ErrIterReleased = errors.New("leveldb/table: iterator released") |
|
) |
|
|
|
// ErrCorrupted reports corruption found while reading a table. The reader
// hands it out wrapped inside errors.ErrCorrupted.
type ErrCorrupted struct {
	// Pos and Size locate the affected region within the table file.
	Pos  int64
	Size int64
	// Kind names the affected structure, for example "data-block".
	Kind string
	// Reason is a short description of the problem.
	Reason string
}

// Error implements the error interface.
func (e *ErrCorrupted) Error() string {
	const layout = "leveldb/table: corruption on %s (pos=%d): %s"
	return fmt.Sprintf(layout, e.Kind, e.Pos, e.Reason)
}
|
|
|
// max returns the larger of x and y.
func max(x, y int) int {
	if y > x {
		return y
	}
	return x
}
|
|
|
type block struct { |
|
bpool *util.BufferPool |
|
bh blockHandle |
|
data []byte |
|
restartsLen int |
|
restartsOffset int |
|
} |
|
|
|
func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) { |
|
index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { |
|
offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) |
|
offset++ // shared always zero, since this is a restart point |
|
v1, n1 := binary.Uvarint(b.data[offset:]) // key length |
|
_, n2 := binary.Uvarint(b.data[offset+n1:]) // value length |
|
m := offset + n1 + n2 |
|
return cmp.Compare(b.data[m:m+int(v1)], key) > 0 |
|
}) + rstart - 1 |
|
if index < rstart { |
|
// The smallest key is greater-than key sought. |
|
index = rstart |
|
} |
|
offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:])) |
|
return |
|
} |
|
|
|
func (b *block) restartIndex(rstart, rlimit, offset int) int { |
|
return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { |
|
return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset |
|
}) + rstart - 1 |
|
} |
|
|
|
func (b *block) restartOffset(index int) int { |
|
return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:])) |
|
} |
|
|
|
func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) { |
|
if offset >= b.restartsOffset { |
|
if offset != b.restartsOffset { |
|
err = &ErrCorrupted{Reason: "entries offset not aligned"} |
|
} |
|
return |
|
} |
|
v0, n0 := binary.Uvarint(b.data[offset:]) // Shared prefix length |
|
v1, n1 := binary.Uvarint(b.data[offset+n0:]) // Key length |
|
v2, n2 := binary.Uvarint(b.data[offset+n0+n1:]) // Value length |
|
m := n0 + n1 + n2 |
|
n = m + int(v1) + int(v2) |
|
if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset { |
|
err = &ErrCorrupted{Reason: "entries corrupted"} |
|
return |
|
} |
|
key = b.data[offset+m : offset+m+int(v1)] |
|
value = b.data[offset+m+int(v1) : offset+n] |
|
nShared = int(v0) |
|
return |
|
} |
|
|
|
func (b *block) Release() { |
|
b.bpool.Put(b.data) |
|
b.bpool = nil |
|
b.data = nil |
|
} |
|
|
|
// dir records the state of a blockIter: released, parked at either end, or
// positioned on an entry together with the direction of the last move.
type dir int

const (
	dirReleased dir = -1 // iterator has been released
	dirSOI      dir = 0  // parked before the first entry
	dirEOI      dir = 1  // parked after the last entry
	dirBackward dir = 2  // on an entry, last move was backward
	dirForward  dir = 3  // on an entry, last move was forward
)
|
|
|
type blockIter struct { |
|
tr *Reader |
|
block *block |
|
blockReleaser util.Releaser |
|
releaser util.Releaser |
|
key, value []byte |
|
offset int |
|
// Previous offset, only filled by Next. |
|
prevOffset int |
|
prevNode []int |
|
prevKeys []byte |
|
restartIndex int |
|
// Iterator direction. |
|
dir dir |
|
// Restart index slice range. |
|
riStart int |
|
riLimit int |
|
// Offset slice range. |
|
offsetStart int |
|
offsetRealStart int |
|
offsetLimit int |
|
// Error. |
|
err error |
|
} |
|
|
|
func (i *blockIter) sErr(err error) { |
|
i.err = err |
|
i.key = nil |
|
i.value = nil |
|
i.prevNode = nil |
|
i.prevKeys = nil |
|
} |
|
|
|
func (i *blockIter) reset() { |
|
if i.dir == dirBackward { |
|
i.prevNode = i.prevNode[:0] |
|
i.prevKeys = i.prevKeys[:0] |
|
} |
|
i.restartIndex = i.riStart |
|
i.offset = i.offsetStart |
|
i.dir = dirSOI |
|
i.key = i.key[:0] |
|
i.value = nil |
|
} |
|
|
|
func (i *blockIter) isFirst() bool { |
|
switch i.dir { |
|
case dirForward: |
|
return i.prevOffset == i.offsetRealStart |
|
case dirBackward: |
|
return len(i.prevNode) == 1 && i.restartIndex == i.riStart |
|
} |
|
return false |
|
} |
|
|
|
func (i *blockIter) isLast() bool { |
|
switch i.dir { |
|
case dirForward, dirBackward: |
|
return i.offset == i.offsetLimit |
|
} |
|
return false |
|
} |
|
|
|
func (i *blockIter) First() bool { |
|
if i.err != nil { |
|
return false |
|
} else if i.dir == dirReleased { |
|
i.err = ErrIterReleased |
|
return false |
|
} |
|
|
|
if i.dir == dirBackward { |
|
i.prevNode = i.prevNode[:0] |
|
i.prevKeys = i.prevKeys[:0] |
|
} |
|
i.dir = dirSOI |
|
return i.Next() |
|
} |
|
|
|
func (i *blockIter) Last() bool { |
|
if i.err != nil { |
|
return false |
|
} else if i.dir == dirReleased { |
|
i.err = ErrIterReleased |
|
return false |
|
} |
|
|
|
if i.dir == dirBackward { |
|
i.prevNode = i.prevNode[:0] |
|
i.prevKeys = i.prevKeys[:0] |
|
} |
|
i.dir = dirEOI |
|
return i.Prev() |
|
} |
|
|
|
func (i *blockIter) Seek(key []byte) bool { |
|
if i.err != nil { |
|
return false |
|
} else if i.dir == dirReleased { |
|
i.err = ErrIterReleased |
|
return false |
|
} |
|
|
|
ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key) |
|
if err != nil { |
|
i.sErr(err) |
|
return false |
|
} |
|
i.restartIndex = ri |
|
i.offset = max(i.offsetStart, offset) |
|
if i.dir == dirSOI || i.dir == dirEOI { |
|
i.dir = dirForward |
|
} |
|
for i.Next() { |
|
if i.tr.cmp.Compare(i.key, key) >= 0 { |
|
return true |
|
} |
|
} |
|
return false |
|
} |
|
|
|
func (i *blockIter) Next() bool { |
|
if i.dir == dirEOI || i.err != nil { |
|
return false |
|
} else if i.dir == dirReleased { |
|
i.err = ErrIterReleased |
|
return false |
|
} |
|
|
|
if i.dir == dirSOI { |
|
i.restartIndex = i.riStart |
|
i.offset = i.offsetStart |
|
} else if i.dir == dirBackward { |
|
i.prevNode = i.prevNode[:0] |
|
i.prevKeys = i.prevKeys[:0] |
|
} |
|
for i.offset < i.offsetRealStart { |
|
key, value, nShared, n, err := i.block.entry(i.offset) |
|
if err != nil { |
|
i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) |
|
return false |
|
} |
|
if n == 0 { |
|
i.dir = dirEOI |
|
return false |
|
} |
|
i.key = append(i.key[:nShared], key...) |
|
i.value = value |
|
i.offset += n |
|
} |
|
if i.offset >= i.offsetLimit { |
|
i.dir = dirEOI |
|
if i.offset != i.offsetLimit { |
|
i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned")) |
|
} |
|
return false |
|
} |
|
key, value, nShared, n, err := i.block.entry(i.offset) |
|
if err != nil { |
|
i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) |
|
return false |
|
} |
|
if n == 0 { |
|
i.dir = dirEOI |
|
return false |
|
} |
|
i.key = append(i.key[:nShared], key...) |
|
i.value = value |
|
i.prevOffset = i.offset |
|
i.offset += n |
|
i.dir = dirForward |
|
return true |
|
} |
|
|
|
func (i *blockIter) Prev() bool { |
|
if i.dir == dirSOI || i.err != nil { |
|
return false |
|
} else if i.dir == dirReleased { |
|
i.err = ErrIterReleased |
|
return false |
|
} |
|
|
|
var ri int |
|
if i.dir == dirForward { |
|
// Change direction. |
|
i.offset = i.prevOffset |
|
if i.offset == i.offsetRealStart { |
|
i.dir = dirSOI |
|
return false |
|
} |
|
ri = i.block.restartIndex(i.restartIndex, i.riLimit, i.offset) |
|
i.dir = dirBackward |
|
} else if i.dir == dirEOI { |
|
// At the end of iterator. |
|
i.restartIndex = i.riLimit |
|
i.offset = i.offsetLimit |
|
if i.offset == i.offsetRealStart { |
|
i.dir = dirSOI |
|
return false |
|
} |
|
ri = i.riLimit - 1 |
|
i.dir = dirBackward |
|
} else if len(i.prevNode) == 1 { |
|
// This is the end of a restart range. |
|
i.offset = i.prevNode[0] |
|
i.prevNode = i.prevNode[:0] |
|
if i.restartIndex == i.riStart { |
|
i.dir = dirSOI |
|
return false |
|
} |
|
i.restartIndex-- |
|
ri = i.restartIndex |
|
} else { |
|
// In the middle of restart range, get from cache. |
|
n := len(i.prevNode) - 3 |
|
node := i.prevNode[n:] |
|
i.prevNode = i.prevNode[:n] |
|
// Get the key. |
|
ko := node[0] |
|
i.key = append(i.key[:0], i.prevKeys[ko:]...) |
|
i.prevKeys = i.prevKeys[:ko] |
|
// Get the value. |
|
vo := node[1] |
|
vl := vo + node[2] |
|
i.value = i.block.data[vo:vl] |
|
i.offset = vl |
|
return true |
|
} |
|
// Build entries cache. |
|
i.key = i.key[:0] |
|
i.value = nil |
|
offset := i.block.restartOffset(ri) |
|
if offset == i.offset { |
|
ri-- |
|
if ri < 0 { |
|
i.dir = dirSOI |
|
return false |
|
} |
|
offset = i.block.restartOffset(ri) |
|
} |
|
i.prevNode = append(i.prevNode, offset) |
|
for { |
|
key, value, nShared, n, err := i.block.entry(offset) |
|
if err != nil { |
|
i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) |
|
return false |
|
} |
|
if offset >= i.offsetRealStart { |
|
if i.value != nil { |
|
// Appends 3 variables: |
|
// 1. Previous keys offset |
|
// 2. Value offset in the data block |
|
// 3. Value length |
|
i.prevNode = append(i.prevNode, len(i.prevKeys), offset-len(i.value), len(i.value)) |
|
i.prevKeys = append(i.prevKeys, i.key...) |
|
} |
|
i.value = value |
|
} |
|
i.key = append(i.key[:nShared], key...) |
|
offset += n |
|
// Stop if target offset reached. |
|
if offset >= i.offset { |
|
if offset != i.offset { |
|
i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned")) |
|
return false |
|
} |
|
|
|
break |
|
} |
|
} |
|
i.restartIndex = ri |
|
i.offset = offset |
|
return true |
|
} |
|
|
|
func (i *blockIter) Key() []byte { |
|
if i.err != nil || i.dir <= dirEOI { |
|
return nil |
|
} |
|
return i.key |
|
} |
|
|
|
func (i *blockIter) Value() []byte { |
|
if i.err != nil || i.dir <= dirEOI { |
|
return nil |
|
} |
|
return i.value |
|
} |
|
|
|
func (i *blockIter) Release() { |
|
if i.dir != dirReleased { |
|
i.tr = nil |
|
i.block = nil |
|
i.prevNode = nil |
|
i.prevKeys = nil |
|
i.key = nil |
|
i.value = nil |
|
i.dir = dirReleased |
|
if i.blockReleaser != nil { |
|
i.blockReleaser.Release() |
|
i.blockReleaser = nil |
|
} |
|
if i.releaser != nil { |
|
i.releaser.Release() |
|
i.releaser = nil |
|
} |
|
} |
|
} |
|
|
|
func (i *blockIter) SetReleaser(releaser util.Releaser) { |
|
if i.dir == dirReleased { |
|
panic(util.ErrReleased) |
|
} |
|
if i.releaser != nil && releaser != nil { |
|
panic(util.ErrHasReleaser) |
|
} |
|
i.releaser = releaser |
|
} |
|
|
|
func (i *blockIter) Valid() bool { |
|
return i.err == nil && (i.dir == dirBackward || i.dir == dirForward) |
|
} |
|
|
|
func (i *blockIter) Error() error { |
|
return i.err |
|
} |
|
|
|
type filterBlock struct { |
|
bpool *util.BufferPool |
|
data []byte |
|
oOffset int |
|
baseLg uint |
|
filtersNum int |
|
} |
|
|
|
func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool { |
|
i := int(offset >> b.baseLg) |
|
if i < b.filtersNum { |
|
o := b.data[b.oOffset+i*4:] |
|
n := int(binary.LittleEndian.Uint32(o)) |
|
m := int(binary.LittleEndian.Uint32(o[4:])) |
|
if n < m && m <= b.oOffset { |
|
return filter.Contains(b.data[n:m], key) |
|
} else if n == m { |
|
return false |
|
} |
|
} |
|
return true |
|
} |
|
|
|
func (b *filterBlock) Release() { |
|
b.bpool.Put(b.data) |
|
b.bpool = nil |
|
b.data = nil |
|
} |
|
|
|
type indexIter struct { |
|
*blockIter |
|
tr *Reader |
|
slice *util.Range |
|
// Options |
|
fillCache bool |
|
} |
|
|
|
func (i *indexIter) Get() iterator.Iterator { |
|
value := i.Value() |
|
if value == nil { |
|
return nil |
|
} |
|
dataBH, n := decodeBlockHandle(value) |
|
if n == 0 { |
|
return iterator.NewEmptyIterator(i.tr.newErrCorruptedBH(i.tr.indexBH, "bad data block handle")) |
|
} |
|
|
|
var slice *util.Range |
|
if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) { |
|
slice = i.slice |
|
} |
|
return i.tr.getDataIterErr(dataBH, slice, i.tr.verifyChecksum, i.fillCache) |
|
} |
|
|
|
// Reader is a table reader. |
|
type Reader struct { |
|
mu sync.RWMutex |
|
fd storage.FileDesc |
|
reader io.ReaderAt |
|
cache *cache.NamespaceGetter |
|
err error |
|
bpool *util.BufferPool |
|
// Options |
|
o *opt.Options |
|
cmp comparer.Comparer |
|
filter filter.Filter |
|
verifyChecksum bool |
|
|
|
dataEnd int64 |
|
metaBH, indexBH, filterBH blockHandle |
|
indexBlock *block |
|
filterBlock *filterBlock |
|
} |
|
|
|
func (r *Reader) blockKind(bh blockHandle) string { |
|
switch bh.offset { |
|
case r.metaBH.offset: |
|
return "meta-block" |
|
case r.indexBH.offset: |
|
return "index-block" |
|
case r.filterBH.offset: |
|
if r.filterBH.length > 0 { |
|
return "filter-block" |
|
} |
|
} |
|
return "data-block" |
|
} |
|
|
|
func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error { |
|
return &errors.ErrCorrupted{Fd: r.fd, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}} |
|
} |
|
|
|
func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error { |
|
return r.newErrCorrupted(int64(bh.offset), int64(bh.length), r.blockKind(bh), reason) |
|
} |
|
|
|
func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error { |
|
if cerr, ok := err.(*ErrCorrupted); ok { |
|
cerr.Pos = int64(bh.offset) |
|
cerr.Size = int64(bh.length) |
|
cerr.Kind = r.blockKind(bh) |
|
return &errors.ErrCorrupted{Fd: r.fd, Err: cerr} |
|
} |
|
return err |
|
} |
|
|
|
func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, error) { |
|
data := r.bpool.Get(int(bh.length + blockTrailerLen)) |
|
if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF { |
|
return nil, err |
|
} |
|
|
|
if verifyChecksum { |
|
n := bh.length + 1 |
|
checksum0 := binary.LittleEndian.Uint32(data[n:]) |
|
checksum1 := util.NewCRC(data[:n]).Value() |
|
if checksum0 != checksum1 { |
|
r.bpool.Put(data) |
|
return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("checksum mismatch, want=%#x got=%#x", checksum0, checksum1)) |
|
} |
|
} |
|
|
|
switch data[bh.length] { |
|
case blockTypeNoCompression: |
|
data = data[:bh.length] |
|
case blockTypeSnappyCompression: |
|
decLen, err := snappy.DecodedLen(data[:bh.length]) |
|
if err != nil { |
|
r.bpool.Put(data) |
|
return nil, r.newErrCorruptedBH(bh, err.Error()) |
|
} |
|
decData := r.bpool.Get(decLen) |
|
decData, err = snappy.Decode(decData, data[:bh.length]) |
|
r.bpool.Put(data) |
|
if err != nil { |
|
r.bpool.Put(decData) |
|
return nil, r.newErrCorruptedBH(bh, err.Error()) |
|
} |
|
data = decData |
|
default: |
|
r.bpool.Put(data) |
|
return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("unknown compression type %#x", data[bh.length])) |
|
} |
|
return data, nil |
|
} |
|
|
|
func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error) { |
|
data, err := r.readRawBlock(bh, verifyChecksum) |
|
if err != nil { |
|
return nil, err |
|
} |
|
restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:])) |
|
b := &block{ |
|
bpool: r.bpool, |
|
bh: bh, |
|
data: data, |
|
restartsLen: restartsLen, |
|
restartsOffset: len(data) - (restartsLen+1)*4, |
|
} |
|
return b, nil |
|
} |
|
|
|
func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) { |
|
if r.cache != nil { |
|
var ( |
|
err error |
|
ch *cache.Handle |
|
) |
|
if fillCache { |
|
ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) { |
|
var b *block |
|
b, err = r.readBlock(bh, verifyChecksum) |
|
if err != nil { |
|
return 0, nil |
|
} |
|
return cap(b.data), b |
|
}) |
|
} else { |
|
ch = r.cache.Get(bh.offset, nil) |
|
} |
|
if ch != nil { |
|
b, ok := ch.Value().(*block) |
|
if !ok { |
|
ch.Release() |
|
return nil, nil, errors.New("leveldb/table: inconsistent block type") |
|
} |
|
return b, ch, err |
|
} else if err != nil { |
|
return nil, nil, err |
|
} |
|
} |
|
|
|
b, err := r.readBlock(bh, verifyChecksum) |
|
return b, b, err |
|
} |
|
|
|
func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) { |
|
data, err := r.readRawBlock(bh, true) |
|
if err != nil { |
|
return nil, err |
|
} |
|
n := len(data) |
|
if n < 5 { |
|
return nil, r.newErrCorruptedBH(bh, "too short") |
|
} |
|
m := n - 5 |
|
oOffset := int(binary.LittleEndian.Uint32(data[m:])) |
|
if oOffset > m { |
|
return nil, r.newErrCorruptedBH(bh, "invalid data-offsets offset") |
|
} |
|
b := &filterBlock{ |
|
bpool: r.bpool, |
|
data: data, |
|
oOffset: oOffset, |
|
baseLg: uint(data[n-1]), |
|
filtersNum: (m - oOffset) / 4, |
|
} |
|
return b, nil |
|
} |
|
|
|
func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) { |
|
if r.cache != nil { |
|
var ( |
|
err error |
|
ch *cache.Handle |
|
) |
|
if fillCache { |
|
ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) { |
|
var b *filterBlock |
|
b, err = r.readFilterBlock(bh) |
|
if err != nil { |
|
return 0, nil |
|
} |
|
return cap(b.data), b |
|
}) |
|
} else { |
|
ch = r.cache.Get(bh.offset, nil) |
|
} |
|
if ch != nil { |
|
b, ok := ch.Value().(*filterBlock) |
|
if !ok { |
|
ch.Release() |
|
return nil, nil, errors.New("leveldb/table: inconsistent block type") |
|
} |
|
return b, ch, err |
|
} else if err != nil { |
|
return nil, nil, err |
|
} |
|
} |
|
|
|
b, err := r.readFilterBlock(bh) |
|
return b, b, err |
|
} |
|
|
|
func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) { |
|
if r.indexBlock == nil { |
|
return r.readBlockCached(r.indexBH, true, fillCache) |
|
} |
|
return r.indexBlock, util.NoopReleaser{}, nil |
|
} |
|
|
|
func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) { |
|
if r.filterBlock == nil { |
|
return r.readFilterBlockCached(r.filterBH, fillCache) |
|
} |
|
return r.filterBlock, util.NoopReleaser{}, nil |
|
} |
|
|
|
func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter { |
|
bi := &blockIter{ |
|
tr: r, |
|
block: b, |
|
blockReleaser: bReleaser, |
|
// Valid key should never be nil. |
|
key: make([]byte, 0), |
|
dir: dirSOI, |
|
riStart: 0, |
|
riLimit: b.restartsLen, |
|
offsetStart: 0, |
|
offsetRealStart: 0, |
|
offsetLimit: b.restartsOffset, |
|
} |
|
if slice != nil { |
|
if slice.Start != nil { |
|
if bi.Seek(slice.Start) { |
|
bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset) |
|
bi.offsetStart = b.restartOffset(bi.riStart) |
|
bi.offsetRealStart = bi.prevOffset |
|
} else { |
|
bi.riStart = b.restartsLen |
|
bi.offsetStart = b.restartsOffset |
|
bi.offsetRealStart = b.restartsOffset |
|
} |
|
} |
|
if slice.Limit != nil { |
|
if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) { |
|
bi.offsetLimit = bi.prevOffset |
|
bi.riLimit = bi.restartIndex + 1 |
|
} |
|
} |
|
bi.reset() |
|
if bi.offsetStart > bi.offsetLimit { |
|
bi.sErr(errors.New("leveldb/table: invalid slice range")) |
|
} |
|
} |
|
return bi |
|
} |
|
|
|
func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator { |
|
b, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache) |
|
if err != nil { |
|
return iterator.NewEmptyIterator(err) |
|
} |
|
return r.newBlockIter(b, rel, slice, false) |
|
} |
|
|
|
func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator { |
|
r.mu.RLock() |
|
defer r.mu.RUnlock() |
|
|
|
if r.err != nil { |
|
return iterator.NewEmptyIterator(r.err) |
|
} |
|
|
|
return r.getDataIter(dataBH, slice, verifyChecksum, fillCache) |
|
} |
|
|
|
// NewIterator creates an iterator from the table. |
|
// |
|
// Slice allows slicing the iterator to only contains keys in the given |
|
// range. A nil Range.Start is treated as a key before all keys in the |
|
// table. And a nil Range.Limit is treated as a key after all keys in |
|
// the table. |
|
// |
|
// The returned iterator is not safe for concurrent use and should be released |
|
// after use. |
|
// |
|
// Also read Iterator documentation of the leveldb/iterator package. |
|
func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { |
|
r.mu.RLock() |
|
defer r.mu.RUnlock() |
|
|
|
if r.err != nil { |
|
return iterator.NewEmptyIterator(r.err) |
|
} |
|
|
|
fillCache := !ro.GetDontFillCache() |
|
indexBlock, rel, err := r.getIndexBlock(fillCache) |
|
if err != nil { |
|
return iterator.NewEmptyIterator(err) |
|
} |
|
index := &indexIter{ |
|
blockIter: r.newBlockIter(indexBlock, rel, slice, true), |
|
tr: r, |
|
slice: slice, |
|
fillCache: !ro.GetDontFillCache(), |
|
} |
|
return iterator.NewIndexedIterator(index, opt.GetStrict(r.o, ro, opt.StrictReader)) |
|
} |
|
|
|
func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) { |
|
r.mu.RLock() |
|
defer r.mu.RUnlock() |
|
|
|
if r.err != nil { |
|
err = r.err |
|
return |
|
} |
|
|
|
indexBlock, rel, err := r.getIndexBlock(true) |
|
if err != nil { |
|
return |
|
} |
|
defer rel.Release() |
|
|
|
index := r.newBlockIter(indexBlock, nil, nil, true) |
|
defer index.Release() |
|
|
|
if !index.Seek(key) { |
|
if err = index.Error(); err == nil { |
|
err = ErrNotFound |
|
} |
|
return |
|
} |
|
|
|
dataBH, n := decodeBlockHandle(index.Value()) |
|
if n == 0 { |
|
r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") |
|
return nil, nil, r.err |
|
} |
|
|
|
// The filter should only used for exact match. |
|
if filtered && r.filter != nil { |
|
filterBlock, frel, ferr := r.getFilterBlock(true) |
|
if ferr == nil { |
|
if !filterBlock.contains(r.filter, dataBH.offset, key) { |
|
frel.Release() |
|
return nil, nil, ErrNotFound |
|
} |
|
frel.Release() |
|
} else if !errors.IsCorrupted(ferr) { |
|
return nil, nil, ferr |
|
} |
|
} |
|
|
|
data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache()) |
|
if !data.Seek(key) { |
|
data.Release() |
|
if err = data.Error(); err != nil { |
|
return |
|
} |
|
|
|
// The nearest greater-than key is the first key of the next block. |
|
if !index.Next() { |
|
if err = index.Error(); err == nil { |
|
err = ErrNotFound |
|
} |
|
return |
|
} |
|
|
|
dataBH, n = decodeBlockHandle(index.Value()) |
|
if n == 0 { |
|
r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") |
|
return nil, nil, r.err |
|
} |
|
|
|
data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache()) |
|
if !data.Next() { |
|
data.Release() |
|
if err = data.Error(); err == nil { |
|
err = ErrNotFound |
|
} |
|
return |
|
} |
|
} |
|
|
|
// Key doesn't use block buffer, no need to copy the buffer. |
|
rkey = data.Key() |
|
if !noValue { |
|
if r.bpool == nil { |
|
value = data.Value() |
|
} else { |
|
// Value does use block buffer, and since the buffer will be |
|
// recycled, it need to be copied. |
|
value = append([]byte{}, data.Value()...) |
|
} |
|
} |
|
data.Release() |
|
return |
|
} |
|
|
|
// Find finds key/value pair whose key is greater than or equal to the |
|
// given key. It returns ErrNotFound if the table doesn't contain |
|
// such pair. |
|
// If filtered is true then the nearest 'block' will be checked against |
|
// 'filter data' (if present) and will immediately return ErrNotFound if |
|
// 'filter data' indicates that such pair doesn't exist. |
|
// |
|
// The caller may modify the contents of the returned slice as it is its |
|
// own copy. |
|
// It is safe to modify the contents of the argument after Find returns. |
|
func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, value []byte, err error) { |
|
return r.find(key, filtered, ro, false) |
|
} |
|
|
|
// FindKey finds key that is greater than or equal to the given key. |
|
// It returns ErrNotFound if the table doesn't contain such key. |
|
// If filtered is true then the nearest 'block' will be checked against |
|
// 'filter data' (if present) and will immediately return ErrNotFound if |
|
// 'filter data' indicates that such key doesn't exist. |
|
// |
|
// The caller may modify the contents of the returned slice as it is its |
|
// own copy. |
|
// It is safe to modify the contents of the argument after Find returns. |
|
func (r *Reader) FindKey(key []byte, filtered bool, ro *opt.ReadOptions) (rkey []byte, err error) { |
|
rkey, _, err = r.find(key, filtered, ro, true) |
|
return |
|
} |
|
|
|
// Get gets the value for the given key. It returns errors.ErrNotFound |
|
// if the table does not contain the key. |
|
// |
|
// The caller may modify the contents of the returned slice as it is its |
|
// own copy. |
|
// It is safe to modify the contents of the argument after Find returns. |
|
func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { |
|
r.mu.RLock() |
|
defer r.mu.RUnlock() |
|
|
|
if r.err != nil { |
|
err = r.err |
|
return |
|
} |
|
|
|
rkey, value, err := r.find(key, false, ro, false) |
|
if err == nil && r.cmp.Compare(rkey, key) != 0 { |
|
value = nil |
|
err = ErrNotFound |
|
} |
|
return |
|
} |
|
|
|
// OffsetOf returns approximate offset for the given key. |
|
// |
|
// It is safe to modify the contents of the argument after Get returns. |
|
func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { |
|
r.mu.RLock() |
|
defer r.mu.RUnlock() |
|
|
|
if r.err != nil { |
|
err = r.err |
|
return |
|
} |
|
|
|
indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true) |
|
if err != nil { |
|
return |
|
} |
|
defer rel.Release() |
|
|
|
index := r.newBlockIter(indexBlock, nil, nil, true) |
|
defer index.Release() |
|
if index.Seek(key) { |
|
dataBH, n := decodeBlockHandle(index.Value()) |
|
if n == 0 { |
|
r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") |
|
return |
|
} |
|
offset = int64(dataBH.offset) |
|
return |
|
} |
|
err = index.Error() |
|
if err == nil { |
|
offset = r.dataEnd |
|
} |
|
return |
|
} |
|
|
|
// Release implements util.Releaser. |
|
// It also close the file if it is an io.Closer. |
|
func (r *Reader) Release() { |
|
r.mu.Lock() |
|
defer r.mu.Unlock() |
|
|
|
if closer, ok := r.reader.(io.Closer); ok { |
|
closer.Close() |
|
} |
|
if r.indexBlock != nil { |
|
r.indexBlock.Release() |
|
r.indexBlock = nil |
|
} |
|
if r.filterBlock != nil { |
|
r.filterBlock.Release() |
|
r.filterBlock = nil |
|
} |
|
r.reader = nil |
|
r.cache = nil |
|
r.bpool = nil |
|
r.err = ErrReaderReleased |
|
} |
|
|
|
// NewReader creates a new initialized table reader for the file. |
|
// The fi, cache and bpool is optional and can be nil. |
|
// |
|
// The returned table reader instance is safe for concurrent use. |
|
func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) { |
|
if f == nil { |
|
return nil, errors.New("leveldb/table: nil file") |
|
} |
|
|
|
r := &Reader{ |
|
fd: fd, |
|
reader: f, |
|
cache: cache, |
|
bpool: bpool, |
|
o: o, |
|
cmp: o.GetComparer(), |
|
verifyChecksum: o.GetStrict(opt.StrictBlockChecksum), |
|
} |
|
|
|
if size < footerLen { |
|
r.err = r.newErrCorrupted(0, size, "table", "too small") |
|
return r, nil |
|
} |
|
|
|
footerPos := size - footerLen |
|
var footer [footerLen]byte |
|
if _, err := r.reader.ReadAt(footer[:], footerPos); err != nil && err != io.EOF { |
|
return nil, err |
|
} |
|
if string(footer[footerLen-len(magic):footerLen]) != magic { |
|
r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad magic number") |
|
return r, nil |
|
} |
|
|
|
var n int |
|
// Decode the metaindex block handle. |
|
r.metaBH, n = decodeBlockHandle(footer[:]) |
|
if n == 0 { |
|
r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad metaindex block handle") |
|
return r, nil |
|
} |
|
|
|
// Decode the index block handle. |
|
r.indexBH, n = decodeBlockHandle(footer[n:]) |
|
if n == 0 { |
|
r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad index block handle") |
|
return r, nil |
|
} |
|
|
|
// Read metaindex block. |
|
metaBlock, err := r.readBlock(r.metaBH, true) |
|
if err != nil { |
|
if errors.IsCorrupted(err) { |
|
r.err = err |
|
return r, nil |
|
} |
|
return nil, err |
|
} |
|
|
|
// Set data end. |
|
r.dataEnd = int64(r.metaBH.offset) |
|
|
|
// Read metaindex. |
|
metaIter := r.newBlockIter(metaBlock, nil, nil, true) |
|
for metaIter.Next() { |
|
key := string(metaIter.Key()) |
|
if !strings.HasPrefix(key, "filter.") { |
|
continue |
|
} |
|
fn := key[7:] |
|
if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn { |
|
r.filter = f0 |
|
} else { |
|
for _, f0 := range o.GetAltFilters() { |
|
if f0.Name() == fn { |
|
r.filter = f0 |
|
break |
|
} |
|
} |
|
} |
|
if r.filter != nil { |
|
filterBH, n := decodeBlockHandle(metaIter.Value()) |
|
if n == 0 { |
|
continue |
|
} |
|
r.filterBH = filterBH |
|
// Update data end. |
|
r.dataEnd = int64(filterBH.offset) |
|
break |
|
} |
|
} |
|
metaIter.Release() |
|
metaBlock.Release() |
|
|
|
// Cache index and filter block locally, since we don't have global cache. |
|
if cache == nil { |
|
r.indexBlock, err = r.readBlock(r.indexBH, true) |
|
if err != nil { |
|
if errors.IsCorrupted(err) { |
|
r.err = err |
|
return r, nil |
|
} |
|
return nil, err |
|
} |
|
if r.filter != nil { |
|
r.filterBlock, err = r.readFilterBlock(r.filterBH) |
|
if err != nil { |
|
if !errors.IsCorrupted(err) { |
|
return nil, err |
|
} |
|
|
|
// Don't use filter then. |
|
r.filter = nil |
|
} |
|
} |
|
} |
|
|
|
return r, nil |
|
}
|
|
|