I am using Testcontainers to set up ZooKeeper and Kafka. The containers seem to start, but I am seeing errors in the Kafka log (shown below). I am also trying to create a topic and produce a message, but without success.
2023-09-06 10:03:52 [2023-09-06 08:03:52,569] WARN [Controller id=1, targetBrokerId=1] Error connecting to node kafka_817e3e9d-9da9-42b7-ade0-cf4bbd7c791c:29092 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient)
2023-09-06 10:03:52 java.net.UnknownHostException: kafka_817e3e9d-9da9-42b7-ade0-cf4bbd7c791c
2023-09-06 10:03:52 at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
2023-09-06 10:03:52 at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1524)
2023-09-06 10:03:52 at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1382)
2023-09-06 10:03:52 at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
2023-09-06 10:03:52 at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
2023-09-06 10:03:52 at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
2023-09-06 10:03:52 at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:510)
2023-09-06 10:03:52 at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
2023-09-06 10:03:52 at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
2023-09-06 10:03:52 at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:990)
2023-09-06 10:03:52 at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301)
2023-09-06 10:03:52 at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:64)
/// <summary>
/// Starts a ZooKeeper container and a single-broker Kafka container with Testcontainers.
/// </summary>
/// <remarks>
/// The broker exposes two listeners:
/// <list type="bullet">
/// <item><c>PLAINTEXT</c> on container port 9092, bound to a free host port and advertised as
/// <c>localhost:&lt;hostPort&gt;</c> for clients running on the host.</item>
/// <item><c>PLAINTEXT_INTERNAL</c> on port 29092, used for inter-broker/controller traffic and
/// advertised under the container's hostname.</item>
/// </list>
/// </remarks>
/// <returns>A task that completes once both containers have been started.</returns>
private static async Task StartupTask()
{
    var containerName = Guid.NewGuid().ToString();

    var zookeeperContainerName = $"zookeeper_{containerName}";
    var zookeeperContainer = new ContainerBuilder()
        .WithImage("confluentinc/cp-zookeeper:latest")
        .WithName(zookeeperContainerName)
        .WithPortBinding(2181, true)
        .WithEnvironment(new Dictionary<string, string>
        {
            {"ZOOKEEPER_CLIENT_PORT", "2181"},
            {"ZOOKEEPER_TICK_TIME", "2000"}
        })
        .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(2181))
        .Build();
    await zookeeperContainer.StartAsync()
        .ConfigureAwait(false);
    var zookeeperHostPort = zookeeperContainer.GetMappedPublicPort(2181);

    var kafkaContainerName = $"kafka_{containerName}";

    // The Docker container *name* is not resolvable from inside the container itself, which is
    // what produced the UnknownHostException for the advertised internal listener. Give the
    // container an explicit hostname and advertise that instead. Hyphen instead of underscore
    // keeps the name a valid DNS label.
    var kafkaHostname = $"kafka-{containerName}";

    var kafkaHostPort = FindFreePort();
    var kafkaContainer = new ContainerBuilder()
        .WithImage("confluentinc/cp-kafka:latest")
        .WithName(kafkaContainerName)
        .WithHostname(kafkaHostname)
        .WithPortBinding(kafkaHostPort, 9092)
        .WithEnvironment(new Dictionary<string, string>
        {
            {"KAFKA_BROKER_ID", "1"},
            // NOTE(review): reaches ZooKeeper through the host-mapped port; this relies on
            // host.docker.internal being resolvable inside the container - confirm on your platform.
            {"KAFKA_ZOOKEEPER_CONNECT",
                $"host.docker.internal:{zookeeperHostPort}"},
            {"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                "PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT"},
            {"KAFKA_LISTENERS",
                "PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092"},
            // No whitespace is allowed inside the listener URLs: the original
            // "localhost: {port}" advertised a malformed address to external clients.
            {"KAFKA_ADVERTISED_LISTENERS",
                $"PLAINTEXT://localhost:{kafkaHostPort},PLAINTEXT_INTERNAL://{kafkaHostname}:29092"},
            {"KAFKA_INTER_BROKER_LISTENER_NAME",
                "PLAINTEXT_INTERNAL"},
            {"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1"},
            {"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1"},
            {"KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true"},
            {"KAFKA_DELETE_TOPIC_ENABLE", "true"},
            {"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"},
        })
        .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(9092))
        .Build();
    await kafkaContainer.StartAsync()
        .ConfigureAwait(false);
}
To publish messages, I am using the following:
// Topic the test message is published to.
var topicName = $"automation";
// Bootstrap address as seen from the host machine.
// NOTE(review): kafkaHostPort is presumably the host port bound to the broker's 9092 listener - confirm it is in scope here.
var bootstrapServers = $"localhost:{kafkaHostPort}";
await PublishMessage(topicName, bootstrapServers);
}
/// <summary>
/// Produces a single test message ("my message") to the given topic and reports the outcome
/// on the console.
/// </summary>
/// <param name="topicName">Name of the topic to produce to.</param>
/// <param name="bootstrapServers">Bootstrap server list passed to the producer configuration.</param>
/// <returns>A task that completes once the produce call has succeeded or failed.</returns>
private static async Task PublishMessage(string topicName, string bootstrapServers)
{
    var config = new ProducerConfig
    {
        BootstrapServers = bootstrapServers
    };

    using var producer = new ProducerBuilder<Null, string>(config).Build();
    try
    {
        var result = await producer.ProduceAsync(
            topicName,
            new Message<Null, string> { Value = "my message" });

        // Report success explicitly; previously the delivery result was discarded, so a
        // successful publish was indistinguishable from one that never completed.
        Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
    }
    catch (ProduceException<Null, string> e)
    {
        Console.WriteLine($"Delivery failed: {e.Error.Reason}");
    }
}
What I have tried:
I tried publishing a message directly, and I also tried manually creating the topic before publishing:
/// <summary>
/// Creates a topic with one partition and a replication factor of 1 through the Kafka admin client.
/// </summary>
/// <param name="topicName">Name of the topic to create.</param>
/// <param name="bootstrapServers">Bootstrap server list passed to the admin client configuration.</param>
/// <returns>A task that completes when the create-topics request has finished.</returns>
private static async Task CreateKafkaTopic(string topicName, string bootstrapServers)
{
    var adminConfig = new AdminClientConfig { BootstrapServers = bootstrapServers };

    // Single-broker setup: one partition, one replica.
    var topic = new TopicSpecification
    {
        Name = topicName,
        ReplicationFactor = 1,
        NumPartitions = 1
    };

    using (var admin = new AdminClientBuilder(adminConfig).Build())
    {
        await admin.CreateTopicsAsync(new[] { topic });
    }
}