You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
208 lines
5.8 KiB
208 lines
5.8 KiB
package business |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"fmt" |
|
"time" |
|
|
|
"go-common/app/job/main/search/conf" |
|
"go-common/app/job/main/search/dao" |
|
"go-common/app/job/main/search/model" |
|
xsql "go-common/library/database/sql" |
|
"go-common/library/log" |
|
"go-common/library/queue/databus" |
|
) |
|
|
|
// AegisResource single table consume databus. |
|
type AegisResource struct { |
|
d *dao.Dao |
|
c *conf.Config |
|
appid string |
|
attrs *model.Attrs |
|
db *xsql.DB |
|
dtb *databus.Databus |
|
offset *model.LoopOffset |
|
mapData []model.MapData |
|
commits map[int32]*databus.Message |
|
} |
|
|
|
// NewAppDatabus . |
|
func NewAegisResource(d *dao.Dao, appid string, c *conf.Config) (a *AegisResource) { |
|
a = &AegisResource{ |
|
d: d, |
|
c: c, |
|
appid: appid, |
|
attrs: d.AttrPool[appid], |
|
offset: &model.LoopOffset{}, |
|
mapData: []model.MapData{}, |
|
db: d.DBPool[d.AttrPool[appid].DBName], |
|
dtb: d.DatabusPool[d.AttrPool[appid].Databus.Databus], |
|
commits: make(map[int32]*databus.Message), |
|
} |
|
return |
|
} |
|
|
|
// Business return business. |
|
func (a *AegisResource) Business() string { |
|
return a.attrs.Business |
|
} |
|
|
|
// InitIndex init index. |
|
func (a *AegisResource) InitIndex(c context.Context) { |
|
if aliases, err := a.d.GetAliases(a.attrs.ESName, a.attrs.Index.IndexAliasPrefix); err != nil { |
|
a.d.InitIndex(c, nil, a.attrs.ESName, a.attrs.Index.IndexAliasPrefix, a.attrs.Index.IndexEntityPrefix, a.attrs.Index.IndexMapping) |
|
} else { |
|
a.d.InitIndex(c, aliases, a.attrs.ESName, a.attrs.Index.IndexAliasPrefix, a.attrs.Index.IndexAliasPrefix, a.attrs.Index.IndexMapping) |
|
} |
|
} |
|
|
|
// InitOffset insert init value to offset. |
|
func (a *AegisResource) InitOffset(c context.Context) { |
|
a.d.InitOffset(c, a.offset, a.attrs, []string{}) |
|
nowFormat := time.Now().Format("2006-01-02 15:04:05") |
|
a.offset.SetOffset(0, nowFormat) |
|
} |
|
|
|
// Offset get offset. |
|
func (a *AegisResource) Offset(c context.Context) { |
|
for { |
|
offset, err := a.d.Offset(c, a.appid, a.attrs.Table.TablePrefix) |
|
if err != nil { |
|
log.Error("a.d.Offset error(%v)", err) |
|
time.Sleep(time.Second * 3) |
|
continue |
|
} |
|
a.offset.SetReview(offset.ReviewID, offset.ReviewTime) |
|
a.offset.SetOffset(offset.OffsetID(), offset.OffsetTime()) |
|
break |
|
} |
|
} |
|
|
|
// SetRecover set recover |
|
func (a *AegisResource) SetRecover(c context.Context, recoverID int64, recoverTime string, i int) { |
|
a.offset.SetRecoverOffset(recoverID, recoverTime) |
|
} |
|
|
|
// IncrMessages . |
|
func (a *AegisResource) IncrMessages(c context.Context) (length int, err error) { |
|
ticker := time.NewTicker(time.Duration(time.Millisecond * time.Duration(a.attrs.Databus.Ticker))) |
|
defer ticker.Stop() |
|
for { |
|
select { |
|
case msg, ok := <-a.dtb.Messages(): |
|
if !ok { |
|
log.Error("databus: %s binlog consumer exit!!!", a.attrs.Databus) |
|
break |
|
} |
|
m := &model.Message{} |
|
a.commits[msg.Partition] = msg |
|
if err = json.Unmarshal(msg.Value, m); err != nil { |
|
log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err) |
|
continue |
|
} |
|
if m.Table == "resource" || m.Table == "resource_result" || m.Table == "net_flow_resource" { |
|
if m.Action == "insert" || m.Action == "update" { |
|
var parseMap map[string]interface{} |
|
parseMap, err = a.d.JSON2map(m.New) |
|
if _, ok := parseMap["rid"]; ok { |
|
parseMap["_id"] = parseMap["rid"] |
|
parseMap["id"] = parseMap["rid"] |
|
} |
|
if _, sok := parseMap["state"]; m.Table != "resource_result" && sok { |
|
delete(parseMap, "state") |
|
} |
|
log.Info(fmt.Sprintf("%v: %+v", a.attrs.AppID, parseMap)) |
|
if err != nil { |
|
log.Error("a.JSON2map error(%v)", err) |
|
continue |
|
} |
|
a.mapData = append(a.mapData, parseMap) |
|
} |
|
} |
|
if len(a.mapData) < a.attrs.Databus.AggCount { |
|
continue |
|
} |
|
case <-ticker.C: |
|
} |
|
break |
|
} |
|
if len(a.mapData) > 0 { |
|
a.mapData, err = a.d.ExtraData(c, a.mapData, a.attrs, "dtb", []string{}) |
|
} |
|
length = len(a.mapData) |
|
return |
|
} |
|
|
|
// AllMessages . |
|
func (a *AegisResource) AllMessages(c context.Context) (length int, err error) { |
|
rows, err := a.db.Query(c, a.attrs.DataSQL.SQLByID, a.offset.OffsetID, a.attrs.Other.Size) |
|
log.Info("appid: %s allMessages Current offsetID: %d", a.appid, a.offset.OffsetID) |
|
if err != nil { |
|
log.Error("AllMessages db.Query error(%v)", err) |
|
return |
|
} |
|
defer rows.Close() |
|
for rows.Next() { |
|
item, row := dao.InitMapData(a.attrs.DataSQL.DataIndexFields) |
|
if err = rows.Scan(row...); err != nil { |
|
log.Error("AllMessages rows.Scan() error(%v)", err) |
|
return |
|
} |
|
a.mapData = append(a.mapData, item) |
|
} |
|
if len(a.mapData) > 0 { |
|
a.mapData, err = a.d.ExtraData(c, a.mapData, a.attrs, "db", []string{}) |
|
// offset |
|
if v, ok := a.mapData[len(a.mapData)-1]["_id"]; ok && v != nil { |
|
if v2, ok := v.(interface{}); ok { |
|
a.offset.SetTempOffset((v2).(int64), "") |
|
a.offset.SetRecoverTempOffset((v2).(int64), "") |
|
} else { |
|
log.Error("dtb.all._id interface error") |
|
} |
|
} else { |
|
log.Error("dtb.all._id nil error") |
|
} |
|
} |
|
length = len(a.mapData) |
|
return |
|
} |
|
|
|
// BulkIndex . |
|
func (a *AegisResource) BulkIndex(c context.Context, start, end int, writeEntityIndex bool) (err error) { |
|
partData := a.mapData[start:end] |
|
if a.c.Business.Index { |
|
err = a.d.BulkDBData(c, a.attrs, writeEntityIndex, partData...) |
|
} else { |
|
err = a.d.BulkDatabusData(c, a.attrs, writeEntityIndex, partData...) |
|
} |
|
return |
|
} |
|
|
|
// Commit commit offset. |
|
func (a *AegisResource) Commit(c context.Context) (err error) { |
|
if a.c.Business.Index { |
|
err = a.d.CommitOffset(c, a.offset, a.attrs.AppID, a.attrs.Table.TablePrefix) |
|
} else { |
|
for k, cos := range a.commits { |
|
if err = cos.Commit(); err != nil { |
|
log.Error("appid(%s) commit error(%v)", a.attrs.AppID, err) |
|
continue |
|
} |
|
delete(a.commits, k) |
|
} |
|
} |
|
a.mapData = []model.MapData{} |
|
return |
|
} |
|
|
|
// Sleep interval duration. |
|
func (a *AegisResource) Sleep(c context.Context) { |
|
time.Sleep(time.Second * time.Duration(a.attrs.Other.Sleep)) |
|
} |
|
|
|
// Size return size. |
|
func (a *AegisResource) Size(c context.Context) int { |
|
return a.attrs.Other.Size |
|
}
|
|
|