package consumergroup

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	"github.com/wvanbergen/kazoo-go"
)

var (
	// AlreadyClosing is returned by Close when the consumer group is
	// already in the process of shutting down.
	AlreadyClosing = errors.New("The consumer group is already shutting down.")
)
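
// Config holds the configuration of a consumer group.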
type Config struct {
	*sarama.Config

	Zookeeper *kazoo.Config

	Offsets struct {
		Initial           int64         // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
		ProcessingTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.
		CommitInterval    time.Duration // The interval at which processed offsets are committed.
		ResetOffsets      bool          // Resets the offsets for the consumer group so that it won't resume from where it left off previously.
	}
}

// NewConfig returns a new Config instance with sane defaults.
func NewConfig() *Config {
	config := &Config{}
	config.Config = sarama.NewConfig()
	config.Zookeeper = kazoo.NewConfig()
	config.Offsets.Initial = sarama.OffsetOldest
	config.Offsets.ProcessingTimeout = 60 * time.Second
	config.Offsets.CommitInterval = 10 * time.Second

	return config
}
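
// A typical way to customize the configuration before joining a group (the
// group name, topic, Zookeeper address, and values below are illustrative,
// not recommendations):
//
//	config := NewConfig()
//	config.Offsets.Initial = sarama.OffsetNewest
//	config.Offsets.CommitInterval = 5 * time.Second
//	cg, err := JoinConsumerGroup("example-group", []string{"example-topic"}, []string{"localhost:2181"}, config)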

// Validate checks the configuration for common mistakes, and returns an
// error describing the first problem it finds.
func (cgc *Config) Validate() error {
	if cgc.Zookeeper.Timeout <= 0 {
		return sarama.ConfigurationError("ZookeeperTimeout should have a duration > 0")
	}

	if cgc.Offsets.CommitInterval < 0 {
		return sarama.ConfigurationError("CommitInterval should have a duration >= 0")
	}

	if cgc.Offsets.Initial != sarama.OffsetOldest && cgc.Offsets.Initial != sarama.OffsetNewest {
		return errors.New("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest.")
	}

	if cgc.Config != nil {
		if err := cgc.Config.Validate(); err != nil {
			return err
		}
	}

	return nil
}

// The ConsumerGroup type holds all the information for a consumer that is part
// of a consumer group. Call JoinConsumerGroup to start a consumer.
type ConsumerGroup struct {
	config *Config

	consumer sarama.Consumer
	kazoo    *kazoo.Kazoo
	group    *kazoo.Consumergroup
	instance *kazoo.ConsumergroupInstance

	wg             sync.WaitGroup
	singleShutdown sync.Once

	messages chan *sarama.ConsumerMessage
	errors   chan error
	stopper  chan struct{}

	consumers kazoo.ConsumergroupInstanceList

	offsetManager OffsetManager
}

// JoinConsumerGroup connects to a consumer group, using Zookeeper for
// auto-discovery.
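//
// A minimal usage sketch (the group name, topic, and Zookeeper address are
// placeholders):
//
//	cg, err := JoinConsumerGroup("example-group", []string{"example-topic"}, []string{"localhost:2181"}, nil)
//	if err != nil {
//		// handle error
//	}
//	defer cg.Close()
//
//	for message := range cg.Messages() {
//		// process message, then mark it as processed
//		cg.CommitUpto(message)
//	}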
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) {
	if name == "" {
		return nil, sarama.ConfigurationError("Empty consumergroup name")
	}

	if len(topics) == 0 {
		return nil, sarama.ConfigurationError("No topics provided")
	}

	if len(zookeeper) == 0 {
		return nil, errors.New("You need to provide at least one zookeeper node address!")
	}

	if config == nil {
		config = NewConfig()
	}
	config.ClientID = name

	// Validate configuration
	if err = config.Validate(); err != nil {
		return
	}

	var kz *kazoo.Kazoo
	if kz, err = kazoo.NewKazoo(zookeeper, config.Zookeeper); err != nil {
		return
	}

	brokers, err := kz.BrokerList()
	if err != nil {
		kz.Close()
		return
	}

	group := kz.Consumergroup(name)

	if config.Offsets.ResetOffsets {
		err = group.ResetOffsets()
		if err != nil {
			kz.Close()
			return
		}
	}

	instance := group.NewInstance()

	var consumer sarama.Consumer
	if consumer, err = sarama.NewConsumer(brokers, config.Config); err != nil {
		kz.Close()
		return
	}

	cg = &ConsumerGroup{
		config:   config,
		consumer: consumer,

		kazoo:    kz,
		group:    group,
		instance: instance,

		messages: make(chan *sarama.ConsumerMessage, config.ChannelBufferSize),
		errors:   make(chan error, config.ChannelBufferSize),
		stopper:  make(chan struct{}),
	}

	// Register the consumer group in Zookeeper, creating it if necessary
	if exists, err := cg.group.Exists(); err != nil {
		cg.Logf("FAILED to check for existence of consumergroup: %s!\n", err)
		_ = consumer.Close()
		_ = kz.Close()
		return nil, err
	} else if !exists {
		cg.Logf("Consumergroup `%s` does not exist yet, creating...\n", cg.group.Name)
		if err := cg.group.Create(); err != nil {
			cg.Logf("FAILED to create consumergroup in Zookeeper: %s!\n", err)
			_ = consumer.Close()
			_ = kz.Close()
			return nil, err
		}
	}

	// Register this consumer instance with Zookeeper
	if err := cg.instance.Register(topics); err != nil {
		cg.Logf("FAILED to register consumer instance: %s!\n", err)
		return nil, err
	} else {
		cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
	}

	offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval}
	cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig)

	go cg.topicListConsumer(topics)

	return
}

// Messages returns a channel that you can read to obtain messages from Kafka
// to process.
func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage {
	return cg.messages
}

// Errors returns a channel that you can read to obtain errors that occurred
// during consumption.
func (cg *ConsumerGroup) Errors() <-chan error {
	return cg.errors
}
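
// Closed returns whether this consumer group instance has been closed.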
func (cg *ConsumerGroup) Closed() bool {
	return cg.instance == nil
}
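
// Close shuts down the consumer group: it stops consuming, deregisters the
// instance from Zookeeper, and closes the offset manager and the underlying
// sarama consumer. Calling Close on a group that is already shutting down
// returns AlreadyClosing.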
func (cg *ConsumerGroup) Close() error {
	shutdownError := AlreadyClosing
	cg.singleShutdown.Do(func() {
		defer cg.kazoo.Close()

		shutdownError = nil

		close(cg.stopper)
		cg.wg.Wait()

		if err := cg.offsetManager.Close(); err != nil {
			cg.Logf("FAILED closing the offset manager: %s!\n", err)
		}

		if shutdownError = cg.instance.Deregister(); shutdownError != nil {
			cg.Logf("FAILED deregistering consumer instance: %s!\n", shutdownError)
		} else {
			cg.Logf("Deregistered consumer instance %s.\n", cg.instance.ID)
		}

		if shutdownError = cg.consumer.Close(); shutdownError != nil {
			cg.Logf("FAILED closing the Sarama client: %s\n", shutdownError)
		}

		close(cg.messages)
		close(cg.errors)
		cg.instance = nil
	})

	return shutdownError
}
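
// Logf writes a log entry to sarama.Logger, prefixed with the consumer group
// name and the last 12 characters of this instance's ID.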
func (cg *ConsumerGroup) Logf(format string, args ...interface{}) {
	var identifier string
	if cg.instance == nil {
		identifier = "(defunct)"
	} else {
		identifier = cg.instance.ID[len(cg.instance.ID)-12:]
	}
	sarama.Logger.Printf("[%s/%s] %s", cg.group.Name, identifier, fmt.Sprintf(format, args...))
}
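
// InstanceRegistered checks whether this consumer group instance is still
// registered in Zookeeper.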
func (cg *ConsumerGroup) InstanceRegistered() (bool, error) {
	return cg.instance.Registered()
}
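
// CommitUpto marks the given message as processed. The offset manager will
// commit its offset to Zookeeper on the next commit interval; this call does
// not block until the commit has happened.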
func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error {
	cg.offsetManager.MarkAsProcessed(message.Topic, message.Partition, message.Offset)
	return nil
}
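
// FlushOffsets commits all processed offsets immediately, instead of waiting
// for the next commit interval.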
func (cg *ConsumerGroup) FlushOffsets() error {
	return cg.offsetManager.Flush()
}
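
// topicListConsumer starts a topic consumer for every subscribed topic, and
// watches the list of registered consumer instances in Zookeeper. Whenever
// that list changes, it stops all topic consumers, waits for them to finish,
// and starts over, so that the partitions are redistributed over the current
// set of consumers (a rebalance).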
func (cg *ConsumerGroup) topicListConsumer(topics []string) {
	for {
		select {
		case <-cg.stopper:
			return
		default:
		}

		consumers, consumerChanges, err := cg.group.WatchInstances()
		if err != nil {
			cg.Logf("FAILED to get list of registered consumer instances: %s\n", err)
			return
		}

		cg.consumers = consumers
		cg.Logf("Currently registered consumers: %d\n", len(cg.consumers))

		stopper := make(chan struct{})

		for _, topic := range topics {
			cg.wg.Add(1)
			go cg.topicConsumer(topic, cg.messages, cg.errors, stopper)
		}

		select {
		case <-cg.stopper:
			close(stopper)
			return

		case <-consumerChanges:
			registered, err := cg.instance.Registered()
			if err != nil {
				cg.Logf("FAILED to get registration status: %s\n", err)
			} else if !registered {
				err = cg.instance.Register(topics)
				if err != nil {
					cg.Logf("FAILED to register consumer instance: %s!\n", err)
				} else {
					cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
				}
			}

			cg.Logf("Triggering rebalance due to consumer list change\n")
			close(stopper)
			cg.wg.Wait()
		}
	}
}
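
// topicConsumer determines which partitions of the given topic this instance
// should claim, based on the current list of registered consumers, and starts
// a partition consumer for each of them.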
func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- error, stopper <-chan struct{}) {
	defer cg.wg.Done()

	select {
	case <-stopper:
		return
	default:
	}

	cg.Logf("%s :: Started topic consumer\n", topic)

	// Fetch a list of partition IDs
	partitions, err := cg.kazoo.Topic(topic).Partitions()
	if err != nil {
		cg.Logf("%s :: FAILED to get list of partitions: %s\n", topic, err)
		cg.errors <- &sarama.ConsumerError{
			Topic:     topic,
			Partition: -1,
			Err:       err,
		}
		return
	}

	partitionLeaders, err := retrievePartitionLeaders(partitions)
	if err != nil {
		cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
		cg.errors <- &sarama.ConsumerError{
			Topic:     topic,
			Partition: -1,
			Err:       err,
		}
		return
	}

	dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitionLeaders)
	myPartitions := dividedPartitions[cg.instance.ID]
	cg.Logf("%s :: Claiming %d of %d partitions", topic, len(myPartitions), len(partitionLeaders))

	// Consume all the assigned partitions
	var wg sync.WaitGroup
	for _, pid := range myPartitions {
		wg.Add(1)
		go cg.partitionConsumer(topic, pid.ID, messages, errors, &wg, stopper)
	}

	wg.Wait()
	cg.Logf("%s :: Stopped topic consumer\n", topic)
}
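
// consumePartition starts a sarama partition consumer for the given topic and
// partition at the given offset. If that offset is out of range, it falls
// back to the oldest or newest available offset (depending on
// Offsets.Initial) and retries once.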
func (cg *ConsumerGroup) consumePartition(topic string, partition int32, nextOffset int64) (sarama.PartitionConsumer, error) {
	consumer, err := cg.consumer.ConsumePartition(topic, partition, nextOffset)
	if err == sarama.ErrOffsetOutOfRange {
		cg.Logf("%s/%d :: Partition consumer offset out of range.\n", topic, partition)

		// If the offset is out of range, simplistically decide whether to use OffsetNewest or OffsetOldest:
		// if the configuration specified sarama.OffsetOldest, switch to the oldest available offset;
		// otherwise, switch to the newest available offset.
		if cg.config.Offsets.Initial == sarama.OffsetOldest {
			nextOffset = sarama.OffsetOldest
			cg.Logf("%s/%d :: Partition consumer offset reset to oldest available offset.\n", topic, partition)
		} else {
			nextOffset = sarama.OffsetNewest
			cg.Logf("%s/%d :: Partition consumer offset reset to newest available offset.\n", topic, partition)
		}

		// Retry ConsumePartition with the adjusted offset
		consumer, err = cg.consumer.ConsumePartition(topic, partition, nextOffset)
	}

	if err != nil {
		cg.Logf("%s/%d :: FAILED to start partition consumer: %s\n", topic, partition, err)
		return nil, err
	}

	return consumer, err
}

// partitionConsumer consumes a single partition of a topic: it claims the
// partition in Zookeeper, consumes messages until the stopper is closed, and
// finally commits the last processed offset and releases the partition.
func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- error, wg *sync.WaitGroup, stopper <-chan struct{}) {
	defer wg.Done()

	select {
	case <-stopper:
		return
	default:
	}

	// Since ProcessingTimeout is the amount of time we'll wait for the final batch
	// of messages to be processed before releasing a partition, we need to wait slightly
	// longer than that before timing out here to ensure that another consumer has had
	// enough time to release the partition. Hence, +2 seconds.
	maxRetries := int(cg.config.Offsets.ProcessingTimeout/time.Second) + 2
	for tries := 0; tries < maxRetries; tries++ {
		if err := cg.instance.ClaimPartition(topic, partition); err == nil {
			break
		} else if tries+1 < maxRetries {
			if err == kazoo.ErrPartitionClaimedByOther {
				// Another consumer still owns this partition. We should wait longer for it to release it.
				time.Sleep(1 * time.Second)
			} else {
				// An unexpected error occurred. Log it and continue trying until we hit the timeout.
				cg.Logf("%s/%d :: FAILED to claim partition on attempt %v of %v; retrying in 1 second. Error: %v", topic, partition, tries+1, maxRetries, err)
				time.Sleep(1 * time.Second)
			}
		} else {
			cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
			cg.errors <- &sarama.ConsumerError{
				Topic:     topic,
				Partition: partition,
				Err:       err,
			}
			return
		}
	}

	defer func() {
		err := cg.instance.ReleasePartition(topic, partition)
		if err != nil {
			cg.Logf("%s/%d :: FAILED to release partition: %s\n", topic, partition, err)
			cg.errors <- &sarama.ConsumerError{
				Topic:     topic,
				Partition: partition,
				Err:       err,
			}
		}
	}()

	nextOffset, err := cg.offsetManager.InitializePartition(topic, partition)
	if err != nil {
		cg.Logf("%s/%d :: FAILED to determine initial offset: %s\n", topic, partition, err)
		return
	}

	if nextOffset >= 0 {
		cg.Logf("%s/%d :: Partition consumer starting at offset %d.\n", topic, partition, nextOffset)
	} else {
		nextOffset = cg.config.Offsets.Initial
		if nextOffset == sarama.OffsetOldest {
			cg.Logf("%s/%d :: Partition consumer starting at the oldest available offset.\n", topic, partition)
		} else if nextOffset == sarama.OffsetNewest {
			cg.Logf("%s/%d :: Partition consumer listening for new messages only.\n", topic, partition)
		}
	}

	consumer, err := cg.consumePartition(topic, partition, nextOffset)
	if err != nil {
		cg.Logf("%s/%d :: FAILED to start partition consumer: %s\n", topic, partition, err)
		return
	}

	// The consumer variable may be replaced below when consumption is
	// re-established, so defer a closure that closes whichever partition
	// consumer is current on return, rather than the initial one.
	defer func() { consumer.Close() }()

	var lastOffset int64 = -1 // aka unknown

partitionConsumerLoop:
	for {
		select {
		case <-stopper:
			break partitionConsumerLoop

		case err := <-consumer.Errors():
			if err == nil {
				cg.Logf("%s/%d :: Consumer encountered an invalid state: re-establishing consumption of partition.\n", topic, partition)
				// Errors encountered (if any) are logged in the consumePartition function
				var cErr error
				consumer, cErr = cg.consumePartition(topic, partition, lastOffset)
				if cErr != nil {
					break partitionConsumerLoop
				}
				continue partitionConsumerLoop
			}

			for {
				select {
				case errors <- err:
					continue partitionConsumerLoop
				case <-stopper:
					break partitionConsumerLoop
				}
			}

		case message := <-consumer.Messages():
			if message == nil {
				cg.Logf("%s/%d :: Consumer encountered an invalid state: re-establishing consumption of partition.\n", topic, partition)
				// Errors encountered (if any) are logged in the consumePartition function
				var cErr error
				consumer, cErr = cg.consumePartition(topic, partition, lastOffset)
				if cErr != nil {
					break partitionConsumerLoop
				}
				continue partitionConsumerLoop
			}

			for {
				select {
				case <-stopper:
					break partitionConsumerLoop
				case messages <- message:
					lastOffset = message.Offset
					continue partitionConsumerLoop
				}
			}
		}
	}
cg.Logf("%s/%d :: Stopping partition consumer at offset %d\n", topic, partition, lastOffset)
if err := cg.offsetManager.FinalizePartition(topic, partition, lastOffset, cg.config.Offsets.ProcessingTimeout); err != nil {
cg.Logf("%s/%d :: %s\n", topic, partition, err)
}
}