// Copyright (C) 2016 The GoHBase Authors. All rights reserved.
// This file is part of GoHBase.
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.

package gohbase

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"strconv"
	"time"

	"github.com/golang/protobuf/proto"
	log "github.com/sirupsen/logrus"
	"github.com/tsuna/gohbase/hrpc"
	"github.com/tsuna/gohbase/region"
	"github.com/tsuna/gohbase/zk"
)

// Constants
var (
	// Name of the meta table.
	metaTableName = []byte("hbase:meta")

	// infoFamily requests all qualifiers of the "info" column family,
	// which is where hbase:meta stores region information.
	infoFamily = map[string][]string{
		"info": nil,
	}

	// ErrRegionUnavailable is returned when sending an RPC to a region that is unavailable
	ErrRegionUnavailable = errors.New("region unavailable")

	// TableNotFound is returned when attempting to access a table that
	// doesn't exist on this cluster.
	TableNotFound = errors.New("table not found")

	// ErrCannotFindRegion is returned when it took too many tries to find a
	// region for the request. It's likely that hbase:meta has overlaps or some other
	// inconsistency.
	ErrCannotFindRegion = errors.New("cannot find region for the rpc")

	// ErrClientClosed is returned when the gohbase client has been closed
	ErrClientClosed = errors.New("client is closed")

	// errMetaLookupThrottled is returned when a lookup for the rpc's region
	// has been throttled.
	errMetaLookupThrottled = errors.New("lookup to hbase:meta has been throttled")
)

const (
	// maxSendRPCTries is the maximum number of times to try to send an RPC
	maxSendRPCTries = 10

	// backoffStart is the initial backoff between retries.
	backoffStart = 16 * time.Millisecond
)
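
// SendRPC sends the given RPC to the region that should serve it and returns
// the response. It looks the region up in the cache (or in hbase:meta), waits
// out region unavailability, and retries up to maxSendRPCTries times.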
func (c *client) SendRPC(rpc hrpc.Call) (proto.Message, error) {
	var err error
	for i := 0; i < maxSendRPCTries; i++ {
		// Check the cache for a region that can handle this request
		reg := c.getRegionFromCache(rpc.Table(), rpc.Key())
		if reg == nil {
			reg, err = c.findRegion(rpc.Context(), rpc.Table(), rpc.Key())
			if err == ErrRegionUnavailable {
				continue
			} else if err == errMetaLookupThrottled {
				// The lookup for the region has been throttled. Check the
				// cache again, but don't count this as a SendRPC try as
				// there might just be too many requests going on at a time.
				i--
				continue
			} else if err != nil {
				return nil, err
			}
		}

		msg, err := c.sendRPCToRegion(rpc, reg)
		switch err {
		case ErrRegionUnavailable:
			if ch := reg.AvailabilityChan(); ch != nil {
				// The region is unavailable. Wait for it to become available
				// again (or be replaced by a new region), or for the deadline
				// to be exceeded.
				select {
				case <-rpc.Context().Done():
					return nil, rpc.Context().Err()
				case <-c.done:
					return nil, ErrClientClosed
				case <-ch:
				}
			}
		default:
			return msg, err
		}
	}
	return nil, ErrCannotFindRegion
}
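
// sendBlocking queues the RPC on the given region client and blocks until a
// result is delivered or the RPC's context is done.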
func sendBlocking(rc hrpc.RegionClient, rpc hrpc.Call) (hrpc.RPCResult, error) {
	rc.QueueRPC(rpc)

	var res hrpc.RPCResult
	// Wait for the response
	select {
	case res = <-rpc.ResultChan():
		return res, nil
	case <-rpc.Context().Done():
		return res, rpc.Context().Err()
	}
}
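
// sendRPCToRegion sends the RPC to the client currently serving the given
// region. If the region or its client turns out to be unavailable, the region
// is marked unavailable, a goroutine is started to reestablish it, and
// ErrRegionUnavailable is returned so the caller can retry.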
func (c *client) sendRPCToRegion(rpc hrpc.Call, reg hrpc.RegionInfo) (proto.Message, error) {
	if reg.IsUnavailable() {
		return nil, ErrRegionUnavailable
	}
	rpc.SetRegion(reg)

	// Queue the RPC to be sent to the region
	client := reg.Client()
	if client == nil {
		// The region has no client to queue the RPC to.
		// Mark the region as unavailable.
		if reg.MarkUnavailable() {
			// If this was the first goroutine to mark the region as
			// unavailable, start a goroutine to reestablish a connection
			go c.reestablishRegion(reg)
		}
		return nil, ErrRegionUnavailable
	}
	res, err := sendBlocking(client, rpc)
	if err != nil {
		return nil, err
	}
	// Check for errors
	switch res.Error.(type) {
	case region.RetryableError:
		// There's an error specific to this region, but
		// our region client is fine. Mark this region as
		// unavailable (as opposed to all regions sharing
		// the client), and start a goroutine to reestablish
		// it.
		if reg.MarkUnavailable() {
			go c.reestablishRegion(reg)
		}
		return nil, ErrRegionUnavailable
	case region.UnrecoverableError:
		// If it was an unrecoverable error, the region client is
		// considered dead.
		if reg == c.adminRegionInfo {
			// If this is the admin client, mark the region
			// as unavailable and start up a goroutine to
			// reconnect if it wasn't already marked as such.
			if reg.MarkUnavailable() {
				go c.reestablishRegion(reg)
			}
		} else {
			c.clientDown(client)
		}

		// Fall through to the case of the region being unavailable,
		// which will result in blocking until it's available again.
		return nil, ErrRegionUnavailable
	default:
		// RPC was successfully sent, or an unknown type of error
		// occurred. In either case, return the results.
		return res.Msg, res.Error
	}
}

// clientDown removes the client from the cache, marks all the regions
// sharing that client as unavailable, and starts a goroutine
// to reconnect each of them.
func (c *client) clientDown(client hrpc.RegionClient) {
	downregions := c.clients.clientDown(client)
	for downreg := range downregions {
		if downreg.MarkUnavailable() {
			downreg.SetClient(nil)
			go c.reestablishRegion(downreg)
		}
	}
}
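
// lookupRegion returns the region and the address of the regionserver hosting
// the given table/key. Depending on the client type it asks ZooKeeper for the
// master or hbase:meta location, or scans hbase:meta, and it retries with
// backoff until the lookup succeeds, the table is found to not exist, the
// lookup is throttled, the client is closed, or the context is done.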
func (c *client) lookupRegion(ctx context.Context,
	table, key []byte) (hrpc.RegionInfo, string, error) {
	var reg hrpc.RegionInfo
	var addr string
	var err error
	backoff := backoffStart
	for {
		// If it takes longer than regionLookupTimeout, fail so that we can sleep
		lookupCtx, cancel := context.WithTimeout(ctx, c.regionLookupTimeout)
		if c.clientType == adminClient {
			log.WithField("resource", zk.Master).Debug("looking up master")

			addr, err = c.zkLookup(lookupCtx, zk.Master)
			cancel()
			reg = c.adminRegionInfo
		} else if bytes.Equal(table, metaTableName) {
			log.WithField("resource", zk.Meta).Debug("looking up region server of hbase:meta")

			addr, err = c.zkLookup(lookupCtx, zk.Meta)
			cancel()
			reg = c.metaRegionInfo
		} else {
			log.WithFields(log.Fields{
				"table": strconv.Quote(string(table)),
				"key":   strconv.Quote(string(key)),
			}).Debug("looking up region")

			reg, addr, err = c.metaLookup(lookupCtx, table, key)
			cancel()
			if err == TableNotFound {
				log.WithFields(log.Fields{
					"table": strconv.Quote(string(table)),
					"key":   strconv.Quote(string(key)),
					"err":   err,
				}).Debug("hbase:meta does not know about this table/key")

				return nil, "", err
			} else if err == errMetaLookupThrottled {
				return nil, "", err
			} else if err == ErrClientClosed {
				return nil, "", err
			}
		}
		if err == nil {
			log.WithFields(log.Fields{
				"table":  strconv.Quote(string(table)),
				"key":    strconv.Quote(string(key)),
				"region": reg,
				"addr":   addr,
			}).Debug("looked up a region")

			return reg, addr, nil
		}

		log.WithFields(log.Fields{
			"table":   strconv.Quote(string(table)),
			"key":     strconv.Quote(string(key)),
			"backoff": backoff,
			"err":     err,
		}).Error("failed looking up region")

		// This will be hit if there was an error locating the region
		backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
		if err != nil {
			return nil, "", err
		}
	}
}
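
// findRegion looks up the region for the given table/key, adds it to the
// region cache (evicting any overlapping regions), and starts a goroutine to
// connect to it. The returned region stays marked unavailable until that
// connection is established.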
func (c *client) findRegion(ctx context.Context, table, key []byte) (hrpc.RegionInfo, error) {
	// The region was not in the cache, so it
	// must be looked up in the meta table
	reg, addr, err := c.lookupRegion(ctx, table, key)
	if err != nil {
		return nil, err
	}

	// We are the ones that looked up the region, so we need to
	// mark it unavailable and find a client for it.
	reg.MarkUnavailable()

	if reg != c.metaRegionInfo && reg != c.adminRegionInfo {
		// Check that the region wasn't added to
		// the cache while we were looking it up.
		overlaps, replaced := c.regions.put(reg)
		if !replaced {
			// the same or younger regions are already in cache, retry looking up in cache
			return nil, ErrRegionUnavailable
		}

		// otherwise, new region in cache, delete overlaps from client's cache
		for _, r := range overlaps {
			c.clients.del(r)
		}
	}

	// Start a goroutine to connect to the region
	go c.establishRegion(reg, addr)

	// Wait for the new region to become
	// available, and then send the RPC
	return reg, nil
}

// Searches in the regions cache for the region hosting the given row.
func (c *client) getRegionFromCache(table, key []byte) hrpc.RegionInfo {
	if c.clientType == adminClient {
		return c.adminRegionInfo
	} else if bytes.Equal(table, metaTableName) {
		return c.metaRegionInfo
	}
	regionName := createRegionSearchKey(table, key)
	_, region := c.regions.get(regionName)
	if region == nil {
		return nil
	}

	// make sure the returned region is for the same table
	if !bytes.Equal(fullyQualifiedTable(region), table) {
		// not the same table, can happen if we got the last region
		return nil
	}

	if len(region.StopKey()) != 0 &&
		// If the stop key is an empty byte array, it means this region is the
		// last region for this table and this key ought to be in that region.
		bytes.Compare(key, region.StopKey()) >= 0 {
		return nil
	}

	return region
}

// Creates the META key to search for in order to locate the given key.
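// For example, for table "test" and row key "foo" the resulting search key
// is "test,foo,:".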
func createRegionSearchKey(table, key []byte) []byte {
	metaKey := make([]byte, 0, len(table)+len(key)+3)
	metaKey = append(metaKey, table...)
	metaKey = append(metaKey, ',')
	metaKey = append(metaKey, key...)
	metaKey = append(metaKey, ',')
	// ':' is the first byte greater than '9'. We always want to find the
	// entry with the greatest timestamp, so by looking right before ':'
	// we'll find it.
	metaKey = append(metaKey, ':')
	return metaKey
}

// metaLookupLimit throttles lookups to hbase:meta to metaLookupLimit requests
// per metaLookupInterval. It returns nil if we were lucky enough to reserve
// right away, and errMetaLookupThrottled or the context's error otherwise.
func (c *client) metaLookupLimit(ctx context.Context) error {
	r := c.metaLookupLimiter.Reserve()
	if !r.OK() {
		panic("wtf: cannot reserve a meta lookup")
	}

	delay := r.Delay()
	if delay <= 0 {
		return nil
	}

	// We've been rate limited
	t := time.NewTimer(delay)
	defer t.Stop()
	select {
	case <-t.C:
		return errMetaLookupThrottled
	case <-ctx.Done():
		r.Cancel()
		return ctx.Err()
	}
}

// metaLookup checks the meta table for the region that contains the given row
// key of the given table.
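// It does this with a reversed, single-row scan starting at the key built by
// createRegionSearchKey, so hbase:meta returns the entry for the region whose
// start key is the greatest one at or before the given key.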
func (c *client) metaLookup(ctx context.Context,
	table, key []byte) (hrpc.RegionInfo, string, error) {
	metaKey := createRegionSearchKey(table, key)
	rpc, err := hrpc.NewScanRange(ctx, metaTableName, metaKey, table,
		hrpc.Families(infoFamily),
		hrpc.Reversed(),
		hrpc.CloseScanner(),
		hrpc.NumberOfRows(1))
	if err != nil {
		return nil, "", err
	}

	scanner := c.Scan(rpc)
	resp, err := scanner.Next()
	if err == io.EOF {
		return nil, "", TableNotFound
	}
	if err != nil {
		return nil, "", err
	}

	reg, addr, err := region.ParseRegionInfo(resp)
	if err != nil {
		return nil, "", err
	}
	if !bytes.Equal(table, fullyQualifiedTable(reg)) {
		// This would indicate a bug in HBase.
		return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong table!"+
			" Looked up table=%q key=%q got region=%s", table, key, reg)
	} else if len(reg.StopKey()) != 0 &&
		bytes.Compare(key, reg.StopKey()) >= 0 {
		// This would indicate a hole in the meta table.
		return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong region!"+
			" Looked up table=%q key=%q got region=%s", table, key, reg)
	}
	return reg, addr, nil
}
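
// fullyQualifiedTable returns the region's table name qualified with its
// namespace (e.g. "namespace:table"), or just the table name if the region is
// in the default namespace.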
func fullyQualifiedTable(reg hrpc.RegionInfo) []byte {
	namespace := reg.Namespace()
	table := reg.Table()
	if namespace == nil {
		return table
	}
	// non-default namespace table
	fqTable := make([]byte, 0, len(namespace)+1+len(table))
	fqTable = append(fqTable, namespace...)
	fqTable = append(fqTable, byte(':'))
	fqTable = append(fqTable, table...)
	return fqTable
}
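
// reestablishRegion looks the region up again and reconnects to the
// regionserver hosting it, unless the client has already been closed.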
func (c *client) reestablishRegion(reg hrpc.RegionInfo) {
	select {
	case <-c.done:
		return
	default:
	}

	log.WithField("region", reg).Debug("reestablishing region")
	c.establishRegion(reg, "")
}

// probeKey returns a key in the region that is unlikely to have data at it,
// in order to test whether the region is online. This prevents the Get
// request from actually fetching data from storage, which would consume
// resources of the region server.
func probeKey(reg hrpc.RegionInfo) []byte {
	// now we create a probe key: reg.StartKey() + 17 zeros
	probe := make([]byte, len(reg.StartKey())+17)
	copy(probe, reg.StartKey())
	return probe
}

// isRegionEstablished checks whether the regionserver accepts RPCs for the
// region. It returns the cause if the region is not established.
func isRegionEstablished(rc hrpc.RegionClient, reg hrpc.RegionInfo) error {
	probe, err := hrpc.NewGet(context.Background(), fullyQualifiedTable(reg), probeKey(reg),
		hrpc.SkipBatch())
	if err != nil {
		panic(fmt.Sprintf("should not happen: %s", err))
	}
	probe.ExistsOnly()

	probe.SetRegion(reg)
	res, err := sendBlocking(rc, probe)
	if err != nil {
		panic(fmt.Sprintf("should not happen: %s", err))
	}

	switch res.Error.(type) {
	case region.RetryableError, region.UnrecoverableError:
		return res.Error
	default:
		return nil
	}
}
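
// establishRegion looks up the given region (when addr is empty), updates the
// region cache, and connects to the regionserver hosting it, retrying with
// backoff until the region is marked available or its context is done.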
func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
	var backoff time.Duration
	var err error
	for {
		backoff, err = sleepAndIncreaseBackoff(reg.Context(), backoff)
		if err != nil {
			// region is dead
			reg.MarkAvailable()
			return
		}
		if addr == "" {
			// need to look up region and address of the regionserver
			originalReg := reg
			// look up the region forever until we get it or we learn that it doesn't exist
			reg, addr, err = c.lookupRegion(originalReg.Context(),
				fullyQualifiedTable(originalReg), originalReg.StartKey())

			if err == TableNotFound {
				// region doesn't exist, delete it from caches
				c.regions.del(originalReg)
				c.clients.del(originalReg)
				originalReg.MarkAvailable()

				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Info("region does not exist anymore")

				return
			} else if originalReg.Context().Err() != nil {
				// region is dead
				originalReg.MarkAvailable()

				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Info("region became dead while establishing client for it")

				return
			} else if err == errMetaLookupThrottled {
				// We've been throttled, back off and retry the lookup
				// TODO: backoff might be unnecessary
				reg = originalReg
				continue
			} else if err == ErrClientClosed {
				// client has been closed
				return
			} else if err != nil {
				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Fatal("unknown error occurred when looking up region")
			}
			if !bytes.Equal(reg.Name(), originalReg.Name()) {
				// put new region and remove overlapping ones.
				// Should remove the original region as well.
				reg.MarkUnavailable()
				overlaps, replaced := c.regions.put(reg)
				if !replaced {
					// a region that is the same or younger is already in cache
					reg.MarkAvailable()
					originalReg.MarkAvailable()
					return
				}
				// otherwise delete the overlapped regions in cache
				for _, r := range overlaps {
					c.clients.del(r)
				}
				// let rpcs know that they can retry and either get the newly
				// added region from cache or look up the one they need
				originalReg.MarkAvailable()
			} else {
				// same region, discard the looked up one
				reg = originalReg
			}
		}

		// connect to the region's regionserver
		client, err := c.establishRegionClient(reg, addr)
		if err == nil {
			if reg == c.adminRegionInfo {
				reg.SetClient(client)
				reg.MarkAvailable()
				return
			}

			if existing := c.clients.put(client, reg); existing != client {
				// a client for this regionserver is already in cache, discard this one.
				client.Close()
				client = existing
			}

			if err = isRegionEstablished(client, reg); err == nil {
				// set region client so that as soon as we mark it available,
				// concurrent readers are able to find the client
				reg.SetClient(client)
				reg.MarkAvailable()
				return
			} else if _, ok := err.(region.UnrecoverableError); ok {
				// the client we got died
				c.clientDown(client)
			}
		} else if err == context.Canceled {
			// region is dead
			reg.MarkAvailable()
			return
		}
		log.WithFields(log.Fields{
			"region":  reg,
			"backoff": backoff,
			"err":     err,
		}).Debug("region was not established, retrying")
		// reset the address because we weren't able to connect to it,
		// or the regionserver says it's still offline; we should look it up again
		addr = ""
	}
}
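
// sleepAndIncreaseBackoff sleeps for the given backoff (unless it's zero) and
// returns the backoff to use for the next attempt: it starts at backoffStart,
// doubles until it reaches 5 seconds, and grows by 5 seconds after that.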
func sleepAndIncreaseBackoff(ctx context.Context, backoff time.Duration) (time.Duration, error) {
	if backoff == 0 {
		return backoffStart, nil
	}
	select {
	case <-time.After(backoff):
	case <-ctx.Done():
		return 0, ctx.Err()
	}
	// TODO: Revisit how we back off here.
	if backoff < 5000*time.Millisecond {
		return backoff * 2, nil
	}
	return backoff + 5000*time.Millisecond, nil
}
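
// establishRegionClient returns a region client for the regionserver at the
// given address, reusing an existing cached client when possible and
// otherwise creating a new one with the client's configured timeouts.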
func (c *client) establishRegionClient(reg hrpc.RegionInfo,
	addr string) (hrpc.RegionClient, error) {
	if c.clientType != adminClient {
		// if the rpc is not for the hbase master, check if a client
		// for this regionserver already exists
		if client := c.clients.checkForClient(addr); client != nil {
			// There's already a client
			return client, nil
		}
	}

	var clientType region.ClientType
	if c.clientType == standardClient {
		clientType = region.RegionClient
	} else {
		clientType = region.MasterClient
	}
	clientCtx, cancel := context.WithTimeout(reg.Context(), c.regionLookupTimeout)
	defer cancel()

	return region.NewClient(clientCtx, addr, clientType,
		c.rpcQueueSize, c.flushInterval, c.effectiveUser,
		c.regionReadTimeout)
}

// zkResult contains the result of a ZooKeeper lookup (when we're looking for
// the meta region or the HMaster).
type zkResult struct {
	addr string
	err  error
}

// zkLookup asynchronously looks up the meta region or HMaster in ZooKeeper.
func (c *client) zkLookup(ctx context.Context, resource zk.ResourceName) (string, error) {
	// We make this a buffered channel so that if we stop waiting due to a
	// timeout, we won't block the lookup goroutine that we start below.
	reschan := make(chan zkResult, 1)
	go func() {
		addr, err := c.zkClient.LocateResource(resource.Prepend(c.zkRoot))
		// This is guaranteed to never block as the channel is always buffered.
		reschan <- zkResult{addr, err}
	}()
	select {
	case res := <-reschan:
		return res.addr, res.err
	case <-ctx.Done():
		return "", ctx.Err()
	}
}