You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
243 lines
6.3 KiB
243 lines
6.3 KiB
package dao |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"reflect" |
|
"strconv" |
|
"time" |
|
|
|
"go-common/app/job/main/search/model" |
|
"go-common/library/log" |
|
"go-common/library/stat/prom" |
|
|
|
"gopkg.in/olivere/elastic.v5" |
|
) |
|
|
|
// BulkDatabusData 写入es数据来自databus. |
|
func (d *Dao) BulkDatabusData(c context.Context, attrs *model.Attrs, writeEntityIndex bool, bulkData ...model.MapData) (err error) { |
|
// TODO 需要兼容 |
|
var ( |
|
request elastic.BulkableRequest |
|
bulkRequest = d.ESPool[attrs.ESName].Bulk() |
|
//indexField = "" |
|
) |
|
//s := strings.Split(attrs.DataSQL.DataIndexSuffix, ";") |
|
//if len(s) >= 2 { |
|
// indexField = strings.Split(s[1], ":")[0] |
|
//} |
|
for _, b := range bulkData { |
|
var ( |
|
indexName string |
|
strID string |
|
) |
|
if name, ok := b["index_name"]; ok { |
|
if indexName, ok = name.(string); ok { |
|
delete(b, "index_name") |
|
} else { |
|
log.Error("dao.es.BulkDBData index_name err") |
|
continue |
|
} |
|
} else { |
|
if !writeEntityIndex { |
|
indexName, _ = b.Index(attrs) |
|
} else { |
|
_, indexName = b.Index(attrs) |
|
} |
|
} |
|
if id, ok := b["index_id"]; ok { |
|
if strID, ok = id.(string); !ok { |
|
log.Error("es.BulkDBData.strID(%v)", id) |
|
continue |
|
} |
|
} else { |
|
if strID, ok = b.StrID(attrs); !ok { |
|
log.Error("es.BulkDBData.strID") |
|
continue |
|
} |
|
} |
|
if indexName == "" { |
|
continue |
|
} |
|
for _, v := range attrs.DataSQL.DataIndexRemoveFields { |
|
delete(b, v) |
|
} |
|
if _, ok := b["index_field"]; ok { |
|
delete(b, "index_field") |
|
//delete(b, indexField) |
|
delete(b, "ctime") |
|
delete(b, "mtime") |
|
} |
|
for k := range b { |
|
if !d.Contain(k, attrs.DataSQL.DataIndexFormatFields) { |
|
delete(b, k) |
|
} |
|
} |
|
key := []string{} |
|
for k := range b { |
|
key = append(key, k) |
|
} |
|
for _, k := range key { |
|
customType, ok := attrs.DataSQL.DataIndexFormatFields[k] |
|
if ok { |
|
switch customType { |
|
case "ip": |
|
switch b[k].(type) { |
|
case float64: |
|
ipFormat := b.InetNtoA(int64(b[k].(float64))) |
|
b[k+"_format"] = ipFormat |
|
case int64: |
|
ipFormat := b.InetNtoA(b[k].(int64)) |
|
b[k+"_format"] = ipFormat |
|
} |
|
case "arr": |
|
var arr []int |
|
binaryAttributes := strconv.FormatInt(b[k].(int64), 2) |
|
for i := len(binaryAttributes) - 1; i >= 0; i-- { |
|
b := fmt.Sprintf("%c", binaryAttributes[i]) |
|
if b == "1" { |
|
arr = append(arr, len(binaryAttributes)-i) |
|
} |
|
} |
|
b[k+"_format"] = arr |
|
case "bin": |
|
var arr []int |
|
binaryAttributes := strconv.FormatInt(b[k].(int64), 2) |
|
for i := len(binaryAttributes) - 1; i >= 0; i-- { |
|
b := fmt.Sprintf("%c", binaryAttributes[i]) |
|
if b == "1" { |
|
arr = append(arr, len(binaryAttributes)-i) |
|
} |
|
} |
|
b[k] = arr |
|
case "workflow": |
|
if state, ok := b[k].(int64); ok { |
|
b["state"] = state & 15 |
|
b["business_state"] = state >> 4 & 15 |
|
delete(b, k) |
|
} |
|
case "time": |
|
if v, ok := b[k].(string); ok { |
|
if v == "0000-00-00 00:00:00" { |
|
b[k] = "0001-01-01 00:00:00" |
|
} |
|
} |
|
default: |
|
// as long as you happy |
|
} |
|
} |
|
} |
|
if strID == "" { |
|
request = elastic.NewBulkIndexRequest().Index(indexName).Type(attrs.Index.IndexType).Doc(b) |
|
} else { |
|
request = elastic.NewBulkUpdateRequest().Index(indexName).Type(attrs.Index.IndexType).Id(strID).Doc(b).DocAsUpsert(true) |
|
} |
|
//fmt.Println(request) |
|
bulkRequest.Add(request) |
|
} |
|
if bulkRequest.NumberOfActions() == 0 { |
|
return |
|
} |
|
now := time.Now() |
|
// prom.BusinessInfoCount.Add("redis:bulk:doc", int64(bulkRequest.NumberOfActions())) |
|
for i := 0; i < bulkRequest.NumberOfActions(); i++ { |
|
prom.BusinessInfoCount.Incr("redis:bulk:doc") |
|
} |
|
if _, err = bulkRequest.Do(c); err != nil { |
|
log.Error("appid(%s) bulk error(%v)", attrs.AppID, err) |
|
} |
|
prom.LibClient.Timing("redis:bulk", int64(time.Since(now)/time.Millisecond)) |
|
return |
|
} |
|
|
|
// BulkDBData 写入es数据来自db. |
|
func (d *Dao) BulkDBData(c context.Context, attrs *model.Attrs, writeEntityIndex bool, bulkData ...model.MapData) (err error) { |
|
var ( |
|
indexName string |
|
strID string |
|
request elastic.BulkableRequest |
|
bulkRequest = d.ESPool[attrs.ESName].Bulk() |
|
) |
|
for _, b := range bulkData { |
|
if name, ok := b["index_name"]; ok { |
|
if indexName, ok = name.(string); ok { |
|
delete(b, "index_name") |
|
} else { |
|
log.Error("dao.es.BulkDBData index_name err") |
|
continue |
|
} |
|
} else { |
|
if !writeEntityIndex { |
|
indexName, _ = b.Index(attrs) |
|
} else { |
|
_, indexName = b.Index(attrs) |
|
} |
|
} |
|
if id, ok := b["index_id"]; ok { |
|
if strID, ok = id.(string); !ok { |
|
log.Error("es.BulkDBData.strID(%v)", id) |
|
continue |
|
} |
|
} else { |
|
if strID, ok = b.StrID(attrs); !ok { |
|
log.Error("es.BulkDBData.strID") |
|
continue |
|
} |
|
} |
|
if indexName == "" || strID == "" { |
|
continue |
|
} |
|
//attr提供要去除掉的字段,不往ES中写 |
|
for _, v := range attrs.DataSQL.DataIndexRemoveFields { |
|
delete(b, v) |
|
} |
|
request = elastic.NewBulkUpdateRequest().Index(indexName).Type(attrs.Index.IndexType).Id(strID).Doc(b).DocAsUpsert(true).RetryOnConflict(3) |
|
//fmt.Println(request) |
|
bulkRequest.Add(request) |
|
} |
|
|
|
if bulkRequest.NumberOfActions() == 0 { |
|
// 注意这里request格式问题,会引起action为0 |
|
return |
|
} |
|
log.Info("insert number is %d", bulkRequest.NumberOfActions()) |
|
now := time.Now() |
|
// prom.BusinessInfoCount.Add("redis:bulk:doc", int64(bulkRequest.NumberOfActions())) |
|
for i := 0; i < bulkRequest.NumberOfActions(); i++ { |
|
prom.BusinessInfoCount.Incr("redis:bulk:doc") |
|
} |
|
if _, err = bulkRequest.Do(c); err != nil { |
|
log.Error("appid(%s) bulk error(%v)", attrs.AppID, err) |
|
} |
|
prom.LibClient.Timing("redis:bulk", int64(time.Since(now)/time.Millisecond)) |
|
return |
|
} |
|
|
|
// pingEsCluster ping es cluster |
|
func (d *Dao) pingESCluster(ctx context.Context) (err error) { |
|
//for name, client := range d.ESPool { |
|
// if _, _, err = client.Ping(d.c.Es[name].Addr[0]).Do(ctx); err != nil { |
|
// d.PromError("Es:Ping", "%s:Ping error(%v)", name, err) |
|
// return |
|
// } |
|
//} |
|
return |
|
} |
|
|
|
// Contain . |
|
func (d *Dao) Contain(obj interface{}, target interface{}) bool { |
|
targetValue := reflect.ValueOf(target) |
|
switch reflect.TypeOf(target).Kind() { |
|
case reflect.Slice, reflect.Array: |
|
for i := 0; i < targetValue.Len(); i++ { |
|
if targetValue.Index(i).Interface() == obj { |
|
return true |
|
} |
|
} |
|
case reflect.Map: |
|
if targetValue.MapIndex(reflect.ValueOf(obj)).IsValid() { |
|
return true |
|
} |
|
} |
|
return false |
|
}
|
|
|