producer.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. //@File producer.go
  2. //@Time 2022/05/06
  3. //@Author #Suyghur,
  4. package kafka
  5. import (
  6. "context"
  7. "github.com/Shopify/sarama"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. )
  10. type Producer struct {
  11. topic string
  12. addr []string
  13. config *sarama.Config
  14. producer sarama.SyncProducer
  15. }
  16. func NewKafkaProducer(addr []string, topic string) *Producer {
  17. p := Producer{}
  18. p.config = sarama.NewConfig()
  19. // Whether to enable the successes channel to be notified after the message is sent successfully
  20. p.config.Producer.Return.Successes = true
  21. // Set producer Message Reply level 0 1 all
  22. p.config.Producer.RequiredAcks = sarama.WaitForAll
  23. // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message,
  24. // If there is no key, the partition will be selected randomly
  25. p.config.Producer.Partitioner = sarama.NewHashPartitioner
  26. p.addr = addr
  27. p.topic = topic
  28. // Initialize the client
  29. producer, err := sarama.NewSyncProducer(p.addr, p.config)
  30. if err != nil {
  31. logx.WithContext(context.Background()).Error(err.Error())
  32. return nil
  33. }
  34. p.producer = producer
  35. return &p
  36. }
  37. //func (p *Producer) SendMessage(m proto.Message, key ...string) (int32, int64, error) {
  38. // kMsg := &sarama.ProducerMessage{}
  39. // kMsg.Topic = p.topic
  40. // if len(key) == 1 {
  41. // kMsg.Key = sarama.StringEncoder(key[0])
  42. // }
  43. // bMsg, err := proto.Marshal(m)
  44. // if err != nil {
  45. // logx.WithContext(context.Background()).Errorf("proto marshal err: %s", err.Error())
  46. // return -1, -1, err
  47. // }
  48. // kMsg.Value = sarama.ByteEncoder(bMsg)
  49. // return p.producer.SendMessage(kMsg)
  50. //}
  51. func (p *Producer) SendMessage(m string, key ...string) (int32, int64, error) {
  52. msg := &sarama.ProducerMessage{}
  53. msg.Topic = p.topic
  54. if len(key) == 1 {
  55. msg.Key = sarama.StringEncoder(key[0])
  56. }
  57. //bMsg, err := proto.Marshal(m)
  58. //if err != nil {
  59. // logx.Errorf("proto marshal err: %s", err.Error())
  60. // return -1, -1, err
  61. //}
  62. msg.Value = sarama.StringEncoder(m)
  63. return p.producer.SendMessage(msg)
  64. }