servicecontext.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package svc
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/Shopify/sarama"
  6. "github.com/bytedance/sonic"
  7. "github.com/liyue201/gostl/ds/list/simplelist"
  8. treemap "github.com/liyue201/gostl/ds/map"
  9. "github.com/zeromicro/go-zero/core/logx"
  10. "go.opentelemetry.io/otel/attribute"
  11. "io/ioutil"
  12. "ylink/comm/kafka"
  13. "ylink/comm/model"
  14. "ylink/comm/trace"
  15. "ylink/core/inner/rpc/internal/config"
  16. "ylink/core/inner/rpc/internal/ext"
  17. )
  18. type ServiceContext struct {
  19. Config config.Config
  20. KqMsgBoxProducer *kafka.Producer
  21. KqCmdBoxProducer *kafka.Producer
  22. ConsumerGroup *kafka.ConsumerGroup
  23. }
  24. func NewServiceContext(c config.Config) *ServiceContext {
  25. svcCtx := &ServiceContext{
  26. Config: c,
  27. KqMsgBoxProducer: kafka.NewKafkaProducer(c.KqMsgBoxProducerConf.Brokers, c.KqMsgBoxProducerConf.Topic),
  28. KqCmdBoxProducer: kafka.NewKafkaProducer(c.KqCmdBoxProducerConf.Brokers, c.KqCmdBoxProducerConf.Topic),
  29. ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  30. KafkaVersion: sarama.V1_0_0_0,
  31. OffsetsInitial: sarama.OffsetNewest,
  32. IsReturnErr: false,
  33. },
  34. c.KqMsgBoxConsumerConf.Brokers,
  35. []string{c.KqMsgBoxConsumerConf.Topic},
  36. c.KqMsgBoxConsumerConf.GroupId),
  37. }
  38. go svcCtx.subscribe()
  39. fetchCsCenterInfo(c)
  40. return svcCtx
  41. }
  42. func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
  43. return nil
  44. }
  45. func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
  46. return nil
  47. }
  48. func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  49. for msg := range claim.Messages() {
  50. if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
  51. s.handleMessage(sess, msg)
  52. }
  53. }
  54. return nil
  55. }
  56. func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  57. traceId := kafka.GetTraceFromHeader(msg.Headers)
  58. if len(traceId) == 0 {
  59. return
  60. }
  61. trace.RunOnTracing(traceId, func(ctx context.Context) {
  62. var message model.KqMessage
  63. if err := json.Unmarshal(msg.Value, &message); err != nil {
  64. logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
  65. return
  66. }
  67. logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
  68. trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
  69. if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
  70. // 玩家发的消息,先从connMap找对应的客服,没有则从vipMap找,都没有则丢弃信息不投递
  71. if ext.GameConnectedMap.Contains(message.GameId) {
  72. // 先从connMap找对应的客服映射
  73. if playerConnMap := ext.GameConnectedMap.Get(message.GameId).(*treemap.Map); playerConnMap.Contains(message.SenderId) {
  74. message.ReceiverId = playerConnMap.Get(message.SenderId).(string)
  75. } else {
  76. if ext.GameVipMap.Contains(message.GameId) {
  77. // 从vipMap里面找
  78. if playerVipMap := ext.GameVipMap.Get(message.GameId).(*treemap.Map); playerVipMap.Contains(message.SenderId) {
  79. message.ReceiverId = playerVipMap.Get(message.SenderId).(string)
  80. } else {
  81. message.ReceiverId = ""
  82. }
  83. } else {
  84. message.ReceiverId = ""
  85. }
  86. }
  87. } else {
  88. if ext.GameVipMap.Contains(message.GameId) {
  89. // 从vipMap里面找
  90. if playerVipMap := ext.GameVipMap.Get(message.GameId).(*treemap.Map); playerVipMap.Contains(message.SenderId) {
  91. message.ReceiverId = playerVipMap.Get(message.SenderId).(string)
  92. } else {
  93. message.ReceiverId = ""
  94. }
  95. } else {
  96. message.ReceiverId = ""
  97. }
  98. }
  99. // 经过填补后receiver_id还是空的则有异常,丢弃信息不投递
  100. if len(message.ReceiverId) != 0 && message.ReceiverId != "" {
  101. logx.WithContext(ctx).Infof("receiver: %s", message.ReceiverId)
  102. kMsg, _ := json.Marshal(message)
  103. s.KqMsgBoxProducer.SendMessage(ctx, string(kMsg), message.ReceiverId)
  104. } else {
  105. logx.WithContext(ctx).Errorf("can not find receiver of the sender")
  106. }
  107. } else {
  108. s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
  109. }
  110. sess.MarkMessage(msg, "")
  111. }, attribute.String("msg.key", string(msg.Key)))
  112. })
  113. }
  114. func (s *ServiceContext) subscribe() {
  115. go s.ConsumerGroup.RegisterHandleAndConsumer(s)
  116. }
  117. func fetchCsCenterInfo(c config.Config) {
  118. // mock info
  119. ext.CsInfoMap = treemap.New(treemap.WithGoroutineSafe())
  120. ext.GameVipMap = treemap.New(treemap.WithGoroutineSafe())
  121. ext.GameOnlinePlayerMap = treemap.New(treemap.WithGoroutineSafe())
  122. ext.GameConnectedMap = treemap.New(treemap.WithGoroutineSafe())
  123. ext.WaitingQueue = simplelist.New()
  124. go loadMockInfo(c)
  125. }
  126. func loadMockInfo(c config.Config) {
  127. // 加载游戏列表
  128. logx.Info("加载游戏列表")
  129. var gameIds []string
  130. gameIdsData, err := ioutil.ReadFile(c.MockFolder + "/game_id.json")
  131. if err != nil {
  132. logx.Errorf("parse game_id.json has some error: %v", err)
  133. return
  134. }
  135. if err := sonic.Unmarshal(gameIdsData, &gameIds); err != nil {
  136. return
  137. }
  138. // 加载vip玩家信息
  139. logx.Info("加载vip玩家信息")
  140. for _, gameId := range gameIds {
  141. vipPlayerMap := treemap.New(treemap.WithGoroutineSafe())
  142. var playerInfos []*model.PlayerInfo
  143. playerInfosData, _ := ioutil.ReadFile(c.MockFolder + "/" + gameId + ".json")
  144. if err := sonic.Unmarshal(playerInfosData, &playerInfos); err != nil {
  145. return
  146. }
  147. for _, playerInfo := range playerInfos {
  148. vipPlayerMap.Insert(playerInfo.PlayerId, playerInfo)
  149. }
  150. ext.GameVipMap.Insert(gameId, vipPlayerMap)
  151. }
  152. // 加载客服信息
  153. logx.Info("加载客服信息")
  154. var csInfos []*model.CsInfo
  155. csInfosData, err := ioutil.ReadFile(c.MockFolder + "/cs_info.json")
  156. if err := sonic.Unmarshal(csInfosData, &csInfos); err != nil {
  157. return
  158. }
  159. for _, csInfo := range csInfos {
  160. ext.CsInfoMap.Insert(csInfo.CsId, csInfo)
  161. }
  162. }