You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
162 lines
3.9 KiB
162 lines
3.9 KiB
package dao |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"fmt" |
|
"go-common/app/service/video/stream-mng/model" |
|
"go-common/library/log" |
|
"net/http" |
|
"time" |
|
) |
|
|
|
const ( |
|
_localLiveStreamList = "live_stream" |
|
_localStreamInfo = "rid:%d" |
|
_liveExpiredTime = 600 |
|
_localStreamExpiredTime = 1 |
|
) |
|
|
|
type OnAirStream struct { |
|
StreamName string `json:"stream_name, omitempty"` |
|
} |
|
|
|
type OnAirStreamList struct { |
|
List []*OnAirStream `json:"list,omitempty"` |
|
} |
|
|
|
type AllOnAirStream struct { |
|
M map[string]*OnAirStreamList `json:"m,omitempty"` |
|
} |
|
|
|
func (d *Dao) getLocalLiveStreamListKey() string { |
|
return _localLiveStreamList |
|
} |
|
|
|
func (d *Dao) getLocalStreamInfoKey(rid int64) string { |
|
return fmt.Sprintf(_localStreamInfo, rid) |
|
} |
|
|
|
// loadLiveStreamList 判断流是否在播 |
|
func (d *Dao) LoadLiveStreamList(c context.Context, rids []int64) map[int64]bool { |
|
list, _ := d.localCache.Get(d.getLocalLiveStreamListKey()) |
|
|
|
isLive := map[int64]bool{} |
|
if res, ok := list.(map[string]int); ok { |
|
for _, v := range rids { |
|
// rid => stream |
|
info, err := d.streamFullInfo(c, v, "") |
|
if err != nil || info == nil { |
|
continue |
|
} |
|
|
|
var sname string |
|
var origin int64 |
|
for _, v := range info.List { |
|
if v.Type == 1 { |
|
sname = v.StreamName |
|
origin = v.Origin |
|
break |
|
} |
|
} |
|
// 不在在播列表&orgin 为0===》不在播 |
|
if _, exe := res[sname]; !exe { |
|
if origin == 0 { |
|
isLive[v] = false |
|
continue |
|
} |
|
} |
|
isLive[v] = true |
|
} |
|
} else { |
|
// 当从缓存中获取失败 or 项目刚刚启动 |
|
for _, v := range rids { |
|
isLive[v] = true |
|
} |
|
} |
|
return isLive |
|
} |
|
|
|
// StoreLiveStreamList 刷新在播列表缓存 |
|
func (d *Dao) StoreLiveStreamList() { |
|
ctx := context.Background() |
|
|
|
type liveStream struct { |
|
Code int `json:"code,omitempty"` |
|
Data *AllOnAirStream `json:"data,omitempty"` |
|
} |
|
|
|
resp := &liveStream{} |
|
uri := d.getLiveStreamUrl("/api/live/vendor/onairstreamlist?cdn=bvc") |
|
|
|
err := d.NewRequst(ctx, http.MethodGet, uri, nil, nil, nil, resp) |
|
if err != nil { |
|
log.Errorv(ctx, log.KV("log", fmt.Sprintf("http_live_err=%v", err))) |
|
return |
|
} |
|
|
|
res := map[string]int{} |
|
if resp.Code == 0 && resp.Data != nil && resp.Data.M != nil && len(resp.Data.M["BVC"].List) > 0 { |
|
for _, v := range resp.Data.M["BVC"].List { |
|
res[v.StreamName] = 1 |
|
} |
|
} else { |
|
res, _ := json.Marshal(resp) |
|
log.Errorv(ctx, log.KV("log", fmt.Sprintf("http_live_err=%v", string(res)))) |
|
} |
|
|
|
// 存10分钟 |
|
d.localCache.SetWithExpire(d.getLocalLiveStreamListKey(), res, _liveExpiredTime*time.Second) |
|
} |
|
|
|
// storeStreamInfo 存储单个流信息 |
|
func (d *Dao) storeStreamInfo(c context.Context, info *model.StreamFullInfo) { |
|
if info == nil || info.RoomID < 0 { |
|
return |
|
} |
|
key := d.getLocalStreamInfoKey(info.RoomID) |
|
|
|
d.localCache.SetWithExpire(key, info, _localStreamExpiredTime*time.Second) |
|
} |
|
|
|
// loadStreamInfo 读取单个信息 |
|
func (d *Dao) loadStreamInfo(c context.Context, rid int64) *model.StreamFullInfo { |
|
key := d.getLocalStreamInfoKey(rid) |
|
info, _ := d.localCache.Get(key) |
|
if res, ok := info.(*model.StreamFullInfo); ok { |
|
if res != nil && res.RoomID > 0 { |
|
return res |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// storeMultiStreamInfo 存储多路流 |
|
func (d *Dao) storeMultiStreamInfo(c context.Context, infos map[int64]*model.StreamFullInfo) { |
|
for _, v := range infos { |
|
key := d.getLocalStreamInfoKey(v.RoomID) |
|
|
|
//log.Warn("key=%v", key) |
|
d.localCache.SetWithExpire(key, v, _localStreamExpiredTime*time.Second) |
|
} |
|
} |
|
|
|
// loadMultiStreamInfo 读取批量信息 |
|
func (d *Dao) loadMultiStreamInfo(c context.Context, rids []int64) (map[int64]*model.StreamFullInfo, []int64) { |
|
infos := map[int64]*model.StreamFullInfo{} |
|
missRids := []int64{} |
|
for _, v := range rids { |
|
key := d.getLocalStreamInfoKey(v) |
|
info, _ := d.localCache.Get(key) |
|
if res, ok := info.(*model.StreamFullInfo); ok { |
|
if res != nil && res.RoomID > 0 { |
|
infos[v] = res |
|
} |
|
} else { |
|
missRids = append(missRids, v) |
|
} |
|
} |
|
|
|
return infos, missRids |
|
}
|
|
|