producer.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. //@File producer.go
  2. //@Time 2022/05/06
  3. //@Author #Suyghur,
  4. package kafka
  5. import (
  6. "context"
  7. "github.com/Shopify/sarama"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. "go.opentelemetry.io/otel/attribute"
  10. "ylink/comm/ctxdata"
  11. "ylink/comm/trace"
  12. )
  13. type Producer struct {
  14. topic string
  15. addr []string
  16. config *sarama.Config
  17. producer sarama.SyncProducer
  18. }
  19. func NewKafkaProducer(addr []string, topic string) *Producer {
  20. p := Producer{}
  21. p.config = sarama.NewConfig()
  22. // Whether to enable the successes channel to be notified after the message is sent successfully
  23. p.config.Producer.Return.Successes = true
  24. // Set producer Message Reply level 0 1 all
  25. p.config.Producer.RequiredAcks = sarama.WaitForAll
  26. // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message,
  27. // If there is no key, the partition will be selected randomly
  28. p.config.Producer.Partitioner = sarama.NewHashPartitioner
  29. p.addr = addr
  30. p.topic = topic
  31. // Initialize the client
  32. producer, err := sarama.NewSyncProducer(p.addr, p.config)
  33. if err != nil {
  34. logx.WithContext(context.Background()).Error(err.Error())
  35. panic(err.Error())
  36. }
  37. p.producer = producer
  38. return &p
  39. }
  40. func (p *Producer) SendMessage(ctx context.Context, m string, key ...string) (partition int32, offset int64, err error) {
  41. logx.WithContext(ctx).Infof("send msg to kafka, msg: %s", m)
  42. traceId := ctxdata.GetTraceIdFromCtx(ctx)
  43. msg := &sarama.ProducerMessage{}
  44. msg.Headers = []sarama.RecordHeader{{
  45. Key: sarama.ByteEncoder("trace_id"),
  46. Value: sarama.ByteEncoder(traceId),
  47. }}
  48. msg.Topic = p.topic
  49. if len(key) == 1 {
  50. msg.Key = sarama.StringEncoder(key[0])
  51. }
  52. msg.Value = sarama.StringEncoder(m)
  53. trace.StartTrace(ctx, "SendMessageToKafka", func(ctx context.Context) {
  54. partition, offset, err = p.producer.SendMessage(msg)
  55. }, attribute.StringSlice("keys", key), attribute.String("topic", p.topic))
  56. return
  57. }