using Confluent.Kafka; using Polly; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using Wicture.DbRESTFul; namespace JiaZhiQuan.Common.Messaging { public abstract class BaseConsumer : IConsumer { public event EventHandler Received; private IConsumer consumer; public abstract ConsumerConfig ClientConfig { get; set; } public abstract IEnumerable TopicNames { get; set; } private CancellationTokenSource cancellationTokenSource; public void Init() { Consume(); } protected virtual void OnException(Exception ex) { } protected virtual void OnExecuteTooLong(double secs, string message) { } protected virtual void Consume1() { Task.Run(async () => { var politicaWaitAndRetry = Policy.Handle().WaitAndRetryForeverAsync(idx => { var sec = idx == 0 ? 1 : idx == 1 ? 10 : idx == 2 ? 30 : 60; return TimeSpan.FromSeconds(sec); }, (ex, time) => { OnException(ex); }); await politicaWaitAndRetry.ExecuteAsync(async () => { ClientConfig = ClientConfig ?? new ConsumerConfig() { GroupId = "0", EnableAutoCommit = true, MaxPollIntervalMs = 900000 }; if (string.IsNullOrEmpty(ClientConfig.BootstrapServers)) { var cfg = KafkaClientConfig.GetFromConfig(); ClientConfig.BootstrapServers = cfg.BootstrapServers; } cancellationTokenSource = new CancellationTokenSource(); consumer = new ConsumerBuilder(ClientConfig).Build(); consumer.Subscribe(TopicNames); try { while (true) { try { var cr = consumer.Consume(cancellationTokenSource.Token); await ConsumeInternal(cr); Received?.Invoke(this, new MessageReceivedEventArgs { Message = cr.Message.Value }); LoggerManager.Logger.Info($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { LoggerManager.Logger.Error(e, $"Error occured: {e.Error.Reason}"); throw e; } catch (Exception e) { LoggerManager.Logger.Info($"Canceled Consume operation."); throw e; } } } catch (Exception ex) { throw ex; } finally { try { cancellationTokenSource.Cancel(); cancellationTokenSource.Dispose(); consumer.Dispose(); } catch { } } }); }); } protected virtual void Consume() { new Task(() => { var politicaWaitAndRetry = Policy.Handle().WaitAndRetryForever(idx => { var sec = idx == 0 ? 1 : idx == 1 ? 10 : idx == 2 ? 30 : 60; return TimeSpan.FromSeconds(sec); }, (ex, time) => { OnException(ex); }); politicaWaitAndRetry.Execute(() => { var cfg = KafkaClientConfig.GetFromConfig(); ClientConfig = ClientConfig ?? new ConsumerConfig() { GroupId = cfg?.GroupId ?? "0", EnableAutoCommit = true, MaxPollIntervalMs = 900000 }; if (string.IsNullOrEmpty(ClientConfig.BootstrapServers)) { ClientConfig.BootstrapServers = cfg.BootstrapServers; } cancellationTokenSource = new CancellationTokenSource(); consumer = new ConsumerBuilder(ClientConfig).Build(); consumer.Subscribe(TopicNames); try { while (true) { try { var cr = consumer.Consume(cancellationTokenSource.Token); ConsumeInternal(cr).Wait(); Received?.Invoke(this, new MessageReceivedEventArgs { Message = cr.Message.Value }); LoggerManager.Logger.Info($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { LoggerManager.Logger.Error(e, $"Error occured: {e.Error.Reason}"); throw e; } catch (Exception e) { LoggerManager.Logger.Info($"Canceled Consume operation."); throw e; } } } catch (Exception ex) { throw ex; } finally { try { cancellationTokenSource.Cancel(); cancellationTokenSource.Dispose(); consumer.Dispose(); } catch { } } }); }, TaskCreationOptions.LongRunning).Start(); } private async Task ConsumeInternal(ConsumeResult cr) { try { var start = DateTime.Now; await ConsumeAsync(cancellationTokenSource, cr.Message.Value); var completed = DateTime.Now; var secs = (completed - start).TotalSeconds; if (secs > 1) { LoggerManager.Logger.Error($"Task【{string.Join(',', TopicNames)}】运行时间过长【{secs}s】,内容:" + cr.Message.Value); OnExecuteTooLong(secs, cr.Message.Value); } // consumer.Commit(new List { cr.TopicPartitionOffset }); } catch (NullReferenceException) { LoggerManager.Logger.Error($"Please check if null returned by ConsumeAsync, use Task.CompletedTask instead."); } catch (Exception ex) { LoggerManager.Logger.Error(ex, $"Error occured to consume message"); } } public abstract Task ConsumeAsync(CancellationTokenSource cts, string message); public void Dispose() { if (consumer != null) { cancellationTokenSource.Cancel(); cancellationTokenSource.Dispose(); consumer.Dispose(); consumer = null; GC.SuppressFinalize(this); } } } }