You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
229 lines
6.4 KiB
229 lines
6.4 KiB
package service |
|
|
|
import ( |
|
"context" |
|
"strconv" |
|
"time" |
|
|
|
"go-common/app/admin/main/videoup/model/archive" |
|
"go-common/app/admin/main/videoup/model/message" |
|
"go-common/app/admin/main/videoup/model/up" |
|
"go-common/library/log" |
|
) |
|
|
|
func (s *Service) busFirstRound(aid int64, fn, encodePurpose string, videoDesign *message.VideoDesign, status int16, encoding int8, regionID, typeID int16, fans int64, adminChange bool) (err error) { |
|
var msg = &message.Videoup{ |
|
Route: message.RouteFirstRound, |
|
Fans: fans, |
|
Timestamp: time.Now().Unix(), |
|
Aid: aid, |
|
Filename: fn, |
|
Status: status, |
|
Xcode: encoding, |
|
EncodeRegionID: regionID, |
|
VideoDesign: videoDesign, |
|
AdminChange: adminChange, |
|
EncodeTypeID: typeID, |
|
} |
|
if len(encodePurpose) != 0 { |
|
msg.EncodePurpose = encodePurpose |
|
} |
|
log.Info("filename(%s) start to send firstRound(%+v) to databus", fn, msg) |
|
if err = s.busSendMsg(msg); err != nil { |
|
s.msgCh <- msg |
|
} |
|
return |
|
} |
|
|
|
func (s *Service) busUGCFirstRound(aid int64, fn, encodePurpose string, videoDesign *message.VideoDesign, status int16, encoding int8, regionID, typeID int16, fans int64, adminChange bool) (err error) { |
|
var msg = &message.Videoup{ |
|
Route: message.RouteUGCFirstRound, |
|
Fans: fans, |
|
Timestamp: time.Now().Unix(), |
|
Aid: aid, |
|
Filename: fn, |
|
Status: status, |
|
Xcode: encoding, |
|
EncodeRegionID: regionID, |
|
VideoDesign: videoDesign, |
|
AdminChange: adminChange, |
|
EncodeTypeID: typeID, |
|
} |
|
if len(encodePurpose) != 0 { |
|
msg.EncodePurpose = encodePurpose |
|
} |
|
log.Info("filename(%s) start to send UGCFirstRound(%+v) to databus", fn, msg) |
|
if err = s.busSendMsg(msg); err != nil { |
|
s.msgCh <- msg |
|
} |
|
return |
|
} |
|
|
|
func (s *Service) busArchiveForceSync(aid int64) (err error) { |
|
var msg = &message.Videoup{ |
|
Route: message.RouteForceSync, |
|
Timestamp: time.Now().Unix(), |
|
Aid: aid, |
|
} |
|
log.Info("aid(%d) send busArchiveForceSync to databus ", aid) |
|
if err = s.busSendMsg(msg); err != nil { |
|
s.msgCh <- msg |
|
} |
|
return |
|
} |
|
|
|
func (s *Service) busModifyArchive(aid int64, aChange, vChange bool) (err error) { |
|
var msg = &message.Videoup{ |
|
Route: message.RouteModifyArchive, |
|
Timestamp: time.Now().Unix(), |
|
Aid: aid, |
|
EditArchive: aChange, |
|
EditVideo: vChange, |
|
} |
|
log.Info("aid(%d) send modifyArchive to databus by EditArchive(%v) EditVideo(%v)", aid, aChange, vChange) |
|
if err = s.busSendMsg(msg); err != nil { |
|
s.msgCh <- msg |
|
} |
|
return |
|
} |
|
|
|
func (s *Service) busSecondRound(aid, missionID int64, notify bool, email, changeTypeID, changeCopyright, changeTitle, ChangeCover bool, fromList string, ap *archive.ArcParam) (err error) { |
|
sendEmail := true |
|
if ap != nil { |
|
sendEmail = !ap.NoEmail |
|
} |
|
var msg = &message.Videoup{ |
|
Route: message.RouteSecondRound, |
|
Aid: aid, |
|
Notify: notify, |
|
MissionID: missionID, |
|
Timestamp: time.Now().Unix(), |
|
AdminChange: email, |
|
ChangeTypeID: changeTypeID, |
|
ChangeCopyright: changeCopyright, |
|
ChangeTitle: changeTitle, |
|
ChangeCover: ChangeCover, |
|
FromList: fromList, |
|
SendEmail: sendEmail, |
|
} |
|
log.Info("aid(%d) start to send secondRound msg(%+v) to databus", aid, msg) |
|
if err = s.busSendMsg(msg); err != nil { |
|
s.msgCh <- msg |
|
} |
|
return |
|
} |
|
|
|
func (s *Service) busSecondRoundUpCredit(aid, cid, mid, uid int64, state, round int8, reasonID int64, reason string) (err error) { |
|
if mid == 0 || aid == 0 { |
|
return |
|
} |
|
var msg = &up.CreditLog{ |
|
Type: round, |
|
Optyte: state, |
|
Reason: reasonID, |
|
BusinessType: up.CreditBusinessTypeArchive, |
|
MID: mid, |
|
OID: aid, |
|
UID: uid, |
|
Content: reason, |
|
Ctime: time.Now().Unix(), |
|
Extra: map[string]interface{}{"cid": cid}, |
|
} |
|
var c = context.TODO() |
|
log.Info("aid(%d) start to send busSecondRoundUpCredit msg(%+v) to databus", aid, msg) |
|
if err = s.upCreditPub.Send(c, string(msg.OID)+string(msg.UID), msg); err != nil { |
|
log.Error("aid(%d) s.upCreditPub.Send(%+v) error(%v)", msg.OID, msg, err) |
|
} |
|
return |
|
} |
|
|
|
//func (s *Service) busDeleteVideo(aid int64, filename string) (err error) { |
|
// var msg = &message.Videoup{ |
|
// Route: message.RouteDeleteVideo, |
|
// Timestamp: time.Now().Unix(), |
|
// Aid: aid, |
|
// Filename: filename, |
|
// } |
|
// log.Info("aid(%d) filename(%s) start to send deleteVideo to databus", aid, filename) |
|
// if err = s.busSendMsg(msg); err != nil { |
|
// s.msgCh <- msg |
|
// } |
|
// return |
|
//} |
|
|
|
func (s *Service) busSendMsg(msg *message.Videoup) (err error) { |
|
var c = context.TODO() |
|
switch msg.Route { |
|
case message.RouteFirstRound, message.RouteUGCFirstRound, message.RouteDeleteVideo: |
|
if err = s.videoupPub.Send(c, msg.Filename, msg); err != nil { |
|
log.Error("filename(%s) %s s.videoupPub.Send(%+v) error(%v)", msg.Filename, msg.Route, msg, err) |
|
} |
|
case message.RouteSecondRound, message.RouteModifyArchive, message.RouteForceSync: |
|
if err = s.videoupPub.Send(c, strconv.FormatInt(msg.Aid, 10), msg); err != nil { |
|
log.Error("aid(%d) %s s.videoupPub.Send(%+v) error(%v)", msg.Aid, msg.Route, msg, err) |
|
} |
|
default: |
|
log.Warn("databuserr can't process the type (%s)", msg.Route) |
|
} |
|
return |
|
} |
|
|
|
// databus err proc |
|
func (s *Service) msgproc() { |
|
// NOTE: chan |
|
s.wg.Add(1) |
|
go func() { |
|
var ( |
|
c = context.TODO() |
|
msg *message.Videoup |
|
ok bool |
|
err error |
|
) |
|
defer s.wg.Done() |
|
for { |
|
if msg, ok = <-s.msgCh; !ok { |
|
log.Info("msgproc s.msgCh stop") |
|
return |
|
} |
|
log.Info("aid(%d) filename(%s) get msg(%+v) from s.msgCh", msg.Aid, msg.Filename, msg) |
|
if err = s.busSendMsg(msg); err != nil { |
|
s.busCache.PushMsgCache(c, msg) |
|
time.Sleep(100 * time.Millisecond) |
|
} |
|
} |
|
}() |
|
// NOTE: from redis list when chan error |
|
s.wg.Add(1) |
|
go func() { |
|
var ( |
|
c = context.TODO() |
|
msg *message.Videoup |
|
err error |
|
) |
|
defer s.wg.Done() |
|
for { |
|
if s.closed { |
|
log.Info("second msgproc service is close") |
|
return |
|
} |
|
if msg, err = s.busCache.PopMsgCache(c); err != nil { |
|
log.Error("msgproc s.busCache.PopMsgCache() error(%v)", err) |
|
time.Sleep(100 * time.Millisecond) |
|
continue |
|
} |
|
if msg == nil { |
|
select { |
|
case <-time.After(3 * time.Minute): |
|
continue |
|
case <-s.stop: |
|
return |
|
} |
|
} |
|
log.Info("aid(%d) filename(%s) get msg(%+v) from redis", msg.Aid, msg.Filename, msg) |
|
if err = s.busSendMsg(msg); err != nil { |
|
s.busCache.PushMsgCache(c, msg) |
|
time.Sleep(100 * time.Millisecond) |
|
} |
|
} |
|
}() |
|
}
|
|
|