123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- using Confluent.Kafka;
- using Confluent.Kafka.Admin;
- using Newtonsoft.Json;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using Wicture.DbRESTFul;
namespace JiaZhiQuan.Common.Messaging
{
    /// <summary>
    /// Wrapper around a Confluent.Kafka <see cref="IProducer{TKey, TValue}"/> that publishes
    /// JSON-serialized payloads (no message key) and can create, or forcibly re-create, topics.
    /// </summary>
    public class Producer : IDisposable
    {
        /// <summary>Maximum time a cluster metadata request may block.</summary>
        private static readonly TimeSpan MetadataTimeout = TimeSpan.FromSeconds(20);

        /// <summary>Pause between a delete request and the next metadata check.</summary>
        private static readonly TimeSpan DeletePollInterval = TimeSpan.FromMilliseconds(500);

        /// <summary>Upper bound on how long <see cref="Dispose"/> waits for queued messages.</summary>
        private static readonly TimeSpan DisposeFlushTimeout = TimeSpan.FromSeconds(10);

        /// <summary>Retry counter value after which topic deletion is considered failed.</summary>
        private const int MaxDeleteRetries = 10;

        /// <summary>
        /// Client configuration. The producer is built from it once, in the constructor;
        /// <see cref="CreateTopicAsync"/> reads it again each time it builds an admin client,
        /// so later changes only affect admin clients created afterwards.
        /// </summary>
        public ClientConfig ClientConfig { get; set; }

        // Set to null by Dispose(); a null value means "disposed".
        private IProducer<Null, string> producer;

        /// <summary>
        /// Builds the underlying Kafka producer using the bootstrap servers returned by
        /// <c>KafkaClientConfig.GetFromConfig()</c> and registers this instance with
        /// <c>MessagingService</c>.
        /// </summary>
        public Producer()
        {
            var cfg = KafkaClientConfig.GetFromConfig();
            ClientConfig = new ClientConfig { BootstrapServers = cfg.BootstrapServers };
            producer = new ProducerBuilder<Null, string>(ClientConfig).Build();

            // Register with the messaging service so this instance can be disposed automatically.
            MessagingService.RegisterProducer(this);
        }

