You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
232 lines
5.8 KiB
232 lines
5.8 KiB
package service |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"time" |
|
|
|
"go-common/app/admin/main/workflow/model/param" |
|
"go-common/app/job/main/workflow/model" |
|
"go-common/library/database/elastic" |
|
"go-common/library/ecode" |
|
"go-common/library/log" |
|
) |
|
|
|
const ( |
|
// LOGAPPID audit log appid . |
|
LOGAPPID = "log_audit" |
|
// LOGINDEX . |
|
LOGINDEX = "log_audit_11_all" |
|
// GROUPAPPID . |
|
GROUPAPPID = "workflow_group_common" |
|
// GROUPINDEX . |
|
GROUPINDEX = "workflow_group_common" |
|
// CHALLAPPID . |
|
CHALLAPPID = "workflow_chall_common" |
|
// CHALLINDEX . |
|
CHALLINDEX = "workflow_chall_common" |
|
// BUSINESS . |
|
BUSINESS = 1 |
|
// MC . |
|
MC = "1_15_1" |
|
) |
|
|
|
// notifyState . |
|
var notifyState = []int64{0, 2} |
|
|
|
// notifyproc . |
|
func (s *Service) notifyproc(c context.Context) { |
|
for { |
|
if err := s.sendMessage(c); err != nil { |
|
log.Error("s.sendMessage type(%d) error(%v)", err) |
|
time.Sleep(time.Second * 3) |
|
continue |
|
} |
|
time.Sleep(time.Second * 60) |
|
} |
|
} |
|
|
|
// SendMessage . |
|
func (s *Service) sendMessage(c context.Context) (err error) { |
|
var ( |
|
group []*model.GroupRes |
|
chall map[int64][]*model.ChallRes |
|
gids []int64 |
|
) |
|
if group, err = s.NotNotifyGroupSearch(c); err != nil { |
|
log.Error("s.NotNotifyGroupSearch error(%v)", err) |
|
return |
|
} |
|
if len(group) <= 0 { |
|
log.Info("group search length is 0") |
|
return |
|
} |
|
for _, g := range group { |
|
gids = append(gids, g.ID) |
|
} |
|
if chall, err = s.NotNotifyChallSearch(c, gids); err != nil { |
|
log.Error("s.NotNotifyChallSearch error(%v)", err) |
|
return |
|
} |
|
if len(chall) <= 0 { |
|
log.Info("chall search length is 0") |
|
return |
|
} |
|
for _, g := range group { |
|
mids := []int64{} |
|
ok := false |
|
var chs []*model.ChallRes |
|
if chs, ok = chall[g.ID]; !ok { |
|
log.Error("gid(%d) in group but not in chall", g.ID) |
|
continue |
|
} |
|
for _, ch := range chs { |
|
mids = append(mids, ch.MID) |
|
} |
|
if len(mids) <= 0 { |
|
log.Error("gid(%d) with mid in chall is empty", g.ID) |
|
continue |
|
} |
|
param := ¶m.MessageParam{ |
|
Type: "json", |
|
Source: 1, |
|
DataType: 4, |
|
MC: MC, |
|
Title: "您的投诉已收到", |
|
Context: fmt.Sprintf("您对稿件(av%d)的举报我们已经收到。感谢您对 bilibili 社区秩序的维护,哔哩哔哩 (゜-゜)つロ 干杯~ ", g.OID), |
|
MidList: mids, |
|
} |
|
log.Info("send message param(%+v)", param) |
|
if err = s.dao.SendMessage(c, chs, param); err != nil { |
|
log.Error("s.dao.SendMessage error(%v)", err) |
|
} |
|
} |
|
return |
|
} |
|
|
|
// NotNotifyGroupSearch . |
|
func (s *Service) NotNotifyGroupSearch(c context.Context) (result []*model.GroupRes, err error) { |
|
var ( |
|
pn = 1 |
|
ps = 1000 |
|
tempRes *model.SearchGroup |
|
) |
|
frontTenMin, _ := time.ParseDuration("-10m") |
|
frontTwelveMin, _ := time.ParseDuration("-12m") |
|
frontTenMinFormat := time.Now().Add(frontTenMin).Format("2006-01-02 15:04:05") |
|
frontTwelveMinFormat := time.Now().Add(frontTwelveMin).Format("2006-01-02 15:04:05") |
|
e := elastic.NewElastic(nil) |
|
r := e.NewRequest(GROUPAPPID).Index(GROUPINDEX).Fields("id", "oid"). |
|
WhereEq("business", BUSINESS). |
|
WhereIn("state", notifyState). |
|
WhereRange("lasttime", frontTwelveMinFormat, frontTenMinFormat, elastic.RangeScopeLoRo). |
|
Pn(pn). |
|
Ps(ps) |
|
if err = r.Scan(context.Background(), &tempRes); err != nil { |
|
log.Error("elastic search group Scan error(%v)", err) |
|
return |
|
} |
|
log.Info("groupparams is(%v)", r.Params()) |
|
res := tempRes.Result |
|
if len(res) <= 0 { |
|
return |
|
} |
|
// search audit log |
|
var logGids map[int64][]*model.LogRes |
|
if logGids, err = s.searchAuditLog(c, res); err != nil { |
|
log.Error("s.searchAuditLog error(%v)", err) |
|
return |
|
} |
|
for _, r := range res { |
|
if _, ok := logGids[r.OID]; !ok { |
|
result = append(result, r) |
|
} |
|
} |
|
return |
|
} |
|
|
|
// searchAuditLog . |
|
func (s *Service) searchAuditLog(c context.Context, grp []*model.GroupRes) (logGids map[int64][]*model.LogRes, err error) { |
|
pn := 1 |
|
ps := 1000 |
|
oids := []int64{} |
|
logGids = make(map[int64][]*model.LogRes) |
|
for _, g := range grp { |
|
oids = append(oids, g.OID) |
|
} |
|
e := elastic.NewElastic(nil) |
|
for { |
|
var res *model.AuditLog |
|
r := e.NewRequest(LOGAPPID).Index(LOGINDEX).Fields("int_1", "oid"). |
|
WhereIn("oid", oids). |
|
WhereEq("type", 2). |
|
WhereEq("business", 11). |
|
WhereEq("action", "notify_users_received"). |
|
Pn(pn). |
|
Ps(ps) |
|
if err = r.Scan(context.Background(), res); err != nil { |
|
log.Error("elastic search audit log Scan error(%v)", err) |
|
return |
|
} |
|
if res == nil { |
|
time.Sleep(time.Second * 3) |
|
continue |
|
} |
|
if res.Page.Total > 10000 { |
|
log.Error("elastic search audit log result too long") |
|
err = ecode.ServerErr |
|
return |
|
} |
|
log.Info("auditlogparams is(%v)", r.Params()) |
|
logRes := res.Result |
|
for _, lg := range logRes { |
|
logGids[lg.Oid] = append(logGids[lg.Oid], lg) |
|
} |
|
if len(res.Result) < ps { |
|
break |
|
} |
|
pn++ |
|
} |
|
return |
|
} |
|
|
|
// NotNotifyChallSearch . |
|
func (s *Service) NotNotifyChallSearch(c context.Context, gids []int64) (chall map[int64][]*model.ChallRes, err error) { |
|
var ( |
|
pn = 1 |
|
ps = 1000 |
|
) |
|
if len(gids) <= 0 { |
|
return |
|
} |
|
chall = make(map[int64][]*model.ChallRes) |
|
frontTenMin, _ := time.ParseDuration("-10m") |
|
frontTwelveMin, _ := time.ParseDuration("-12m") |
|
frontTenMinFormat := time.Now().Add(frontTenMin).Format("2006-01-02 15:04:05") |
|
frontTwelveMinFormat := time.Now().Add(frontTwelveMin).Format("2006-01-02 15:04:05") |
|
e := elastic.NewElastic(nil) |
|
for { |
|
res := &model.SearchChall{} |
|
r := e.NewRequest(CHALLAPPID).Index(CHALLINDEX).Fields([]string{"id", "gid", "mid", "oid"}...). |
|
WhereEq("business", BUSINESS). |
|
WhereEq("state", 0). |
|
WhereIn("gid", gids). |
|
WhereRange("ctime", frontTwelveMinFormat, frontTenMinFormat, elastic.RangeScopeLoRo). |
|
Pn(pn). |
|
Ps(ps) |
|
if err = r.Scan(context.Background(), &res); err != nil { |
|
log.Error("elastic search chall Scan error(%v)", err) |
|
time.Sleep(time.Second * 3) |
|
continue |
|
} |
|
log.Info("challparams is(%v)", r.Params()) |
|
for _, r := range res.Result { |
|
chall[r.GID] = append(chall[r.GID], r) |
|
} |
|
if len(res.Result) < ps { |
|
break |
|
} |
|
pn++ |
|
} |
|
return |
|
}
|
|
|