es_mgr.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. //@Author : KaiShin
  2. //@Time : 2021/10/28
  3. package es_mgr
  4. import (
  5. "bytes"
  6. "context"
  7. "encoding/json"
  8. "github.com/elastic/go-elasticsearch/v7"
  9. "github.com/elastic/go-elasticsearch/v7/esapi"
  10. "github.com/farmerx/elasticsql"
  11. "github.com/tal-tech/go-zero/core/logx"
  12. "io"
  13. )
  14. type EsMgr struct {
  15. Es *elasticsearch.Client
  16. }
  // EsMgrInterface is the abstraction returned by New; callers depend on it
  // rather than on the concrete *EsMgr.
  type EsMgrInterface interface {
  	// Insert stores data as a document in the given index. It has no return
  	// value, so implementations cannot report failures to the caller.
  	Insert(index string, data map[string]interface{})

  	// Query executes the given SQL statement and returns the matching
  	// documents as a slice.
  	Query(sql string) []interface{}
  }
  21. func New(conf EsConfig) EsMgrInterface {
  22. var sel = new(EsMgr)
  23. config := elasticsearch.Config{
  24. Addresses: conf.Addresses,
  25. Username: conf.UserName,
  26. Password: conf.Password,
  27. }
  28. es, err := elasticsearch.NewClient(config)
  29. if err != nil {
  30. logx.Error("[EsMgr.Init] elasticsearch.NewClient failed, err:", err)
  31. return nil
  32. }
  33. sel.Es = es
  34. logx.Info("[EsMgr.Init], address: ", conf.Addresses)
  35. return sel
  36. }
  37. func (sel *EsMgr) Insert(index string, data map[string]interface{}) {
  38. var buf bytes.Buffer
  39. if err := json.NewEncoder(&buf).Encode(data); err != nil {
  40. logx.Errorf("[EsMgr.Insert] err:", err)
  41. return
  42. }
  43. req := esapi.IndexRequest{
  44. Index: index,
  45. Body: &buf,
  46. Refresh: "true",
  47. }
  48. res, err := req.Do(context.Background(), sel.Es)
  49. if err != nil {
  50. logx.Errorf("[EsMgr.Insert] Error getting response: %s", err)
  51. return
  52. }
  53. defer res.Body.Close()
  54. if res.IsError() {
  55. logx.Errorf("[EsMgr.Insert] [%s] Error indexing document data=%s", res.Status(), data)
  56. }
  57. defer func(Body io.ReadCloser) {
  58. err := Body.Close()
  59. if err != nil {
  60. }
  61. }(res.Body)
  62. }
  63. func (sel *EsMgr) Query(sql string) []interface{} {
  64. // dsl, index, err := elasticsql.Convert(sql)
  65. index, dsl, err := elasticsql.NewElasticSQL().SQLConvert(sql)
  66. if err != nil {
  67. logx.Errorf("[EsMgr.Query] Convert, err: %s", err)
  68. return nil
  69. }
  70. // logx.Infof("[DEBUG][EsMgr.Query] dsl:", dsl)
  71. var query map[string]interface{}
  72. err = json.Unmarshal([]byte(dsl), &query)
  73. if err != nil {
  74. logx.Errorf("[EsMgr.Query] json.Unmarshal err: %s", err)
  75. return nil
  76. }
  77. var buf bytes.Buffer
  78. if err := json.NewEncoder(&buf).Encode(query); err != nil {
  79. logx.Errorf("[EsMgr.Query] json.NewEncoder err: %s", err)
  80. return nil
  81. }
  82. ////// todo sql query demo
  83. //q := map[string] interface{}{
  84. // "query": sql,
  85. //}
  86. //jsonBody, _ := json.Marshal(q)
  87. //req := esapi.SQLQueryRequest{Body: bytes.NewReader(jsonBody)}
  88. //res, _ := req.Do(context.Background(), sel.Es)
  89. //
  90. //defer res.Body.Close()
  91. //log.Println(res.String())
  92. res, err := sel.Es.Search(
  93. sel.Es.Search.WithContext(context.Background()),
  94. sel.Es.Search.WithIndex(index),
  95. sel.Es.Search.WithBody(&buf),
  96. sel.Es.Search.WithPretty(),
  97. )
  98. if err != nil {
  99. logx.Errorf("[EsMgr.Query] Es.Search err: %s", err)
  100. return nil
  101. }
  102. defer func(Body io.ReadCloser) {
  103. err := Body.Close()
  104. if err != nil {
  105. }
  106. }(res.Body)
  107. var r map[string]interface{}
  108. if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
  109. logx.Errorf("[EsMgr.Query] err: %s", err)
  110. return nil
  111. }
  112. _, ok := r["error"]
  113. if ok == true {
  114. logx.Infof("[EsMgr.Query] es search err, err:%+v", r)
  115. return nil
  116. }
  117. var dataList []interface{}
  118. hits := r["hits"].(map[string]interface{})["hits"].([]interface{})
  119. for _, data := range hits {
  120. dataList = append(dataList, data.(map[string]interface{})["_source"])
  121. }
  122. return dataList
  123. }
  124. func (sel *EsMgr) parseConf(confStr string) ([]string, string, string) {
  125. var mapConfStr map[string]interface{}
  126. err := json.Unmarshal([]byte(confStr), &mapConfStr)
  127. if err != nil {
  128. return nil, "", ""
  129. }
  130. address, ok := mapConfStr["Addresses"].([]interface{})
  131. if ok != true {
  132. return nil, "", ""
  133. }
  134. var addStrList []string
  135. for _, addr := range address {
  136. addStrList = append(addStrList, addr.(string))
  137. }
  138. username, ok := mapConfStr["Username"].(string)
  139. if ok != true {
  140. return nil, "", ""
  141. }
  142. password, ok := mapConfStr["Password"].(string)
  143. if ok != true {
  144. return nil, "", ""
  145. }
  146. return addStrList, username, password
  147. }