By default, Kafka's error handling does not display errors that occur while your messages are being processed. This tip shows, step by step, how to deal with failures and errors in Kafka.
Introduction
Handling failures and errors in Kafka is not a trivial task. By default, the producer does not return any errors when it tries to connect to the broker.
I'm using the Confluent.Kafka 1.4 library for C#.
Using the Code
The first step is to set up the producer and consumer configuration.
private void KafkaConfig(out string[] topics, out ProducerConfig producerConfig,
    out ConsumerConfig consumerConfig)
{
    topics = new string[] { "Teste" };

    // Load the connection settings from the "KafkaConsumer" configuration section.
    var config = new KafkaConsumerConfig();
    _configuration.Bind("KafkaConsumer", config);

    Write(this.HttpContext, "--Kafka Config....");
    Write(this.HttpContext, "--Kafka topics: " + topics[0]);
    if (string.IsNullOrEmpty(config.BootstrapServers))
        Write(this.HttpContext, "--Kafka Servers connection is empty.");
    Write(this.HttpContext, "--BootstrapServers: " + config.BootstrapServers);
    Write(this.HttpContext, "--GroupId: " + config.GroupId);

    // Producer settings; all timeouts and intervals are in milliseconds.
    producerConfig = new ProducerConfig
    {
        BootstrapServers = config.BootstrapServers,
        StatisticsIntervalMs = 5000,
        MessageTimeoutMs = 10000,
        SocketTimeoutMs = 10000,
        ApiVersionRequestTimeoutMs = 10000,
        MetadataRequestTimeoutMs = 5000,
        RequestTimeoutMs = 5000
    };

    // Consumer settings; offsets are committed automatically.
    consumerConfig = new ConsumerConfig
    {
        BootstrapServers = config.BootstrapServers,
        GroupId = config.GroupId,
        EnableAutoCommit = true,
        StatisticsIntervalMs = 5000,
        SessionTimeoutMs = 10000
    };
}
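The `KafkaConsumerConfig` type bound above is not shown in this tip. A minimal sketch, assuming it is a plain settings class exposing the two properties the method reads, might look like the following. The `GetProblems` helper is hypothetical and only illustrates how missing settings could be collected before any client is built.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape: the tip does not show this class, only the two properties it reads.
public class KafkaConsumerConfig
{
    public string BootstrapServers { get; set; }
    public string GroupId { get; set; }

    // Hypothetical helper: lists missing settings so the caller can stop early.
    public IReadOnlyList<string> GetProblems()
    {
        var problems = new List<string>();
        if (string.IsNullOrWhiteSpace(BootstrapServers))
            problems.Add("BootstrapServers is empty");
        if (string.IsNullOrWhiteSpace(GroupId))
            problems.Add("GroupId is empty");
        return problems;
    }
}
```

With a check like this, the caller could write each problem to the page and return instead of building a producer that has no servers to connect to.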
The next step is to build the producer that writes the messages to the topic. If Kafka is unable to connect to the broker, the error handler displays a message showing that the server is unavailable.
private void KafkaProducer(string[] topics, ProducerConfig producerConfig)
{
    Write(this.HttpContext, "--Kafka Producer...");
    using (var producer = new ProducerBuilder<Null, string>(producerConfig)
        // Report client-level errors, such as an unreachable broker.
        // The first parameter is discarded so it does not clash with the 'producer' local.
        .SetErrorHandler((_, error) =>
        {
            Write(this.HttpContext, "--Kafka Producer Error: " + error.Reason);
        })
        .SetStatisticsHandler((_, json) => Write(this.HttpContext, "Statistics: " + json))
        .Build())
    {
        for (int i = 0; i < 10; ++i)
        {
            var dr = producer.ProduceAsync(topics[0],
                new Message<Null, string> { Value = i.ToString() });
            // Reading Result blocks until the delivery report for this message arrives.
            Write(this.HttpContext, "--Kafka dr: " + dr.Result.Value);
        }
        producer.Flush(TimeSpan.FromSeconds(1));
    }
}
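One detail of the producer loop is worth illustrating: reading `dr.Result` blocks the thread, and if the task fails, the exception arrives wrapped in an `AggregateException`, whereas `await` rethrows the original exception. The sketch below shows this general .NET behaviour without Kafka. `FailingSendAsync` is a hypothetical stand-in for a send that fails and is not part of Confluent.Kafka.

```csharp
using System;
using System.Threading.Tasks;

public static class ResultVersusAwait
{
    // Hypothetical stand-in for a send that fails; not a Confluent.Kafka call.
    public static Task<string> FailingSendAsync() =>
        Task.FromException<string>(new InvalidOperationException("broker unavailable"));

    // Blocking on .Result: the failure is wrapped in an AggregateException.
    public static string DescribeResultFailure()
    {
        try
        {
            return FailingSendAsync().Result;
        }
        catch (AggregateException ex)
        {
            return "AggregateException -> " + ex.InnerException.GetType().Name;
        }
    }

    // Awaiting: the original exception type is rethrown directly.
    public static async Task<string> DescribeAwaitFailureAsync()
    {
        try
        {
            return await FailingSendAsync();
        }
        catch (InvalidOperationException ex)
        {
            return "InvalidOperationException -> " + ex.Message;
        }
    }
}
```

In Confluent.Kafka, a failed `ProduceAsync` surfaces as a `ProduceException<TKey, TValue>`, so with the blocking pattern used in this tip it would be the inner exception of the `AggregateException`.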
The next step is to implement the consumer:
private void KafkaConsumer(string[] topics, ConsumerConfig consumerConfig)
{
    Write(this.HttpContext, "--Kafka Consumer...");
    using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
        // Report client-level errors, such as an unreachable broker.
        .SetErrorHandler((_, e) =>
        {
            Write(this.HttpContext, "--Kafka Consumer Error: " + e.Reason);
        })
        .SetStatisticsHandler((_, json) => Write(this.HttpContext, "Statistics: " + json))
        .Build())
    {
        consumer.Subscribe(topics[0]);
        try
        {
            while (true)
            {
                try
                {
                    // Wait up to 10 seconds; Consume returns null if nothing arrived in time.
                    var cr = consumer.Consume(10000);
                    if (cr == null)
                    {
                        Write(this.HttpContext, "--Kafka failure: consumer returned null");
                        continue;
                    }
                    Write(this.HttpContext, "--Kafka Consumed message: " +
                        cr.Message.Value +
                        " TopicPartitionOffset " + cr.TopicPartitionOffset + " ");
                }
                catch (ConsumeException e)
                {
                    Write(this.HttpContext, "--Kafka Error ConsumeException:" +
                        e.Error.Reason);
                }
            }
        }
        catch (OperationCanceledException ex)
        {
            // Raised by the Consume overload that takes a CancellationToken;
            // the timeout overload used above returns null instead.
            Write(this.HttpContext,
                "--Kafka OperationCanceledException:" + ex.Message);
            consumer.Close();
        }
    }
}
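The outer `catch` in the consumer only runs if something throws `OperationCanceledException`, which in Confluent.Kafka comes from the `Consume(CancellationToken)` overload rather than the timeout overload. The following is a minimal sketch of that shutdown pattern with the Kafka calls abstracted away: `poll` and `close` are hypothetical stand-ins for `consumer.Consume(token)` and `consumer.Close()`, so the loop can be followed without a broker.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class CancellablePollLoop
{
    // poll: hypothetical stand-in for consumer.Consume(token); it is expected to
    // throw OperationCanceledException once the token is cancelled.
    // close: hypothetical stand-in for consumer.Close().
    public static List<string> Run(Func<CancellationToken, string> poll,
                                   Action close,
                                   CancellationToken token)
    {
        var received = new List<string>();
        try
        {
            while (true)
            {
                received.Add(poll(token));
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown; fall through to cleanup.
        }
        finally
        {
            // Runs on cancellation and on any other exception.
            close();
        }
        return received;
    }
}
```

Placing the cleanup in `finally` means the consumer would also be closed when the loop ends for a reason other than cancellation.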
The code below writes each message to the HTTP response as Kafka processing progresses, which makes it easier to follow what is happening on the server:
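void Write(HttpContext context, string text)
{
    // Wait for each asynchronous call to finish so the lines reach the client in order.
    context.Response.WriteAsync(string.Format("<p>[{0}] {1}</p><br />",
        DateTime.Now, text)).GetAwaiter().GetResult();
    context.Response.Body.FlushAsync().GetAwaiter().GetResult();
}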
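Because the helper places error reasons and statistics JSON directly inside HTML, characters such as `<`, `&` and quotes can distort the page. A minimal sketch of one way to build the line safely is shown below. `LogLineFormatter.FormatLine` is a hypothetical helper, not part of the original code, and it takes the timestamp as a parameter so the output is predictable.

```csharp
using System;
using System.Globalization;
using System.Net;

public static class LogLineFormatter
{
    // Hypothetical helper: builds one HTML paragraph with the text HTML-encoded.
    public static string FormatLine(DateTime timestamp, string text)
    {
        var stamp = timestamp.ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture);
        return "<p>[" + stamp + "] " + WebUtility.HtmlEncode(text) + "</p>";
    }
}
```

For example, `FormatLine(new DateTime(2020, 4, 24, 10, 0, 0), "a<b & c")` returns `<p>[2020-04-24 10:00:00] a&lt;b &amp; c</p>`.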
Points of Interest
By default, Kafka's error handling does not display errors while processing your messages. Registering error handlers on the producer and consumer builders, as in the code above, makes those errors visible.
History
- 24th April, 2020: Initial version