You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
439 lines
11 KiB
439 lines
11 KiB
package service |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"errors" |
|
"fmt" |
|
"strconv" |
|
"strings" |
|
"sync" |
|
"sync/atomic" |
|
"time" |
|
|
|
"go-common/app/job/main/click/conf" |
|
"go-common/app/job/main/click/dao" |
|
"go-common/app/job/main/click/model" |
|
"go-common/app/service/main/archive/api" |
|
arcrpc "go-common/app/service/main/archive/api/gorpc" |
|
arcmdl "go-common/app/service/main/archive/model/archive" |
|
"go-common/library/cache/redis" |
|
"go-common/library/log" |
|
"go-common/library/log/infoc" |
|
"go-common/library/queue/databus" |
|
) |
|
|
|
// Lock states stored per shard in Service.lockedMap.
const (
	_unLock = iota // shard has not been flagged (the value New starts every shard with)
	_locked        // shard has been flagged by releaseAIDMap via atomic.StoreInt64
)
|
|
|
// Service struct |
|
type Service struct { |
|
c *conf.Config |
|
db *dao.Dao |
|
// archive |
|
reportMergeSub *databus.Databus |
|
statViewPub *databus.Databus |
|
chanWg sync.WaitGroup |
|
redis *redis.Pool |
|
cliChan []chan *model.ClickMsg |
|
closed bool |
|
maxAID int64 |
|
gotMaxAIDTime int64 |
|
lockedMap []int64 |
|
currentLockedIdx int64 |
|
// aid%50[aid[plat[cnt]]] |
|
aidMap []map[int64]*model.ClickInfo |
|
// send databus chan |
|
busChan chan *model.StatMsg |
|
bigDataChan chan *model.BigDataMsg |
|
// forbid cache |
|
forbids map[int64]map[int8]*model.Forbid |
|
forbidMids map[int64]struct{} |
|
// epid to aid map |
|
eTam map[int64]int64 |
|
etamMutex sync.RWMutex |
|
infoc2 *infoc.Infoc |
|
arcRPC *arcrpc.Service2 |
|
arcDurWithMutex struct { |
|
Durations map[int64]*model.ArcDuration |
|
Mutex sync.RWMutex |
|
} |
|
allowPlat map[int8]struct{} |
|
bnjListAidMap map[int64]struct{} |
|
} |
|
|
|
// New is archive service implementation. |
|
func New(c *conf.Config) (s *Service) { |
|
s = &Service{ |
|
c: c, |
|
arcRPC: arcrpc.New2(c.ArchiveRPC), |
|
redis: redis.NewPool(c.Redis), |
|
db: dao.New(c), |
|
busChan: make(chan *model.StatMsg, 10240), |
|
bigDataChan: make(chan *model.BigDataMsg, 10240), |
|
reportMergeSub: databus.New(c.ReportMergeDatabus), |
|
statViewPub: databus.New(c.StatViewPub), |
|
infoc2: infoc.New(c.Infoc2), |
|
allowPlat: make(map[int8]struct{}), |
|
} |
|
s.allowPlat[model.PlatForWeb] = struct{}{} |
|
s.allowPlat[model.PlatForH5] = struct{}{} |
|
s.allowPlat[model.PlatForOuter] = struct{}{} |
|
s.allowPlat[model.PlatForIos] = struct{}{} |
|
s.allowPlat[model.PlatForAndroid] = struct{}{} |
|
s.allowPlat[model.PlatForAndroidTV] = struct{}{} |
|
s.allowPlat[model.PlatForAutoPlayIOS] = struct{}{} |
|
s.allowPlat[model.PlafForAutoPlayInlineIOS] = struct{}{} |
|
s.allowPlat[model.PlatForAutoPlayAndroid] = struct{}{} |
|
s.allowPlat[model.PlatForAutoPlayInlineAndroid] = struct{}{} |
|
s.arcDurWithMutex.Durations = make(map[int64]*model.ArcDuration) |
|
s.loadConf() |
|
go s.confproc() |
|
go s.releaseAIDMap() |
|
for i := int64(0); i < s.c.ChanNum; i++ { |
|
s.aidMap = append(s.aidMap, make(map[int64]*model.ClickInfo, 300000)) |
|
s.cliChan = append(s.cliChan, make(chan *model.ClickMsg, 256)) |
|
s.lockedMap = append(s.lockedMap, _unLock) |
|
} |
|
for i := int64(0); i < s.c.ChanNum; i++ { |
|
s.chanWg.Add(1) |
|
go s.cliChanProc(i) |
|
} |
|
for i := 0; i < 10; i++ { |
|
s.chanWg.Add(1) |
|
go s.sendStat() |
|
} |
|
s.chanWg.Add(1) |
|
go s.sendBigDataMsg() |
|
s.chanWg.Add(1) |
|
go s.reportMergeSubConsumer() |
|
return s |
|
} |
|
|
|
func (s *Service) reportMergeSubConsumer() { |
|
defer s.chanWg.Done() |
|
msgs := s.reportMergeSub.Messages() |
|
for { |
|
msg, ok := <-msgs |
|
if !ok || s.closed { |
|
log.Info("s.reportMergeSub is closed") |
|
return |
|
} |
|
msg.Commit() |
|
var ( |
|
sbs [][]byte |
|
err error |
|
) |
|
if err = json.Unmarshal(msg.Value, &sbs); err != nil { |
|
log.Error("json.Unmarshal(%v) error(%v)", msg.Value, err) |
|
continue |
|
} |
|
for _, bs := range sbs { |
|
var ( |
|
click *model.ClickMsg |
|
allow bool |
|
now = time.Now().Unix() |
|
) |
|
log.Info("split merged message(%s)", strings.Replace(string(bs), "\001", "|", -1)) |
|
if click, err = s.checkMsgIllegal(bs); err != nil { |
|
log.Error("s.checkMsgIllegal(%s) error(%v)", strings.Replace(string(bs), "\001", "|", -1), err) |
|
continue |
|
} |
|
if s.maxAID > 0 && now-s.gotMaxAIDTime < 120 { |
|
allow = s.maxAID+300 > click.AID |
|
} |
|
if !allow { |
|
log.Error("maxAid(%d) currentAid(%d) not allow!!!!", s.maxAID, click.AID) |
|
continue |
|
} |
|
log.Info("merge consumer(%d) append to chan", click.AID) |
|
s.cliChan[click.AID%s.c.ChanNum] <- click |
|
} |
|
} |
|
} |
|
|
|
func (s *Service) loadConf() { |
|
var ( |
|
forbids map[int64]map[int8]*model.Forbid |
|
bnjListAids = make(map[int64]struct{}) |
|
forbidMids map[int64]struct{} |
|
etam map[int64]int64 |
|
maxAID int64 |
|
err error |
|
) |
|
for _, aid := range s.c.BnjListAids { |
|
bnjListAids[aid] = struct{}{} |
|
} |
|
s.bnjListAidMap = bnjListAids |
|
if forbidMids, err = s.db.ForbidMids(context.Background()); err == nil { |
|
s.forbidMids = forbidMids |
|
log.Info("forbid mids(%d)", len(forbidMids)) |
|
} |
|
if forbids, err = s.db.Forbids(context.TODO()); err == nil { |
|
s.forbids = forbids |
|
log.Info("forbid av(%d)", len(forbids)) |
|
} |
|
if maxAID, err = s.db.MaxAID(context.TODO()); err == nil { |
|
s.maxAID = maxAID |
|
s.gotMaxAIDTime = time.Now().Unix() |
|
} |
|
if etam, err = s.db.LoadAllBangumi(context.TODO()); err == nil { |
|
s.etamMutex.Lock() |
|
s.eTam = etam |
|
s.etamMutex.Unlock() |
|
} |
|
} |
|
|
|
func (s *Service) releaseAIDMap() { |
|
for { |
|
time.Sleep(5 * time.Minute) |
|
now := time.Now() |
|
if (now.Hour() > 1 && now.Hour() < 6) || (now.Hour() == 6 && now.Minute() < 30) { // 2:00 to 6:30 |
|
if s.currentLockedIdx < int64(len(s.aidMap)) { |
|
atomic.StoreInt64(&s.lockedMap[s.currentLockedIdx], _locked) |
|
} |
|
s.currentLockedIdx++ |
|
continue |
|
} |
|
s.currentLockedIdx = 0 |
|
} |
|
} |
|
|
|
func (s *Service) confproc() { |
|
for { |
|
time.Sleep(1 * time.Minute) |
|
s.loadConf() |
|
} |
|
} |
|
|
|
func (s *Service) sendBigDataMsg() { |
|
defer s.chanWg.Done() |
|
for { |
|
var ( |
|
msg *model.BigDataMsg |
|
msgBs []byte |
|
ok bool |
|
err error |
|
infos []interface{} |
|
) |
|
if msg, ok = <-s.bigDataChan; !ok { |
|
break |
|
} |
|
infos = append(infos, strconv.FormatInt(int64(msg.Tp), 10)) |
|
for _, v := range strings.Split(msg.Info, "\001") { |
|
infos = append(infos, v) |
|
} |
|
log.Info("truly used %+v", infos) |
|
if err = s.infoc2.Info(infos...); err != nil { |
|
log.Error("s.infoc2.Info(%s) error(%v)", msgBs, err) |
|
continue |
|
} |
|
} |
|
} |
|
|
|
func (s *Service) sendStat() { |
|
defer s.chanWg.Done() |
|
for { |
|
var ( |
|
msg *model.StatMsg |
|
ok bool |
|
c = context.TODO() |
|
err error |
|
key string |
|
) |
|
if msg, ok = <-s.busChan; !ok { |
|
break |
|
} |
|
key = strconv.FormatInt(msg.AID, 10) |
|
vmsg := &model.StatViewMsg{Type: "archive", ID: msg.AID, Count: msg.Click, Ts: time.Now().Unix()} |
|
if err = s.statViewPub.Send(c, key, vmsg); err != nil { |
|
log.Error("s.statViewPub.Send(%d, %+v) error(%v)", msg.AID, vmsg, err) |
|
} |
|
} |
|
} |
|
|
|
func (s *Service) cliChanProc(i int64) { |
|
defer s.chanWg.Done() |
|
var ( |
|
cli *model.ClickMsg |
|
cliChan = s.cliChan[i] |
|
ok bool |
|
) |
|
for { |
|
if cli, ok = <-cliChan; !ok { |
|
s.countClick(context.TODO(), nil, i) |
|
return |
|
} |
|
var ( |
|
rtype int8 |
|
err error |
|
c = context.TODO() |
|
) |
|
if rtype, err = s.isAllow(c, cli); err != nil { |
|
log.Error("cliChanProc Err %v", err) |
|
} |
|
select { |
|
case s.bigDataChan <- &model.BigDataMsg{Info: string(cli.KafkaBs), Tp: rtype}: |
|
default: |
|
log.Error("s.bigDataChan is full") |
|
} |
|
if rtype == model.LogTypeForTurly { |
|
s.countClick(context.TODO(), cli, i) |
|
} |
|
} |
|
} |
|
|
|
func (s *Service) checkMsgIllegal(msg []byte) (click *model.ClickMsg, err error) { |
|
var ( |
|
aid int64 |
|
clickMsg []string |
|
plat int64 |
|
did string |
|
buvid string |
|
mid int64 |
|
lv int64 |
|
ctime int64 |
|
stime int64 |
|
epid int64 |
|
ip string |
|
seasonType int |
|
userAgent string |
|
) |
|
clickMsg = strings.Split(string(msg), "\001") |
|
if len(clickMsg) < 10 { |
|
err = errors.New("click msg error") |
|
return |
|
} |
|
if aid, err = strconv.ParseInt(clickMsg[1], 10, 64); err != nil { |
|
err = fmt.Errorf("aid(%s) error", clickMsg[1]) |
|
return |
|
} |
|
if aid <= 0 { |
|
err = fmt.Errorf("wocao aid(%s) error", clickMsg[1]) |
|
return |
|
} |
|
if plat, err = strconv.ParseInt(clickMsg[0], 10, 64); err != nil { |
|
err = fmt.Errorf("plat(%s) error", clickMsg[0]) |
|
return |
|
} |
|
|
|
if _, ok := s.allowPlat[int8(plat)]; !ok { |
|
err = fmt.Errorf("plat(%d) is illegal", plat) |
|
return |
|
} |
|
userAgent = clickMsg[10] |
|
did = clickMsg[8] |
|
if did == "" { |
|
err = fmt.Errorf("bvID(%s) is illegal", clickMsg[8]) |
|
return |
|
} |
|
buvid = clickMsg[11] |
|
if clickMsg[4] != "" && clickMsg[4] != "0" { |
|
if mid, err = strconv.ParseInt(clickMsg[4], 10, 64); err != nil { |
|
err = fmt.Errorf("mid(%s) is illegal", clickMsg[4]) |
|
return |
|
} |
|
} |
|
if clickMsg[5] != "" { |
|
if lv, err = strconv.ParseInt(clickMsg[5], 10, 64); err != nil { |
|
err = fmt.Errorf("lv(%s) is illegal", clickMsg[5]) |
|
return |
|
} |
|
} |
|
if ctime, err = strconv.ParseInt(clickMsg[6], 10, 64); err != nil { |
|
err = fmt.Errorf("ctime(%s) is illegal", clickMsg[6]) |
|
return |
|
} |
|
if stime, err = strconv.ParseInt(clickMsg[7], 10, 64); err != nil { |
|
err = fmt.Errorf("stime(%s) is illegal", clickMsg[7]) |
|
return |
|
} |
|
if ip = clickMsg[9]; ip == "" { |
|
err = errors.New("ip is illegal") |
|
return |
|
} |
|
if clickMsg[17] != "" { |
|
if epid, err = strconv.ParseInt(clickMsg[17], 10, 64); err != nil { |
|
err = fmt.Errorf("epid(%s) is illegal", clickMsg[17]) |
|
return |
|
} |
|
if clickMsg[15] != "null" { |
|
if seasonType, err = strconv.Atoi(clickMsg[15]); err != nil { |
|
err = fmt.Errorf("seasonType(%s) is illegal", clickMsg[15]) |
|
return |
|
} |
|
} |
|
} |
|
if strings.Contains(userAgent, "(auto_play)") || |
|
strings.Contains(userAgent, "(inline_play_heartbeat)") || |
|
strings.Contains(userAgent, "(inline_play_to_view)") || // to remove auto_play & inline_play_heartbeat |
|
strings.Contains(userAgent, "(played_time_enough)") { |
|
if did, err = s.getRealDid(context.TODO(), buvid, aid); err != nil || did == "" { |
|
err = fmt.Errorf("bvid(%s) dont have did", buvid) |
|
return |
|
} |
|
did = buvid |
|
msg = []byte(strings.Replace(string(msg), buvid, did, 1)) |
|
} |
|
click = &model.ClickMsg{ |
|
Plat: int8(plat), |
|
AID: aid, |
|
MID: mid, |
|
Lv: int8(lv), |
|
CTime: ctime, |
|
STime: stime, |
|
Did: did, |
|
Buvid: buvid, |
|
IP: ip, |
|
KafkaBs: msg, |
|
EpID: epid, |
|
SeasonType: seasonType, |
|
UserAgent: userAgent, |
|
} |
|
return |
|
} |
|
|
|
// ArcDuration return archive duration, manager local cache |
|
func (s *Service) ArcDuration(c context.Context, aid int64) (duration int64) { |
|
var ( |
|
ok bool |
|
arcDur *model.ArcDuration |
|
now = time.Now().Unix() |
|
err error |
|
) |
|
// duration default |
|
duration = s.c.CacheConf.PGCReplayTime |
|
s.arcDurWithMutex.Mutex.RLock() |
|
arcDur, ok = s.arcDurWithMutex.Durations[aid] |
|
s.arcDurWithMutex.Mutex.RUnlock() |
|
if ok && now-arcDur.GotTime > s.c.CacheConf.ArcUpCacheTime { |
|
duration = arcDur.Duration |
|
return |
|
} |
|
var arc *api.Arc |
|
if arc, err = s.arcRPC.Archive3(c, &arcmdl.ArgAid2{Aid: aid}); err != nil { |
|
// just log |
|
log.Error("s.arcRPC.Archive3(%d) error(%v)", aid, err) |
|
} else { |
|
duration = arc.Duration |
|
} |
|
s.arcDurWithMutex.Mutex.Lock() |
|
s.arcDurWithMutex.Durations[aid] = &model.ArcDuration{Duration: duration, GotTime: now} |
|
s.arcDurWithMutex.Mutex.Unlock() |
|
return |
|
} |
|
|
|
// Close kafaka consumer close. |
|
func (s *Service) Close() (err error) { |
|
s.closed = true |
|
time.Sleep(time.Second) |
|
for i := 0; i < len(s.cliChan); i++ { |
|
close(s.cliChan[i]) |
|
} |
|
close(s.bigDataChan) |
|
s.chanWg.Wait() |
|
s.statViewPub.Close() |
|
return |
|
}
|
|
|