consumerhandler.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
//@File consumerhandler.go
//@Time 2022/05/07
//@Author #Suyghur,
  4. package ext
  5. import (
  6. "context"
  7. "github.com/Shopify/sarama"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. "ylink/ext/kafka"
  10. )
  11. type callback func(msg []byte)
  12. type ConsumerHandler struct {
  13. Callbacks map[string]callback
  14. ConsumerGroup *kafka.ConsumerGroup
  15. }
  16. func (handler *ConsumerHandler) Init(config kafka.KqConfig) {
  17. handler.Callbacks = make(map[string]callback)
  18. handler.Callbacks[config.Topic] = handler.handleMessage
  19. consumerGroupConfig := kafka.ConsumerGroupConfig{
  20. KafkaVersion: sarama.V2_8_0_0,
  21. OffsetsInitial: sarama.OffsetNewest,
  22. IsReturnErr: false,
  23. }
  24. logx.WithContext(context.Background()).Infof("brokers: %v", config.Brokers)
  25. logx.WithContext(context.Background()).Infof("group id: %s", config.GroupId)
  26. handler.ConsumerGroup = kafka.NewConsumerGroup(&consumerGroupConfig, config.Brokers, []string{config.Topic}, config.GroupId)
  27. }
  28. func (handler *ConsumerHandler) handleMessage(msg []byte) {
  29. logx.WithContext(context.Background()).Infof("handle message from kafka: %s", string(msg))
  30. //msgFromMq:=
  31. }
  32. func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
  33. return nil
  34. }
  35. func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
  36. return nil
  37. }
  38. func (handler *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  39. for msg := range claim.Messages() {
  40. logx.WithContext(context.Background()).Infof("kafka get info to mysql, topic: %s, partition: %d, msg: %s", msg.Topic, msg.Partition, string(msg.Value))
  41. handler.Callbacks[msg.Topic](msg.Value)
  42. }
  43. return nil
  44. }