servicecontext.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package svc
  2. import (
  3. "context"
  4. "github.com/Shopify/sarama"
  5. "github.com/bytedance/sonic"
  6. "github.com/go-redis/redis/v8"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. gozerotrace "github.com/zeromicro/go-zero/core/trace"
  9. "github.com/zeromicro/go-zero/zrpc"
  10. "go.opentelemetry.io/otel"
  11. "go.opentelemetry.io/otel/attribute"
  12. "go.opentelemetry.io/otel/propagation"
  13. oteltrace "go.opentelemetry.io/otel/trace"
  14. "net/http"
  15. "ylink/comm/globalkey"
  16. "ylink/comm/kafka"
  17. "ylink/comm/model"
  18. "ylink/comm/result"
  19. "ylink/comm/trace"
  20. "ylink/comm/utils"
  21. "ylink/core/inner/rpc/inner"
  22. "ylink/flowsrv/rpc/flowsrv"
  23. "ylink/flowsrv/rpc/internal/config"
  24. "ylink/flowsrv/rpc/internal/mgr"
  25. model2 "ylink/flowsrv/rpc/internal/model"
  26. )
  27. type ServiceContext struct {
  28. Config config.Config
  29. InnerRpc inner.Inner
  30. MessageConsumerGroup *kafka.ConsumerGroup
  31. CommandConsumerGroup *kafka.ConsumerGroup
  32. RedisClient *redis.Client
  33. }
  34. func NewServiceContext(c config.Config) *ServiceContext {
  35. svcCtx := &ServiceContext{
  36. Config: c,
  37. InnerRpc: inner.NewInner(zrpc.MustNewClient(c.InnerRpcConf)),
  38. RedisClient: redis.NewClient(&redis.Options{
  39. Addr: c.Redis.Host,
  40. Password: c.Redis.Pass,
  41. }),
  42. MessageConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  43. KafkaVersion: sarama.V1_0_0_0,
  44. OffsetsInitial: sarama.OffsetNewest,
  45. IsReturnErr: false,
  46. },
  47. c.KqMsgBoxConsumerConf.Brokers,
  48. []string{c.KqMsgBoxConsumerConf.Topic},
  49. c.KqMsgBoxConsumerConf.GroupId),
  50. CommandConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  51. KafkaVersion: sarama.V1_0_0_0,
  52. OffsetsInitial: sarama.OffsetNewest,
  53. IsReturnErr: false,
  54. },
  55. c.KqCmdBoxConsumerConf.Brokers,
  56. []string{c.KqCmdBoxConsumerConf.Topic},
  57. c.KqCmdBoxConsumerConf.GroupId),
  58. }
  59. go svcCtx.subscribe()
  60. return svcCtx
  61. }
  62. func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
  63. return nil
  64. }
  65. func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
  66. return nil
  67. }
  68. func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  69. for msg := range claim.Messages() {
  70. if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
  71. s.handleMessage(sess, msg)
  72. } else if msg.Topic == s.Config.KqCmdBoxConsumerConf.Topic {
  73. s.handleCommand(sess, msg)
  74. }
  75. }
  76. return nil
  77. }
  78. func (s *ServiceContext) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
  79. propagator := otel.GetTextMapPropagator()
  80. tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
  81. ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
  82. spanName := utils.CallerFuncName()
  83. spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
  84. defer span.End()
  85. propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
  86. callback(spanCtx)
  87. }
  88. func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  89. traceId := kafka.GetTraceFromHeader(msg.Headers)
  90. if len(traceId) == 0 {
  91. return
  92. }
  93. trace.RunOnTracing(traceId, func(ctx context.Context) {
  94. logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
  95. var message model.KqMessage
  96. if err := sonic.Unmarshal(msg.Value, &message); err != nil {
  97. logx.Errorf("unmarshal msg error: %v", err)
  98. return
  99. }
  100. if message.Opt != model.CMD_SEND_MESSAGE {
  101. // 指令异常
  102. return
  103. }
  104. trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
  105. // 投递到receiver_id对应的redis队列暂存
  106. intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
  107. if size, err := intCmd.Result(); err != nil {
  108. logx.WithContext(ctx).Errorf("push message rmq err %v", err)
  109. } else {
  110. logx.WithContext(ctx).Infof("current rmq size: %d", size)
  111. }
  112. sess.MarkMessage(msg, "")
  113. }, attribute.String("msg.key", string(msg.Key)))
  114. })
  115. }
  116. func (s *ServiceContext) handleCommand(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  117. traceId := kafka.GetTraceFromHeader(msg.Headers)
  118. if len(traceId) == 0 {
  119. return
  120. }
  121. trace.RunOnTracing(traceId, func(ctx context.Context) {
  122. logx.WithContext(ctx).Infof("handle command: %s", msg.Value)
  123. var message model.KqMessage
  124. if err := sonic.Unmarshal(msg.Value, &message); err != nil {
  125. logx.Errorf("unmarshal msg error: %v", err)
  126. return
  127. }
  128. if message.Opt == model.CMD_SEND_MESSAGE {
  129. // 指令异常
  130. return
  131. }
  132. // 投递到receiver_id对应的redis队列暂存
  133. switch message.ReceiverId {
  134. case globalkey.All:
  135. case globalkey.AllPlayer:
  136. case globalkey.AllVipPlayer:
  137. case globalkey.AllNormalPlayer:
  138. for iter := mgr.GetFlowMgrInstance().All().Begin(); iter.IsValid(); iter.Next() {
  139. flow := iter.Value().(*model2.Flow)
  140. if flow.Type != globalkey.ConnectTypeNormalPlayer {
  141. continue
  142. }
  143. err := flow.Stream.Send(&flowsrv.CommandResp{
  144. Code: result.Ok,
  145. Msg: "success",
  146. Data: msg.Value,
  147. })
  148. if err != nil {
  149. logx.WithContext(ctx).Errorf("%v", err)
  150. }
  151. }
  152. case globalkey.AllCs:
  153. default:
  154. trace.StartTrace(ctx, "FlowsrvServer.handleCommand.PushCmdMessage", func(ctx context.Context) {
  155. intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
  156. if size, err := intCmd.Result(); err != nil {
  157. logx.WithContext(ctx).Errorf("push message rmq err %v", err)
  158. } else {
  159. logx.WithContext(ctx).Infof("current rmq size: %d", size)
  160. }
  161. }, attribute.String("msg.key", string(msg.Key)))
  162. sess.MarkMessage(msg, "")
  163. }
  164. })
  165. }
  166. func (s *ServiceContext) subscribe() {
  167. go s.MessageConsumerGroup.RegisterHandleAndConsumer(s)
  168. go s.CommandConsumerGroup.RegisterHandleAndConsumer(s)
  169. }