consumergroup.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. //@File consumergroup.go
  2. //@Time 2022/05/06
  3. //@Author #Suyghur,
  4. package kafka
import (
	"context"
	"errors"

	"github.com/Shopify/sarama"
	"github.com/zeromicro/go-zero/core/logx"
)
  10. type ConsumerGroup struct {
  11. sarama.ConsumerGroup
  12. groupId string
  13. topics []string
  14. }
  15. type ConsumerGroupConfig struct {
  16. KafkaVersion sarama.KafkaVersion
  17. OffsetsInitial int64
  18. IsReturnErr bool
  19. }
  20. func NewConsumerGroup(c *ConsumerGroupConfig, addr, topics []string, groupId string) *ConsumerGroup {
  21. config := sarama.NewConfig()
  22. config.Version = c.KafkaVersion
  23. config.Consumer.Offsets.Initial = c.OffsetsInitial
  24. config.Consumer.Return.Errors = c.IsReturnErr
  25. client, err := sarama.NewClient(addr, config)
  26. if err != nil {
  27. logx.WithContext(context.Background()).Error(err.Error())
  28. panic(err.Error())
  29. }
  30. consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, client)
  31. if err != nil {
  32. logx.WithContext(context.Background()).Error(err.Error())
  33. panic(err.Error())
  34. }
  35. return &ConsumerGroup{consumerGroup, groupId, topics}
  36. }
  37. func (cg *ConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) {
  38. ctx := context.Background()
  39. for {
  40. if err := cg.ConsumerGroup.Consume(ctx, cg.topics, handler); err != nil {
  41. logx.Error(err.Error())
  42. panic(err.Error())
  43. }
  44. }
  45. }