package lancergrpc

import (
	"bytes"
	"context"
	"fmt"
	"math"
	"strconv"
	"sync"
	"time"

	"go-common/app/service/ops/log-agent/event"
	"go-common/app/service/ops/log-agent/output"
	"go-common/app/service/ops/log-agent/output/cache/file"
	"go-common/app/service/ops/log-agent/output/lancergrpc/lancergateway"
	"go-common/app/service/ops/log-agent/pkg/common"
	"go-common/app/service/ops/log-agent/pkg/flowmonitor"
	"go-common/app/service/ops/log-agent/pkg/lancermonitor"
	"go-common/library/log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)
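
// JSON field keys used to locate values inside ops-log payloads.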
const (
	_appIdKey = `"app_id":`
	_levelKey = `"level":`
	_logTime  = `"time":`
)
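
// logMagic/logMagicBuf, _logType and _logLength look like the pieces of the
// lancer binary frame header (magic bytes, type and length). None of them is
// referenced in this file, so they are presumably consumed elsewhere in the
// package. local caches the system time zone for timestamp parsing.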
var (
	logMagic    = []byte{0xAC, 0xBE}
	logMagicBuf = []byte{0xAC, 0xBE}
	_logType    = []byte{0, 1}
	_logLength  = []byte{0, 0, 0, 0}
	local, _    = time.LoadLocation("Local")
)
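
// logDoc is the unit queued to the sender goroutines: a copied payload plus
// the log id it belongs to.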
type logDoc struct {
	b     []byte
	logId string
}
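
// init registers the constructor under the "lancergrpc" name so the output
// registry can build this output from config.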
func init() {
	err := output.Register("lancergrpc", NewLancer)
	if err != nil {
		panic(err)
	}
}
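
// Lancer ships processor events to the lancer gateway over gRPC. Events from
// the sock input are staged in a file cache first; events for the
// lancer-ops-log output are aggregated in logAggrBuf before sending, and
// everything else goes to sendChan directly.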
type Lancer struct {
	c              *Config
	next           chan string
	i              chan *event.ProcessorEvent
	cache          *file.FileCache
	logAggrBuf     map[string]*bytes.Buffer
	logAggrBufLock sync.Mutex
	sendChan       chan *logDoc
	lancerClient   lancergateway.Gateway2ServerClient
	ctx            context.Context
	cancel         context.CancelFunc
}
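
// NewLancer validates the config, builds the file cache and the gateway
// client, and returns the output; it refuses to construct an output whose
// name is already running.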
func NewLancer(ctx context.Context, config interface{}) (output.Output, error) {
	var err error

	lancer := new(Lancer)
	c, ok := config.(*Config)
	if !ok {
		return nil, fmt.Errorf("error config for Lancer output")
	}
	if err = c.ConfigValidate(); err != nil {
		return nil, err
	}
	lancer.c = c
	if output.OutputRunning(lancer.c.Name) {
		return nil, fmt.Errorf("output %s already running", lancer.c.Name)
	}

	lancer.i = make(chan *event.ProcessorEvent)
	lancer.next = make(chan string, 1)
	lancer.logAggrBuf = make(map[string]*bytes.Buffer)
	lancer.sendChan = make(chan *logDoc)
	cache, err := file.NewFileCache(lancer.c.CacheConfig)
	if err != nil {
		return nil, err
	}
	lancer.cache = cache
	lancer.lancerClient, err = lancergateway.NewClient(lancer.c.LancerGateway)
	if err != nil {
		return nil, err
	}
	lancer.ctx, lancer.cancel = context.WithCancel(ctx)
	return lancer, nil
}
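
// InputChan returns the channel upstream processors feed events into.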
func (l *Lancer) InputChan() chan *event.ProcessorEvent {
	return l.i
}
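
// Run starts the pipeline goroutines: the input reader, the cache consumer,
// the periodic aggregation flusher (flushLogAggrPeriodically lives elsewhere
// in the package) and SendConcurrency sender goroutines, then registers the
// output under its configured name.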
func (l *Lancer) Run() (err error) {
	go l.readFromProcessor()
	go l.consumeCache()
	go l.flushLogAggrPeriodically()
	for i := 0; i < l.c.SendConcurrency; i++ {
		go l.sendToLancer()
	}
	if l.c.Name != "" {
		output.RegisterOutput(l.c.Name, l)
	}
	return nil
}
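
// Stop cancels the output's context.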
func (l *Lancer) Stop() {
	l.cancel()
}
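
// readFromProcessor drains the input channel: events from the sock input are
// written to the file cache (and picked up later by consumeCache), everything
// else is handed to preWriteToLancer immediately.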
func (l *Lancer) readFromProcessor() {
	for e := range l.i {
		// only cache for sock input
		if e.Source == "sock" {
			l.cache.WriteToCache(e)
			continue
		}
		// without cache
		l.preWriteToLancer(e)
	}
}
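
// preWriteToLancer counts the event in flowmonitor and lancermonitor (note
// the success counter is bumped before delivery is attempted), then either
// aggregates it (lancer-ops-log) or queues it for direct sending.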
func (l *Lancer) preWriteToLancer(e *event.ProcessorEvent) {
	flowmonitor.Fm.AddEvent(e, "log-agent.output.lancer", "OK", "write to lancer")
	lancermonitor.IncreaseLogCount("agent.send.success.count", e.LogId)
	if l.c.Name == "lancer-ops-log" {
		l.logAggr(e)
	} else {
		l.sendLogDirectToLancer(e)
	}
}

// consumeCache consumes logs from the file cache; events shorter than the
// lancer header are dropped and recycled.
func (l *Lancer) consumeCache() {
	for {
		e := l.cache.ReadFromCache()
		if e.Length < _logLancerHeaderLen {
			event.PutEvent(e)
			continue
		}
		// monitor should be called before event recycle
		l.parseOpslog(e)
		l.preWriteToLancer(e)
	}
}
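
// parseOpslog pulls app_id and the event time out of an ops-log payload.
// The time value is parsed as RFC3339Nano first, then as local-time
// "2006-01-02T15:04:05" (the full value, then its first 19 bytes); if every
// layout fails the event simply gets no TimeRangeKey.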
func (l *Lancer) parseOpslog(e *event.ProcessorEvent) {
	if l.c.Name == "lancer-ops-log" {
		e.AppId, _ = common.SeekValue([]byte(_appIdKey), e.Bytes())

		if timeValue, err := common.SeekValue([]byte(_logTime), e.Bytes()); err == nil {
			if len(timeValue) >= 19 {
				// parse time, falling through the supported layouts
				var t time.Time
				if t, err = time.Parse(time.RFC3339Nano, string(timeValue)); err != nil {
					if t, err = time.ParseInLocation("2006-01-02T15:04:05", string(timeValue), local); err != nil {
						if t, err = time.ParseInLocation("2006-01-02T15:04:05", string(timeValue[0:19]), local); err != nil {
							// all layouts failed; t stays zero
						}
					}
				}
				if !t.IsZero() {
					// bucket the timestamp into 100-second windows
					e.TimeRangeKey = strconv.FormatInt(t.Unix()/100*100, 10)
				}
			}
		}
	}
}

// sendLogDirectToLancer copies the event payload into a logDoc, recycles the
// event, and queues the doc for the sender goroutines (no aggregation).
func (l *Lancer) sendLogDirectToLancer(e *event.ProcessorEvent) {
	logDoc := new(logDoc)
	logDoc.b = make([]byte, e.Length)
	copy(logDoc.b, e.Bytes())
	logDoc.logId = e.LogId
	event.PutEvent(e)
	l.sendChan <- logDoc
}
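
// nextRetry returns a capped exponential backoff: attempt n waits
// 2^n * InitialRetryDuration, clamped to MaxRetryDuration, and anything past
// retry 10 gets the cap outright so the exponent cannot blow up. As a worked
// example (assuming InitialRetryDuration = 100ms and MaxRetryDuration = 10s,
// which are illustrative values, not the shipped defaults): retries 0..6 wait
// 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s and 6.4s, and every later retry
// waits the full 10s.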
func (l *Lancer) nextRetry(retry int) time.Duration {
	// short-circuit large retry counts so the backoff math cannot overflow
	if retry > 10 {
		return time.Duration(l.c.MaxRetryDuration)
	}

	d := time.Duration(math.Pow(2, float64(retry))) * time.Duration(l.c.InitialRetryDuration)

	if d > time.Duration(l.c.MaxRetryDuration) {
		return time.Duration(l.c.MaxRetryDuration)
	}

	return d
}
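
// bulkSendToLancerWithRetry sends one EventList and blocks until it is
// accepted or fails permanently: non-SUCCESS responses and transient gRPC
// errors (Canceled, DeadlineExceeded, Unavailable, ResourceExhausted) are
// retried with backoff, any other error drops the batch.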
func (l *Lancer) bulkSendToLancerWithRetry(in *lancergateway.EventList) {
	retry := 0
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Duration(l.c.SendBatchTimeout))
		t1 := time.Now()
		resp, err := l.lancerClient.SendList(ctx, in)
		cancel()
		if err == nil {
			if resp.Code == lancergateway.StatusCode_SUCCESS {
				log.Info("get 200 from lancer gateway: size %d, count %d, cost %s", in.Size(), len(in.Events), time.Since(t1).String())
				return
			}
			flowmonitor.Fm.Add("log-agent", "log-agent.output.lancer", "", "ERROR", fmt.Sprintf("write to lancer non-200: %s", resp.Code))
			log.Warn("get non-200 from lancer gateway, retry: %s", resp.Code)
		}

		if err != nil {
			switch grpc.Code(err) {
			case codes.Canceled, codes.DeadlineExceeded, codes.Unavailable, codes.ResourceExhausted:
				flowmonitor.Fm.Add("log-agent", "log-agent.output.lancer", "", "ERROR", fmt.Sprintf("write to lancer failed, retry: %s", err))
				log.Warn("get error from lancer gateway, retry: %s", err)
			default:
				flowmonitor.Fm.Add("log-agent", "log-agent.output.lancer", "", "ERROR", fmt.Sprintf("write to lancer failed, no retry: %s", err))
				log.Warn("get error from lancer gateway, no retry: %s", err)
				return
			}
		}

		time.Sleep(l.nextRetry(retry))
		retry++
	}
}
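
// sendToLancer accumulates logDocs from sendChan into an EventList and
// flushes it once SendBatchSize bytes or SendBatchNum events pile up, or when
// the ticker notices the batch has sat idle for SendFlushInterval. Each
// sender goroutine keeps its own eventList and lock, so the lock does not
// coordinate across senders.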
func (l *Lancer) sendToLancer() {
	eventList := new(lancergateway.EventList)
	eventListLock := sync.Mutex{}
	lastSend := time.Now()
	ticker := time.Tick(time.Second * 1)
	size := 0
	for {
		select {
		case <-ticker:
			// flush a non-empty batch that has been idle past the interval
			if lastSend.Add(time.Duration(l.c.SendFlushInterval)).Before(time.Now()) && len(eventList.Events) > 0 {
				eventListLock.Lock()
				l.bulkSendToLancerWithRetry(eventList)
				eventList.Reset()
				size = 0
				eventListLock.Unlock()
				lastSend = time.Now()
			}
		case logDoc := <-l.sendChan:
			ev := new(lancergateway.SimpleEvent)
			ev.LogId = logDoc.logId
			ev.Header = map[string]string{"timestamp": strconv.FormatInt(time.Now().Unix()/100*100, 10)}
			ev.Data = logDoc.b
			size += len(ev.Data)
			eventListLock.Lock()
			eventList.Events = append(eventList.Events, ev)
			// flush when the batch exceeds the size or count threshold
			if size > l.c.SendBatchSize || len(eventList.Events) > l.c.SendBatchNum {
				l.bulkSendToLancerWithRetry(eventList)
				eventList.Reset()
				size = 0
				lastSend = time.Now()
			}
			eventListLock.Unlock()
		}
	}
}