using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Wicture.DbRESTFul;

namespace JiaZhiQuan.Common.Messaging
{
    /// <summary>
    /// Thin wrapper around a Kafka producer that serializes payloads to JSON and
    /// offers helpers for creating (and optionally re-creating) topics.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments of the Confluent.Kafka types were missing
    /// from the reviewed text. <c>&lt;Null, string&gt;</c> is assumed because messages are
    /// produced with a JSON string value and no key — confirm against the original file.
    /// </remarks>
    public class Producer : IDisposable
    {
        /// <summary>How long to wait for a cluster metadata response.</summary>
        private static readonly TimeSpan MetadataTimeout = TimeSpan.FromSeconds(20);

        /// <summary>Pause between a delete request and the next existence check.</summary>
        private static readonly TimeSpan DeleteRetryDelay = TimeSpan.FromMilliseconds(500);

        /// <summary>Retry counter value above which topic deletion is considered failed.</summary>
        private const int MaxDeleteRetries = 10;

        /// <summary>
        /// Client configuration used to build the producer and any admin client.
        /// Only <c>BootstrapServers</c> is populated by the constructor.
        /// </summary>
        public ClientConfig ClientConfig { get; set; }

        // Set to null by Dispose, which is why it is not readonly.
        private IProducer<Null, string> producer;

        /// <summary>
        /// Builds a producer from <c>KafkaClientConfig.GetFromConfig()</c> and registers it
        /// with <c>MessagingService</c>.
        /// </summary>
        public Producer()
        {
            var cfg = KafkaClientConfig.GetFromConfig();
            ClientConfig = new ClientConfig { BootstrapServers = cfg.BootstrapServers };
            producer = new ProducerBuilder<Null, string>(ClientConfig).Build();

            // Register with the messaging service so that this producer can be disposed automatically.
            MessagingService.RegisterProducer(this);
        }

        /// <summary>
        /// Creates the topic described by <paramref name="specification"/>.
        /// </summary>
        /// <param name="specification">Specification of the topic to create.</param>
        /// <param name="force">
        /// When the topic already exists: <c>false</c> returns without changes,
        /// <c>true</c> deletes the existing topic first and then creates it again.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="specification"/> is null.</exception>
        /// <exception cref="TimeoutException">
        /// <paramref name="force"/> is true and the existing topic could not be deleted.
        /// </exception>
        /// <remarks>A <see cref="CreateTopicsException"/> is logged and not rethrown.</remarks>
        public async Task CreateTopicAsync(TopicSpecification specification, bool force = false)
        {
            if (specification == null)
            {
                throw new ArgumentNullException(nameof(specification));
            }

            using (var adminClient = new AdminClientBuilder(new AdminClientConfig(ClientConfig)).Build())
            {
                try
                {
                    var meta = adminClient.GetMetadata(MetadataTimeout);
                    LogKafkaMeta(meta);

                    if (meta.Topics.Any(t => t.Topic == specification.Name))
                    {
                        LoggerManager.Logger.Info($"The topic already exists {specification.Name}");
                        if (!force)
                        {
                            return;
                        }

                        await EnsureDeleteTopicAsync(adminClient, specification.Name);
                    }

                    await adminClient.CreateTopicsAsync(new TopicSpecification[] { specification });
                    LoggerManager.Logger.Info($"The topic {specification.Name} created.");
                }
                catch (CreateTopicsException e)
                {
                    LoggerManager.Logger.Error($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
                }
            }
        }

        /// <summary>
        /// Serializes <paramref name="data"/> to JSON and produces it to <paramref name="topicName"/>.
        /// </summary>
        /// <param name="topicName">Name of the destination topic.</param>
        /// <param name="data">Payload; serialized with <c>JsonConvert.SerializeObject</c>.</param>
        /// <exception cref="ObjectDisposedException">The producer has already been disposed.</exception>
        /// <remarks>A produce failure is logged and not rethrown.</remarks>
        public async Task ProduceAsync(string topicName, object data)
        {
            var current = producer;
            if (current == null)
            {
                // Fail with a meaningful exception instead of a NullReferenceException.
                throw new ObjectDisposedException(nameof(Producer));
            }

            try
            {
                var message = new Message<Null, string> { Value = JsonConvert.SerializeObject(data) };
                var dr = await current.ProduceAsync(topicName, message);
                LoggerManager.Logger.Info($"Produced '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                LoggerManager.Logger.Error($"Produce message failed: {e.Error.Reason}");
            }
        }

        /// <summary>
        /// Requests deletion of <paramref name="topicName"/> and re-checks the cluster metadata
        /// until the topic is no longer listed.
        /// </summary>
        /// <param name="adminClient">Admin client used for the metadata and delete requests.</param>
        /// <param name="topicName">Name of the topic to delete.</param>
        /// <param name="retry">Starting value of the retry counter.</param>
        /// <exception cref="TimeoutException">
        /// The topic is still listed once the retry counter exceeds the retry limit.
        /// </exception>
        private async Task EnsureDeleteTopicAsync(IAdminClient adminClient, string topicName, int retry = 0)
        {
            // Iterative form of the check / delete / wait cycle; the counter starts at
            // `retry` so the number of attempts matches the caller-supplied starting point.
            for (var attempt = retry; ; attempt++)
            {
                var meta = adminClient.GetMetadata(MetadataTimeout);
                if (!meta.Topics.Any(t => t.Topic == topicName))
                {
                    LoggerManager.Logger.Info($"The topic {topicName} was deleted");
                    return;
                }

                if (attempt > MaxDeleteRetries)
                {
                    throw new TimeoutException($"Failed to delete the topic: {topicName}.");
                }

                await adminClient.DeleteTopicsAsync(new string[] { topicName });
                await Task.Delay(DeleteRetryDelay);
            }
        }

        /// <summary>
        /// Writes the originating broker, every broker, and every topic with its partitions
        /// (replicas and in-sync replicas) to the log.
        /// </summary>
        /// <param name="meta">Cluster metadata to log.</param>
        private void LogKafkaMeta(Metadata meta)
        {
            LoggerManager.Logger.Debug($"OriginatingBroker: {meta.OriginatingBrokerId} {meta.OriginatingBrokerName}");

            foreach (var broker in meta.Brokers)
            {
                LoggerManager.Logger.Info($"Broker: {broker.BrokerId} {broker.Host}:{broker.Port}");
            }

            foreach (var topic in meta.Topics)
            {
                LoggerManager.Logger.Debug($"Topic: {topic.Topic} {topic.Error}");

                foreach (var partition in topic.Partitions)
                {
                    LoggerManager.Logger.Debug($" Partition: {partition.PartitionId}");
                    LoggerManager.Logger.Debug($" Replicas: [{string.Join(", ", partition.Replicas)}]");
                    LoggerManager.Logger.Debug($" InSyncReplicas: [{string.Join(", ", partition.InSyncReplicas)}]");
                }
            }
        }

        /// <summary>
        /// Disposes the underlying Kafka producer. Calling it again has no effect.
        /// </summary>
        public void Dispose()
        {
            if (producer == null)
            {
                return;
            }

            producer.Dispose();
            producer = null;
            GC.SuppressFinalize(this);
        }
    }
}