You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
276 lines
5.3 KiB
276 lines
5.3 KiB
package server |
|
|
|
import ( |
|
"sync" |
|
"sync/atomic" |
|
|
|
"go-common/app/interface/main/broadcast/conf" |
|
"go-common/app/service/main/broadcast/model" |
|
) |
|
|
|
// ProtoRoom room proto. |
|
type ProtoRoom struct { |
|
RoomID string |
|
Proto *model.Proto |
|
} |
|
|
|
// Bucket is a channel holder. |
|
type Bucket struct { |
|
c *conf.Bucket |
|
cLock sync.RWMutex // protect the channels for chs |
|
chs map[string]*Channel // map sub key to a channel |
|
// room |
|
rooms map[string]*Room // bucket room channels |
|
routines []chan ProtoRoom |
|
routinesNum uint64 |
|
|
|
ipCnts map[string]int32 |
|
roomIPCnts map[string]int32 |
|
} |
|
|
|
// NewBucket new a bucket struct. store the key with im channel. |
|
func NewBucket(c *conf.Bucket) (b *Bucket) { |
|
b = new(Bucket) |
|
b.chs = make(map[string]*Channel, c.Channel) |
|
b.ipCnts = make(map[string]int32) |
|
b.roomIPCnts = make(map[string]int32) |
|
b.c = c |
|
b.rooms = make(map[string]*Room, c.Room) |
|
b.routines = make([]chan ProtoRoom, c.RoutineAmount) |
|
for i := uint64(0); i < c.RoutineAmount; i++ { |
|
c := make(chan ProtoRoom, c.RoutineSize) |
|
b.routines[i] = c |
|
go b.roomproc(c) |
|
} |
|
return |
|
} |
|
|
|
// ChannelCount channel count in the bucket |
|
func (b *Bucket) ChannelCount() int { |
|
return len(b.chs) |
|
} |
|
|
|
// RoomCount room count in the bucket |
|
func (b *Bucket) RoomCount() int { |
|
return len(b.rooms) |
|
} |
|
|
|
// RoomsCount get all room id where online number > 0. |
|
func (b *Bucket) RoomsCount() (res map[string]int32) { |
|
var ( |
|
roomID string |
|
room *Room |
|
) |
|
b.cLock.RLock() |
|
res = make(map[string]int32) |
|
for roomID, room = range b.rooms { |
|
if room.Online > 0 { |
|
res[roomID] = room.Online |
|
} |
|
} |
|
b.cLock.RUnlock() |
|
return |
|
} |
|
|
|
// ChangeRoom change ro room |
|
func (b *Bucket) ChangeRoom(nrid string, ch *Channel) (err error) { |
|
var ( |
|
nroom *Room |
|
ok bool |
|
oroom = ch.Room |
|
) |
|
// change to no room |
|
if nrid == model.NoRoom { |
|
if oroom != nil && oroom.Del(ch) { |
|
b.DelRoom(oroom) |
|
} |
|
ch.Room = nil |
|
return |
|
} |
|
b.cLock.Lock() |
|
if nroom, ok = b.rooms[nrid]; !ok { |
|
nroom = NewRoom(nrid) |
|
b.rooms[nrid] = nroom |
|
} |
|
b.cLock.Unlock() |
|
if err = nroom.Put(ch); err != nil { |
|
return |
|
} |
|
ch.Room = nroom |
|
if oroom != nil && oroom.Del(ch) { |
|
b.DelRoom(oroom) |
|
} |
|
return |
|
} |
|
|
|
// Put put a channel according with sub key. |
|
func (b *Bucket) Put(rid string, ch *Channel) (err error) { |
|
var ( |
|
room *Room |
|
ok bool |
|
) |
|
b.cLock.Lock() |
|
// close old channel |
|
if dch := b.chs[ch.Key]; dch != nil { |
|
dch.Close() |
|
} |
|
b.chs[ch.Key] = ch |
|
if rid != model.NoRoom && rid != "" { |
|
if room, ok = b.rooms[rid]; !ok { |
|
room = NewRoom(rid) |
|
b.rooms[rid] = room |
|
} |
|
ch.Room = room |
|
b.roomIPCnts[ch.IP]++ |
|
} |
|
b.ipCnts[ch.IP]++ |
|
b.cLock.Unlock() |
|
if room != nil { |
|
err = room.Put(ch) |
|
} |
|
return |
|
} |
|
|
|
// Del delete the channel by sub key. |
|
func (b *Bucket) Del(dch *Channel) { |
|
var ( |
|
ok bool |
|
ch *Channel |
|
room *Room |
|
) |
|
b.cLock.Lock() |
|
if ch, ok = b.chs[dch.Key]; ok { |
|
if room = ch.Room; room != nil { |
|
if b.roomIPCnts[ch.IP] > 1 { |
|
b.roomIPCnts[ch.IP]-- |
|
} else { |
|
delete(b.roomIPCnts, ch.IP) |
|
} |
|
} |
|
if ch == dch { |
|
delete(b.chs, ch.Key) |
|
} |
|
// ip counter |
|
if b.ipCnts[ch.IP] > 1 { |
|
b.ipCnts[ch.IP]-- |
|
} else { |
|
delete(b.ipCnts, ch.IP) |
|
} |
|
} |
|
b.cLock.Unlock() |
|
if room != nil && room.Del(ch) { |
|
// if empty room, must delete from bucket |
|
b.DelRoom(room) |
|
} |
|
} |
|
|
|
// Channel get a channel by sub key. |
|
func (b *Bucket) Channel(key string) (ch *Channel) { |
|
b.cLock.RLock() |
|
ch = b.chs[key] |
|
b.cLock.RUnlock() |
|
return |
|
} |
|
|
|
// Broadcast push msgs to all channels in the bucket. |
|
func (b *Bucket) Broadcast(p *model.Proto, op int32, platform string) { |
|
var ch *Channel |
|
b.cLock.RLock() |
|
for _, ch = range b.chs { |
|
if !ch.NeedPush(op, platform) { |
|
continue |
|
} |
|
ch.Push(p) |
|
} |
|
b.cLock.RUnlock() |
|
} |
|
|
|
// Room get a room by roomid. |
|
func (b *Bucket) Room(rid string) (room *Room) { |
|
b.cLock.RLock() |
|
room = b.rooms[rid] |
|
b.cLock.RUnlock() |
|
return |
|
} |
|
|
|
// DelRoom delete a room by roomid. |
|
func (b *Bucket) DelRoom(room *Room) { |
|
b.cLock.Lock() |
|
delete(b.rooms, room.ID) |
|
b.cLock.Unlock() |
|
room.Close() |
|
} |
|
|
|
// BroadcastRoom broadcast a message to specified room |
|
func (b *Bucket) BroadcastRoom(arg ProtoRoom) { |
|
num := atomic.AddUint64(&b.routinesNum, 1) % b.c.RoutineAmount |
|
b.routines[num] <- arg |
|
} |
|
|
|
// Rooms get all room id where online number > 0. |
|
func (b *Bucket) Rooms() (res map[string]struct{}) { |
|
var ( |
|
roomID string |
|
room *Room |
|
) |
|
res = make(map[string]struct{}) |
|
b.cLock.RLock() |
|
for roomID, room = range b.rooms { |
|
if room.Online > 0 { |
|
res[roomID] = struct{}{} |
|
} |
|
} |
|
b.cLock.RUnlock() |
|
return |
|
} |
|
|
|
// IPCount get ip count. |
|
func (b *Bucket) IPCount() (res map[string]struct{}) { |
|
var ( |
|
ip string |
|
) |
|
b.cLock.RLock() |
|
res = make(map[string]struct{}, len(b.ipCnts)) |
|
for ip = range b.ipCnts { |
|
res[ip] = struct{}{} |
|
} |
|
b.cLock.RUnlock() |
|
return |
|
} |
|
|
|
// RoomIPCount get ip count. |
|
func (b *Bucket) RoomIPCount() (res map[string]struct{}) { |
|
var ( |
|
ip string |
|
) |
|
b.cLock.RLock() |
|
res = make(map[string]struct{}, len(b.roomIPCnts)) |
|
for ip = range b.roomIPCnts { |
|
res[ip] = struct{}{} |
|
} |
|
b.cLock.RUnlock() |
|
return |
|
} |
|
|
|
// UpRoomsCount update all room count |
|
func (b *Bucket) UpRoomsCount(roomCountMap map[string]int32) { |
|
var ( |
|
roomID string |
|
room *Room |
|
) |
|
b.cLock.RLock() |
|
for roomID, room = range b.rooms { |
|
room.AllOnline = roomCountMap[roomID] |
|
} |
|
b.cLock.RUnlock() |
|
} |
|
|
|
// roomproc |
|
func (b *Bucket) roomproc(c chan ProtoRoom) { |
|
for { |
|
arg := <-c |
|
if room := b.Room(arg.RoomID); room != nil { |
|
room.Push(arg.Proto) |
|
} |
|
} |
|
}
|
|
|