You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
121 lines
3.7 KiB
121 lines
3.7 KiB
package dao |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"fmt" |
|
"go-common/app/service/bbq/topic/api" |
|
"go-common/library/cache/redis" |
|
"go-common/library/log" |
|
"go-common/library/xstr" |
|
) |
|
|
|
const ( |
|
_selectExtension = "select svid, content from extension where svid in (%s)" |
|
_insertExtension = "insert ignore into extension (`svid`,`type`,`content`) values (?,?,?)" |
|
) |
|
const ( |
|
_videoExtensionKey = "ext:%d" |
|
) |
|
|
|
// RawVideoExtension 从mysql获取extension |
|
func (d *Dao) RawVideoExtension(ctx context.Context, svids []int64) (res map[int64]*api.VideoExtension, err error) { |
|
res = make(map[int64]*api.VideoExtension) |
|
if len(svids) == 0 { |
|
return |
|
} |
|
|
|
querySQL := fmt.Sprintf(_selectExtension, xstr.JoinInts(svids)) |
|
rows, err := d.db.Query(ctx, querySQL) |
|
if err != nil { |
|
log.Errorw(ctx, "log", "get extension error", "err", err, "sql", querySQL) |
|
return |
|
} |
|
defer rows.Close() |
|
|
|
var svid int64 |
|
var content string |
|
for rows.Next() { |
|
if err = rows.Scan(&svid, &content); err != nil { |
|
log.Errorw(ctx, "log", "get extension from mysql fail", "sql", querySQL) |
|
return |
|
} |
|
// 由于数据库中的数据和缓存中还不太一样,因此这里需要对db读取的数据进行额外处理 |
|
var extension api.Extension |
|
json.Unmarshal([]byte(content), &extension.TitleExtra) |
|
// TODO:check |
|
log.V(10).Infow(ctx, "log", "unmarshal content", "result", extension) |
|
data, _ := json.Marshal(&extension) |
|
res[svid] = &api.VideoExtension{Svid: svid, Extension: string(data)} |
|
} |
|
log.V(1).Infow(ctx, "log", "get extension", "req", svids, "rsp_size", len(res)) |
|
return |
|
} |
|
|
|
// CacheVideoExtension 从缓存获取extension |
|
func (d *Dao) CacheVideoExtension(ctx context.Context, svids []int64) (res map[int64]*api.VideoExtension, err error) { |
|
res = make(map[int64]*api.VideoExtension) |
|
|
|
conn := d.redis.Get(ctx) |
|
defer conn.Close() |
|
for _, svid := range svids { |
|
conn.Send("GET", fmt.Sprintf(_videoExtensionKey, svid)) |
|
} |
|
conn.Flush() |
|
var data string |
|
for _, svid := range svids { |
|
if data, err = redis.String(conn.Receive()); err != nil { |
|
if err == redis.ErrNil { |
|
err = nil |
|
} else { |
|
log.Errorv(ctx, log.KV("event", "redis_get"), log.KV("svid", svid)) |
|
} |
|
continue |
|
} |
|
extension := new(api.VideoExtension) |
|
extension.Svid = svid |
|
extension.Extension = data |
|
res[extension.Svid] = extension |
|
} |
|
log.Infov(ctx, log.KV("event", "redis_get"), log.KV("row_num", len(res))) |
|
return |
|
} |
|
|
|
// AddCacheVideoExtension 添加extension缓存 |
|
func (d *Dao) AddCacheVideoExtension(ctx context.Context, extensions map[int64]*api.VideoExtension) (err error) { |
|
conn := d.redis.Get(ctx) |
|
defer conn.Close() |
|
for svid, value := range extensions { |
|
conn.Send("SET", fmt.Sprintf(_videoExtensionKey, svid), value.Extension, "EX", d.topicExpire) |
|
} |
|
conn.Flush() |
|
for i := 0; i < len(extensions); i++ { |
|
conn.Receive() |
|
} |
|
log.Infov(ctx, log.KV("event", "redis_set"), log.KV("row_num", len(extensions))) |
|
return |
|
} |
|
|
|
// DelCacheVideoExtension 删除extension缓存 |
|
func (d *Dao) DelCacheVideoExtension(ctx context.Context, svid int64) { |
|
var key = fmt.Sprintf(_videoExtensionKey, svid) |
|
conn := d.redis.Get(ctx) |
|
defer conn.Close() |
|
conn.Do("DEL", key) |
|
} |
|
|
|
// InsertExtension 插入extension到db |
|
func (d *Dao) InsertExtension(ctx context.Context, svid int64, extensionType int64, extension *api.Extension) (rowsAffected int64, err error) { |
|
data, _ := json.Marshal(extension.TitleExtra) |
|
res, err := d.db.Exec(ctx, _insertExtension, svid, extensionType, string(data)) |
|
if err != nil { |
|
log.Errorw(ctx, "log", "insert extension db fail", "svid", svid, "extension_type", extensionType, "extension", extensionType) |
|
return |
|
} |
|
rowsAffected, tmpErr := res.RowsAffected() |
|
if tmpErr != nil { |
|
log.Warnw(ctx, "log", "get rows affected fail") |
|
} |
|
d.DelCacheVideoExtension(ctx, svid) |
|
return |
|
}
|
|
|