servicecontext.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package svc
  2. import (
  3. "context"
  4. "github.com/Shopify/sarama"
  5. "github.com/bytedance/sonic"
  6. "github.com/zeromicro/go-zero/core/logx"
  7. "go.opentelemetry.io/otel/attribute"
  8. "ylink/comm/es"
  9. "ylink/comm/kafka"
  10. "ylink/comm/model"
  11. "ylink/comm/trace"
  12. "ylink/core/transfer/rpc/internal/config"
  13. )
  14. type ServiceContext struct {
  15. Config config.Config
  16. EsClient *es.EsClient
  17. KqDbConsumerGroup *kafka.ConsumerGroup
  18. }
  19. func NewServiceContext(c config.Config) *ServiceContext {
  20. svcCtx := &ServiceContext{
  21. Config: c,
  22. EsClient: es.NewEsClient(c.EsConf),
  23. KqDbConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
  24. KafkaVersion: sarama.V1_0_0_0,
  25. OffsetsInitial: sarama.OffsetNewest,
  26. IsReturnErr: false,
  27. },
  28. c.KqMsgBoxConsumerConf.Brokers,
  29. []string{c.KqMsgBoxConsumerConf.Topic},
  30. c.KqMsgBoxConsumerConf.GroupId),
  31. }
  32. go svcCtx.subscribe()
  33. return svcCtx
  34. }
  35. func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
  36. return nil
  37. }
  38. func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
  39. return nil
  40. }
  41. func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  42. for msg := range claim.Messages() {
  43. if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
  44. s.handleMessage(sess, msg)
  45. }
  46. }
  47. return nil
  48. }
  49. func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
  50. traceId := kafka.GetTraceFromHeader(msg.Headers)
  51. if len(traceId) == 0 {
  52. return
  53. }
  54. trace.RunOnTracing(traceId, func(ctx context.Context) {
  55. logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
  56. var message model.KqMessage
  57. if err := sonic.Unmarshal(msg.Value, &message); err != nil {
  58. logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
  59. return
  60. }
  61. if message.Opt != model.CMD_SEND_MESSAGE {
  62. // 指令异常
  63. return
  64. }
  65. trace.StartTrace(ctx, "TransferServer.handleMessage.InsertMessage2Es", func(ctx context.Context) {
  66. s.EsClient.Insert("chat_message_log", message)
  67. sess.MarkMessage(msg, "")
  68. }, attribute.String("msg.key", string(msg.Key)))
  69. })
  70. }
  71. func (s *ServiceContext) subscribe() {
  72. go s.KqDbConsumerGroup.RegisterHandleAndConsumer(s)
  73. }