You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
125 lines
4.1 KiB
125 lines
4.1 KiB
/* |
|
* |
|
* Copyright 2017 gRPC authors. |
|
* |
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
* you may not use this file except in compliance with the License. |
|
* You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
* See the License for the specific language governing permissions and |
|
* limitations under the License. |
|
* |
|
*/ |
|
|
|
//go:generate ./regenerate.sh |
|
|
|
// Package health provides a service that exposes server's health and it must be |
|
// imported to enable support for client-side health checks. |
|
package health |
|
|
|
import ( |
|
"context" |
|
"sync" |
|
|
|
"google.golang.org/grpc/codes" |
|
healthgrpc "google.golang.org/grpc/health/grpc_health_v1" |
|
healthpb "google.golang.org/grpc/health/grpc_health_v1" |
|
"google.golang.org/grpc/status" |
|
) |
|
|
|
// Server implements `service Health`. |
|
type Server struct { |
|
mu sync.Mutex |
|
// statusMap stores the serving status of the services this Server monitors. |
|
statusMap map[string]healthpb.HealthCheckResponse_ServingStatus |
|
updates map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus |
|
} |
|
|
|
// NewServer returns a new Server. |
|
func NewServer() *Server { |
|
return &Server{ |
|
statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING}, |
|
updates: make(map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus), |
|
} |
|
} |
|
|
|
// Check implements `service Health`. |
|
func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { |
|
s.mu.Lock() |
|
defer s.mu.Unlock() |
|
if servingStatus, ok := s.statusMap[in.Service]; ok { |
|
return &healthpb.HealthCheckResponse{ |
|
Status: servingStatus, |
|
}, nil |
|
} |
|
return nil, status.Error(codes.NotFound, "unknown service") |
|
} |
|
|
|
// Watch implements `service Health`. |
|
func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { |
|
service := in.Service |
|
// update channel is used for getting service status updates. |
|
update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1) |
|
s.mu.Lock() |
|
// Puts the initial status to the channel. |
|
if servingStatus, ok := s.statusMap[service]; ok { |
|
update <- servingStatus |
|
} else { |
|
update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN |
|
} |
|
|
|
// Registers the update channel to the correct place in the updates map. |
|
if _, ok := s.updates[service]; !ok { |
|
s.updates[service] = make(map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus) |
|
} |
|
s.updates[service][stream] = update |
|
defer func() { |
|
s.mu.Lock() |
|
delete(s.updates[service], stream) |
|
s.mu.Unlock() |
|
}() |
|
s.mu.Unlock() |
|
|
|
var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1 |
|
for { |
|
select { |
|
// Status updated. Sends the up-to-date status to the client. |
|
case servingStatus := <-update: |
|
if lastSentStatus == servingStatus { |
|
continue |
|
} |
|
lastSentStatus = servingStatus |
|
err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus}) |
|
if err != nil { |
|
return status.Error(codes.Canceled, "Stream has ended.") |
|
} |
|
// Context done. Removes the update channel from the updates map. |
|
case <-stream.Context().Done(): |
|
return status.Error(codes.Canceled, "Stream has ended.") |
|
} |
|
} |
|
} |
|
|
|
// SetServingStatus is called when need to reset the serving status of a service |
|
// or insert a new service entry into the statusMap. |
|
func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { |
|
s.mu.Lock() |
|
defer s.mu.Unlock() |
|
|
|
s.statusMap[service] = servingStatus |
|
for _, update := range s.updates[service] { |
|
// Clears previous updates, that are not sent to the client, from the channel. |
|
// This can happen if the client is not reading and the server gets flow control limited. |
|
select { |
|
case <-update: |
|
default: |
|
} |
|
// Puts the most recent update to the channel. |
|
update <- servingStatus |
|
} |
|
}
|
|
|