You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
105 lines
2.6 KiB
105 lines
2.6 KiB
package rpc |
|
|
|
import ( |
|
"context" |
|
"reflect" |
|
"sync/atomic" |
|
|
|
"go-common/library/log" |
|
"go-common/library/sync/errgroup" |
|
) |
|
|
|
// _clientsPool is a package-level pool size constant.
// NOTE(review): it is not referenced in this part of the file; presumably it
// is the number of clients kept per server — confirm against its users.
const _clientsPool = 3
|
|
|
// balancer is the strategy used to dispatch an rpc request to the clients
// it manages.
type balancer interface {
	// Boardcast sends the request described by serviceMethod and args to
	// every managed client. reply is only used as a type template.
	Boardcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error

	// Call sends the request to a single client selected by the strategy
	// and stores the response in reply.
	Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
}
|
|
|
// wrr get avaliable rpc client by wrr strategy. |
|
type wrr struct { |
|
pool []*Client |
|
weight int64 |
|
server int64 |
|
idx int64 |
|
} |
|
|
|
// Boardcast broad cast to all Client. |
|
// NOTE: reply must be ptr. |
|
func (r *wrr) Boardcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { |
|
if r.weight == 0 { |
|
log.Error("wrr get() error weight:%d server:%d idx:%d", len(r.pool), r.server, r.idx) |
|
return ErrNoClient |
|
} |
|
rtp := reflect.TypeOf(reply).Elem() |
|
g, ctx := errgroup.WithContext(ctx) |
|
for i := 0; i < int(r.server); i++ { |
|
j := i |
|
g.Go(func() error { |
|
nrp := reflect.New(rtp).Interface() |
|
return r.pool[j].Call(ctx, serviceMethod, args, nrp) |
|
}) |
|
} |
|
return g.Wait() |
|
} |
|
|
|
func (r *wrr) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { |
|
if r.weight == 0 { |
|
log.Error("wrr get() error weight:%d server:%d idx:%d", len(r.pool), r.server, r.idx) |
|
return ErrNoClient |
|
} |
|
v := atomic.AddInt64(&r.idx, 1) |
|
for i := int64(0); i < r.server; i++ { |
|
cli := r.pool[int((v+i)%r.weight)] |
|
if err = cli.Call(ctx, serviceMethod, args, reply); err != ErrNoClient { |
|
return |
|
} |
|
} |
|
return ErrNoClient |
|
} |
|
|
|
// key is implemented by request arguments that carry a sharding key.
// sharding.Call uses the value returned by Key to pick a client.
type key interface{ Key() int64 }
|
|
|
type sharding struct { |
|
pool []*Client |
|
weight int64 |
|
server int64 |
|
idx int64 |
|
} |
|
|
|
// Boardcast broad cast to all clients. |
|
// NOTE: reply must be ptr. |
|
func (r *sharding) Boardcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { |
|
if r.weight == 0 { |
|
log.Error("wrr get() error weight:%d server:%d idx:%d", len(r.pool), r.server, r.idx) |
|
return ErrNoClient |
|
} |
|
rtp := reflect.TypeOf(reply).Elem() |
|
g, ctx := errgroup.WithContext(ctx) |
|
for i := 0; i < int(r.server); i++ { |
|
j := i |
|
g.Go(func() error { |
|
nrp := reflect.New(rtp).Interface() |
|
return r.pool[j].Call(ctx, serviceMethod, args, nrp) |
|
}) |
|
} |
|
return g.Wait() |
|
} |
|
|
|
func (r *sharding) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { |
|
if r.weight == 0 { |
|
log.Error("wrr get() error weight:%d server:%d idx:%d", len(r.pool), r.server, r.idx) |
|
return ErrNoClient |
|
} |
|
if k, ok := args.(key); ok { |
|
if err = r.pool[int(k.Key()%r.server)].Call(ctx, serviceMethod, args, reply); err != ErrNoClient { |
|
return |
|
} |
|
} |
|
return ErrNoClient |
|
}
|
|
|