You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
163 lines
3.8 KiB
163 lines
3.8 KiB
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
|
|
|
package http2 |
|
|
|
import ( |
|
"errors" |
|
"io" |
|
"sync" |
|
) |
|
|
|
// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like |
|
// io.Pipe except there are no PipeReader/PipeWriter halves, and the |
|
// underlying buffer is an interface. (io.Pipe is always unbuffered) |
|
type pipe struct { |
|
mu sync.Mutex |
|
c sync.Cond // c.L lazily initialized to &p.mu |
|
b pipeBuffer // nil when done reading |
|
err error // read error once empty. non-nil means closed. |
|
breakErr error // immediate read error (caller doesn't see rest of b) |
|
donec chan struct{} // closed on error |
|
readFn func() // optional code to run in Read before error |
|
} |
|
|
|
// pipeBuffer is the storage a pipe reads from and writes to: a byte
// source and sink that can also report how many bytes it currently holds.
type pipeBuffer interface {
	io.Reader
	io.Writer

	// Len returns the number of bytes available to Read.
	Len() int
}
|
|
|
func (p *pipe) Len() int { |
|
p.mu.Lock() |
|
defer p.mu.Unlock() |
|
if p.b == nil { |
|
return 0 |
|
} |
|
return p.b.Len() |
|
} |
|
|
|
// Read waits until data is available and copies bytes |
|
// from the buffer into p. |
|
func (p *pipe) Read(d []byte) (n int, err error) { |
|
p.mu.Lock() |
|
defer p.mu.Unlock() |
|
if p.c.L == nil { |
|
p.c.L = &p.mu |
|
} |
|
for { |
|
if p.breakErr != nil { |
|
return 0, p.breakErr |
|
} |
|
if p.b != nil && p.b.Len() > 0 { |
|
return p.b.Read(d) |
|
} |
|
if p.err != nil { |
|
if p.readFn != nil { |
|
p.readFn() // e.g. copy trailers |
|
p.readFn = nil // not sticky like p.err |
|
} |
|
p.b = nil |
|
return 0, p.err |
|
} |
|
p.c.Wait() |
|
} |
|
} |
|
|
|
var errClosedPipeWrite = errors.New("write on closed buffer") |
|
|
|
// Write copies bytes from p into the buffer and wakes a reader. |
|
// It is an error to write more data than the buffer can hold. |
|
func (p *pipe) Write(d []byte) (n int, err error) { |
|
p.mu.Lock() |
|
defer p.mu.Unlock() |
|
if p.c.L == nil { |
|
p.c.L = &p.mu |
|
} |
|
defer p.c.Signal() |
|
if p.err != nil { |
|
return 0, errClosedPipeWrite |
|
} |
|
if p.breakErr != nil { |
|
return len(d), nil // discard when there is no reader |
|
} |
|
return p.b.Write(d) |
|
} |
|
|
|
// CloseWithError causes the next Read (waking up a current blocked |
|
// Read if needed) to return the provided err after all data has been |
|
// read. |
|
// |
|
// The error must be non-nil. |
|
func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) } |
|
|
|
// BreakWithError causes the next Read (waking up a current blocked |
|
// Read if needed) to return the provided err immediately, without |
|
// waiting for unread data. |
|
func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) } |
|
|
|
// closeWithErrorAndCode is like CloseWithError but also sets some code to run |
|
// in the caller's goroutine before returning the error. |
|
func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) } |
|
|
|
func (p *pipe) closeWithError(dst *error, err error, fn func()) { |
|
if err == nil { |
|
panic("err must be non-nil") |
|
} |
|
p.mu.Lock() |
|
defer p.mu.Unlock() |
|
if p.c.L == nil { |
|
p.c.L = &p.mu |
|
} |
|
defer p.c.Signal() |
|
if *dst != nil { |
|
// Already been done. |
|
return |
|
} |
|
p.readFn = fn |
|
if dst == &p.breakErr { |
|
p.b = nil |
|
} |
|
*dst = err |
|
p.closeDoneLocked() |
|
} |
|
|
|
// requires p.mu be held. |
|
func (p *pipe) closeDoneLocked() { |
|
if p.donec == nil { |
|
return |
|
} |
|
// Close if unclosed. This isn't racy since we always |
|
// hold p.mu while closing. |
|
select { |
|
case <-p.donec: |
|
default: |
|
close(p.donec) |
|
} |
|
} |
|
|
|
// Err returns the error (if any) first set by BreakWithError or CloseWithError. |
|
func (p *pipe) Err() error { |
|
p.mu.Lock() |
|
defer p.mu.Unlock() |
|
if p.breakErr != nil { |
|
return p.breakErr |
|
} |
|
return p.err |
|
} |
|
|
|
// Done returns a channel which is closed if and when this pipe is closed |
|
// with CloseWithError. |
|
func (p *pipe) Done() <-chan struct{} { |
|
p.mu.Lock() |
|
defer p.mu.Unlock() |
|
if p.donec == nil { |
|
p.donec = make(chan struct{}) |
|
if p.err != nil || p.breakErr != nil { |
|
// Already hit an error. |
|
p.closeDoneLocked() |
|
} |
|
} |
|
return p.donec |
|
}
|
|
|