sendboxhandler.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. //@File sendboxhandler.go
  2. //@Time 2022/05/12
  3. //@Author #Suyghur,
  4. package ext
  5. import (
  6. "context"
  7. "encoding/json"
  8. "github.com/Shopify/sarama"
  9. "github.com/zeromicro/go-zero/core/logx"
  10. "ylink/ext/ds/treemap"
  11. "ylink/ext/kafka"
  12. "ylink/ext/model"
  13. )
  14. type callback func(msg []byte)
  15. type SendBoxConsumerHandler struct {
  16. callbacks map[string]callback
  17. producer *kafka.Producer
  18. ConsumerGroup *kafka.ConsumerGroup
  19. }
  20. func (handler *SendBoxConsumerHandler) Init(c kafka.KqConfig, producer *kafka.Producer) {
  21. handler.callbacks = make(map[string]callback)
  22. handler.callbacks[c.Topic] = handler.handleMessage
  23. handler.producer = producer
  24. handler.ConsumerGroup = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  25. KafkaVersion: sarama.V0_10_2_0,
  26. OffsetsInitial: sarama.OffsetNewest,
  27. IsReturnErr: false,
  28. }, c.Brokers, []string{c.Topic}, c.GroupId)
  29. }
  30. func (handler *SendBoxConsumerHandler) handleMessage(msg []byte) {
  31. logx.WithContext(context.Background()).Infof("message recv from send-box, %s", string(msg))
  32. // todo 将message转发到recv-box
  33. var message model.ChatMessage
  34. err := json.Unmarshal(msg, &message)
  35. if err != nil {
  36. logx.WithContext(context.Background()).Errorf("unmarshal message err: %s", err.Error())
  37. return
  38. }
  39. if len(message.ReceiverId) == 0 {
  40. // 玩家发的消息
  41. p2cMap := IdMap.Get(message.GameId).(*treemap.Map)
  42. message.ReceiverId = p2cMap.Get(message.SenderId).(string)
  43. b, _ := json.Marshal(message)
  44. handler.producer.SendMessage(string(b), message.ReceiverId)
  45. } else {
  46. handler.producer.SendMessage(string(msg), message.ReceiverId)
  47. }
  48. }
  49. func (SendBoxConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
  50. return nil
  51. }
  52. func (SendBoxConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
  53. return nil
  54. }
  55. func (handler *SendBoxConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  56. for msg := range claim.Messages() {
  57. logx.WithContext(context.Background()).Infof("send-box get info to db, topic: %s, partition: %d, msg: %s", msg.Topic, msg.Partition, string(msg.Value))
  58. handler.callbacks[msg.Topic](msg.Value)
  59. }
  60. return nil
  61. }