123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- package svc
- import (
- "context"
- "github.com/Shopify/sarama"
- "github.com/bytedance/sonic"
- "github.com/zeromicro/go-zero/core/logx"
- "go.opentelemetry.io/otel/attribute"
- "ylink/comm/es"
- "ylink/comm/kafka"
- "ylink/comm/model"
- "ylink/comm/trace"
- "ylink/core/transfer/rpc/internal/config"
- )
- type ServiceContext struct {
- Config config.Config
- EsClient *es.EsClient
- KqDbConsumerGroup *kafka.ConsumerGroup
- }
- func NewServiceContext(c config.Config) *ServiceContext {
- svcCtx := &ServiceContext{
- Config: c,
- EsClient: es.NewEsClient(c.EsConf),
- KqDbConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
- KafkaVersion: sarama.V1_0_0_0,
- OffsetsInitial: sarama.OffsetNewest,
- IsReturnErr: false,
- },
- c.KqMsgBoxConsumerConf.Brokers,
- []string{c.KqMsgBoxConsumerConf.Topic},
- c.KqMsgBoxConsumerConf.GroupId),
- }
- go svcCtx.subscribe()
- return svcCtx
- }
- func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
- return nil
- }
- func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
- return nil
- }
- func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
- for msg := range claim.Messages() {
- if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
- s.handleMessage(sess, msg)
- }
- }
- return nil
- }
- func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
- traceId := kafka.GetTraceFromHeader(msg.Headers)
- if len(traceId) == 0 {
- return
- }
- trace.RunOnTracing(traceId, func(ctx context.Context) {
- logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
- var message model.KqMessage
- if err := sonic.Unmarshal(msg.Value, &message); err != nil {
- logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
- return
- }
- if message.Opt != model.CMD_SEND_MESSAGE {
- // 指令异常
- return
- }
- trace.StartTrace(ctx, "TransferServer.handleMessage.InsertMessage2Es", func(ctx context.Context) {
- s.EsClient.Insert("chat_message_log", message)
- sess.MarkMessage(msg, "")
- }, attribute.String("msg.key", string(msg.Key)))
- })
- }
- func (s *ServiceContext) subscribe() {
- go s.KqDbConsumerGroup.RegisterHandleAndConsumer(s)
- }
|