servicecontext.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package svc
  2. import (
  3. "context"
  4. "github.com/Shopify/sarama"
  5. "github.com/bytedance/sonic"
  6. "github.com/go-redis/redis/v8"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. gozerotrace "github.com/zeromicro/go-zero/core/trace"
  9. "github.com/zeromicro/go-zero/zrpc"
  10. "go.opentelemetry.io/otel"
  11. "go.opentelemetry.io/otel/attribute"
  12. "go.opentelemetry.io/otel/propagation"
  13. oteltrace "go.opentelemetry.io/otel/trace"
  14. "net/http"
  15. "ylink/comm/globalkey"
  16. "ylink/comm/kafka"
  17. "ylink/comm/model"
  18. "ylink/comm/result"
  19. "ylink/comm/trace"
  20. "ylink/comm/utils"
  21. "ylink/core/inner/rpc/inner"
  22. "ylink/flowsrv/rpc/flowsrv"
  23. "ylink/flowsrv/rpc/internal/config"
  24. "ylink/flowsrv/rpc/internal/mgr"
  25. model2 "ylink/flowsrv/rpc/internal/model"
  26. //model2 "ylink/flowsrv/rpc/internal/model"
  27. )
  28. type ServiceContext struct {
  29. Config config.Config
  30. InnerRpc inner.Inner
  31. MessageConsumerGroup *kafka.ConsumerGroup
  32. CommandConsumerGroup *kafka.ConsumerGroup
  33. RedisClient *redis.Client
  34. }
  35. func NewServiceContext(c config.Config) *ServiceContext {
  36. svcCtx := &ServiceContext{
  37. Config: c,
  38. InnerRpc: inner.NewInner(zrpc.MustNewClient(c.InnerRpcConf)),
  39. RedisClient: redis.NewClient(&redis.Options{
  40. Addr: c.Redis.Host,
  41. Password: c.Redis.Pass,
  42. }),
  43. MessageConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  44. KafkaVersion: sarama.V1_0_0_0,
  45. OffsetsInitial: sarama.OffsetNewest,
  46. IsReturnErr: false,
  47. },
  48. c.KqMsgBoxConsumerConf.Brokers,
  49. []string{c.KqMsgBoxConsumerConf.Topic},
  50. c.KqMsgBoxConsumerConf.GroupId),
  51. CommandConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  52. KafkaVersion: sarama.V1_0_0_0,
  53. OffsetsInitial: sarama.OffsetNewest,
  54. IsReturnErr: false,
  55. },
  56. c.KqCmdBoxConsumerConf.Brokers,
  57. []string{c.KqCmdBoxConsumerConf.Topic},
  58. c.KqCmdBoxConsumerConf.GroupId),
  59. }
  60. go svcCtx.subscribe()
  61. return svcCtx
  62. }
  63. func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
  64. return nil
  65. }
  66. func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
  67. return nil
  68. }
  69. func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  70. for msg := range claim.Messages() {
  71. if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
  72. logx.Info("handleMessage")
  73. s.handleMessage(sess, msg)
  74. } else if msg.Topic == s.Config.KqCmdBoxConsumerConf.Topic {
  75. logx.Info("handleCommand")
  76. s.handleCommand(sess, msg)
  77. }
  78. }
  79. return nil
  80. }
  81. func (s *ServiceContext) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
  82. propagator := otel.GetTextMapPropagator()
  83. tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
  84. ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
  85. spanName := utils.CallerFuncName()
  86. spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
  87. defer span.End()
  88. propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
  89. callback(spanCtx)
  90. }
  91. func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  92. traceId := kafka.GetTraceFromHeader(msg.Headers)
  93. if len(traceId) == 0 {
  94. return
  95. }
  96. trace.RunOnTracing(traceId, func(ctx context.Context) {
  97. logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
  98. var message model.KqMessage
  99. if err := sonic.Unmarshal(msg.Value, &message); err != nil {
  100. logx.Errorf("unmarshal msg error: %v", err)
  101. return
  102. }
  103. if message.Opt != model.CMD_SEND_MESSAGE {
  104. // 指令异常
  105. return
  106. }
  107. var chatMessage model.ChatMessage
  108. if err := sonic.Unmarshal([]byte(message.Payload), &chatMessage); err != nil {
  109. logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
  110. return
  111. }
  112. trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
  113. // 投递到receiver_id对应的redis队列暂存
  114. intCmd := s.RedisClient.LPush(ctx, chatMessage.ReceiverId, string(msg.Value))
  115. if size, err := intCmd.Result(); err != nil {
  116. logx.WithContext(ctx).Errorf("push message rmq err %v", err)
  117. } else {
  118. logx.WithContext(ctx).Infof("current rmq size: %d", size)
  119. }
  120. sess.MarkMessage(msg, "")
  121. }, attribute.String("msg.key", string(msg.Key)))
  122. })
  123. }
  124. func (s *ServiceContext) handleCommand(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  125. traceId := kafka.GetTraceFromHeader(msg.Headers)
  126. if len(traceId) == 0 {
  127. return
  128. }
  129. trace.RunOnTracing(traceId, func(ctx context.Context) {
  130. logx.WithContext(ctx).Infof("handle command: %s", msg.Value)
  131. var message model.KqMessage
  132. if err := sonic.Unmarshal(msg.Value, &message); err != nil {
  133. logx.Errorf("unmarshal msg error: %v", err)
  134. return
  135. }
  136. if message.Opt == model.CMD_SEND_MESSAGE {
  137. // 指令异常
  138. return
  139. }
  140. var cmdMessage model.CommandMessage
  141. if err := sonic.Unmarshal([]byte(message.Payload), &cmdMessage); err != nil {
  142. logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
  143. return
  144. }
  145. // 投递到receiver_id对应的redis队列暂存
  146. switch cmdMessage.ReceiverId {
  147. case globalkey.All:
  148. case globalkey.AllPlayer:
  149. case globalkey.AllVipPlayer:
  150. case globalkey.AllNormalPlayer:
  151. for iter := mgr.GetFlowMgrInstance().All().Begin(); iter.IsValid(); iter.Next() {
  152. flow := iter.Value().(*model2.Flow)
  153. if flow.Type != globalkey.ConnectTypeNormalPlayer {
  154. continue
  155. }
  156. err := flow.Stream.Send(&flowsrv.CommandResp{
  157. Code: result.Ok,
  158. Msg: "success",
  159. Data: msg.Value,
  160. })
  161. if err != nil {
  162. logx.WithContext(ctx).Errorf("%v", err)
  163. }
  164. }
  165. case globalkey.AllCs:
  166. default:
  167. trace.StartTrace(ctx, "FlowsrvServer.handleCommand.PushCmdMessage", func(ctx context.Context) {
  168. intCmd := s.RedisClient.LPush(ctx, cmdMessage.ReceiverId, string(msg.Value))
  169. if size, err := intCmd.Result(); err != nil {
  170. logx.WithContext(ctx).Errorf("push message rmq err %v", err)
  171. } else {
  172. logx.WithContext(ctx).Infof("current rmq size: %d", size)
  173. }
  174. }, attribute.String("msg.key", string(msg.Key)))
  175. sess.MarkMessage(msg, "")
  176. }
  177. })
  178. }
  179. func (s *ServiceContext) subscribe() {
  180. go s.MessageConsumerGroup.RegisterHandleAndConsumer(s)
  181. go s.CommandConsumerGroup.RegisterHandleAndConsumer(s)
  182. //event.On(globalkey.EventHandleRmqJob, event.ListenerFunc(func(e event.Event) error {
  183. // resultCmd := flow.SvcCtx.RedisClient.BRPop(ctx, 30*time.Second, flow.FlowId)
  184. // if message, err := resultCmd.Result(); err != nil {
  185. // logx.WithContext(ctx).Errorf("get message from redis, key: %s, err: %v", flow.FlowId, err)
  186. // } else {
  187. // trace.StartTrace(ctx, "FlowsrvServer.flowmgr.handleRmqMessage", func(ctx context.Context) {
  188. // flow.Message <- message[1]
  189. // })
  190. // }
  191. // return nil
  192. //}), event.High)
  193. // 注册事件
  194. //event.On(globalkey.EventUnsubscribeRmqJob, event.ListenerFunc(func(e event.Event) error {
  195. //
  196. // return nil
  197. //}), event.High)
  198. //event.On(globalkey.EventNotifyUserOfflineJob, event.ListenerFunc(func(e event.Event) error {
  199. // traceId := e.Get("trace_id").(string)
  200. // uType := e.Get("type").(int32)
  201. // uid := e.Get("uid").(string)
  202. // gameId := e.Get("game_id").(string)
  203. // trace.RunOnTracing(traceId, func(ctx context.Context) {
  204. // trace.StartTrace(ctx, "FlowsrvServer.EventNotifyUserOfflineJob.handleUserOffline", func(ctx context.Context) {
  205. // _, err := s.InnerRpc.NotifyUserOffline(ctx, &inner.NotifyUserStatusReq{
  206. // Type: uType,
  207. // Uid: uid,
  208. // GameId: gameId,
  209. // })
  210. // if err != nil {
  211. // logx.WithContext(ctx).Errorf("notify user offline has some error: %v", err)
  212. // }
  213. // })
  214. // })
  215. // return nil
  216. //}), event.High)
  217. }