You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
444 lines
11 KiB
444 lines
11 KiB
package upcrmservice |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
"sync" |
|
"time" |
|
|
|
"go-common/app/admin/main/up/dao/global" |
|
"go-common/app/admin/main/up/model/datamodel" |
|
"go-common/app/admin/main/up/model/upcrmmodel" |
|
"go-common/app/admin/main/up/util" |
|
accgrpc "go-common/app/service/main/account/api" |
|
"go-common/library/database/elastic" |
|
"go-common/library/log" |
|
"go-common/library/net/metadata" |
|
"go-common/library/sync/errgroup" |
|
) |
|
|
|
const ( |
|
maxSearchItemCount = 10000 |
|
maxBatchCount = 100 |
|
) |
|
|
|
var ( |
|
//ErrTooManySearchItem too many search items, 搜索只支持返回前10000条数据 |
|
ErrTooManySearchItem = errors.New("筛选仅支持展示前1万条") |
|
//ErrNoMid no mid |
|
ErrNoMid = errors.New("Mid为空") |
|
) |
|
|
|
//UpBaseInfoQuery query |
|
func (s *Service) UpBaseInfoQuery(context context.Context, arg *upcrmmodel.InfoQueryArgs) (result *upcrmmodel.InfoQueryResult, err error) { |
|
|
|
var data, e = s.upBaseInfoQueryBatch(s.crmdb.QueryUpBaseInfoBatchByMid, arg.Mid) |
|
err = e |
|
// 没找到按照错误处理 |
|
if err != nil || len(data) == 0 { |
|
log.Error("get from db fail, req=%+v, err=%+v", arg, err) |
|
return |
|
} |
|
|
|
result = data[0] |
|
|
|
log.Info("query base info ok, req=%+v, err=%+v", arg, result) |
|
return |
|
} |
|
|
|
//QueryDbFunc query func type |
|
type QueryDbFunc func(fields string, mid ...int64) (result []upcrmmodel.UpBaseInfo, err error) |
|
|
|
func (s *Service) upBaseInfoQueryBatch(queryfunc QueryDbFunc, ids ...int64) (result []*upcrmmodel.InfoQueryResult, err error) { |
|
var data, e = queryfunc("*", ids...) |
|
err = e |
|
if err != nil { |
|
log.Error("get from db fail, err=%+v", err) |
|
return |
|
} |
|
|
|
for _, v := range data { |
|
var info = upcrmmodel.InfoQueryResult{} |
|
info.CopyFromBaseInfo(v) |
|
info.CalculateAttr() |
|
result = append(result, &info) |
|
} |
|
return |
|
} |
|
|
|
// UpAccountInfo get account info |
|
func (s *Service) UpAccountInfo(c context.Context, arg *upcrmmodel.InfoAccountInfoArgs) (res []*accgrpc.Info, err error) { |
|
var ( |
|
infosReply *accgrpc.InfosReply |
|
mids = util.ExplodeInt64(arg.Mids, ",") |
|
) |
|
if infosReply, err = global.GetAccClient().Infos3(c, &accgrpc.MidsReq{Mids: mids, RealIp: metadata.String(c, metadata.RemoteIP)}); err != nil { |
|
return |
|
} |
|
if infosReply == nil || infosReply.Infos == nil { |
|
return |
|
} |
|
for _, v := range infosReply.Infos { |
|
res = append(res, v) |
|
} |
|
log.Info("query acount info ok, req=%+v, result=%+v", arg, res) |
|
return |
|
} |
|
|
|
//SearchResult struct |
|
type SearchResult struct { |
|
AccountState int `json:"account_state"` |
|
Activity int `json:"activity"` |
|
Attr int `json:"attr"` |
|
ArticleCountAccumulate int `json:"article_count_accumulate"` |
|
ID uint32 |
|
Mid int64 |
|
} |
|
|
|
func getAttrFormat(attrs upcrmmodel.UpAttr) (result []int) { |
|
// 什么要shift,因为es的位是从1开始的,而存储的位是从0开始的 |
|
const shift = 1 |
|
if attrs.AttrVideo != 0 { |
|
result = append(result, upcrmmodel.AttrBitVideo+shift) |
|
} |
|
if attrs.AttrAudio != 0 { |
|
result = append(result, upcrmmodel.AttrBitAudio+shift) |
|
} |
|
if attrs.AttrArticle != 0 { |
|
result = append(result, upcrmmodel.AttrBitArticle+shift) |
|
} |
|
if attrs.AttrPhoto != 0 { |
|
result = append(result, upcrmmodel.AttrBitPhoto+shift) |
|
} |
|
if attrs.AttrSign != 0 { |
|
result = append(result, upcrmmodel.AttrBitSign+shift) |
|
} |
|
|
|
if attrs.AttrGrowup != 0 { |
|
result = append(result, upcrmmodel.AttrBitGrowup+shift) |
|
} |
|
|
|
if attrs.AttrVerify != 0 { |
|
result = append(result, upcrmmodel.AttrBitVerify+shift) |
|
} |
|
|
|
return |
|
} |
|
|
|
func getEsCombo(attrs upcrmmodel.UpAttr) (combos []*elastic.Combo) { |
|
const shift = 1 |
|
|
|
var attrs1, attrs2 []interface{} |
|
var attrFlagList = getAttrFormat(attrs) |
|
for _, v := range attrFlagList { |
|
if _, ok := upcrmmodel.AttrGroup1[v-shift]; ok { |
|
attrs1 = append(attrs1, v) |
|
} else if _, ok := upcrmmodel.AttrGroup2[v-shift]; ok { |
|
attrs2 = append(attrs2, v) |
|
} |
|
} |
|
if attrs1 != nil { |
|
var attrGroup = make(map[string][]interface{}) |
|
attrGroup["attr_format"] = attrs1 |
|
cmb := &elastic.Combo{} |
|
cmb.ComboIn([]map[string][]interface{}{ |
|
attrGroup}, |
|
).MinIn(1).MinAll(1) |
|
combos = append(combos, cmb) |
|
} |
|
|
|
if attrs2 != nil { |
|
var attrGroup = make(map[string][]interface{}) |
|
attrGroup["attr_format"] = attrs2 |
|
cmb := &elastic.Combo{} |
|
cmb.ComboIn([]map[string][]interface{}{ |
|
attrGroup}, |
|
).MinIn(1).MinAll(1) |
|
combos = append(combos, cmb) |
|
} |
|
return |
|
} |
|
|
|
//UpInfoSearch info search |
|
func (s *Service) UpInfoSearch(c context.Context, arg *upcrmmodel.InfoSearchArgs) (result upcrmmodel.InfoSearchResult, err error) { |
|
//调用搜索的接口 |
|
var searchData esResult |
|
searchData, err = s.searchFromEs(c, arg) |
|
if err != nil { |
|
log.Error("search arg=%+v, err=%+v", arg, err) |
|
return |
|
} |
|
|
|
if len(searchData.Result) == 0 { |
|
log.Info("no data return from search, just return") |
|
return |
|
} |
|
var ids []int64 |
|
for _, v := range searchData.Result { |
|
ids = append(ids, int64(v.ID)) |
|
} |
|
|
|
result.Result, err = s.queryUpBaseInfo(c, ids...) |
|
if err != nil { |
|
log.Error("query up base info fail, err=%+v", err) |
|
return |
|
} |
|
result.PageInfo = searchData.Page.ToPageInfo() |
|
|
|
log.Info("res=%+v, page=%+v", searchData, result.PageInfo) |
|
return |
|
} |
|
|
|
type esPage struct { |
|
Num int `json:"num"` |
|
Size int `json:"size"` |
|
Total int `json:"total"` |
|
} |
|
|
|
//ToPageInfo cast to page info |
|
func (e *esPage) ToPageInfo() (pageInfo upcrmmodel.PageInfo) { |
|
if e == nil { |
|
return |
|
} |
|
|
|
pageInfo.TotalCount = e.Total |
|
pageInfo.Size = e.Size |
|
pageInfo.Page = e.Num |
|
|
|
return |
|
} |
|
|
|
type esResult struct { |
|
Page *esPage `json:"page"` |
|
Result []*SearchResult `json:"result"` |
|
} |
|
|
|
func (s *Service) searchFromEs(c context.Context, arg *upcrmmodel.InfoSearchArgs) (searchData esResult, err error) { |
|
if arg.Page*arg.Size > maxSearchItemCount { |
|
err = ErrTooManySearchItem |
|
return |
|
} |
|
|
|
var searchSdk = elastic.NewElastic(nil) |
|
var r = searchSdk.NewRequest("up_crm_info") |
|
r.Pn(arg.Page).Ps(arg.Size).Index("up_base_info"). |
|
Fields("id") |
|
if arg.Mid != 0 { |
|
r.WhereEq("mid", arg.Mid) |
|
} else { |
|
if arg.AccountState != 0 { |
|
// 字段有0值,所以接口改为1,2表示字段的0,1,接口的0表示没有此条件 |
|
var realArg = arg.AccountState - 1 |
|
r.WhereEq("account_state", realArg) |
|
} |
|
if arg.Activity != 0 { |
|
r.WhereEq("activity", arg.Activity) |
|
} |
|
var startdate, _ = time.Parse(upcrmmodel.TimeFmtDate, arg.FirstDateBegin) |
|
var enddate, _ = time.Parse(upcrmmodel.TimeFmtDate, arg.FirstDateEnd) |
|
enddate = enddate.AddDate(0, 0, 1) |
|
var startStr = startdate.Format(upcrmmodel.TimeFmtMysql) |
|
var endStr = enddate.Format(upcrmmodel.TimeFmtMysql) |
|
if arg.FirstDateBegin != "" && arg.FirstDateEnd != "" { |
|
r.WhereRange("first_up_time", startStr, endStr, elastic.RangeScopeLcRc) |
|
} |
|
|
|
if arg.Order.Order == "" { |
|
arg.Order.Order = "desc" |
|
} |
|
|
|
if arg.Order.Field == "" { |
|
arg.Order.Field = "first_up_time" |
|
arg.Order.Order = "desc" |
|
} |
|
var combos = getEsCombo(arg.Attrs) |
|
r.WhereCombo(combos...) |
|
r.Order(arg.Order.Field, arg.Order.Order) |
|
} |
|
|
|
err = r.Scan(c, &searchData) |
|
return |
|
} |
|
|
|
func (s *Service) queryUpBaseInfo(c context.Context, ids ...int64) (result []*upcrmmodel.InfoQueryResult, err error) { |
|
|
|
var group, ctx = errgroup.WithContext(c) |
|
var infoData []*upcrmmodel.InfoQueryResult |
|
group.Go(func() error { |
|
var e error |
|
infoData, e = s.upBaseInfoQueryBatch(s.crmdb.QueryUpBaseInfoBatchByID, ids...) |
|
if e != nil { |
|
err = e |
|
log.Error("get base info fail, err=%+v", err) |
|
} |
|
return nil |
|
}) |
|
|
|
var tidMap = make(map[int64]*datamodel.UpArchiveTypeData) |
|
var mapLock sync.Mutex |
|
for _, mid := range ids { |
|
group.Go(func() error { |
|
var arg = datamodel.GetUpArchiveTypeInfoArg{Mid: mid} |
|
var tidData, e = s.dataService.GetUpArchiveTypeInfo(ctx, &arg) |
|
if e != nil || tidData == nil { |
|
log.Error("get up type info err, err=%v", e) |
|
return nil |
|
} |
|
mapLock.Lock() |
|
tidMap[arg.Mid] = tidData |
|
mapLock.Unlock() |
|
return nil |
|
}) |
|
} |
|
if err = group.Wait(); err != nil { |
|
log.Error("get data error, err=%v", err) |
|
return |
|
} |
|
|
|
var infoIDMap = make(map[uint32]*upcrmmodel.InfoQueryResult) |
|
for _, v := range infoData { |
|
infoIDMap[v.ID] = v |
|
} |
|
|
|
for _, v := range ids { |
|
var info, ok = infoIDMap[uint32(v)] |
|
if !ok { |
|
continue |
|
} |
|
if typeInfo, ok := tidMap[v]; ok { |
|
info.ActiveTid = typeInfo.Tid |
|
info.ActiveSubtid = typeInfo.SubTid |
|
} |
|
result = append(result, info) |
|
} |
|
return |
|
} |
|
|
|
//QueryUpInfoWithViewerData query with view data |
|
func (s *Service) QueryUpInfoWithViewerData(c context.Context, arg *upcrmmodel.UpInfoWithViewerArg) (result upcrmmodel.UpInfoWithViewerResult, err error) { |
|
if arg.Page*arg.Size > maxSearchItemCount { |
|
err = ErrTooManySearchItem |
|
return |
|
} |
|
// 如果是0,则默认设置所有的tag |
|
if arg.Flag == 0 { |
|
arg.Flag = -1 |
|
} |
|
var mids []int64 |
|
if arg.Mids != "" { |
|
mids = util.ExplodeInt64(arg.Mids, ",") |
|
var midlen = len(mids) |
|
if midlen == 0 { |
|
err = ErrNoMid |
|
log.Error("no mid get from mids, arg=%+v", arg) |
|
return |
|
} |
|
if midlen > maxBatchCount { |
|
mids = mids[:maxBatchCount] |
|
} |
|
} else { |
|
var searchSdk = elastic.NewElastic(nil) |
|
var r = searchSdk.NewRequest("up_crm_info") |
|
if arg.Size > maxBatchCount { |
|
arg.Size = maxBatchCount |
|
} |
|
|
|
r.Pn(arg.Page).Ps(arg.Size).Index("up_base_info"). |
|
Fields("mid"). |
|
Order(arg.Sort, arg.Order) |
|
|
|
var searchData esResult |
|
err = r.Scan(c, &searchData) |
|
if err != nil { |
|
log.Error("fail to get from search, arg=%+v", arg) |
|
return |
|
} |
|
|
|
for _, v := range searchData.Result { |
|
mids = append(mids, int64(v.Mid)) |
|
} |
|
|
|
result.PageInfo = searchData.Page.ToPageInfo() |
|
} |
|
|
|
var group, ctx = errgroup.WithContext(c) |
|
var infoData []*upcrmmodel.InfoQueryResult |
|
var playData []*upcrmmodel.UpPlayInfo |
|
group.Go(func() error { |
|
if arg.Flag&upcrmmodel.FlagUpBaseData != 0 { |
|
infoData, err = s.upBaseInfoQueryBatch(s.crmdb.QueryUpBaseInfoBatchByMid, mids...) |
|
if err != nil { |
|
log.Error("query up base error, err=%v", err) |
|
return err |
|
} |
|
} |
|
if arg.Flag&upcrmmodel.FlagUpPlayData != 0 { |
|
playData, err = s.crmdb.QueryPlayInfoBatch(mids, upcrmmodel.BusinessTypeVideo) |
|
if err != nil { |
|
log.Error("query play info err, err=%v", err) |
|
return err |
|
} |
|
} |
|
return nil |
|
}) |
|
|
|
var dataMap = make(map[int64]*upcrmmodel.UpInfoWithViewerData) |
|
if arg.Flag&upcrmmodel.FlagViewData != 0 { |
|
for _, v := range mids { |
|
var mid = v // copy this v |
|
group.Go(func() error { |
|
var info, e = s.dataService.GetViewData(ctx, mid) |
|
if e != nil { |
|
err = e |
|
log.Error("query up view info from hbase error, err=%v", err) |
|
return err |
|
} |
|
var data = getOrCreateUpInfo(dataMap, mid) |
|
data.ViewerBase = info.Base |
|
data.ViewerTrend = info.Trend |
|
data.ViewerArea = info.Area |
|
return err |
|
}) |
|
} |
|
} |
|
if err = group.Wait(); err != nil { |
|
log.Error("get data fail, err=%v", err) |
|
return |
|
} |
|
|
|
for _, baseInfo := range infoData { |
|
var data = getOrCreateUpInfo(dataMap, baseInfo.Mid) |
|
data.UpBaseInfo = baseInfo |
|
} |
|
|
|
for _, playInfo := range playData { |
|
var data = getOrCreateUpInfo(dataMap, playInfo.Mid) |
|
data.UpPlayInfo = playInfo |
|
} |
|
|
|
for _, mid := range mids { |
|
var data, ok = dataMap[mid] |
|
if !ok { |
|
log.Warn("up info not found, mid=%d", mid) |
|
continue |
|
} |
|
data.Mid = mid |
|
result.Result = append(result.Result, data) |
|
} |
|
log.Info("query up with view ok, arg=%+v, result count=%d", arg, len(mids)) |
|
return |
|
} |
|
|
|
var dataMapMutex sync.Mutex |
|
|
|
func getOrCreateUpInfo(dataMap map[int64]*upcrmmodel.UpInfoWithViewerData, mid int64) (result *upcrmmodel.UpInfoWithViewerData) { |
|
dataMapMutex.Lock() |
|
defer dataMapMutex.Unlock() |
|
|
|
var ok bool |
|
if result, ok = dataMap[mid]; !ok { |
|
result = &upcrmmodel.UpInfoWithViewerData{} |
|
dataMap[mid] = result |
|
} |
|
return result |
|
}
|
|
|