You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1489 lines
43 KiB
1489 lines
43 KiB
/* |
|
* |
|
* Copyright 2014 gRPC authors. |
|
* |
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
* you may not use this file except in compliance with the License. |
|
* You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
* See the License for the specific language governing permissions and |
|
* limitations under the License. |
|
* |
|
*/ |
|
|
|
package grpc |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
"io" |
|
"math" |
|
"strconv" |
|
"sync" |
|
"time" |
|
|
|
"golang.org/x/net/trace" |
|
"google.golang.org/grpc/balancer" |
|
"google.golang.org/grpc/codes" |
|
"google.golang.org/grpc/connectivity" |
|
"google.golang.org/grpc/encoding" |
|
"google.golang.org/grpc/grpclog" |
|
"google.golang.org/grpc/internal/binarylog" |
|
"google.golang.org/grpc/internal/channelz" |
|
"google.golang.org/grpc/internal/grpcrand" |
|
"google.golang.org/grpc/internal/transport" |
|
"google.golang.org/grpc/metadata" |
|
"google.golang.org/grpc/peer" |
|
"google.golang.org/grpc/stats" |
|
"google.golang.org/grpc/status" |
|
) |
|
|
|
// StreamHandler defines the handler called by gRPC server to complete the |
|
// execution of a streaming RPC. If a StreamHandler returns an error, it |
|
// should be produced by the status package, or else gRPC will use |
|
// codes.Unknown as the status code and err.Error() as the status message |
|
// of the RPC. |
|
type StreamHandler func(srv interface{}, stream ServerStream) error |
|
|
|
// StreamDesc represents a streaming RPC service's method specification. |
|
type StreamDesc struct { |
|
StreamName string |
|
Handler StreamHandler |
|
|
|
// At least one of these is true. |
|
ServerStreams bool |
|
ClientStreams bool |
|
} |
|
|
|
// Stream defines the common interface a client or server stream has to satisfy. |
|
// |
|
// Deprecated: See ClientStream and ServerStream documentation instead. |
|
type Stream interface { |
|
// Deprecated: See ClientStream and ServerStream documentation instead. |
|
Context() context.Context |
|
// Deprecated: See ClientStream and ServerStream documentation instead. |
|
SendMsg(m interface{}) error |
|
// Deprecated: See ClientStream and ServerStream documentation instead. |
|
RecvMsg(m interface{}) error |
|
} |
|
|
|
// ClientStream defines the client-side behavior of a streaming RPC. |
|
// |
|
// All errors returned from ClientStream methods are compatible with the |
|
// status package. |
|
type ClientStream interface { |
|
// Header returns the header metadata received from the server if there |
|
// is any. It blocks if the metadata is not ready to read. |
|
Header() (metadata.MD, error) |
|
// Trailer returns the trailer metadata from the server, if there is any. |
|
// It must only be called after stream.CloseAndRecv has returned, or |
|
// stream.Recv has returned a non-nil error (including io.EOF). |
|
Trailer() metadata.MD |
|
// CloseSend closes the send direction of the stream. It closes the stream |
|
// when non-nil error is met. It is also not safe to call CloseSend |
|
// concurrently with SendMsg. |
|
CloseSend() error |
|
// Context returns the context for this stream. |
|
// |
|
// It should not be called until after Header or RecvMsg has returned. Once |
|
// called, subsequent client-side retries are disabled. |
|
Context() context.Context |
|
// SendMsg is generally called by generated code. On error, SendMsg aborts |
|
// the stream. If the error was generated by the client, the status is |
|
// returned directly; otherwise, io.EOF is returned and the status of |
|
// the stream may be discovered using RecvMsg. |
|
// |
|
// SendMsg blocks until: |
|
// - There is sufficient flow control to schedule m with the transport, or |
|
// - The stream is done, or |
|
// - The stream breaks. |
|
// |
|
// SendMsg does not wait until the message is received by the server. An |
|
// untimely stream closure may result in lost messages. To ensure delivery, |
|
// users should ensure the RPC completed successfully using RecvMsg. |
|
// |
|
// It is safe to have a goroutine calling SendMsg and another goroutine |
|
// calling RecvMsg on the same stream at the same time, but it is not safe |
|
// to call SendMsg on the same stream in different goroutines. It is also |
|
// not safe to call CloseSend concurrently with SendMsg. |
|
SendMsg(m interface{}) error |
|
// RecvMsg blocks until it receives a message into m or the stream is |
|
// done. It returns io.EOF when the stream completes successfully. On |
|
// any other error, the stream is aborted and the error contains the RPC |
|
// status. |
|
// |
|
// It is safe to have a goroutine calling SendMsg and another goroutine |
|
// calling RecvMsg on the same stream at the same time, but it is not |
|
// safe to call RecvMsg on the same stream in different goroutines. |
|
RecvMsg(m interface{}) error |
|
} |
|
|
|
// NewStream creates a new Stream for the client side. This is typically |
|
// called by generated code. ctx is used for the lifetime of the stream. |
|
// |
|
// To ensure resources are not leaked due to the stream returned, one of the following |
|
// actions must be performed: |
|
// |
|
// 1. Call Close on the ClientConn. |
|
// 2. Cancel the context provided. |
|
// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated |
|
// client-streaming RPC, for instance, might use the helper function |
|
// CloseAndRecv (note that CloseSend does not Recv, therefore is not |
|
// guaranteed to release all resources). |
|
// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg. |
|
// |
|
// If none of the above happen, a goroutine and a context will be leaked, and grpc |
|
// will not call the optionally-configured stats handler with a stats.End message. |
|
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { |
|
// allow interceptor to see all applicable call options, which means those |
|
// configured as defaults from dial option as well as per-call options |
|
opts = combine(cc.dopts.callOptions, opts) |
|
|
|
if cc.dopts.streamInt != nil { |
|
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) |
|
} |
|
return newClientStream(ctx, desc, cc, method, opts...) |
|
} |
|
|
|
// NewClientStream is a wrapper for ClientConn.NewStream. |
|
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { |
|
return cc.NewStream(ctx, desc, method, opts...) |
|
} |
|
|
|
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { |
|
if channelz.IsOn() { |
|
cc.incrCallsStarted() |
|
defer func() { |
|
if err != nil { |
|
cc.incrCallsFailed() |
|
} |
|
}() |
|
} |
|
c := defaultCallInfo() |
|
// Provide an opportunity for the first RPC to see the first service config |
|
// provided by the resolver. |
|
if err := cc.waitForResolvedAddrs(ctx); err != nil { |
|
return nil, err |
|
} |
|
mc := cc.GetMethodConfig(method) |
|
if mc.WaitForReady != nil { |
|
c.failFast = !*mc.WaitForReady |
|
} |
|
|
|
// Possible context leak: |
|
// The cancel function for the child context we create will only be called |
|
// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if |
|
// an error is generated by SendMsg. |
|
// https://github.com/grpc/grpc-go/issues/1818. |
|
var cancel context.CancelFunc |
|
if mc.Timeout != nil && *mc.Timeout >= 0 { |
|
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) |
|
} else { |
|
ctx, cancel = context.WithCancel(ctx) |
|
} |
|
defer func() { |
|
if err != nil { |
|
cancel() |
|
} |
|
}() |
|
|
|
for _, o := range opts { |
|
if err := o.before(c); err != nil { |
|
return nil, toRPCErr(err) |
|
} |
|
} |
|
c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) |
|
c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) |
|
if err := setCallInfoCodec(c); err != nil { |
|
return nil, err |
|
} |
|
|
|
callHdr := &transport.CallHdr{ |
|
Host: cc.authority, |
|
Method: method, |
|
ContentSubtype: c.contentSubtype, |
|
} |
|
|
|
// Set our outgoing compression according to the UseCompressor CallOption, if |
|
// set. In that case, also find the compressor from the encoding package. |
|
// Otherwise, use the compressor configured by the WithCompressor DialOption, |
|
// if set. |
|
var cp Compressor |
|
var comp encoding.Compressor |
|
if ct := c.compressorType; ct != "" { |
|
callHdr.SendCompress = ct |
|
if ct != encoding.Identity { |
|
comp = encoding.GetCompressor(ct) |
|
if comp == nil { |
|
return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) |
|
} |
|
} |
|
} else if cc.dopts.cp != nil { |
|
callHdr.SendCompress = cc.dopts.cp.Type() |
|
cp = cc.dopts.cp |
|
} |
|
if c.creds != nil { |
|
callHdr.Creds = c.creds |
|
} |
|
var trInfo traceInfo |
|
if EnableTracing { |
|
trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) |
|
trInfo.firstLine.client = true |
|
if deadline, ok := ctx.Deadline(); ok { |
|
trInfo.firstLine.deadline = deadline.Sub(time.Now()) |
|
} |
|
trInfo.tr.LazyLog(&trInfo.firstLine, false) |
|
ctx = trace.NewContext(ctx, trInfo.tr) |
|
} |
|
ctx = newContextWithRPCInfo(ctx, c.failFast) |
|
sh := cc.dopts.copts.StatsHandler |
|
var beginTime time.Time |
|
if sh != nil { |
|
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) |
|
beginTime = time.Now() |
|
begin := &stats.Begin{ |
|
Client: true, |
|
BeginTime: beginTime, |
|
FailFast: c.failFast, |
|
} |
|
sh.HandleRPC(ctx, begin) |
|
} |
|
|
|
cs := &clientStream{ |
|
callHdr: callHdr, |
|
ctx: ctx, |
|
methodConfig: &mc, |
|
opts: opts, |
|
callInfo: c, |
|
cc: cc, |
|
desc: desc, |
|
codec: c.codec, |
|
cp: cp, |
|
comp: comp, |
|
cancel: cancel, |
|
beginTime: beginTime, |
|
firstAttempt: true, |
|
} |
|
if !cc.dopts.disableRetry { |
|
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler) |
|
} |
|
cs.binlog = binarylog.GetMethodLogger(method) |
|
|
|
cs.callInfo.stream = cs |
|
// Only this initial attempt has stats/tracing. |
|
// TODO(dfawley): move to newAttempt when per-attempt stats are implemented. |
|
if err := cs.newAttemptLocked(sh, trInfo); err != nil { |
|
cs.finish(err) |
|
return nil, err |
|
} |
|
|
|
op := func(a *csAttempt) error { return a.newStream() } |
|
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil { |
|
cs.finish(err) |
|
return nil, err |
|
} |
|
|
|
if cs.binlog != nil { |
|
md, _ := metadata.FromOutgoingContext(ctx) |
|
logEntry := &binarylog.ClientHeader{ |
|
OnClientSide: true, |
|
Header: md, |
|
MethodName: method, |
|
Authority: cs.cc.authority, |
|
} |
|
if deadline, ok := ctx.Deadline(); ok { |
|
logEntry.Timeout = deadline.Sub(time.Now()) |
|
if logEntry.Timeout < 0 { |
|
logEntry.Timeout = 0 |
|
} |
|
} |
|
cs.binlog.Log(logEntry) |
|
} |
|
|
|
if desc != unaryStreamDesc { |
|
// Listen on cc and stream contexts to cleanup when the user closes the |
|
// ClientConn or cancels the stream context. In all other cases, an error |
|
// should already be injected into the recv buffer by the transport, which |
|
// the client will eventually receive, and then we will cancel the stream's |
|
// context in clientStream.finish. |
|
go func() { |
|
select { |
|
case <-cc.ctx.Done(): |
|
cs.finish(ErrClientConnClosing) |
|
case <-ctx.Done(): |
|
cs.finish(toRPCErr(ctx.Err())) |
|
} |
|
}() |
|
} |
|
return cs, nil |
|
} |
|
|
|
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error { |
|
cs.attempt = &csAttempt{ |
|
cs: cs, |
|
dc: cs.cc.dopts.dc, |
|
statsHandler: sh, |
|
trInfo: trInfo, |
|
} |
|
|
|
if err := cs.ctx.Err(); err != nil { |
|
return toRPCErr(err) |
|
} |
|
t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method) |
|
if err != nil { |
|
return err |
|
} |
|
cs.attempt.t = t |
|
cs.attempt.done = done |
|
return nil |
|
} |
|
|
|
func (a *csAttempt) newStream() error { |
|
cs := a.cs |
|
cs.callHdr.PreviousAttempts = cs.numRetries |
|
s, err := a.t.NewStream(cs.ctx, cs.callHdr) |
|
if err != nil { |
|
return toRPCErr(err) |
|
} |
|
cs.attempt.s = s |
|
cs.attempt.p = &parser{r: s} |
|
return nil |
|
} |
|
|
|
// clientStream implements a client side Stream. |
|
type clientStream struct { |
|
callHdr *transport.CallHdr |
|
opts []CallOption |
|
callInfo *callInfo |
|
cc *ClientConn |
|
desc *StreamDesc |
|
|
|
codec baseCodec |
|
cp Compressor |
|
comp encoding.Compressor |
|
|
|
cancel context.CancelFunc // cancels all attempts |
|
|
|
sentLast bool // sent an end stream |
|
beginTime time.Time |
|
|
|
methodConfig *MethodConfig |
|
|
|
ctx context.Context // the application's context, wrapped by stats/tracing |
|
|
|
retryThrottler *retryThrottler // The throttler active when the RPC began. |
|
|
|
binlog *binarylog.MethodLogger // Binary logger, can be nil. |
|
// serverHeaderBinlogged is a boolean for whether server header has been |
|
// logged. Server header will be logged when the first time one of those |
|
// happens: stream.Header(), stream.Recv(). |
|
// |
|
// It's only read and used by Recv() and Header(), so it doesn't need to be |
|
// synchronized. |
|
serverHeaderBinlogged bool |
|
|
|
mu sync.Mutex |
|
firstAttempt bool // if true, transparent retry is valid |
|
numRetries int // exclusive of transparent retry attempt(s) |
|
numRetriesSincePushback int // retries since pushback; to reset backoff |
|
finished bool // TODO: replace with atomic cmpxchg or sync.Once? |
|
attempt *csAttempt // the active client stream attempt |
|
// TODO(hedging): hedging will have multiple attempts simultaneously. |
|
committed bool // active attempt committed for retry? |
|
buffer []func(a *csAttempt) error // operations to replay on retry |
|
bufferSize int // current size of buffer |
|
} |
|
|
|
// csAttempt implements a single transport stream attempt within a |
|
// clientStream. |
|
type csAttempt struct { |
|
cs *clientStream |
|
t transport.ClientTransport |
|
s *transport.Stream |
|
p *parser |
|
done func(balancer.DoneInfo) |
|
|
|
finished bool |
|
dc Decompressor |
|
decomp encoding.Compressor |
|
decompSet bool |
|
|
|
mu sync.Mutex // guards trInfo.tr |
|
// trInfo.tr is set when created (if EnableTracing is true), |
|
// and cleared when the finish method is called. |
|
trInfo traceInfo |
|
|
|
statsHandler stats.Handler |
|
} |
|
|
|
func (cs *clientStream) commitAttemptLocked() { |
|
cs.committed = true |
|
cs.buffer = nil |
|
} |
|
|
|
func (cs *clientStream) commitAttempt() { |
|
cs.mu.Lock() |
|
cs.commitAttemptLocked() |
|
cs.mu.Unlock() |
|
} |
|
|
|
// shouldRetry returns nil if the RPC should be retried; otherwise it returns |
|
// the error that should be returned by the operation. |
|
func (cs *clientStream) shouldRetry(err error) error { |
|
if cs.attempt.s == nil && !cs.callInfo.failFast { |
|
// In the event of any error from NewStream (attempt.s == nil), we |
|
// never attempted to write anything to the wire, so we can retry |
|
// indefinitely for non-fail-fast RPCs. |
|
return nil |
|
} |
|
if cs.finished || cs.committed { |
|
// RPC is finished or committed; cannot retry. |
|
return err |
|
} |
|
// Wait for the trailers. |
|
if cs.attempt.s != nil { |
|
<-cs.attempt.s.Done() |
|
} |
|
if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) { |
|
// First attempt, wait-for-ready, stream unprocessed: transparently retry. |
|
cs.firstAttempt = false |
|
return nil |
|
} |
|
cs.firstAttempt = false |
|
if cs.cc.dopts.disableRetry { |
|
return err |
|
} |
|
|
|
pushback := 0 |
|
hasPushback := false |
|
if cs.attempt.s != nil { |
|
if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil { |
|
// Context error; stop now. |
|
return toErr |
|
} else if !to { |
|
return err |
|
} |
|
|
|
// TODO(retry): Move down if the spec changes to not check server pushback |
|
// before considering this a failure for throttling. |
|
sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"] |
|
if len(sps) == 1 { |
|
var e error |
|
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 { |
|
grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0]) |
|
cs.retryThrottler.throttle() // This counts as a failure for throttling. |
|
return err |
|
} |
|
hasPushback = true |
|
} else if len(sps) > 1 { |
|
grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps) |
|
cs.retryThrottler.throttle() // This counts as a failure for throttling. |
|
return err |
|
} |
|
} |
|
|
|
var code codes.Code |
|
if cs.attempt.s != nil { |
|
code = cs.attempt.s.Status().Code() |
|
} else { |
|
code = status.Convert(err).Code() |
|
} |
|
|
|
rp := cs.methodConfig.retryPolicy |
|
if rp == nil || !rp.retryableStatusCodes[code] { |
|
return err |
|
} |
|
|
|
// Note: the ordering here is important; we count this as a failure |
|
// only if the code matched a retryable code. |
|
if cs.retryThrottler.throttle() { |
|
return err |
|
} |
|
if cs.numRetries+1 >= rp.maxAttempts { |
|
return err |
|
} |
|
|
|
var dur time.Duration |
|
if hasPushback { |
|
dur = time.Millisecond * time.Duration(pushback) |
|
cs.numRetriesSincePushback = 0 |
|
} else { |
|
fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback)) |
|
cur := float64(rp.initialBackoff) * fact |
|
if max := float64(rp.maxBackoff); cur > max { |
|
cur = max |
|
} |
|
dur = time.Duration(grpcrand.Int63n(int64(cur))) |
|
cs.numRetriesSincePushback++ |
|
} |
|
|
|
// TODO(dfawley): we could eagerly fail here if dur puts us past the |
|
// deadline, but unsure if it is worth doing. |
|
t := time.NewTimer(dur) |
|
select { |
|
case <-t.C: |
|
cs.numRetries++ |
|
return nil |
|
case <-cs.ctx.Done(): |
|
t.Stop() |
|
return status.FromContextError(cs.ctx.Err()).Err() |
|
} |
|
} |
|
|
|
// Returns nil if a retry was performed and succeeded; error otherwise. |
|
func (cs *clientStream) retryLocked(lastErr error) error { |
|
for { |
|
cs.attempt.finish(lastErr) |
|
if err := cs.shouldRetry(lastErr); err != nil { |
|
cs.commitAttemptLocked() |
|
return err |
|
} |
|
if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil { |
|
return err |
|
} |
|
if lastErr = cs.replayBufferLocked(); lastErr == nil { |
|
return nil |
|
} |
|
} |
|
} |
|
|
|
func (cs *clientStream) Context() context.Context { |
|
cs.commitAttempt() |
|
// No need to lock before using attempt, since we know it is committed and |
|
// cannot change. |
|
return cs.attempt.s.Context() |
|
} |
|
|
|
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error { |
|
cs.mu.Lock() |
|
for { |
|
if cs.committed { |
|
cs.mu.Unlock() |
|
return op(cs.attempt) |
|
} |
|
a := cs.attempt |
|
cs.mu.Unlock() |
|
err := op(a) |
|
cs.mu.Lock() |
|
if a != cs.attempt { |
|
// We started another attempt already. |
|
continue |
|
} |
|
if err == io.EOF { |
|
<-a.s.Done() |
|
} |
|
if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) { |
|
onSuccess() |
|
cs.mu.Unlock() |
|
return err |
|
} |
|
if err := cs.retryLocked(err); err != nil { |
|
cs.mu.Unlock() |
|
return err |
|
} |
|
} |
|
} |
|
|
|
func (cs *clientStream) Header() (metadata.MD, error) { |
|
var m metadata.MD |
|
err := cs.withRetry(func(a *csAttempt) error { |
|
var err error |
|
m, err = a.s.Header() |
|
return toRPCErr(err) |
|
}, cs.commitAttemptLocked) |
|
if err != nil { |
|
cs.finish(err) |
|
return nil, err |
|
} |
|
if cs.binlog != nil && !cs.serverHeaderBinlogged { |
|
// Only log if binary log is on and header has not been logged. |
|
logEntry := &binarylog.ServerHeader{ |
|
OnClientSide: true, |
|
Header: m, |
|
PeerAddr: nil, |
|
} |
|
if peer, ok := peer.FromContext(cs.Context()); ok { |
|
logEntry.PeerAddr = peer.Addr |
|
} |
|
cs.binlog.Log(logEntry) |
|
cs.serverHeaderBinlogged = true |
|
} |
|
return m, err |
|
} |
|
|
|
func (cs *clientStream) Trailer() metadata.MD { |
|
// On RPC failure, we never need to retry, because usage requires that |
|
// RecvMsg() returned a non-nil error before calling this function is valid. |
|
// We would have retried earlier if necessary. |
|
// |
|
// Commit the attempt anyway, just in case users are not following those |
|
// directions -- it will prevent races and should not meaningfully impact |
|
// performance. |
|
cs.commitAttempt() |
|
if cs.attempt.s == nil { |
|
return nil |
|
} |
|
return cs.attempt.s.Trailer() |
|
} |
|
|
|
func (cs *clientStream) replayBufferLocked() error { |
|
a := cs.attempt |
|
for _, f := range cs.buffer { |
|
if err := f(a); err != nil { |
|
return err |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) { |
|
// Note: we still will buffer if retry is disabled (for transparent retries). |
|
if cs.committed { |
|
return |
|
} |
|
cs.bufferSize += sz |
|
if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize { |
|
cs.commitAttemptLocked() |
|
return |
|
} |
|
cs.buffer = append(cs.buffer, op) |
|
} |
|
|
|
func (cs *clientStream) SendMsg(m interface{}) (err error) { |
|
defer func() { |
|
if err != nil && err != io.EOF { |
|
// Call finish on the client stream for errors generated by this SendMsg |
|
// call, as these indicate problems created by this client. (Transport |
|
// errors are converted to an io.EOF error in csAttempt.sendMsg; the real |
|
// error will be returned from RecvMsg eventually in that case, or be |
|
// retried.) |
|
cs.finish(err) |
|
} |
|
}() |
|
if cs.sentLast { |
|
return status.Errorf(codes.Internal, "SendMsg called after CloseSend") |
|
} |
|
if !cs.desc.ClientStreams { |
|
cs.sentLast = true |
|
} |
|
data, err := encode(cs.codec, m) |
|
if err != nil { |
|
return err |
|
} |
|
compData, err := compress(data, cs.cp, cs.comp) |
|
if err != nil { |
|
return err |
|
} |
|
hdr, payload := msgHeader(data, compData) |
|
// TODO(dfawley): should we be checking len(data) instead? |
|
if len(payload) > *cs.callInfo.maxSendMessageSize { |
|
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize) |
|
} |
|
msgBytes := data // Store the pointer before setting to nil. For binary logging. |
|
op := func(a *csAttempt) error { |
|
err := a.sendMsg(m, hdr, payload, data) |
|
// nil out the message and uncomp when replaying; they are only needed for |
|
// stats which is disabled for subsequent attempts. |
|
m, data = nil, nil |
|
return err |
|
} |
|
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) }) |
|
if cs.binlog != nil && err == nil { |
|
cs.binlog.Log(&binarylog.ClientMessage{ |
|
OnClientSide: true, |
|
Message: msgBytes, |
|
}) |
|
} |
|
return |
|
} |
|
|
|
func (cs *clientStream) RecvMsg(m interface{}) error { |
|
if cs.binlog != nil && !cs.serverHeaderBinlogged { |
|
// Call Header() to binary log header if it's not already logged. |
|
cs.Header() |
|
} |
|
var recvInfo *payloadInfo |
|
if cs.binlog != nil { |
|
recvInfo = &payloadInfo{} |
|
} |
|
err := cs.withRetry(func(a *csAttempt) error { |
|
return a.recvMsg(m, recvInfo) |
|
}, cs.commitAttemptLocked) |
|
if cs.binlog != nil && err == nil { |
|
cs.binlog.Log(&binarylog.ServerMessage{ |
|
OnClientSide: true, |
|
Message: recvInfo.uncompressedBytes, |
|
}) |
|
} |
|
if err != nil || !cs.desc.ServerStreams { |
|
// err != nil or non-server-streaming indicates end of stream. |
|
cs.finish(err) |
|
|
|
if cs.binlog != nil { |
|
// finish will not log Trailer. Log Trailer here. |
|
logEntry := &binarylog.ServerTrailer{ |
|
OnClientSide: true, |
|
Trailer: cs.Trailer(), |
|
Err: err, |
|
} |
|
if logEntry.Err == io.EOF { |
|
logEntry.Err = nil |
|
} |
|
if peer, ok := peer.FromContext(cs.Context()); ok { |
|
logEntry.PeerAddr = peer.Addr |
|
} |
|
cs.binlog.Log(logEntry) |
|
} |
|
} |
|
return err |
|
} |
|
|
|
func (cs *clientStream) CloseSend() error { |
|
if cs.sentLast { |
|
// TODO: return an error and finish the stream instead, due to API misuse? |
|
return nil |
|
} |
|
cs.sentLast = true |
|
op := func(a *csAttempt) error { |
|
a.t.Write(a.s, nil, nil, &transport.Options{Last: true}) |
|
// Always return nil; io.EOF is the only error that might make sense |
|
// instead, but there is no need to signal the client to call RecvMsg |
|
// as the only use left for the stream after CloseSend is to call |
|
// RecvMsg. This also matches historical behavior. |
|
return nil |
|
} |
|
cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }) |
|
if cs.binlog != nil { |
|
cs.binlog.Log(&binarylog.ClientHalfClose{ |
|
OnClientSide: true, |
|
}) |
|
} |
|
// We never returned an error here for reasons. |
|
return nil |
|
} |
|
|
|
func (cs *clientStream) finish(err error) { |
|
if err == io.EOF { |
|
// Ending a stream with EOF indicates a success. |
|
err = nil |
|
} |
|
cs.mu.Lock() |
|
if cs.finished { |
|
cs.mu.Unlock() |
|
return |
|
} |
|
cs.finished = true |
|
cs.commitAttemptLocked() |
|
cs.mu.Unlock() |
|
// For binary logging. only log cancel in finish (could be caused by RPC ctx |
|
// canceled or ClientConn closed). Trailer will be logged in RecvMsg. |
|
// |
|
// Only one of cancel or trailer needs to be logged. In the cases where |
|
// users don't call RecvMsg, users must have already canceled the RPC. |
|
if cs.binlog != nil && status.Code(err) == codes.Canceled { |
|
cs.binlog.Log(&binarylog.Cancel{ |
|
OnClientSide: true, |
|
}) |
|
} |
|
if err == nil { |
|
cs.retryThrottler.successfulRPC() |
|
} |
|
if channelz.IsOn() { |
|
if err != nil { |
|
cs.cc.incrCallsFailed() |
|
} else { |
|
cs.cc.incrCallsSucceeded() |
|
} |
|
} |
|
if cs.attempt != nil { |
|
cs.attempt.finish(err) |
|
} |
|
// after functions all rely upon having a stream. |
|
if cs.attempt.s != nil { |
|
for _, o := range cs.opts { |
|
o.after(cs.callInfo) |
|
} |
|
} |
|
cs.cancel() |
|
} |
|
|
|
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { |
|
cs := a.cs |
|
if EnableTracing { |
|
a.mu.Lock() |
|
if a.trInfo.tr != nil { |
|
a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) |
|
} |
|
a.mu.Unlock() |
|
} |
|
if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { |
|
if !cs.desc.ClientStreams { |
|
// For non-client-streaming RPCs, we return nil instead of EOF on error |
|
// because the generated code requires it. finish is not called; RecvMsg() |
|
// will call it with the stream's status independently. |
|
return nil |
|
} |
|
return io.EOF |
|
} |
|
if a.statsHandler != nil { |
|
a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now())) |
|
} |
|
if channelz.IsOn() { |
|
a.t.IncrMsgSent() |
|
} |
|
return nil |
|
} |
|
|
|
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) { |
|
cs := a.cs |
|
if a.statsHandler != nil && payInfo == nil { |
|
payInfo = &payloadInfo{} |
|
} |
|
|
|
if !a.decompSet { |
|
// Block until we receive headers containing received message encoding. |
|
if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity { |
|
if a.dc == nil || a.dc.Type() != ct { |
|
// No configured decompressor, or it does not match the incoming |
|
// message encoding; attempt to find a registered compressor that does. |
|
a.dc = nil |
|
a.decomp = encoding.GetCompressor(ct) |
|
} |
|
} else { |
|
// No compression is used; disable our decompressor. |
|
a.dc = nil |
|
} |
|
// Only initialize this state once per stream. |
|
a.decompSet = true |
|
} |
|
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp) |
|
if err != nil { |
|
if err == io.EOF { |
|
if statusErr := a.s.Status().Err(); statusErr != nil { |
|
return statusErr |
|
} |
|
return io.EOF // indicates successful end of stream. |
|
} |
|
return toRPCErr(err) |
|
} |
|
if EnableTracing { |
|
a.mu.Lock() |
|
if a.trInfo.tr != nil { |
|
a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) |
|
} |
|
a.mu.Unlock() |
|
} |
|
if a.statsHandler != nil { |
|
a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{ |
|
Client: true, |
|
RecvTime: time.Now(), |
|
Payload: m, |
|
// TODO truncate large payload. |
|
Data: payInfo.uncompressedBytes, |
|
Length: len(payInfo.uncompressedBytes), |
|
}) |
|
} |
|
if channelz.IsOn() { |
|
a.t.IncrMsgRecv() |
|
} |
|
if cs.desc.ServerStreams { |
|
// Subsequent messages should be received by subsequent RecvMsg calls. |
|
return nil |
|
} |
|
// Special handling for non-server-stream rpcs. |
|
// This recv expects EOF or errors, so we don't collect inPayload. |
|
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp) |
|
if err == nil { |
|
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) |
|
} |
|
if err == io.EOF { |
|
return a.s.Status().Err() // non-server streaming Recv returns nil on success |
|
} |
|
return toRPCErr(err) |
|
} |
|
|
|
func (a *csAttempt) finish(err error) { |
|
a.mu.Lock() |
|
if a.finished { |
|
a.mu.Unlock() |
|
return |
|
} |
|
a.finished = true |
|
if err == io.EOF { |
|
// Ending a stream with EOF indicates a success. |
|
err = nil |
|
} |
|
if a.s != nil { |
|
a.t.CloseStream(a.s, err) |
|
} |
|
|
|
if a.done != nil { |
|
br := false |
|
var tr metadata.MD |
|
if a.s != nil { |
|
br = a.s.BytesReceived() |
|
tr = a.s.Trailer() |
|
} |
|
a.done(balancer.DoneInfo{ |
|
Err: err, |
|
Trailer: tr, |
|
BytesSent: a.s != nil, |
|
BytesReceived: br, |
|
}) |
|
} |
|
if a.statsHandler != nil { |
|
end := &stats.End{ |
|
Client: true, |
|
BeginTime: a.cs.beginTime, |
|
EndTime: time.Now(), |
|
Error: err, |
|
} |
|
a.statsHandler.HandleRPC(a.cs.ctx, end) |
|
} |
|
if a.trInfo.tr != nil { |
|
if err == nil { |
|
a.trInfo.tr.LazyPrintf("RPC: [OK]") |
|
} else { |
|
a.trInfo.tr.LazyPrintf("RPC: [%v]", err) |
|
a.trInfo.tr.SetError() |
|
} |
|
a.trInfo.tr.Finish() |
|
a.trInfo.tr = nil |
|
} |
|
a.mu.Unlock() |
|
} |
|
|
|
func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) { |
|
ac.mu.Lock() |
|
if ac.transport != t { |
|
ac.mu.Unlock() |
|
return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") |
|
} |
|
// transition to CONNECTING state when an attempt starts |
|
if ac.state != connectivity.Connecting { |
|
ac.updateConnectivityState(connectivity.Connecting) |
|
ac.cc.handleSubConnStateChange(ac.acbw, ac.state) |
|
} |
|
ac.mu.Unlock() |
|
|
|
if t == nil { |
|
// TODO: return RPC error here? |
|
return nil, errors.New("transport provided is nil") |
|
} |
|
// defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct. |
|
c := &callInfo{} |
|
|
|
for _, o := range opts { |
|
if err := o.before(c); err != nil { |
|
return nil, toRPCErr(err) |
|
} |
|
} |
|
c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) |
|
c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize) |
|
|
|
// Possible context leak: |
|
// The cancel function for the child context we create will only be called |
|
// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if |
|
// an error is generated by SendMsg. |
|
// https://github.com/grpc/grpc-go/issues/1818. |
|
ctx, cancel := context.WithCancel(ctx) |
|
defer func() { |
|
if err != nil { |
|
cancel() |
|
} |
|
}() |
|
|
|
if err := setCallInfoCodec(c); err != nil { |
|
return nil, err |
|
} |
|
|
|
callHdr := &transport.CallHdr{ |
|
Host: ac.cc.authority, |
|
Method: method, |
|
ContentSubtype: c.contentSubtype, |
|
} |
|
|
|
// Set our outgoing compression according to the UseCompressor CallOption, if |
|
// set. In that case, also find the compressor from the encoding package. |
|
// Otherwise, use the compressor configured by the WithCompressor DialOption, |
|
// if set. |
|
var cp Compressor |
|
var comp encoding.Compressor |
|
if ct := c.compressorType; ct != "" { |
|
callHdr.SendCompress = ct |
|
if ct != encoding.Identity { |
|
comp = encoding.GetCompressor(ct) |
|
if comp == nil { |
|
return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) |
|
} |
|
} |
|
} else if ac.cc.dopts.cp != nil { |
|
callHdr.SendCompress = ac.cc.dopts.cp.Type() |
|
cp = ac.cc.dopts.cp |
|
} |
|
if c.creds != nil { |
|
callHdr.Creds = c.creds |
|
} |
|
|
|
as := &addrConnStream{ |
|
callHdr: callHdr, |
|
ac: ac, |
|
ctx: ctx, |
|
cancel: cancel, |
|
opts: opts, |
|
callInfo: c, |
|
desc: desc, |
|
codec: c.codec, |
|
cp: cp, |
|
comp: comp, |
|
t: t, |
|
} |
|
|
|
as.callInfo.stream = as |
|
s, err := as.t.NewStream(as.ctx, as.callHdr) |
|
if err != nil { |
|
err = toRPCErr(err) |
|
return nil, err |
|
} |
|
as.s = s |
|
as.p = &parser{r: s} |
|
ac.incrCallsStarted() |
|
if desc != unaryStreamDesc { |
|
// Listen on cc and stream contexts to cleanup when the user closes the |
|
// ClientConn or cancels the stream context. In all other cases, an error |
|
// should already be injected into the recv buffer by the transport, which |
|
// the client will eventually receive, and then we will cancel the stream's |
|
// context in clientStream.finish. |
|
go func() { |
|
select { |
|
case <-ac.ctx.Done(): |
|
as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing")) |
|
case <-ctx.Done(): |
|
as.finish(toRPCErr(ctx.Err())) |
|
} |
|
}() |
|
} |
|
return as, nil |
|
} |
|
|
|
type addrConnStream struct { |
|
s *transport.Stream |
|
ac *addrConn |
|
callHdr *transport.CallHdr |
|
cancel context.CancelFunc |
|
opts []CallOption |
|
callInfo *callInfo |
|
t transport.ClientTransport |
|
ctx context.Context |
|
sentLast bool |
|
desc *StreamDesc |
|
codec baseCodec |
|
cp Compressor |
|
comp encoding.Compressor |
|
decompSet bool |
|
dc Decompressor |
|
decomp encoding.Compressor |
|
p *parser |
|
done func(balancer.DoneInfo) |
|
mu sync.Mutex |
|
finished bool |
|
} |
|
|
|
func (as *addrConnStream) Header() (metadata.MD, error) { |
|
m, err := as.s.Header() |
|
if err != nil { |
|
as.finish(toRPCErr(err)) |
|
} |
|
return m, err |
|
} |
|
|
|
func (as *addrConnStream) Trailer() metadata.MD { |
|
return as.s.Trailer() |
|
} |
|
|
|
func (as *addrConnStream) CloseSend() error { |
|
if as.sentLast { |
|
// TODO: return an error and finish the stream instead, due to API misuse? |
|
return nil |
|
} |
|
as.sentLast = true |
|
|
|
as.t.Write(as.s, nil, nil, &transport.Options{Last: true}) |
|
// Always return nil; io.EOF is the only error that might make sense |
|
// instead, but there is no need to signal the client to call RecvMsg |
|
// as the only use left for the stream after CloseSend is to call |
|
// RecvMsg. This also matches historical behavior. |
|
return nil |
|
} |
|
|
|
func (as *addrConnStream) Context() context.Context { |
|
return as.s.Context() |
|
} |
|
|
|
func (as *addrConnStream) SendMsg(m interface{}) (err error) { |
|
defer func() { |
|
if err != nil && err != io.EOF { |
|
// Call finish on the client stream for errors generated by this SendMsg |
|
// call, as these indicate problems created by this client. (Transport |
|
// errors are converted to an io.EOF error in csAttempt.sendMsg; the real |
|
// error will be returned from RecvMsg eventually in that case, or be |
|
// retried.) |
|
as.finish(err) |
|
} |
|
}() |
|
if as.sentLast { |
|
return status.Errorf(codes.Internal, "SendMsg called after CloseSend") |
|
} |
|
if !as.desc.ClientStreams { |
|
as.sentLast = true |
|
} |
|
data, err := encode(as.codec, m) |
|
if err != nil { |
|
return err |
|
} |
|
compData, err := compress(data, as.cp, as.comp) |
|
if err != nil { |
|
return err |
|
} |
|
hdr, payld := msgHeader(data, compData) |
|
// TODO(dfawley): should we be checking len(data) instead? |
|
if len(payld) > *as.callInfo.maxSendMessageSize { |
|
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize) |
|
} |
|
|
|
if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil { |
|
if !as.desc.ClientStreams { |
|
// For non-client-streaming RPCs, we return nil instead of EOF on error |
|
// because the generated code requires it. finish is not called; RecvMsg() |
|
// will call it with the stream's status independently. |
|
return nil |
|
} |
|
return io.EOF |
|
} |
|
|
|
if channelz.IsOn() { |
|
as.t.IncrMsgSent() |
|
} |
|
return nil |
|
} |
|
|
|
func (as *addrConnStream) RecvMsg(m interface{}) (err error) { |
|
defer func() { |
|
if err != nil || !as.desc.ServerStreams { |
|
// err != nil or non-server-streaming indicates end of stream. |
|
as.finish(err) |
|
} |
|
}() |
|
|
|
if !as.decompSet { |
|
// Block until we receive headers containing received message encoding. |
|
if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity { |
|
if as.dc == nil || as.dc.Type() != ct { |
|
// No configured decompressor, or it does not match the incoming |
|
// message encoding; attempt to find a registered compressor that does. |
|
as.dc = nil |
|
as.decomp = encoding.GetCompressor(ct) |
|
} |
|
} else { |
|
// No compression is used; disable our decompressor. |
|
as.dc = nil |
|
} |
|
// Only initialize this state once per stream. |
|
as.decompSet = true |
|
} |
|
err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) |
|
if err != nil { |
|
if err == io.EOF { |
|
if statusErr := as.s.Status().Err(); statusErr != nil { |
|
return statusErr |
|
} |
|
return io.EOF // indicates successful end of stream. |
|
} |
|
return toRPCErr(err) |
|
} |
|
|
|
if channelz.IsOn() { |
|
as.t.IncrMsgRecv() |
|
} |
|
if as.desc.ServerStreams { |
|
// Subsequent messages should be received by subsequent RecvMsg calls. |
|
return nil |
|
} |
|
|
|
// Special handling for non-server-stream rpcs. |
|
// This recv expects EOF or errors, so we don't collect inPayload. |
|
err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) |
|
if err == nil { |
|
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) |
|
} |
|
if err == io.EOF { |
|
return as.s.Status().Err() // non-server streaming Recv returns nil on success |
|
} |
|
return toRPCErr(err) |
|
} |
|
|
|
func (as *addrConnStream) finish(err error) { |
|
as.mu.Lock() |
|
if as.finished { |
|
as.mu.Unlock() |
|
return |
|
} |
|
as.finished = true |
|
if err == io.EOF { |
|
// Ending a stream with EOF indicates a success. |
|
err = nil |
|
} |
|
if as.s != nil { |
|
as.t.CloseStream(as.s, err) |
|
} |
|
|
|
if err != nil { |
|
as.ac.incrCallsFailed() |
|
} else { |
|
as.ac.incrCallsSucceeded() |
|
} |
|
as.cancel() |
|
as.mu.Unlock() |
|
} |
|
|
|
// ServerStream defines the server-side behavior of a streaming RPC. |
|
// |
|
// All errors returned from ServerStream methods are compatible with the |
|
// status package. |
|
type ServerStream interface { |
|
// SetHeader sets the header metadata. It may be called multiple times. |
|
// When call multiple times, all the provided metadata will be merged. |
|
// All the metadata will be sent out when one of the following happens: |
|
// - ServerStream.SendHeader() is called; |
|
// - The first response is sent out; |
|
// - An RPC status is sent out (error or success). |
|
SetHeader(metadata.MD) error |
|
// SendHeader sends the header metadata. |
|
// The provided md and headers set by SetHeader() will be sent. |
|
// It fails if called multiple times. |
|
SendHeader(metadata.MD) error |
|
// SetTrailer sets the trailer metadata which will be sent with the RPC status. |
|
// When called more than once, all the provided metadata will be merged. |
|
SetTrailer(metadata.MD) |
|
// Context returns the context for this stream. |
|
Context() context.Context |
|
// SendMsg sends a message. On error, SendMsg aborts the stream and the |
|
// error is returned directly. |
|
// |
|
// SendMsg blocks until: |
|
// - There is sufficient flow control to schedule m with the transport, or |
|
// - The stream is done, or |
|
// - The stream breaks. |
|
// |
|
// SendMsg does not wait until the message is received by the client. An |
|
// untimely stream closure may result in lost messages. |
|
// |
|
// It is safe to have a goroutine calling SendMsg and another goroutine |
|
// calling RecvMsg on the same stream at the same time, but it is not safe |
|
// to call SendMsg on the same stream in different goroutines. |
|
SendMsg(m interface{}) error |
|
// RecvMsg blocks until it receives a message into m or the stream is |
|
// done. It returns io.EOF when the client has performed a CloseSend. On |
|
// any non-EOF error, the stream is aborted and the error contains the |
|
// RPC status. |
|
// |
|
// It is safe to have a goroutine calling SendMsg and another goroutine |
|
// calling RecvMsg on the same stream at the same time, but it is not |
|
// safe to call RecvMsg on the same stream in different goroutines. |
|
RecvMsg(m interface{}) error |
|
} |
|
|
|
// serverStream implements a server side Stream. |
|
type serverStream struct { |
|
ctx context.Context |
|
t transport.ServerTransport |
|
s *transport.Stream |
|
p *parser |
|
codec baseCodec |
|
|
|
cp Compressor |
|
dc Decompressor |
|
comp encoding.Compressor |
|
decomp encoding.Compressor |
|
|
|
maxReceiveMessageSize int |
|
maxSendMessageSize int |
|
trInfo *traceInfo |
|
|
|
statsHandler stats.Handler |
|
|
|
binlog *binarylog.MethodLogger |
|
// serverHeaderBinlogged indicates whether server header has been logged. It |
|
// will happen when one of the following two happens: stream.SendHeader(), |
|
// stream.Send(). |
|
// |
|
// It's only checked in send and sendHeader, doesn't need to be |
|
// synchronized. |
|
serverHeaderBinlogged bool |
|
|
|
mu sync.Mutex // protects trInfo.tr after the service handler runs. |
|
} |
|
|
|
func (ss *serverStream) Context() context.Context { |
|
return ss.ctx |
|
} |
|
|
|
func (ss *serverStream) SetHeader(md metadata.MD) error { |
|
if md.Len() == 0 { |
|
return nil |
|
} |
|
return ss.s.SetHeader(md) |
|
} |
|
|
|
func (ss *serverStream) SendHeader(md metadata.MD) error { |
|
err := ss.t.WriteHeader(ss.s, md) |
|
if ss.binlog != nil && !ss.serverHeaderBinlogged { |
|
h, _ := ss.s.Header() |
|
ss.binlog.Log(&binarylog.ServerHeader{ |
|
Header: h, |
|
}) |
|
ss.serverHeaderBinlogged = true |
|
} |
|
return err |
|
} |
|
|
|
func (ss *serverStream) SetTrailer(md metadata.MD) { |
|
if md.Len() == 0 { |
|
return |
|
} |
|
ss.s.SetTrailer(md) |
|
} |
|
|
|
func (ss *serverStream) SendMsg(m interface{}) (err error) { |
|
defer func() { |
|
if ss.trInfo != nil { |
|
ss.mu.Lock() |
|
if ss.trInfo.tr != nil { |
|
if err == nil { |
|
ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) |
|
} else { |
|
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) |
|
ss.trInfo.tr.SetError() |
|
} |
|
} |
|
ss.mu.Unlock() |
|
} |
|
if err != nil && err != io.EOF { |
|
st, _ := status.FromError(toRPCErr(err)) |
|
ss.t.WriteStatus(ss.s, st) |
|
// Non-user specified status was sent out. This should be an error |
|
// case (as a server side Cancel maybe). |
|
// |
|
// This is not handled specifically now. User will return a final |
|
// status from the service handler, we will log that error instead. |
|
// This behavior is similar to an interceptor. |
|
} |
|
if channelz.IsOn() && err == nil { |
|
ss.t.IncrMsgSent() |
|
} |
|
}() |
|
data, err := encode(ss.codec, m) |
|
if err != nil { |
|
return err |
|
} |
|
compData, err := compress(data, ss.cp, ss.comp) |
|
if err != nil { |
|
return err |
|
} |
|
hdr, payload := msgHeader(data, compData) |
|
// TODO(dfawley): should we be checking len(data) instead? |
|
if len(payload) > ss.maxSendMessageSize { |
|
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) |
|
} |
|
if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { |
|
return toRPCErr(err) |
|
} |
|
if ss.binlog != nil { |
|
if !ss.serverHeaderBinlogged { |
|
h, _ := ss.s.Header() |
|
ss.binlog.Log(&binarylog.ServerHeader{ |
|
Header: h, |
|
}) |
|
ss.serverHeaderBinlogged = true |
|
} |
|
ss.binlog.Log(&binarylog.ServerMessage{ |
|
Message: data, |
|
}) |
|
} |
|
if ss.statsHandler != nil { |
|
ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) |
|
} |
|
return nil |
|
} |
|
|
|
func (ss *serverStream) RecvMsg(m interface{}) (err error) { |
|
defer func() { |
|
if ss.trInfo != nil { |
|
ss.mu.Lock() |
|
if ss.trInfo.tr != nil { |
|
if err == nil { |
|
ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) |
|
} else if err != io.EOF { |
|
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) |
|
ss.trInfo.tr.SetError() |
|
} |
|
} |
|
ss.mu.Unlock() |
|
} |
|
if err != nil && err != io.EOF { |
|
st, _ := status.FromError(toRPCErr(err)) |
|
ss.t.WriteStatus(ss.s, st) |
|
// Non-user specified status was sent out. This should be an error |
|
// case (as a server side Cancel maybe). |
|
// |
|
// This is not handled specifically now. User will return a final |
|
// status from the service handler, we will log that error instead. |
|
// This behavior is similar to an interceptor. |
|
} |
|
if channelz.IsOn() && err == nil { |
|
ss.t.IncrMsgRecv() |
|
} |
|
}() |
|
var payInfo *payloadInfo |
|
if ss.statsHandler != nil || ss.binlog != nil { |
|
payInfo = &payloadInfo{} |
|
} |
|
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil { |
|
if err == io.EOF { |
|
if ss.binlog != nil { |
|
ss.binlog.Log(&binarylog.ClientHalfClose{}) |
|
} |
|
return err |
|
} |
|
if err == io.ErrUnexpectedEOF { |
|
err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) |
|
} |
|
return toRPCErr(err) |
|
} |
|
if ss.statsHandler != nil { |
|
ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{ |
|
RecvTime: time.Now(), |
|
Payload: m, |
|
// TODO truncate large payload. |
|
Data: payInfo.uncompressedBytes, |
|
Length: len(payInfo.uncompressedBytes), |
|
}) |
|
} |
|
if ss.binlog != nil { |
|
ss.binlog.Log(&binarylog.ClientMessage{ |
|
Message: payInfo.uncompressedBytes, |
|
}) |
|
} |
|
return nil |
|
} |
|
|
|
// MethodFromServerStream returns the method string for the input stream. |
|
// The returned string is in the format of "/service/method". |
|
func MethodFromServerStream(stream ServerStream) (string, bool) { |
|
return Method(stream.Context()) |
|
}
|
|
|