servicecontext.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package svc
  2. import (
  3. "context"
  4. "github.com/Shopify/sarama"
  5. "github.com/bytedance/sonic"
  6. "github.com/go-redis/redis/v8"
  7. "github.com/gookit/event"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. gozerotrace "github.com/zeromicro/go-zero/core/trace"
  10. "github.com/zeromicro/go-zero/zrpc"
  11. "go.opentelemetry.io/otel"
  12. "go.opentelemetry.io/otel/attribute"
  13. "go.opentelemetry.io/otel/propagation"
  14. oteltrace "go.opentelemetry.io/otel/trace"
  15. "net/http"
  16. "ylink/comm/globalkey"
  17. "ylink/comm/kafka"
  18. "ylink/comm/model"
  19. "ylink/comm/trace"
  20. "ylink/comm/utils"
  21. "ylink/core/inner/rpc/inner"
  22. "ylink/flowsrv/rpc/internal/config"
  23. )
  24. type ServiceContext struct {
  25. Config config.Config
  26. InnerRpc inner.Inner
  27. MessageConsumerGroup *kafka.ConsumerGroup
  28. CommandConsumerGroup *kafka.ConsumerGroup
  29. RedisClient *redis.Client
  30. }
  31. func NewServiceContext(c config.Config) *ServiceContext {
  32. svcCtx := &ServiceContext{
  33. Config: c,
  34. InnerRpc: inner.NewInner(zrpc.MustNewClient(c.InnerRpcConf)),
  35. RedisClient: redis.NewClient(&redis.Options{
  36. Addr: c.Redis.Host,
  37. Password: c.Redis.Pass,
  38. }),
  39. MessageConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  40. KafkaVersion: sarama.V1_0_0_0,
  41. OffsetsInitial: sarama.OffsetNewest,
  42. IsReturnErr: false,
  43. },
  44. c.KqMsgBoxConsumerConf.Brokers,
  45. []string{c.KqMsgBoxConsumerConf.Topic},
  46. c.KqMsgBoxConsumerConf.GroupId),
  47. CommandConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  48. KafkaVersion: sarama.V1_0_0_0,
  49. OffsetsInitial: sarama.OffsetNewest,
  50. IsReturnErr: false,
  51. },
  52. c.KqCmdBoxConsumerConf.Brokers,
  53. []string{c.KqCmdBoxConsumerConf.Topic},
  54. c.KqCmdBoxConsumerConf.GroupId),
  55. }
  56. go svcCtx.subscribe()
  57. return svcCtx
  58. }
  59. func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
  60. return nil
  61. }
  62. func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
  63. return nil
  64. }
  65. func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  66. for msg := range claim.Messages() {
  67. if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
  68. logx.Info("handleMessage")
  69. s.handleMessage(sess, msg)
  70. } else if msg.Topic == s.Config.KqCmdBoxConsumerConf.Topic {
  71. logx.Info("handleCommand")
  72. s.handleCommand(sess, msg)
  73. }
  74. }
  75. return nil
  76. }
  77. func (s *ServiceContext) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
  78. propagator := otel.GetTextMapPropagator()
  79. tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
  80. ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
  81. spanName := utils.CallerFuncName()
  82. spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
  83. defer span.End()
  84. propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
  85. callback(spanCtx)
  86. }
  87. func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  88. traceId := kafka.GetTraceFromHeader(msg.Headers)
  89. if len(traceId) == 0 {
  90. return
  91. }
  92. trace.RunOnTracing(traceId, func(ctx context.Context) {
  93. var message model.KqMessage
  94. if err := sonic.Unmarshal(msg.Value, &message); err != nil {
  95. logx.Errorf("unmarshal msg error: %v", err)
  96. return
  97. }
  98. logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
  99. trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
  100. // 投递到receiver_id对应的redis队列暂存
  101. intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
  102. if size, err := intCmd.Result(); err != nil {
  103. logx.WithContext(ctx).Errorf("push message rmq err %v", err)
  104. } else {
  105. logx.WithContext(ctx).Infof("current rmq size: %d", size)
  106. }
  107. sess.MarkMessage(msg, "")
  108. }, attribute.String("msg.key", string(msg.Key)))
  109. })
  110. }
  111. func (s *ServiceContext) handleCommand(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  112. traceId := kafka.GetTraceFromHeader(msg.Headers)
  113. if len(traceId) == 0 {
  114. return
  115. }
  116. trace.RunOnTracing(traceId, func(ctx context.Context) {
  117. var message model.KqCmdMessage
  118. if err := sonic.Unmarshal(msg.Value, &message); err != nil {
  119. logx.Errorf("unmarshal msg error: %v", err)
  120. return
  121. }
  122. logx.WithContext(ctx).Infof("handle command: %s", msg.Value)
  123. trace.StartTrace(ctx, "FlowsrvServer.handleCommand.PushMessage", func(ctx context.Context) {
  124. // 投递到receiver_id对应的redis队列暂存
  125. intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
  126. if size, err := intCmd.Result(); err != nil {
  127. logx.WithContext(ctx).Errorf("push message rmq err %v", err)
  128. } else {
  129. logx.WithContext(ctx).Infof("current rmq size: %d", size)
  130. }
  131. sess.MarkMessage(msg, "")
  132. }, attribute.String("msg.key", string(msg.Key)))
  133. })
  134. }
  135. func (s *ServiceContext) subscribe() {
  136. go s.MessageConsumerGroup.RegisterHandleAndConsumer(s)
  137. go s.CommandConsumerGroup.RegisterHandleAndConsumer(s)
  138. // 注册事件
  139. event.On(globalkey.EventUnsubscribeRmq, event.ListenerFunc(func(e event.Event) error {
  140. return nil
  141. }), event.High)
  142. }