You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
106 lines
2.1 KiB
106 lines
2.1 KiB
package service |
|
|
|
import ( |
|
"time" |
|
|
|
"go-common/app/service/main/broadcast/libs/bytes" |
|
"go-common/app/service/main/broadcast/model" |
|
"go-common/library/log" |
|
) |
|
|
|
// RoomOptions room options. |
|
type RoomOptions struct { |
|
BatchNum int |
|
SignalTime time.Duration |
|
} |
|
|
|
// Room room. |
|
type Room struct { |
|
s *Service |
|
id string |
|
proto chan *model.Proto |
|
} |
|
|
|
var ( |
|
roomReadyProto = new(model.Proto) |
|
) |
|
|
|
// NewRoom new a room struct, store channel room info. |
|
func NewRoom(s *Service, id string, options RoomOptions) (r *Room) { |
|
r = &Room{ |
|
s: s, |
|
id: id, |
|
proto: make(chan *model.Proto, options.BatchNum*2), |
|
} |
|
go r.pushproc(options.BatchNum, options.SignalTime) |
|
return |
|
} |
|
|
|
// Push push msg to the room, if chan full discard it. |
|
func (r *Room) Push(op int32, msg []byte, contentType int32) (err error) { |
|
var p = &model.Proto{ |
|
Ver: 1, |
|
Operation: op, |
|
ContentType: contentType, |
|
Body: msg, |
|
} |
|
select { |
|
case r.proto <- p: |
|
default: |
|
err = ErrRoomFull |
|
} |
|
return |
|
} |
|
|
|
// pushproc merge proto and push msgs in batch. |
|
func (r *Room) pushproc(batch int, sigTime time.Duration) { |
|
var ( |
|
n int |
|
last time.Time |
|
p *model.Proto |
|
buf = bytes.NewWriterSize(int(model.MaxBodySize)) |
|
) |
|
log.Info("start room:%s goroutine", r.id) |
|
td := time.AfterFunc(sigTime, func() { |
|
select { |
|
case r.proto <- roomReadyProto: |
|
default: |
|
} |
|
}) |
|
defer td.Stop() |
|
for { |
|
if p = <-r.proto; p == nil { |
|
break // exit |
|
} else if p != roomReadyProto { |
|
// merge buffer ignore error, always nil |
|
p.WriteTo(buf) |
|
if n++; n == 1 { |
|
last = time.Now() |
|
td.Reset(sigTime) |
|
continue |
|
} else if n < batch { |
|
if sigTime > time.Since(last) { |
|
continue |
|
} |
|
} |
|
} else { |
|
if n == 0 { |
|
break |
|
} |
|
} |
|
r.s.broadcastRoomRawBytes(r.id, buf.Buffer()) |
|
// TODO use reset buffer |
|
// after push to room channel, renew a buffer, let old buffer gc |
|
buf = bytes.NewWriterSize(buf.Size()) |
|
n = 0 |
|
if r.s.conf.Room.Idle != 0 { |
|
td.Reset(time.Duration(r.s.conf.Room.Idle)) |
|
} else { |
|
td.Reset(time.Minute) |
|
} |
|
} |
|
r.s.roomsMutex.Lock() |
|
delete(r.s.rooms, r.id) |
|
r.s.roomsMutex.Unlock() |
|
log.Info("room:%s goroutine exit", r.id) |
|
}
|
|
|