You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
245 lines
5.0 KiB
245 lines
5.0 KiB
package infoc |
|
|
|
import ( |
|
"bytes" |
|
"context" |
|
"encoding/binary" |
|
"errors" |
|
"fmt" |
|
"net" |
|
"strconv" |
|
"strings" |
|
"sync" |
|
"time" |
|
|
|
"go-common/library/log" |
|
"go-common/library/net/metadata" |
|
"go-common/library/net/netutil" |
|
xtime "go-common/library/time" |
|
) |
|
|
|
const ( |
|
_infocSpliter = "\001" |
|
_infocReplacer = "|" |
|
_infocLenStart = 2 |
|
_infocLenEnd = 6 |
|
_protocolLen = 6 |
|
_infocTimeout = 50 * time.Millisecond |
|
) |
|
|
|
var ( |
|
_infocMagic = []byte{172, 190} // NOTE: magic 0xAC0xBE |
|
_infocHeaderLen = []byte{0, 0, 0, 0} // NOTE: body len placeholder |
|
_infocType = []byte{0, 0} // NOTE: type 0 |
|
_maxRetry = 10 |
|
// ErrFull error chan buffer full. |
|
ErrFull = errors.New("infoc: chan buffer full") |
|
) |
|
|
|
var ( |
|
// ClientWeb ... |
|
ClientWeb = "web" |
|
ClientIphone = "iphone" |
|
ClientIpad = "ipad" |
|
ClientAndroid = "android" |
|
|
|
ItemTypeAv = "av" |
|
ItemTypeBangumi = "bangumi" |
|
ItemTypeLive = "live" |
|
ItemTypeTopic = "topic" |
|
ItemTypeRank = "rank" |
|
ItemTypeActivity = "activity" |
|
ItemTypeTag = "tag" |
|
ItemTypeAD = "ad" |
|
ItemTypeLV = "lv" |
|
|
|
ActionClick = "click" |
|
ActionPlay = "play" |
|
ActionFav = "fav" |
|
ActionCoin = "coin" |
|
ActionDM = "dm" |
|
ActionToView = "toview" |
|
ActionShare = "share" |
|
ActionSpace = "space" |
|
Actionfollow = "follow" |
|
ActionHeartbeat = "heartbeat" |
|
ActionAnswer = "answer" |
|
) |
|
|
|
// Config is infoc config. |
|
type Config struct { |
|
TaskID string |
|
// udp or tcp |
|
Proto string |
|
Addr string |
|
ChanSize int |
|
DialTimeout xtime.Duration |
|
WriteTimeout xtime.Duration |
|
} |
|
|
|
// Infoc infoc struct. |
|
type Infoc struct { |
|
c *Config |
|
header []byte |
|
msgs chan *bytes.Buffer |
|
dialTimeout time.Duration |
|
writeTimeout time.Duration |
|
pool sync.Pool |
|
waiter sync.WaitGroup |
|
} |
|
|
|
// New new infoc logger. |
|
func New(c *Config) (i *Infoc) { |
|
i = &Infoc{ |
|
c: c, |
|
header: []byte(c.TaskID), |
|
msgs: make(chan *bytes.Buffer, c.ChanSize), |
|
dialTimeout: time.Duration(c.DialTimeout), |
|
writeTimeout: time.Duration(c.WriteTimeout), |
|
pool: sync.Pool{ |
|
New: func() interface{} { |
|
return &bytes.Buffer{} |
|
}, |
|
}, |
|
} |
|
if i.dialTimeout == 0 { |
|
i.dialTimeout = _infocTimeout |
|
} |
|
if i.writeTimeout == 0 { |
|
i.writeTimeout = _infocTimeout |
|
} |
|
i.waiter.Add(1) |
|
go i.writeproc() |
|
return |
|
} |
|
|
|
func (i *Infoc) buf() *bytes.Buffer { |
|
return i.pool.Get().(*bytes.Buffer) |
|
} |
|
|
|
func (i *Infoc) freeBuf(buf *bytes.Buffer) { |
|
buf.Reset() |
|
i.pool.Put(buf) |
|
} |
|
|
|
// Info record log to file. |
|
func (i *Infoc) Info(args ...interface{}) (err error) { |
|
err, res := i.info(args...) |
|
if err != nil { |
|
return |
|
} |
|
select { |
|
case i.msgs <- res: |
|
default: |
|
i.freeBuf(res) |
|
err = ErrFull |
|
} |
|
return |
|
} |
|
|
|
// Infov support filter mirror request |
|
func (i *Infoc) Infov(ctx context.Context, args ...interface{}) (err error) { |
|
if metadata.Bool(ctx, metadata.Mirror) { |
|
return |
|
} |
|
|
|
return i.Info(args...) |
|
} |
|
|
|
func getValue(i interface{}) (s string) { |
|
switch v := i.(type) { |
|
case int: |
|
s = strconv.FormatInt(int64(v), 10) |
|
case int64: |
|
s = strconv.FormatInt(v, 10) |
|
case string: |
|
s = v |
|
case bool: |
|
s = strconv.FormatBool(v) |
|
default: |
|
s = fmt.Sprint(i) |
|
} |
|
return |
|
} |
|
|
|
// Close close the connection. |
|
func (i *Infoc) Close() error { |
|
i.msgs <- nil |
|
i.waiter.Wait() |
|
return nil |
|
} |
|
|
|
// writeproc write data into connection. |
|
func (i *Infoc) writeproc() { |
|
var ( |
|
msg *bytes.Buffer |
|
conn net.Conn |
|
err error |
|
) |
|
bc := netutil.BackoffConfig{ |
|
MaxDelay: 15 * time.Second, |
|
BaseDelay: 1.0 * time.Second, |
|
Factor: 1.6, |
|
Jitter: 0.2, |
|
} |
|
for { |
|
if msg = <-i.msgs; msg == nil { |
|
break // quit infoc writeproc |
|
} |
|
var j int |
|
for j = 0; j < _maxRetry; j++ { |
|
if conn == nil || err != nil { |
|
if conn, err = net.DialTimeout(i.c.Proto, i.c.Addr, i.dialTimeout); err != nil { |
|
log.Error("infoc net dial error(%v)", err) |
|
time.Sleep(bc.Backoff(j)) |
|
continue |
|
} |
|
} |
|
if i.writeTimeout != 0 { |
|
conn.SetWriteDeadline(time.Now().Add(i.writeTimeout)) |
|
} |
|
if _, err = conn.Write(msg.Bytes()); err != nil { |
|
log.Error("infoc conn write error(%v)", err) |
|
conn.Close() |
|
time.Sleep(bc.Backoff(j)) |
|
continue |
|
} |
|
break |
|
} |
|
if j == _maxRetry { |
|
log.Error("infoc reached max retry times") |
|
} |
|
i.freeBuf(msg) |
|
} |
|
i.waiter.Done() |
|
if conn != nil && err == nil { |
|
conn.Close() |
|
} |
|
} |
|
|
|
func (i *Infoc) info(args ...interface{}) (err error, buf *bytes.Buffer) { |
|
if len(args) == 0 { |
|
return nil, nil |
|
} |
|
res := i.buf() |
|
res.Write(_infocMagic) // type and body buf, for calc length. |
|
res.Write(_infocHeaderLen) // placeholder |
|
res.Write(_infocType) |
|
res.Write(i.header) |
|
res.WriteString(strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10)) |
|
// // append first arg |
|
_, err = res.WriteString(getValue(args[0])) |
|
for _, arg := range args[1:] { |
|
// append ,arg |
|
res.WriteString(_infocSpliter) |
|
_, err = res.WriteString(strings.Replace(getValue(arg), _infocSpliter, _infocReplacer, -1)) |
|
} |
|
if err != nil { |
|
i.freeBuf(res) |
|
return |
|
} |
|
bs := res.Bytes() |
|
binary.BigEndian.PutUint32(bs[_infocLenStart:_infocLenEnd], uint32(res.Len()-_protocolLen)) |
|
buf = res |
|
return |
|
}
|
|
|