You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
362 lines
8.6 KiB
362 lines
8.6 KiB
package model |
|
|
|
import ( |
|
"encoding/json" |
|
"strconv" |
|
"sync" |
|
"time" |
|
|
|
"go-common/library/ecode" |
|
"go-common/library/log" |
|
) |
|
|
|
// InstanceStatus Status of instance |
|
// type InstanceStatus uint32 |
|
|
|
const ( |
|
// InstanceStatusUP Ready to receive traffic |
|
InstanceStatusUP = uint32(1) |
|
// InstancestatusWating Intentionally shutdown for traffic |
|
InstancestatusWating = uint32(1) << 1 |
|
) |
|
|
|
func (i *Instance) filter(status uint32) bool { |
|
return status&i.Status > 0 |
|
} |
|
|
|
// Action Replicate type of node |
|
type Action int |
|
|
|
const ( |
|
// Register Replicate the add action to all nodes |
|
Register Action = iota |
|
// Renew Replicate the heartbeat action to all nodes |
|
Renew |
|
// Cancel Replicate the cancel action to all nodes |
|
Cancel |
|
// Weight Replicate the Weight action to all nodes |
|
Weight |
|
// Delete Replicate the Delete action to all nodes |
|
Delete |
|
// Status Replicate the Status action to all nodes |
|
Status |
|
) |
|
|
|
// Instance holds information required for registration with |
|
// <Discovery Server> and to be discovered by other components. |
|
type Instance struct { |
|
Region string `json:"region"` |
|
Zone string `json:"zone"` |
|
Env string `json:"env"` |
|
Appid string `json:"appid"` |
|
Treeid int64 `json:"treeid"` |
|
Hostname string `json:"hostname"` |
|
HTTP string `json:"http"` |
|
RPC string `json:"rpc"` |
|
Version string `json:"version"` |
|
Metadata map[string]string `json:"metadata"` |
|
Addrs []string `json:"addrs"` |
|
// Status enum instance status |
|
Status uint32 `json:"status"` |
|
|
|
// timestamp |
|
RegTimestamp int64 `json:"reg_timestamp"` |
|
UpTimestamp int64 `json:"up_timestamp"` // NOTE: It is latest timestamp that status becomes UP. |
|
RenewTimestamp int64 `json:"renew_timestamp"` |
|
DirtyTimestamp int64 `json:"dirty_timestamp"` |
|
|
|
LatestTimestamp int64 `json:"latest_timestamp"` |
|
} |
|
|
|
// NewInstance new a instance. |
|
func NewInstance(arg *ArgRegister) (i *Instance) { |
|
now := time.Now().UnixNano() |
|
i = &Instance{ |
|
Region: arg.Region, |
|
Zone: arg.Zone, |
|
Env: arg.Env, |
|
Appid: arg.Appid, |
|
Treeid: arg.Treeid, |
|
Hostname: arg.Hostname, |
|
HTTP: arg.HTTP, |
|
RPC: arg.RPC, |
|
Version: arg.Version, |
|
Status: arg.Status, |
|
Addrs: arg.Addrs, |
|
RegTimestamp: now, |
|
UpTimestamp: now, |
|
LatestTimestamp: now, |
|
RenewTimestamp: now, |
|
DirtyTimestamp: now, |
|
} |
|
i.Metadata = make(map[string]string) |
|
if arg.Metadata != "" { |
|
if err := json.Unmarshal([]byte(arg.Metadata), &i.Metadata); err != nil { |
|
log.Error("json unmarshal metadata err %v", err) |
|
} |
|
} |
|
return |
|
} |
|
|
|
// InstanceInfo the info get by consumer. |
|
type InstanceInfo struct { |
|
Instances []*Instance `json:"instances"` |
|
ZoneInstances map[string][]*Instance `json:"zone_instances"` |
|
LatestTimestamp int64 `json:"latest_timestamp"` |
|
LatestTimestampStr string `json:"latest_timestamp_str"` |
|
} |
|
|
|
// Apps app distinguished by zone |
|
type Apps struct { |
|
apps map[string]*App |
|
lock sync.RWMutex |
|
latestTimestamp int64 |
|
} |
|
|
|
// NewApps return new Apps. |
|
func NewApps() *Apps { |
|
return &Apps{ |
|
apps: make(map[string]*App), |
|
} |
|
} |
|
|
|
// NewApp news a app by appid. If ok=false, returns the app of already exist. |
|
func (p *Apps) NewApp(zone, appid string, treeid, lts int64) (a *App, new bool) { |
|
p.lock.Lock() |
|
a, ok := p.apps[zone] |
|
if !ok { |
|
a = NewApp(zone, appid, treeid) |
|
p.apps[zone] = a |
|
} |
|
if lts <= p.latestTimestamp { |
|
// insure increase |
|
lts = p.latestTimestamp + 1 |
|
} |
|
p.latestTimestamp = lts |
|
p.lock.Unlock() |
|
new = !ok |
|
return |
|
} |
|
|
|
// App get app by zone. |
|
func (p *Apps) App(zone string) (as []*App) { |
|
p.lock.RLock() |
|
if zone != "" { |
|
a, ok := p.apps[zone] |
|
if !ok { |
|
p.lock.RUnlock() |
|
return |
|
} |
|
as = []*App{a} |
|
} else { |
|
for _, a := range p.apps { |
|
as = append(as, a) |
|
} |
|
} |
|
p.lock.RUnlock() |
|
return |
|
} |
|
|
|
// Del del app by zone. |
|
func (p *Apps) Del(zone string) { |
|
p.lock.Lock() |
|
delete(p.apps, zone) |
|
p.lock.Unlock() |
|
} |
|
|
|
// InstanceInfo return slice of instances.if up is true,return all status instance else return up status instance |
|
func (p *Apps) InstanceInfo(zone string, latestTime int64, status uint32) (ci *InstanceInfo, err error) { |
|
p.lock.RLock() |
|
defer p.lock.RUnlock() |
|
|
|
if latestTime >= p.latestTimestamp { |
|
err = ecode.NotModified |
|
return |
|
} |
|
ci = &InstanceInfo{ |
|
LatestTimestamp: p.latestTimestamp, |
|
LatestTimestampStr: strconv.FormatInt(p.latestTimestamp/int64(time.Second), 10), |
|
ZoneInstances: make(map[string][]*Instance), |
|
} |
|
var ok bool |
|
for z, app := range p.apps { |
|
if zone == "" || z == zone { |
|
ok = true |
|
as := app.Instances() |
|
if len(as) == 0 { |
|
continue |
|
} |
|
instance := make([]*Instance, 0, len(as)) |
|
for _, i := range as { |
|
// if up is false return all status instance |
|
if i.filter(status) { |
|
// if i.Status == InstanceStatusUP && i.LatestTimestamp > latestTime { // TODO(felix): increase |
|
ni := new(Instance) |
|
*ni = *i |
|
instance = append(instance, ni) |
|
} |
|
} |
|
ci.Instances = append(ci.Instances, instance...) |
|
ci.ZoneInstances[z] = instance |
|
} |
|
} |
|
if !ok { |
|
err = ecode.NothingFound |
|
} else if len(ci.Instances) == 0 { |
|
err = ecode.NotModified |
|
} |
|
return |
|
} |
|
|
|
// UpdateLatest update LatestTimestamp. |
|
func (p *Apps) UpdateLatest(latestTime int64) { |
|
if latestTime <= p.latestTimestamp { |
|
// insure increase |
|
latestTime = p.latestTimestamp + 1 |
|
} |
|
p.latestTimestamp = latestTime |
|
} |
|
|
|
// App Instances distinguished by hostname |
|
type App struct { |
|
AppID string |
|
Treeid int64 |
|
Zone string |
|
instances map[string]*Instance |
|
latestTimestamp int64 |
|
lock sync.RWMutex |
|
} |
|
|
|
// NewApp new App. |
|
func NewApp(zone, appid string, treeid int64) (a *App) { |
|
a = &App{ |
|
Treeid: treeid, |
|
AppID: appid, |
|
Zone: zone, |
|
instances: make(map[string]*Instance), |
|
} |
|
return |
|
} |
|
|
|
// Instances return slice of instances. |
|
func (a *App) Instances() (is []*Instance) { |
|
a.lock.RLock() |
|
is = make([]*Instance, 0, len(a.instances)) |
|
for _, i := range a.instances { |
|
ni := new(Instance) |
|
*ni = *i |
|
is = append(is, ni) |
|
} |
|
a.lock.RUnlock() |
|
return |
|
} |
|
|
|
// NewInstance new a instance. |
|
func (a *App) NewInstance(ni *Instance, latestTime int64) (i *Instance, ok bool) { |
|
i = new(Instance) |
|
a.lock.Lock() |
|
oi, ok := a.instances[ni.Hostname] |
|
if ok { |
|
ni.UpTimestamp = oi.UpTimestamp |
|
if ni.DirtyTimestamp < oi.DirtyTimestamp { |
|
log.Warn("register exist(%v) dirty timestamp over than caller(%v)", oi, ni) |
|
ni = oi |
|
} |
|
} |
|
a.instances[ni.Hostname] = ni |
|
a.updateLatest(latestTime) |
|
*i = *ni |
|
a.lock.Unlock() |
|
ok = !ok |
|
return |
|
} |
|
|
|
// Renew new a instance. |
|
func (a *App) Renew(hostname string) (i *Instance, ok bool) { |
|
i = new(Instance) |
|
a.lock.Lock() |
|
defer a.lock.Unlock() |
|
oi, ok := a.instances[hostname] |
|
if !ok { |
|
return |
|
} |
|
oi.RenewTimestamp = time.Now().UnixNano() |
|
*i = *oi |
|
return |
|
} |
|
|
|
func (a *App) updateLatest(latestTime int64) { |
|
if latestTime <= a.latestTimestamp { |
|
// insure increase |
|
latestTime = a.latestTimestamp + 1 |
|
} |
|
a.latestTimestamp = latestTime |
|
} |
|
|
|
// Cancel cancel a instance. |
|
func (a *App) Cancel(hostname string, latestTime int64) (i *Instance, l int, ok bool) { |
|
i = new(Instance) |
|
a.lock.Lock() |
|
defer a.lock.Unlock() |
|
oi, ok := a.instances[hostname] |
|
if !ok { |
|
return |
|
} |
|
delete(a.instances, hostname) |
|
l = len(a.instances) |
|
oi.LatestTimestamp = latestTime |
|
a.updateLatest(latestTime) |
|
*i = *oi |
|
return |
|
} |
|
|
|
// Len returns the length of instances. |
|
func (a *App) Len() (l int) { |
|
a.lock.RLock() |
|
l = len(a.instances) |
|
a.lock.RUnlock() |
|
return |
|
} |
|
|
|
// Set set new status,metadata of instance . |
|
func (a *App) Set(changes *ArgSet) (ok bool) { |
|
a.lock.Lock() |
|
defer a.lock.Unlock() |
|
var ( |
|
dst *Instance |
|
setTime int64 |
|
) |
|
if changes.SetTimestamp == 0 { |
|
setTime = time.Now().UnixNano() |
|
} |
|
for i, hostname := range changes.Hostname { |
|
if dst, ok = a.instances[hostname]; !ok { |
|
log.Error("Set hostname(%s) not found", hostname) |
|
return |
|
} |
|
if len(changes.Status) != 0 { |
|
if uint32(changes.Status[i]) != InstanceStatusUP && uint32(changes.Status[i]) != InstancestatusWating { |
|
log.Error("SetStatus change status(%d) is error", changes.Status[i]) |
|
ok = false |
|
return |
|
} |
|
dst.Status = uint32(changes.Status[i]) |
|
if dst.Status == InstanceStatusUP { |
|
dst.UpTimestamp = setTime |
|
} |
|
} |
|
if len(changes.Metadata) != 0 { |
|
metadata := make(map[string]string) |
|
if err := json.Unmarshal([]byte(changes.Metadata[i]), &metadata); err != nil { |
|
log.Error("set change metadata err %s", changes.Metadata[i]) |
|
ok = false |
|
return |
|
} |
|
dst.Metadata = metadata |
|
} |
|
dst.LatestTimestamp = setTime |
|
dst.DirtyTimestamp = setTime |
|
} |
|
a.updateLatest(setTime) |
|
return |
|
}
|
|
|