You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
361 lines
8.9 KiB
361 lines
8.9 KiB
package lancerlogstream |
|
|
|
import ( |
|
"net" |
|
"time" |
|
"sync" |
|
"errors" |
|
"math/rand" |
|
"expvar" |
|
|
|
"go-common/library/log" |
|
"go-common/app/service/ops/log-agent/pkg/bufio" |
|
xtime "go-common/library/time" |
|
) |
|
|
|
var ( |
|
ErrAddrListNil = errors.New("addrList can't be nil") |
|
ErrPoolSize = errors.New("Pool size should be no greater then length of addr list") |
|
) |
|
|
|
type LancerBufConn struct { |
|
conn net.Conn |
|
wr *bufio.Writer |
|
enabled bool |
|
ctime time.Time |
|
} |
|
|
|
type connPool struct { |
|
c *ConnPoolConfig |
|
invalidUpstreams map[string]interface{} |
|
invalidUpstreamsLock sync.RWMutex |
|
validBufConnChan chan *LancerBufConn |
|
invalidBufConnChan chan *LancerBufConn |
|
connCounter map[string]int |
|
connCounterLock sync.RWMutex |
|
newConnLock sync.Mutex |
|
} |
|
|
|
type ConnPoolConfig struct { |
|
Name string `tome:"name"` |
|
AddrList []string `tome:"addrList"` |
|
DialTimeout xtime.Duration `tome:"dialTimeout"` |
|
IdleTimeout xtime.Duration `tome:"idleTimeout"` |
|
BufSize int `tome:"bufSize"` |
|
PoolSize int `tome:"poolSize"` |
|
} |
|
|
|
func (c *ConnPoolConfig) ConfigValidate() (error) { |
|
if c == nil { |
|
return errors.New("Config of pool is nil") |
|
} |
|
|
|
if len(c.AddrList) == 0 { |
|
return errors.New("pool addr list can't be empty") |
|
} |
|
if time.Duration(c.DialTimeout) == 0 { |
|
c.DialTimeout = xtime.Duration(time.Second * 5) |
|
} |
|
|
|
if time.Duration(c.IdleTimeout) == 0 { |
|
c.IdleTimeout = xtime.Duration(time.Minute * 15) |
|
} |
|
|
|
if c.BufSize == 0 { |
|
c.BufSize = 1024 * 1024 * 2 // 2M by default |
|
} |
|
|
|
if c.PoolSize == 0 { |
|
c.PoolSize = len(c.AddrList) |
|
} |
|
return nil |
|
} |
|
|
|
// newConn make a connection to lancer |
|
func (cp *connPool) newConn() (conn net.Conn, err error) { |
|
cp.newConnLock.Lock() |
|
defer cp.newConnLock.Unlock() |
|
for { |
|
if addr, err := cp.randomOneUpstream(); err == nil { |
|
if conn, err := net.DialTimeout("tcp", addr, time.Duration(cp.c.DialTimeout)); err == nil && conn != nil { |
|
log.Info("connect to %s success", addr) |
|
cp.connCounterAdd(addr) |
|
return conn, nil |
|
} else { |
|
cp.markUpstreamInvalid(addr) |
|
continue |
|
} |
|
} else { |
|
return nil, err |
|
} |
|
} |
|
} |
|
|
|
// newBufConn 创建一个buf连接, buf连接绑定一个conn(无论连接是否可用) |
|
func (cp *connPool) newBufConn() (bufConn *LancerBufConn, err error) { |
|
bufConn = new(LancerBufConn) |
|
bufConn.wr = bufio.NewWriterSize(nil, cp.c.BufSize) |
|
if err := cp.setConn(bufConn); err == nil { |
|
bufConn.enabled = true |
|
} else { |
|
bufConn.enabled = false |
|
} |
|
|
|
return bufConn, nil |
|
} |
|
|
|
// flushBufConn 定期flush buffer |
|
func (cp *connPool) flushBufConn() { |
|
for { |
|
bufConn, _ := cp.getBufConn() |
|
bufConn.conn.SetWriteDeadline(time.Now().Add(time.Second * 5)) |
|
if err := bufConn.wr.Flush(); err != nil { |
|
log.Error("Error when flush to %s: %s", bufConn.conn.RemoteAddr().String(), err) |
|
bufConn.enabled = false |
|
} |
|
cp.putBufConn(bufConn) |
|
time.Sleep(time.Second * 5) |
|
} |
|
} |
|
|
|
// initConnPool 初始化conn pool对象 |
|
func initConnPool(c *ConnPoolConfig) (cp *connPool, err error) { |
|
if err = c.ConfigValidate(); err != nil { |
|
return nil, err |
|
} |
|
if len(c.AddrList) == 0 { |
|
return nil, ErrAddrListNil |
|
} |
|
|
|
if c.PoolSize > len(c.AddrList) { |
|
return nil, ErrPoolSize |
|
} |
|
|
|
rand.Seed(time.Now().Unix()) |
|
cp = new(connPool) |
|
cp.c = c |
|
cp.validBufConnChan = make(chan *LancerBufConn, cp.c.PoolSize) |
|
cp.invalidBufConnChan = make(chan *LancerBufConn, cp.c.PoolSize) |
|
cp.invalidUpstreams = make(map[string]interface{}) |
|
cp.connCounter = make(map[string]int) |
|
cp.initPool() |
|
go cp.maintainUpstream() |
|
go cp.flushBufConn() |
|
go cp.maintainBufConnPool() |
|
expvar.Publish("conn_pool"+cp.c.Name, expvar.Func(cp.connPoolStatus)) |
|
return cp, nil |
|
} |
|
|
|
// connableUpstreams 返回可以建立连接的upstream列表 |
|
func (cp *connPool) connableUpstreams() ([]string) { |
|
list := make([]string, 0) |
|
cp.invalidUpstreamsLock.RLock() |
|
defer cp.invalidUpstreamsLock.RUnlock() |
|
for _, addr := range cp.c.AddrList { |
|
if _, ok := cp.invalidUpstreams[addr]; !ok { |
|
if count, ok := cp.connCounter[addr]; ok && count == 0 { |
|
list = append(list, addr) |
|
} |
|
} |
|
} |
|
return list |
|
} |
|
|
|
// write write []byte to BufConn |
|
func (bc *LancerBufConn) write(p []byte) (int, error) { |
|
bc.conn.SetWriteDeadline(time.Now().Add(time.Second * 5)) |
|
return bc.wr.Write(p) |
|
} |
|
|
|
// randomOneUpstream 随机返回一个可以建立连接的upstream |
|
func (cp *connPool) randomOneUpstream() (s string, err error) { |
|
list := cp.connableUpstreams() |
|
if len(list) == 0 { |
|
err = errors.New("No valid upstreams") |
|
return |
|
} |
|
return list[rand.Intn(len(list))], nil |
|
} |
|
|
|
// initPool 初始化poolSize个数的bufConn |
|
func (cp *connPool) initPool() { |
|
for _, addr := range cp.c.AddrList { |
|
cp.connCounter[addr] = 0 |
|
} |
|
for i := 0; i < cp.c.PoolSize; i++ { |
|
if bufConn, err := cp.newBufConn(); err == nil { |
|
cp.putBufConn(bufConn) |
|
} |
|
} |
|
} |
|
|
|
// novalidUpstream check if there is no validUpstream |
|
func (cp *connPool) novalidUpstream() bool { |
|
return len(cp.invalidUpstreams) == len(cp.c.AddrList) |
|
} |
|
|
|
//GetConn 从pool中取一个BufConn |
|
func (cp *connPool) getBufConn() (*LancerBufConn, error) { |
|
for { |
|
select { |
|
case bufConn := <-cp.validBufConnChan: |
|
if !bufConn.enabled { |
|
cp.putInvalidBufConn(bufConn) |
|
continue |
|
} |
|
return bufConn, nil |
|
case <-time.After(10 * time.Second): |
|
log.Warn("timeout when get conn from conn pool") |
|
continue |
|
} |
|
} |
|
} |
|
|
|
// setConn 为bufConn绑定一个新的Conn |
|
func (cp *connPool) setConn(bufConn *LancerBufConn) (error) { |
|
if bufConn.conn != nil { |
|
if bufConn.enabled == false { |
|
cp.markUpstreamInvalid(bufConn.conn.RemoteAddr().String()) |
|
} |
|
cp.connCounterDel(bufConn.conn.RemoteAddr().String()) |
|
bufConn.conn.Close() |
|
bufConn.conn = nil |
|
bufConn.enabled = false |
|
} |
|
if conn, err := cp.newConn(); err == nil { |
|
bufConn.conn = conn |
|
bufConn.wr.Reset(conn) |
|
bufConn.ctime = time.Now() |
|
bufConn.enabled = true |
|
return nil |
|
} else { |
|
bufConn.enabled = false |
|
return err |
|
} |
|
} |
|
|
|
//putBufConn 把BufConn放回到pool中 |
|
func (cp *connPool) putBufConn(bufConn *LancerBufConn) { |
|
if bufConn.enabled == false { |
|
cp.putInvalidBufConn(bufConn) |
|
return |
|
} |
|
if bufConn.ctime.Add(time.Duration(cp.c.IdleTimeout)).Before(time.Now()) { |
|
bufConn.wr.Flush() |
|
cp.putInvalidBufConn(bufConn) |
|
return |
|
} |
|
cp.putValidBufConn(bufConn) |
|
} |
|
|
|
// putValidBufConn 把 bufConn放到可用的pool中 |
|
func (cp *connPool) putValidBufConn(bufConn *LancerBufConn) { |
|
select { |
|
case cp.validBufConnChan <- bufConn: |
|
return |
|
default: |
|
log.Warn("BufConnChan full, discard,this shouldn't happen") |
|
return |
|
} |
|
} |
|
|
|
// putInvalidBufConn 把bufConn放到不可用的pool中 |
|
func (cp *connPool) putInvalidBufConn(bufConn *LancerBufConn) { |
|
select { |
|
case cp.invalidBufConnChan <- bufConn: |
|
return |
|
default: |
|
log.Warn("invalidBufConnChan full, discard,this shouldn't happen") |
|
return |
|
} |
|
} |
|
|
|
// maintainBufConnPool 维护BufConnPool状态 |
|
func (cp *connPool) maintainBufConnPool() { |
|
for { |
|
select { |
|
case bufConn := <-cp.invalidBufConnChan: |
|
cp.setConn(bufConn) |
|
cp.putBufConn(bufConn) |
|
} |
|
time.Sleep(time.Second * 1) |
|
} |
|
} |
|
|
|
//markConnInvalid会将链接关闭并且将相应upstreamserver设置为不可用 |
|
func (cp *connPool) markUpstreamInvalid(addr string) (err error) { |
|
log.Error("mark upstream %s invalid", addr) |
|
cp.invalidUpstreamsLock.Lock() |
|
cp.invalidUpstreams[addr] = nil |
|
cp.invalidUpstreamsLock.Unlock() |
|
return |
|
} |
|
|
|
// markUpstreamValid 将某一addr设置为不可用 |
|
func (cp *connPool) markUpstreamValid(addr string) (err error) { |
|
log.Info("%s is valid again", addr) |
|
cp.invalidUpstreamsLock.Lock() |
|
delete(cp.invalidUpstreams, addr) |
|
cp.invalidUpstreamsLock.Unlock() |
|
return |
|
} |
|
|
|
// connCounterAdd 连接数+1 |
|
func (cp *connPool) connCounterAdd(addr string) { |
|
cp.connCounterLock.Lock() |
|
defer cp.connCounterLock.Unlock() |
|
if _, ok := cp.connCounter[addr]; ok { |
|
cp.connCounter[addr] += 1 |
|
} else { |
|
cp.connCounter[addr] = 1 |
|
} |
|
return |
|
} |
|
|
|
//connCounterDel 连接数-1 |
|
func (cp *connPool) connCounterDel(addr string) { |
|
cp.connCounterLock.Lock() |
|
defer cp.connCounterLock.Unlock() |
|
if _, ok := cp.connCounter[addr]; ok { |
|
cp.connCounter[addr] -= 1 |
|
} |
|
} |
|
|
|
// connPoolStatus 返回connPool状态 |
|
func (cp *connPool) connPoolStatus() interface{} { |
|
status := make(map[string]interface{}) |
|
status["conn_num"] = cp.connCounter |
|
status["invalidUpstreams"] = cp.invalidUpstreams |
|
return status |
|
} |
|
|
|
// maintainUpstream 维护upstream的健康状态 |
|
func (cp *connPool) maintainUpstream() { |
|
for { |
|
cp.invalidUpstreamsLock.RLock() |
|
tryAddrs := make([]string, 0, len(cp.invalidUpstreams)) |
|
for k := range cp.invalidUpstreams { |
|
tryAddrs = append(tryAddrs, k) |
|
} |
|
cp.invalidUpstreamsLock.RUnlock() |
|
for _, addr := range tryAddrs { |
|
if conn, err := net.DialTimeout("tcp", addr, time.Duration(cp.c.DialTimeout)); err == nil && conn != nil { |
|
conn.Close() |
|
cp.markUpstreamValid(addr) |
|
} |
|
} |
|
time.Sleep(time.Second * 10) |
|
} |
|
} |
|
|
|
//ReleaseConnPool 释放连接池中所有链接 |
|
func (cp *connPool) ReleaseConnPool() { |
|
log.Info("Release Conn Pool") |
|
close(cp.validBufConnChan) |
|
close(cp.invalidBufConnChan) |
|
for conn := range cp.validBufConnChan { |
|
conn.enabled = false |
|
conn.wr.Flush() |
|
conn.conn.Close() |
|
} |
|
}
|
|
|