servicecontext.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package svc
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/Shopify/sarama"
  6. "github.com/liyue201/gostl/ds/list/simplelist"
  7. treemap "github.com/liyue201/gostl/ds/map"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. "go.opentelemetry.io/otel/attribute"
  10. "ylink/comm/kafka"
  11. "ylink/comm/model"
  12. "ylink/comm/trace"
  13. "ylink/core/inner/rpc/internal/config"
  14. "ylink/core/inner/rpc/internal/ext"
  15. )
  16. type ServiceContext struct {
  17. Config config.Config
  18. KqMsgBoxProducer *kafka.Producer
  19. ConsumerGroup *kafka.ConsumerGroup
  20. }
  21. func NewServiceContext(c config.Config) *ServiceContext {
  22. fetchCsCenterInfo()
  23. svcCtx := &ServiceContext{
  24. Config: c,
  25. KqMsgBoxProducer: kafka.NewKafkaProducer(c.KqMsgBoxProducerConf.Brokers, c.KqMsgBoxProducerConf.Topic),
  26. ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  27. KafkaVersion: sarama.V1_0_0_0,
  28. OffsetsInitial: sarama.OffsetNewest,
  29. IsReturnErr: false,
  30. },
  31. c.KqMsgBoxConsumerConf.Brokers,
  32. []string{c.KqMsgBoxConsumerConf.Topic},
  33. c.KqMsgBoxConsumerConf.GroupId),
  34. }
  35. go svcCtx.subscribe()
  36. go fetchCsCenterInfo()
  37. return svcCtx
  38. }
  39. func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
  40. return nil
  41. }
  42. func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
  43. return nil
  44. }
  45. func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  46. for msg := range claim.Messages() {
  47. if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
  48. s.handleMessage(sess, msg)
  49. }
  50. }
  51. return nil
  52. }
  53. func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  54. traceId := kafka.GetTraceFromHeader(msg.Headers)
  55. if len(traceId) == 0 {
  56. return
  57. }
  58. trace.RunOnTracing(traceId, func(ctx context.Context) {
  59. var message model.KqMessage
  60. if err := json.Unmarshal(msg.Value, &message); err != nil {
  61. logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
  62. return
  63. }
  64. logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
  65. trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
  66. if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
  67. // 玩家发的消息
  68. p2cMap := ext.GameVipMap.Get(message.GameId).(*treemap.Map)
  69. message.ReceiverId = p2cMap.Get(message.SenderId).(string)
  70. logx.WithContext(ctx).Infof("receiver: %s", message.ReceiverId)
  71. kMsg, _ := json.Marshal(message)
  72. s.KqMsgBoxProducer.SendMessage(ctx, string(kMsg), message.ReceiverId)
  73. } else {
  74. s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
  75. }
  76. sess.MarkMessage(msg, "")
  77. }, attribute.String("msg.key", string(msg.Key)))
  78. })
  79. }
  80. func (s *ServiceContext) subscribe() {
  81. go s.ConsumerGroup.RegisterHandleAndConsumer(s)
  82. }
  83. func fetchCsCenterInfo() {
  84. // mock info
  85. ext.Game2PlayerStatusMap = treemap.New(treemap.WithGoroutineSafe())
  86. ext.GameConnMap = treemap.New(treemap.WithGoroutineSafe())
  87. ext.WaitingQueue = simplelist.New()
  88. mockInfo()
  89. }
  // loadGameList is an empty placeholder; it currently does nothing.
  func loadGameList() {}
  92. func loadCsInfo() {
  93. ext.CsInfoMap = treemap.New(treemap.WithGoroutineSafe())
  94. ext.CsInfoMap.Insert("cs_1231", &model.CsInfo{
  95. CsId: "cs_1231",
  96. CsNickname: "客服1231",
  97. CsAvatarUrl: "https://www.baidu.com",
  98. CsSignature: "我是客服1231",
  99. OnlineStatus: 0,
  100. })
  101. ext.CsInfoMap.Insert("cs_1111", &model.CsInfo{
  102. CsId: "cs_1111",
  103. CsNickname: "客服1111",
  104. CsAvatarUrl: "https://www.baidu.com",
  105. CsSignature: "我是客服1111",
  106. OnlineStatus: 0,
  107. })
  108. ext.CsInfoMap.Insert("cs_2222", &model.CsInfo{
  109. CsId: "cs_2222",
  110. CsNickname: "客服2222",
  111. CsAvatarUrl: "https://www.baidu.com",
  112. CsSignature: "我是客服2222",
  113. OnlineStatus: 0,
  114. })
  115. }
  116. func mockInfo() {
  117. ext.GameVipMap = treemap.New(treemap.WithGoroutineSafe())
  118. ext.CsInfoMap = treemap.New(treemap.WithGoroutineSafe())
  119. // 已连接的映射
  120. // 专属客服映射
  121. game1231P2cMap := treemap.New(treemap.WithGoroutineSafe())
  122. game1231P2cMap.Insert("player1231", "cs_1231")
  123. game1231P2cMap.Insert("player1111", "cs_2222")
  124. game1111P2cMap := treemap.New(treemap.WithGoroutineSafe())
  125. game1111P2cMap.Insert("player1231", "cs_1111")
  126. ext.GameVipMap.Insert("game1231", game1231P2cMap)
  127. ext.GameVipMap.Insert("game1111", game1111P2cMap)
  128. ext.CsInfoMap.Insert("cs_1231", &model.CsInfo{
  129. CsId: "cs_1231",
  130. CsNickname: "客服1231",
  131. CsAvatarUrl: "https://www.baidu.com",
  132. CsSignature: "我是客服1231",
  133. OnlineStatus: 0,
  134. })
  135. ext.CsInfoMap.Insert("cs_1111", &model.CsInfo{
  136. CsId: "cs_1111",
  137. CsNickname: "客服1111",
  138. CsAvatarUrl: "https://www.baidu.com",
  139. CsSignature: "我是客服1111",
  140. OnlineStatus: 0,
  141. })
  142. ext.CsInfoMap.Insert("cs_2222", &model.CsInfo{
  143. CsId: "cs_2222",
  144. CsNickname: "客服2222",
  145. CsAvatarUrl: "https://www.baidu.com",
  146. CsSignature: "我是客服2222",
  147. OnlineStatus: 0,
  148. })
  149. }