You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
267 lines
8.3 KiB
267 lines
8.3 KiB
package dao |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"time" |
|
|
|
pb "go-common/app/service/main/coin/api" |
|
"go-common/library/database/sql" |
|
"go-common/library/log" |
|
) |
|
|
|
const ( |
|
_dayOfEachMonth = 25 |
|
_sharding = 50 |
|
_getTotalCoins = "SELECT aid,type,SUM(multiply) FROM coin_member_%d WHERE mid = ? GROUP BY aid,type ORDER BY cid desc LIMIT 20" |
|
_getTotalCoinsByMid = "SELECT IFNULL(SUM(multiply),0) FROM coin_member_%d WHERE mid=? AND aid=? AND type=?" |
|
_getTotalCoinsByMidAndUpMid = "SELECT IFNULL(coin_count,0) FROM coin_user_count_%02d WHERE mid=? AND up_mid=?" |
|
_getCoinCount = "SELECT IFNULL(count,0) FROM coin_count WHERE aid=? AND type=?" |
|
_updateCoinCount = "INSERT INTO coin_count(aid, type, `count`, ctime, mtime) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE count=count+?,mtime=?" |
|
_insertCoinArchive = "INSERT LOW_PRIORITY INTO coin_archive_%d (aid,type,mid,timestamp,multiply) VALUES (?,?,?,?,?)" |
|
_insertCoinMember = "INSERT LOW_PRIORITY INTO coin_member_%d (aid,type,mid,timestamp,multiply,up_mid) VALUES (?,?,?,?,?,?)" |
|
_updateCoinMemberCount = "INSERT INTO coin_user_count_%02d(mid, up_mid, coin_count, ctime) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE coin_count=coin_count+?" |
|
_getMemberVideoList = "SELECT aid,ip,timestamp,multiply FROM coin_member_%d WHERE mid = ? AND type=? AND timestamp > ? ORDER BY cid DESC LIMIT ?" |
|
// coin settle |
|
_hitSettlePeriod = "SELECT id FROM coin_settle_period WHERE to_year=? AND to_month=?" |
|
_updateSettleBD = "UPDATE coin_settle_%d SET exp_sub=?,description=?,itime=?,mtime=? WHERE aid=? and type=?" // update for bigdata |
|
_updateArchiveCoinsBD = "UPDATE coin_count SET count=?, mtime=? WHERE aid=? AND type=?" // update for bigdata |
|
) |
|
|
|
func (dao *Dao) midHit(mid int64) int64 { |
|
return mid % _sharding |
|
} |
|
|
|
func (dao *Dao) aidHit(aid int64) int64 { |
|
return aid % _sharding |
|
} |
|
|
|
func (dao *Dao) hitCoinPeriod(c context.Context, now time.Time) (id int64, err error) { |
|
year, month, day := now.Year(), now.Month(), now.Day() |
|
if now.Day() < _dayOfEachMonth { |
|
err = fmt.Errorf("opreation from bigdata must after the %dth of each month, today is (%d)", _dayOfEachMonth, day) |
|
log.Error("%v", err) |
|
return |
|
} |
|
row := dao.coin.QueryRow(c, _hitSettlePeriod, year, month) |
|
if err != nil { |
|
log.Error("dao.hitCoinPeriodStmt.QueryRow(%d, %d) error(%v)", year, month, err) |
|
return |
|
} |
|
if err = row.Scan(&id); err != nil { |
|
if err == sql.ErrNoRows { |
|
err = nil |
|
} else { |
|
log.Error("row.Scan error(%v)", err) |
|
} |
|
return |
|
} |
|
if id == 0 { |
|
err = fmt.Errorf("zero id at(%s)", now.String()) |
|
log.Error("%v", err) |
|
} |
|
return |
|
} |
|
|
|
// UpdateCoinSettleBD update table coin_settle_%d. |
|
func (dao *Dao) UpdateCoinSettleBD(c context.Context, aid, tp, expSub int64, describe string, now time.Time) (affect int64, err error) { |
|
id, err := dao.hitCoinPeriod(c, now) |
|
if err != nil { |
|
return |
|
} |
|
sqlStr := fmt.Sprintf(_updateSettleBD, id) |
|
result, err := dao.coin.Exec(c, sqlStr, expSub, describe, now, now, aid, tp) |
|
if err != nil { |
|
log.Error("dao.coin.Exec(%s, %d, %s, %v, %v, %d) error(%v)", sqlStr, expSub, describe, now, now, aid, err) |
|
return |
|
} |
|
affect, err = result.LastInsertId() |
|
return |
|
} |
|
|
|
// CoinList return video list of coin added in one month |
|
func (dao *Dao) CoinList(c context.Context, mid, tp, ts, size int64) (rcs []*pb.ModelList, err error) { |
|
var rows *sql.Rows |
|
if rows, err = dao.coin.Query(c, fmt.Sprintf(_getMemberVideoList, dao.midHit(mid)), mid, tp, ts, size); err != nil { |
|
log.Error("dao.getMemberVideoList.Query() error(%v)", err) |
|
return |
|
} |
|
defer rows.Close() |
|
for rows.Next() { |
|
var rc = &pb.ModelList{} |
|
if err = rows.Scan(&rc.Aid, &rc.IP, &rc.Ts, &rc.Number); err != nil { |
|
log.Error("row.Scan(),error(%v)", err) |
|
rcs = nil |
|
return |
|
} |
|
rcs = append(rcs, rc) |
|
} |
|
return |
|
} |
|
|
|
// CoinsAddedByMid get coin added by mid of aid&tp. |
|
func (dao *Dao) CoinsAddedByMid(c context.Context, mid, aid, tp int64) (added int64, err error) { |
|
row := dao.coin.QueryRow(c, fmt.Sprintf(_getTotalCoinsByMid, dao.midHit(mid)), mid, aid, tp) |
|
if err = row.Scan(&added); err != nil { |
|
if err == sql.ErrNoRows { |
|
err = nil |
|
} else { |
|
PromError("mysql:CoinsAddedByMid") |
|
log.Errorv(c, |
|
log.KV("log", "row.Scan"), |
|
log.KV("err", err), |
|
log.KV("mid", mid), |
|
log.KV("aid", aid), |
|
) |
|
} |
|
} |
|
return |
|
} |
|
|
|
// AddedCoins get coins added to up_mid. |
|
func (dao *Dao) AddedCoins(c context.Context, mid, upMid int64) (added int64, err error) { |
|
row := dao.coin.QueryRow(c, fmt.Sprintf(_getTotalCoinsByMidAndUpMid, dao.midHit(mid)), mid, upMid) |
|
if err = row.Scan(&added); err != nil { |
|
if err == sql.ErrNoRows { |
|
err = nil |
|
} else { |
|
log.Error("row.Scan error(%v)", err) |
|
} |
|
} |
|
return |
|
} |
|
|
|
// UserCoinsAdded get user coin added. |
|
func (dao *Dao) UserCoinsAdded(c context.Context, mid int64) (addeds map[int64]int64, err error) { |
|
var rows *sql.Rows |
|
addeds = make(map[int64]int64) |
|
if rows, err = dao.coin.Query(c, fmt.Sprintf(_getTotalCoins, dao.midHit(mid)), mid); err != nil { |
|
log.Errorv(c, |
|
log.KV("log", "dao.getTotalCoins"), |
|
log.KV("err", err), |
|
log.KV("mid", mid), |
|
) |
|
PromError("db:UserCoinsAdded") |
|
return |
|
} |
|
defer rows.Close() |
|
for rows.Next() { |
|
var ( |
|
added, aid, tp int64 |
|
) |
|
if err = rows.Scan(&aid, &tp, &added); err != nil { |
|
log.Errorv(c, |
|
log.KV("log", "row.Scan"), |
|
log.KV("err", err), |
|
log.KV("mid", mid), |
|
) |
|
PromError("db:UserCoinsAdded") |
|
addeds = nil |
|
return |
|
} |
|
addeds[hashField(aid, tp)] = added |
|
} |
|
return |
|
} |
|
|
|
// InsertCoinArchive . |
|
func (dao *Dao) InsertCoinArchive(c context.Context, aid, tp, mid, timestamp, multiply int64) (err error) { |
|
if _, err = dao.coin.Exec(c, fmt.Sprintf(_insertCoinArchive, dao.aidHit(aid)), aid, tp, mid, timestamp, multiply); err != nil { |
|
log.Errorv(c, |
|
log.KV("log", "coin.Exec"), |
|
log.KV("err", err), |
|
log.KV("mid", mid), |
|
log.KV("aid", aid), |
|
) |
|
PromError("db:InsertCoinArchive") |
|
} |
|
return |
|
} |
|
|
|
// InsertCoinMember . |
|
func (dao *Dao) InsertCoinMember(c context.Context, aid, tp, mid, timestamp, multiply int64, upMid int64) (err error) { |
|
if _, err = dao.coin.Exec(c, fmt.Sprintf(_insertCoinMember, dao.midHit(mid)), aid, tp, mid, timestamp, multiply, upMid); err != nil { |
|
log.Errorv(c, |
|
log.KV("log", "coin.Exec"), |
|
log.KV("err", err), |
|
log.KV("mid", mid), |
|
log.KV("aid", aid), |
|
) |
|
PromError("db:InsertCoinMember") |
|
} |
|
return |
|
} |
|
|
|
// UpdateItemCoinCount update coin_count. |
|
func (dao *Dao) UpdateItemCoinCount(c context.Context, aid, tp, count int64) (err error) { |
|
now := time.Now() |
|
if _, err = dao.coin.Exec(c, _updateCoinCount, aid, tp, count, now, now, count, now); err != nil { |
|
log.Errorv(c, |
|
log.KV("log", "UpdateItemCoinCount("), |
|
log.KV("err", err), |
|
log.KV("count", count), |
|
log.KV("aid", aid), |
|
) |
|
PromError("db:UpdateItemCoinCount") |
|
} |
|
return |
|
} |
|
|
|
// RawItemCoin get count by aid. |
|
func (dao *Dao) RawItemCoin(c context.Context, aid, tp int64) (count int64, err error) { |
|
row := dao.coin.QueryRow(c, _getCoinCount, aid, tp) |
|
if err = row.Scan(&count); err != nil { |
|
if err == sql.ErrNoRows { |
|
err = nil |
|
} else { |
|
log.Errorv(c, |
|
log.KV("log", "row.Scan"), |
|
log.KV("err", err), |
|
) |
|
PromError("db:RawItemCoin") |
|
} |
|
} |
|
return |
|
} |
|
|
|
// UpdateCoinMemberCount for archive |
|
func (dao *Dao) UpdateCoinMemberCount(c context.Context, mid, upMid, count int64) (err error) { |
|
if _, err = dao.coin.Exec(c, fmt.Sprintf(_updateCoinMemberCount, dao.midHit(mid)), mid, upMid, count, time.Now(), count); err != nil { |
|
log.Errorv(c, |
|
log.KV("log", "updateCoinMemberCount"), |
|
log.KV("mid", mid), |
|
log.KV("upmid", upMid), |
|
log.KV("count", count), |
|
log.KV("err", err), |
|
) |
|
PromError("db:UpdateCoinMemberCount") |
|
} |
|
return |
|
} |
|
|
|
// BeginTran begin a tx. |
|
func (dao *Dao) BeginTran(c context.Context) (t *sql.Tx, err error) { |
|
t, err = dao.coin.Begin(c) |
|
if err != nil { |
|
PromError("db:BeginTran") |
|
log.Errorv(c, |
|
log.KV("log", "d.BeginTran"), |
|
log.KV("err", err), |
|
) |
|
} |
|
return |
|
} |
|
|
|
// UpdateItemCoins update table coin_archive |
|
func (dao *Dao) UpdateItemCoins(c context.Context, aid, tp, coins int64, now time.Time) (affect int64, err error) { |
|
if err != nil { |
|
return |
|
} |
|
result, err := dao.coin.Exec(c, _updateArchiveCoinsBD, coins, now, aid, tp) |
|
if err != nil { |
|
log.Error("dao.coin.Exec(%s, %d, %s, %d) error(%v)", _updateArchiveCoinsBD, coins, now, aid, err) |
|
return |
|
} |
|
affect, err = result.RowsAffected() |
|
return |
|
}
|
|
|