You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
116 lines
2.9 KiB
116 lines
2.9 KiB
package service |
|
|
|
import ( |
|
"encoding/base64" |
|
"encoding/hex" |
|
"encoding/json" |
|
"strings" |
|
|
|
"go-common/app/job/main/identify/model" |
|
mdl "go-common/app/service/main/identify/model" |
|
"go-common/library/log" |
|
"go-common/library/queue/databus" |
|
) |
|
|
|
func (s *Service) new(msg *databus.Message) (interface{}, error) { |
|
bmsg := new(model.BMsg) |
|
if err := json.Unmarshal(msg.Value, bmsg); err != nil { |
|
log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err) |
|
return nil, err |
|
} |
|
if bmsg.Action != "delete" { |
|
return bmsg, nil |
|
} |
|
if strings.HasPrefix(bmsg.Table, "user_token_") { |
|
t := new(model.AuthToken) |
|
if err := json.Unmarshal(bmsg.New, t); err != nil { |
|
log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err) |
|
return nil, err |
|
} |
|
log.Info("identifyconsumeproc table:%s key:%s partition:%d offset:%d", bmsg.Table, msg.Key, msg.Partition, msg.Offset) |
|
return t, nil |
|
} else if strings.HasPrefix(bmsg.Table, "user_cookie_") { |
|
t := new(model.AuthCookie) |
|
if err := json.Unmarshal(bmsg.New, t); err != nil { |
|
log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err) |
|
return nil, err |
|
} |
|
log.Info("identifyconsumeproc table:%s key:%s partition:%d offset:%d", bmsg.Table, msg.Key, msg.Partition, msg.Offset) |
|
return t, nil |
|
} |
|
return bmsg, nil |
|
} |
|
|
|
func (s *Service) spilt(msg *databus.Message, data interface{}) int { |
|
switch t := data.(type) { |
|
case *model.AuthToken: |
|
return int(t.Mid) |
|
case *model.AuthCookie: |
|
return int(t.Mid) |
|
default: |
|
return 0 |
|
} |
|
} |
|
|
|
func (s *Service) processAuthBinlog2(bmsgs []interface{}) { |
|
for _, msg := range bmsgs { |
|
switch t := msg.(type) { |
|
case *model.AuthToken: |
|
var ( |
|
bytes []byte |
|
err error |
|
) |
|
if bytes, err = base64.StdEncoding.DecodeString(t.Token); err != nil { |
|
log.Error("cleanCookieCache base64 decode err %v", err) |
|
err = nil |
|
return |
|
} |
|
info := &mdl.IdentifyInfo{ |
|
Mid: t.Mid, |
|
Expires: int32(t.Expires), |
|
} |
|
|
|
if ok := isGameAppID(t.AppID); ok { |
|
continue |
|
} |
|
for { |
|
log.Info("auth service process token databus, key(%s)", hex.EncodeToString(bytes)) |
|
if err := s.processIdentify("delete", hex.EncodeToString(bytes), info); err != nil { |
|
continue |
|
} |
|
break |
|
} |
|
case *model.AuthCookie: |
|
var ( |
|
bytes []byte |
|
bytesCSRF []byte |
|
err error |
|
) |
|
if bytes, err = base64.StdEncoding.DecodeString(t.Session); err != nil { |
|
log.Error("cleanCookieCache base64 decode err %v", err) |
|
err = nil |
|
return |
|
} |
|
if bytesCSRF, err = base64.StdEncoding.DecodeString(t.CSRF); err != nil { |
|
log.Error("cleanCookieCache base64 decode err %v", err) |
|
err = nil |
|
return |
|
} |
|
info := &mdl.IdentifyInfo{ |
|
Mid: t.Mid, |
|
Csrf: hex.EncodeToString(bytesCSRF), |
|
Expires: int32(t.Expires), |
|
} |
|
|
|
for { |
|
log.Info("auth service process cookie databus, key(%s)", string(bytes)) |
|
if err := s.processIdentify("delete", string(bytes), info); err != nil { |
|
continue |
|
} |
|
break |
|
} |
|
default: |
|
return |
|
} |
|
} |
|
}
|
|
|