You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
462 lines
15 KiB
462 lines
15 KiB
package service |
|
|
|
import ( |
|
"bytes" |
|
"context" |
|
"encoding/json" |
|
"fmt" |
|
"go-common/app/job/live/xlottery/internal/model" |
|
"go-common/library/ecode" |
|
"go-common/library/queue/databus/report" |
|
"net/http" |
|
"strconv" |
|
"sync" |
|
"time" |
|
|
|
"github.com/robfig/cron" |
|
|
|
"go-common/app/job/live/xlottery/internal/conf" |
|
"go-common/app/job/live/xlottery/internal/dao" |
|
"go-common/library/log" |
|
bm "go-common/library/net/http/blademaster" |
|
"go-common/library/queue/databus" |
|
) |
|
|
|
// Service struct |
|
type Service struct { |
|
c *conf.Config |
|
dao *dao.Dao |
|
cron *cron.Cron |
|
giftPaySub *databus.Databus |
|
giftFreeSub *databus.Databus |
|
capsuleSub *databus.Databus |
|
ExpireCountFrequency string |
|
CouponRetryFrequency string |
|
httpClient *bm.Client |
|
wg *sync.WaitGroup |
|
} |
|
|
|
const _sendGiftKey = "lottery:gift:msgid:%s" |
|
|
|
const _addCapsuleKey = "lottery:gift:msgid:%s" |
|
|
|
type info struct { |
|
MsgContent string `json:"msg_content"` |
|
} |
|
|
|
type msgContent struct { |
|
Body *body `json:"body"` |
|
} |
|
type body struct { |
|
GiftId int64 `json:"giftid"` |
|
RoomId int64 `json:"roomid"` |
|
Num int64 `json:"num"` |
|
Uid int64 `json:"uid"` |
|
Ruid int64 `json:"ruid"` |
|
TotalCoin int64 `json:"totalCoin"` |
|
CoinType string `json:"coinType"` |
|
Tid string `json:"tid"` |
|
Platform string `json:"platform"` |
|
RoomInfo *roomInfo `json:"roomInfo"` |
|
} |
|
type roomInfo struct { |
|
AreaV2Id int64 `json:"area_v2_id"` |
|
AreaV2ParentId int64 `json:"area_v2_parent_id"` |
|
} |
|
|
|
// New init |
|
func New(c *conf.Config) (s *Service) { |
|
s = &Service{ |
|
c: c, |
|
dao: dao.New(c), |
|
cron: cron.New(), |
|
giftPaySub: databus.New(c.GiftPaySub), |
|
giftFreeSub: databus.New(c.GiftFreeSub), |
|
capsuleSub: databus.New(c.AddCapsuleSub), |
|
wg: new(sync.WaitGroup), |
|
ExpireCountFrequency: c.Cfg.ExpireCountFrequency, |
|
CouponRetryFrequency: c.Cfg.CouponRetryFrequency, |
|
httpClient: bm.NewClient(c.HTTPClient), |
|
} |
|
report.InitUser(conf.Conf.UserReport) |
|
dao.InitAPI() |
|
s.addCrontab() |
|
s.cron.Start() |
|
s.tickerReloadCapsuleConf(context.TODO()) |
|
log.Info("[service.lottery| 11start") |
|
var i int64 |
|
for i = 0; i < c.Cfg.ConsumerProcNum; i++ { |
|
s.wg.Add(1) |
|
go s.giftConsumeProc() |
|
} |
|
s.wg.Add(1) |
|
go s.capsuleConsumeProc() |
|
return s |
|
} |
|
|
|
// Ping Service |
|
func (s *Service) Ping(ctx context.Context) (err error) { |
|
return s.dao.Ping(ctx) |
|
} |
|
|
|
// Close Service |
|
func (s *Service) Close() { |
|
s.subClose() |
|
s.wg.Wait() |
|
s.dao.Close() |
|
} |
|
|
|
// subClose Close all sub channels |
|
func (s *Service) subClose() { |
|
s.giftPaySub.Close() |
|
s.giftFreeSub.Close() |
|
s.capsuleSub.Close() |
|
} |
|
|
|
func (s *Service) addCrontab() (err error) { |
|
//spew.Dump(s.ExpireCountFrequency) |
|
err = s.cron.AddFunc(s.ExpireCountFrequency, s.TransCapsule) |
|
if err != nil { |
|
log.Error("cron job transCapsule error(%v)", err) |
|
} |
|
err = s.cron.AddFunc(s.CouponRetryFrequency, s.CouponRetry) |
|
if err != nil { |
|
log.Error("cron job couponRetry error(%v)", err) |
|
} |
|
return |
|
} |
|
|
|
// CouponRetry 抽奖券重试 |
|
func (s *Service) CouponRetry() { |
|
var ctx = context.Background() |
|
if s.c.CouponConf == nil || s.c.CouponConf.Url == "" || len(s.c.CouponConf.Coupon) == 0 { |
|
log.Error("[service.capsule | sendAward] couponConf is empty") |
|
return |
|
} |
|
nowTime := time.Now() |
|
log.Info("[service.service | couponRetry]couponRetry %s", nowTime.Format("2006-01-02 15:04:05")) |
|
extraData, _ := s.dao.GetCouponData(ctx) |
|
if len(extraData) == 0 { |
|
return |
|
} |
|
for _, extra := range extraData { |
|
s.dao.UpdateExtraMtimeById(ctx, extra.Id, nowTime.Format("2006-01-02 15:04:05")) |
|
awardType := extra.ItemExtra |
|
if _, ok := s.c.CouponConf.Coupon[awardType]; !ok { |
|
log.Error("[service.capsule | sendAward] couponConf.coupon is empty %s", awardType) |
|
continue |
|
} |
|
uid := extra.Uid |
|
var res struct { |
|
Code int `json:"code"` |
|
Msg string `json:"message"` |
|
} |
|
endPoint := s.c.CouponConf.Url |
|
postJson := make(map[string]interface{}) |
|
postJson["mid"] = uid |
|
postJson["couponId"] = s.c.CouponConf.Coupon[awardType] |
|
bytesData, err := json.Marshal(postJson) |
|
if err != nil { |
|
log.Error("[service.capsule | sendAward] json.Marshal(%v) error(%v)", postJson, err) |
|
continue |
|
} |
|
req, err := http.NewRequest("POST", endPoint, bytes.NewReader(bytesData)) |
|
if err != nil { |
|
log.Error("[service.capsule | sendAward] http.NewRequest(%v) url(%v) error(%v)", postJson, endPoint, err) |
|
continue |
|
} |
|
req.Header.Add("Content-Type", "application/json;charset=UTF-8") |
|
log.Info("coupon vip mid(%d) couponID(%s)", uid, s.c.CouponConf.Coupon[awardType]) |
|
if err = s.httpClient.Do(ctx, req, &res); err != nil { |
|
log.Error("[service.capsule | sendAward] s.client.Do error(%v)", err) |
|
continue |
|
} |
|
if res.Code != 0 && res.Code != 83110005 { |
|
err = ecode.Int(res.Code) |
|
log.Error("coupon vip url(%v) res code(%d)", endPoint, res.Code) |
|
continue |
|
} |
|
log.Info("[service.capsule | sendAward] s.client.Do endpoint (%v) req (%v)", endPoint, postJson) |
|
s.dao.UpdateExtraValueById(ctx, extra.Id, 1) |
|
} |
|
|
|
} |
|
|
|
// TransCapsule 转换扭蛋币 |
|
func (s *Service) TransCapsule() { |
|
var ctx = context.Background() |
|
pools, err := s.dao.GetActiveColorPool(ctx) |
|
if err != nil { |
|
log.Error("[service.service | TransCapsule]CronJob TransCapsule GetActiveColorPool error(%v)", err) |
|
return |
|
} |
|
nowTime := time.Now().Add(-(60 * time.Second)).Format("2006-01-02 15:04") |
|
log.Info("[service.service | TransCapsule]TranCapsule %s", nowTime) |
|
flag := 0 |
|
coinId := int64(0) |
|
for _, pool := range pools { |
|
if pool.EndTime == 0 { |
|
continue |
|
} else { |
|
endTimeUnix := time.Unix(pool.EndTime, 0) |
|
endTime := endTimeUnix.Format("2006-01-02 15:04") |
|
if endTime == nowTime { |
|
flag = 1 |
|
coinId = pool.CoinId |
|
} |
|
} |
|
} |
|
if flag == 1 { |
|
colorChangeNum, err := s.dao.GetTransNum(ctx, coinId) |
|
if err != nil || colorChangeNum == 0 { |
|
log.Error("[service.service | TransCapsule] GetTransNum colorChangeNum: %d, err: %v", colorChangeNum, err) |
|
return |
|
} |
|
normalChangeNum, err := s.dao.GetTransNum(ctx, dao.NormalCoinId) |
|
if err != nil || normalChangeNum == 0 { |
|
log.Error("[service.service | TransCapsule] GetTransNum normalChangeNum: %d, err: %v", normalChangeNum, err) |
|
return |
|
} |
|
for i := int64(0); i < 10; i++ { |
|
err := s.dao.TransCapsule(ctx, strconv.FormatInt(i, 10), colorChangeNum, normalChangeNum) |
|
if err != nil { |
|
log.Error("[service.service | TransCapsule]TranCapsule error %v", err) |
|
return |
|
} |
|
log.Info("[service.service | TransCapsule]TranCapsule %s", strconv.FormatInt(i, 10)) |
|
} |
|
} |
|
} |
|
|
|
// expCanalConsumeproc consumer archive |
|
func (s *Service) giftConsumeProc() { |
|
defer func() { |
|
log.Warn("giftConsumeProc exited.") |
|
s.wg.Done() |
|
}() |
|
var ( |
|
payMsgs = s.giftPaySub.Messages() |
|
freeMsgs = s.giftFreeSub.Messages() |
|
) |
|
log.Info("[service.lottery|giftConsumeProc") |
|
for { |
|
select { |
|
case msg, ok := <-payMsgs: |
|
if !ok { |
|
log.Warn("[service.lottery|giftConsumeProc] giftPaySub has been closed.") |
|
return |
|
} |
|
|
|
var value *info |
|
var subValue *msgContent |
|
err := json.Unmarshal([]byte(msg.Value), &value) |
|
if err != nil { |
|
log.Error("[service.lottery|giftConsumeProc] giftPaySub json decode error:%v", err) |
|
continue |
|
} |
|
err = json.Unmarshal([]byte(value.MsgContent), &subValue) |
|
if err != nil { |
|
log.Error("[service.lottery|giftConsumeProc] giftPaySub json decode error:%v", err) |
|
continue |
|
} |
|
areaV2Id := subValue.Body.RoomInfo.AreaV2Id |
|
areaV2ParentId := subValue.Body.RoomInfo.AreaV2ParentId |
|
giftId := subValue.Body.GiftId |
|
roomId := subValue.Body.RoomId |
|
num := subValue.Body.Num |
|
uid := subValue.Body.Uid |
|
ruid := subValue.Body.Ruid |
|
totalCoin := subValue.Body.TotalCoin |
|
coinType := subValue.Body.CoinType |
|
platform := subValue.Body.Platform |
|
key := fmt.Sprintf(_sendGiftKey, subValue.Body.Tid) |
|
isGetLock, _, err := s.dao.Lock(context.Background(), key, 86400000, 0, 0) |
|
if err != nil || !isGetLock { |
|
log.Error("[service.lottery|giftConsumeProc Lock Error msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d) err(%v)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset, err) |
|
continue |
|
} |
|
msg.Commit() |
|
log.Info("[service.lottery|giftConsumeProc] pay-msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset) |
|
s.sendGift(context.Background(), uid, giftId, num, totalCoin, coinType, areaV2ParentId, areaV2Id, platform) |
|
case msg, ok := <-freeMsgs: |
|
if !ok { |
|
log.Warn("[service.lottery|giftConsumeProc] giftFreeSub has been closed.") |
|
return |
|
} |
|
var value *info |
|
var subValue *msgContent |
|
err := json.Unmarshal([]byte(msg.Value), &value) |
|
if err != nil { |
|
log.Error("[service.lottery|giftConsumeProc] giftFreeSub message:%s json decode error:%v", msg.Value, err) |
|
continue |
|
} |
|
err = json.Unmarshal([]byte(value.MsgContent), &subValue) |
|
if err != nil { |
|
log.Error("[service.lottery|giftConsumeProc] giftFreeSub message:%s json decode error:%v", msg.Value, err) |
|
continue |
|
} |
|
areaV2Id := subValue.Body.RoomInfo.AreaV2Id |
|
areaV2ParentId := subValue.Body.RoomInfo.AreaV2ParentId |
|
giftId := subValue.Body.GiftId |
|
roomId := subValue.Body.RoomId |
|
num := subValue.Body.Num |
|
uid := subValue.Body.Uid |
|
ruid := subValue.Body.Ruid |
|
totalCoin := subValue.Body.TotalCoin |
|
coinType := subValue.Body.CoinType |
|
platform := subValue.Body.Platform |
|
key := fmt.Sprintf(_sendGiftKey, subValue.Body.Tid) |
|
isGetLock, _, err := s.dao.Lock(context.Background(), key, 86400000, 0, 0) |
|
if err != nil || !isGetLock { |
|
log.Error("[service.lottery|giftConsumeProc Lock Error msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d) err(%v)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset, err) |
|
continue |
|
} |
|
msg.Commit() |
|
log.Info("[service.lottery|giftConsumeProc] pay-msgKey(%s) uid(%d) ruid(%d) roomId(%d) giftId(%d) num(%d) totalCoin(%d) coinType(%s) tid(%s) key(%s) offset(%d)", msg.Key, uid, ruid, roomId, giftId, num, totalCoin, coinType, subValue.Body.Tid, msg.Key, msg.Offset) |
|
s.sendGift(context.Background(), uid, giftId, num, totalCoin, coinType, areaV2ParentId, areaV2Id, platform) |
|
default: |
|
time.Sleep(time.Second * 3) |
|
continue |
|
} |
|
} |
|
} |
|
|
|
func (s *Service) capsuleConsumeProc() { |
|
defer func() { |
|
log.Warn("capsuleConsumeProc exited.") |
|
s.wg.Done() |
|
}() |
|
var ( |
|
capsuleMsgs = s.capsuleSub.Messages() |
|
) |
|
log.Info("[service.lottery|capsuleConsumeProc") |
|
for { |
|
select { |
|
case msg, ok := <-capsuleMsgs: |
|
if !ok { |
|
log.Warn("[service.lottery|capsuleConsumeProc] giftPaySub has been closed.") |
|
return |
|
} |
|
var msgContent *info |
|
var value *model.AddCapsule |
|
err := json.Unmarshal([]byte(msg.Value), &msgContent) |
|
if err != nil { |
|
log.Error("[service.lottery|capsuleConsumeProc] json decode error:%v", err) |
|
continue |
|
} |
|
err = json.Unmarshal([]byte(msgContent.MsgContent), &value) |
|
if err != nil { |
|
log.Error("[service.lottery|giftConsumeProc] giftFreeSub message:%s json decode error:%v", msg.Value, err) |
|
continue |
|
} |
|
|
|
uid := value.Uid |
|
cType := value.Type |
|
coinId := value.CoinId |
|
num := value.Num |
|
key := fmt.Sprintf(_addCapsuleKey, value.MsgId) |
|
isGetLock, _, err := s.dao.Lock(context.Background(), key, 86400000, 0, 0) |
|
if err != nil || !isGetLock { |
|
log.Error("[service.lottery|capsuleConsumeProc Lock Error msgKey(%s) uid(%d) num(%d) type(%s) coinId(%d) tid(%s) offset(%d) err(%v)", msg.Key, uid, num, cType, coinId, value.MsgId, msg.Offset, err) |
|
continue |
|
} |
|
msg.Commit() |
|
log.Info("[service.lottery|capsuleConsumeProc] msgKey(%s) uid(%d) num(%d) type(%s) coinId(%s) tid(%s) offset(%d)", msg.Key, uid, num, cType, coinId, value.MsgId, msg.Offset) |
|
s.addCapsule(context.Background(), uid, coinId, num) |
|
default: |
|
time.Sleep(time.Second * 3) |
|
continue |
|
} |
|
} |
|
} |
|
|
|
// SendGift 送礼增加扭蛋积分 |
|
func (s *Service) sendGift(ctx context.Context, uid, giftId, num, totalCoin int64, coinType string, areaV2ParentId, areaV2Id int64, platform string) { |
|
if totalCoin <= 0 { |
|
return |
|
} |
|
coinConfMap, err := s.dao.GetCapsuleConf(ctx) |
|
if err != nil || len(coinConfMap) == 0 { |
|
return |
|
} |
|
var addCoinId = int64(dao.NormalCoinId) |
|
var coinIds = []int64{dao.BlessCoinId, dao.LplCoinId, dao.WeekCoinId, dao.ColorfulCoinId, dao.NormalCoinId} |
|
for _, coinId := range coinIds { |
|
if _, ok := coinConfMap[coinId]; ok { |
|
if coinConfMap[coinId].AreaMap != nil { |
|
_, v2ID := coinConfMap[coinId].AreaMap[areaV2Id] |
|
_, v2ParentID := coinConfMap[coinId].AreaMap[areaV2ParentId] |
|
if v2ID || v2ParentID { |
|
if coinConfMap[coinId].GiftType == dao.CapsuleGiftTypeAll { |
|
addCoinId = coinId |
|
} else if coinConfMap[coinId].GiftType == dao.CapsuleGiftTypeGold { |
|
if coinType == "gold" { |
|
addCoinId = coinId |
|
} |
|
} else if coinConfMap[coinId].GiftType == dao.CapsuleGiftTypeSelected { |
|
if coinConfMap[coinId].GiftMap != nil { |
|
if _, ok := coinConfMap[coinId].GiftMap[giftId]; ok { |
|
addCoinId = coinId |
|
} |
|
} |
|
} |
|
} |
|
} |
|
} |
|
if addCoinId != dao.NormalCoinId { |
|
break |
|
} |
|
} |
|
// 首次赠送 |
|
if addCoinId == dao.LplCoinId { |
|
if s.dao.CheckLplFirstGift(ctx, uid, giftId) { |
|
totalCoin = totalCoin + coinConfMap[addCoinId].ChangeNum |
|
} |
|
} |
|
if addCoinId <= dao.ColorfulCoinId { |
|
_, err = s.dao.UpdateScore(ctx, uid, addCoinId, totalCoin, "sendGift", platform, nil, coinConfMap[addCoinId]) |
|
} else { |
|
_, err = s.dao.UpdateCapsule(ctx, uid, addCoinId, totalCoin, "sendGift", platform, coinConfMap[addCoinId]) |
|
} |
|
if err != nil { |
|
log.Error("[service.lottery|sendGift] UpdateScore type:%d error:%v", addCoinId, err) |
|
return |
|
} |
|
} |
|
|
|
func (s *Service) addCapsule(ctx context.Context, uid, coinId, num int64) { |
|
coinConfMap, err := s.dao.GetCapsuleConf(ctx) |
|
if err != nil || len(coinConfMap) == 0 { |
|
return |
|
} |
|
addCoinId := coinId |
|
if _, ok := coinConfMap[addCoinId]; !ok { |
|
return |
|
} |
|
totalCoin := coinConfMap[addCoinId].ChangeNum * num |
|
if addCoinId <= dao.ColorfulCoinId { |
|
_, err = s.dao.UpdateScore(ctx, uid, addCoinId, totalCoin, "sendGift", "", nil, coinConfMap[addCoinId]) |
|
} else { |
|
_, err = s.dao.UpdateCapsule(ctx, uid, addCoinId, totalCoin, "sendGift", "", coinConfMap[addCoinId]) |
|
} |
|
if err != nil { |
|
log.Error("[service.lottery|addCapsule] UpdateScore type:%d error:%v", addCoinId, err) |
|
return |
|
} |
|
} |
|
|
|
//定时重置Capusule |
|
func (s *Service) tickerReloadCapsuleConf(ctx context.Context) { |
|
changeFlag, _ := s.dao.GetCapsuleChangeFlag(ctx) |
|
s.dao.RelaodCapsuleConfig(ctx, changeFlag) |
|
ticker := time.NewTicker(time.Second) |
|
go func() { |
|
for range ticker.C { |
|
redisChangeFlag, _ := s.dao.GetCapsuleChangeFlag(ctx) |
|
capsuleCacheTime, capsuleChangeFlag := s.dao.GetCapsuleChangeInfo(ctx) |
|
if redisChangeFlag != capsuleChangeFlag || time.Now().Unix()-capsuleCacheTime > 60 { |
|
s.dao.RelaodCapsuleConfig(ctx, redisChangeFlag) |
|
} |
|
} |
|
}() |
|
}
|
|
|