flowsrvserver.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. // Code generated by goctl. DO NOT EDIT!
  2. // Source: flowsrv.proto
  3. package server
  4. import (
  5. "context"
  6. "encoding/json"
  7. "github.com/Shopify/sarama"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. gozerotrace "github.com/zeromicro/go-zero/core/trace"
  10. "go.opentelemetry.io/otel"
  11. "go.opentelemetry.io/otel/attribute"
  12. "go.opentelemetry.io/otel/propagation"
  13. oteltrace "go.opentelemetry.io/otel/trace"
  14. "net/http"
  15. "ylink/comm/kafka"
  16. "ylink/comm/model"
  17. "ylink/comm/trace"
  18. "ylink/comm/utils"
  19. "ylink/flowsrv/rpc/internal/logic"
  20. "ylink/flowsrv/rpc/internal/svc"
  21. "ylink/flowsrv/rpc/pb"
  22. )
  23. type FlowsrvServer struct {
  24. svcCtx *svc.ServiceContext
  25. pb.UnimplementedFlowsrvServer
  26. ConsumerGroup *kafka.ConsumerGroup
  27. }
  28. func NewFlowsrvServer(svcCtx *svc.ServiceContext) *FlowsrvServer {
  29. server := &FlowsrvServer{
  30. svcCtx: svcCtx,
  31. ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  32. KafkaVersion: sarama.V1_0_0_0,
  33. OffsetsInitial: sarama.OffsetNewest,
  34. IsReturnErr: false,
  35. },
  36. svcCtx.Config.KqMsgBoxConsumerConf.Brokers,
  37. []string{svcCtx.Config.KqMsgBoxConsumerConf.Topic},
  38. svcCtx.Config.KqMsgBoxConsumerConf.GroupId),
  39. }
  40. server.subscribe()
  41. return server
  42. }
  43. func (s *FlowsrvServer) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServer) error {
  44. l := logic.NewConnectLogic(stream.Context(), s.svcCtx)
  45. return l.Connect(in, stream)
  46. }
  47. func (s *FlowsrvServer) Disconnect(ctx context.Context, in *pb.CommandReq) (*pb.CommandResp, error) {
  48. l := logic.NewDisconnectLogic(ctx, s.svcCtx)
  49. return l.Disconnect(in)
  50. }
  51. func (s *FlowsrvServer) Setup(_ sarama.ConsumerGroupSession) error {
  52. return nil
  53. }
  54. func (s *FlowsrvServer) Cleanup(_ sarama.ConsumerGroupSession) error {
  55. return nil
  56. }
  57. func (s *FlowsrvServer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  58. for msg := range claim.Messages() {
  59. if msg.Topic == s.svcCtx.Config.KqMsgBoxConsumerConf.Topic {
  60. s.handleMessage(sess, msg)
  61. }
  62. }
  63. return nil
  64. }
  65. func (s *FlowsrvServer) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
  66. propagator := otel.GetTextMapPropagator()
  67. tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
  68. ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
  69. spanName := utils.CallerFuncName()
  70. spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
  71. defer span.End()
  72. propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
  73. callback(spanCtx)
  74. }
  75. func (s *FlowsrvServer) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  76. traceId := kafka.GetTraceFromHeader(msg.Headers)
  77. if len(traceId) == 0 {
  78. return
  79. }
  80. trace.RunOnTracing(traceId, func(ctx context.Context) {
  81. var message model.KqMessage
  82. if err := json.Unmarshal(msg.Value, &message); err != nil {
  83. logx.Errorf("unmarshal msg error: %v", err)
  84. return
  85. }
  86. trace.StartTrace(ctx, "FlowsrvServer.handleMessage.SendMessage", func(ctx context.Context) {
  87. //if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
  88. // // 玩家发的消息
  89. // p2cMap := ext.IdMap.Get(message.GameId).(*treemap.Map)
  90. // message.ReceiverId = p2cMap.Get(message.SenderId).(string)
  91. // logx.Infof("receiver: %s", message.ReceiverId)
  92. // b, _ := json.Marshal(message)
  93. // s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(b), message.ReceiverId)
  94. //} else {
  95. // s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
  96. //}
  97. logx.WithContext(ctx).Infof("headers: %v", msg.Headers)
  98. logx.WithContext(ctx).Infof("traceId: %s", msg.Headers[0].Value)
  99. logx.WithContext(ctx).Infof("flowsrv recv message: %v", string(msg.Value))
  100. sess.MarkMessage(msg, "")
  101. }, attribute.String("msg.key", string(msg.Key)))
  102. })
  103. //s.runWithCtx(func(ctx context.Context) {
  104. // var message model.KqCmdMessage
  105. // if err := json.Unmarshal(msg.Value, &message); err != nil {
  106. // logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
  107. // return
  108. // }
  109. // trace.StartTrace(ctx, "FlowsrvServer.handleMessage.SendMessage", func(ctx context.Context) {
  110. // //if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
  111. // // // 玩家发的消息
  112. // // p2cMap := ext.IdMap.Get(message.GameId).(*treemap.Map)
  113. // // message.ReceiverId = p2cMap.Get(message.SenderId).(string)
  114. // // logx.Infof("receiver: %s", message.ReceiverId)
  115. // // b, _ := json.Marshal(message)
  116. // // s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(b), message.ReceiverId)
  117. // //} else {
  118. // // s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
  119. // //}
  120. // logx.WithContext(ctx).Infof("flowsrv recv message: %v", string(msg.Value))
  121. // sess.MarkMessage(msg, "")
  122. // }, attribute.String("msg.key", string(msg.Key)))
  123. //})
  124. }
  125. func (s *FlowsrvServer) subscribe() {
  126. go s.ConsumerGroup.RegisterHandleAndConsumer(s)
  127. }