// BaseConsumer.cs
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Polly;
using Wicture.DbRESTFul;
  9. namespace JiaZhiQuan.Common.Messaging
  10. {
  11. public abstract class BaseConsumer : IConsumer
  12. {
  13. public event EventHandler<MessageReceivedEventArgs> Received;
  14. private IConsumer<Ignore, string> consumer;
  15. public abstract ConsumerConfig ClientConfig { get; set; }
  16. public abstract IEnumerable<string> TopicNames { get; set; }
  17. private CancellationTokenSource cancellationTokenSource;
  18. public void Init()
  19. {
  20. Consume();
  21. }
  22. protected virtual void OnException(Exception ex)
  23. {
  24. }
  25. protected virtual void OnExecuteTooLong(double secs, string message)
  26. {
  27. }
  28. protected virtual void Consume1()
  29. {
  30. Task.Run(async () =>
  31. {
  32. var politicaWaitAndRetry = Policy.Handle<Exception>().WaitAndRetryForeverAsync(idx =>
  33. {
  34. var sec = idx == 0 ? 1 : idx == 1 ? 10 : idx == 2 ? 30 : 60;
  35. return TimeSpan.FromSeconds(sec);
  36. }, (ex, time) => { OnException(ex); });
  37. await politicaWaitAndRetry.ExecuteAsync(async () =>
  38. {
  39. ClientConfig = ClientConfig ?? new ConsumerConfig()
  40. {
  41. GroupId = "0",
  42. EnableAutoCommit = true,
  43. MaxPollIntervalMs = 900000
  44. };
  45. if (string.IsNullOrEmpty(ClientConfig.BootstrapServers))
  46. {
  47. var cfg = KafkaClientConfig.GetFromConfig();
  48. ClientConfig.BootstrapServers = cfg.BootstrapServers;
  49. }
  50. cancellationTokenSource = new CancellationTokenSource();
  51. consumer = new ConsumerBuilder<Ignore, string>(ClientConfig).Build();
  52. consumer.Subscribe(TopicNames);
  53. try
  54. {
  55. while (true)
  56. {
  57. try
  58. {
  59. var cr = consumer.Consume(cancellationTokenSource.Token);
  60. await ConsumeInternal(cr);
  61. Received?.Invoke(this, new MessageReceivedEventArgs { Message = cr.Message.Value });
  62. LoggerManager.Logger.Info($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
  63. }
  64. catch (ConsumeException e)
  65. {
  66. LoggerManager.Logger.Error(e, $"Error occured: {e.Error.Reason}");
  67. throw e;
  68. }
  69. catch (Exception e)
  70. {
  71. LoggerManager.Logger.Info($"Canceled Consume operation.");
  72. throw e;
  73. }
  74. }
  75. }
  76. catch (Exception ex)
  77. {
  78. throw ex;
  79. }
  80. finally
  81. {
  82. try
  83. {
  84. cancellationTokenSource.Cancel();
  85. cancellationTokenSource.Dispose();
  86. consumer.Dispose();
  87. }
  88. catch { }
  89. }
  90. });
  91. });
  92. }
  93. protected virtual void Consume()
  94. {
  95. new Task(() =>
  96. {
  97. var politicaWaitAndRetry = Policy.Handle<Exception>().WaitAndRetryForever(idx =>
  98. {
  99. var sec = idx == 0 ? 1 : idx == 1 ? 10 : idx == 2 ? 30 : 60;
  100. return TimeSpan.FromSeconds(sec);
  101. }, (ex, time) => { OnException(ex); });
  102. politicaWaitAndRetry.Execute(() =>
  103. {
  104. var cfg = KafkaClientConfig.GetFromConfig();
  105. ClientConfig = ClientConfig ?? new ConsumerConfig()
  106. {
  107. GroupId = cfg?.GroupId ?? "0",
  108. EnableAutoCommit = true,
  109. MaxPollIntervalMs = 900000
  110. };
  111. if (string.IsNullOrEmpty(ClientConfig.BootstrapServers))
  112. {
  113. ClientConfig.BootstrapServers = cfg.BootstrapServers;
  114. }
  115. cancellationTokenSource = new CancellationTokenSource();
  116. consumer = new ConsumerBuilder<Ignore, string>(ClientConfig).Build();
  117. consumer.Subscribe(TopicNames);
  118. try
  119. {
  120. while (true)
  121. {
  122. try
  123. {
  124. var cr = consumer.Consume(cancellationTokenSource.Token);
  125. ConsumeInternal(cr).Wait();
  126. Received?.Invoke(this, new MessageReceivedEventArgs { Message = cr.Message.Value });
  127. LoggerManager.Logger.Info($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
  128. }
  129. catch (ConsumeException e)
  130. {
  131. LoggerManager.Logger.Error(e, $"Error occured: {e.Error.Reason}");
  132. throw e;
  133. }
  134. catch (Exception e)
  135. {
  136. LoggerManager.Logger.Info($"Canceled Consume operation.");
  137. throw e;
  138. }
  139. }
  140. }
  141. catch (Exception ex)
  142. {
  143. throw ex;
  144. }
  145. finally
  146. {
  147. try
  148. {
  149. cancellationTokenSource.Cancel();
  150. cancellationTokenSource.Dispose();
  151. consumer.Dispose();
  152. }
  153. catch { }
  154. }
  155. });
  156. }, TaskCreationOptions.LongRunning).Start();
  157. }
  158. private async Task ConsumeInternal(ConsumeResult<Ignore, string> cr)
  159. {
  160. try
  161. {
  162. var start = DateTime.Now;
  163. await ConsumeAsync(cancellationTokenSource, cr.Message.Value);
  164. var completed = DateTime.Now;
  165. var secs = (completed - start).TotalSeconds;
  166. if (secs > 1)
  167. {
  168. LoggerManager.Logger.Error($"Task【{string.Join(',', TopicNames)}】运行时间过长【{secs}s】,内容:" + cr.Message.Value);
  169. OnExecuteTooLong(secs, cr.Message.Value);
  170. }
  171. // consumer.Commit(new List<TopicPartitionOffset> { cr.TopicPartitionOffset });
  172. }
  173. catch (NullReferenceException)
  174. {
  175. LoggerManager.Logger.Error($"Please check if null returned by ConsumeAsync, use Task.CompletedTask instead.");
  176. }
  177. catch (Exception ex)
  178. {
  179. LoggerManager.Logger.Error(ex, $"Error occured to consume message");
  180. }
  181. }
  182. public abstract Task ConsumeAsync(CancellationTokenSource cts, string message);
  183. public void Dispose()
  184. {
  185. if (consumer != null)
  186. {
  187. cancellationTokenSource.Cancel();
  188. cancellationTokenSource.Dispose();
  189. consumer.Dispose();
  190. consumer = null;
  191. GC.SuppressFinalize(this);
  192. }
  193. }
  194. }
  195. }