package kafka import ( "context" "crypto/tls" "crypto/x509" "io" "log" "os" "time" "git.wishpal.cn/wishpal_ironfan/xframe/base/queue" "git.wishpal.cn/wishpal_ironfan/xframe/base/service" "git.wishpal.cn/wishpal_ironfan/xframe/base/stat" "git.wishpal.cn/wishpal_ironfan/xframe/base/threading" "git.wishpal.cn/wishpal_ironfan/xframe/base/timex" "git.wishpal.cn/wishpal_ironfan/xframe/base/utils" "git.wishpal.cn/wishpal_ironfan/xframe/component/logger" "github.com/segmentio/kafka-go" _ "github.com/segmentio/kafka-go/gzip" _ "github.com/segmentio/kafka-go/lz4" "github.com/segmentio/kafka-go/sasl/plain" _ "github.com/segmentio/kafka-go/snappy" ) const ( defaultCommitInterval = time.Second defaultMaxWait = time.Second defaultQueueCapacity = 1000 ) type ( ConsumeHandle func(msg kafka.Message) error ConsumeErrorHandler func(msg kafka.Message, err error) ConsumeHandler interface { Consume(msg kafka.Message) error } queueOptions struct { commitInterval time.Duration queueCapacity int maxWait time.Duration metrics *stat.Metrics errorHandler ConsumeErrorHandler } QueueOption func(*queueOptions) kafkaQueue struct { c KqConf consumer *kafka.Reader handler ConsumeHandler channel chan kafka.Message producerRoutines *threading.RoutineGroup consumerRoutines *threading.RoutineGroup metrics *stat.Metrics errorHandler ConsumeErrorHandler } kafkaQueues struct { queues []queue.MessageQueue group *service.ServiceGroup } ) func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue { q, err := NewQueue(c, handler, opts...) if err != nil { log.Fatal(err) } return q } func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) { var options queueOptions for _, opt := range opts { opt(&options) } ensureQueueOptions(c, &options) if c.Conns < 1 { c.Conns = 1 } q := kafkaQueues{ group: service.NewServiceGroup(), } for i := 0; i < c.Conns; i++ { q.queues = append(q.queues, newKafkaQueue(c, handler, options)) } return q, nil } func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue { var offset int64 if c.Offset == firstOffset { offset = kafka.FirstOffset } else { offset = kafka.LastOffset } readerConfig := kafka.ReaderConfig{ Brokers: c.Brokers, GroupID: c.Group, GroupTopics: c.GroupTopics, StartOffset: offset, MinBytes: c.MinBytes, MaxBytes: c.MaxBytes, MaxWait: options.maxWait, CommitInterval: options.commitInterval, QueueCapacity: options.queueCapacity, } if len(c.Username) > 0 && len(c.Password) > 0 { readerConfig.Dialer = &kafka.Dialer{ SASLMechanism: plain.Mechanism{ Username: c.Username, Password: c.Password, }, } } if len(c.CaFile) > 0 { caCert, err := os.ReadFile(c.CaFile) if err != nil { log.Fatal(err) } caCertPool := x509.NewCertPool() ok := caCertPool.AppendCertsFromPEM(caCert) if !ok { log.Fatal(err) } readerConfig.Dialer.TLS = &tls.Config{ RootCAs: caCertPool, InsecureSkipVerify: true, } } consumer := kafka.NewReader(readerConfig) return &kafkaQueue{ c: c, consumer: consumer, handler: handler, channel: make(chan kafka.Message), producerRoutines: threading.NewRoutineGroup(), consumerRoutines: threading.NewRoutineGroup(), metrics: options.metrics, errorHandler: options.errorHandler, } } func (q *kafkaQueue) Start() { q.startProcessors() q.startConsumers() q.producerRoutines.Wait() close(q.channel) q.consumerRoutines.Wait() } func (q *kafkaQueue) Stop() { q.consumer.Close() } func (q *kafkaQueue) consumeOne(msg kafka.Message) error { startTime := timex.Now() err := q.handler.Consume(msg) q.metrics.Add(stat.Task{ Duration: timex.Since(startTime), }) return err } func (q *kafkaQueue) startProcessors() { for i := 0; i < q.c.Processors; i++ { q.consumerRoutines.RunVoid(func() { for msg := range q.channel { if err := q.consumeOne(msg); err != nil { if q.errorHandler != nil { q.errorHandler(msg, err) } if !q.c.ForceCommit { continue } } if err := q.consumer.CommitMessages(context.Background(), msg); err != nil { logger.Errorf("commit failed, error: %v", err) } } }) } } func (q *kafkaQueue) startConsumers() { for i := 0; i < q.c.Consumers; i++ { q.producerRoutines.RunVoid(func() { for { msg, err := q.consumer.FetchMessage(context.Background()) // io.EOF means consumer closed // io.ErrClosedPipe means committing messages on the consumer, // kafka will refire the messages on uncommitted messages, ignore if err == io.EOF || err == io.ErrClosedPipe { return } if err != nil { logger.Errorf("Error on reading message, %q", err.Error()) continue } q.channel <- msg } }) } } func (q kafkaQueues) Start() { for _, each := range q.queues { q.group.Add(each) } q.group.Start() } func (q kafkaQueues) Stop() { q.group.Stop() } func WithCommitInterval(interval time.Duration) QueueOption { return func(options *queueOptions) { options.commitInterval = interval } } func WithQueueCapacity(queueCapacity int) QueueOption { return func(options *queueOptions) { options.queueCapacity = queueCapacity } } func WithHandle(handle ConsumeHandle) ConsumeHandler { return innerConsumeHandler{ handle: handle, } } func WithMaxWait(wait time.Duration) QueueOption { return func(options *queueOptions) { options.maxWait = wait } } func WithMetrics(metrics *stat.Metrics) QueueOption { return func(options *queueOptions) { options.metrics = metrics } } func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption { return func(options *queueOptions) { options.errorHandler = errorHandler } } type innerConsumeHandler struct { handle ConsumeHandle } func (ch innerConsumeHandler) Consume(msg kafka.Message) error { return ch.handle(msg) } func ensureQueueOptions(c KqConf, options *queueOptions) { if options.commitInterval == 0 { options.commitInterval = defaultCommitInterval } if options.queueCapacity == 0 { options.queueCapacity = defaultQueueCapacity } if options.maxWait == 0 { options.maxWait = defaultMaxWait } if options.metrics == nil { options.metrics = stat.NewMetrics(utils.GetServiceName()) } if options.errorHandler == nil { options.errorHandler = func(msg kafka.Message, err error) { logger.Errorf("consume: %s, error: %v", string(msg.Value), err) } } }