You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
219 lines
4.5 KiB
219 lines
4.5 KiB
package cache |
|
|
|
import ( |
|
"context" |
|
"crypto/md5" |
|
"fmt" |
|
"net/http" |
|
"strings" |
|
"sync" |
|
"sync/atomic" |
|
"time" |
|
|
|
"go-common/library/ecode" |
|
"go-common/library/log" |
|
bm "go-common/library/net/http/blademaster" |
|
"go-common/library/net/http/blademaster/middleware/cache/store" |
|
) |
|
|
|
const ( |
|
_degradeInterval = 60 * 10 |
|
_degradePrefix = "bm.degrade" |
|
) |
|
|
|
var ( |
|
_degradeBytes = []byte(fmt.Sprintf("{\"code\":%d, \"message\":\"\"}", ecode.Degrade)) |
|
) |
|
|
|
// Degrader is the common degrader instance. |
|
type Degrader struct { |
|
lock sync.RWMutex |
|
urls map[string]*state |
|
|
|
expire int32 |
|
ch chan *result |
|
pool sync.Pool // degradeWriter pool |
|
} |
|
|
|
// argsDegrader means the degrade will happened by args policy |
|
type argsDegrader struct { |
|
*Degrader |
|
|
|
args []string |
|
} |
|
|
|
type degradeWriter struct { |
|
*Degrader |
|
|
|
ctx *bm.Context |
|
response http.ResponseWriter |
|
store store.Store |
|
key string |
|
state *state |
|
} |
|
|
|
type state struct { |
|
// FIXME(zhoujiahui): using transient map to avoid potential memory leak? |
|
// record last cached time |
|
sync.RWMutex |
|
gens map[string]*int64 |
|
} |
|
|
|
type result struct { |
|
key string |
|
value []byte |
|
store store.Store |
|
} |
|
|
|
var _ http.ResponseWriter = °radeWriter{} |
|
var _ Policy = &argsDegrader{} |
|
|
|
// NewDegrader will create a new degrade struct |
|
func NewDegrader(expire int32) (d *Degrader) { |
|
d = &Degrader{ |
|
urls: make(map[string]*state), |
|
ch: make(chan *result, 1024), |
|
expire: expire, |
|
} |
|
d.pool.New = func() interface{} { |
|
return °radeWriter{ |
|
Degrader: d, |
|
} |
|
} |
|
|
|
go d.degradeproc() |
|
return |
|
} |
|
|
|
func (d *Degrader) degradeproc() { |
|
for { |
|
r := <-d.ch |
|
if err := r.store.Set(context.Background(), r.key, r.value, d.expire); err != nil { |
|
log.Error("store write key(%s) error(%v)", r.key, err) |
|
} |
|
} |
|
} |
|
|
|
// Args means this path will be degrade by specified args |
|
func (d *Degrader) Args(args ...string) Policy { |
|
return &argsDegrader{ |
|
Degrader: d, |
|
args: args, |
|
} |
|
} |
|
|
|
func (d *Degrader) state(path string) *state { |
|
d.lock.RLock() |
|
s, ok := d.urls[path] |
|
d.lock.RUnlock() |
|
if !ok { |
|
s = &state{ |
|
gens: make(map[string]*int64), |
|
} |
|
d.lock.Lock() |
|
d.urls[path] = s |
|
d.lock.Unlock() |
|
} |
|
return s |
|
} |
|
|
|
// Key is used to identify response cache key in most key-value store |
|
func (ad *argsDegrader) Key(ctx *bm.Context) string { |
|
req := ctx.Request |
|
path := req.URL.Path |
|
params := req.Form |
|
|
|
vs := make([]string, 0, len(ad.args)) |
|
for _, arg := range ad.args { |
|
vs = append(vs, params.Get(arg)) |
|
} |
|
return fmt.Sprintf("%s:%s_%x", _degradePrefix, strings.Replace(path, "/", "_", -1), md5.Sum([]byte(strings.Join(vs, "-")))) |
|
} |
|
|
|
// Handler is used to execute degrade service |
|
func (ad *argsDegrader) Handler(store store.Store) bm.HandlerFunc { |
|
return func(ctx *bm.Context) { |
|
req := ctx.Request |
|
path := req.URL.Path |
|
|
|
writer := ad.pool.Get().(*degradeWriter) |
|
writer.response = ctx.Writer |
|
writer.ctx = ctx |
|
writer.store = store |
|
writer.state = ad.state(path) |
|
writer.key = ad.Key(ctx) |
|
|
|
ctx.Writer = writer // replace to degrade writer |
|
ctx.Next() |
|
|
|
ad.pool.Put(writer) |
|
} |
|
} |
|
|
|
func (w *degradeWriter) Header() http.Header { return w.response.Header() } |
|
func (w *degradeWriter) WriteHeader(code int) { w.response.WriteHeader(code) } |
|
|
|
func (w *degradeWriter) Write(data []byte) (size int, err error) { |
|
e := w.ctx.Error |
|
// if an degrade error code is raised from upstream, |
|
// degrade this request directly |
|
if e != nil { |
|
if ec := ecode.Cause(e); ec.Code() == ecode.Degrade.Code() { |
|
return w.write() |
|
} |
|
} |
|
|
|
// write origin response |
|
if size, err = w.response.Write(data); err != nil { |
|
return |
|
} |
|
|
|
// error raised, this is a unsuccessful response |
|
if e != nil { |
|
return |
|
} |
|
|
|
// is required to cache |
|
if !w.state.required(w.key) { |
|
return |
|
} |
|
|
|
// async cache succeeded response for further degradation |
|
select { |
|
case w.ch <- &result{key: w.key, value: data, store: w.store}: |
|
default: |
|
} |
|
|
|
return |
|
} |
|
|
|
func (w *degradeWriter) write() (int, error) { |
|
data, err := w.store.Get(w.ctx, w.key) |
|
if err != nil || len(data) == 0 { |
|
// FIXME(zhoujiahui): The default response data should be respect to render type or content-type header |
|
data = _degradeBytes |
|
} |
|
return w.response.Write(data) |
|
} |
|
|
|
// check is required to cache response |
|
// it depends on last cache time and _degradeInterval |
|
func (st *state) required(key string) bool { |
|
now := time.Now().Unix() |
|
|
|
st.RLock() |
|
pLast, ok := st.gens[key] |
|
st.RUnlock() |
|
if !ok { |
|
st.Lock() |
|
pLast = new(int64) |
|
st.gens[key] = pLast |
|
st.Unlock() |
|
} |
|
|
|
last := atomic.LoadInt64(pLast) |
|
if now-last < _degradeInterval { |
|
return false |
|
} |
|
return atomic.CompareAndSwapInt64(pLast, last, now) |
|
}
|
|
|