Kafka

Apache Kafka is a scalable, distributed streaming platform, best suited for handling real-time data streams. The Kafka add-on integrates both the streams and the publish/subscribe clients of the Kafka API.

Dependencies

To add the Kafka add-on to your project, add the following dependency:

<dependency>
    <groupId>org.seedstack.addons.kafka</groupId>
    <artifactId>kafka</artifactId>
</dependency>
dependencies {
    compile("org.seedstack.addons.kafka:kafka:1.0.0")
}

You must also add the Apache Kafka implementation. For streams:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.11.0.0</version>
</dependency>
dependencies {
    compile("org.apache.kafka:kafka-streams:0.11.0.0")
}

For plain consumers and producers:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
dependencies {
    compile("org.apache.kafka:kafka-clients:0.11.0.0")
}

Configuration

Configuration is done by declaring one or more Kafka streams, consumers and producers:

kafka:
  # Configured streams with the name of the stream as key
  streams:
    stream1:
      # List of topics to listen to
      topics: (List<String>)
      # Pattern of topics to listen to (the topics parameter is ignored when set)
      topicPattern: (String)
      # Required kafka-streams properties
      properties:
        property1: value1
  # Configured kafka-client consumers with the name of the consumer as key
  consumers:
    consumer1:
      # List of topics to listen to
      topics: (List<String>)
      # Pattern of topics to listen to (the topics parameter is ignored when set)
      topicPattern: (String)
      # Required kafka-client consumer properties
      properties:
        property1: value1
  # Configured kafka-client producers with the name of the producer as key
  producers:
    producer1:
      # Required kafka-client producer properties
      properties:
        property1: value1
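
As an example, here is a minimal sketch of a concrete configuration, assuming a Kafka broker running on localhost:9092 (the topic, group id and serializer choices are illustrative):

kafka:
  consumers:
    consumer1:
      topics: [someTopic]
      properties:
        bootstrap.servers: localhost:9092
        group.id: someGroup
        key.deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
        value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  producers:
    producer1:
      properties:
        bootstrap.servers: localhost:9092
        key.serializer: org.apache.kafka.common.serialization.IntegerSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer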

To dump the Kafka configuration options:

mvn -q -Dargs="kafka" seedstack:config

Consuming messages

To receive Kafka messages, create a consumer class which implements the MessageConsumer interface and is annotated with @Consumer:

@Consumer("consumer1")
public class MyMessageConsumer implements MessageConsumer<Integer, String> {
    @Logging
    private Logger logger;

    @Override
    public void onMessage(ConsumerRecord<Integer, String> consumerRecord) {
        logger.debug("Key [{}], Value [{}]", consumerRecord.key(), consumerRecord.value());
    }

    @Override
    public void onException(Throwable cause) {
        logger.error(cause.getMessage(), cause);
    }
}

The @Consumer annotation takes a value matching a configured consumer name.

Publishing messages

In any class, just inject the Kafka Producer interface, qualified with the name of a configured producer:

import javax.inject.Inject;
import javax.inject.Named;
import org.apache.kafka.clients.producer.Producer;

public class SomeClass {
    @Inject
    @Named("producer1")
    private Producer<Integer, String> producer;
}

To publish a message, use the send() method:

import javax.inject.Inject;
import javax.inject.Named;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SomeClass {
    @Inject
    @Named("producer1")
    private Producer<Integer, String> producer;

    public void someMethod() {
        // Asynchronously sends the value "test" to the "topic" topic
        producer.send(new ProducerRecord<>("topic", "test"));
        // Closes the producer, waiting for pending sends to complete
        producer.close();
    }
}
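
Note that send() is asynchronous and returns immediately. If you need to react to the outcome of a send, you can pass a callback as second argument. Below is a minimal sketch, assuming the same injected producer and a SeedStack-injected logger (the topic name and value are illustrative):

import javax.inject.Inject;
import javax.inject.Named;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.seedstack.seed.Logging;
import org.slf4j.Logger;

public class SomeOtherClass {
    @Inject
    @Named("producer1")
    private Producer<Integer, String> producer;
    @Logging
    private Logger logger;

    public void someMethod() {
        producer.send(new ProducerRecord<>("topic", "test"), (metadata, exception) -> {
            if (exception != null) {
                // The send failed
                logger.error(exception.getMessage(), exception);
            } else {
                // The send succeeded: metadata holds the record partition and offset
                logger.debug("Sent to partition {} at offset {}", metadata.partition(), metadata.offset());
            }
        });
    }
}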

Streaming messages

To stream Kafka messages, create a stream class which implements the MessageStream interface and is annotated with @Stream:

@Stream("stream1")
public class MyMessageStream implements MessageStream<Integer, String> {
    @Logging
    private Logger logger;

    @Override
    public void onStream(KStream<Integer, String> stream) {
        stream.peek((key, value) -> {
            logger.info("Stream test: Key [{}], Value [{}]", key, value);
        }).to("topic");
    }

    @Override
    public void onException(Throwable cause) {
        logger.error(cause.getMessage(), cause);
    }
}
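
The @Stream annotation takes a value matching a configured stream name. For the example above to run, the stream configuration must provide the mandatory application.id property and, since the stream relies on the default serdes, declare them to match the <Integer, String> key/value types. A minimal sketch, assuming a local broker (the topic, application id and serde choices are illustrative):

kafka:
  streams:
    stream1:
      topics: [someTopic]
      properties:
        application.id: some-stream-app
        bootstrap.servers: localhost:9092
        default.key.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde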
