123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- using Microsoft.AspNetCore.Builder;
- using Microsoft.AspNetCore.Hosting;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.Hosting;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using Wicture.DbRESTFul;
- using Wicture.DbRESTFul.Configuration;
- namespace JiaZhiQuan.Common.Messaging
- {
- public static class MessagingService
- {
- private static IEnumerable<IConsumer> consumers;
- private static List<Producer> producers;
- public static IApplicationBuilder UserMessageConsumption(this IApplicationBuilder app)
- {
- if (!ConfigurationManager.Settings.Document.ContainsKey(KafkaClientConfig.ConfigSectionName))
- {
- LoggerManager.Logger.Info("No configuration found, ignore message consumption.");
- return app;
- }
- var aft = app.ApplicationServices.GetRequiredService<IHostApplicationLifetime>();
- try
- {
- consumers = app.ApplicationServices.GetServices<IConsumer>();
- if (consumers?.Count() > 0)
- {
- aft.ApplicationStopping.Register(() =>
- {
- consumers.ForEach(c => c.Dispose());
- });
- foreach (var consumer in consumers)
- {
- consumer.Init();
- LoggerManager.Logger.Info($"Initialized consumer `{consumer.GetType().Name}`");
- }
- }
- }
- catch (Exception ex)
- {
- LoggerManager.Logger.Error(ex, "Failed to init consumers.");
- }
- aft.ApplicationStopping.Register(() =>
- {
- producers?.ForEach(p => p?.Dispose());
- });
- return app;
- }
- public static void RegisterProducer(Producer producer)
- {
- if (producers == null) producers = new List<Producer>();
- producers.Add(producer);
- }
- }
- }
|