You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
155 lines
2.7 KiB
155 lines
2.7 KiB
package deliver |
|
|
|
import ( |
|
"encoding/binary" |
|
"fmt" |
|
"math/rand" |
|
"net" |
|
"sync" |
|
"time" |
|
|
|
"go-common/library/log" |
|
) |
|
|
|
// _magicBuf holds the two bytes that warpData puts at the start of every frame.
var _magicBuf = []byte{0xAC, 0xBE}

// _bufpool recycles the byte slices handed out by getBuf and returned by
// freeBuf; its New function is installed in init.
var _bufpool sync.Pool
|
|
|
func init() { |
|
rand.Seed(time.Now().UnixNano()) |
|
_bufpool = sync.Pool{New: func() interface{} { |
|
return make([]byte, 0, 4096) |
|
}} |
|
} |
|
|
|
func freeBuf(buf []byte) { |
|
buf = buf[:0] |
|
_bufpool.Put(buf) |
|
} |
|
|
|
func getBuf() []byte { |
|
return _bufpool.Get().([]byte) |
|
} |
|
|
|
// Deliver delivers spans to dapper-service through TCP.
type Deliver struct {
	// Inputs supplied to New: candidate server addresses and the function
	// that fetch calls to obtain the next payload.
	servers []string
	readFn  func() ([]byte, error)

	// conn is the connection stored by dial and written to by loop and Close.
	conn *net.TCPConn

	// dataCh carries payloads from fetch to loop (and to Close on shutdown).
	dataCh chan []byte

	// closeCh carries the stop signal that Close sends and loop receives.
	closeCh chan struct{}

	// closed is set by Close and polled by fetch.
	closed bool
}
|
|
|
// New Deliver |
|
func New(servers []string, readFn func() ([]byte, error)) (*Deliver, error) { |
|
if len(servers) == 0 { |
|
return nil, fmt.Errorf("no server provide") |
|
} |
|
d := &Deliver{ |
|
servers: servers, |
|
readFn: readFn, |
|
closeCh: make(chan struct{}, 1), |
|
dataCh: make(chan []byte), |
|
} |
|
return d, d.start() |
|
} |
|
|
|
func (d *Deliver) start() error { |
|
if err := d.dial(); err != nil { |
|
return err |
|
} |
|
go d.fetch() |
|
go d.loop() |
|
return nil |
|
} |
|
|
|
func (d *Deliver) fetch() { |
|
for { |
|
if d.closed { |
|
return |
|
} |
|
data, err := d.readFn() |
|
if err != nil { |
|
log.Error("deliver read data error: %s", err) |
|
continue |
|
} |
|
d.dataCh <- data |
|
} |
|
} |
|
|
|
func (d *Deliver) loop() { |
|
for { |
|
select { |
|
case <-d.closeCh: |
|
return |
|
case data := <-d.dataCh: |
|
data = warpData(data) |
|
send: |
|
_, err := d.conn.Write(data) |
|
if err == nil { |
|
freeBuf(data) |
|
continue |
|
} |
|
d.reDial() |
|
goto send |
|
} |
|
} |
|
} |
|
|
|
// Close deliver |
|
func (d *Deliver) Close() error { |
|
if d.closed { |
|
return fmt.Errorf("already closed") |
|
} |
|
d.closed = true |
|
d.closeCh <- struct{}{} |
|
timer := time.NewTimer(50 * time.Millisecond) |
|
select { |
|
case data := <-d.dataCh: |
|
// write last data to conn |
|
_, err := d.conn.Write(data) |
|
return fmt.Errorf("write last data error: %s", err) |
|
case <-timer.C: |
|
return nil |
|
} |
|
return nil |
|
} |
|
|
|
func (d *Deliver) reDial() { |
|
if d.conn != nil { |
|
d.conn.Close() |
|
} |
|
for { |
|
if err := d.dial(); err != nil { |
|
log.Error("redial error: %s, retry after second", err) |
|
time.Sleep(time.Second) |
|
} |
|
break |
|
} |
|
} |
|
|
|
func (d *Deliver) dial() error { |
|
server := chioceServer(d.servers) |
|
conn, err := net.Dial("tcp", server) |
|
if err != nil { |
|
return fmt.Errorf("dial tcp://%s error: %s", server, err) |
|
} |
|
d.conn = conn.(*net.TCPConn) |
|
d.conn.SetKeepAlive(true) |
|
return nil |
|
} |
|
|
|
// chioceServer returns one entry of servers chosen uniformly at random.
// servers must not be empty: rand.Intn panics when given zero.
func chioceServer(servers []string) string {
	idx := rand.Intn(len(servers))
	picked := servers[idx]
	return picked
}
|
|
|
func warpData(data []byte) []byte { |
|
buf := getBuf() |
|
buf = append(buf, _magicBuf...) |
|
buf = append(buf, []byte{0, 0, 0, 0, 0, 0}...) |
|
binary.BigEndian.PutUint32(buf[2:6], uint32(len(data)+2)) |
|
buf = append(buf, data...) |
|
return buf |
|
}
|
|
|