esclient.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. //@File esclient.go
  2. //@Time 2022/06/16
  3. //@Author #Suyghur,
  4. package es
  5. import (
  6. "bytes"
  7. "context"
  8. "github.com/bytedance/sonic/encoder"
  9. "github.com/elastic/go-elasticsearch/v7"
  10. "github.com/elastic/go-elasticsearch/v7/esapi"
  11. "github.com/zeromicro/go-zero/core/logx"
  12. "io"
  13. )
  // IEsClient describes a client that can index a document into a named index.
  type IEsClient interface {
  	// Insert indexes data as a document in index.
  	//
  	// data is declared as interface{} so that the signature matches
  	// (*EsClient).Insert; with the narrower map[string]interface{} parameter
  	// *EsClient did not satisfy this interface. Callers that pass a
  	// map[string]interface{} continue to compile unchanged.
  	Insert(index string, data interface{})
  }
  17. type EsClient struct {
  18. client *elasticsearch.Client
  19. }
  20. func NewEsClient(conf EsConf) *EsClient {
  21. c := elasticsearch.Config{
  22. Addresses: conf.Addresses,
  23. Username: conf.Username,
  24. Password: conf.Password,
  25. }
  26. es, err := elasticsearch.NewClient(c)
  27. if err != nil {
  28. logx.WithContext(context.Background()).Error(err.Error())
  29. panic(err.Error())
  30. }
  31. return &EsClient{
  32. client: es,
  33. }
  34. }
  35. func (es *EsClient) Insert(index string, data interface{}) {
  36. var buf = bytes.NewBuffer(nil)
  37. if err := encoder.NewStreamEncoder(buf).Encode(data); err != nil {
  38. logx.WithContext(context.Background()).Error(err.Error())
  39. }
  40. req := esapi.IndexRequest{
  41. Index: index,
  42. Body: buf,
  43. Refresh: "true",
  44. }
  45. resp, err := req.Do(context.Background(), es.client)
  46. if err != nil {
  47. logx.WithContext(context.Background()).Errorf("error getting response: %s", err)
  48. return
  49. }
  50. logx.WithContext(context.Background()).Infof("%v", resp.String())
  51. defer resp.Body.Close()
  52. if resp.IsError() {
  53. logx.WithContext(context.Background()).Errorf("%s error indexing document data: %s", resp.Status(), data)
  54. }
  55. defer func(Body io.ReadCloser) {
  56. err := Body.Close()
  57. if err != nil {
  58. logx.WithContext(context.Background()).Error(err.Error())
  59. }
  60. }(resp.Body)
  61. }