package svc import ( "context" "github.com/Shopify/sarama" "github.com/bytedance/sonic" "github.com/zeromicro/go-zero/core/logx" "go.opentelemetry.io/otel/attribute" "ylink/comm/es" "ylink/comm/kafka" "ylink/comm/model" "ylink/comm/trace" "ylink/core/transfer/rpc/internal/config" ) type ServiceContext struct { Config config.Config EsClient *es.EsClient KqDbConsumerGroup *kafka.ConsumerGroup } func NewServiceContext(c config.Config) *ServiceContext { svcCtx := &ServiceContext{ Config: c, EsClient: es.NewEsClient(c.EsConf), KqDbConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{ KafkaVersion: sarama.V1_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, }, c.KqMsgBoxConsumerConf.Brokers, []string{c.KqMsgBoxConsumerConf.Topic}, c.KqMsgBoxConsumerConf.GroupId), } go svcCtx.subscribe() return svcCtx } func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic { s.handleMessage(sess, msg) } } return nil } func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) { traceId := kafka.GetTraceFromHeader(msg.Headers) if len(traceId) == 0 { return } trace.RunOnTracing(traceId, func(ctx context.Context) { logx.WithContext(ctx).Infof("handle message: %s", msg.Value) var message model.KqMessage if err := sonic.Unmarshal(msg.Value, &message); err != nil { logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err) return } if message.Opt != model.CMD_SEND_MESSAGE { // 指令异常 return } trace.StartTrace(ctx, "TransferServer.handleMessage.InsertMessage2Es", func(ctx context.Context) { s.EsClient.Insert("chat_message_log", message) sess.MarkMessage(msg, "") }, attribute.String("msg.key", string(msg.Key))) }) } func (s *ServiceContext) subscribe() { go s.KqDbConsumerGroup.RegisterHandleAndConsumer(s) }