Apache Kafka client integration

Apache Kafka logo

To get started with the Aspire Apache Kafka client integration, install the 📦 Aspire.Confluent.Kafka NuGet package:

Install the NuGet package
dotnet add package Aspire.Confluent.Kafka

Add Kafka producer

In the Program.cs file of your client-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue> for use via the dependency injection container:

builder.AddKafkaProducer<string, string>("kafka");

Tip

The connectionName parameter must match the name used when adding the Kafka resource in the AppHost project.

You can then retrieve the IProducer<TKey, TValue> instance using dependency injection:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Add Kafka consumer

To register an IConsumer<TKey, TValue> for use via the dependency injection container, call the AddKafkaConsumer extension method:

builder.AddKafkaConsumer<string, string>("kafka");

You can then retrieve the IConsumer<TKey, TValue> instance using dependency injection:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Add keyed Kafka producers or consumers

There might be situations where you want to register multiple producer or consumer instances with different connection names:

  • AddKeyedKafkaProducer: Registers a keyed Kafka producer.
  • AddKeyedKafkaConsumer: Registers a keyed Kafka consumer.

Connection properties

When you reference a Kafka resource using WithReference, the following connection properties are made available to the consuming project:

Kafka server

The Kafka server resource exposes the following connection properties:

Property Name Description
Host The host-facing Kafka listener hostname or IP address
Port The host-facing Kafka listener port

Example properties:

Host: localhost
Port: 9092

Note

Aspire exposes each property as an environment variable named [RESOURCE]_[PROPERTY]. For instance, the Host property of a resource called kafka becomes KAFKA_HOST.

Configuration

The Apache Kafka integration provides multiple options to configure the connection.

Use a connection string

When using a connection string from the ConnectionStrings configuration section, provide the name when calling builder.AddKafkaProducer() or builder.AddKafkaConsumer():

builder.AddKafkaProducer<string, string>("kafka");

Example configuration:

{
  "ConnectionStrings": {
    "kafka": "broker:9092"
  }
}

Use configuration providers

The Apache Kafka integration supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration using the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer keys. Example appsettings.json:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

The Config properties bind to instances of ProducerConfig and ConsumerConfig.

Note

Confluent.Kafka.Consumer<TKey, TValue> requires the ClientId property to be set to let the broker track consumed message offsets.

Use named configuration

The Apache Kafka integration supports named configuration for multiple instances:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "kafka1": {
            "DisableHealthChecks": false,
            "Config": {
              "Acks": "All"
            }
          },
          "kafka2": {
            "DisableHealthChecks": true,
            "Config": {
              "Acks": "Leader"
            }
          }
        }
      }
    }
  }
}

Use the connection names when calling the registration methods:

builder.AddKafkaProducer<string, string>("kafka1");
builder.AddKafkaConsumer<string, string>("kafka2");

Use inline delegates

You can pass the Action<KafkaProducerSettings> delegate to set up options inline:

builder.AddKafkaProducer<string, string>(
    "kafka",
    static settings => settings.DisableHealthChecks = true);

To configure Confluent.Kafka builders, pass an Action<ProducerBuilder<TKey, TValue>>:

builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static producerBuilder =>
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    });

Client integration health checks

By default, Aspire integrations enable health checks for all services. The Apache Kafka integration handles the following health check scenarios:

  • Adds the Aspire.Confluent.Kafka.Producer health check when DisableHealthChecks is false.
  • Adds the Aspire.Confluent.Kafka.Consumer health check when DisableHealthChecks is false.
  • Integrates with the /health HTTP endpoint.

Observability and telemetry

Logging

The Apache Kafka integration uses the following log categories:

  • Aspire.Confluent.Kafka

Tracing

The Apache Kafka integration doesn't emit distributed traces.

Metrics

The Apache Kafka integration emits the following metrics using OpenTelemetry:

  • Aspire.Confluent.Kafka
    • messaging.kafka.network.tx
    • messaging.kafka.network.transmitted
    • messaging.kafka.network.rx
    • messaging.kafka.network.received
    • messaging.publish.messages
    • messaging.kafka.message.transmitted
    • messaging.receive.messages
    • messaging.kafka.message.received