123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- using Confluent.Kafka;
- using Polly;
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using Wicture.DbRESTFul;
- namespace JiaZhiQuan.Common.Messaging
- {
- public abstract class BaseConsumer : IConsumer
- {
- public event EventHandler<MessageReceivedEventArgs> Received;
- private IConsumer<Ignore, string> consumer;
- public abstract ConsumerConfig ClientConfig { get; set; }
- public abstract IEnumerable<string> TopicNames { get; set; }
- private CancellationTokenSource cancellationTokenSource;
- public void Init()
- {
- Consume();
- }
- protected virtual void OnException(Exception ex)
- {
- }
- protected virtual void OnExecuteTooLong(double secs, string message)
- {
- }
- protected virtual void Consume1()
- {
- Task.Run(async () =>
- {
- var politicaWaitAndRetry = Policy.Handle<Exception>().WaitAndRetryForeverAsync(idx =>
- {
- var sec = idx == 0 ? 1 : idx == 1 ? 10 : idx == 2 ? 30 : 60;
- return TimeSpan.FromSeconds(sec);
- }, (ex, time) => { OnException(ex); });
- await politicaWaitAndRetry.ExecuteAsync(async () =>
- {
- ClientConfig = ClientConfig ?? new ConsumerConfig()
- {
- GroupId = "0",
- EnableAutoCommit = true,
- MaxPollIntervalMs = 900000
- };
- if (string.IsNullOrEmpty(ClientConfig.BootstrapServers))
- {
- var cfg = KafkaClientConfig.GetFromConfig();
- ClientConfig.BootstrapServers = cfg.BootstrapServers;
- }
-
- cancellationTokenSource = new CancellationTokenSource();
- consumer = new ConsumerBuilder<Ignore, string>(ClientConfig).Build();
- consumer.Subscribe(TopicNames);
-
- try
- {
- while (true)
- {
- try
- {
- var cr = consumer.Consume(cancellationTokenSource.Token);
- await ConsumeInternal(cr);
- Received?.Invoke(this, new MessageReceivedEventArgs { Message = cr.Message.Value });
- LoggerManager.Logger.Info($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
- }
- catch (ConsumeException e)
- {
- LoggerManager.Logger.Error(e, $"Error occured: {e.Error.Reason}");
- throw e;
- }
- catch (Exception e)
- {
- LoggerManager.Logger.Info($"Canceled Consume operation.");
- throw e;
- }
- }
- }
- catch (Exception ex)
- {
- throw ex;
- }
- finally
- {
- try
- {
- cancellationTokenSource.Cancel();
- cancellationTokenSource.Dispose();
- consumer.Dispose();
- }
- catch { }
- }
- });
- });
- }
- protected virtual void Consume()
- {
- new Task(() =>
- {
- var politicaWaitAndRetry = Policy.Handle<Exception>().WaitAndRetryForever(idx =>
- {
- var sec = idx == 0 ? 1 : idx == 1 ? 10 : idx == 2 ? 30 : 60;
- return TimeSpan.FromSeconds(sec);
- }, (ex, time) => { OnException(ex); });
- politicaWaitAndRetry.Execute(() =>
- {
- var cfg = KafkaClientConfig.GetFromConfig();
- ClientConfig = ClientConfig ?? new ConsumerConfig()
- {
- GroupId = cfg?.GroupId ?? "0",
- EnableAutoCommit = true,
- MaxPollIntervalMs = 900000
- };
- if (string.IsNullOrEmpty(ClientConfig.BootstrapServers))
- {
- ClientConfig.BootstrapServers = cfg.BootstrapServers;
- }
-
- cancellationTokenSource = new CancellationTokenSource();
- consumer = new ConsumerBuilder<Ignore, string>(ClientConfig).Build();
- consumer.Subscribe(TopicNames);
-
- try
- {
- while (true)
- {
- try
- {
- var cr = consumer.Consume(cancellationTokenSource.Token);
- ConsumeInternal(cr).Wait();
- Received?.Invoke(this, new MessageReceivedEventArgs { Message = cr.Message.Value });
- LoggerManager.Logger.Info($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
- }
- catch (ConsumeException e)
- {
- LoggerManager.Logger.Error(e, $"Error occured: {e.Error.Reason}");
- throw e;
- }
- catch (Exception e)
- {
- LoggerManager.Logger.Info($"Canceled Consume operation.");
- throw e;
- }
- }
- }
- catch (Exception ex)
- {
- throw ex;
- }
- finally
- {
- try
- {
- cancellationTokenSource.Cancel();
- cancellationTokenSource.Dispose();
- consumer.Dispose();
- }
- catch { }
- }
- });
- }, TaskCreationOptions.LongRunning).Start();
- }
- private async Task ConsumeInternal(ConsumeResult<Ignore, string> cr)
- {
- try
- {
- var start = DateTime.Now;
- await ConsumeAsync(cancellationTokenSource, cr.Message.Value);
- var completed = DateTime.Now;
- var secs = (completed - start).TotalSeconds;
- if (secs > 1)
- {
- LoggerManager.Logger.Error($"Task【{string.Join(',', TopicNames)}】运行时间过长【{secs}s】,内容:" + cr.Message.Value);
- OnExecuteTooLong(secs, cr.Message.Value);
- }
- // consumer.Commit(new List<TopicPartitionOffset> { cr.TopicPartitionOffset });
- }
- catch (NullReferenceException)
- {
- LoggerManager.Logger.Error($"Please check if null returned by ConsumeAsync, use Task.CompletedTask instead.");
- }
- catch (Exception ex)
- {
- LoggerManager.Logger.Error(ex, $"Error occured to consume message");
- }
- }
- public abstract Task ConsumeAsync(CancellationTokenSource cts, string message);
- public void Dispose()
- {
- if (consumer != null)
- {
- cancellationTokenSource.Cancel();
- cancellationTokenSource.Dispose();
- consumer.Dispose();
- consumer = null;
- GC.SuppressFinalize(this);
- }
- }
- }
- }
|