123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- //@File consumerhandler.go
- //@Time 2022/05/07
- //@Author #Suyghur,
- package ext
- import (
- "context"
- "github.com/Shopify/sarama"
- "github.com/zeromicro/go-zero/core/logx"
- "ylink/ext/kafka"
- )
- type callback func(msg []byte)
- type ConsumerHandler struct {
- Callbacks map[string]callback
- ConsumerGroup *kafka.ConsumerGroup
- }
- func (handler *ConsumerHandler) Init(config kafka.KqConfig) {
- handler.Callbacks = make(map[string]callback)
- handler.Callbacks[config.Topic] = handler.handleMessage
- consumerGroupConfig := kafka.ConsumerGroupConfig{
- KafkaVersion: sarama.V2_8_0_0,
- OffsetsInitial: sarama.OffsetNewest,
- IsReturnErr: false,
- }
- logx.WithContext(context.Background()).Infof("brokers: %v", config.Brokers)
- logx.WithContext(context.Background()).Infof("group id: %s", config.GroupId)
- handler.ConsumerGroup = kafka.NewConsumerGroup(&consumerGroupConfig, config.Brokers, []string{config.Topic}, config.GroupId)
- }
- func (handler *ConsumerHandler) handleMessage(msg []byte) {
- logx.WithContext(context.Background()).Infof("handle message from kafka: %s", string(msg))
- //msgFromMq:=
- }
- func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
- return nil
- }
- func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
- return nil
- }
- func (handler *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
- for msg := range claim.Messages() {
- logx.WithContext(context.Background()).Infof("kafka get info to mysql, topic: %s, partition: %d, msg: %s", msg.Topic, msg.Partition, string(msg.Value))
- handler.Callbacks[msg.Topic](msg.Value)
- }
- return nil
- }
|