Getting started with Apache Kafka in Quarkus

Roman Le

Why?

Quarkus is a Java framework optimized for cloud-native environments, while Kafka is a distributed streaming platform for building real-time data pipelines.

Using Quarkus and Kafka together enables developers to build high-performance, scalable, and reactive applications capable of processing and reacting to streaming data in real-time, while also benefiting from cloud-native deployment and fault-tolerant design.

Introduction to Quarkus

Quarkus is a Java framework optimized for cloud-native, serverless, and microservices architectures.

It offers features such as fast startup times, low memory footprint, seamless integration with popular Java libraries, support for reactive programming, and native Kubernetes support.

Quarkus enhances developer productivity, promotes efficient resource utilization, and simplifies deployment and management in containerized environments.

Introduction to Kafka

Kafka is an open-source distributed event streaming platform used for building real-time data pipelines and streaming applications.

It is designed to handle high-throughput, fault-tolerant, and scalable data processing.

Kafka follows a publish-subscribe messaging model, where producers publish messages to topics and consumers subscribe to topics to receive messages.

With its distributed architecture, durability, and persistence, Kafka enables real-time processing and analysis of data streams, making it suitable for a wide range of use cases including real-time analytics, log aggregation, and messaging systems.

Kafka Architecture

(Architecture diagram not included in this text version.)

Setting up Kafka

First, we need a Kafka broker. You can start one locally with the following docker-compose.yml file (run docker compose up -d in the directory containing it):

version: "3"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - 2181:2181
    networks:
      - kafka

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    ports:
      - 29092:29092
    networks:
      - kafka

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "9080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    networks:
      - kafka

networks:
  kafka:
    name: kafka-networks

Generate Quarkus project

Go to https://code.quarkus.io/

Select extensions:

  • SmallRye Reactive Messaging - Kafka Connector
  • RESTEasy Jackson

Create Account.java

package org.acme;

public class Account {
    public String name;
    public String email;
}
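
With the RESTEasy Jackson extension, Jackson serializes the public fields of this class directly, so an Account travels over HTTP and Kafka as JSON of this shape:

```json
{
  "name": "Roman",
  "email": "roman@test"
}
```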

When using SmallRye Reactive Messaging with Kafka (io.quarkus:quarkus-smallrye-reactive-messaging-kafka), Quarkus can often detect the correct serializer and deserializer classes automatically. This autodetection is based on the declared types of @Incoming and @Outgoing methods, as well as injected @Channel instances.
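
If you prefer not to rely on autodetection, the (de)serializers can also be set explicitly per channel. A sketch, assuming a hypothetical org.acme.AccountDeserializer class that extends Quarkus's ObjectMapperDeserializer<Account>:

```properties
# Outgoing values are written as JSON via Jackson
mp.messaging.outgoing.account-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
# Incoming values are read back with a custom deserializer
# (assumed: org.acme.AccountDeserializer extends ObjectMapperDeserializer<Account>)
mp.messaging.incoming.account-in.value.deserializer=org.acme.AccountDeserializer
```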

Next, we will create a producer to send messages to Kafka.

Create AccountProducer.java

package org.acme;

import io.smallrye.reactive.messaging.kafka.Record;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import java.util.UUID;

@ApplicationScoped
public class AccountProducer {
    @Inject
    @Channel("account-out")
    Emitter<Record<String, Account>> emitter;

    public void sendToKafka(Account account) {
        emitter.send(Record.of(UUID.randomUUID().toString(), account));
    }
}

Here, the @Channel annotation points to the outgoing channel configuration. We inject an Emitter, the object responsible for sending messages to a channel. This emitter is attached to the account-out channel (and so will send messages to Kafka). We send Record objects containing a random UUID as the key and the account object as the value.
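
The key/value pairing can be seen in isolation with plain JDK classes (a minimal sketch; the nested Account stand-in below replaces SmallRye's Record and is not part of any Kafka API):

```java
import java.util.UUID;

public class RecordKeyDemo {
    // Stand-in for the payload carried on the channel.
    static final class Account {
        final String name;
        final String email;
        Account(String name, String email) { this.name = name; this.email = email; }
    }

    public static void main(String[] args) {
        // Same key scheme as the producer: one random UUID string per message.
        String key = UUID.randomUUID().toString();
        Account value = new Account("Roman", "roman@test");

        // A UUID string is always 36 characters: 32 hex digits plus 4 hyphens.
        System.out.println(key.length());
        System.out.println(value.name + " " + value.email);
    }
}
```

Using a fresh UUID per message means records are spread evenly across partitions, since Kafka partitions by a hash of the key.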

Create AccountResource.java to trigger the producer.

package org.acme;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

@Path("/account")
public class AccountResource {
    @Inject
    AccountProducer producer;

    @POST
    public Response send(Account account) {
        producer.sendToKafka(account);
        // Return a 202 Accepted response.
        return Response.accepted().build();
    }
}

Create AccountConsumer.java to consume from Kafka.

package org.acme;

import io.smallrye.reactive.messaging.kafka.Record;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

@ApplicationScoped
public class AccountConsumer {
    private final Logger logger = Logger.getLogger(AccountConsumer.class);

    @Incoming("account-in")
    public void receive(Record<String, Account> record) {
        logger.infof("Got a record %s: name: %s - email: %s ", record.key(), record.value().name, record.value().email);
    }
}

Here, the @Incoming annotation points to the incoming channel configuration: the method is invoked for each record read from the account-in channel.

application.properties

# The Kafka broker location
kafka.bootstrap.servers=localhost:29092

# Configuring the incoming channel (reading from Kafka)
mp.messaging.incoming.account-in.connector=smallrye-kafka
mp.messaging.incoming.account-in.topic=account

# Configuring the outgoing channel (writing to Kafka)
mp.messaging.outgoing.account-out.connector=smallrye-kafka
mp.messaging.outgoing.account-out.topic=account

When you use Reactive Messaging, you send messages to a channel and receive them from another channel. These channels are mapped to the underlying messaging technology by configuration. In our application, we must indicate that both the reception and publication channels use the account Kafka topic.

The connector attribute indicates which connector manages the channel; here it is the Kafka connector (smallrye-kafka).

We configure two channels:

  • account-in: receives messages from Kafka
  • account-out: publishes messages to Kafka
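
Any other Kafka client setting can be passed through the same channel prefix. For example (optional; a sketch), setting auto.offset.reset lets a consumer group with no committed offset read the topic from the beginning instead of only new records:

```properties
# Without a committed offset, start from the earliest record instead of the latest
mp.messaging.incoming.account-in.auto.offset.reset=earliest
```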

Let's run the application with ./mvnw quarkus:dev

2024-03-24 12:07:21,750 INFO  [io.qua.sma.dep.processor] (build-59) Generating Jackson serializer for type org.acme.Account
__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2024-03-24 12:07:22,362 INFO  [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18229: Configured topics for channel 'account-in': [account]
2024-03-24 12:07:22,511 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-producer-thread-0) SRMSG18258: Kafka producer kafka-producer-account-out, connected to Kafka brokers 'localhost:29092', is configured to write records to 'account'
2024-03-24 12:07:22,531 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-account-in, connected to Kafka brokers 'localhost:29092', belongs to the 'kafka-quarkus' consumer group and is configured to poll records from [account]
2024-03-24 12:07:22,577 INFO  [io.quarkus] (Quarkus Main Thread) kafka-quarkus 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.8.3) started in 1.570s. Listening on: http://localhost:8080
2024-03-24 12:07:22,577 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-03-24 12:07:22,577 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]

Execute an API request:

curl --location 'localhost:8080/account' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name":"Roman",
    "email": "roman@test"
}'
2024-03-24 12:07:43,712 INFO  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-3) SRMSG18256: Initialize record store for topic-partition 'account-0' at position 1.
2024-03-24 12:07:43,716 INFO  [org.acm.AccountConsumer] (vert.x-eventloop-thread-3) Got a record 381913af-35dd-43d6-bc91-60ec6d364a94: name: Roman - email: roman@test 

We have just completed a getting-started application with Kafka and Quarkus: an HTTP endpoint that publishes Account records to a topic, and a consumer that logs them as they arrive.
