You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
214 lines
6.6 KiB
214 lines
6.6 KiB
/* |
|
* |
|
* Copyright 2016 gRPC authors. |
|
* |
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
* you may not use this file except in compliance with the License. |
|
* You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
* See the License for the specific language governing permissions and |
|
* limitations under the License. |
|
* |
|
*/ |
|
|
|
package grpclb |
|
|
|
import ( |
|
"fmt" |
|
"sync" |
|
"time" |
|
|
|
"google.golang.org/grpc/balancer" |
|
"google.golang.org/grpc/connectivity" |
|
"google.golang.org/grpc/resolver" |
|
) |
|
|
|
// The parent ClientConn should re-resolve when grpclb loses connection to the |
|
// remote balancer. When the ClientConn inside grpclb gets a TransientFailure, |
|
// it calls lbManualResolver.ResolveNow(), which calls parent ClientConn's |
|
// ResolveNow, and eventually results in re-resolve happening in parent |
|
// ClientConn's resolver (DNS for example). |
|
// |
|
// parent |
|
// ClientConn |
|
// +-----------------------------------------------------------------+ |
|
// | parent +---------------------------------+ | |
|
// | DNS ClientConn | grpclb | | |
|
// | resolver balancerWrapper | | | |
|
// | + + | grpclb grpclb | | |
|
// | | | | ManualResolver ClientConn | | |
|
// | | | | + + | | |
|
// | | | | | | Transient | | |
|
// | | | | | | Failure | | |
|
// | | | | | <--------- | | | |
|
// | | | <--------------- | ResolveNow | | | |
|
// | | <--------- | ResolveNow | | | | | |
|
// | | ResolveNow | | | | | | |
|
// | | | | | | | | |
|
// | + + | + + | | |
|
// | +---------------------------------+ | |
|
// +-----------------------------------------------------------------+ |
|
|
|
// lbManualResolver is used by the ClientConn inside grpclb. It's a manual |
|
// resolver with a special ResolveNow() function. |
|
// |
|
// When ResolveNow() is called, it calls ResolveNow() on the parent ClientConn, |
|
// so when grpclb client lose contact with remote balancers, the parent |
|
// ClientConn's resolver will re-resolve. |
|
type lbManualResolver struct { |
|
scheme string |
|
ccr resolver.ClientConn |
|
|
|
ccb balancer.ClientConn |
|
} |
|
|
|
func (r *lbManualResolver) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) { |
|
r.ccr = cc |
|
return r, nil |
|
} |
|
|
|
func (r *lbManualResolver) Scheme() string { |
|
return r.scheme |
|
} |
|
|
|
// ResolveNow calls resolveNow on the parent ClientConn. |
|
func (r *lbManualResolver) ResolveNow(o resolver.ResolveNowOption) { |
|
r.ccb.ResolveNow(o) |
|
} |
|
|
|
// Close is a noop for Resolver. |
|
func (*lbManualResolver) Close() {} |
|
|
|
// NewAddress calls cc.NewAddress. |
|
func (r *lbManualResolver) NewAddress(addrs []resolver.Address) { |
|
r.ccr.NewAddress(addrs) |
|
} |
|
|
|
// NewServiceConfig calls cc.NewServiceConfig. |
|
func (r *lbManualResolver) NewServiceConfig(sc string) { |
|
r.ccr.NewServiceConfig(sc) |
|
} |
|
|
|
const subConnCacheTime = time.Second * 10 |
|
|
|
// lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache. |
|
// SubConns will be kept in cache for subConnCacheTime before being removed. |
|
// |
|
// Its new and remove methods are updated to do cache first. |
|
type lbCacheClientConn struct { |
|
cc balancer.ClientConn |
|
timeout time.Duration |
|
|
|
mu sync.Mutex |
|
// subConnCache only keeps subConns that are being deleted. |
|
subConnCache map[resolver.Address]*subConnCacheEntry |
|
subConnToAddr map[balancer.SubConn]resolver.Address |
|
} |
|
|
|
type subConnCacheEntry struct { |
|
sc balancer.SubConn |
|
|
|
cancel func() |
|
abortDeleting bool |
|
} |
|
|
|
func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn { |
|
return &lbCacheClientConn{ |
|
cc: cc, |
|
timeout: subConnCacheTime, |
|
subConnCache: make(map[resolver.Address]*subConnCacheEntry), |
|
subConnToAddr: make(map[balancer.SubConn]resolver.Address), |
|
} |
|
} |
|
|
|
func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { |
|
if len(addrs) != 1 { |
|
return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs)) |
|
} |
|
addrWithoutMD := addrs[0] |
|
addrWithoutMD.Metadata = nil |
|
|
|
ccc.mu.Lock() |
|
defer ccc.mu.Unlock() |
|
if entry, ok := ccc.subConnCache[addrWithoutMD]; ok { |
|
// If entry is in subConnCache, the SubConn was being deleted. |
|
// cancel function will never be nil. |
|
entry.cancel() |
|
delete(ccc.subConnCache, addrWithoutMD) |
|
return entry.sc, nil |
|
} |
|
|
|
scNew, err := ccc.cc.NewSubConn(addrs, opts) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
ccc.subConnToAddr[scNew] = addrWithoutMD |
|
return scNew, nil |
|
} |
|
|
|
func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) { |
|
ccc.mu.Lock() |
|
defer ccc.mu.Unlock() |
|
addr, ok := ccc.subConnToAddr[sc] |
|
if !ok { |
|
return |
|
} |
|
|
|
if entry, ok := ccc.subConnCache[addr]; ok { |
|
if entry.sc != sc { |
|
// This could happen if NewSubConn was called multiple times for the |
|
// same address, and those SubConns are all removed. We remove sc |
|
// immediately here. |
|
delete(ccc.subConnToAddr, sc) |
|
ccc.cc.RemoveSubConn(sc) |
|
} |
|
return |
|
} |
|
|
|
entry := &subConnCacheEntry{ |
|
sc: sc, |
|
} |
|
ccc.subConnCache[addr] = entry |
|
|
|
timer := time.AfterFunc(ccc.timeout, func() { |
|
ccc.mu.Lock() |
|
if entry.abortDeleting { |
|
return |
|
} |
|
ccc.cc.RemoveSubConn(sc) |
|
delete(ccc.subConnToAddr, sc) |
|
delete(ccc.subConnCache, addr) |
|
ccc.mu.Unlock() |
|
}) |
|
entry.cancel = func() { |
|
if !timer.Stop() { |
|
// If stop was not successful, the timer has fired (this can only |
|
// happen in a race). But the deleting function is blocked on ccc.mu |
|
// because the mutex was held by the caller of this function. |
|
// |
|
// Set abortDeleting to true to abort the deleting function. When |
|
// the lock is released, the deleting function will acquire the |
|
// lock, check the value of abortDeleting and return. |
|
entry.abortDeleting = true |
|
} |
|
} |
|
} |
|
|
|
func (ccc *lbCacheClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) { |
|
ccc.cc.UpdateBalancerState(s, p) |
|
} |
|
|
|
func (ccc *lbCacheClientConn) close() { |
|
ccc.mu.Lock() |
|
// Only cancel all existing timers. There's no need to remove SubConns. |
|
for _, entry := range ccc.subConnCache { |
|
entry.cancel() |
|
} |
|
ccc.mu.Unlock() |
|
}
|
|
|