Getting started with Apache Kafka in Quarkus
Why?
Quarkus is a Java framework optimized for cloud-native environments, while Kafka is a distributed streaming platform for building real-time data pipelines.
Using Quarkus and Kafka together lets developers build high-performance, scalable, reactive applications that process and react to streaming data in real time, while also benefiting from cloud-native deployment and fault-tolerant design.
Introduction to Quarkus
Quarkus is a Java framework optimized for cloud-native, serverless, and microservices architectures.
It offers features such as fast startup times, low memory footprint, seamless integration with popular Java libraries, support for reactive programming, and native Kubernetes support.
Quarkus enhances developer productivity, promotes efficient resource utilization, and simplifies deployment and management in containerized environments.
Introduction to Kafka
Kafka is an open-source distributed event streaming platform used for building real-time data pipelines and streaming applications.
It is designed for high-throughput, fault-tolerant, and scalable data processing.
Kafka follows a publish-subscribe messaging model, where producers publish messages to topics and consumers subscribe to topics to receive messages.
With its distributed architecture, durability, and persistence, Kafka enables real-time processing and analysis of data streams, making it suitable for a wide range of use cases including real-time analytics, log aggregation, and messaging systems.
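To make the publish-subscribe model concrete, here is a toy in-memory sketch in plain Java. It is not Kafka's API: the ToyTopic class and its publish and poll methods are invented for this illustration. It only shows the core idea that a topic is an append-only log and that each consumer tracks its own read position, so several consumers can read the same messages independently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a topic; real Kafka adds partitions, brokers, and persistence.
class ToyTopic {
    private final List<String> log = new ArrayList<>();           // append-only message log
    private final Map<String, Integer> offsets = new HashMap<>(); // next read position per consumer

    // Producer side: append a message and return its offset in the log.
    int publish(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Consumer side: return every message this consumer has not read yet.
    List<String> poll(String consumer) {
        int from = offsets.getOrDefault(consumer, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(consumer, log.size());
        return batch;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.publish("first");
        topic.publish("second");
        System.out.println(topic.poll("analytics")); // [first, second]
        System.out.println(topic.poll("analytics")); // []
        System.out.println(topic.poll("audit"));     // [first, second]
    }
}
```

Because the messages stay in the log after being read, the second consumer still receives both of them.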
Setting up Kafka
First, we need a Kafka broker. You can use the following docker-compose.yml file:
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - 2181:2181
    networks:
      - kafka
  kafka:
    # This setup relies on ZooKeeper, which Kafka 4.0 removed;
    # pin an older image tag if latest fails to start
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Map each listener name to its security protocol
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # kafka:9092 is used inside the Docker network, localhost:29092 from the host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    ports:
      - 29092:29092
    networks:
      - kafka
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "9080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    networks:
      - kafka
networks:
  kafka:
    name: kafka-networks
Generate Quarkus project
Go to https://code.quarkus.io/
Select extensions:
- SmallRye Reactive Messaging - Kafka Connector
- RESTEasy Reactive Jackson
Create Account.java
package org.acme;

// Simple payload class; public fields keep the example short
public class Account {
    public String name;
    public String email;
}
When using SmallRye Reactive Messaging with Kafka (io.quarkus:quarkus-smallrye-reactive-messaging-kafka), Quarkus can often automatically detect the correct serializer and deserializer class. This autodetection is based on declarations of @Incoming and @Outgoing methods, as well as injected @Channel fields.
Next, we will create a producer to send messages to Kafka.
Create AccountProducer.java
package org.acme;

import io.smallrye.reactive.messaging.kafka.Record;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import java.util.UUID;

@ApplicationScoped
public class AccountProducer {

    // Emitter bound to the "account-out" channel configured in application.properties
    @Inject
    @Channel("account-out")
    Emitter<Record<String, Account>> emitter;

    public void sendToKafka(Account account) {
        // Key: random UUID, value: the account payload
        emitter.send(Record.of(UUID.randomUUID().toString(), account));
    }
}
Here, the @Channel annotation points to the outgoing channel configuration. We inject an Emitter, an object responsible for sending messages to a channel. This emitter is attached to the account-out channel (and so will send messages to Kafka). We send Record objects containing a random UUID as the key and the account object as the value.
Create AccountResource.java to trigger the producer.
package org.acme;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

@Path("/account")
public class AccountResource {

    @Inject
    AccountProducer producer;

    @POST
    public Response send(Account account) {
        producer.sendToKafka(account);
        // Return a 202 - Accepted response.
        return Response.accepted().build();
    }
}
Create AccountConsumer.java to consume from Kafka.
package org.acme;

import io.smallrye.reactive.messaging.kafka.Record;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

@ApplicationScoped
public class AccountConsumer {

    private final Logger logger = Logger.getLogger(AccountConsumer.class);

    // Called for every record read from the "account-in" channel
    @Incoming("account-in")
    public void receive(Record<String, Account> record) {
        logger.infof("Got a record %s: name: %s - email: %s", record.key(), record.value().name, record.value().email);
    }
}
Here, the @Incoming annotation points to the incoming channel configuration.
application.properties
# The Kafka broker location
kafka.bootstrap.servers=localhost:29092
# Configuring the incoming channel (reading from Kafka)
mp.messaging.incoming.account-in.connector=smallrye-kafka
mp.messaging.incoming.account-in.topic=account
# Configuring the outgoing channel (writing to Kafka)
mp.messaging.outgoing.account-out.connector=smallrye-kafka
mp.messaging.outgoing.account-out.topic=account
When you use Reactive Messaging, you send messages to a channel and receive them from another channel. These channels are mapped to the underlying messaging technology by configuration. In our application, we must indicate that both our receiving and publishing channels use the account Kafka topic.
The connector attribute indicates which connector manages the channel; here it is the Kafka connector (smallrye-kafka).
We configure two channels:
- account-in: receives messages from the account topic
- account-out: publishes messages to the account topic
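To make the property naming convention concrete, here is a small plain-Java sketch that parses the same channel settings and prints which Kafka topic each channel resolves to. It only illustrates the mp.messaging.<direction>.<channel>.<attribute> key layout; it is not how SmallRye reads its configuration. The class name ChannelMappingExample, the channelTopics helper, and the direction:channel result format are made up for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Quarkus or SmallRye.
class ChannelMappingExample {

    // Same channel settings as in application.properties
    static final String CONFIG = String.join("\n",
            "mp.messaging.incoming.account-in.connector=smallrye-kafka",
            "mp.messaging.incoming.account-in.topic=account",
            "mp.messaging.outgoing.account-out.connector=smallrye-kafka",
            "mp.messaging.outgoing.account-out.topic=account");

    // Returns "direction:channel" -> topic for every mp.messaging.*.*.topic entry.
    // Assumes channel names contain no dots.
    static Map<String, String> channelTopics(String config) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(config));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            String[] parts = key.split("\\.");
            // Expected shape: mp.messaging.<direction>.<channel>.topic
            if (parts.length == 5 && parts[0].equals("mp")
                    && parts[1].equals("messaging") && parts[4].equals("topic")) {
                result.put(parts[2] + ":" + parts[3], props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        channelTopics(CONFIG).forEach((channel, topic) ->
                System.out.println(channel + " -> Kafka topic '" + topic + "'"));
    }
}
```

Running it shows that both incoming:account-in and outgoing:account-out resolve to the account topic, which is why a record sent by the producer ends up in the consumer.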
Let's run the application in dev mode. The startup log should look like this:
2024-03-24 12:07:21,750 INFO [io.qua.sma.dep.processor] (build-59) Generating Jackson serializer for type org.acme.Account
__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,<  / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2024-03-24 12:07:22,362 INFO [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18229: Configured topics for channel 'account-in': [account]
2024-03-24 12:07:22,511 INFO [io.sma.rea.mes.kafka] (smallrye-kafka-producer-thread-0) SRMSG18258: Kafka producer kafka-producer-account-out, connected to Kafka brokers 'localhost:29092', is configured to write records to 'account'
2024-03-24 12:07:22,531 INFO [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-account-in, connected to Kafka brokers 'localhost:29092', belongs to the 'kafka-quarkus' consumer group and is configured to poll records from [account]
2024-03-24 12:07:22,577 INFO [io.quarkus] (Quarkus Main Thread) kafka-quarkus 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.8.3) started in 1.570s. Listening on: http://localhost:8080
2024-03-24 12:07:22,577 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-03-24 12:07:22,577 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
Execute an API request:
curl --location 'localhost:8080/account' \
--header 'Content-Type: application/json' \
--data-raw '{
"name":"Roman",
"email": "roman@test"
}'
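The same request can also be sent from Java. The sketch below uses the JDK's built-in java.net.http.HttpClient (Java 11 or later) and assumes the application is listening on http://localhost:8080, as in the startup log. The class name AccountRequestExample and the buildRequest helper are illustrative only.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for illustration; mirrors the curl command.
class AccountRequestExample {

    // Builds a POST to <baseUrl>/account with a JSON body and Content-Type header.
    static HttpRequest buildRequest(String baseUrl, String name, String email) {
        // Hand-built JSON is enough for this fixed payload; use a JSON library for real input
        String json = "{\"name\":\"" + name + "\",\"email\":\"" + email + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/account"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = buildRequest("http://localhost:8080", "Roman", "roman@test");
        // Requires the Quarkus application to be running locally
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
        System.out.println("Status: " + response.statusCode());
    }
}
```

Since AccountResource returns Response.accepted(), a successful call should report status 202.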
The application log then shows the consumer receiving the record:
2024-03-24 12:07:43,712 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-3) SRMSG18256: Initialize record store for topic-partition 'account-0' at position 1.
2024-03-24 12:07:43,716 INFO [org.acm.AccountConsumer] (vert.x-eventloop-thread-3) Got a record 381913af-35dd-43d6-bc91-60ec6d364a94: name: Roman - email: roman@test
That completes our getting-started application with Kafka and Quarkus.