servicecontext.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package svc
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/Shopify/sarama"
  6. "github.com/zeromicro/go-zero/core/logx"
  7. gozerotrace "github.com/zeromicro/go-zero/core/trace"
  8. "github.com/zeromicro/go-zero/zrpc"
  9. "go.opentelemetry.io/otel"
  10. "go.opentelemetry.io/otel/attribute"
  11. "go.opentelemetry.io/otel/propagation"
  12. oteltrace "go.opentelemetry.io/otel/trace"
  13. "net/http"
  14. "ylink/comm/kafka"
  15. "ylink/comm/model"
  16. "ylink/comm/trace"
  17. "ylink/comm/utils"
  18. "ylink/core/inner/rpc/inner"
  19. "ylink/flowsrv/rpc/internal/config"
  20. )
  21. type ServiceContext struct {
  22. Config config.Config
  23. InnerRpc inner.Inner
  24. ConsumerGroup *kafka.ConsumerGroup
  25. }
  26. func NewServiceContext(c config.Config) *ServiceContext {
  27. svcCtx := &ServiceContext{
  28. Config: c,
  29. InnerRpc: inner.NewInner(zrpc.MustNewClient(c.InnerRpcConf)),
  30. ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  31. KafkaVersion: sarama.V1_0_0_0,
  32. OffsetsInitial: sarama.OffsetNewest,
  33. IsReturnErr: false,
  34. },
  35. c.KqMsgBoxConsumerConf.Brokers,
  36. []string{c.KqMsgBoxConsumerConf.Topic},
  37. c.KqMsgBoxConsumerConf.GroupId),
  38. }
  39. go svcCtx.subscribe()
  40. return svcCtx
  41. }
  42. func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
  43. return nil
  44. }
  45. func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
  46. return nil
  47. }
  48. func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  49. for msg := range claim.Messages() {
  50. if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
  51. s.handleMessage(sess, msg)
  52. }
  53. }
  54. return nil
  55. }
  56. func (s *ServiceContext) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
  57. propagator := otel.GetTextMapPropagator()
  58. tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
  59. ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
  60. spanName := utils.CallerFuncName()
  61. spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
  62. defer span.End()
  63. propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
  64. callback(spanCtx)
  65. }
  66. func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  67. traceId := kafka.GetTraceFromHeader(msg.Headers)
  68. if len(traceId) == 0 {
  69. return
  70. }
  71. trace.RunOnTracing(traceId, func(ctx context.Context) {
  72. var message model.KqMessage
  73. if err := json.Unmarshal(msg.Value, &message); err != nil {
  74. logx.Errorf("unmarshal msg error: %v", err)
  75. return
  76. }
  77. trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
  78. //if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
  79. // // 玩家发的消息
  80. // p2cMap := ext.IdMap.Get(message.GameId).(*treemap.Map)
  81. // message.ReceiverId = p2cMap.Get(message.SenderId).(string)
  82. // logx.Infof("receiver: %s", message.ReceiverId)
  83. // b, _ := json.Marshal(message)
  84. // s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(b), message.ReceiverId)
  85. //} else {
  86. // s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
  87. //}
  88. logx.WithContext(ctx).Infof("headers: %v", msg.Headers)
  89. logx.WithContext(ctx).Infof("traceId: %s", msg.Headers[0].Value)
  90. logx.WithContext(ctx).Infof("flowsrv recv message: %v", string(msg.Value))
  91. sess.MarkMessage(msg, "")
  92. }, attribute.String("msg.key", string(msg.Key)))
  93. })
  94. }
  95. func (s *ServiceContext) subscribe() {
  96. go s.ConsumerGroup.RegisterHandleAndConsumer(s)
  97. }