You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
419 lines
10 KiB
419 lines
10 KiB
package service |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"math/rand" |
|
"regexp" |
|
"strings" |
|
"time" |
|
|
|
"go-common/app/job/main/dm2/model" |
|
"go-common/app/service/main/archive/api" |
|
arcMdl "go-common/app/service/main/archive/model/archive" |
|
filterMdl "go-common/app/service/main/filter/api/grpc/v1" |
|
"go-common/library/ecode" |
|
"go-common/library/log" |
|
"go-common/library/net/metadata" |
|
"go-common/library/queue/databus" |
|
xtime "go-common/library/time" |
|
) |
|
|
|
var ( |
|
msgRegex = regexp.MustCompile(`^(\s|\xE3\x80\x80)*$`) // 全文仅空格 |
|
|
|
_bnjDmMsgLen = 100 |
|
|
|
_dateFormat = "2006-01-02 15:04:05" |
|
) |
|
|
|
func init() { |
|
rand.Seed(time.Now().Unix()) |
|
} |
|
|
|
func (s *Service) initBnj() { |
|
var err error |
|
if s.conf.BNJ.Aid <= 0 { |
|
return |
|
} |
|
s.bnjAid = s.conf.BNJ.Aid |
|
//bnj count |
|
if s.conf.BNJ.BnjCounter != nil { |
|
bnjSubAids := make(map[int64]struct{}) |
|
for _, aid := range s.conf.BNJ.BnjCounter.SubAids { |
|
bnjSubAids[aid] = struct{}{} |
|
} |
|
s.bnjSubAids = bnjSubAids |
|
} |
|
// bnj danmu |
|
s.bnjVideos(context.TODO()) |
|
s.bnjLiveConfig(context.TODO()) |
|
go func() { |
|
ticker := time.NewTicker(time.Second * 30) |
|
for range ticker.C { |
|
s.bnjVideos(context.TODO()) |
|
s.bnjLiveConfig(context.TODO()) |
|
} |
|
}() |
|
s.bnjIgnoreRate = s.conf.BNJ.BnjLiveDanmu.IgnoreRate |
|
s.bnjIgnoreBeginTime = time.Duration(s.conf.BNJ.BnjLiveDanmu.IgnoreBegin) |
|
s.bnjIgnoreEndTime = time.Duration(s.conf.BNJ.BnjLiveDanmu.IgnoreEnd) |
|
s.bnjliveRoomID = s.conf.BNJ.BnjLiveDanmu.RoomID |
|
s.bnjUserLevel = s.conf.BNJ.BnjLiveDanmu.Level |
|
if s.bnjStart, err = time.ParseInLocation(_dateFormat, s.conf.BNJ.BnjLiveDanmu.Start, time.Now().Location()); err != nil { |
|
panic(err) |
|
} |
|
s.bnjCsmr = databus.New(s.conf.Databus.BnjCsmr) |
|
log.Info("bnj init start:%v room_id:%v", s.bnjStart.String(), s.conf.BNJ.BnjLiveDanmu.RoomID) |
|
go s.bnjProc() |
|
} |
|
|
|
func (s *Service) bnjProc() { |
|
var ( |
|
err error |
|
c = context.Background() |
|
) |
|
for { |
|
msg, ok := <-s.bnjCsmr.Messages() |
|
if !ok { |
|
log.Error("bnj bnjProc consumer exit") |
|
return |
|
} |
|
log.Info("bnj partition:%d,offset:%d,key:%s,value:%s", msg.Partition, msg.Offset, msg.Key, msg.Value) |
|
m := &model.LiveDanmu{} |
|
if err = json.Unmarshal(msg.Value, m); err != nil { |
|
log.Error("json.Unmarshal(%v) error(%v)", string(msg.Value), err) |
|
continue |
|
} |
|
if err = s.bnjLiveDanmu(c, m); err != nil { |
|
log.Error("bnj bnjLiveDanmu(msg:%+v),error(%v)", m, err) |
|
continue |
|
} |
|
if err = msg.Commit(); err != nil { |
|
log.Error("commit offset(%v) error(%v)", msg, err) |
|
} |
|
} |
|
} |
|
|
|
func (s *Service) bnjVideos(c context.Context) (err error) { |
|
var ( |
|
videos []*model.Video |
|
) |
|
if videos, err = s.dao.Videos(c, s.bnjAid); err != nil { |
|
log.Error("bnj bnjVideos(aid:%v) error(%v)", s.bnjAid, err) |
|
return |
|
} |
|
if len(videos) >= 4 { |
|
videos = videos[:4] |
|
} |
|
for _, video := range videos { |
|
if err = s.syncBnjVideo(c, model.SubTypeVideo, video); err != nil { |
|
log.Error("bnj syncBnjVideo(video:%+v) error(%v)", video, err) |
|
return |
|
} |
|
} |
|
s.bnjArcVideos = videos |
|
return |
|
} |
|
|
|
func (s *Service) syncBnjVideo(c context.Context, tp int32, v *model.Video) (err error) { |
|
sub, err := s.dao.Subject(c, tp, v.Cid) |
|
if err != nil { |
|
return |
|
} |
|
if sub == nil { |
|
if v.XCodeState >= model.VideoXcodeHDFinish { |
|
if _, err = s.dao.AddSubject(c, tp, v.Cid, v.Aid, v.Mid, s.maxlimit(v.Duration), 0); err != nil { |
|
return |
|
} |
|
} |
|
} else { |
|
if sub.Mid != v.Mid { |
|
if _, err = s.dao.UpdateSubMid(c, tp, v.Cid, v.Mid); err != nil { |
|
return |
|
} |
|
} |
|
} |
|
return |
|
} |
|
|
|
// bnjDmCount laji bnj count |
|
func (s *Service) bnjDmCount(c context.Context, sub *model.Subject, dm *model.DM) (err error) { |
|
var ( |
|
dmid int64 |
|
pages []*api.Page |
|
chosen *api.Page |
|
choseSub *model.Subject |
|
) |
|
if _, ok := s.bnjSubAids[sub.Pid]; !ok { |
|
return |
|
} |
|
if pages, err = s.arcRPC.Page3(c, &arcMdl.ArgAid2{ |
|
Aid: s.bnjAid, |
|
RealIP: metadata.String(c, metadata.RemoteIP), |
|
}); err != nil { |
|
log.Error("bnjDmCount Page3(aid:%v) error(%v)", sub.Pid, err) |
|
return |
|
} |
|
if len(pages) <= 0 { |
|
return |
|
} |
|
idx := time.Now().Unix() % int64(len(pages)) |
|
if chosen = pages[idx]; chosen == nil { |
|
return |
|
} |
|
if choseSub, err = s.subject(c, model.SubTypeVideo, chosen.Cid); err != nil { |
|
return |
|
} |
|
if dmid, err = s.genDMID(c); err != nil { |
|
log.Error("bnjDmCount genDMID() error(%v)", err) |
|
return |
|
} |
|
forkDM := &model.DM{ |
|
ID: dmid, |
|
Type: model.SubTypeVideo, |
|
Oid: chosen.Cid, |
|
Mid: dm.Mid, |
|
Progress: int32((chosen.Duration + 1) * 1000), |
|
Pool: dm.Pool, |
|
State: model.StateAdminDelete, |
|
Ctime: dm.Ctime, |
|
Mtime: dm.Mtime, |
|
Content: &model.Content{ |
|
ID: dmid, |
|
FontSize: dm.Content.FontSize, |
|
Color: dm.Content.Color, |
|
Mode: dm.Content.Mode, |
|
IP: dm.Content.IP, |
|
Plat: dm.Content.Plat, |
|
Msg: dm.Content.Msg, |
|
Ctime: dm.Content.Ctime, |
|
Mtime: dm.Content.Mtime, |
|
}, |
|
} |
|
if dm.Pool == model.PoolSpecial { |
|
forkDM.ContentSpe = &model.ContentSpecial{ |
|
ID: dmid, |
|
Msg: dm.ContentSpe.Msg, |
|
Ctime: dm.ContentSpe.Ctime, |
|
Mtime: dm.ContentSpe.Mtime, |
|
} |
|
} |
|
if err = s.bnjAddDM(c, choseSub, forkDM); err != nil { |
|
return |
|
} |
|
return |
|
} |
|
|
|
// bnjAddDM add dm index and content to db by transaction. |
|
func (s *Service) bnjAddDM(c context.Context, sub *model.Subject, dm *model.DM) (err error) { |
|
if dm.State != model.StateAdminDelete { |
|
return |
|
} |
|
tx, err := s.dao.BeginTran(c) |
|
if err != nil { |
|
return |
|
} |
|
// special dm |
|
if dm.Pool == model.PoolSpecial && dm.ContentSpe != nil { |
|
if _, err = s.dao.TxAddContentSpecial(tx, dm.ContentSpe); err != nil { |
|
return tx.Rollback() |
|
} |
|
} |
|
if _, err = s.dao.TxAddContent(tx, dm.Oid, dm.Content); err != nil { |
|
return tx.Rollback() |
|
} |
|
if _, err = s.dao.TxAddIndex(tx, dm); err != nil { |
|
return tx.Rollback() |
|
} |
|
if _, err = s.dao.TxIncrSubjectCount(tx, sub.Type, sub.Oid, 1, 0, sub.Childpool); err != nil { |
|
return tx.Rollback() |
|
} |
|
return tx.Commit() |
|
} |
|
|
|
func (s *Service) genDMID(c context.Context) (dmid int64, err error) { |
|
if dmid, err = s.seqRPC.ID(c, s.seqArg); err != nil { |
|
log.Error("seqRPC.ID() error(%v)", err) |
|
return |
|
} |
|
return |
|
} |
|
|
|
// bnjLiveDanmu laji live to video |
|
// TODO stime |
|
func (s *Service) bnjLiveDanmu(c context.Context, liveDanmu *model.LiveDanmu) (err error) { |
|
var ( |
|
cid, dmid int64 |
|
progress float64 |
|
) |
|
// ignore time before |
|
if time.Since(s.bnjStart) < 0 { |
|
return |
|
} |
|
// limit |
|
if liveDanmu == nil || s.bnjliveRoomID <= 0 || s.bnjliveRoomID != liveDanmu.RoomID || liveDanmu.MsgType != model.LiveDanmuMsgTypeNormal { |
|
return |
|
} |
|
if liveDanmu.UserLevel < s.bnjUserLevel { |
|
return |
|
} |
|
if s.bnjIgnoreRate <= 0 || rand.Int63n(s.bnjIgnoreRate) != 0 { |
|
return |
|
} |
|
if cid, progress, err = s.pickBnjVideo(c, liveDanmu.Time); err != nil { |
|
return |
|
} |
|
// ignore illegal progress |
|
if progress <= 0 { |
|
return |
|
} |
|
if err = s.checkBnjDmMsg(c, liveDanmu.Content); err != nil { |
|
log.Error("bnj bnjLiveDanmu checkBnjDmMsg(liveDanmu:%+v) error(%v)", liveDanmu, err) |
|
return |
|
} |
|
if dmid, err = s.genDMID(c); err != nil { |
|
log.Error("bnj bnjLiveDanmu genDMID() error(%v)", err) |
|
return |
|
} |
|
now := time.Now().Unix() |
|
forkDM := &model.DM{ |
|
ID: dmid, |
|
Type: model.SubTypeVideo, |
|
Oid: cid, |
|
Mid: liveDanmu.UID, |
|
Progress: int32(progress * 1000), |
|
Pool: model.PoolNormal, |
|
State: model.StateMonitorAfter, |
|
Ctime: model.ConvertStime(time.Now()), |
|
Mtime: model.ConvertStime(time.Now()), |
|
Content: &model.Content{ |
|
ID: dmid, |
|
FontSize: 25, |
|
Color: 16777215, |
|
Mode: model.ModeRolling, |
|
Plat: 0, |
|
Msg: liveDanmu.Content, |
|
Ctime: xtime.Time(now), |
|
Mtime: xtime.Time(now), |
|
}, |
|
} |
|
if err = s.bnjCheckFilterService(c, forkDM); err != nil { |
|
log.Error("s.bnjCheckFilterService(%+v) error(%v)", forkDM, err) |
|
return |
|
} |
|
var ( |
|
bs []byte |
|
) |
|
if bs, err = json.Marshal(forkDM); err != nil { |
|
log.Error("json.Marshal(%+v) error(%v)", forkDM, err) |
|
return |
|
} |
|
act := &model.Action{ |
|
Action: model.ActAddDM, |
|
Data: bs, |
|
} |
|
if err = s.actionAct(c, act); err != nil { |
|
log.Error("s.actionAddDM(%+v) error(%v)", liveDanmu, err) |
|
return |
|
} |
|
return |
|
} |
|
|
|
func (s *Service) pickBnjVideo(c context.Context, timestamp int64) (cid int64, progress float64, err error) { |
|
var ( |
|
idx int |
|
video *model.Video |
|
) |
|
progress = float64(timestamp - s.bnjStart.Unix()) |
|
for idx, video = range s.bnjArcVideos { |
|
if progress > float64(video.Duration) { |
|
progress = progress - float64(video.Duration) |
|
continue |
|
} |
|
// ignore p1 start |
|
if idx != 0 && progress < s.bnjIgnoreBeginTime.Seconds() { |
|
err = ecode.DMProgressTooBig |
|
return |
|
} |
|
if float64(video.Duration)-progress < s.bnjIgnoreEndTime.Seconds() { |
|
err = ecode.DMProgressTooBig |
|
return |
|
} |
|
if progress >= 0 { |
|
progress = progress + float64(rand.Int31n(1000)/1000) |
|
} |
|
cid = video.Cid |
|
return |
|
} |
|
err = ecode.DMProgressTooBig |
|
return |
|
} |
|
|
|
func (s *Service) bnjCheckFilterService(c context.Context, dm *model.DM) (err error) { |
|
var ( |
|
filterReply *filterMdl.FilterReply |
|
) |
|
if filterReply, err = s.filterRPC.Filter(c, &filterMdl.FilterReq{ |
|
Area: "danmu", |
|
Message: dm.Content.Msg, |
|
Id: dm.ID, |
|
Oid: dm.Oid, |
|
Mid: dm.Mid, |
|
}); err != nil { |
|
log.Error("checkFilterService(dm:%+v),err(%v)", dm, err) |
|
return |
|
} |
|
if filterReply.Level > 0 || filterReply.Limit == model.SpamBlack || filterReply.Limit == model.SpamOverflow { |
|
dm.State = model.StateFilter |
|
log.Info("bnj filter service delete(dmid:%d,data:+%v)", dm.ID, filterReply) |
|
} |
|
return |
|
} |
|
|
|
func (s *Service) checkBnjDmMsg(c context.Context, msg string) (err error) { |
|
var ( |
|
msgLen = len([]rune(msg)) |
|
) |
|
if msgRegex.MatchString(msg) { // 空白弹幕 |
|
err = ecode.DMMsgIlleagel |
|
return |
|
} |
|
if msgLen > _bnjDmMsgLen { |
|
err = ecode.DMMsgTooLong |
|
return |
|
} |
|
if strings.Contains(msg, `\n`) || strings.Contains(msg, `/n`) { |
|
err = ecode.DMMsgIlleagel |
|
return |
|
} |
|
return |
|
} |
|
|
|
func (s *Service) bnjLiveConfig(c context.Context) (err error) { |
|
var ( |
|
bnjConfig *model.BnjLiveConfig |
|
start time.Time |
|
) |
|
if bnjConfig, err = s.dao.BnjConfig(c); err != nil { |
|
log.Error("bnjLiveConfig error current:%v err:%+v", time.Now().String(), err) |
|
return |
|
} |
|
if bnjConfig == nil { |
|
log.Error("bnjLiveConfig error current:%v bnjConfig nil", time.Now().String()) |
|
return |
|
} |
|
if start, err = time.ParseInLocation(_dateFormat, bnjConfig.DanmuDtarTime, time.Now().Location()); err != nil { |
|
log.Error("bnjLiveConfig start time error current:%v config:%+v", time.Now().String(), bnjConfig) |
|
return |
|
} |
|
if bnjConfig.CommentID <= 0 || bnjConfig.RoomID <= 0 { |
|
log.Info("bnjLiveConfig illegal current:%v config:%+v", time.Now().String(), bnjConfig) |
|
return |
|
} |
|
s.bnjAid = bnjConfig.CommentID |
|
s.bnjliveRoomID = bnjConfig.RoomID |
|
s.bnjStart = start |
|
log.Info("bnjLiveConfig ok current:%v config:%+v", time.Now().String(), bnjConfig) |
|
return |
|
}
|
|
|