ESHelper.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. using Elasticsearch.Net;
  2. using JiaZhiQuan.Common.ElasticSearch.Models;
  3. using JiaZhiQuan.Common.Utils;
  4. using Newtonsoft.Json;
  5. using Newtonsoft.Json.Linq;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.ComponentModel;
  9. using System.Diagnostics;
  10. using System.Linq;
  11. using System.Reflection;
  12. using System.Text;
  13. using System.Threading.Tasks;
  14. using Wicture.DbRESTFul;
  15. using Wicture.DbRESTFul.Configuration;
  16. namespace JiaZhiQuan.Common.ElasticSearch
  17. {
  18. public class ESHelper
  19. {
  #region models

  /// <summary>
  /// A list of result items bundled with the paging metadata that describes it.
  /// </summary>
  /// <typeparam name="T">Type of each item in <see cref="Items"/>.</typeparam>
  public class PaginationResult<T>
  {
      /// <summary>Items carried by this result; not initialized by the class itself.</summary>
      public List<T> Items { get; set; }

      /// <summary>Paging metadata accompanying <see cref="Items"/>; not initialized by the class itself.</summary>
      public PaginationModel Pagination { get; set; }
  }

  /// <summary>
  /// Paging metadata: a total count together with the page index and page size.
  /// </summary>
  public class PaginationModel
  {
      /// <summary>Total count value associated with the result set.</summary>
      public int TotalCount { get; set; }

      /// <summary>Index of the page this metadata refers to.</summary>
      public int PageIndex { get; set; }

      /// <summary>Size of a page.</summary>
      public int PageSize { get; set; }
  }

  #endregion
  // Lock guarding the one-time creation of the shared client. readonly so the
  // monitor object can never be swapped out from under a waiting thread.
  private static readonly object syncObj = new object();

  // volatile: the first null check in Client runs outside the lock, so the read
  // must not observe a stale or partially published reference.
  private static volatile ElasticLowLevelClient _client = null;

  /// <summary>
  /// Shared <see cref="ElasticLowLevelClient"/> instance, created on first access.
  /// </summary>
  /// <remarks>
  /// The client is built from the <see cref="ElasticSearchConfig"/> configuration section
  /// (<c>Url</c>, <c>User</c>, <c>Password</c>) using basic authentication and a one-minute
  /// request timeout. Creation is guarded by a lock with a null check before and inside it,
  /// so the client is constructed at most once; if construction throws, the next access
  /// tries again.
  /// </remarks>
  public static ElasticLowLevelClient Client
  {
      get
      {
          if (_client == null)
          {
              lock (syncObj)
              {
                  // Re-check inside the lock: another thread may have finished first.
                  if (_client == null)
                  {
                      var config = ConfigurationManager.Settings.GetConfig<ElasticSearchConfig>(ElasticSearchConfig.ConfigSectionName);
                      var settings = new ConnectionConfiguration(new Uri(config.Url))
                          .BasicAuthentication(config.User, config.Password)
                          .RequestTimeout(TimeSpan.FromMinutes(1));
                      _client = new ElasticLowLevelClient(settings);
                  }
              }
          }

          return _client;
      }
  }
  /// <summary>
  /// Runs a search against <paramref name="index"/> and deserializes the <c>_source</c> of
  /// every hit into <typeparamref name="T"/>.
  /// </summary>
  /// <remarks>
  /// Paging only takes effect when <paramref name="pageSize"/> is greater than 0: the
  /// <c>size</c> query-string parameter is then set to <paramref name="pageSize"/>, and, when
  /// <paramref name="pageIndex"/> is also greater than 0, <c>from</c> is set to
  /// <c>(pageIndex - 1) * pageSize</c>.
  /// </remarks>
  /// <typeparam name="T">Type each hit's <c>_source</c> is deserialized into.</typeparam>
  /// <param name="index">Index passed to the search call.</param>
  /// <param name="postData">Object sent as the serialized request body.</param>
  /// <param name="pageIndex">Page number used to compute <c>from</c>; ignored unless both it and <paramref name="pageSize"/> are greater than 0.</param>
  /// <param name="pageSize">Page size; values of 0 or less disable paging parameters.</param>
  /// <returns>
  /// The deserialized hits with paging metadata, or <c>null</c> when the response is not
  /// successful. <c>Items</c> is <c>null</c> when the response has no <c>hits.hits</c> array.
  /// </returns>
  /// <exception cref="Exception">Any exception raised while searching or parsing is logged and rethrown.</exception>
  public static async Task<PaginationResult<T>> Query<T>(string index, object postData, int pageIndex = 0, int pageSize = 0)
  {
      var parameters = new SearchRequestParameters();
      if (pageSize > 0)
      {
          parameters.SetQueryString("size", pageSize);
          if (pageIndex > 0)
          {
              parameters.SetQueryString("from", (pageIndex - 1) * pageSize);
          }
      }

      try
      {
          var resp = await Client.SearchAsync<StringResponse>(index, PostData.Serializable(postData), parameters);
          if (!resp.Success)
          {
              return null;
          }

          var body = JsonConvert.DeserializeObject<JToken>(resp.Body);
          var hitsNode = body.Value<JObject>("hits");

          // hits.total is an object ({ "value": n, ... }) by default, but a bare number when
          // the request asks for rest_total_hits_as_int or the server predates 7.x.
          var totalToken = hitsNode?["total"];
          int totalCount;
          if (totalToken is JObject totalObject)
          {
              totalCount = totalObject.Value<int?>("value") ?? 0;
          }
          else if (totalToken != null && totalToken.Type == JTokenType.Integer)
          {
              totalCount = (int)totalToken;
          }
          else
          {
              totalCount = 0;
          }

          var items = hitsNode
              ?.Value<JArray>("hits")
              ?.Select(hit => JsonConvert.DeserializeObject<T>(hit.Value<JToken>("_source").ToString()))
              .ToList();

          return new PaginationResult<T>
          {
              Items = items,
              Pagination = new PaginationModel
              {
                  PageIndex = pageIndex,
                  PageSize = pageSize,
                  TotalCount = totalCount
              }
          };
      }
      catch (Exception ex)
      {
          // Log through the same logger the other helpers in this class use, then rethrow
          // with "throw;" so the original stack trace is preserved for the caller.
          LoggerManager.Logger.Error($"ES query on index '{index}' failed: {ex}");
          throw;
      }
  }
  /// <summary>
  /// Fetches a single document by id and returns its <c>_source</c> token.
  /// </summary>
  /// <param name="index">Index passed to the get call.</param>
  /// <param name="id">Document id passed to the get call.</param>
  /// <param name="param">Optional request parameters forwarded to the client; may be <c>null</c>.</param>
  /// <returns>The <c>_source</c> token of the response body, or <c>null</c> when the response is not successful.</returns>
  public static async Task<JToken> Get(string index, string id, GetRequestParameters param = null)
  {
      StringResponse getResponse = await Client.GetAsync<StringResponse>(index, id, param);
      if (!getResponse.Success)
      {
          return null;
      }

      JToken document = JsonConvert.DeserializeObject<JToken>(getResponse.Body);
      return document.Value<JToken>("_source");
  }
  /// <summary>
  /// Issues a document-exists request and reports whether the response was successful.
  /// </summary>
  /// <param name="index">Index passed to the exists call.</param>
  /// <param name="id">Document id passed to the exists call.</param>
  /// <returns>The <c>Success</c> flag of the response.</returns>
  public static async Task<bool> CheckExist(string index, string id)
      => (await Client.DocumentExistsAsync<StringResponse>(index, id)).Success;
  /// <summary>
  /// Parses an Elasticsearch search response, extracting the hit documents and the total hit count.
  /// </summary>
  /// <remarks>
  /// Every writable, non-indexer public property of <typeparamref name="T"/> is filled per hit:
  /// a property marked with <c>ESIdAttribute</c> receives the hit's id as a string; any other
  /// property is read from the source field named by <c>ESFieldAttribute.ESFieldName</c>, or by
  /// the property name when the attribute is absent or its name is blank. Only scalar
  /// (<see cref="JValue"/>) source fields are read; the raw value is converted with
  /// <c>ObjectUtils.ConvertObjectToType</c>.
  /// Parsing errors and unsuccessful responses are logged (tagged with the full name of the
  /// calling method's declaring type) and reported through <c>errorMsg</c> rather than thrown.
  /// </remarks>
  /// <typeparam name="T">Type each hit is materialized into; created via <see cref="Activator.CreateInstance{T}()"/>.</typeparam>
  /// <param name="response">Response returned by the Elasticsearch client; may be <c>null</c>.</param>
  /// <returns>
  /// A result whose <c>success</c> is <c>true</c> with <c>data</c> and <c>total</c> populated when
  /// parsing succeeds; otherwise <c>errorMsg</c> describes the failure.
  /// </returns>
  public static ESResponseData<T> GetResponseData<T>(StringResponse response)
  {
      var res = new ESResponseData<T>();
      if (response is null)
      {
          res.errorMsg = "response is null";
          return res;
      }

      if (!response.Success)
      {
          // Frame 1 is this method's caller. Resolved only on failure paths (building a
          // StackTrace is costly) and null-safely, since frames/declaring types can be absent.
          var callerName = new StackTrace().GetFrame(1)?.GetMethod()?.DeclaringType?.FullName;

          // OriginalException may be absent on a failed call; fall back to the response text.
          res.errorMsg = response.OriginalException?.ToString() ?? response.ToString();
          LoggerManager.Logger.Error($"【{callerName}】ES解析数据失败: {response}");
          return res;
      }

      try
      {
          var jsonData = JsonConvert.DeserializeObject<JToken>(response.Body);
          var hitsNode = jsonData.Value<JObject>(ESConstants.ResponseKey.HITS);

          res.total = hitsNode
              ?.Value<JObject>(ESConstants.ResponseKey.HITS_TOTAL)
              ?.Value<int?>(ESConstants.ResponseKey.HITS_TOTAL_VALUE)
              ?? 0;

          // Resolve the attribute-driven mapping once instead of once per property per hit.
          // Properties that cannot be assigned (no setter, indexers) are skipped.
          var bindings = typeof(T).GetProperties()
              .Where(p => p.CanWrite && p.GetIndexParameters().Length == 0)
              .Select(p =>
              {
                  bool isId = p.GetCustomAttribute<ESIdAttribute>() != null;
                  ESFieldAttribute fieldAttribute = isId ? null : p.GetCustomAttribute<ESFieldAttribute>();
                  return new
                  {
                      Property = p,
                      IsId = isId,
                      // Fall back to the property name when no explicit ES field name is configured.
                      FieldName = string.IsNullOrWhiteSpace(fieldAttribute?.ESFieldName)
                          ? p.Name
                          : fieldAttribute.ESFieldName
                  };
              })
              .ToList();

          res.data = hitsNode
              ?.Value<JArray>(ESConstants.ResponseKey.HITS_HITS)
              ?.Select(hit =>
              {
                  var source = hit.Value<JToken>(ESConstants.ResponseKey.HITS_HITS_SOURCE);

                  // Box once so reflection writes land on the same instance even when T is a struct.
                  object item = Activator.CreateInstance<T>();
                  foreach (var binding in bindings)
                  {
                      object newValue;
                      if (binding.IsId)
                      {
                          // Primary key: taken from the hit itself, not from its source.
                          newValue = hit.Value<string>(ESConstants.ResponseKey.HITS_HITS_ID);
                      }
                      else
                      {
                          // Business field: only scalar values are read.
                          var rawValue = (source?[binding.FieldName] as JValue)?.Value;
                          newValue = ObjectUtils.ConvertObjectToType(rawValue, binding.Property.PropertyType);
                      }

                      binding.Property.SetValue(item, newValue, null);
                  }

                  return (T)item;
              })
              .ToList();

          res.success = true;
      }
      catch (Exception ex)
      {
          var callerName = new StackTrace().GetFrame(1)?.GetMethod()?.DeclaringType?.FullName;
          res.errorMsg = ex.Message;
          LoggerManager.Logger.Error($"【{callerName}】ES解析数据报错: {ex}");
      }

      return res;
  }
  /// <summary>
  /// Parses an Elasticsearch response, extracting one integer result per aggregation.
  /// </summary>
  /// <remarks>
  /// For each entry under the response's aggregations node, the aggregation name becomes the
  /// dictionary key and the value is read from the entry's
  /// <c>ESConstants.ResponseKey.AGGREGATIONS_COUNT</c> field as an <see cref="int"/>.
  /// Parsing errors and unsuccessful responses are logged (tagged with the full name of the
  /// calling method's declaring type) and reported through <c>errorMsg</c> rather than thrown.
  /// </remarks>
  /// <param name="response">Response returned by the Elasticsearch client; may be <c>null</c>.</param>
  /// <returns>
  /// A result whose <c>success</c> is <c>true</c> with <c>data</c> populated when parsing
  /// succeeds; otherwise <c>errorMsg</c> describes the failure.
  /// </returns>
  public static ESResponseAggregations GetResponseAggregations(StringResponse response)
  {
      var res = new ESResponseAggregations();
      if (response is null)
      {
          res.errorMsg = "response is null";
          return res;
      }

      if (!response.Success)
      {
          // Frame 1 is this method's caller. Resolved only on failure paths (building a
          // StackTrace is costly) and null-safely, since frames/declaring types can be absent.
          var callerName = new StackTrace().GetFrame(1)?.GetMethod()?.DeclaringType?.FullName;

          // OriginalException may be absent on a failed call; fall back to the response text.
          res.errorMsg = response.OriginalException?.ToString() ?? response.ToString();
          LoggerManager.Logger.Error($"【{callerName}】ES解析聚合结果失败: {response}");
          return res;
      }

      try
      {
          var jsonData = JsonConvert.DeserializeObject<JToken>(response.Body);
          var aggs = jsonData.Value<JObject>(ESConstants.ResponseKey.AGGREGATIONS);
          if (aggs is null)
          {
              // Surface a clear message instead of a NullReferenceException from the loop below.
              throw new InvalidOperationException("response contains no aggregations");
          }

          var dic = new Dictionary<string, int>();
          foreach (var item in aggs)
          {
              dic.Add(item.Key, item.Value.Value<int>(ESConstants.ResponseKey.AGGREGATIONS_COUNT));
          }

          res.data = dic;
          res.success = true;
      }
      catch (Exception ex)
      {
          var callerName = new StackTrace().GetFrame(1)?.GetMethod()?.DeclaringType?.FullName;
          res.errorMsg = ex.Message;
          LoggerManager.Logger.Error($"【{callerName}】ES解析数据报错: {ex}");
      }

      return res;
  }
  238. }
  239. }