MessagingService.cs 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. using Microsoft.AspNetCore.Builder;
  2. using Microsoft.AspNetCore.Hosting;
  3. using Microsoft.Extensions.DependencyInjection;
  4. using Microsoft.Extensions.Hosting;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Linq;
  8. using System.Text;
  9. using Wicture.DbRESTFul;
  10. using Wicture.DbRESTFul.Configuration;
  11. namespace JiaZhiQuan.Common.Messaging
  12. {
  13. public static class MessagingService
  14. {
  15. private static IEnumerable<IConsumer> consumers;
  16. private static List<Producer> producers;
  17. public static IApplicationBuilder UserMessageConsumption(this IApplicationBuilder app)
  18. {
  19. if (!ConfigurationManager.Settings.Document.ContainsKey(KafkaClientConfig.ConfigSectionName))
  20. {
  21. LoggerManager.Logger.Info("No configuration found, ignore message consumption.");
  22. return app;
  23. }
  24. var aft = app.ApplicationServices.GetRequiredService<IHostApplicationLifetime>();
  25. try
  26. {
  27. consumers = app.ApplicationServices.GetServices<IConsumer>();
  28. if (consumers?.Count() > 0)
  29. {
  30. aft.ApplicationStopping.Register(() =>
  31. {
  32. consumers.ForEach(c => c.Dispose());
  33. });
  34. foreach (var consumer in consumers)
  35. {
  36. consumer.Init();
  37. LoggerManager.Logger.Info($"Initialized consumer `{consumer.GetType().Name}`");
  38. }
  39. }
  40. }
  41. catch (Exception ex)
  42. {
  43. LoggerManager.Logger.Error(ex, "Failed to init consumers.");
  44. }
  45. aft.ApplicationStopping.Register(() =>
  46. {
  47. producers?.ForEach(p => p?.Dispose());
  48. });
  49. return app;
  50. }
  51. public static void RegisterProducer(Producer producer)
  52. {
  53. if (producers == null) producers = new List<Producer>();
  54. producers.Add(producer);
  55. }
  56. }
  57. }