You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
144 lines
4.5 KiB
144 lines
4.5 KiB
package httpstream |
|
|
|
import ( |
|
"net/http" |
|
"sync" |
|
"regexp" |
|
"bytes" |
|
"strconv" |
|
"context" |
|
"go-common/app/service/ops/log-agent/event" |
|
) |
|
|
|
type HttpStream struct { |
|
c *Config |
|
logstreams map[chan *event.ProcessorEvent]*filterRule |
|
l sync.Mutex |
|
} |
|
type filterRule struct { |
|
maxLines int |
|
appId string |
|
instanceId string |
|
reg *regexp.Regexp |
|
} |
|
|
|
var LogSourceChan = make(chan *event.ProcessorEvent) |
|
|
|
// initlogStream init log stream |
|
func NewHttpStream(config *Config) (httpStream *HttpStream, err error) { |
|
h := new(HttpStream) |
|
if err := config.ConfigValidate(); err != nil { |
|
return nil, err |
|
} |
|
h.c = config |
|
h.logstreams = make(map[chan *event.ProcessorEvent]*filterRule) |
|
http.HandleFunc("/logs", h.LogStreamer()) |
|
go h.route() |
|
go http.ListenAndServe(h.c.Addr, nil) |
|
return h, nil |
|
} |
|
|
|
// route 把日志路由到所有注册的logstream |
|
func (s *HttpStream) route() { |
|
for buf := range LogSourceChan { |
|
for logstream, _ := range s.logstreams { |
|
logstream <- buf |
|
} |
|
} |
|
} |
|
|
|
// LogStreamer 接收请求 |
|
func (s *HttpStream) LogStreamer() func(w http.ResponseWriter, req *http.Request) { |
|
logsHandler := func(w http.ResponseWriter, req *http.Request) { |
|
logstream := make(chan *event.ProcessorEvent) |
|
f := new(filterRule) |
|
// parse params |
|
params := req.URL.Query() |
|
if appId, ok := params["app_id"]; ok { |
|
f.appId = appId[0] |
|
} else { |
|
w.Write(append([]byte("必须指定app_id"), '\n')) |
|
return |
|
} |
|
if reg, ok := params["regexp"]; ok { |
|
if filterReg, err := regexp.Compile(reg[0]); err == nil { |
|
f.reg = filterReg |
|
} else { |
|
w.Write(append([]byte("正则表达式格式错误"), '\n')) |
|
return |
|
} |
|
} |
|
if instanceId, ok := params["instance_id"]; ok { |
|
f.instanceId = instanceId[0] |
|
} |
|
|
|
if maxLines, ok := params["max_lines"]; ok { |
|
if n, err := strconv.Atoi(maxLines[0]); err == nil { |
|
f.maxLines = n |
|
} else { |
|
w.Write(append([]byte("max_lines格式错误"), '\n')) |
|
return |
|
} |
|
} |
|
|
|
s.add(logstream, f) |
|
go func() { |
|
select { |
|
case <-req.Context().Done(): |
|
s.remove(logstream) |
|
} |
|
}() |
|
defer s.httpStreamer(req.Context(), w, req, logstream, f) |
|
} |
|
return logsHandler |
|
} |
|
|
|
// httpStreamer 过滤并输出日志 |
|
func (s *HttpStream) httpStreamer(ctx context.Context, w http.ResponseWriter, req *http.Request, logstream chan *event.ProcessorEvent, f *filterRule) { |
|
w.Header().Add("Content-Type", "text/plain; charset=utf-8") |
|
if f.instanceId != "" { |
|
//w.Write([]byte(fmt.Sprintf( |
|
// "\x1b[1;31m%s\x1b[0m\n", "注意:caster中,只有(1)app_id满足服务树三级格式 (2)日志包含instance_id且值为实例名称 的情况下,日志才能输出\n"))) |
|
w.Write([]byte("注意:caster中,只有(1)app_id满足服务树三级格式 (2)日志包含instance_id且值为实例名称 的情况下,日志才能输出 \n")) |
|
w.(http.Flusher).Flush() |
|
} |
|
c := 0 |
|
for { |
|
select { |
|
case e := <-logstream: |
|
if f.appId != "" && string(e.AppId) != f.appId { |
|
continue |
|
} |
|
if f.reg != nil && !f.reg.Match(e.Bytes()) { |
|
continue |
|
} |
|
if f.instanceId != "" && !bytes.Contains(e.Bytes(), []byte(f.instanceId)) { |
|
continue |
|
} |
|
if f.maxLines != 0 && c >= f.maxLines { |
|
s.remove(logstream) |
|
return |
|
} |
|
c += 1 |
|
// TODO event recycle |
|
w.Write(append(e.Bytes(), '\n')) |
|
w.(http.Flusher).Flush() |
|
case <-ctx.Done(): |
|
return |
|
} |
|
} |
|
} |
|
|
|
// add 注册logstream |
|
func (s *HttpStream) add(logstream chan *event.ProcessorEvent, f *filterRule) { |
|
s.l.Lock() |
|
defer s.l.Unlock() |
|
s.logstreams[logstream] = f |
|
} |
|
|
|
// remove 注销logstream |
|
func (s *HttpStream) remove(logstream chan *event.ProcessorEvent) { |
|
s.l.Lock() |
|
defer s.l.Unlock() |
|
delete(s.logstreams, logstream) |
|
}
|
|
|