You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
372 lines
8.0 KiB
372 lines
8.0 KiB
package paladin |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"flag" |
|
"fmt" |
|
"io/ioutil" |
|
"net/http" |
|
"net/url" |
|
"os" |
|
"path" |
|
"strconv" |
|
"sync" |
|
"time" |
|
|
|
"go-common/library/conf/env" |
|
"go-common/library/ecode" |
|
"go-common/library/log" |
|
xip "go-common/library/net/ip" |
|
"go-common/library/net/netutil" |
|
|
|
"github.com/pkg/errors" |
|
) |
|
|
|
const ( |
|
_apiGet = "http://%s/config/v2/get?%s" |
|
_apiCheck = "http://%s/config/v2/check?%s" |
|
|
|
_maxLoadRetries = 3 |
|
) |
|
|
|
var ( |
|
_ Client = &sven{} |
|
|
|
svenHost string |
|
svenVersion string |
|
svenPath string |
|
svenToken string |
|
svenAppoint string |
|
svenTreeid string |
|
|
|
_debug bool |
|
) |
|
|
|
func init() { |
|
flag.StringVar(&svenHost, "conf_host", os.Getenv("CONF_HOST"), `config api host.`) |
|
flag.StringVar(&svenVersion, "conf_version", os.Getenv("CONF_VERSION"), `app version.`) |
|
flag.StringVar(&svenPath, "conf_path", os.Getenv("CONF_PATH"), `config file path.`) |
|
flag.StringVar(&svenToken, "conf_token", os.Getenv("CONF_TOKEN"), `config token.`) |
|
flag.StringVar(&svenAppoint, "conf_appoint", os.Getenv("CONF_APPOINT"), `config appoint.`) |
|
flag.StringVar(&svenTreeid, "tree_id", os.Getenv("TREE_ID"), `tree id.`) |
|
|
|
if env.DeployEnv == env.DeployEnvDev { |
|
_debug = true |
|
} |
|
} |
|
|
|
type watcher struct { |
|
keys []string |
|
ch chan Event |
|
} |
|
|
|
func newWatcher(keys []string) *watcher { |
|
return &watcher{keys: keys, ch: make(chan Event, 5)} |
|
} |
|
|
|
func (w *watcher) HasKey(key string) bool { |
|
if len(w.keys) == 0 { |
|
return true |
|
} |
|
for _, k := range w.keys { |
|
if k == key { |
|
return true |
|
} |
|
} |
|
return false |
|
} |
|
|
|
func (w *watcher) Handle(event Event) { |
|
select { |
|
case w.ch <- event: |
|
default: |
|
log.Error("paladin: discard event:%+v", event) |
|
} |
|
} |
|
|
|
func (w *watcher) Chan() <-chan Event { |
|
return w.ch |
|
} |
|
|
|
func (w *watcher) Close() { |
|
close(w.ch) |
|
} |
|
|
|
// sven is sven config client. |
|
type sven struct { |
|
values *Map |
|
wmu sync.RWMutex |
|
watchers map[*watcher]struct{} |
|
|
|
httpCli *http.Client |
|
backoff *netutil.BackoffConfig |
|
} |
|
|
|
// NewSven new a config client. |
|
func NewSven() (Client, error) { |
|
s := &sven{ |
|
values: new(Map), |
|
watchers: make(map[*watcher]struct{}), |
|
httpCli: &http.Client{Timeout: 60 * time.Second}, |
|
backoff: &netutil.BackoffConfig{ |
|
MaxDelay: 5 * time.Second, |
|
BaseDelay: 1.0 * time.Second, |
|
Factor: 1.6, |
|
Jitter: 0.2, |
|
}, |
|
} |
|
if err := s.checkEnv(); err != nil { |
|
return nil, err |
|
} |
|
ver, err := s.load() |
|
if err != nil { |
|
return nil, err |
|
} |
|
go s.watchproc(ver) |
|
return s, nil |
|
} |
|
|
|
func (s *sven) checkEnv() error { |
|
if svenHost == "" || svenVersion == "" || svenPath == "" || svenToken == "" || svenTreeid == "" { |
|
return fmt.Errorf("config env invalid. conf_host(%s) conf_version(%s) conf_path(%s) conf_token(%s) conf_appoint(%s) tree_id(%s)", svenHost, svenVersion, svenPath, svenToken, svenAppoint, svenTreeid) |
|
} |
|
return nil |
|
} |
|
|
|
// Get return value by key. |
|
func (s *sven) Get(key string) *Value { |
|
return s.values.Get(key) |
|
} |
|
|
|
// GetAll return value map. |
|
func (s *sven) GetAll() *Map { |
|
return s.values |
|
} |
|
|
|
// WatchEvent watch with the specified keys. |
|
func (s *sven) WatchEvent(ctx context.Context, keys ...string) <-chan Event { |
|
w := newWatcher(keys) |
|
s.wmu.Lock() |
|
s.watchers[w] = struct{}{} |
|
s.wmu.Unlock() |
|
return w.Chan() |
|
} |
|
|
|
// Close close watcher. |
|
func (s *sven) Close() (err error) { |
|
s.wmu.RLock() |
|
for w := range s.watchers { |
|
w.Close() |
|
} |
|
s.wmu.RUnlock() |
|
return |
|
} |
|
|
|
func (s *sven) fireEvent(event Event) { |
|
s.wmu.RLock() |
|
for w := range s.watchers { |
|
if w.HasKey(event.Key) { |
|
w.Handle(event) |
|
} |
|
} |
|
s.wmu.RUnlock() |
|
} |
|
|
|
func (s *sven) load() (ver int64, err error) { |
|
var ( |
|
v *version |
|
cs []*content |
|
) |
|
if v, err = s.check(-1); err != nil { |
|
log.Error("paladin: s.check(-1) error(%v)", err) |
|
return |
|
} |
|
for i := 0; i < _maxLoadRetries; i++ { |
|
if cs, err = s.config(v); err == nil { |
|
all := make(map[string]*Value, len(cs)) |
|
for _, v := range cs { |
|
all[v.Name] = &Value{val: v.Config, raw: v.Config} |
|
} |
|
s.values.Store(all) |
|
return v.Version, nil |
|
} |
|
log.Error("paladin: s.config(%v) error(%v)", ver, err) |
|
time.Sleep(s.backoff.Backoff(i)) |
|
} |
|
return 0, err |
|
} |
|
|
|
func (s *sven) watchproc(ver int64) { |
|
var retry int |
|
for { |
|
v, err := s.check(ver) |
|
if err != nil { |
|
if ecode.NotModified.Equal(err) { |
|
time.Sleep(time.Second) |
|
continue |
|
} |
|
log.Error("paladin: s.check(%d) error(%v)", ver, err) |
|
retry++ |
|
time.Sleep(s.backoff.Backoff(retry)) |
|
continue |
|
} |
|
cs, err := s.config(v) |
|
if err != nil { |
|
log.Error("paladin: s.config(%v) error(%v)", ver, err) |
|
retry++ |
|
time.Sleep(s.backoff.Backoff(retry)) |
|
continue |
|
} |
|
all := s.values.Load() |
|
news := make(map[string]*Value, len(cs)) |
|
for _, v := range cs { |
|
if _, ok := all[v.Name]; !ok { |
|
go s.fireEvent(Event{Event: EventAdd, Key: v.Name, Value: v.Config}) |
|
} else if v.Config != "" { |
|
go s.fireEvent(Event{Event: EventUpdate, Key: v.Name, Value: v.Config}) |
|
} else { |
|
go s.fireEvent(Event{Event: EventRemove, Key: v.Name, Value: v.Config}) |
|
} |
|
news[v.Name] = &Value{val: v.Config, raw: v.Config} |
|
} |
|
for k, v := range all { |
|
if _, ok := news[k]; !ok { |
|
news[k] = v |
|
} |
|
} |
|
s.values.Store(news) |
|
ver = v.Version |
|
retry = 0 |
|
} |
|
} |
|
|
|
type version struct { |
|
Version int64 `json:"version"` |
|
Diffs []int64 `json:"diffs"` |
|
} |
|
|
|
type config struct { |
|
Version int64 `json:"version"` |
|
Content string `json:"content"` |
|
Md5 string `json:"md5"` |
|
} |
|
|
|
type content struct { |
|
Cid int64 `json:"cid"` |
|
Name string `json:"name"` |
|
Config string `json:"config"` |
|
} |
|
|
|
func (s *sven) check(ver int64) (v *version, err error) { |
|
params := newParams() |
|
params.Set("version", strconv.FormatInt(ver, 10)) |
|
params.Set("appoint", svenAppoint) |
|
var res struct { |
|
Code int `json:"code"` |
|
Data *version `json:"data"` |
|
} |
|
uri := fmt.Sprintf(_apiCheck, svenHost, params.Encode()) |
|
if _debug { |
|
fmt.Printf("paladin: check(%d) uri(%s)\n", ver, uri) |
|
} |
|
req, err := http.NewRequest("GET", uri, nil) |
|
if err != nil { |
|
return |
|
} |
|
resp, err := s.httpCli.Do(req) |
|
if err != nil { |
|
return |
|
} |
|
defer resp.Body.Close() |
|
if resp.StatusCode != http.StatusOK { |
|
err = errors.Errorf("paladin: httpCli.GET(%s) error(%d)", params.Encode(), resp.StatusCode) |
|
return |
|
} |
|
b, err := ioutil.ReadAll(resp.Body) |
|
if err != nil { |
|
return |
|
} |
|
if err = json.Unmarshal(b, &res); err != nil { |
|
return |
|
} |
|
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { |
|
err = ec |
|
return |
|
} |
|
if res.Data == nil { |
|
err = errors.Errorf("paladin: http version is nil. params(%s)", params.Encode()) |
|
return |
|
} |
|
v = res.Data |
|
return |
|
} |
|
|
|
func (s *sven) config(ver *version) (cts []*content, err error) { |
|
ids, _ := json.Marshal(ver.Diffs) |
|
params := newParams() |
|
params.Set("version", strconv.FormatInt(ver.Version, 10)) |
|
params.Set("ids", string(ids)) |
|
var res struct { |
|
Code int `json:"code"` |
|
Data *config `json:"data"` |
|
} |
|
uri := fmt.Sprintf(_apiGet, svenHost, params.Encode()) |
|
if _debug { |
|
fmt.Printf("paladin: config(%+v) uri(%s)\n", ver, uri) |
|
} |
|
req, err := http.NewRequest("GET", uri, nil) |
|
if err != nil { |
|
return |
|
} |
|
resp, err := s.httpCli.Do(req) |
|
if err != nil { |
|
return |
|
} |
|
defer resp.Body.Close() |
|
if resp.StatusCode != http.StatusOK { |
|
err = errors.Errorf("paladin: httpCli.GET(%s) error(%d)", params.Encode(), resp.StatusCode) |
|
return |
|
} |
|
b, err := ioutil.ReadAll(resp.Body) |
|
if err != nil { |
|
return |
|
} |
|
if err = json.Unmarshal(b, &res); err != nil { |
|
return |
|
} |
|
if !ecode.Int(res.Code).Equal(ecode.OK) || res.Data == nil { |
|
err = errors.Errorf("paladin: http config is nil. params(%s) ecode(%d)", params.Encode(), res.Code) |
|
return |
|
} |
|
if err = json.Unmarshal([]byte(res.Data.Content), &cts); err != nil { |
|
return |
|
} |
|
for _, c := range cts { |
|
if err = ioutil.WriteFile(path.Join(svenPath, c.Name), []byte(c.Config), 0644); err != nil { |
|
return |
|
} |
|
} |
|
return |
|
} |
|
|
|
func newParams() url.Values { |
|
params := url.Values{} |
|
params.Set("service", serviceName()) |
|
params.Set("build", svenVersion) |
|
params.Set("token", svenToken) |
|
params.Set("hostname", env.Hostname) |
|
params.Set("ip", ipAddr()) |
|
return params |
|
} |
|
|
|
func ipAddr() string { |
|
if env.IP != "" { |
|
return env.IP |
|
} |
|
return xip.InternalIP() |
|
} |
|
|
|
func serviceName() string { |
|
return fmt.Sprintf("%s_%s_%s", svenTreeid, env.DeployEnv, env.Zone) |
|
}
|
|
|