Apache Kafka client integration
To get started with the Aspire Apache Kafka client integration, install the 📦 Aspire.Confluent.Kafka NuGet package:
dotnet add package Aspire.Confluent.Kafka
Add Kafka producer
In the Program.cs file of your client-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue> for use via the dependency injection container:
builder.AddKafkaProducer<string, string>("kafka");
Tip
The connectionName parameter must match the name used when adding the Kafka resource in the AppHost project.
You can then retrieve the IProducer<TKey, TValue> instance using dependency injection:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
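To make the producer do something concrete, the worker might publish messages in its execute loop. The topic name (messages) and payload below are illustrative, not part of the integration:

```csharp
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Publish a message; the key influences partition assignment.
            await producer.ProduceAsync(
                "messages",
                new Message<string, string> { Key = "greeting", Value = "Hello, Kafka!" },
                stoppingToken);

            await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
        }
    }
}
```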
Add Kafka consumer
To register an IConsumer<TKey, TValue> for use via the dependency injection container, call the AddKafkaConsumer extension method:
builder.AddKafkaConsumer<string, string>("kafka");
You can then retrieve the IConsumer<TKey, TValue> instance using dependency injection:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
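A consumer typically subscribes to a topic and polls in a loop. Because Consume blocks, a common pattern is to run the loop on a background thread; the topic name here is again illustrative:

```csharp
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        Task.Run(() =>
        {
            consumer.Subscribe("messages");

            while (!stoppingToken.IsCancellationRequested)
            {
                // Blocks until a message arrives or the token is canceled.
                var result = consumer.Consume(stoppingToken);
                Console.WriteLine($"Received: {result.Message.Key} = {result.Message.Value}");
            }
        }, stoppingToken);
}
```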
Add keyed Kafka producers or consumers
There might be situations where you want to register multiple producer or consumer instances with different connection names:
- AddKeyedKafkaProducer: Registers a keyed Kafka producer.
- AddKeyedKafkaConsumer: Registers a keyed Kafka consumer.
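For example, assuming two Kafka resources named kafka1 and kafka2 in the AppHost project, register a keyed instance per connection name:

```csharp
// Program.cs: each registration is keyed by its connection name.
builder.AddKeyedKafkaProducer<string, string>("kafka1");
builder.AddKeyedKafkaConsumer<string, string>("kafka2");
```

Resolve a specific instance by annotating a constructor parameter with the FromKeyedServices attribute, for example [FromKeyedServices("kafka1")] IProducer&lt;string, string&gt; producer.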
Connection properties
When you reference a Kafka resource using WithReference, the following connection properties are made available to the consuming project:
Kafka server
The Kafka server resource exposes the following connection properties:
| Property Name | Description |
|---|---|
| Host | The host-facing Kafka listener hostname or IP address |
| Port | The host-facing Kafka listener port |
Example properties:
Host: localhost
Port: 9092
Note
Aspire exposes each property as an environment variable named [RESOURCE]_[PROPERTY]. For instance, the Host property of a resource called kafka becomes KAFKA_HOST.
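Since environment variables flow into IConfiguration in the default host builders, the injected properties can be read directly. A minimal sketch, assuming a resource named kafka:

```csharp
// Variable names follow the [RESOURCE]_[PROPERTY] convention described above.
var host = builder.Configuration["KAFKA_HOST"]; // for example, localhost
var port = builder.Configuration["KAFKA_PORT"]; // for example, 9092
var bootstrapServers = $"{host}:{port}";
```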
Configuration
The Apache Kafka integration provides multiple options to configure the connection.
Use a connection string
When using a connection string from the ConnectionStrings configuration section, provide the name when calling builder.AddKafkaProducer() or builder.AddKafkaConsumer():
builder.AddKafkaProducer<string, string>("kafka");
Example configuration:
{
"ConnectionStrings": {
"kafka": "broker:9092"
}
}
Use configuration providers
The Apache Kafka integration supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration using the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer keys. Example appsettings.json:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
The Config properties bind to instances of ProducerConfig and ConsumerConfig.
Note
Confluent.Kafka.Consumer<TKey, TValue> requires the ConsumerConfig.GroupId property to be set; the broker tracks consumed message offsets per consumer group.
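For example, a consumer configuration that sets the group ID might look like the following (the group name is illustrative):

```json
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Consumer": {
          "Config": {
            "GroupId": "my-consumer-group"
          }
        }
      }
    }
  }
}
```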
Use named configuration
The Apache Kafka integration supports named configuration for multiple instances:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"kafka1": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
},
"kafka2": {
"DisableHealthChecks": true,
"Config": {
"Acks": "Leader"
}
}
}
}
}
}
}
Use the connection names when calling the registration methods:
builder.AddKafkaProducer<string, string>("kafka1");
builder.AddKafkaConsumer<string, string>("kafka2");
Use inline delegates
You can pass the Action<KafkaProducerSettings> delegate to set up options inline:
builder.AddKafkaProducer<string, string>(
"kafka",
static settings => settings.DisableHealthChecks = true);
To configure Confluent.Kafka builders, pass an Action<ProducerBuilder<TKey, TValue>>:
builder.AddKafkaProducer<string, MyMessage>(
"kafka",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
});
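MyMessage and MyMessageSerializer aren't defined in this article; one possible sketch implements Confluent.Kafka's ISerializer&lt;T&gt; interface with System.Text.Json:

```csharp
using System.Text.Json;
using Confluent.Kafka;

public sealed record MyMessage(string Text);

// Hypothetical serializer for the example above: encodes the message as UTF-8 JSON.
public sealed class MyMessageSerializer : ISerializer<MyMessage>
{
    public byte[] Serialize(MyMessage data, SerializationContext context) =>
        JsonSerializer.SerializeToUtf8Bytes(data);
}
```

A matching IDeserializer&lt;MyMessage&gt; would be registered on the consumer side via SetValueDeserializer.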
Client integration health checks
By default, Aspire integrations enable health checks for all services. The Apache Kafka integration handles the following health check scenarios:
- Adds the Aspire.Confluent.Kafka.Producer health check when DisableHealthChecks is false.
- Adds the Aspire.Confluent.Kafka.Consumer health check when DisableHealthChecks is false.
- Integrates with the /health HTTP endpoint.
Observability and telemetry
Logging
The Apache Kafka integration uses the following log categories:
Aspire.Confluent.Kafka
Tracing
The Apache Kafka integration doesn't emit distributed traces.
Metrics
The Apache Kafka integration emits the following metrics using OpenTelemetry:
Aspire.Confluent.Kafka
- messaging.kafka.network.tx
- messaging.kafka.network.transmitted
- messaging.kafka.network.rx
- messaging.kafka.network.received
- messaging.publish.messages
- messaging.kafka.message.transmitted
- messaging.receive.messages
- messaging.kafka.message.received