Producer.cs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. using Confluent.Kafka;
  2. using Confluent.Kafka.Admin;
  3. using Newtonsoft.Json;
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using Wicture.DbRESTFul;
  10. namespace JiaZhiQuan.Common.Messaging
  11. {
  /// <summary>
  /// Publishes JSON-serialized messages to Kafka through a Confluent.Kafka
  /// producer (null key, string value) and can create topics through an
  /// admin client built from the same <see cref="ClientConfig"/>.
  /// </summary>
  public class Producer : IDisposable
  {
      /// <summary>Timeout passed to every <c>GetMetadata</c> call made by this class.</summary>
      private static readonly TimeSpan MetadataTimeout = TimeSpan.FromSeconds(20);

      /// <summary>Pause, in milliseconds, between successive delete attempts.</summary>
      private const int DeleteRetryDelayMilliseconds = 500;

      /// <summary>
      /// Highest retry index at which a delete request is still issued. Once the
      /// index exceeds this value and the topic is still listed, deletion is
      /// reported as timed out.
      /// </summary>
      private const int MaxDeleteRetries = 10;

      // Set to null by Dispose(); a null value therefore means "disposed".
      private IProducer<Null, string> producer;

      /// <summary>
      /// Client configuration. The constructor fills in the bootstrap servers and
      /// builds the producer from it; <see cref="CreateTopicAsync"/> reads it again
      /// each time it builds an admin client.
      /// </summary>
      public ClientConfig ClientConfig { get; set; }

      /// <summary>
      /// Builds the underlying producer using the bootstrap servers returned by
      /// <c>KafkaClientConfig.GetFromConfig()</c> and registers this instance with
      /// <c>MessagingService</c>.
      /// </summary>
      public Producer()
      {
          var cfg = KafkaClientConfig.GetFromConfig();
          ClientConfig = new ClientConfig { BootstrapServers = cfg.BootstrapServers };
          producer = new ProducerBuilder<Null, string>(ClientConfig).Build();

          // Register with the messaging service so that this producer can be disposed automatically.
          MessagingService.RegisterProducer(this);
      }

      /// <summary>
      /// Creates the topic described by <paramref name="specification"/>.
      /// </summary>
      /// <param name="specification">Specification of the topic to create.</param>
      /// <param name="force">
      /// Controls what happens when a topic with the same name is already listed in
      /// the cluster metadata: <c>false</c> returns without changes; <c>true</c>
      /// deletes the existing topic first and then creates it again.
      /// </param>
      /// <returns>A task that completes when the operation has finished.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="specification"/> is null.</exception>
      /// <exception cref="TimeoutException">
      /// With <paramref name="force"/> set, the existing topic was still listed after
      /// all delete retries.
      /// </exception>
      /// <remarks>
      /// A <see cref="CreateTopicsException"/> is logged and not rethrown; other
      /// exceptions propagate to the caller.
      /// </remarks>
      public async Task CreateTopicAsync(TopicSpecification specification, bool force = false)
      {
          if (specification == null)
          {
              throw new ArgumentNullException(nameof(specification));
          }

          using (var adminClient = new AdminClientBuilder(new AdminClientConfig(ClientConfig)).Build())
          {
              try
              {
                  var meta = adminClient.GetMetadata(MetadataTimeout);
                  LogKafkaMeta(meta);

                  if (meta.Topics.Any(t => t.Topic == specification.Name))
                  {
                      LoggerManager.Logger.Info($"The topic already exists {specification.Name}");
                      if (!force)
                      {
                          return;
                      }

                      await EnsureDeleteTopicAsync(adminClient, specification.Name);
                  }

                  await adminClient.CreateTopicsAsync(new TopicSpecification[] { specification });
                  LoggerManager.Logger.Info($"The topic {specification.Name} created.");
              }
              catch (CreateTopicsException e)
              {
                  // Log every failed report instead of blindly indexing the first entry.
                  foreach (var report in e.Results.Where(r => r.Error.IsError))
                  {
                      LoggerManager.Logger.Error($"An error occured creating topic {report.Topic}: {report.Error.Reason}");
                  }
              }
          }
      }

      /// <summary>
      /// Serializes <paramref name="data"/> to JSON and publishes it to
      /// <paramref name="topicName"/>.
      /// </summary>
      /// <param name="topicName">Name of the destination topic.</param>
      /// <param name="data">Object to serialize as the message value.</param>
      /// <returns>A task that completes once the produce call has returned or failed.</returns>
      /// <exception cref="ObjectDisposedException">This producer has already been disposed.</exception>
      /// <remarks>
      /// A <c>ProduceException</c> is logged and not rethrown, so a completed task
      /// does not by itself prove that the message was delivered.
      /// </remarks>
      public async Task ProduceAsync(string topicName, object data)
      {
          // Dispose() nulls the field; fail with a meaningful exception instead of
          // a NullReferenceException.
          var current = producer;
          if (current == null)
          {
              throw new ObjectDisposedException(nameof(Producer));
          }

          try
          {
              var message = new Message<Null, string> { Value = JsonConvert.SerializeObject(data) };
              var dr = await current.ProduceAsync(topicName, message);
              LoggerManager.Logger.Info($"Produced '{dr.Value}' to '{dr.TopicPartitionOffset}'");
          }
          catch (ProduceException<Null, string> e)
          {
              LoggerManager.Logger.Error($"Produce message failed: {e.Error.Reason}");
          }
      }

      /// <summary>
      /// Repeatedly requests deletion of <paramref name="topicName"/> until the
      /// cluster metadata no longer lists it.
      /// </summary>
      /// <param name="adminClient">Admin client used for metadata and delete requests.</param>
      /// <param name="topicName">Name of the topic to delete.</param>
      /// <param name="retry">Retry index to start counting from.</param>
      /// <exception cref="TimeoutException">
      /// The topic is still listed after the retry index has exceeded
      /// <see cref="MaxDeleteRetries"/>.
      /// </exception>
      private async Task EnsureDeleteTopicAsync(IAdminClient adminClient, string topicName, int retry = 0)
      {
          for (var attempt = retry; ; attempt++)
          {
              var meta = adminClient.GetMetadata(MetadataTimeout);
              if (!meta.Topics.Any(t => t.Topic == topicName))
              {
                  LoggerManager.Logger.Info($"The topic {topicName} was deleted");
                  return;
              }

              if (attempt > MaxDeleteRetries)
              {
                  throw new TimeoutException($"Failed to delete the topic: {topicName}.");
              }

              try
              {
                  await adminClient.DeleteTopicsAsync(new string[] { topicName });
              }
              catch (DeleteTopicsException e) when (e.Results.All(r => !r.Error.IsError || r.Error.Code == ErrorCode.UnknownTopicOrPartition))
              {
                  // The topic disappeared between the metadata check and this request
                  // (for example, an earlier delete finished in the meantime). That is
                  // the outcome we want, so let the next metadata check confirm it.
              }

              await Task.Delay(DeleteRetryDelayMilliseconds);
          }
      }

      /// <summary>
      /// Writes the originating broker, every broker, and every topic with its
      /// partitions, replicas and in-sync replicas to the log.
      /// </summary>
      /// <param name="meta">Cluster metadata to log.</param>
      private void LogKafkaMeta(Metadata meta)
      {
          LoggerManager.Logger.Debug($"OriginatingBroker: {meta.OriginatingBrokerId} {meta.OriginatingBrokerName}");

          // NOTE(review): brokers are logged at Info while everything else here is
          // Debug; kept as found, confirm whether that is intentional.
          foreach (var broker in meta.Brokers)
          {
              LoggerManager.Logger.Info($"Broker: {broker.BrokerId} {broker.Host}:{broker.Port}");
          }

          foreach (var topic in meta.Topics)
          {
              LoggerManager.Logger.Debug($"Topic: {topic.Topic} {topic.Error}");

              foreach (var partition in topic.Partitions)
              {
                  LoggerManager.Logger.Debug($" Partition: {partition.PartitionId}");
                  LoggerManager.Logger.Debug($" Replicas: [{string.Join(", ", partition.Replicas)}]");
                  LoggerManager.Logger.Debug($" InSyncReplicas: [{string.Join(", ", partition.InSyncReplicas)}]");
              }
          }
      }

      /// <summary>
      /// Disposes the underlying producer. Calling it again after the first
      /// successful call does nothing.
      /// </summary>
      public void Dispose()
      {
          if (producer == null)
          {
              return;
          }

          producer.Dispose();
          producer = null;
          GC.SuppressFinalize(this);
      }
  }
  103. }