You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
219 lines
5.9 KiB
219 lines
5.9 KiB
package service |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"fmt" |
|
"strings" |
|
"time" |
|
|
|
"go-common/app/job/main/aegis/model" |
|
"go-common/library/log" |
|
"go-common/library/xstr" |
|
) |
|
|
|
func (s *Service) initCache() { |
|
s.newactiveBizFlow = make(map[string]struct{}) |
|
s.syncConfigCache(context.Background()) |
|
s.syncConsumerCache(context.Background()) |
|
s.oldactiveBizFlow = s.newactiveBizFlow |
|
} |
|
|
|
func (s *Service) cacheProc() { |
|
for { |
|
s.syncTaskCache() |
|
time.Sleep(3 * time.Minute) |
|
s.syncConfigCache(context.Background()) |
|
s.syncWeightWatch(context.Background()) |
|
s.syncConsumerCache(context.Background()) |
|
} |
|
} |
|
|
|
func (s *Service) syncTaskCache() { |
|
var ( |
|
tasks []*model.Task |
|
lastid = int64(0) |
|
err error |
|
) |
|
|
|
// 1.停滞任务,10分钟未变化,检查是否遗漏 |
|
lastid = 0 |
|
for { |
|
mtime := time.Now().Add(-10 * time.Minute) |
|
if tasks, lastid, err = s.dao.QueryTask(context.Background(), model.TaskStateDispatch, mtime, lastid, 1000); err != nil || len(tasks) == 0 { |
|
break |
|
} |
|
for _, task := range tasks { |
|
log.Info("检测到遗漏 停滞任务(%+v)", task) |
|
s.dao.SetTask(context.Background(), task) |
|
s.dao.PushPersonalTask(context.Background(), task.BusinessID, task.FlowID, task.UID, task.ID) |
|
} |
|
time.Sleep(time.Second) |
|
} |
|
|
|
// 2.延迟任务,半小时未变化,检查是否遗漏 |
|
lastid = 0 |
|
for { |
|
mtime := time.Now().Add(-30 * time.Minute) |
|
if tasks, lastid, err = s.dao.QueryTask(context.Background(), model.TaskStateDelay, mtime, lastid, 1000); err != nil || len(tasks) == 0 { |
|
break |
|
} |
|
for _, task := range tasks { |
|
log.Info("检测到遗漏 延迟任务(%+v)", task) |
|
s.dao.SetTask(context.Background(), task) |
|
s.dao.PushDelayTask(context.Background(), task.BusinessID, task.FlowID, task.UID, task.ID) |
|
} |
|
time.Sleep(time.Second) |
|
} |
|
|
|
// 3.实时任务,1小时未变化,检查是否遗漏 |
|
lastid = 0 |
|
for { |
|
mtime := time.Now().Add(-60 * time.Minute) |
|
if tasks, lastid, err = s.dao.QueryTask(context.Background(), model.TaskStateInit, mtime, lastid, 1000); err != nil || len(tasks) == 0 { |
|
break |
|
} |
|
for _, task := range tasks { |
|
log.Info("检测到遗漏 实时任务(%+v)", task) |
|
s.dao.SetTask(context.Background(), task) |
|
} |
|
|
|
s.dao.PushPublicTask(context.Background(), tasks...) |
|
time.Sleep(time.Second) |
|
} |
|
} |
|
|
|
func (s *Service) syncConfigCache(c context.Context) (err error) { |
|
s.oldactiveBizFlow = s.newactiveBizFlow |
|
|
|
configs, err := s.dao.TaskActiveConfigs(c) |
|
if err != nil { |
|
return |
|
} |
|
|
|
rangeWCCache := make(map[int64]map[string]*model.RangeWeightConfig) |
|
equalWCCache := make(map[string][]*model.EqualWeightConfig) |
|
assignCache := make(map[string][]*model.AssignConfig) |
|
activeBizFlow := make(map[string]struct{}) |
|
|
|
for _, item := range configs { |
|
key := fmt.Sprintf("%d-%d", item.BusinessID, item.FlowID) |
|
activeBizFlow[key] = struct{}{} |
|
switch item.ConfType { |
|
case model.TaskConfigAssign: |
|
assign := new(struct { |
|
Mids string `json:"mids"` |
|
Uids string `json:"uids"` |
|
}) |
|
if err = json.Unmarshal([]byte(item.ConfJSON), assign); err != nil { |
|
log.Error("json.Unmarshal error(%v)", err) |
|
continue |
|
} |
|
ac := &model.AssignConfig{} |
|
if item.UID > 0 { |
|
ac.Admin = item.UID |
|
} else { |
|
ac.Admin = 399 |
|
} |
|
assign.Mids = strings.TrimSpace(assign.Mids) |
|
assign.Uids = strings.TrimSpace(assign.Uids) |
|
if ac.Mids, err = xstr.SplitInts(assign.Mids); err != nil { |
|
log.Error("xstr.SplitInts error(%v)", err) |
|
continue |
|
} |
|
if ac.Uids, err = xstr.SplitInts(assign.Uids); err != nil { |
|
log.Error("xstr.SplitInts error(%v)", err) |
|
continue |
|
} |
|
|
|
if _, ok := assignCache[key]; ok { |
|
assignCache[key] = append(assignCache[key], ac) |
|
} else { |
|
assignCache[key] = []*model.AssignConfig{ac} |
|
} |
|
case model.TaskConfigRangeWeight: |
|
wcitem := &model.RangeWeightConfig{} |
|
if err = json.Unmarshal([]byte(item.ConfJSON), wcitem); err != nil { |
|
log.Error("json.Unmarshal error(%v)", err) |
|
continue |
|
} |
|
|
|
if _, ok := rangeWCCache[item.BusinessID]; ok { |
|
rangeWCCache[item.BusinessID][wcitem.Name] = wcitem |
|
} else { |
|
rangeWCCache[item.BusinessID] = map[string]*model.RangeWeightConfig{ |
|
wcitem.Name: wcitem, |
|
} |
|
} |
|
|
|
case model.TaskConfigEqualWeight: |
|
ewcitem := &model.EqualWeightConfig{} |
|
if err = json.Unmarshal([]byte(item.ConfJSON), ewcitem); err != nil { |
|
log.Error("json.Unmarshal error(%v)", err) |
|
continue |
|
} |
|
|
|
ewcitem.Uname = item.Uname |
|
ewcitem.Description = item.Description |
|
ewcitem.IDs = strings.TrimSpace(ewcitem.IDs) |
|
if _, ok := equalWCCache[key]; ok { |
|
equalWCCache[key] = append(equalWCCache[key], ewcitem) |
|
} else { |
|
equalWCCache[key] = []*model.EqualWeightConfig{ewcitem} |
|
} |
|
} |
|
} |
|
s.rangeWeightCfg = rangeWCCache |
|
s.equalWeightCfg = equalWCCache |
|
s.assignConfig = assignCache |
|
s.newactiveBizFlow = activeBizFlow |
|
|
|
return |
|
} |
|
|
|
func (s *Service) syncWeightWatch(c context.Context) { |
|
for key := range s.oldactiveBizFlow { |
|
if _, ok := s.newactiveBizFlow[key]; !ok { |
|
if wm, ok := s.wmHash[key]; ok { |
|
wm.close = true |
|
log.Info("关闭权重计算器 bizid(%d) flowid(%d)", wm.businessID, wm.flowID) |
|
delete(s.wmHash, key) |
|
} |
|
} |
|
} |
|
|
|
for key := range s.newactiveBizFlow { |
|
if _, ok := s.oldactiveBizFlow[key]; !ok { |
|
bizid, _ := parseKey(key) |
|
s.wmHash[key] = NewWeightManager(s, s.getWeightOpt(bizid), key) |
|
} |
|
} |
|
} |
|
|
|
func (s *Service) getWeightOpt(bizid int) *model.WeightOPT { |
|
for _, item := range s.c.BizCfg.WeightOpt { |
|
if item.BusinessID == int64(bizid) { |
|
return item |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (s *Service) syncConsumerCache(c context.Context) (err error) { |
|
s.ccMux.Lock() |
|
defer s.ccMux.Unlock() |
|
consumerCache, err := s.dao.TaskActiveConsumer(c) |
|
if err != nil { |
|
return |
|
} |
|
s.consumerCache = consumerCache |
|
return |
|
} |
|
|
|
// getWeightCache . |
|
func (s *Service) getWeightCache(c context.Context, businessid, flowid int64) (rwc map[string]*model.RangeWeightConfig, ewc []*model.EqualWeightConfig) { |
|
key := fmt.Sprintf("%d-%d", businessid, flowid) |
|
rwc = s.rangeWeightCfg[businessid] |
|
ewc = s.equalWeightCfg[key] |
|
return |
|
}
|
|
|