/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
|
|
|
package roundrobin_test |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"net" |
|
"sync" |
|
"testing" |
|
"time" |
|
|
|
"google.golang.org/grpc" |
|
"google.golang.org/grpc/balancer/roundrobin" |
|
"google.golang.org/grpc/codes" |
|
_ "google.golang.org/grpc/grpclog/glogger" |
|
"google.golang.org/grpc/internal/leakcheck" |
|
"google.golang.org/grpc/peer" |
|
"google.golang.org/grpc/resolver" |
|
"google.golang.org/grpc/resolver/manual" |
|
"google.golang.org/grpc/status" |
|
testpb "google.golang.org/grpc/test/grpc_testing" |
|
) |
|
|
|
type testServer struct { |
|
testpb.TestServiceServer |
|
} |
|
|
|
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { |
|
return &testpb.Empty{}, nil |
|
} |
|
|
|
func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { |
|
return nil |
|
} |
|
|
|
type test struct { |
|
servers []*grpc.Server |
|
addresses []string |
|
} |
|
|
|
func (t *test) cleanup() { |
|
for _, s := range t.servers { |
|
s.Stop() |
|
} |
|
} |
|
|
|
func startTestServers(count int) (_ *test, err error) { |
|
t := &test{} |
|
|
|
defer func() { |
|
if err != nil { |
|
for _, s := range t.servers { |
|
s.Stop() |
|
} |
|
} |
|
}() |
|
for i := 0; i < count; i++ { |
|
lis, err := net.Listen("tcp", "localhost:0") |
|
if err != nil { |
|
return nil, fmt.Errorf("Failed to listen %v", err) |
|
} |
|
|
|
s := grpc.NewServer() |
|
testpb.RegisterTestServiceServer(s, &testServer{}) |
|
t.servers = append(t.servers, s) |
|
t.addresses = append(t.addresses, lis.Addr().String()) |
|
|
|
go func(s *grpc.Server, l net.Listener) { |
|
s.Serve(l) |
|
}(s, lis) |
|
} |
|
|
|
return t, nil |
|
} |
|
|
|
func TestOneBackend(t *testing.T) { |
|
defer leakcheck.Check(t) |
|
r, cleanup := manual.GenerateAndRegisterManualResolver() |
|
defer cleanup() |
|
|
|
test, err := startTestServers(1) |
|
if err != nil { |
|
t.Fatalf("failed to start servers: %v", err) |
|
} |
|
defer test.cleanup() |
|
|
|
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) |
|
if err != nil { |
|
t.Fatalf("failed to dial: %v", err) |
|
} |
|
defer cc.Close() |
|
testc := testpb.NewTestServiceClient(cc) |
|
// The first RPC should fail because there's no address. |
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
|
defer cancel() |
|
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
|
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
|
} |
|
|
|
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) |
|
// The second RPC should succeed. |
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { |
|
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
|
} |
|
} |
|
|
|
func TestBackendsRoundRobin(t *testing.T) { |
|
defer leakcheck.Check(t) |
|
r, cleanup := manual.GenerateAndRegisterManualResolver() |
|
defer cleanup() |
|
|
|
backendCount := 5 |
|
test, err := startTestServers(backendCount) |
|
if err != nil { |
|
t.Fatalf("failed to start servers: %v", err) |
|
} |
|
defer test.cleanup() |
|
|
|
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) |
|
if err != nil { |
|
t.Fatalf("failed to dial: %v", err) |
|
} |
|
defer cc.Close() |
|
testc := testpb.NewTestServiceClient(cc) |
|
// The first RPC should fail because there's no address. |
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
|
defer cancel() |
|
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
|
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
|
} |
|
|
|
var resolvedAddrs []resolver.Address |
|
for i := 0; i < backendCount; i++ { |
|
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) |
|
} |
|
|
|
r.NewAddress(resolvedAddrs) |
|
var p peer.Peer |
|
// Make sure connections to all servers are up. |
|
for si := 0; si < backendCount; si++ { |
|
var connected bool |
|
for i := 0; i < 1000; i++ { |
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
|
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
|
} |
|
if p.Addr.String() == test.addresses[si] { |
|
connected = true |
|
break |
|
} |
|
time.Sleep(time.Millisecond) |
|
} |
|
if !connected { |
|
t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) |
|
} |
|
} |
|
|
|
for i := 0; i < 3*backendCount; i++ { |
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
|
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
|
} |
|
if p.Addr.String() != test.addresses[i%backendCount] { |
|
t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) |
|
} |
|
} |
|
} |
|
|
|
func TestAddressesRemoved(t *testing.T) { |
|
defer leakcheck.Check(t) |
|
r, cleanup := manual.GenerateAndRegisterManualResolver() |
|
defer cleanup() |
|
|
|
test, err := startTestServers(1) |
|
if err != nil { |
|
t.Fatalf("failed to start servers: %v", err) |
|
} |
|
defer test.cleanup() |
|
|
|
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) |
|
if err != nil { |
|
t.Fatalf("failed to dial: %v", err) |
|
} |
|
defer cc.Close() |
|
testc := testpb.NewTestServiceClient(cc) |
|
// The first RPC should fail because there's no address. |
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
|
defer cancel() |
|
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
|
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
|
} |
|
|
|
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) |
|
// The second RPC should succeed. |
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { |
|
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
|
} |
|
|
|
r.NewAddress([]resolver.Address{}) |
|
for i := 0; i < 1000; i++ { |
|
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) |
|
defer cancel() |
|
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) == codes.DeadlineExceeded { |
|
return |
|
} |
|
time.Sleep(time.Millisecond) |
|
} |
|
t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded") |
|
} |
|
|
|
func TestCloseWithPendingRPC(t *testing.T) { |
|
defer leakcheck.Check(t) |
|
r, cleanup := manual.GenerateAndRegisterManualResolver() |
|
defer cleanup() |
|
|
|
test, err := startTestServers(1) |
|
if err != nil { |
|
t.Fatalf("failed to start servers: %v", err) |
|
} |
|
defer test.cleanup() |
|
|
|
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) |
|
if err != nil { |
|
t.Fatalf("failed to dial: %v", err) |
|
} |
|
testc := testpb.NewTestServiceClient(cc) |
|
|
|
var wg sync.WaitGroup |
|
for i := 0; i < 3; i++ { |
|
wg.Add(1) |
|
go func() { |
|
defer wg.Done() |
|
// This RPC blocks until cc is closed. |
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
|
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded { |
|
t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing") |
|
} |
|
cancel() |
|
}() |
|
} |
|
cc.Close() |
|
wg.Wait() |
|
} |
|
|
|
func TestNewAddressWhileBlocking(t *testing.T) { |
|
defer leakcheck.Check(t) |
|
r, cleanup := manual.GenerateAndRegisterManualResolver() |
|
defer cleanup() |
|
|
|
test, err := startTestServers(1) |
|
if err != nil { |
|
t.Fatalf("failed to start servers: %v", err) |
|
} |
|
defer test.cleanup() |
|
|
|
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) |
|
if err != nil { |
|
t.Fatalf("failed to dial: %v", err) |
|
} |
|
defer cc.Close() |
|
testc := testpb.NewTestServiceClient(cc) |
|
// The first RPC should fail because there's no address. |
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
|
defer cancel() |
|
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
|
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
|
} |
|
|
|
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) |
|
// The second RPC should succeed. |
|
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) |
|
defer cancel() |
|
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil { |
|
t.Fatalf("EmptyCall() = _, %v, want _, nil", err) |
|
} |
|
|
|
r.NewAddress([]resolver.Address{}) |
|
|
|
var wg sync.WaitGroup |
|
for i := 0; i < 3; i++ { |
|
wg.Add(1) |
|
go func() { |
|
defer wg.Done() |
|
// This RPC blocks until NewAddress is called. |
|
testc.EmptyCall(context.Background(), &testpb.Empty{}) |
|
}() |
|
} |
|
time.Sleep(50 * time.Millisecond) |
|
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) |
|
wg.Wait() |
|
} |
|
|
|
func TestOneServerDown(t *testing.T) { |
|
defer leakcheck.Check(t) |
|
r, cleanup := manual.GenerateAndRegisterManualResolver() |
|
defer cleanup() |
|
|
|
backendCount := 3 |
|
test, err := startTestServers(backendCount) |
|
if err != nil { |
|
t.Fatalf("failed to start servers: %v", err) |
|
} |
|
defer test.cleanup() |
|
|
|
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake()) |
|
if err != nil { |
|
t.Fatalf("failed to dial: %v", err) |
|
} |
|
defer cc.Close() |
|
testc := testpb.NewTestServiceClient(cc) |
|
// The first RPC should fail because there's no address. |
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
|
defer cancel() |
|
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
|
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
|
} |
|
|
|
var resolvedAddrs []resolver.Address |
|
for i := 0; i < backendCount; i++ { |
|
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) |
|
} |
|
|
|
r.NewAddress(resolvedAddrs) |
|
var p peer.Peer |
|
// Make sure connections to all servers are up. |
|
for si := 0; si < backendCount; si++ { |
|
var connected bool |
|
for i := 0; i < 1000; i++ { |
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
|
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
|
} |
|
if p.Addr.String() == test.addresses[si] { |
|
connected = true |
|
break |
|
} |
|
time.Sleep(time.Millisecond) |
|
} |
|
if !connected { |
|
t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) |
|
} |
|
} |
|
|
|
for i := 0; i < 3*backendCount; i++ { |
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
|
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
|
} |
|
if p.Addr.String() != test.addresses[i%backendCount] { |
|
t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) |
|
} |
|
} |
|
|
|
// Stop one server, RPCs should roundrobin among the remaining servers. |
|
backendCount-- |
|
test.servers[backendCount].Stop() |
|
// Loop until see server[backendCount-1] twice without seeing server[backendCount]. |
|
var targetSeen int |
|
for i := 0; i < 1000; i++ { |
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
|
targetSeen = 0 |
|
t.Logf("EmptyCall() = _, %v, want _, <nil>", err) |
|
// Due to a race, this RPC could possibly get the connection that |
|
// was closing, and this RPC may fail. Keep trying when this |
|
// happens. |
|
continue |
|
} |
|
switch p.Addr.String() { |
|
case test.addresses[backendCount-1]: |
|
targetSeen++ |
|
case test.addresses[backendCount]: |
|
// Reset targetSeen if peer is server[backendCount]. |
|
targetSeen = 0 |
|
} |
|
// Break to make sure the last picked address is server[-1], so the following for loop won't be flaky. |
|
if targetSeen >= 2 { |
|
break |
|
} |
|
} |
|
if targetSeen != 2 { |
|
t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]") |
|
} |
|
for i := 0; i < 3*backendCount; i++ { |
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
|
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
|
} |
|
if p.Addr.String() != test.addresses[i%backendCount] { |
|
t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) |
|
} |
|
} |
|
} |
|
|
|
func TestAllServersDown(t *testing.T) { |
|
defer leakcheck.Check(t) |
|
r, cleanup := manual.GenerateAndRegisterManualResolver() |
|
defer cleanup() |
|
|
|
backendCount := 3 |
|
test, err := startTestServers(backendCount) |
|
if err != nil { |
|
t.Fatalf("failed to start servers: %v", err) |
|
} |
|
defer test.cleanup() |
|
|
|
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake()) |
|
if err != nil { |
|
t.Fatalf("failed to dial: %v", err) |
|
} |
|
defer cc.Close() |
|
testc := testpb.NewTestServiceClient(cc) |
|
// The first RPC should fail because there's no address. |
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
|
defer cancel() |
|
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
|
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
|
} |
|
|
|
var resolvedAddrs []resolver.Address |
|
for i := 0; i < backendCount; i++ { |
|
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) |
|
} |
|
|
|
r.NewAddress(resolvedAddrs) |
|
var p peer.Peer |
|
// Make sure connections to all servers are up. |
|
for si := 0; si < backendCount; si++ { |
|
var connected bool |
|
for i := 0; i < 1000; i++ { |
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
|
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
|
} |
|
if p.Addr.String() == test.addresses[si] { |
|
connected = true |
|
break |
|
} |
|
time.Sleep(time.Millisecond) |
|
} |
|
if !connected { |
|
t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) |
|
} |
|
} |
|
|
|
for i := 0; i < 3*backendCount; i++ { |
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
|
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
|
} |
|
if p.Addr.String() != test.addresses[i%backendCount] { |
|
t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) |
|
} |
|
} |
|
|
|
// All servers are stopped, failfast RPC should fail with unavailable. |
|
for i := 0; i < backendCount; i++ { |
|
test.servers[i].Stop() |
|
} |
|
time.Sleep(100 * time.Millisecond) |
|
for i := 0; i < 1000; i++ { |
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable { |
|
return |
|
} |
|
time.Sleep(time.Millisecond) |
|
} |
|
t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped") |
|
}
|
|
|