Kafka
Apache Kafka is a scalable, distributed streaming platform, best suited for handling real-time data streams. The Kafka add-on integrates both streams and pub/sub clients, using the Kafka API.
Dependencies
To add the Kafka add-on to your project, add the following dependency:
<dependency>
    <groupId>org.seedstack.addons.kafka</groupId>
    <artifactId>kafka</artifactId>
</dependency>
dependencies {
    compile("org.seedstack.addons.kafka:kafka:2.0.2")
}
You must also add the Apache Kafka implementation, at least for basic clients and optionally for streams:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.2.0</version>
</dependency>

dependencies {
    compile("org.apache.kafka:kafka-streams:2.2.0")
}
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

dependencies {
    compile("org.apache.kafka:kafka-clients:2.2.0")
}
Configuration
Configuration is done by declaring one or more Kafka clients:
kafka:
  # Configured Kafka streams with the name of the stream as key
  streams:
    stream1:
      # Kafka properties for configuring the stream
      properties:
        property1: value1
  # Configured Kafka consumers with the name of the consumer as key
  consumers:
    consumer1:
      # Kafka properties for configuring the consumer
      properties:
        property1: value1
  # Configured Kafka producers with the name of the producer as key
  producers:
    producer1:
      # Kafka properties for configuring the producer
      properties:
        property1: value1
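As an illustration, a hypothetical configuration wiring one consumer and one producer to a local broker could look like the following. The property names are standard Kafka client properties; the client names, broker address, and group id are examples:

```yaml
kafka:
  consumers:
    consumer1:
      properties:
        bootstrap.servers: localhost:9092
        group.id: my-service
        key.deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
        value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  producers:
    producer1:
      properties:
        bootstrap.servers: localhost:9092
        key.serializer: org.apache.kafka.common.serialization.IntegerSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer
```

Any property accepted by the underlying Kafka client can be specified under properties; it is passed through as-is.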
To dump the kafka configuration options:
mvn -q -Dargs="kafka" seedstack:config
To enable transactions for a particular producer, set the transactional.id property on that producer to a unique id. Then use the Kafka transactional API to send messages.
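As a sketch, a transactional send could look like this. It assumes producer1 is configured with a unique transactional.id; the class, method, and topic names are examples, and error handling is kept minimal:

```java
public class TransactionalSender {
    @Inject
    @Named("producer1")
    private Producer<Integer, String> producer; // assumed configured with transactional.id

    public void sendAtomically() {
        producer.initTransactions();   // call once per producer, before the first transaction
        producer.beginTransaction();
        try {
            producer.send(new ProducerRecord<>("topic", 1, "first"));
            producer.send(new ProducerRecord<>("topic", 2, "second"));
            producer.commitTransaction(); // both records become visible atomically
        } catch (KafkaException e) {
            producer.abortTransaction();  // neither record is delivered to consumers
        }
    }
}
```

Consumers reading with isolation.level set to read_committed will see either both records or neither.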
Publishing
To publish messages, inject the Producer interface qualified with a configured producer name:
public class SomeClass {
    @Inject
    @Named("producer1")
    private Producer<Integer, String> producer;
}
Use the Kafka API to send messages. If the producer is configured as transactional, you must enclose your calls to the send() method within the programmatic Kafka transaction methods:
public class SomeClass {
    @Inject
    @Named("producer1")
    private Producer<Integer, String> producer;

    public void someMethod() {
        producer.send(new ProducerRecord<>("topic", "test"));
    }
}
Do not explicitly close the producer; it is automatically closed on application shutdown.
Receiving
To receive Kafka records, create a class implementing the ConsumerListener interface and annotated with @KafkaListener:
@KafkaListener(value = "consumer1", topics = "someTopic")
public class MyConsumerListener implements ConsumerListener<Integer, String> {
    @Logging
    private Logger logger;

    @Override
    public void onConsumerRecord(ConsumerRecord<Integer, String> r) {
        logger.info("Received {}:{}", r.key(), r.value());
    }

    @Override
    public void onException(Exception e) throws Exception {
        // process any exception and re-throw an exception if reception must be temporarily stopped
    }
}
If an exception is re-thrown from the onException() method, reception temporarily stops and the underlying consumer is gracefully shut down. A new attempt, with new consumer and listener instances, is scheduled after the retry delay.
Using the annotation, you can specify:
- The name of the consumer in configuration that will be used to create the underlying consumer.
- The topic or the topic regular expression pattern to subscribe to.
- The delay to wait before retry in milliseconds.
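As a sketch combining the options above, a listener could subscribe to a topic pattern with a custom retry delay. The topicPattern and retryDelay attribute names are assumed from the description above; check the @KafkaListener Javadoc for the exact names:

```java
// topicPattern and retryDelay are assumed attribute names, shown for illustration only.
@KafkaListener(value = "consumer1", topicPattern = "someTopic-.*", retryDelay = 10000)
public class PatternConsumerListener implements ConsumerListener<Integer, String> {
    @Override
    public void onConsumerRecord(ConsumerRecord<Integer, String> r) {
        // process the record
    }

    @Override
    public void onException(Exception e) throws Exception {
        throw e; // stop reception; a new consumer is created after the 10-second delay
    }
}
```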
Streaming
To build a Kafka stream subscribed to one or more topic(s), create a class implementing the StreamBuilder interface and annotated with @KafkaListener:
@KafkaListener(value = "stream1", topics = "someTopic")
public class MyStreamBuilder implements StreamBuilder<Integer, String> {
    @Logging
    private Logger logger;

    @Override
    public void buildStream(KStream<Integer, String> stream) {
        stream.foreach((key, value) -> {
            logger.info("Processed: {}:{}", key, value);
        });
    }

    @Override
    public void onException(Exception e) {
        // process any exception and re-throw an exception if streaming must be temporarily stopped
    }
}
If an exception is re-thrown from the onException() method, streaming temporarily stops and the underlying stream client is gracefully shut down. A new attempt, with new stream client and builder instances, is scheduled after the retry delay.
Using the annotation, you can specify:
- The name of the stream in configuration that will be used to create the underlying stream client.
- The topic or the topic regular expression pattern to subscribe to.
- The delay to wait before retry in milliseconds.
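To give a concrete idea, a builder can define a small processing topology with the standard Kafka Streams DSL rather than a simple foreach. The topic names below are examples:

```java
@KafkaListener(value = "stream1", topics = "someTopic")
public class UppercaseStreamBuilder implements StreamBuilder<Integer, String> {
    @Override
    public void buildStream(KStream<Integer, String> stream) {
        stream.filter((key, value) -> value != null)   // drop records without a value
              .mapValues(value -> value.toUpperCase()) // transform each value
              .to("someUppercasedTopic");              // write results to another topic
    }

    @Override
    public void onException(Exception e) {
        // log and optionally re-throw to restart the stream after the retry delay
    }
}
```

The stream client is started automatically once the topology is built; as with producers, there is no need to close it explicitly.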