package sarama

import (
	"encoding/binary"
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
	"github.com/eapache/queue"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
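
// A minimal usage sketch (illustrative only; the broker address, topic, and
// payload below are assumptions, not part of this file): write to Input()
// while draining Successes() and Errors() in the same select, so that
// neither output channel can fill up and deadlock the producer.
//
//	config := NewConfig()
//	config.Producer.Return.Successes = true
//	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer producer.Close() // drains any outstanding successes and errors
//
//	for sent := 0; sent < 10; {
//		msg := &ProducerMessage{Topic: "my-topic", Value: StringEncoder("hello")}
//		select {
//		case producer.Input() <- msg:
//			sent++
//		case s := <-producer.Successes():
//			Logger.Printf("delivered to %s/%d at offset %d\n", s.Topic, s.Partition, s.Offset)
//		case err := <-producer.Errors():
//			Logger.Println("delivery failed:", err)
//		}
//	}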

type asyncProducer struct {
	client    Client
	conf      *Config
	ownClient bool

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]chan<- *ProducerMessage
	brokerRefs map[chan<- *ProducerMessage]int
	brokerLock sync.Mutex
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}

	p, err := NewAsyncProducerFromClient(client)
	if err != nil {
		return nil, err
	}
	p.(*asyncProducer).ownClient = true
	return p, nil
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]chan<- *ProducerMessage),
		brokerRefs: make(map[chan<- *ProducerMessage]int),
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}
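
// A shutdown-ordering sketch for the FromClient variant (illustrative only;
// the broker address is an assumption): because deferred calls run last-in
// first-out, the producer is closed before the shared client, as the docs on
// Close require.
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close() // runs second
//
//	producer, err := NewAsyncProducerFromClient(client)
//	if err != nil {
//		panic(err)
//	}
//	defer producer.Close() // runs first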

type flagSet int8

const (
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                          // final message from partitionProducer to brokerProducer and back
	shutdown                     // start the shutdown process
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field; it is only to be used for
	// pass-through data.
	Metadata interface{}

	// The fields below this point are filled in by the producer as the message is processed.

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp is the timestamp assigned to the message by the broker. This
	// is only guaranteed to be defined if the message was successfully
	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
	// least version 0.10.0.
	Timestamp time.Time

	retries     int
	flags       flagSet
	expectation chan *ProducerError
}
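
// A construction sketch (illustrative only; the topic, key, header, and
// metadata values are assumptions): Key and Value accept any Encoder, and
// Metadata is returned untouched on the Successes/Errors channels so it can
// be used to correlate results. Note that Headers require Version >=
// V0_11_0_0 in the config.
//
//	msg := &ProducerMessage{
//		Topic:    "audit-log",
//		Key:      StringEncoder("user-42"), // same key -> same partition with the default partitioner
//		Value:    ByteEncoder([]byte(`{"action":"login"}`)),
//		Headers:  []RecordHeader{{Key: []byte("origin"), Value: []byte("web")}},
//		Metadata: 42, // e.g. an internal request ID; Sarama passes it through untouched
//	}
//	producer.Input() <- msg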

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize(version int) int {
	var size int
	if version >= 2 {
		size = maximumRecordOverhead
		for _, h := range m.Headers {
			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
		}
	} else {
		size = producerMessageOverhead
	}
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}

func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
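
// A sketch of inspecting Close failures (illustrative only): when some
// messages could not be delivered, Close returns them wrapped as
// ProducerErrors, which a type assertion can unpack.
//
//	if err := producer.Close(); err != nil {
//		if errs, ok := err.(ProducerErrors); ok {
//			for _, e := range errs {
//				Logger.Printf("failed to deliver to %s: %v\n", e.Msg.Topic, e.Err)
//			}
//		}
//	}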

func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}

// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}
}

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		var requiresConsistency = false
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
			requiresConsistency = ep.MessageRequiresConsistency(msg)
		} else {
			requiresConsistency = tp.partitioner.RequiresConsistency()
		}

		if requiresConsistency {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		return
	})

	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]

	return nil
}

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader  *Broker
	breaker *breaker.Breaker
	output  chan<- *ProducerMessage

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through;
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering.
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.output = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	for msg := range pp.input {
		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees

		if pp.output == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		pp.output <- msg
	}

	if pp.output != nil {
		pp.parent.unrefBrokerProducer(pp.leader, pp.output)
	}
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.output)
	pp.output = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.output == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.output <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}

func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.output = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	return input
}

type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     <-chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse

	buffer     *produceSet
	timer      <-chan time.Time
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}

func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg := <-bp.input:
			if msg == nil {
				bp.shutdown()
				return
			}

			if msg.flags&syn == syn {
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				if err := bp.waitForSpace(msg); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response := <-bp.responses:
			bp.handleResponse(response)
		}

		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}

func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	for response := range bp.responses {
		bp.handleResponse(response)
	}

	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
	if bp.closing != nil {
		return bp.closing
	}

	return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}

func (bp *brokerProducer) rollOver() {
	bp.timer = nil
	bp.timerFired = false
	bp.buffer = newProduceSet(bp.parent)
}

func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
	if response.err != nil {
		bp.handleError(response.set, response.err)
	} else {
		bp.handleSuccess(response.set, response.res)
	}

	if bp.buffer.empty() {
		bp.rollOver() // this can happen if the response invalidated our buffer
	}
}

func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			for i, msg := range msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
				bp.broker.ID(), topic, partition, block.Err)
			bp.currentRetries[topic][partition] = block.Err
			bp.parent.retryMessages(msgs, block.Err)
			bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
		// Other non-retriable errors
		default:
			bp.parent.returnErrors(msgs, block.Err)
		}
	})
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	switch err.(type) {
	case PacketEncodingError:
		sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
			bp.parent.returnErrors(msgs, err)
		})
	default:
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		_ = bp.broker.Close()
		bp.closing = err
		sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
			bp.parent.retryMessages(msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
			bp.parent.retryMessages(msgs, err)
		})
		bp.rollOver()
	}
}

// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}

// utility functions

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	if p.ownClient {
		err := p.client.Close()
		if err != nil {
			Logger.Println("producer/shutdown failed to close the embedded client:", err)
		}
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
	p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.returnError(msg, err)
	}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
	if msg.retries >= p.conf.Producer.Retry.Max {
		p.returnError(msg, err)
	} else {
		msg.retries++
		p.retries <- msg
	}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.retryMessage(msg, err)
	}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	delete(p.brokers, broker)
}