You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
170 lines
3.8 KiB
170 lines
3.8 KiB
package service |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"strconv" |
|
"strings" |
|
"time" |
|
|
|
"go-common/app/job/main/passport/model" |
|
igmdl "go-common/app/service/main/identify-game/model" |
|
"go-common/library/log" |
|
"go-common/library/queue/databus" |
|
) |
|
|
|
const ( |
|
_changePwd = "changePwd" |
|
|
|
_retryCount = 3 |
|
_retryDuration = time.Second |
|
) |
|
|
|
func (s *Service) tokenconsumeproc() { |
|
mergeNum := s.c.Group.AsoBinLog.Num |
|
var ( |
|
err error |
|
n int |
|
msgs = s.dsToken.Messages() |
|
) |
|
for { |
|
msg, ok := <-msgs |
|
if !ok { |
|
log.Error("s.tokenconsumeproc closed") |
|
return |
|
} |
|
// marked head to first commit |
|
m := &message{data: msg} |
|
if n, err = strconv.Atoi(msg.Key); err != nil { |
|
log.Error("strconv.Atoi(%s) error(%v)", msg.Key, err) |
|
continue |
|
} |
|
s.mu.Lock() |
|
if s.head == nil { |
|
s.head = m |
|
s.last = m |
|
} else { |
|
s.last.next = m |
|
s.last = m |
|
} |
|
s.mu.Unlock() |
|
// use specify goroutine to merge messages |
|
s.tokenMergeChans[n%mergeNum] <- m |
|
log.Info("tokenconsumeproc key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset) |
|
} |
|
} |
|
|
|
func (s *Service) tokencommitproc() { |
|
commits := make(map[int32]*databus.Message, s.c.Group.AsoBinLog.Size) |
|
for { |
|
done := <-s.tokenDoneChan |
|
// merge partitions to commit offset |
|
for _, d := range done { |
|
d.done = true |
|
} |
|
s.mu.Lock() |
|
for ; s.head != nil && s.head.done; s.head = s.head.next { |
|
commits[s.head.data.Partition] = s.head.data |
|
} |
|
s.mu.Unlock() |
|
for k, m := range commits { |
|
log.Info("tokencommitproc committed, key:%s partition:%d offset:%d", m.Key, m.Partition, m.Offset) |
|
m.Commit() |
|
delete(commits, k) |
|
} |
|
} |
|
} |
|
|
|
func (s *Service) tokenmergeproc(c chan *message) { |
|
var ( |
|
err error |
|
max = s.c.Group.AsoBinLog.Size |
|
merges = make([]*model.AccessInfo, 0, max) |
|
marked = make([]*message, 0, max) |
|
ticker = time.NewTicker(time.Duration(s.c.Group.AsoBinLog.Ticker)) |
|
) |
|
for { |
|
select { |
|
case msg, ok := <-c: |
|
if !ok { |
|
log.Error("s.tokenmergeproc closed") |
|
return |
|
} |
|
bmsg := &model.BMsg{} |
|
if err = json.Unmarshal(msg.data.Value, bmsg); err != nil { |
|
log.Error("json.Unmarshal(%s) error(%v)", string(msg.data.Value), err) |
|
continue |
|
} |
|
if bmsg.Action == "delete" && strings.HasPrefix(bmsg.Table, "aso_app_perm") { |
|
t := &model.AccessInfo{} |
|
if err = json.Unmarshal(bmsg.New, t); err != nil { |
|
log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err) |
|
continue |
|
} |
|
merges = append(merges, t) |
|
} |
|
marked = append(marked, msg) |
|
if len(marked) < max && len(merges) < max { |
|
continue |
|
} |
|
case <-ticker.C: |
|
} |
|
if len(merges) > 0 { |
|
s.cleanTokens(merges) |
|
merges = make([]*model.AccessInfo, 0, max) |
|
} |
|
if len(marked) > 0 { |
|
s.tokenDoneChan <- marked |
|
marked = make([]*message, 0, max) |
|
} |
|
} |
|
} |
|
|
|
// cleanTokens clean tokens. |
|
func (s *Service) cleanTokens(tokens []*model.AccessInfo) { |
|
for _, token := range tokens { |
|
s.cleanToken(token) |
|
} |
|
} |
|
|
|
// cleanToken to notify other clean access token. |
|
func (s *Service) cleanToken(token *model.AccessInfo) (err error) { |
|
if token == nil || token.Expires < time.Now().Unix() { |
|
return |
|
} |
|
isGame := false |
|
for _, id := range s.gameAppIDs { |
|
if id == token.AppID { |
|
isGame = true |
|
break |
|
} |
|
} |
|
if !isGame { |
|
return |
|
} |
|
for { |
|
if err = s.d.DelCache(context.TODO(), token.Token); err == nil { |
|
break |
|
} |
|
time.Sleep(_retryDuration) |
|
} |
|
for i := 0; i < _retryCount; i++ { |
|
arg := &igmdl.CleanCacheArgs{ |
|
Token: token.Token, |
|
Mid: token.Mid, |
|
} |
|
if err = s.igRPC.DelCache(context.TODO(), arg); err == nil { |
|
break |
|
} |
|
log.Error("service.identifyGameRPC.DelCache(%+v) error(%v)", arg, err) |
|
time.Sleep(_retryDuration) |
|
} |
|
for i := 0; i < _retryCount; i++ { |
|
if err = s.d.NotifyGame(token, _changePwd); err == nil { |
|
return |
|
} |
|
time.Sleep(_retryDuration) |
|
} |
|
log.Error("notify err, token(%+v)", token) |
|
return |
|
}
|
|
|