You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
175 lines
3.6 KiB
175 lines
3.6 KiB
package service |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"fmt" |
|
"sync" |
|
"time" |
|
|
|
appmdl "go-common/app/interface/main/app-player/model/archive" |
|
"go-common/app/job/main/app-player/conf" |
|
"go-common/app/job/main/app-player/dao" |
|
"go-common/app/job/main/app-player/model" |
|
arcrpc "go-common/app/service/main/archive/api" |
|
"go-common/library/ecode" |
|
"go-common/library/log" |
|
"go-common/library/queue/databus" |
|
) |
|
|
|
const ( |
|
_updateAct = "update" |
|
_insertAct = "insert" |
|
_tableArchive = "archive" |
|
) |
|
|
|
// Service is service. |
|
type Service struct { |
|
c *conf.Config |
|
dao *dao.Dao |
|
arcRPC arcrpc.ArchiveClient |
|
// sub |
|
archiveNotifySub *databus.Databus |
|
waiter sync.WaitGroup |
|
closed bool |
|
} |
|
|
|
// New new a service. |
|
func New(c *conf.Config) (s *Service) { |
|
s = &Service{ |
|
c: c, |
|
dao: dao.New(c), |
|
archiveNotifySub: databus.New(c.ArchiveNotifySub), |
|
closed: false, |
|
} |
|
var err error |
|
s.arcRPC, err = arcrpc.NewClient(nil) |
|
if err != nil { |
|
panic(fmt.Sprintf("archive NewClient error(%v)", err)) |
|
} |
|
s.waiter.Add(1) |
|
go s.arcConsumeproc() |
|
s.waiter.Add(1) |
|
go s.retryproc() |
|
return |
|
} |
|
|
|
// Close Databus consumer close. |
|
func (s *Service) Close() { |
|
s.closed = true |
|
s.archiveNotifySub.Close() |
|
s.waiter.Wait() |
|
} |
|
|
|
// Ping is |
|
func (s *Service) Ping(c context.Context) (err error) { |
|
if err = s.dao.PingMc(c); err != nil { |
|
return |
|
} |
|
return |
|
} |
|
|
|
// arcConsumeproc consumer archive |
|
func (s *Service) arcConsumeproc() { |
|
var ( |
|
msg *databus.Message |
|
ok bool |
|
err error |
|
) |
|
msgs := s.archiveNotifySub.Messages() |
|
for { |
|
if msg, ok = <-msgs; !ok { |
|
log.Info("arc databus Consumer exit") |
|
break |
|
} |
|
log.Info("got databus message(%s)", msg.Value) |
|
msg.Commit() |
|
var ms = &model.Message{} |
|
if err = json.Unmarshal(msg.Value, ms); err != nil { |
|
log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err) |
|
continue |
|
} |
|
switch ms.Table { |
|
case _tableArchive: |
|
s.archiveUpdate(ms.Action, ms.New) |
|
} |
|
} |
|
s.waiter.Done() |
|
} |
|
|
|
func (s *Service) archiveUpdate(action string, nwMsg []byte) { |
|
nw := &model.ArcMsg |
|
if err := json.Unmarshal(nwMsg, nw); err != nil { |
|
log.Error("json.Unmarshal(%s) error(%v)", nwMsg, err) |
|
return |
|
} |
|
switch action { |
|
case _updateAct, _insertAct: |
|
s.upArcCache(nw.Aid) |
|
} |
|
} |
|
|
|
func (s *Service) upArcCache(aid int64) { |
|
var ( |
|
view *arcrpc.ViewReply |
|
arc *appmdl.Info |
|
cids []int64 |
|
err error |
|
) |
|
defer func() { |
|
if err != nil { |
|
retry := &model.Retry{Aid: aid} |
|
s.dao.PushList(context.Background(), retry) |
|
log.Warn("upArcCache fail(%+v)", retry) |
|
} |
|
}() |
|
c := context.Background() |
|
if view, err = s.arcRPC.View(c, &arcrpc.ViewRequest{Aid: aid}); err != nil { |
|
if ecode.Cause(err).Equal(ecode.NothingFound) { |
|
err = nil |
|
return |
|
} |
|
log.Error("s.arcRPC.View3(%d) error(%v)", aid, err) |
|
return |
|
} |
|
for _, p := range view.Pages { |
|
cids = append(cids, p.Cid) |
|
} |
|
arc = &appmdl.Info{ |
|
Aid: aid, |
|
Cids: cids, |
|
State: view.Arc.State, |
|
Mid: view.Arc.Author.Mid, |
|
Attribute: view.Arc.Attribute, |
|
} |
|
if err = s.dao.AddArchiveCache(context.Background(), aid, arc); err == nil { |
|
log.Info("update view cahce aid(%d) success", aid) |
|
} |
|
} |
|
|
|
func (s *Service) retryproc() { |
|
for { |
|
if s.closed { |
|
break |
|
} |
|
var ( |
|
retry = &model.Retry{} |
|
bs []byte |
|
err error |
|
) |
|
if bs, err = s.dao.PopList(context.Background()); err != nil || len(bs) == 0 { |
|
time.Sleep(5 * time.Second) |
|
continue |
|
} |
|
if err = json.Unmarshal(bs, retry); err != nil { |
|
log.Error("json.Unmarshal(%s) error(%v)", bs, err) |
|
continue |
|
} |
|
log.Info("retry data(%+v) start", retry) |
|
if retry.Aid != 0 { |
|
s.upArcCache(retry.Aid) |
|
} |
|
log.Info("retry data(%+v) end", retry) |
|
} |
|
s.waiter.Done() |
|
}
|
|
|