servicecontext.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package svc
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/Shopify/sarama"
  6. "github.com/liyue201/gostl/ds/list/simplelist"
  7. treemap "github.com/liyue201/gostl/ds/map"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. "go.opentelemetry.io/otel/attribute"
  10. "ylink/comm/kafka"
  11. "ylink/comm/model"
  12. "ylink/comm/trace"
  13. "ylink/core/inner/rpc/internal/config"
  14. "ylink/core/inner/rpc/internal/ext"
  15. )
  16. type ServiceContext struct {
  17. Config config.Config
  18. KqMsgBoxProducer *kafka.Producer
  19. ConsumerGroup *kafka.ConsumerGroup
  20. }
  21. func NewServiceContext(c config.Config) *ServiceContext {
  22. fetchCsCenterInfo()
  23. svcCtx := &ServiceContext{
  24. Config: c,
  25. KqMsgBoxProducer: kafka.NewKafkaProducer(c.KqMsgBoxProducerConf.Brokers, c.KqMsgBoxProducerConf.Topic),
  26. ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  27. KafkaVersion: sarama.V1_0_0_0,
  28. OffsetsInitial: sarama.OffsetNewest,
  29. IsReturnErr: false,
  30. },
  31. c.KqMsgBoxConsumerConf.Brokers,
  32. []string{c.KqMsgBoxConsumerConf.Topic},
  33. c.KqMsgBoxConsumerConf.GroupId),
  34. }
  35. go svcCtx.subscribe()
  36. go fetchCsCenterInfo()
  37. return svcCtx
  38. }
  39. func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
  40. return nil
  41. }
  42. func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
  43. return nil
  44. }
  45. func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  46. for msg := range claim.Messages() {
  47. if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
  48. s.handleMessage(sess, msg)
  49. }
  50. }
  51. return nil
  52. }
  53. func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  54. traceId := kafka.GetTraceFromHeader(msg.Headers)
  55. if len(traceId) == 0 {
  56. return
  57. }
  58. trace.RunOnTracing(traceId, func(ctx context.Context) {
  59. var message model.KqMessage
  60. if err := json.Unmarshal(msg.Value, &message); err != nil {
  61. logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
  62. return
  63. }
  64. logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
  65. trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
  66. if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
  67. // 玩家发的消息,先从connMap找对应的客服,没有则从vipMap找,都没有则丢弃信息不投递
  68. if ext.GameConnMap.Contains(message.GameId) {
  69. // 先从connMap找对应的客服映射
  70. if playerConnMap := ext.GameConnMap.Get(message.GameId).(*treemap.Map); playerConnMap.Contains(message.SenderId) {
  71. message.ReceiverId = playerConnMap.Get(message.SenderId).(string)
  72. } else {
  73. if ext.GameVipMap.Contains(message.GameId) {
  74. // 从vipMap里面找
  75. if playerVipMap := ext.GameVipMap.Get(message.GameId).(*treemap.Map); playerVipMap.Contains(message.SenderId) {
  76. message.ReceiverId = playerVipMap.Get(message.SenderId).(string)
  77. } else {
  78. message.ReceiverId = ""
  79. }
  80. } else {
  81. message.ReceiverId = ""
  82. }
  83. }
  84. } else {
  85. if ext.GameVipMap.Contains(message.GameId) {
  86. // 从vipMap里面找
  87. if playerVipMap := ext.GameVipMap.Get(message.GameId).(*treemap.Map); playerVipMap.Contains(message.SenderId) {
  88. message.ReceiverId = playerVipMap.Get(message.SenderId).(string)
  89. } else {
  90. message.ReceiverId = ""
  91. }
  92. } else {
  93. message.ReceiverId = ""
  94. }
  95. }
  96. // 经过填补后receiver_id还是空的则有异常,丢弃信息不投递
  97. if len(message.ReceiverId) != 0 && message.ReceiverId != "" {
  98. logx.WithContext(ctx).Infof("receiver: %s", message.ReceiverId)
  99. kMsg, _ := json.Marshal(message)
  100. s.KqMsgBoxProducer.SendMessage(ctx, string(kMsg), message.ReceiverId)
  101. } else {
  102. logx.WithContext(ctx).Errorf("can not find receiver of the sender")
  103. }
  104. } else {
  105. s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
  106. }
  107. sess.MarkMessage(msg, "")
  108. }, attribute.String("msg.key", string(msg.Key)))
  109. })
  110. }
  111. func (s *ServiceContext) subscribe() {
  112. go s.ConsumerGroup.RegisterHandleAndConsumer(s)
  113. }
  114. func fetchCsCenterInfo() {
  115. // mock info
  116. ext.Game2PlayerStatusMap = treemap.New(treemap.WithGoroutineSafe())
  117. ext.GameConnMap = treemap.New(treemap.WithGoroutineSafe())
  118. ext.CsInfoMap = treemap.New(treemap.WithGoroutineSafe())
  119. ext.WaitingQueue = simplelist.New()
  120. mockInfo()
  121. }
  122. func mockInfo() {
  123. ext.GameVipMap = treemap.New(treemap.WithGoroutineSafe())
  124. ext.CsInfoMap = treemap.New(treemap.WithGoroutineSafe())
  125. // 已连接的映射
  126. // 专属客服映射
  127. game1231P2cMap := treemap.New(treemap.WithGoroutineSafe())
  128. game1231P2cMap.Insert("player_1231", "cs_1231")
  129. game1231P2cMap.Insert("player_1111", "cs_2222")
  130. game1111P2cMap := treemap.New(treemap.WithGoroutineSafe())
  131. game1111P2cMap.Insert("player_1231", "cs_1111")
  132. ext.GameVipMap.Insert("game_1231", game1231P2cMap)
  133. ext.GameVipMap.Insert("game_1111", game1111P2cMap)
  134. ext.CsInfoMap.Insert("cs_1231", &model.CsInfo{
  135. CsId: "cs_1231",
  136. CsNickname: "客服1231",
  137. CsAvatarUrl: "https://www.baidu.com",
  138. CsSignature: "我是客服1231",
  139. OnlineStatus: 0,
  140. })
  141. ext.CsInfoMap.Insert("cs_1111", &model.CsInfo{
  142. CsId: "cs_1111",
  143. CsNickname: "客服1111",
  144. CsAvatarUrl: "https://www.baidu.com",
  145. CsSignature: "我是客服1111",
  146. OnlineStatus: 0,
  147. })
  148. ext.CsInfoMap.Insert("cs_2222", &model.CsInfo{
  149. CsId: "cs_2222",
  150. CsNickname: "客服2222",
  151. CsAvatarUrl: "https://www.baidu.com",
  152. CsSignature: "我是客服2222",
  153. OnlineStatus: 0,
  154. })
  155. }