You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
96 lines
2.8 KiB
96 lines
2.8 KiB
package service |
|
|
|
import ( |
|
"context" |
|
"time" |
|
|
|
"go-common/app/job/main/vip/model" |
|
"go-common/library/log" |
|
) |
|
|
|
// SyncAllUser 同步旧user——info到新db. |
|
// FIXME 切新db后删除. |
|
func (s *Service) SyncAllUser(c context.Context) { |
|
var ( |
|
err error |
|
maxID int |
|
size = s.c.Property.BatchSize |
|
ids = []int64{} |
|
ousers = make(map[int64]*model.VipUserInfoOld, size) |
|
nusers = make(map[int64]*model.VipUserInfo, _ps) |
|
updateDB = s.c.Property.UpdateDB |
|
nu *model.VipUserInfo |
|
ok bool |
|
) |
|
if maxID, err = s.dao.SelOldUserInfoMaxID(context.TODO()); err != nil { |
|
log.Error("sync job s.dao.SelOldUserInfoMaxID err(%+v)", err) |
|
return |
|
} |
|
page := maxID / size |
|
if maxID%size != 0 { |
|
page++ |
|
} |
|
log.Info("sync job vip_user_info total(%d)", page) |
|
for i := 0; i < page; i++ { |
|
log.Info("sync job vip_user_info page index(%d) total(%d)", i, page) |
|
startID := i * size |
|
endID := (i + 1) * size |
|
if endID > maxID { |
|
endID = maxID |
|
} |
|
if ousers, err = s.dao.SelOldUserInfoMaps(context.TODO(), startID, endID); err != nil { |
|
log.Error("sync job s.dao.SelOldUserInfoMaps(%d, %d) err(%+v)", startID, endID, err) |
|
return |
|
} |
|
j := 1 |
|
for _, v := range ousers { |
|
ids = append(ids, v.Mid) |
|
if j%_ps == 0 || j == len(ousers) { |
|
if nusers, err = s.dao.SelVipByIds(context.TODO(), ids); err != nil { |
|
return |
|
} |
|
for _, mid := range ids { |
|
var ou *model.VipUserInfoOld |
|
if ou, ok = ousers[mid]; !ok { |
|
log.Warn("sync job old not found %d", mid) |
|
continue |
|
} |
|
if nu, ok = nusers[mid]; !ok { |
|
log.Warn("sync job need insert to new %d, old(%+v), toNew(%+v)", mid, ou, ou.ToNew()) |
|
if updateDB { |
|
s.dao.SyncAddUser(context.Background(), ou.ToNew()) |
|
} |
|
continue |
|
} |
|
if ou.RecentTime <= 0 { |
|
ou.RecentTime = ou.Mtime |
|
} |
|
if nu.Type != ou.Type || |
|
nu.Status != ou.Status || |
|
!nu.StartTime.Time().Equal(ou.StartTime.Time()) || |
|
!nu.OverdueTime.Time().Equal(ou.OverdueTime.Time()) || |
|
!nu.AnnualVipOverdueTime.Time().Equal(ou.AnnualVipOverdueTime.Time()) || |
|
!nu.Ctime.Time().Equal(ou.Ctime.Time()) || |
|
!nu.Mtime.Time().Equal(ou.Mtime.Time()) || |
|
nu.PayType != ou.IsAutoRenew || |
|
nu.PayChannelID != ou.PayChannelID || |
|
!nu.IosOverdueTime.Time().Equal(ou.IosOverdueTime.Time()) || |
|
nu.Ver != ou.Ver || |
|
!nu.RecentTime.Time().Equal(ou.RecentTime.Time()) { |
|
log.Warn("sync job need update to new %d, old(%+v), new(%+v), toNew(%+v)", mid, ou, nu, ou.ToNew()) |
|
if updateDB { |
|
s.dao.SyncUpdateUser(context.Background(), ou.ToNew(), nu.Ver) |
|
} |
|
continue |
|
} |
|
} |
|
log.Info("sync job vip_user_info page index(%d) ids(%+v)", j, ids) |
|
// reset |
|
ids = []int64{} |
|
} |
|
j++ |
|
} |
|
log.Info("sync job vip_user_info page index(%d) end", i) |
|
time.Sleep(time.Millisecond * _defsleepmsec) |
|
} |
|
}
|
|
|