You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
143 lines
3.6 KiB
143 lines
3.6 KiB
package service |
|
|
|
import ( |
|
"bytes" |
|
"context" |
|
"fmt" |
|
"math/rand" |
|
"time" |
|
|
|
"go-common/app/job/main/ugcpay/dao" |
|
"go-common/app/job/main/ugcpay/model" |
|
xsql "go-common/library/database/sql" |
|
"go-common/library/log" |
|
|
|
"github.com/pkg/errors" |
|
) |
|
|
|
type taskBillMonthly struct { |
|
dao *dao.Dao |
|
rnd *rand.Rand |
|
monthOffset int |
|
namePrefix string |
|
tl *taskLog |
|
} |
|
|
|
func (s *taskBillMonthly) Run() (err error) { |
|
var ( |
|
ctx = context.Background() |
|
finished bool |
|
expectFN = func(ctx context.Context) (expect int64, err error) { |
|
var ( |
|
beginTime, _ = monthRange(s.monthOffset) |
|
monthVer = monthlyBillVer(beginTime) |
|
) |
|
if expect, err = s.dao.CountDailyBillByMonthVer(ctx, monthVer); err != nil { |
|
return |
|
} |
|
return |
|
} |
|
) |
|
if finished, err = checkOrCreateTaskFromLog(ctx, s, s.tl, expectFN); err != nil || finished { |
|
return |
|
} |
|
return s.run(ctx) |
|
} |
|
|
|
func (s *taskBillMonthly) TTL() int32 { |
|
return 3600 * 2 |
|
} |
|
|
|
func (s *taskBillMonthly) Name() string { |
|
return fmt.Sprintf("%s_%d", s.namePrefix, monthlyBillVer(time.Now())) |
|
} |
|
|
|
// 月账单生成 |
|
func (s *taskBillMonthly) run(ctx context.Context) (err error) { |
|
ll := &dailyBillLLByMonthVer{ |
|
limit: 1000, |
|
dao: s.dao, |
|
} |
|
beginTime, _ := monthRange(s.monthOffset) |
|
ll.monthVer = monthlyBillVer(beginTime) |
|
return runLimitedList(ctx, ll, time.Millisecond*2, s.runDailyBill) |
|
} |
|
|
|
func (s *taskBillMonthly) runDailyBill(ctx context.Context, ele interface{}) (err error) { |
|
dailyBill, ok := ele.(*model.DailyBill) |
|
if !ok { |
|
return errors.Errorf("taskBillMonthly convert ele: %+v failed", dailyBill) |
|
} |
|
log.Info("taskBillMonthly start handle daily biil: %+v", dailyBill) |
|
|
|
fn := func(ctx context.Context, tx *xsql.Tx) (affected bool, err error) { |
|
var ( |
|
monthlyBill *model.Bill |
|
monthVer = dailyBill.MonthVer |
|
monthlyBillLog *model.LogBillMonthly |
|
) |
|
affected = true |
|
|
|
// 获得该 mid 的 daily_bill |
|
if monthlyBill, err = s.dao.MonthlyBill(ctx, dailyBill.MID, model.BizAsset, model.CurrencyBP, monthVer); err != nil { |
|
return |
|
} |
|
if monthlyBill == nil { |
|
if monthlyBill, err = s.initMonthlyBill(ctx, dailyBill.MID, dailyBill.Biz, dailyBill.Currency, dailyBill.MonthVer); err != nil { |
|
return |
|
} |
|
} |
|
monthlyBillLog = &model.LogBillMonthly{ |
|
BillID: monthlyBill.BillID, |
|
FromIn: monthlyBill.In, |
|
ToIn: monthlyBill.In + dailyBill.In, |
|
FromOut: monthlyBill.Out, |
|
ToOut: monthlyBill.Out + dailyBill.Out, |
|
BillUserDailyID: dailyBill.BillID, |
|
} |
|
monthlyBill.In += dailyBill.In |
|
monthlyBill.Out += dailyBill.Out |
|
|
|
// 添加 monthly bill log , uk : daily_bill_id |
|
_, err = s.dao.TXInsertLogMonthlyBill(ctx, tx, monthlyBillLog) |
|
if err != nil { |
|
tx.Rollback() |
|
return |
|
} |
|
|
|
// 更新 monthly bill |
|
_, err = s.dao.TXUpdateMonthlyBill(ctx, tx, monthlyBill) |
|
if err != nil { |
|
tx.Rollback() |
|
return |
|
} |
|
log.Info("taskBillMonthly: %+v,from daily bill: %+v", monthlyBill, dailyBill) |
|
return |
|
} |
|
return runTXCASTaskWithLog(ctx, s, s.tl, fn) |
|
} |
|
|
|
func (s *taskBillMonthly) initMonthlyBill(ctx context.Context, mid int64, biz, currency string, ver int64) (data *model.Bill, err error) { |
|
data = &model.Bill{ |
|
BillID: orderID(s.rnd), |
|
MID: mid, |
|
Biz: biz, |
|
Currency: currency, |
|
In: 0, |
|
Out: 0, |
|
Ver: ver, |
|
Version: 1, |
|
} |
|
if data.ID, err = s.dao.InsertMonthlyBill(ctx, data); err != nil { |
|
return |
|
} |
|
return |
|
} |
|
|
|
// orderID builds a 20-digit bill identifier from three parts:
//
//   - 5 digits: a random number in [0, 99999) drawn from rnd, zero-padded
//   - 3 digits: the millisecond within the current second, zero-padded
//   - 12 digits: the current local time formatted as yyMMddHHmmss
//
// rnd must be non-nil.
func orderID(rnd *rand.Rand) string {
	// Read the clock once so the millisecond part and the timestamp part
	// describe the same instant, even across a second boundary.
	now := time.Now()

	var b bytes.Buffer
	fmt.Fprintf(&b, "%05d", rnd.Int63n(99999))
	fmt.Fprintf(&b, "%03d", now.UnixNano()/1e6%1000)
	b.WriteString(now.Format("060102150405"))
	return b.String()
}
|
|
|