You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
176 lines
5.3 KiB
176 lines
5.3 KiB
package service |
|
|
|
import ( |
|
"context" |
|
|
|
"go-common/app/job/main/passport-user/conf" |
|
"go-common/app/job/main/passport-user/dao" |
|
"go-common/app/job/main/passport-user/model" |
|
"go-common/library/log" |
|
"go-common/library/queue/databus" |
|
"go-common/library/queue/databus/databusutil" |
|
) |
|
|
|
const ( |
|
_asoAccountTable = "aso_account" |
|
_asoAccountInfoTable = "aso_account_info" |
|
_asoAccountSnsTable = "aso_account_sns" |
|
_asoAccountRegOriginTable = "aso_account_reg_origin" |
|
_insertAction = "insert" |
|
_updateAction = "update" |
|
_deleteAction = "delete" |
|
_asoAccountInfoSharding = 30 |
|
_asoAccountRegOriginSharding = 20 |
|
_mySQLErrCodeDuplicateEntry = 1062 |
|
_retry = 3 |
|
) |
|
|
|
// Service service. |
|
type Service struct { |
|
c *conf.Config |
|
d *dao.Dao |
|
// aso binlog consumer |
|
asoBinLogConsumer *databus.Databus |
|
group *databusutil.Group |
|
// fullSync chan |
|
asoAccountChan []chan *model.OriginAccount |
|
asoAccountInfoChan []chan *model.OriginAccountInfo |
|
asoAccountRegChan []chan *model.OriginAccountReg |
|
asoAccountSnsChan []chan *model.OriginAccountSns |
|
asoTelBindLogChan []chan *model.UserTel |
|
|
|
countryMap map[int64]string |
|
aesKey []byte |
|
salt []byte |
|
//cron *cron.Cron |
|
|
|
ch chan func() |
|
} |
|
|
|
// New new a service instance. |
|
func New(c *conf.Config) (s *Service) { |
|
s = &Service{ |
|
c: c, |
|
d: dao.New(c), |
|
asoAccountChan: make([]chan *model.OriginAccount, c.FullSync.AsoAccount.ChanNum), |
|
asoAccountInfoChan: make([]chan *model.OriginAccountInfo, c.FullSync.AsoAccountInfo.ChanNum), |
|
asoAccountRegChan: make([]chan *model.OriginAccountReg, c.FullSync.AsoAccountReg.ChanNum), |
|
asoAccountSnsChan: make([]chan *model.OriginAccountSns, c.FullSync.AsoAccountSns.ChanNum), |
|
asoTelBindLogChan: make([]chan *model.UserTel, c.FullSync.AsoTelBindLog.ChanNum), |
|
ch: make(chan func(), 1024000), |
|
} |
|
go s.cacheproc() |
|
if c.FullSync.AsoCountryCodeSwitch { |
|
err := s.syncAsoCountryCode() |
|
if err != nil { |
|
log.Error("fail to sync AsoCountryCode") |
|
panic(err) |
|
} |
|
} |
|
aesKey, err := s.d.AesKey(context.Background()) |
|
if err != nil || aesKey == "" { |
|
log.Error("fail to get rsaKey") |
|
panic(err) |
|
} |
|
s.aesKey = []byte(aesKey) |
|
salt, err := s.d.Salt(context.Background()) |
|
if err != nil || salt == "" { |
|
log.Error("fail to get salt") |
|
panic(err) |
|
} |
|
s.salt = []byte(salt) |
|
s.countryMap, err = s.d.CountryCodeMap(context.Background()) |
|
if err != nil || len(s.countryMap) == 0 { |
|
log.Error("fail to get country map") |
|
panic(err) |
|
} |
|
if c.FullSync.AsoAccount.Switch { |
|
for i := 0; i < c.FullSync.AsoAccount.ChanNum; i++ { |
|
ch := make(chan *model.OriginAccount, c.FullSync.AsoAccount.ChanSize) |
|
s.asoAccountChan[i] = ch |
|
go s.asoAccountConsume(ch) |
|
} |
|
go s.getAsoAccount(c.FullSync.AsoAccount.Start, c.FullSync.AsoAccount.End, c.FullSync.AsoAccount.Count) |
|
} |
|
if c.FullSync.AsoAccountInfo.Switch { |
|
for i := 0; i < c.FullSync.AsoAccountInfo.ChanNum; i++ { |
|
ch := make(chan *model.OriginAccountInfo, c.FullSync.AsoAccountInfo.ChanSize) |
|
s.asoAccountInfoChan[i] = ch |
|
go s.asoAccountInfoConsume(ch) |
|
} |
|
go s.getAsoAccountInfo(c.FullSync.AsoAccountInfo.Start, c.FullSync.AsoAccountInfo.End, c.FullSync.AsoAccountInfo.Count) |
|
} |
|
if c.FullSync.AsoAccountReg.Switch { |
|
for i := 0; i < c.FullSync.AsoAccountReg.ChanNum; i++ { |
|
ch := make(chan *model.OriginAccountReg, c.FullSync.AsoAccountReg.ChanSize) |
|
s.asoAccountRegChan[i] = ch |
|
go s.asoAccountRegConsume(ch) |
|
} |
|
go s.getAsoAccountReg(c.FullSync.AsoAccountReg.Start, c.FullSync.AsoAccountReg.End, c.FullSync.AsoAccountReg.Count) |
|
} |
|
if c.FullSync.AsoAccountSns.Switch { |
|
for i := 0; i < c.FullSync.AsoAccountSns.ChanNum; i++ { |
|
ch := make(chan *model.OriginAccountSns, c.FullSync.AsoAccountSns.ChanSize) |
|
s.asoAccountSnsChan[i] = ch |
|
go s.asoAccountSnsConsume(ch) |
|
} |
|
go s.getAsoAccountSns(c.FullSync.AsoAccountSns.Start, c.FullSync.AsoAccountSns.End, c.FullSync.AsoAccountSns.Count) |
|
} |
|
if c.FullSync.AsoTelBindLog.Switch { |
|
for i := 0; i < c.FullSync.AsoTelBindLog.ChanNum; i++ { |
|
ch := make(chan *model.UserTel, c.FullSync.AsoTelBindLog.ChanSize) |
|
s.asoTelBindLogChan[i] = ch |
|
go s.asoTelBindLogConsume(ch) |
|
} |
|
go s.getAsoTelBindLog(c.FullSync.AsoTelBindLog.Start, c.FullSync.AsoTelBindLog.End, c.FullSync.AsoTelBindLog.Count) |
|
} |
|
if c.IncSync.Switch { |
|
s.asoBinLogConsumer = databus.New(c.DataBus.AsoBinLogSub) |
|
s.group = databusutil.NewGroup( |
|
c.DatabusUtil, |
|
s.asoBinLogConsumer.Messages(), |
|
) |
|
s.consumeproc() |
|
} |
|
//if c.Scheduler.Switch { |
|
// s.cron = cron.New() |
|
// if err := s.cron.AddFunc(c.Scheduler.EmailDuplicateCron, s.checkEmailDuplicateJob); err != nil { |
|
// panic(err) |
|
// } |
|
// if err := s.cron.AddFunc(c.Scheduler.TelDuplicateCron, s.checkTelDuplicateJob); err != nil { |
|
// panic(err) |
|
// } |
|
// s.cron.Start() |
|
//} |
|
return |
|
} |
|
|
|
// Ping check server ok. |
|
func (s *Service) Ping(c context.Context) (err error) { |
|
return s.d.Ping(c) |
|
} |
|
|
|
// Close close service, including databus and outer service. |
|
func (s *Service) Close() (err error) { |
|
if err = s.group.Close(); err != nil { |
|
log.Error("s.group.Close() error(%v)", err) |
|
} |
|
s.d.Close() |
|
return |
|
} |
|
|
|
func (s *Service) addCache(f func()) { |
|
select { |
|
case s.ch <- f: |
|
default: |
|
log.Warn("cacheproc chan full") |
|
} |
|
} |
|
|
|
// cacheproc is a routine for executing closure. |
|
func (s *Service) cacheproc() { |
|
for { |
|
f := <-s.ch |
|
f() |
|
} |
|
}
|
|
|