You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
329 lines
8.0 KiB
329 lines
8.0 KiB
package service |
|
|
|
import ( |
|
"context" |
|
"encoding/base64" |
|
"fmt" |
|
"hash/crc32" |
|
"strings" |
|
"time" |
|
|
|
"go-common/app/infra/canal/conf" |
|
"go-common/app/infra/canal/infoc" |
|
"go-common/app/infra/canal/model" |
|
"go-common/library/log" |
|
"go-common/library/queue/databus" |
|
|
|
"github.com/pkg/errors" |
|
"github.com/siddontang/go-mysql/canal" |
|
) |
|
|
|
// Sentinel errors returned while converting binlog rows events.
var (
	errInvalidAction = errors.New("invalid rows action")
	errInvalidUpdate = errors.New("invalid update rows event")
	errBinlogFormat  = errors.New("binlog format failed")
)
|
|
|
// producer abstracts a downstream sink for binlog row data
// (databus or infoc).
type producer interface {
	// Rows records how many rows are about to be sent (may be a no-op).
	Rows(int64)
	// Send publishes one keyed message.
	Send(context.Context, string, interface{}) error
	// Close releases the underlying client resources.
	Close()
	// Name returns a human-readable identifier used in logs and metrics.
	Name() string
}
|
|
|
// databusP adapts a databus client to the producer interface.
type databusP struct {
	group, topic string // kept only for Name()
	*databus.Databus
}
|
|
|
// Rows is a no-op: the databus producer does not track row counts.
func (d *databusP) Rows(b int64) {
	// ignore
}
|
|
|
// Send publishes one keyed message through the underlying databus client.
func (d *databusP) Send(c context.Context, key string, data interface{}) error {
	return d.Databus.Send(c, key, data)
}
|
|
|
func (d *databusP) Name() string { |
|
return fmt.Sprintf("databus:group(%s)topic(%s)", d.group, d.topic) |
|
} |
|
|
|
// Close shuts down the underlying databus client.
func (d *databusP) Close() {
	d.Databus.Close()
}
|
|
|
// infocP adapts an infoc client to the producer interface.
type infocP struct {
	taskID string // kept only for Name()
	*infoc.Infoc
}
|
|
|
// Rows forwards the pending row count to the underlying infoc client.
func (i *infocP) Rows(b int64) {
	i.Infoc.Rows(b)
}
|
|
|
// Send publishes one keyed message through the underlying infoc client.
func (i *infocP) Send(c context.Context, key string, data interface{}) error {
	return i.Infoc.Send(c, key, data)
}
|
|
|
// Name infoc name |
|
func (i *infocP) Name() string { |
|
return fmt.Sprintf("infoc(%s)", i.taskID) |
|
} |
|
|
|
// Close flushes any buffered infoc data before shutting the client down.
func (i *infocP) Close() {
	i.Infoc.Flush()
	i.Infoc.Close()
}
|
|
|
// Target fans binlog rows events out to the configured producers
// (databus and/or infoc), sharding events across worker channels by
// table name so per-table ordering is preserved.
type Target struct {
	producers []producer
	eventLen  uint32                  // number of worker channels == len(db.CTables)
	events    []chan *canal.RowsEvent // one buffered channel per table group, drained by proc
	db        *conf.Database

	// closed signals the proc goroutines to exit.
	// NOTE(review): written by close() and read by proc() on different
	// goroutines without synchronization — confirm this race is acceptable.
	closed bool
}
|
|
|
// NewTarget new databus target |
|
func NewTarget(db *conf.Database) (t *Target) { |
|
t = &Target{ |
|
db: db, |
|
eventLen: uint32(len(db.CTables)), |
|
} |
|
t.events = make([]chan *canal.RowsEvent, t.eventLen) |
|
if db.Databus != nil { |
|
t.producers = append(t.producers, &databusP{group: db.Databus.Group, topic: db.Databus.Topic, Databus: databus.New(db.Databus)}) |
|
} |
|
if db.Infoc != nil { |
|
t.producers = append(t.producers, &infocP{taskID: db.Infoc.TaskID, Infoc: infoc.New(db.Infoc)}) |
|
} |
|
for i := 0; i < int(t.eventLen); i++ { |
|
ch := make(chan *canal.RowsEvent, 1024) |
|
t.events[i] = ch |
|
go t.proc(ch) |
|
} |
|
return |
|
} |
|
|
|
// compare check if the binlog event is needed |
|
// check the table name and schame |
|
func (t *Target) compare(schame, table, action string) bool { |
|
if t.db.Schema == schame { |
|
for _, ctb := range t.db.CTables { |
|
for _, tb := range ctb.Tables { |
|
if table == tb { |
|
for _, act := range ctb.OmitAction { |
|
if act == action { // NOTE: omit action |
|
return false |
|
} |
|
} |
|
return true |
|
} |
|
} |
|
} |
|
} |
|
return false |
|
} |
|
|
|
// send dispatches a rows event onto one of the worker channels,
// hashed by CRC32 of the table name so all events for a given table
// land on the same channel and keep their order.
// NOTE(review): panics (mod by zero, empty events slice) when
// db.CTables is empty — presumably config validation guarantees at
// least one table group; confirm.
func (t *Target) send(ev *canal.RowsEvent) {
	yu := crc32.ChecksumIEEE([]byte(ev.Table.Name))
	t.events[yu%t.eventLen] <- ev
}
|
|
|
// close shuts down every producer and then flags the worker goroutines
// to exit.
// NOTE(review): closed is set without synchronization and producers are
// closed while proc goroutines may still be calling Send — confirm the
// intended shutdown ordering.
func (t *Target) close() {
	for _, p := range t.producers {
		p.Close()
	}
	t.closed = true
}
|
|
|
// proc is the per-channel worker loop: it converts each rows event into
// model.Data records and sends them to every producer. Batches that fail
// to publish are queued and retried once per second; new events are only
// consumed from ch while the retry queue is empty, so retries apply
// backpressure to the channel.
func (t *Target) proc(ch chan *canal.RowsEvent) {
	// pData pairs a batch of records with the producer they target.
	type pData struct {
		datas    []*model.Data
		producer producer
	}
	var (
		err         error
		normalDatas []*pData // batches to publish this pass
		errorDatas  []*pData // batches that failed and await retry
		ev          *canal.RowsEvent
	)
	for {
		// NOTE(review): closed is written by close() on another goroutine
		// without synchronization; this check is racy — confirm acceptable.
		if t.closed {
			return
		}
		if len(errorDatas) != 0 {
			// Retry pass: replay failed batches after a short pause.
			// NOTE(review): after this swap normalDatas and errorDatas share
			// one backing array; the appends below overwrite only slots the
			// range loop has already consumed, which is subtle but currently
			// safe — do not reorder.
			normalDatas = errorDatas
			errorDatas = errorDatas[0:0]
			time.Sleep(time.Second)
		} else {
			ev = <-ch
			var datas []*model.Data
			if datas, err = makeDatas(ev, t.db.TableMap); err != nil {
				log.Error("makeData(%v) error(%v)", ev, err)
				continue
			}
			// Reuse the slice's capacity; one entry per producer.
			normalDatas = normalDatas[0:0]
			for _, p := range t.producers {
				p.Rows(int64(len(datas)))
				normalDatas = append(normalDatas, &pData{datas: datas, producer: p})
				if stats != nil {
					stats.Incr("send_counter", p.Name(), ev.Table.Schema, tblReplacer.ReplaceAllString(ev.Table.Name, ""), ev.Action)
				}
			}
		}
		for _, pd := range normalDatas {
			var eDatas []*model.Data
			for _, data := range pd.datas {
				if err = pd.producer.Send(context.TODO(), data.Key, data); err != nil {
					// retry pub error data
					eDatas = append(eDatas, data)
					continue
				}
				log.Info("%s pub(key:%s, value:%+v) succeed", pd.producer.Name(), data.Key, data)
			}
			if len(eDatas) > 0 {
				errorDatas = append(errorDatas, &pData{datas: eDatas, producer: pd.producer})
				// ev may refer to an earlier event during a retry pass;
				// the nil check is defensive.
				if stats != nil && ev != nil {
					stats.Incr("retry_counter", pd.producer.Name(), ev.Table.Schema, tblReplacer.ReplaceAllString(ev.Table.Name, ""), ev.Action)
				}
				log.Error("%s scheme(%s) pub fail,add to retry", pd.producer.Name(), ev.Table.Schema)
			}
		}
	}
}
|
|
|
// makeDatas parse the binlog event and return the model.Data struct |
|
// a little bit cautious about the binlog type |
|
// if the type is update: |
|
// the old value and new value will alternate appearing in the event.Rows |
|
func makeDatas(e *canal.RowsEvent, tbMap map[string]*conf.Addition) (datas []*model.Data, err error) { |
|
var ( |
|
rowsLen = len(e.Rows) |
|
firstRowLen = len(e.Rows[0]) |
|
lenCol = len(e.Table.Columns) |
|
) |
|
if rowsLen == 0 || firstRowLen == 0 || firstRowLen != lenCol { |
|
log.Error("rows length(%d) first row length(%d) columns length(%d)", rowsLen, firstRowLen, lenCol) |
|
err = errBinlogFormat |
|
return |
|
} |
|
datas = make([]*model.Data, 0, rowsLen) |
|
switch e.Action { |
|
case canal.InsertAction, canal.DeleteAction: |
|
for _, values := range e.Rows { |
|
var keys []string |
|
data := &model.Data{ |
|
Action: e.Action, |
|
Table: e.Table.Name, |
|
// the first primary key as the kafka key |
|
Key: fmt.Sprint(values[0]), |
|
New: make(map[string]interface{}, lenCol), |
|
} |
|
for i, c := range e.Table.Columns { |
|
if c.IsUnsigned { |
|
values[i] = unsignIntCase(values[i]) |
|
} |
|
if strings.Contains(c.RawType, "binary") { |
|
if bs, ok := values[i].(string); ok { |
|
values[i] = base64.StdEncoding.EncodeToString([]byte(bs)) |
|
} |
|
} |
|
data.New[c.Name] = values[i] |
|
} |
|
// set kafka key and remove omit columns data |
|
addition, ok := tbMap[e.Table.Name] |
|
if ok { |
|
for _, omit := range addition.OmitField { |
|
delete(data.New, omit) |
|
} |
|
for _, primary := range addition.PrimaryKey { |
|
if _, ok := data.New[primary]; ok { |
|
keys = append(keys, fmt.Sprint(data.New[primary])) |
|
} |
|
} |
|
} |
|
if len(keys) != 0 { |
|
data.Key = strings.Join(keys, ",") |
|
} |
|
datas = append(datas, data) |
|
} |
|
case canal.UpdateAction: |
|
if rowsLen%2 != 0 { |
|
err = errInvalidUpdate |
|
return |
|
} |
|
for i := 0; i < rowsLen; i += 2 { |
|
var keys []string |
|
data := &model.Data{ |
|
Action: e.Action, |
|
Table: e.Table.Name, |
|
// the first primary key as the kafka key |
|
Key: fmt.Sprint(e.Rows[i][0]), |
|
Old: make(map[string]interface{}, lenCol), |
|
New: make(map[string]interface{}, lenCol), |
|
} |
|
for j, c := range e.Table.Columns { |
|
if c.IsUnsigned { |
|
e.Rows[i][j] = unsignIntCase(e.Rows[i][j]) |
|
e.Rows[i+1][j] = unsignIntCase(e.Rows[i+1][j]) |
|
} |
|
if strings.Contains(c.RawType, "binary") { |
|
if bs, ok := e.Rows[i][j].(string); ok { |
|
e.Rows[i][j] = base64.StdEncoding.EncodeToString([]byte(bs)) |
|
} |
|
if bs, ok := e.Rows[i+1][j].(string); ok { |
|
e.Rows[i+1][j] = base64.StdEncoding.EncodeToString([]byte(bs)) |
|
} |
|
} |
|
data.Old[c.Name] = e.Rows[i][j] |
|
data.New[c.Name] = e.Rows[i+1][j] |
|
} |
|
// set kafka key and remove omit columns data |
|
addition, ok := tbMap[e.Table.Name] |
|
if ok { |
|
for _, omit := range addition.OmitField { |
|
delete(data.New, omit) |
|
delete(data.Old, omit) |
|
} |
|
for _, primary := range addition.PrimaryKey { |
|
if _, ok := data.New[primary]; ok { |
|
keys = append(keys, fmt.Sprint(data.New[primary])) |
|
} |
|
} |
|
} |
|
if len(keys) != 0 { |
|
data.Key = strings.Join(keys, ",") |
|
} |
|
datas = append(datas, data) |
|
} |
|
default: |
|
err = errInvalidAction |
|
} |
|
return |
|
} |
|
|
|
// unsignIntCase re-interprets a signed integer value as its unsigned
// counterpart of the same width (e.g. int8(-1) -> uint8(255)); any
// other type is returned unchanged.
func unsignIntCase(i interface{}) (v interface{}) {
	switch n := i.(type) {
	case int64:
		v = uint64(n)
	case int32:
		v = uint32(n)
	case int16:
		v = uint16(n)
	case int8:
		v = uint8(n)
	default:
		v = i
	}
	return
}
|
|
|