You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
163 lines
4.2 KiB
163 lines
4.2 KiB
package service |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"sync" |
|
"time" |
|
|
|
"go-common/app/job/main/coin/conf" |
|
"go-common/app/job/main/coin/dao" |
|
"go-common/app/job/main/coin/model" |
|
accrpc "go-common/app/service/main/account/api" |
|
arcrpc "go-common/app/service/main/archive/api/gorpc" |
|
coinrpc "go-common/app/service/main/coin/api/gorpc" |
|
coinmdl "go-common/app/service/main/coin/model" |
|
memrpc "go-common/app/service/main/member/api" |
|
"go-common/library/log" |
|
"go-common/library/queue/databus" |
|
"go-common/library/queue/databus/databusutil" |
|
) |
|
|
|
// Service coin job service. |
|
type Service struct { |
|
coinDao *dao.Dao |
|
c *conf.Config |
|
waiter *sync.WaitGroup |
|
accRPC accrpc.AccountClient |
|
memRPC memrpc.MemberClient |
|
arcRPC *arcrpc.Service2 |
|
coinRPC *coinrpc.Service |
|
databus *databus.Databus |
|
group *databusutil.Group |
|
expGroup *databusutil.Group |
|
} |
|
|
|
// New new and return service. |
|
func New(c *conf.Config) (s *Service) { |
|
s = &Service{ |
|
coinDao: dao.New(c), |
|
c: c, |
|
waiter: new(sync.WaitGroup), |
|
arcRPC: arcrpc.New2(c.ArchiveRPC), |
|
coinRPC: coinrpc.New(c.CoinRPC), |
|
} |
|
var err error |
|
if s.memRPC, err = memrpc.NewClient(c.MemRPC); err != nil { |
|
panic(err) |
|
} |
|
if s.accRPC, err = accrpc.NewClient(c.AccountRPC); err != nil { |
|
panic(err) |
|
} |
|
s.databus = databus.New(c.Databus) |
|
g := databusutil.NewGroup(c.Databusutil, databus.New(c.LoginDatabus).Messages()) |
|
g.New = newMsg |
|
g.Split = split |
|
g.Do = s.awardDo |
|
g.Start() |
|
s.group = g |
|
eg := databusutil.NewGroup(c.Databusutil, databus.New(c.ExpDatabus).Messages()) |
|
eg.New = newExpMsg |
|
eg.Split = split |
|
eg.Do = s.awardDo |
|
eg.Start() |
|
s.expGroup = eg |
|
|
|
s.waiter.Add(1) |
|
go s.consumeproc() |
|
go s.settleproc() |
|
return |
|
} |
|
|
|
func (s *Service) consumeproc() { |
|
defer s.waiter.Done() |
|
var ( |
|
msg *databus.Message |
|
err error |
|
ok bool |
|
period *model.CoinSettlePeriod |
|
ctx = context.TODO() |
|
) |
|
for { |
|
if msg, ok = <-s.databus.Messages(); !ok { |
|
log.Error("s.databus.Message err(%v)", err) |
|
return |
|
} |
|
r := &coinmdl.Record{} |
|
if err = json.Unmarshal([]byte(msg.Value), r); err != nil { |
|
log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err) |
|
dao.PromError("msg:JSON") |
|
continue |
|
} |
|
var ip = r.IPV6 |
|
for i := 0; i < 3; i++ { |
|
if err = s.addCoinExp(ctx, r.Mid, r.AvType, r.Multiply, ip); err != nil { |
|
time.Sleep(time.Millisecond * 50) |
|
continue |
|
} |
|
break |
|
} |
|
if err != nil { |
|
log.Errorv(ctx, log.KV("log", "fix: addCoinExp"), log.KV("mid", r.Mid), log.KV("type", r.AvType), log.KV("num", r.Multiply), log.KV("ip", ip)) |
|
continue |
|
} |
|
if err = s.updateAddCoin(ctx, r); err != nil { |
|
continue |
|
} |
|
at := time.Unix(r.Timestamp, 0) |
|
if period, err = s.coinDao.HitSettlePeriod(ctx, at); err != nil { |
|
log.Errorv(ctx, log.KV("log", "s.coinDao.HitCoinPeriod"), log.KV("record", r), log.KV("err", err)) |
|
dao.PromError("service:HitSettlePeriod") |
|
continue |
|
} |
|
for i := 0; ; i++ { |
|
if err = s.coinDao.UpsertSettle(ctx, period.ID, r.Up, r.Aid, r.AvType, r.Multiply, time.Now()); err != nil { |
|
log.Error("s.coinDao.UpsertCoinSettle(%d, %d, %d) error(%v)", r.Up, r.Aid, r.Multiply) |
|
dao.PromError("service:UpsertSettle") |
|
i++ |
|
if i > 5 { |
|
// if env.DeployEnv == env.DeployEnvProd { |
|
// s.moni.Sms(ctx, s.c.Sms.Phone, s.c.Sms.Token, "coin-job upsetSettle fail for 5 time") |
|
// } |
|
break |
|
} |
|
continue |
|
} |
|
break |
|
} |
|
log.Info("key: %s,partion:%d,offset:%d success %s", msg.Key, msg.Partition, msg.Offset, msg.Value) |
|
err = msg.Commit() |
|
if err != nil { |
|
log.Error("msg.Commit partition:%d offset:%d err %v", msg.Partition, msg.Offset, err) |
|
dao.PromError("service:msgCommit") |
|
} |
|
} |
|
} |
|
|
|
// Close close service. |
|
func (s *Service) Close() { |
|
s.group.Close() |
|
s.expGroup.Close() |
|
s.databus.Close() |
|
} |
|
|
|
// Wait wait routine unitl all close. |
|
func (s *Service) Wait() { |
|
s.waiter.Wait() |
|
} |
|
|
|
// Ping check service health. |
|
func (s *Service) Ping(c context.Context) error { |
|
return s.coinDao.Ping(c) |
|
} |
|
|
|
func (s *Service) updateAddCoin(c context.Context, record *coinmdl.Record) (err error) { |
|
if record == nil { |
|
return |
|
} |
|
if err = s.coinRPC.UpdateAddCoin(c, record); err != nil { |
|
log.Errorv(c, log.KV("log", "UpdateAddCoin"), log.KV("mid", record.Mid), log.KV("record", record)) |
|
dao.PromError("service:updateAddCoin") |
|
} |
|
return |
|
}
|
|
|