You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
158 lines
4.4 KiB
158 lines
4.4 KiB
package dao |
|
|
|
import (
	"context"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net/http"
	"strconv"
	"strings"
	"time"

	"go-common/app/service/bbq/search/api/grpc/v1"
	"go-common/app/service/bbq/search/conf"
	"go-common/library/log"

	"github.com/json-iterator/go"

	"gopkg.in/olivere/elastic.v5"
)
|
|
|
// Elasticsearch identifiers shared by the video DAO methods in this file.
const (
	// _bbqEsName is the key used to look up the cluster in d.esPool and d.c.Es.
	_bbqEsName = "bbq"
	// _videoIndex is the index name passed to Index(...) and to createESIndex.
	_videoIndex = "video"
	// _videoType is the document type passed to Type(...).
	_videoType = "video_info"
	// _videoMapping is the JSON body SaveVideo hands to createESIndex. It only
	// carries index settings: 5 shards, 2 replicas and "ik_smart" as the
	// default analyzer type. It must stay valid JSON; any stray character
	// inside the raw string ends up in the body sent along with the index.
	_videoMapping = `
{
	"settings":{
		"number_of_shards":5,
		"number_of_replicas":2,
		"index":{
			"analysis.analyzer.default.type":"ik_smart"
		}
	}
}
`
)
|
|
|
//SaveVideo 保存视频信息 |
|
func (d *Dao) SaveVideo(c context.Context, videos *v1.SaveVideoRequest) (err error) { |
|
d.createESIndex(_bbqEsName, _videoIndex, _videoMapping) |
|
bulkRequest := d.esPool[_bbqEsName].Bulk() |
|
for _, v := range videos.List { |
|
request := elastic.NewBulkUpdateRequest().Index(_videoIndex).Type(_videoType).Id(strconv.Itoa(int(v.SVID))).Doc(v).DocAsUpsert(true) |
|
bulkRequest.Add(request) |
|
} |
|
if _, err = bulkRequest.Do(c); err != nil { |
|
log.Error("save es [%d] err(%v)", 1, err) |
|
} |
|
return |
|
} |
|
|
|
//RecVideoDataElastic 获取视频信息 |
|
func (d *Dao) RecVideoDataElastic(c context.Context, query elastic.Query, script *elastic.ScriptSort, from, size int) (total int64, list []*v1.RecVideoInfo, err error) { |
|
search := d.esPool[_bbqEsName].Search().Index(_videoIndex).Type(_videoType) |
|
if query != nil { |
|
search.Query(query) |
|
} |
|
if script != nil { |
|
search.SortBy(script) |
|
} |
|
log.Error("start time(%d)", time.Now().UnixNano()) |
|
res, err := search.From(from).Size(size).Timeout(d.c.Es[_bbqEsName].Timeout).Do(c) |
|
if err != nil { |
|
log.Error("video search es (%s) err(%v)", _bbqEsName, err) |
|
return |
|
} |
|
log.Error("do time(%d)", time.Now().UnixNano()) |
|
total = res.TotalHits() |
|
list = []*v1.RecVideoInfo{} |
|
for _, value := range res.Hits.Hits { |
|
tmp := new(v1.RecVideoInfo) |
|
byte, _ := jsoniter.Marshal(value.Source) |
|
jsoniter.Unmarshal(byte, tmp) |
|
if value.Score != nil { |
|
tmp.ESScore = float64(*value.Score) |
|
} |
|
if value.Sort != nil { |
|
for _, v := range value.Sort { |
|
tmp.CustomScore = append(tmp.CustomScore, v.(float64)) |
|
} |
|
} |
|
list = append(list, tmp) |
|
} |
|
return |
|
} |
|
|
|
//VideoData 获取视频信息 |
|
func (d *Dao) VideoData(c context.Context, query elastic.Query, from, size int) (total int64, list []*v1.VideoESInfo, err error) { |
|
res, err := d.esPool[_bbqEsName].Search().Index(_videoIndex).Type(_videoType).Query(query).From(from).Size(size).Timeout(d.c.Es[_bbqEsName].Timeout).Do(c) |
|
if err != nil { |
|
log.Error("video search es (%s) err(%v)", _bbqEsName, err) |
|
return |
|
} |
|
total = res.TotalHits() |
|
list = []*v1.VideoESInfo{} |
|
for _, value := range res.Hits.Hits { |
|
tmp := new(v1.VideoESInfo) |
|
byte, _ := jsoniter.Marshal(value.Source) |
|
jsoniter.Unmarshal(byte, tmp) |
|
list = append(list, tmp) |
|
} |
|
return |
|
} |
|
|
|
//ESVideoData 获取视频信息 |
|
func (d *Dao) ESVideoData(c context.Context, query string) (total int64, list []*v1.RecVideoInfo, err error) { |
|
i := rand.Intn(len(conf.Conf.Es["bbq"].Addr)) |
|
req, err := http.NewRequest("POST", conf.Conf.Es["bbq"].Addr[i]+"/video/_search", strings.NewReader(query)) |
|
if err != nil { |
|
log.Error("conn es err(%v)", err) |
|
return |
|
} |
|
req.Header.Set("Content-Type", "application/json; charset=UTF-8") |
|
j := rand.Intn(len(d.httpClient)) |
|
res, err := d.httpClient[j].Do(req) |
|
if err != nil || res.StatusCode != 200 { |
|
log.Error("conn es http err(%v)", err) |
|
return |
|
} |
|
body, err := ioutil.ReadAll(res.Body) |
|
if err != nil { |
|
log.Error("es read body err(%v)", err) |
|
return |
|
} |
|
log.Infov(c, log.KV("query", query), log.KV("response", string(body))) |
|
|
|
videos := new(elastic.SearchResult) |
|
jsoniter.Unmarshal(body, &videos) |
|
if videos == nil { |
|
return |
|
} |
|
|
|
list = make([]*v1.RecVideoInfo, 0) |
|
total = videos.TotalHits() |
|
for _, value := range videos.Hits.Hits { |
|
tmp := new(v1.RecVideoInfo) |
|
byte, _ := jsoniter.Marshal(value.Source) |
|
jsoniter.Unmarshal(byte, tmp) |
|
list = append(list, tmp) |
|
} |
|
return |
|
} |
|
|
|
// DelVideoDataBySVID 根据svid删除视频 |
|
func (d *Dao) DelVideoDataBySVID(c context.Context, svid int64) (err error) { |
|
i := rand.Intn(len(conf.Conf.Es["bbq"].Addr)) |
|
url := conf.Conf.Es["bbq"].Addr[i] + "/video/video_info/" + strconv.Itoa(int(svid)) |
|
req, err := http.NewRequest("DELETE", url, nil) |
|
if err != nil { |
|
log.Error("conn es err(%v)", err) |
|
return |
|
} |
|
j := rand.Intn(len(d.httpClient)) |
|
res, err := d.httpClient[j].Do(req) |
|
if err != nil || res.StatusCode != 200 { |
|
log.Error("conn read body err(%v)", err) |
|
} |
|
return |
|
}
|
|
|