using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Wicture.DbRESTFul;
using Wicture.DbRESTFul.Configuration;

namespace JiaZhiQuan.Common.Messaging
{
    /// <summary>
    /// Static helpers that tie message consumers and producers to the application lifecycle:
    /// consumers resolved from the service provider are initialized at startup, and both
    /// consumers and registered producers are disposed when the application is stopping.
    /// </summary>
    /// <remarks>
    /// NOTE(review): in this view the generic type arguments appear to be missing
    /// (<c>IEnumerable</c>, <c>List</c>, <c>GetRequiredService()</c>, <c>GetServices()</c>,
    /// <c>new List()</c>). The element type of <c>producers</c> is presumably <c>Producer</c>
    /// (see <c>RegisterProducer</c>); the consumer type and the lifetime service type are not
    /// visible here — confirm against the original file before editing the code.
    /// </remarks>
    public static class MessagingService
    {
        // Consumers resolved from the service provider in UserMessageConsumption.
        // Stays null until that method reaches the GetServices call.
        private static IEnumerable consumers;

        // Producers added through RegisterProducer. Created lazily, so it is null until
        // the first registration; the disposal callback below tolerates that with `?.`.
        private static List producers;

        /// <summary>
        /// Initializes every consumer registered in the service provider and arranges for
        /// consumers and producers to be disposed when the application is stopping.
        /// </summary>
        /// <param name="app">The application builder whose <c>ApplicationServices</c> is used to resolve services.</param>
        /// <returns>The same <paramref name="app"/> instance, so the call can be chained.</returns>
        /// <remarks>
        /// Exceptions thrown while resolving or initializing consumers are caught and logged,
        /// not rethrown. The <c>GetRequiredService</c> call sits outside the try block, so a
        /// failure there propagates to the caller.
        /// </remarks>
        public static IApplicationBuilder UserMessageConsumption(this IApplicationBuilder app)
        {
            // Without the Kafka client configuration section there is nothing to consume:
            // log and return the builder untouched.
            // NOTE(review): this early return also skips the producer-disposal registration
            // at the end of the method — confirm that no producers can be registered when
            // the section is absent.
            if (!ConfigurationManager.Settings.Document.ContainsKey(KafkaClientConfig.ConfigSectionName))
            {
                LoggerManager.Logger.Info("No configuration found, ignore message consumption.");
                return app;
            }

            // Service exposing the ApplicationStopping token used for the shutdown hooks below.
            var aft = app.ApplicationServices.GetRequiredService();
            try
            {
                consumers = app.ApplicationServices.GetServices();
                if (consumers?.Count() > 0)
                {
                    // Registered before the Init loop, so the disposal hook is in place even
                    // if a later Init call throws.
                    // NOTE(review): ForEach on this collection is an extension method defined
                    // outside this file; presumably it visits every consumer, including ones
                    // whose Init never ran — confirm.
                    aft.ApplicationStopping.Register(() => { consumers.ForEach(c => c.Dispose()); });
                    foreach (var consumer in consumers)
                    {
                        consumer.Init();
                        LoggerManager.Logger.Info($"Initialized consumer `{consumer.GetType().Name}`");
                    }
                }
            }
            catch (Exception ex)
            {
                // Best effort: the first failure aborts the remaining Init calls, but the
                // application keeps starting; the error is only logged.
                LoggerManager.Logger.Error(ex, "Failed to init consumers.");
            }

            // The callback reads `producers` when it runs, not when it is registered, so
            // producers added after this call are still disposed. Null list and null
            // entries are both tolerated.
            aft.ApplicationStopping.Register(() => { producers?.ForEach(p => p?.Dispose()); });
            return app;
        }

        /// <summary>
        /// Adds a producer to the list that is disposed when the application is stopping.
        /// </summary>
        /// <param name="producer">
        /// The producer to track. It is not null-checked here; a null entry is skipped at
        /// disposal time by the <c>p?.Dispose()</c> call in <c>UserMessageConsumption</c>.
        /// </param>
        /// <remarks>
        /// NOTE(review): the lazy list creation and the <c>Add</c> call are not synchronized;
        /// presumably this is only called from single-threaded startup code — confirm.
        /// </remarks>
        public static void RegisterProducer(Producer producer)
        {
            if (producers == null) producers = new List();
            producers.Add(producer);
        }
    }
}