        /// <summary>
        /// Creates the topic described by <paramref name="specification"/>.
        /// </summary>
        /// <param name="specification">Topic to create.</param>
        /// <param name="force">
        /// When the topic already exists: <c>false</c> leaves it untouched and returns;
        /// <c>true</c> deletes it first and then creates it again.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="specification"/> is null.</exception>
        /// <exception cref="TimeoutException">
        /// With <paramref name="force"/> set, the existing topic was still present after the retry budget.
        /// </exception>
        /// <remarks>
        /// A <see cref="CreateTopicsException"/> is logged and swallowed; other exceptions
        /// (for example from the delete step) propagate to the caller.
        /// </remarks>
        public async Task CreateTopicAsync(TopicSpecification specification, bool force = false)
        {
            if (specification == null)
            {
                throw new ArgumentNullException(nameof(specification));
            }

            using (var adminClient = new AdminClientBuilder(new AdminClientConfig(ClientConfig)).Build())
            {
                try
                {
                    var meta = adminClient.GetMetadata(MetadataTimeout);
                    LogKafkaMeta(meta);

                    if (meta.Topics.Any(t => t.Topic == specification.Name))
                    {
                        LoggerManager.Logger.Info($"The topic already exists {specification.Name}");
                        if (!force)
                        {
                            return;
                        }

                        await EnsureDeleteTopicAsync(adminClient, specification.Name);
                    }

                    await adminClient.CreateTopicsAsync(new TopicSpecification[] { specification });
                    LoggerManager.Logger.Info($"The topic {specification.Name} created.");
                }
                catch (CreateTopicsException e)
                {
                    // Exactly one specification is submitted above, so the first result is the one that failed.
                    LoggerManager.Logger.Error($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
                }
            }
        }

        /// <summary>
        /// Serializes <paramref name="data"/> to JSON and publishes it to <paramref name="topicName"/>.
        /// </summary>
        /// <param name="topicName">Destination topic.</param>
        /// <param name="data">Payload; serialized with <c>JsonConvert.SerializeObject</c>.</param>
        /// <exception cref="ObjectDisposedException">This instance has already been disposed.</exception>
        /// <remarks>
        /// A <see cref="ProduceException{TKey, TValue}"/> is logged and swallowed, so the returned
        /// task completes successfully even when delivery failed.
        /// </remarks>
        public async Task ProduceAsync(string topicName, object data)
        {
            // Capture the field once so a Dispose() racing with this call cannot null it mid-method.
            var current = producer;
            if (current == null)
            {
                throw new ObjectDisposedException(nameof(Producer));
            }

            try
            {
                var message = new Message<Null, string> { Value = JsonConvert.SerializeObject(data) };
                var dr = await current.ProduceAsync(topicName, message);
                LoggerManager.Logger.Info($"Produced '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                LoggerManager.Logger.Error($"Produce message failed: {e.Error.Reason}");
            }
        }

        /// <summary>
        /// Requests deletion of <paramref name="topicName"/> and polls cluster metadata until the
        /// topic is no longer listed.
        /// </summary>
        /// <param name="adminClient">Admin client used for the metadata and delete requests.</param>
        /// <param name="topicName">Topic to delete.</param>
        /// <param name="retry">Starting value of the retry counter.</param>
        /// <exception cref="TimeoutException">
        /// The topic is still listed once the retry counter exceeds <see cref="MaxDeleteRetries"/>.
        /// </exception>
        private async Task EnsureDeleteTopicAsync(IAdminClient adminClient, string topicName, int retry = 0)
        {
            // Iterative form of the retry: check, give up if over budget, delete, wait, repeat.
            for (var attempt = retry; ; attempt++)
            {
                var meta = adminClient.GetMetadata(MetadataTimeout);
                if (!meta.Topics.Any(t => t.Topic == topicName))
                {
                    LoggerManager.Logger.Info($"The topic {topicName} was deleted");
                    return;
                }

                if (attempt > MaxDeleteRetries)
                {
                    throw new TimeoutException($"Failed to delete the topic: {topicName}.");
                }

                await adminClient.DeleteTopicsAsync(new string[] { topicName });
                await Task.Delay(DeletePollInterval);
            }
        }

        /// <summary>
        /// Writes the brokers, topics and partitions contained in <paramref name="meta"/> to the log.
        /// Brokers are logged at Info level; everything else at Debug level.
        /// </summary>
        /// <param name="meta">Cluster metadata to log.</param>
        private void LogKafkaMeta(Metadata meta)
        {
            LoggerManager.Logger.Debug($"OriginatingBroker: {meta.OriginatingBrokerId} {meta.OriginatingBrokerName}");

            foreach (var broker in meta.Brokers)
            {
                LoggerManager.Logger.Info($"Broker: {broker.BrokerId} {broker.Host}:{broker.Port}");
            }

            foreach (var topic in meta.Topics)
            {
                LoggerManager.Logger.Debug($"Topic: {topic.Topic} {topic.Error}");

                foreach (var partition in topic.Partitions)
                {
                    LoggerManager.Logger.Debug($" Partition: {partition.PartitionId}");
                    LoggerManager.Logger.Debug($" Replicas: [{string.Join(", ", partition.Replicas)}]");
                    LoggerManager.Logger.Debug($" InSyncReplicas: [{string.Join(", ", partition.InSyncReplicas)}]");
                }
            }
        }

        /// <summary>
        /// Flushes queued messages (bounded by <see cref="DisposeFlushTimeout"/>) and releases the
        /// underlying Kafka producer. Calling it again after the first call does nothing.
        /// </summary>
        public void Dispose()
        {
            var current = producer;
            if (current == null)
            {
                return;
            }

            // Mark as disposed first so repeated calls return early.
            producer = null;

            try
            {
                // Disposing the client does not by itself wait for queued messages to be delivered.
                current.Flush(DisposeFlushTimeout);
            }
            catch (KafkaException e)
            {
                // Dispose should not throw; record the failure and continue releasing the client.
                LoggerManager.Logger.Error($"Flush before dispose failed: {e.Error.Reason}");
            }
            finally
            {
                current.Dispose();
                GC.SuppressFinalize(this);
            }
        }
    }
}
|