servicecontext.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package svc
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/Shopify/sarama"
  6. "github.com/go-redis/redis/v8"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. gozerotrace "github.com/zeromicro/go-zero/core/trace"
  9. "github.com/zeromicro/go-zero/zrpc"
  10. "go.opentelemetry.io/otel"
  11. "go.opentelemetry.io/otel/attribute"
  12. "go.opentelemetry.io/otel/propagation"
  13. oteltrace "go.opentelemetry.io/otel/trace"
  14. "net/http"
  15. "ylink/comm/kafka"
  16. "ylink/comm/model"
  17. "ylink/comm/trace"
  18. "ylink/comm/utils"
  19. "ylink/core/inner/rpc/inner"
  20. "ylink/flowsrv/rpc/internal/config"
  21. )
  22. type ServiceContext struct {
  23. Config config.Config
  24. InnerRpc inner.Inner
  25. ConsumerGroup *kafka.ConsumerGroup
  26. RedisClient *redis.Client
  27. }
  28. func NewServiceContext(c config.Config) *ServiceContext {
  29. svcCtx := &ServiceContext{
  30. Config: c,
  31. InnerRpc: inner.NewInner(zrpc.MustNewClient(c.InnerRpcConf)),
  32. RedisClient: redis.NewClient(&redis.Options{
  33. Addr: c.Redis.Host,
  34. Password: c.Redis.Pass,
  35. }),
  36. //RedisClient: redis.New(c.Redis.Host, func(r *redis.Redis) {
  37. // r.Type = c.Redis.Type
  38. // r.Pass = c.Redis.Pass
  39. //}),
  40. ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  41. KafkaVersion: sarama.V1_0_0_0,
  42. OffsetsInitial: sarama.OffsetNewest,
  43. IsReturnErr: false,
  44. },
  45. c.KqMsgBoxConsumerConf.Brokers,
  46. []string{c.KqMsgBoxConsumerConf.Topic},
  47. c.KqMsgBoxConsumerConf.GroupId),
  48. }
  49. go svcCtx.subscribe()
  50. return svcCtx
  51. }
  52. func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
  53. return nil
  54. }
  55. func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
  56. return nil
  57. }
  58. func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  59. for msg := range claim.Messages() {
  60. if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
  61. s.handleMessage(sess, msg)
  62. }
  63. }
  64. return nil
  65. }
  66. func (s *ServiceContext) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
  67. propagator := otel.GetTextMapPropagator()
  68. tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
  69. ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
  70. spanName := utils.CallerFuncName()
  71. spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
  72. defer span.End()
  73. propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
  74. callback(spanCtx)
  75. }
  76. func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  77. traceId := kafka.GetTraceFromHeader(msg.Headers)
  78. if len(traceId) == 0 {
  79. return
  80. }
  81. trace.RunOnTracing(traceId, func(ctx context.Context) {
  82. var message model.KqMessage
  83. if err := json.Unmarshal(msg.Value, &message); err != nil {
  84. logx.Errorf("unmarshal msg error: %v", err)
  85. return
  86. }
  87. trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
  88. // 投递到receiver_id对应的redis队列暂存
  89. intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
  90. if size, err := intCmd.Result(); err != nil {
  91. logx.WithContext(ctx).Errorf("push message rmq err %v", err)
  92. } else {
  93. logx.WithContext(ctx).Infof("current rmq size: %d", size)
  94. }
  95. //if _, err := s.RedisClient.(message.ReceiverId, string(msg.Value)); err != nil {
  96. // logx.WithContext(ctx).Errorf("push message err %v", err)
  97. //}
  98. sess.MarkMessage(msg, "")
  99. }, attribute.String("msg.key", string(msg.Key)))
  100. })
  101. }
  102. func (s *ServiceContext) subscribe() {
  103. go s.ConsumerGroup.RegisterHandleAndConsumer(s)
  104. }