
Apache Kafka is a scalable, distributed streaming platform best suited to handling real-time data streams. The Kafka add-on integrates both streams and pub/sub clients through the Kafka API.


To add the Kafka add-on to your project, add the following dependency:

dependencies {

You must also add the Apache Kafka implementation. The one for basic clients is required; the one for streams is optional:

dependencies {
dependencies {


Configuration is done by declaring one or more Kafka clients (streams, consumers or producers):

  # Configured Kafka streams with the name of the stream as key
      # Kafka properties for configuring the stream
        property1: value1
    # Configured Kafka consumer with the name of the consumer as key
      # Kafka properties for configuring the consumer
        property1: value1
    # Configured Kafka producer with the name of the producer as key
      # Kafka properties for configuring the producer
        property1: value1

To dump the Kafka configuration options:

mvn -q -Dargs="kafka" seedstack:config

To enable transactions for a particular producer, specify the transactional.id property on the producer with a unique id. Then use the Kafka transactional API to send messages.
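As an illustration of the properties involved, here is a minimal sketch that builds the property set a transactional producer typically needs, using only the JDK. The property keys and serializer class names are standard Kafka ones; the class name `TransactionalProducerProps`, the broker address and the transactional id are placeholders chosen for this example. How the add-on maps its configuration onto these properties is not shown here.

```java
import java.util.Properties;

final class TransactionalProducerProps {
    // Builds the Kafka properties for a producer that uses transactions.
    // Each transactional producer must get its own unique transactional.id.
    static Properties build(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("transactional.id", transactionalId);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, for illustration only.
        build("localhost:9092", "orders-producer-1")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The same key/value pairs would go under the properties of the corresponding producer in the add-on configuration.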


To publish messages, inject the Producer interface qualified with a configured producer name:

public class SomeClass {
    @Inject @Named("producer1") // "producer1" is an illustrative name of a configured producer
    private Producer<Integer, String> producer;
}

Use the Kafka API to send messages. If the producer is configured as transactional, you must enclose your calls to the send() method with the programmatic Kafka transaction methods:

public class SomeClass {
    private Producer<Integer, String> producer;
    public void someMethod() throws InterruptedException, IOException {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("topic", "test"));
        producer.commitTransaction();
    }
}

Do not close the producer explicitly; it is closed automatically on application shutdown.
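The transactional send pattern can be sketched without a broker as follows. `TxProducer` is a hypothetical stand-in that mirrors only the transaction-related method names of Kafka's `Producer` (`beginTransaction()`, `commitTransaction()`, `abortTransaction()`), with a simplified `send`; `TransactionalSender` and `RecordingProducer` are also invented for this example. Real code would call these methods on the injected producer and pass `ProducerRecord` instances, and the Kafka API additionally defines fatal errors after which a transaction cannot be aborted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the transaction methods of Kafka's Producer.
interface TxProducer {
    void beginTransaction();
    void send(String topic, String value);
    void commitTransaction();
    void abortTransaction();
}

final class TransactionalSender {
    // Sends all values in a single transaction.
    // If any send fails, the transaction is aborted and the failure is rethrown.
    static void sendAll(TxProducer producer, String topic, List<String> values) {
        producer.beginTransaction();
        try {
            for (String value : values) {
                producer.send(topic, value);
            }
            producer.commitTransaction();
        } catch (RuntimeException e) {
            producer.abortTransaction();
            throw e;
        }
    }
}

// In-memory recorder, used only to make the sketch runnable without a broker.
final class RecordingProducer implements TxProducer {
    final List<String> calls = new ArrayList<>();

    public void beginTransaction() { calls.add("begin"); }

    public void send(String topic, String value) {
        if (value == null) {
            throw new IllegalArgumentException("null value");
        }
        calls.add("send:" + topic + ":" + value);
    }

    public void commitTransaction() { calls.add("commit"); }

    public void abortTransaction() { calls.add("abort"); }
}
```

Grouping the sends between begin and commit means consumers reading only committed data see either all of the records or none of them.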


To receive Kafka records, create a class implementing the ConsumerListener interface and annotated with @KafkaListener:

@KafkaListener(value = "consumer1", topics = "someTopic")
public class MyConsumerListener implements ConsumerListener<Integer, String> {
    @Logging
    private Logger logger;

    @Override
    public void onConsumerRecord(ConsumerRecord<Integer, String> r) {
        logger.info("Received {}:{}", r.key(), r.value());
    }

    @Override
    public void onException(Exception e) throws Exception {
        // process any exception and re-throw an exception if reception must be temporarily stopped
    }
}

If an exception is re-thrown from the onException() method, reception temporarily stops and the underlying consumer is gracefully shut down. A new attempt, with new consumer and listener instances, is scheduled after the retry delay.
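The stop-and-retry behaviour can be modelled with plain JDK code. This is only a sketch of the idea, not the add-on's implementation: `RetryingRunner`, the `Callable`-based task and the `maxAttempts` bound are all hypothetical and exist only so that the example terminates.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

final class RetryingRunner {
    // Runs a task created by taskFactory; on failure, waits retryDelayMs and
    // tries again with a brand-new task instance.
    // Returns the number of attempts used, or rethrows the last failure once
    // maxAttempts is exhausted.
    static int run(Supplier<Callable<Void>> taskFactory, long retryDelayMs, int maxAttempts)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Callable<Void> task = taskFactory.get(); // fresh instance per attempt
            try {
                task.call();
                return attempt;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(retryDelayMs); // the retry delay
                }
            }
        }
        throw last;
    }
}
```

Creating a new task for each attempt mirrors the fact that a failed consumer and its listener are discarded rather than reused.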

Using the annotation, you can specify:

  • The name of the consumer in configuration that will be used to create the underlying consumer.
  • The topic or the topic regular expression pattern to subscribe to.
  • The delay to wait before retry in milliseconds.
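To get a feel for how a topic regular expression selects topics, the following JDK-only sketch filters a list of topic names with `java.util.regex.Pattern`, requiring the whole topic name to match. The class name `TopicPatternDemo` and the topic names are made up for the example; the annotation attribute that carries the pattern is not shown here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicPatternDemo {
    // Returns the topics whose full name matches the regular expression.
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("orders.eu", "orders.us", "payments.eu");
        System.out.println(matching("orders\\..*", topics));
    }
}
```

Because the match covers the full name, a pattern such as `orders\..*` selects `orders.eu` but not a topic that merely contains that text somewhere in the middle.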


To build a Kafka stream subscribed to one or more topics, create a class implementing the StreamBuilder interface and annotated with @KafkaListener:

@KafkaListener(value = "stream1", topics = "someTopic")
public class MyStreamBuilder implements StreamBuilder<Integer, String> {
    @Logging
    private Logger logger;

    @Override
    public void buildStream(KStream<Integer, String> stream) {
        stream.foreach((key, value) -> {
            logger.info("Processed: {}:{}", key, value);
        });
    }

    @Override
    public void onException(Exception e) {
        // process any exception and re-throw an exception if streaming must be temporarily stopped
    }
}

If an exception is re-thrown from the onException() method, streaming temporarily stops and the underlying stream client is gracefully shut down. A new attempt, with new stream client and builder instances, is scheduled after the retry delay.

Using the annotation, you can specify:

  • The name of the stream in configuration that will be used to create the underlying stream client.
  • The topic or the topic regular expression pattern to subscribe to.
  • The delay to wait before retry in milliseconds.
