You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
334 lines
7.7 KiB
334 lines
7.7 KiB
package liverpc |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"net/url" |
|
"sync/atomic" |
|
"time" |
|
|
|
"go-common/library/conf/env" |
|
"go-common/library/log" |
|
"go-common/library/naming" |
|
"go-common/library/naming/discovery" |
|
"go-common/library/net/metadata" |
|
"go-common/library/net/trace" |
|
"go-common/library/stat" |
|
xtime "go-common/library/time" |
|
|
|
"github.com/golang/protobuf/proto" |
|
"github.com/pkg/errors" |
|
) |
|
|
|
// Key is ContextKey |
|
type Key int |
|
|
|
const ( |
|
_ Key = iota |
|
// KeyHeader use this in context to pass rpc header field |
|
// Depreated 请使用HeaderOption来传递Header |
|
KeyHeader |
|
// KeyTimeout deprecated |
|
// Depreated 请使用HTTPOption来传递HTTP |
|
KeyTimeout |
|
) |
|
|
|
const ( |
|
_scheme = "liverpc" |
|
_dialRetries = 3 |
|
) |
|
|
|
// Get Implement tracer carrier interface |
|
func (m *Header) Get(key string) string { |
|
if key == trace.KeyTraceID { |
|
return m.TraceId |
|
} |
|
return "" |
|
} |
|
|
|
// Set Implement tracer carrier interface |
|
func (m *Header) Set(key string, val string) { |
|
if key == trace.KeyTraceID { |
|
m.TraceId = val |
|
} |
|
} |
|
|
|
var ( |
|
// ErrNoClient no RPC client. |
|
errNoClient = errors.New("no rpc client") |
|
errGroupInvalid = errors.New("invalid group") |
|
|
|
stats = stat.RPCClient |
|
) |
|
|
|
// GroupAddrs a map struct storing addrs vary groups |
|
type GroupAddrs map[string][]string |
|
|
|
// ClientConfig client config. |
|
type ClientConfig struct { |
|
AppID string |
|
Group string |
|
Timeout xtime.Duration |
|
ConnTimeout xtime.Duration |
|
Addr string // if addr is provided, it will use add, else, use discovery |
|
} |
|
|
|
// Client is a RPC client. |
|
type Client struct { |
|
conf *ClientConfig |
|
dis naming.Resolver |
|
addrs atomic.Value // GroupAddrs |
|
addrsIdx int64 |
|
} |
|
|
|
// NewClient new a RPC client with discovery. |
|
func NewClient(c *ClientConfig) *Client { |
|
if c.Timeout <= 0 { |
|
c.Timeout = xtime.Duration(time.Second) |
|
} |
|
if c.ConnTimeout <= 0 { |
|
c.ConnTimeout = xtime.Duration(time.Second) |
|
} |
|
cli := &Client{ |
|
conf: c, |
|
} |
|
if c.Addr != "" { |
|
groupAddrs := make(GroupAddrs) |
|
groupAddrs[""] = []string{c.Addr} |
|
cli.addrs.Store(groupAddrs) |
|
return cli |
|
} |
|
|
|
cli.dis = discovery.Build(c.AppID) |
|
// discovery watch & fetch nodes |
|
event := cli.dis.Watch() |
|
select { |
|
case _, ok := <-event: |
|
if !ok { |
|
panic("刚启动就从discovery拉到了关闭的event") |
|
} |
|
cli.disFetch() |
|
fmt.Printf("开始创建:%s 的liverpc client,等待从discovery拉取节点:%s\n", c.AppID, time.Now().Format("2006-01-02 15:04:05")) |
|
case <-time.After(10 * time.Second): |
|
fmt.Printf("失败创建:%s 的liverpc client,竟然从discovery拉取节点超时了:%s\n", c.AppID, time.Now().Format("2006-01-02 15:04:05")) |
|
} |
|
go cli.disproc(event) |
|
return cli |
|
} |
|
|
|
func (c *Client) disproc(event <-chan struct{}) { |
|
for { |
|
_, ok := <-event |
|
if !ok { |
|
return |
|
} |
|
c.disFetch() |
|
} |
|
} |
|
|
|
func (c *Client) disFetch() { |
|
ins, ok := c.dis.Fetch(context.Background()) |
|
if !ok { |
|
return |
|
} |
|
insZone, ok := ins[env.Zone] |
|
if !ok { |
|
return |
|
} |
|
addrs := make(GroupAddrs) |
|
for _, svr := range insZone { |
|
group, ok := svr.Metadata["color"] |
|
if !ok { |
|
group = "" |
|
} |
|
for _, addr := range svr.Addrs { |
|
u, err := url.Parse(addr) |
|
if err == nil && u.Scheme == _scheme { |
|
addrs[group] = append(addrs[group], u.Host) |
|
} |
|
} |
|
} |
|
if len(addrs) > 0 { |
|
c.addrs.Store(addrs) |
|
} |
|
} |
|
|
|
// pickConn pick conn by addrs |
|
func (c *Client) pickConn(ctx context.Context, addrs []string, dialTimeout time.Duration) (*ClientConn, error) { |
|
var ( |
|
lastErr error |
|
) |
|
if len(addrs) == 0 { |
|
lastErr = errors.New("addrs empty") |
|
} else { |
|
for i := 0; i < _dialRetries; i++ { |
|
idx := atomic.AddInt64(&c.addrsIdx, 1) |
|
addr := addrs[int(idx)%len(addrs)] |
|
if dialTimeout == 0 { |
|
dialTimeout = time.Duration(c.conf.ConnTimeout) |
|
} |
|
cc, err := Dial(ctx, "tcp", addr, time.Duration(c.conf.Timeout), dialTimeout) |
|
if err != nil { |
|
lastErr = errors.Wrapf(err, "Dial %s error", addr) |
|
continue |
|
} |
|
return cc, nil |
|
} |
|
} |
|
if lastErr != nil { |
|
return nil, errors.WithMessage(errNoClient, lastErr.Error()) |
|
} |
|
return nil, errors.WithStack(errNoClient) |
|
} |
|
|
|
// fetchAddrs fetch addrs by different strategies |
|
// source_group first, come from request header if exists, currently only CallRaw supports source_group |
|
// then env group, come from os.env |
|
// since no invalid group found, return error |
|
func (c *Client) fetchAddrs(ctx context.Context, request interface{}) (addrs []string, err error) { |
|
var ( |
|
args *Args |
|
groupAddrs GroupAddrs |
|
ok bool |
|
sourceGroup string |
|
groups []string |
|
) |
|
defer func() { |
|
if err != nil { |
|
err = errors.WithMessage(errGroupInvalid, err.Error()) |
|
} |
|
}() |
|
// try parse request header and fetch source group |
|
if args, ok = request.(*Args); ok && args.Header != nil { |
|
sourceGroup = args.Header.SourceGroup |
|
if sourceGroup != "" { |
|
groups = append(groups, sourceGroup) |
|
} |
|
} |
|
metaColor := metadata.String(ctx, metadata.Color) |
|
if metaColor != "" && metaColor != sourceGroup { |
|
groups = append(groups, metaColor) |
|
} |
|
|
|
if env.Color != "" && env.Color != metaColor { |
|
groups = append(groups, env.Color) |
|
} |
|
|
|
groups = append(groups, "") |
|
|
|
if groupAddrs, ok = c.addrs.Load().(GroupAddrs); !ok { |
|
err = errors.New("addrs load error") |
|
return |
|
} |
|
if len(groupAddrs) == 0 { |
|
err = errors.New("group addrs empty") |
|
return |
|
} |
|
for _, group := range groups { |
|
if addrs, ok = groupAddrs[group]; ok { |
|
break |
|
} |
|
} |
|
if len(addrs) == 0 { |
|
err = errors.Errorf("addrs empty source(%s), metadata(%s), env(%s), default empty, allAddrs(%+v)", |
|
sourceGroup, metaColor, env.Color, groupAddrs) |
|
return |
|
} |
|
return |
|
} |
|
|
|
// Call call the service method, waits for it to complete, and returns its error status. |
|
// client: {service} |
|
// serviceMethod: {version}|{controller.method} |
|
// httpURL: /room/v1/Room/room_init |
|
// httpURL: /{service}/{version}/{controller}/{method} |
|
func (c *Client) Call(ctx context.Context, version int, serviceMethod string, in proto.Message, out proto.Message, opts ...CallOption) (err error) { |
|
var ( |
|
cc *ClientConn |
|
addrs []string |
|
) |
|
isPickErr := true |
|
|
|
defer func() { |
|
if cc != nil { |
|
cc.Close() |
|
} |
|
if err != nil && isPickErr { |
|
log.Error("liverpc Call pick connection error, version %d, method: %s, error: %+v", version, serviceMethod, err) |
|
} |
|
}() // for now it is non-persistent connection |
|
var cInfo = &callInfo{} |
|
for _, o := range opts { |
|
o.before(cInfo) |
|
} |
|
addrs, err = c.fetchAddrs(ctx, in) |
|
if err != nil { |
|
return |
|
} |
|
cc, err = c.pickConn(ctx, addrs, cInfo.DialTimeout) |
|
if err != nil { |
|
return |
|
} |
|
isPickErr = false |
|
cc.callInfo = cInfo |
|
err = cc.Call(ctx, version, serviceMethod, in, out) |
|
if err != nil { |
|
return |
|
} |
|
for _, o := range opts { |
|
o.after(cc.callInfo) |
|
} |
|
return |
|
} |
|
|
|
// CallRaw call the service method, waits for it to complete, and returns reply its error status. |
|
// this is can be use without protobuf |
|
// client: {service} |
|
// serviceMethod: {version}|{controller.method} |
|
// httpURL: /room/v1/Room/room_init |
|
// httpURL: /{service}/{version}/{controller}/{method} |
|
func (c *Client) CallRaw(ctx context.Context, version int, serviceMethod string, in *Args, opts ...CallOption) (out *Reply, err error) { |
|
var ( |
|
cc *ClientConn |
|
addrs []string |
|
) |
|
isPickErr := true |
|
|
|
defer func() { |
|
if cc != nil { |
|
cc.Close() |
|
} |
|
if err != nil && isPickErr { |
|
log.Error("liverpc CallRaw pick connection error, version %d, method: %s, error: %+v", version, serviceMethod, err) |
|
} |
|
}() // for now it is non-persistent connection |
|
var cInfo = &callInfo{} |
|
for _, o := range opts { |
|
o.before(cInfo) |
|
} |
|
addrs, err = c.fetchAddrs(ctx, in) |
|
if err != nil { |
|
return |
|
} |
|
cc, err = c.pickConn(ctx, addrs, cInfo.DialTimeout) |
|
if err != nil { |
|
return |
|
} |
|
isPickErr = false |
|
cc.callInfo = cInfo |
|
out, err = cc.CallRaw(ctx, version, serviceMethod, in) |
|
if err != nil { |
|
return |
|
} |
|
for _, o := range opts { |
|
o.after(cc.callInfo) |
|
} |
|
return |
|
} |
|
|
|
//Close handle client exit |
|
func (c *Client) Close() { |
|
if c.dis != nil { |
|
c.dis.Close() |
|
} |
|
}
|
|
|