You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
154 lines
4.2 KiB
154 lines
4.2 KiB
package service |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"net/url" |
|
"sync/atomic" |
|
"time" |
|
|
|
client "go-common/app/interface/main/broadcast/api/grpc/v1" |
|
"go-common/app/job/main/broadcast/conf" |
|
"go-common/library/log" |
|
"go-common/library/naming" |
|
) |
|
|
|
// CometOptions comet options. |
|
type CometOptions struct { |
|
RoutineSize uint64 |
|
RoutineChan uint64 |
|
} |
|
|
|
// Comet is a broadcast comet. |
|
type Comet struct { |
|
serverID string |
|
broadcastClient client.ZergClient |
|
pushChan []chan *client.PushMsgReq |
|
roomChan []chan *client.BroadcastRoomReq |
|
broadcastChan chan *client.BroadcastReq |
|
pushChanNum uint64 |
|
roomChanNum uint64 |
|
options CometOptions |
|
ctx context.Context |
|
cancel context.CancelFunc |
|
} |
|
|
|
// Push push a user message. |
|
func (c *Comet) Push(arg *client.PushMsgReq) (err error) { |
|
idx := atomic.AddUint64(&c.pushChanNum, 1) % c.options.RoutineSize |
|
c.pushChan[idx] <- arg |
|
return |
|
} |
|
|
|
// BroadcastRoom broadcast a room message. |
|
func (c *Comet) BroadcastRoom(arg *client.BroadcastRoomReq) (err error) { |
|
idx := atomic.AddUint64(&c.roomChanNum, 1) % c.options.RoutineSize |
|
c.roomChan[idx] <- arg |
|
return |
|
} |
|
|
|
// Broadcast broadcast a message. |
|
func (c *Comet) Broadcast(arg *client.BroadcastReq) (err error) { |
|
c.broadcastChan <- arg |
|
return |
|
} |
|
|
|
// process |
|
func (c *Comet) process(pushChan chan *client.PushMsgReq, roomChan chan *client.BroadcastRoomReq, broadcastChan chan *client.BroadcastReq) { |
|
var err error |
|
for { |
|
select { |
|
case broadcastArg := <-broadcastChan: |
|
_, err = c.broadcastClient.Broadcast(context.Background(), &client.BroadcastReq{ |
|
Proto: broadcastArg.Proto, |
|
ProtoOp: broadcastArg.ProtoOp, |
|
Speed: broadcastArg.Speed, |
|
Platform: broadcastArg.Platform, |
|
}) |
|
if err != nil { |
|
log.Error("c.broadcastClient.Broadcast(%s, %v, reply) serverId:%d error(%v)", broadcastArg, c.serverID, err) |
|
} |
|
case roomArg := <-roomChan: |
|
_, err = c.broadcastClient.BroadcastRoom(context.Background(), &client.BroadcastRoomReq{ |
|
RoomID: roomArg.RoomID, |
|
Proto: roomArg.Proto, |
|
}) |
|
if err != nil { |
|
log.Error("c.broadcastClient.BroadcastRoom(%s, %v, reply) serverId:%d error(%v)", roomArg, c.serverID, err) |
|
} |
|
case pushArg := <-pushChan: |
|
_, err = c.broadcastClient.PushMsg(context.Background(), &client.PushMsgReq{ |
|
Keys: pushArg.Keys, |
|
Proto: pushArg.Proto, |
|
ProtoOp: pushArg.ProtoOp, |
|
}) |
|
if err != nil { |
|
log.Error("c.broadcastClient.PushMsg(%s, %v, reply) serverId:%d error(%v)", pushArg, c.serverID, err) |
|
} |
|
case <-c.ctx.Done(): |
|
return |
|
} |
|
} |
|
} |
|
|
|
// Close close the resouces. |
|
func (c *Comet) Close() (err error) { |
|
finish := make(chan bool) |
|
go func() { |
|
for { |
|
n := len(c.broadcastChan) |
|
for _, ch := range c.pushChan { |
|
n += len(ch) |
|
} |
|
for _, ch := range c.roomChan { |
|
n += len(ch) |
|
} |
|
if n == 0 { |
|
finish <- true |
|
return |
|
} |
|
time.Sleep(time.Second) |
|
} |
|
}() |
|
select { |
|
case <-finish: |
|
log.Info("close comet finish") |
|
case <-time.After(5 * time.Second): |
|
err = fmt.Errorf("close comet(server:%s push:%d room:%d broadcast:%d) timeout", c.serverID, len(c.pushChan), len(c.roomChan), len(c.broadcastChan)) |
|
} |
|
c.cancel() |
|
return |
|
} |
|
|
|
// NewComet new a comet. |
|
func NewComet(data *naming.Instance, conf *conf.Config, options CometOptions) (*Comet, error) { |
|
c := &Comet{ |
|
serverID: data.Hostname, |
|
pushChan: make([]chan *client.PushMsgReq, options.RoutineSize), |
|
roomChan: make([]chan *client.BroadcastRoomReq, options.RoutineSize), |
|
broadcastChan: make(chan *client.BroadcastReq, options.RoutineSize), |
|
options: options, |
|
} |
|
var grpcAddr string |
|
for _, addrs := range data.Addrs { |
|
u, err := url.Parse(addrs) |
|
if err == nil && u.Scheme == "grpc" { |
|
grpcAddr = u.Host |
|
} |
|
} |
|
if grpcAddr == "" { |
|
return nil, fmt.Errorf("invalid grpc address:%v", data.Addrs) |
|
} |
|
var err error |
|
if c.broadcastClient, err = client.NewClient(grpcAddr, conf.RPC); err != nil { |
|
return nil, err |
|
} |
|
c.ctx, c.cancel = context.WithCancel(context.Background()) |
|
|
|
for i := uint64(0); i < options.RoutineSize; i++ { |
|
c.pushChan[i] = make(chan *client.PushMsgReq, options.RoutineChan) |
|
c.roomChan[i] = make(chan *client.BroadcastRoomReq, options.RoutineChan) |
|
go c.process(c.pushChan[i], c.roomChan[i], c.broadcastChan) |
|
} |
|
return c, nil |
|
}
|
|
|