Kafka

Apache Kafka is a scalable distributed streaming platform. It’s best suited for handling real-time data streams. The Kafka add-on provides an integration of both streams and pub/sub clients, using the Kafka API.

Dependencies

To add the Kafka add-on to your project, add the following dependency:

<dependency>
    <groupId>org.seedstack.addons.kafka</groupId>
    <artifactId>kafka</artifactId>
</dependency>
Show version
dependencies {
    compile("org.seedstack.addons.kafka:kafka:2.0.2")
}

You must also add the Apache KAFKA implementation for basic clients at least, and optionally for streams:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.2.0</version>
</dependency>
dependencies {
    compile("org.apache.kafka:kafka-streams:2.2.0")
}
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>
dependencies {
    compile("org.apache.kafka:kafka-clients:2.2.0")
}

Configuration

Configuration is done by declaring one or more MQTT clients:

kafka:
  # Configured Kafka streams with the name of the stream as key
  streams:
    stream1:
      # Kafka properties for configuring the stream
      properties:
        property1: value1
  consumers:
    # Configured Kafka consumer with the name of the consumer as key
    consumer1:
      # Kafka properties for configuring the consumer
      properties:
        property1: value1
  producers:
    # Configured Kafka producer with the name of the producer as key
    producer1:
      # Kafka properties for configuring the producer
      properties:
        property1: value1

To dump the kafka configuration options:

mvn -q -Dargs="kafka" seedstack:config

To enable transactions for a particular producer, specify the transactional.id property on the producer with a unique id. Then use the Kafka transactional API to send messages.

Publishing

To publish messages, inject the Producer interface qualified with a configured producer name:

public class SomeClass {
    @Inject
    @Named("producer1")
    private Producer<Integer, String> producer;
}

Use the Kafka API to send messages. If the produced is configured as transactional, you must enclose your calls to the send() method with the programmatic Kafka transaction methods:

public class SomeClass {
    @Inject
    @Named("producer1")
    private Producer<Integer, String> producer;
    
    public void someMethod() throws InterruptedException, IOException {
        producer.send(new ProducerRecord<>("topic", "test"));
    }
}

Do not explicitly close the producer, it will be automatically closed on application shutdown.

Receiving

To receive Kafka records, create a class implementing the ConsumerListener interface and annotated with @KafkaListener:

@KafkaListener(value = "consumer1", topics = "someTopic")
public class MyConsumerListener implements ConsumerListener<Integer, String> {
    @Logging
    private Logger logger;

    @Override
    public void onConsumerRecord(ConsumerRecord<Integer, String> r) {
        logger.info("Received {}:{}", r.key(), r.value());
    }

    @Override
    public void onException(Exception e) throws Exception {
        // process any exception and re-throw an exception if reception must be temporary stopped 
    }
}

If an exception is re-thrown from the onException() method, the reception will temporarily stop and the underlying consumer will be gracefully shutdown. A new attempt, with new consumer and listener instances, will be scheduled after the retry delay.

Using the annotation, you can specify:

  • The name of the consumer in configuration that will be used to create the underlying consumer.
  • The topic or the topic regular expression pattern to subscribe to.
  • The delay to wait before retry in milliseconds.

Streaming

To build a Kafka stream subscribed to one or more topic(s), create a class implementing the StreamBuilder interface and annotated with @KafkaListener:

@KafkaListener(value = "stream1", topics = "someTopic")
public class MyStreamBuilder implements StreamBuilder<Integer, String> {
    @Logging
    private Logger logger;

    @Override
    public void buildStream(KStream<Integer, String> stream) {
        stream.foreach((key, value) -> {
            logger.info("Processed: {}:{}", key, value);
        });
    }

    @Override
    public void onException(Exception e) {
        // process any exception and re-throw an exception if reception must be temporary stopped 
    }
}

If an exception is re-thrown from the onException() method, the streaming will temporarily stop and the underlying stream client will be gracefully shutdown. A new attempt, with new stream client and builder instances, will be scheduled after the retry delay.

Using the annotation, you can specify:

  • The name of the stream in configuration that will be used to create the underlying stream client.
  • The topic or the topic regular expression pattern to subscribe to.
  • The delay to wait before retry in milliseconds.

On this page


Edit this page