You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
275 lines
7.7 KiB
275 lines
7.7 KiB
// Copyright (C) 2016 The GoHBase Authors. All rights reserved. |
|
// This file is part of GoHBase. |
|
// Use of this source code is governed by the Apache License 2.0 |
|
// that can be found in the COPYING file. |
|
|
|
package gohbase |
|
|
|
import ( |
|
"bytes" |
|
"io" |
|
"sync" |
|
|
|
"github.com/cznic/b" |
|
log "github.com/sirupsen/logrus" |
|
"github.com/tsuna/gohbase/hrpc" |
|
) |
|
|
|
// clientRegionCache is client -> region cache. Used to quickly |
|
// look up all the regioninfos that map to a specific client |
|
type clientRegionCache struct { |
|
m sync.RWMutex |
|
|
|
regions map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{} |
|
} |
|
|
|
// put caches client and associates a region with it. Returns a client that is in cache. |
|
// TODO: obvious place for optimization (use map with address as key to lookup exisiting clients) |
|
func (rcc *clientRegionCache) put(c hrpc.RegionClient, r hrpc.RegionInfo) hrpc.RegionClient { |
|
rcc.m.Lock() |
|
for existingClient, regions := range rcc.regions { |
|
// check if client already exists, checking by host and port |
|
// because concurrent callers might try to put the same client |
|
if c.Addr() == existingClient.Addr() { |
|
// check client already knows about the region, checking |
|
// by pointer is enough because we make sure that there are |
|
// no regions with the same name around |
|
if _, ok := regions[r]; !ok { |
|
regions[r] = struct{}{} |
|
} |
|
rcc.m.Unlock() |
|
|
|
log.WithFields(log.Fields{ |
|
"existingClient": existingClient, |
|
"client": c, |
|
}).Debug("region client is already in client's cache") |
|
return existingClient |
|
} |
|
} |
|
|
|
// no such client yet |
|
rcc.regions[c] = map[hrpc.RegionInfo]struct{}{r: struct{}{}} |
|
rcc.m.Unlock() |
|
|
|
log.WithField("client", c).Info("added new region client") |
|
return c |
|
} |
|
|
|
func (rcc *clientRegionCache) del(r hrpc.RegionInfo) { |
|
rcc.m.Lock() |
|
c := r.Client() |
|
if c != nil { |
|
r.SetClient(nil) |
|
regions := rcc.regions[c] |
|
delete(regions, r) |
|
} |
|
rcc.m.Unlock() |
|
} |
|
|
|
func (rcc *clientRegionCache) closeAll() { |
|
rcc.m.Lock() |
|
for client, regions := range rcc.regions { |
|
for region := range regions { |
|
region.MarkUnavailable() |
|
region.SetClient(nil) |
|
} |
|
client.Close() |
|
} |
|
rcc.m.Unlock() |
|
} |
|
|
|
func (rcc *clientRegionCache) clientDown(c hrpc.RegionClient) map[hrpc.RegionInfo]struct{} { |
|
rcc.m.Lock() |
|
downregions, ok := rcc.regions[c] |
|
delete(rcc.regions, c) |
|
rcc.m.Unlock() |
|
|
|
if ok { |
|
log.WithField("client", c).Info("removed region client") |
|
} |
|
return downregions |
|
} |
|
|
|
// TODO: obvious place for optimization (use map with address as key to lookup exisiting clients) |
|
func (rcc *clientRegionCache) checkForClient(addr string) hrpc.RegionClient { |
|
rcc.m.RLock() |
|
|
|
for client := range rcc.regions { |
|
if client.Addr() == addr { |
|
rcc.m.RUnlock() |
|
return client |
|
} |
|
} |
|
|
|
rcc.m.RUnlock() |
|
return nil |
|
} |
|
|
|
// key -> region cache. |
|
type keyRegionCache struct { |
|
m sync.RWMutex |
|
|
|
// Maps a []byte of a region start key to a hrpc.RegionInfo |
|
regions *b.Tree |
|
} |
|
|
|
func (krc *keyRegionCache) get(key []byte) ([]byte, hrpc.RegionInfo) { |
|
krc.m.RLock() |
|
|
|
enum, ok := krc.regions.Seek(key) |
|
if ok { |
|
krc.m.RUnlock() |
|
log.Fatalf("WTF: got exact match for region search key %q", key) |
|
return nil, nil |
|
} |
|
k, v, err := enum.Prev() |
|
enum.Close() |
|
|
|
krc.m.RUnlock() |
|
|
|
if err == io.EOF { |
|
// we are the beginning of the tree |
|
return nil, nil |
|
} |
|
return k.([]byte), v.(hrpc.RegionInfo) |
|
} |
|
|
|
func isRegionOverlap(regA, regB hrpc.RegionInfo) bool { |
|
// if region's stop key is empty, it's assumed to be the greatest key |
|
return bytes.Equal(regA.Namespace(), regB.Namespace()) && |
|
bytes.Equal(regA.Table(), regB.Table()) && |
|
(len(regB.StopKey()) == 0 || bytes.Compare(regA.StartKey(), regB.StopKey()) < 0) && |
|
(len(regA.StopKey()) == 0 || bytes.Compare(regA.StopKey(), regB.StartKey()) > 0) |
|
} |
|
|
|
func (krc *keyRegionCache) getOverlaps(reg hrpc.RegionInfo) []hrpc.RegionInfo { |
|
var overlaps []hrpc.RegionInfo |
|
var v interface{} |
|
var err error |
|
|
|
// deal with empty tree in the beginning so that we don't have to check |
|
// EOF errors for enum later |
|
if krc.regions.Len() == 0 { |
|
return overlaps |
|
} |
|
|
|
// check if key created from new region falls into any cached regions |
|
key := createRegionSearchKey(fullyQualifiedTable(reg), reg.StartKey()) |
|
enum, ok := krc.regions.Seek(key) |
|
if ok { |
|
log.Fatalf("WTF: found a region with exact name as the search key %q", key) |
|
} |
|
|
|
// case 1: landed before the first region in cache |
|
// enum.Prev() returns io.EOF |
|
// enum.Next() returns io.EOF |
|
// SeekFirst() + enum.Next() returns the first region, which has larger start key |
|
|
|
// case 2: landed before the second region in cache |
|
// enum.Prev() returns the first region X and moves pointer to -infinity |
|
// enum.Next() returns io.EOF |
|
// SeekFirst() + enum.Next() returns first region X, which has smaller start key |
|
|
|
// case 3: landed anywhere after the second region |
|
// enum.Prev() returns the region X before it landed, moves pointer to the region X - 1 |
|
// enum.Next() returns X - 1 and move pointer to X, which has smaller start key |
|
|
|
enum.Prev() |
|
_, _, err = enum.Next() |
|
if err == io.EOF { |
|
// we are in the beginning of tree, get new enum starting |
|
// from first region |
|
enum.Close() |
|
enum, err = krc.regions.SeekFirst() |
|
if err != nil { |
|
log.Fatalf( |
|
"error seeking first region when getting overlaps for region %v: %v", reg, err) |
|
} |
|
} |
|
|
|
_, v, err = enum.Next() |
|
if isRegionOverlap(v.(hrpc.RegionInfo), reg) { |
|
overlaps = append(overlaps, v.(hrpc.RegionInfo)) |
|
} |
|
_, v, err = enum.Next() |
|
|
|
// now append all regions that overlap until the end of the tree |
|
// or until they don't overlap |
|
for err != io.EOF && isRegionOverlap(v.(hrpc.RegionInfo), reg) { |
|
overlaps = append(overlaps, v.(hrpc.RegionInfo)) |
|
_, v, err = enum.Next() |
|
} |
|
enum.Close() |
|
return overlaps |
|
} |
|
|
|
// put looks up if there's already region with this name in regions cache |
|
// and if there's, returns it in overlaps and doesn't modify the cache. |
|
// Otherwise, it puts the region and removes all overlaps in case all of |
|
// them are older. Returns a slice of overlapping regions and whether |
|
// passed region was put in the cache. |
|
func (krc *keyRegionCache) put(reg hrpc.RegionInfo) (overlaps []hrpc.RegionInfo, replaced bool) { |
|
krc.m.Lock() |
|
krc.regions.Put(reg.Name(), func(v interface{}, exists bool) (interface{}, bool) { |
|
if exists { |
|
// region is already in cache, |
|
// note: regions with the same name have the same age |
|
overlaps = []hrpc.RegionInfo{v.(hrpc.RegionInfo)} |
|
return nil, false |
|
} |
|
// find all entries that are overlapping with the range of the new region. |
|
overlaps = krc.getOverlaps(reg) |
|
for _, o := range overlaps { |
|
if o.ID() > reg.ID() { |
|
// overlapping region is younger, |
|
// don't replace any regions |
|
// TODO: figure out if there can a case where we might |
|
// have both older and younger overlapping regions, for |
|
// now we only replace if all overlaps are older |
|
return nil, false |
|
} |
|
} |
|
// all overlaps are older, put the new region |
|
replaced = true |
|
return reg, true |
|
}) |
|
if !replaced { |
|
krc.m.Unlock() |
|
|
|
log.WithFields(log.Fields{ |
|
"region": reg, |
|
"overlaps": overlaps, |
|
"replaced": replaced, |
|
}).Debug("region is already in cache") |
|
return |
|
} |
|
// delete overlapping regions |
|
// TODO: in case overlaps are always either younger or older, |
|
// we can just greedily remove them in Put function |
|
for _, o := range overlaps { |
|
krc.regions.Delete(o.Name()) |
|
// let region establishers know that they can give up |
|
o.MarkDead() |
|
} |
|
krc.m.Unlock() |
|
|
|
log.WithFields(log.Fields{ |
|
"region": reg, |
|
"overlaps": overlaps, |
|
"replaced": replaced, |
|
}).Info("added new region") |
|
return |
|
} |
|
|
|
func (krc *keyRegionCache) del(reg hrpc.RegionInfo) bool { |
|
krc.m.Lock() |
|
success := krc.regions.Delete(reg.Name()) |
|
krc.m.Unlock() |
|
// let region establishers know that they can give up |
|
reg.MarkDead() |
|
|
|
log.WithFields(log.Fields{ |
|
"region": reg, |
|
}).Debug("removed region") |
|
return success |
|
}
|
|
|