You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
278 lines
6.7 KiB
278 lines
6.7 KiB
// Copyright (C) 2017 The GoHBase Authors. All rights reserved. |
|
// This file is part of GoHBase. |
|
// Use of this source code is governed by the Apache License 2.0 |
|
// that can be found in the COPYING file. |
|
|
|
package region |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
"fmt" |
|
"sync" |
|
|
|
"github.com/golang/protobuf/proto" |
|
"github.com/tsuna/gohbase/hrpc" |
|
"github.com/tsuna/gohbase/pb" |
|
) |
|
|
|
var multiPool = sync.Pool{ |
|
New: func() interface{} { |
|
return &multi{} |
|
}, |
|
} |
|
|
|
func freeMulti(m *multi) { |
|
m.calls = m.calls[:0] |
|
m.regions = m.regions[:0] |
|
m.size = 0 |
|
multiPool.Put(m) |
|
} |
|
|
|
type multi struct { |
|
size int |
|
calls []hrpc.Call |
|
// regions preserves the order of regions to match against RegionActionResults |
|
regions []hrpc.RegionInfo |
|
} |
|
|
|
func newMulti(queueSize int) *multi { |
|
m := multiPool.Get().(*multi) |
|
m.size = queueSize |
|
return m |
|
} |
|
|
|
// Name returns the name of this RPC call. |
|
func (m *multi) Name() string { |
|
return "Multi" |
|
} |
|
|
|
// ToProto converts all request in multi batch to a protobuf message. |
|
func (m *multi) ToProto() proto.Message { |
|
// aggregate calls per region |
|
actionsPerReg := map[hrpc.RegionInfo][]*pb.Action{} |
|
|
|
for i, c := range m.calls { |
|
select { |
|
case <-c.Context().Done(): |
|
// context has expired, don't bother sending it |
|
m.calls[i] = nil |
|
continue |
|
default: |
|
} |
|
|
|
msg := c.ToProto() |
|
|
|
a := &pb.Action{ |
|
Index: proto.Uint32(uint32(i) + 1), // +1 because 0 index means there's no index |
|
} |
|
|
|
switch r := msg.(type) { |
|
case *pb.GetRequest: |
|
a.Get = r.Get |
|
case *pb.MutateRequest: |
|
a.Mutation = r.Mutation |
|
default: |
|
panic(fmt.Sprintf("unsupported call type for Multi: %T", c)) |
|
} |
|
|
|
actionsPerReg[c.Region()] = append(actionsPerReg[c.Region()], a) |
|
} |
|
|
|
// construct the multi proto |
|
ra := make([]*pb.RegionAction, len(actionsPerReg)) |
|
m.regions = make([]hrpc.RegionInfo, len(actionsPerReg)) |
|
|
|
i := 0 |
|
for r, as := range actionsPerReg { |
|
ra[i] = &pb.RegionAction{ |
|
Region: &pb.RegionSpecifier{ |
|
Type: pb.RegionSpecifier_REGION_NAME.Enum(), |
|
Value: r.Name(), |
|
}, |
|
Action: as, |
|
} |
|
// Track the order of RegionActions, |
|
// so that we can handle whole region exceptions. |
|
m.regions[i] = r |
|
i++ |
|
} |
|
return &pb.MultiRequest{RegionAction: ra} |
|
} |
|
|
|
// NewResponse creates an empty protobuf message to read the response of this RPC. |
|
func (m *multi) NewResponse() proto.Message { |
|
return &pb.MultiResponse{} |
|
} |
|
|
|
// DeserializeCellBlocks deserializes action results from cell blocks. |
|
func (m *multi) DeserializeCellBlocks(msg proto.Message, b []byte) (uint32, error) { |
|
mr := msg.(*pb.MultiResponse) |
|
|
|
var nread uint32 |
|
for _, rar := range mr.GetRegionActionResult() { |
|
if e := rar.GetException(); e != nil { |
|
if l := len(rar.GetResultOrException()); l != 0 { |
|
return 0, fmt.Errorf( |
|
"got exception for region, but still have %d result(s) returned from it", l) |
|
} |
|
continue |
|
} |
|
|
|
for _, roe := range rar.GetResultOrException() { |
|
e := roe.GetException() |
|
r := roe.GetResult() |
|
i := roe.GetIndex() |
|
|
|
if i == 0 { |
|
return 0, errors.New("no index for result in multi response") |
|
} else if r == nil && e == nil { |
|
return 0, errors.New("no result or exception for action in multi response") |
|
} else if r != nil && e != nil { |
|
return 0, errors.New("got result and exception for action in multi response") |
|
} else if e != nil { |
|
continue |
|
} |
|
|
|
c := m.get(i) // TODO: maybe return error if it's out-of-bounds |
|
d := c.(canDeserializeCellBlocks) // let it panic, because then it's our bug |
|
|
|
response := c.NewResponse() |
|
switch rsp := response.(type) { |
|
case *pb.GetResponse: |
|
rsp.Result = r |
|
case *pb.MutateResponse: |
|
rsp.Result = r |
|
default: |
|
panic(fmt.Sprintf("unsupported response type for Multi: %T", response)) |
|
} |
|
|
|
// TODO: don't bother deserializing if the call's context has already expired |
|
n, err := d.DeserializeCellBlocks(response, b[nread:]) |
|
if err != nil { |
|
return 0, fmt.Errorf( |
|
"error deserializing cellblocks for %q call as part of MultiResponse: %v", |
|
c.Name(), err) |
|
} |
|
nread += n |
|
} |
|
} |
|
return nread, nil |
|
} |
|
|
|
func (m *multi) returnResults(msg proto.Message, err error) { |
|
defer freeMulti(m) |
|
|
|
if err != nil { |
|
for _, c := range m.calls { |
|
if c == nil { |
|
continue |
|
} |
|
c.ResultChan() <- hrpc.RPCResult{Error: err} |
|
} |
|
return |
|
} |
|
|
|
mr := msg.(*pb.MultiResponse) |
|
|
|
// Here we can assume that everything has been deserialized correctly. |
|
// Dispatch results to appropriate calls. |
|
for i, rar := range mr.GetRegionActionResult() { |
|
if e := rar.GetException(); e != nil { |
|
// Got an exception for the whole region, |
|
// fail all the calls for that region. |
|
reg := m.regions[i] |
|
|
|
err := exceptionToError(*e.Name, string(e.Value)) |
|
for _, c := range m.calls { |
|
if c == nil { |
|
continue |
|
} |
|
if c.Region() == reg { |
|
c.ResultChan() <- hrpc.RPCResult{Error: err} |
|
} |
|
} |
|
continue |
|
} |
|
|
|
for _, roe := range rar.GetResultOrException() { |
|
i := roe.GetIndex() |
|
e := roe.GetException() |
|
r := roe.GetResult() |
|
|
|
c := m.get(i) |
|
|
|
// TODO: don't bother if the call's context has already expired |
|
|
|
if e != nil { |
|
c.ResultChan() <- hrpc.RPCResult{ |
|
Error: exceptionToError(*e.Name, string(e.Value)), |
|
} |
|
continue |
|
} |
|
|
|
response := c.NewResponse() |
|
switch rsp := response.(type) { |
|
case *pb.GetResponse: |
|
rsp.Result = r |
|
case *pb.MutateResponse: |
|
rsp.Result = r |
|
default: |
|
panic(fmt.Sprintf("unsupported response type for Multi: %T", response)) |
|
} |
|
|
|
c.ResultChan() <- hrpc.RPCResult{Msg: response} |
|
} |
|
} |
|
} |
|
|
|
// add adds the call and returns wether the batch is full. |
|
func (m *multi) add(call hrpc.Call) bool { |
|
m.calls = append(m.calls, call) |
|
return len(m.calls) == m.size |
|
} |
|
|
|
// len returns number of batched calls. |
|
func (m *multi) len() int { |
|
return len(m.calls) |
|
} |
|
|
|
// get retruns an rpc at index. Indicies start from 1 since 0 means that |
|
// region server didn't set an index for the action result. |
|
func (m *multi) get(i uint32) hrpc.Call { |
|
if i == 0 { |
|
panic("index cannot be 0") |
|
} |
|
return m.calls[i-1] |
|
} |
|
|
|
// Table is not supported for Multi. |
|
func (m *multi) Table() []byte { |
|
panic("'Table' is not supported for 'Multi'") |
|
} |
|
|
|
// Reqion is not supported for Multi. |
|
func (m *multi) Region() hrpc.RegionInfo { |
|
panic("'Region' is not supported for 'Multi'") |
|
} |
|
|
|
// SetRegion is not supported for Multi. |
|
func (m *multi) SetRegion(r hrpc.RegionInfo) { |
|
panic("'SetRegion' is not supported for 'Multi'") |
|
} |
|
|
|
// ResultChan is not supported for Multi. |
|
func (m *multi) ResultChan() chan hrpc.RPCResult { |
|
panic("'ResultChan' is not supported for 'Multi'") |
|
} |
|
|
|
// Context is not supported for Multi. |
|
func (m *multi) Context() context.Context { |
|
// TODO: maybe pick the one with the longest deadline and use a context that has that deadline? |
|
return context.Background() |
|
} |
|
|
|
// Key is not supported for Multi RPC. |
|
func (m *multi) Key() []byte { |
|
panic("'Key' is not supported for 'Multi'") |
|
}
|
|
|