Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C#

Kafka Produce Not Throwing Exception when Unable to Connect to kafka Cluster in C#

0.00/5 (No votes)
24 Apr 2020CPOL 13.4K  
A quick tip on dealing with failures and errors with Kafta
Kafka's standard error handling policy does not displaying errors while processing your messages. In this tip, you will see a step by step method of how to deal with failures and errors in Kafka.

Introduction

The handling of failures and errors in Kafta is not a trivial task, by default, the producer does not return any errors when trying to connect to the broker.

I'm using the Confluent.Kafka 1.4 library for C#.

Using the Code

The first step is to perform the parameterization of the producer and consumer configuration.

C#
private void KafkaConfig(out string[] topics, out ProducerConfig producerConfig,
                         out ConsumerConfig consumerConfig)
{
    var result = new StringBuilder();
    topics = new string[] { "Teste" };
    var config = new KafkaConsumerConfig();
    _configuration.Bind("KafkaConsumer", config);

    Write(this.HttpContext, "--Kafka Config....");
    Write(this.HttpContext, "--Kafka topics: " + topics[0].ToString());

    if (string.IsNullOrEmpty(config.BootstrapServers))
        Write(this.HttpContext, "--Kafka Servers connection is empty.");

    Write(this.HttpContext, "--BootstrapServers: " + config.BootstrapServers);
    Write(this.HttpContext, "--GroupId: " + config.GroupId);

    producerConfig = new ProducerConfig
    {
        BootstrapServers = config.BootstrapServers,
        StatisticsIntervalMs = 5000,
        MessageTimeoutMs = 10000,   // by default, the producer will attempt
                                    // to deliver messages
                                    // for 5 minutes (default value of
                                    // message.timeout.ms
        SocketTimeoutMs = 10000,
        ApiVersionRequestTimeoutMs = 10000,
        MetadataRequestTimeoutMs = 5000,
        RequestTimeoutMs = 5000
    };
    consumerConfig = new ConsumerConfig
    {
        BootstrapServers = config.BootstrapServers,
        GroupId = config.GroupId,
        EnableAutoCommit = true,
        StatisticsIntervalMs = 5000,
        SessionTimeoutMs = 10000
    };
}

The next step is to build the producer to record the message in the queue. If kafka is unable to connect to the broker, it will display the message showing the server is unavailable.

C#
private void KafkaProducer(string[] topics, ProducerConfig producerConfig)
{
    Write(this.HttpContext, "--Kafka Producer...");

    // Action<DeliveryReport<Null, string>> handlerDelivery = r =>
    // Write(this.HttpContext, "--Kafka Producer message: " + r.Error.Reason +
    //                         "Delivered message to" + r.TopicPartitionOffset);
    // I don't recommend using this because of this:
    // https://github.com/confluentinc/confluent-kafka-dotnet/issues/1025

    using (var producer = new ProducerBuilder<Null, string>(producerConfig)
    .SetErrorHandler((producer, error) =>
        {
            Write(this.HttpContext, "--Kafka Producer Error: " + error.Reason);
        }
    )
    .SetStatisticsHandler((_, json) => Write(this.HttpContext, "Statistics: " + json))
    .Build())
    {
        for (int i = 0; i < 10; ++i)
        {
            // producer.Produce(topics[0].ToString(), new Message<Null, string>
            // { Value = i.ToString() },
            //                  handlerDelivery); //Não está retornando erro.
                                                  //(Não recomendo.)
            var dr = producer.ProduceAsync(topics[0].ToString(),
                     new Message<Null, string> { Value = i.ToString() });
            Write(this.HttpContext, "--Kafka dr: " + dr.Result.Value);
        }

        // wait for up to 1 seconds for any inflight messages to be delivered.
        producer.Flush(TimeSpan.FromSeconds(1));
    }
}

The next step is to carry out the consumer implementation:

C#
private void KafkaConsumer(string[] topics, ConsumerConfig consumerConfig)
{
    Write(this.HttpContext, "--Kafka Consumer...");

    using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
    .SetErrorHandler((_, e) =>
        {
            Write(this.HttpContext, "--Kafka Consumer Error: " + e.Reason);
        }
    )
    .SetStatisticsHandler((_, json) => Write(this.HttpContext, "Statistics: " + json ))
    .Build())
    {
        consumer.Subscribe(topics[0].ToString());

        try
        {
            while (true)
            {
                try
                {
                    var cr = consumer.Consume(10000);

                    if(cr==null)
                        Write(this.HttpContext, "--Kafka Falha consumer  null");

                    Write(this.HttpContext, "--Kafka Consumed message: " +
                                            cr?.Message.Value +
                          " TopicPartitionOffset " + cr?.TopicPartitionOffset + " ");
                }
                catch (ConsumeException e)
                {
                    Write(this.HttpContext, "--Kafka Error ConsumeException:" +
                                             e.Error.Reason);
                }
            }
        }
        catch (OperationCanceledException ex)
        {
            Write(this.HttpContext,
                  "--Kafka OperationCanceledException:" + ex.Message);
            // Ensure the consumer leaves the group cleanly and
            // final offsets are committed.
            consumer.Close();
        }
    }
}

The code below helps in visualizing the server responses are sent according to the progress of Kafka processing:

C#
void Write(HttpContext context, string text)
{
    context.Response.WriteAsync(string.Format("<p>[{0}] {1}</p> </br>",
                                DateTime.Now, text));
    context.Response.Body.FlushAsync();
    context.Response.Body.Flush();
}

Points of Interest

Kafka's standard error handling policy has a policy of not displaying errors while processing your messages.

Resources

History

  • 24th April, 2020: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)