servicecontext.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package svc
  2. import (
  3. "context"
  4. "github.com/Shopify/sarama"
  5. "github.com/bytedance/sonic"
  6. "github.com/go-redis/redis/v8"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. gozerotrace "github.com/zeromicro/go-zero/core/trace"
  9. "github.com/zeromicro/go-zero/zrpc"
  10. "go.opentelemetry.io/otel"
  11. "go.opentelemetry.io/otel/attribute"
  12. "go.opentelemetry.io/otel/propagation"
  13. oteltrace "go.opentelemetry.io/otel/trace"
  14. "net/http"
  15. "ylink/comm/kafka"
  16. "ylink/comm/model"
  17. "ylink/comm/trace"
  18. "ylink/comm/utils"
  19. "ylink/core/inner/rpc/inner"
  20. "ylink/flowsrv/rpc/internal/config"
  21. )
  22. type ServiceContext struct {
  23. Config config.Config
  24. InnerRpc inner.Inner
  25. MessageConsumerGroup *kafka.ConsumerGroup
  26. CommandConsumerGroup *kafka.ConsumerGroup
  27. RedisClient *redis.Client
  28. }
  29. func NewServiceContext(c config.Config) *ServiceContext {
  30. svcCtx := &ServiceContext{
  31. Config: c,
  32. InnerRpc: inner.NewInner(zrpc.MustNewClient(c.InnerRpcConf)),
  33. RedisClient: redis.NewClient(&redis.Options{
  34. Addr: c.Redis.Host,
  35. Password: c.Redis.Pass,
  36. }),
  37. MessageConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  38. KafkaVersion: sarama.V1_0_0_0,
  39. OffsetsInitial: sarama.OffsetNewest,
  40. IsReturnErr: false,
  41. },
  42. c.KqMsgBoxConsumerConf.Brokers,
  43. []string{c.KqMsgBoxConsumerConf.Topic},
  44. c.KqMsgBoxConsumerConf.GroupId),
  45. CommandConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  46. KafkaVersion: sarama.V1_0_0_0,
  47. OffsetsInitial: sarama.OffsetNewest,
  48. IsReturnErr: false,
  49. },
  50. c.KqCmdBoxConsumerConf.Brokers,
  51. []string{c.KqCmdBoxConsumerConf.Topic},
  52. c.KqCmdBoxConsumerConf.GroupId),
  53. }
  54. go svcCtx.subscribe()
  55. return svcCtx
  56. }
  57. func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
  58. return nil
  59. }
  60. func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
  61. return nil
  62. }
  63. func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  64. for msg := range claim.Messages() {
  65. if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
  66. logx.Info("handleMessage")
  67. s.handleMessage(sess, msg)
  68. } else if msg.Topic == s.Config.KqCmdBoxConsumerConf.Topic {
  69. logx.Info("handleCommand")
  70. s.handleCommand(sess, msg)
  71. }
  72. }
  73. return nil
  74. }
  75. func (s *ServiceContext) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
  76. propagator := otel.GetTextMapPropagator()
  77. tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
  78. ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
  79. spanName := utils.CallerFuncName()
  80. spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
  81. defer span.End()
  82. propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
  83. callback(spanCtx)
  84. }
  85. func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  86. traceId := kafka.GetTraceFromHeader(msg.Headers)
  87. if len(traceId) == 0 {
  88. return
  89. }
  90. trace.RunOnTracing(traceId, func(ctx context.Context) {
  91. var message model.KqMessage
  92. if err := sonic.Unmarshal(msg.Value, &message); err != nil {
  93. logx.Errorf("unmarshal msg error: %v", err)
  94. return
  95. }
  96. trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
  97. // 投递到receiver_id对应的redis队列暂存
  98. intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
  99. if size, err := intCmd.Result(); err != nil {
  100. logx.WithContext(ctx).Errorf("push message rmq err %v", err)
  101. } else {
  102. logx.WithContext(ctx).Infof("current rmq size: %d", size)
  103. }
  104. sess.MarkMessage(msg, "")
  105. }, attribute.String("msg.key", string(msg.Key)))
  106. })
  107. }
  108. func (s *ServiceContext) handleCommand(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  109. traceId := kafka.GetTraceFromHeader(msg.Headers)
  110. if len(traceId) == 0 {
  111. return
  112. }
  113. trace.RunOnTracing(traceId, func(ctx context.Context) {
  114. var message model.KqCmdMessage
  115. if err := sonic.Unmarshal(msg.Value, &message); err != nil {
  116. logx.Errorf("unmarshal msg error: %v", err)
  117. return
  118. }
  119. trace.StartTrace(ctx, "FlowsrvServer.handleCommand.PushMessage", func(ctx context.Context) {
  120. logx.WithContext(ctx).Infof("recv command: %v", message)
  121. //// 投递到receiver_id对应的redis队列暂存
  122. //intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
  123. //if size, err := intCmd.Result(); err != nil {
  124. // logx.WithContext(ctx).Errorf("push message rmq err %v", err)
  125. //} else {
  126. // logx.WithContext(ctx).Infof("current rmq size: %d", size)
  127. //}
  128. sess.MarkMessage(msg, "")
  129. }, attribute.String("msg.key", string(msg.Key)))
  130. })
  131. }
  132. func (s *ServiceContext) subscribe() {
  133. go s.MessageConsumerGroup.RegisterHandleAndConsumer(s)
  134. go s.CommandConsumerGroup.RegisterHandleAndConsumer(s)
  135. }