servicecontext.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package svc
  2. import (
  3. "context"
  4. "github.com/Shopify/sarama"
  5. "github.com/bytedance/sonic"
  6. "github.com/duke-git/lancet/v2/strutil"
  7. "github.com/gookit/event"
  8. treemap "github.com/liyue201/gostl/ds/map"
  9. "github.com/robfig/cron/v3"
  10. "github.com/zeromicro/go-zero/core/logx"
  11. "go.opentelemetry.io/otel/attribute"
  12. "io/ioutil"
  13. "ylink/comm/globalkey"
  14. "ylink/comm/kafka"
  15. "ylink/comm/model"
  16. "ylink/comm/trace"
  17. "ylink/core/inner/rpc/internal/config"
  18. "ylink/core/inner/rpc/internal/ext"
  19. )
  20. type ServiceContext struct {
  21. Config config.Config
  22. KqMsgBoxProducer *kafka.Producer
  23. KqCmdBoxProducer *kafka.Producer
  24. KqMsgConsumerGroup *kafka.ConsumerGroup
  25. TimeoutCron *cron.Cron
  26. }
  27. func NewServiceContext(c config.Config) *ServiceContext {
  28. svcCtx := &ServiceContext{
  29. Config: c,
  30. KqMsgBoxProducer: kafka.NewKafkaProducer(c.KqMsgBoxProducerConf.Brokers, c.KqMsgBoxProducerConf.Topic),
  31. KqCmdBoxProducer: kafka.NewKafkaProducer(c.KqCmdBoxProducerConf.Brokers, c.KqCmdBoxProducerConf.Topic),
  32. KqMsgConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  33. KafkaVersion: sarama.V1_0_0_0,
  34. OffsetsInitial: sarama.OffsetNewest,
  35. IsReturnErr: false,
  36. },
  37. c.KqMsgBoxConsumerConf.Brokers,
  38. []string{c.KqMsgBoxConsumerConf.Topic},
  39. c.KqMsgBoxConsumerConf.GroupId),
  40. }
  41. go svcCtx.subscribe()
  42. fetchCsCenterInfo(c)
  43. return svcCtx
  44. }
  45. func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
  46. return nil
  47. }
  48. func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
  49. return nil
  50. }
  51. func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  52. for msg := range claim.Messages() {
  53. if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
  54. s.handleMessage(sess, msg)
  55. }
  56. }
  57. return nil
  58. }
  59. func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  60. traceId := kafka.GetTraceFromHeader(msg.Headers)
  61. if len(traceId) == 0 {
  62. return
  63. }
  64. trace.RunOnTracing(traceId, func(ctx context.Context) {
  65. logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
  66. var message model.KqMessage
  67. if err := sonic.Unmarshal(msg.Value, &message); err != nil {
  68. logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
  69. return
  70. }
  71. if message.Opt != model.CMD_SEND_MESSAGE {
  72. // 指令异常
  73. return
  74. }
  75. trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
  76. if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
  77. // receiverId为空代表这条消息是玩家发送的
  78. // 玩家发的消息,先从connectedMap找对应的客服,没有则从vipMap找,都没有则丢弃信息不投递
  79. if playerInfo := ext.GetConnectedPlayerInfo(message.GameId, message.Uid); playerInfo != nil {
  80. message.ReceiverId = playerInfo.CsId
  81. } else {
  82. if playerInfo := ext.GetVipPlayer(message.GameId, message.Uid); playerInfo != nil {
  83. message.ReceiverId = playerInfo.CsId
  84. } else {
  85. message.ReceiverId = ""
  86. }
  87. }
  88. // 经过填补后receiver_id还是空的则有异常,丢弃信息不投递
  89. if len(message.ReceiverId) != 0 && message.ReceiverId != "" {
  90. logx.WithContext(ctx).Infof("receiver: %s", message.ReceiverId)
  91. kMsg, _ := sonic.MarshalString(message)
  92. s.KqMsgBoxProducer.SendMessage(ctx, kMsg, message.ReceiverId)
  93. } else {
  94. logx.WithContext(ctx).Errorf("can not find receiver of the sender")
  95. }
  96. } else {
  97. // receiverId不为空代表这条消息是客服发的
  98. playerId := strutil.After(message.ReceiverId, message.GameId+"_")
  99. // 判断是不是vip玩家
  100. if playerInfo := ext.GetVipPlayer(message.GameId, playerId); playerInfo != nil {
  101. s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
  102. } else {
  103. if playerInfo := ext.GetConnectedPlayerInfo(message.GameId, playerId); playerInfo != nil {
  104. // 客服连接了这个玩家
  105. s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
  106. } else {
  107. logx.WithContext(ctx).Errorf("this player is not connected, player id: %s", playerId)
  108. }
  109. }
  110. }
  111. sess.MarkMessage(msg, "")
  112. }, attribute.String("msg.key", string(msg.Key)))
  113. })
  114. }
  115. func (s *ServiceContext) subscribe() {
  116. go s.KqMsgConsumerGroup.RegisterHandleAndConsumer(s)
  117. // 注册事件
  118. event.On(globalkey.EventRemoveTimeoutJob, event.ListenerFunc(func(e event.Event) error {
  119. logx.Info("on event remove timeout job...")
  120. entryId := e.Get("entry_id").(cron.EntryID)
  121. s.TimeoutCron.Remove(entryId)
  122. return nil
  123. }), event.High)
  124. // 初始化定时任务
  125. s.TimeoutCron = cron.New(cron.WithSeconds())
  126. s.TimeoutCron.Start()
  127. }
  128. func fetchCsCenterInfo(c config.Config) {
  129. // mock info
  130. ext.CsInfoMap = treemap.New(treemap.WithGoroutineSafe())
  131. ext.GameVipMap = treemap.New(treemap.WithGoroutineSafe())
  132. ext.GameOnlinePlayerMap = treemap.New(treemap.WithGoroutineSafe())
  133. ext.GameConnectedMap = treemap.New(treemap.WithGoroutineSafe())
  134. ext.WaitingQueue = treemap.New(treemap.WithGoroutineSafe())
  135. go loadMockInfo(c)
  136. }
  137. func loadMockInfo(c config.Config) {
  138. // 加载游戏列表
  139. logx.Info("加载游戏列表")
  140. var gameIds []string
  141. gameIdsData, err := ioutil.ReadFile(c.MockFolder + "/game_id.json")
  142. if err != nil {
  143. logx.Errorf("parse game_id.json has some error: %v", err)
  144. return
  145. }
  146. if err := sonic.Unmarshal(gameIdsData, &gameIds); err != nil {
  147. return
  148. }
  149. // 加载vip玩家信息
  150. logx.Info("加载vip玩家信息")
  151. for _, gameId := range gameIds {
  152. vipPlayerMap := treemap.New(treemap.WithGoroutineSafe())
  153. var playerInfos []*model.PlayerInfo
  154. playerInfosData, _ := ioutil.ReadFile(c.MockFolder + "/" + gameId + ".json")
  155. if err := sonic.Unmarshal(playerInfosData, &playerInfos); err != nil {
  156. return
  157. }
  158. for _, playerInfo := range playerInfos {
  159. vipPlayerMap.Insert(playerInfo.PlayerId, playerInfo)
  160. }
  161. ext.GameVipMap.Insert(gameId, vipPlayerMap)
  162. }
  163. // 加载客服信息
  164. logx.Info("加载客服信息")
  165. var csInfos []*model.CsInfo
  166. csInfosData, err := ioutil.ReadFile(c.MockFolder + "/cs_info.json")
  167. if err := sonic.Unmarshal(csInfosData, &csInfos); err != nil {
  168. return
  169. }
  170. for _, csInfo := range csInfos {
  171. ext.CsInfoMap.Insert(csInfo.CsId, csInfo)
  172. }
  173. }