1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- //@File sendboxhandler.go
- //@Time 2022/05/12
- //@Author #Suyghur,
- package ext
- import (
- "context"
- "encoding/json"
- "github.com/Shopify/sarama"
- "github.com/zeromicro/go-zero/core/logx"
- "ylink/ext/ds/treemap"
- "ylink/ext/kafka"
- "ylink/ext/model"
- )
- type callback func(msg []byte)
- type SendBoxConsumerHandler struct {
- callbacks map[string]callback
- producer *kafka.Producer
- ConsumerGroup *kafka.ConsumerGroup
- }
- func (handler *SendBoxConsumerHandler) Init(c kafka.KqConfig, producer *kafka.Producer) {
- handler.callbacks = make(map[string]callback)
- handler.callbacks[c.Topic] = handler.handleMessage
- handler.producer = producer
- handler.ConsumerGroup = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
- KafkaVersion: sarama.V0_10_2_0,
- OffsetsInitial: sarama.OffsetNewest,
- IsReturnErr: false,
- }, c.Brokers, []string{c.Topic}, c.GroupId)
- }
- func (handler *SendBoxConsumerHandler) handleMessage(msg []byte) {
- logx.WithContext(context.Background()).Infof("message recv from send-box, %s", string(msg))
- // todo 将message转发到recv-box
- var message model.ChatMessage
- err := json.Unmarshal(msg, &message)
- if err != nil {
- logx.WithContext(context.Background()).Errorf("unmarshal message err: %s", err.Error())
- return
- }
- if len(message.ReceiverId) == 0 {
- // 玩家发的消息
- p2cMap := IdMap.Get(message.GameId).(*treemap.Map)
- message.ReceiverId = p2cMap.Get(message.SenderId).(string)
- b, _ := json.Marshal(message)
- handler.producer.SendMessage(string(b), message.ReceiverId)
- } else {
- handler.producer.SendMessage(string(msg), message.ReceiverId)
- }
- }
- func (SendBoxConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
- return nil
- }
- func (SendBoxConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
- return nil
- }
- func (handler *SendBoxConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
- for msg := range claim.Messages() {
- logx.WithContext(context.Background()).Infof("send-box get info to db, topic: %s, partition: %d, msg: %s", msg.Topic, msg.Partition, string(msg.Value))
- handler.callbacks[msg.Topic](msg.Value)
- }
- return nil
- }
|