producer.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. //@File producer.go
  2. //@Time 2022/05/06
  3. //@Author #Suyghur,
  4. package kafka
  5. import (
  6. "context"
  7. "github.com/Shopify/sarama"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. "go.opentelemetry.io/otel/attribute"
  10. "ylink/comm/ctxdata"
  11. "ylink/comm/trace"
  12. )
  13. type Producer struct {
  14. topic string
  15. addr []string
  16. config *sarama.Config
  17. producer sarama.SyncProducer
  18. }
  19. func NewKafkaProducer(addr []string, topic string) *Producer {
  20. logx.Infof("brokers: %v", addr)
  21. logx.Infof("topic: %s", topic)
  22. p := Producer{}
  23. p.config = sarama.NewConfig()
  24. // Whether to enable the successes channel to be notified after the message is sent successfully
  25. p.config.Producer.Return.Successes = true
  26. // Set producer Message Reply level 0 1 all
  27. p.config.Producer.RequiredAcks = sarama.WaitForAll
  28. // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message,
  29. // If there is no key, the partition will be selected randomly
  30. p.config.Producer.Partitioner = sarama.NewHashPartitioner
  31. p.addr = addr
  32. p.topic = topic
  33. // Initialize the client
  34. producer, err := sarama.NewSyncProducer(p.addr, p.config)
  35. if err != nil {
  36. logx.WithContext(context.Background()).Error(err.Error())
  37. panic(err.Error())
  38. }
  39. p.producer = producer
  40. return &p
  41. }
  42. func (p *Producer) SendMessage(ctx context.Context, m string, key ...string) (partition int32, offset int64, err error) {
  43. traceId := ctxdata.GetTraceIdFromCtx(ctx)
  44. msg := &sarama.ProducerMessage{}
  45. msg.Headers = []sarama.RecordHeader{{
  46. Key: sarama.ByteEncoder("trace_id"),
  47. Value: sarama.ByteEncoder(traceId),
  48. }}
  49. msg.Topic = p.topic
  50. if len(key) == 1 {
  51. msg.Key = sarama.StringEncoder(key[0])
  52. }
  53. msg.Value = sarama.StringEncoder(m)
  54. trace.StartTrace(ctx, "SendMessageToKafka", func(ctx context.Context) {
  55. partition, offset, err = p.producer.SendMessage(msg)
  56. }, attribute.StringSlice("keys", key), attribute.String("topic", p.topic))
  57. return
  58. }