Monday, 12 May 2025

Apache Kafka and Event-Driven Architecture

1. What is Kafka?
 
Apache Kafka is a distributed messaging system for high-throughput, fault-tolerant, real-time data streaming. It uses topics to publish/subscribe messages and is widely used for event-driven architecture.

2. Key Components in Kafka (Spring Boot)

Component         Role
Producer          Sends messages to Kafka topics
Consumer          Listens to Kafka topics
Topic             Logical name for message stream
KafkaTemplate     Helper class to send messages
@KafkaListener    Annotation to consume messages
KafkaAdmin        Creates topics programmatically

3. Dependencies (pom.xml)

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

4. Configuration (application.properties)

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# 🔑 Required for Kafka transactions (inline comments after a value are not valid in .properties)
spring.kafka.producer.transaction-id-prefix=tx-


5. Kafka Topic Creation with KafkaAdmin

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("my-topic")
                .partitions(3)
                .replicas(1)
                .build();
    }
}

6. Kafka Producer

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
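
Note that KafkaTemplate.send() is asynchronous. A minimal sketch, assuming Spring Kafka 3.x (where send() returns a CompletableFuture; 2.x returns a ListenableFuture), of a method that could be added to the service above to log the broker's acknowledgment:

public void sendWithAck(String topic, String message) {
    kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
        if (ex != null) {
            System.err.println("Send failed: " + ex.getMessage());
        } else {
            // RecordMetadata carries the partition and offset assigned by the broker
            System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                    + " at offset " + result.getRecordMetadata().offset());
        }
    });
}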

7. Kafka Consumer

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}

8. REST Controller to Test Producer

import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    private final KafkaProducerService producerService;

    public KafkaController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @GetMapping("/send")
    public String sendMessage(@RequestParam String message) {
        producerService.sendMessage("my-topic", message);
        return "Message sent!";
    }
}

9. Kafka Annotations in Detail

Annotation                  Purpose
@KafkaListener              Listens to a Kafka topic; binds a method to the topic
@EnableKafka                Enables detection of @KafkaListener
@KafkaHandler               Used with a class-level @KafkaListener for multi-method listeners
@KafkaListeners             Container for multiple @KafkaListener annotations on one method
@SendTo                     Sends the listener's return value to another topic as a reply
containerFactory (attribute)  @KafkaListener attribute referencing a custom listener container factory for advanced settings (there is no @KafkaListenerContainerFactory annotation)
@Header                     Accesses Kafka record headers (optional metadata)
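
A minimal sketch of @Header from the table above, assuming org.springframework.messaging.handler.annotation.Header and org.springframework.kafka.support.KafkaHeaders are imported:

@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consumeWithMetadata(String message,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // KafkaHeaders exposes record metadata such as the source topic
    System.out.println("Received '" + message + "' from topic " + topic);
}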

10. Kafka Listener with Reply

@KafkaListener(topics = "request-topic", groupId = "my-group")
@SendTo("response-topic")
public String handleRequest(String message) {
    return "Response to: " + message;
}
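
@SendTo only produces a reply if the listener container factory has a reply template. A minimal configuration sketch, assuming the String serializers from section 4 (Spring Boot's auto-configured factory is overridden by defining this bean):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory,
        KafkaTemplate<String, String> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setReplyTemplate(kafkaTemplate);  // required for @SendTo replies
    return factory;
}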

Tips
  • Use @EnableKafka in a config class if @KafkaListener doesn’t seem to work.

  • You can send JSON objects by customizing the serializers/deserializers.

  • Use a DLQ (Dead Letter Queue) for failed message handling; see the sketch after these tips.

  • For transactions, set spring.kafka.producer.transaction-id-prefix=tx-
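
For the DLQ tip, a minimal sketch assuming Spring Kafka 2.8+ (DefaultErrorHandler): failed records are retried twice, then published by the recoverer to a "<original-topic>.DLT" topic. Recent Spring Boot versions apply a CommonErrorHandler bean to the default container factory automatically; otherwise set it on the factory yourself.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {
        // Publishes the failed record to "<original-topic>.DLT" after retries are exhausted
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        // Retry twice, 1 second apart, before handing the record to the recoverer
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }
}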

-----------------------------

11. Enable Kafka Transactions in Configuration

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@Configuration
@EnableTransactionManagement
public class KafkaConfig {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setTransactionalIdPrefix("tx-");
        return kafkaTemplate;
    }
}

12. Using Kafka Transactions in Code

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TransactionalKafkaService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public TransactionalKafkaService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Transactional
    public void sendMessages() {
        // Inside a @Transactional method, plain sends join the Spring-managed transaction
        kafkaTemplate.send("topic1", "message 1");
        kafkaTemplate.send("topic2", "message 2");

        // Alternatively, executeInTransaction() runs its own local transaction,
        // independent of any surrounding @Transactional one:
        // kafkaTemplate.executeInTransaction(t -> {
        //     t.send("topic1", "message 1");
        //     t.send("topic2", "message 2");
        //     return true;
        // });
    }
}

Notes:
a) If any send fails inside a transaction, the whole transaction is aborted and none of its messages become visible to consumers.
b) The transaction-id-prefix must be unique per producer instance in a cluster.
c) Transactions require idempotent producer settings (enabled automatically by Spring Boot when transaction-id-prefix is set). A rollback sketch follows.
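
A minimal rollback sketch using the service above: throwing from the @Transactional method aborts the Kafka transaction, so neither send becomes visible to consumers reading with isolation.level=read_committed.

@Transactional
public void sendAtomically(String first, String second) {
    kafkaTemplate.send("topic1", first);
    if (second == null) {
        // Aborts the transaction: the first send is rolled back too
        throw new IllegalArgumentException("second message required");
    }
    kafkaTemplate.send("topic2", second);
}
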
-----------------------------------------------------------

Configuration on Windows:

✅ Official Zookeeper Binary Download for Windows

👉 Download Zookeeper 3.8.4 (Stable):
🔗 https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz


🧰 How to Set Up Zookeeper on Windows

  1. Download the Archive
    Use the link above to download apache-zookeeper-3.8.4-bin.tar.gz.

  2. Extract the Archive
    Extract to a folder like: C:\zookeeper

  3. Create Configuration File
    Inside the extracted folder:

    • Go to conf folder

    • Copy zoo_sample.cfg and rename it to zoo.cfg

  4. Edit zoo.cfg
    Update or ensure the following lines are present:

    properties

    tickTime=2000
    dataDir=C:/zookeeper/data
    clientPort=2181
    initLimit=5
    syncLimit=2

  5. Create Data Directory
    Create the folder C:\zookeeper\data

  6. Start Zookeeper
    Open Command Prompt, go to the Zookeeper bin directory:

    cmd

    cd C:\zookeeper\bin
    zkServer.cmd

  7. Verify It’s Running
    In a second command prompt:

    cmd

    zkCli.cmd
-----------------------------------------------

🧱 2. Apache Kafka

  • Version: Use a stable version like Kafka 3.6.x

  • Why: Core message broker required for producing and consuming messages

➡️ Download: https://kafka.apache.org/downloads

✅ Kafka Setup (Manual Steps):

  1. Extract Kafka ZIP/TGZ

  2. Start Zookeeper:

    bash

    bin/zookeeper-server-start.sh config/zookeeper.properties
  3. Start Kafka Broker:

    bash

    bin/kafka-server-start.sh config/server.properties

(Windows users use .bat files instead of .sh)


📦 3. Spring Boot

  • Why: Framework for building your REST API and Kafka integration

  • Tool: Use Spring Initializr to create the project

➡️ Create project: https://start.spring.io

Required Dependencies:

  • Spring Web

  • Spring Kafka

  • Spring Boot DevTools (optional)

  • Spring Boot Actuator (optional)


🐘 4. Apache Maven or Gradle

  • Why: Build and dependency management tool

  • Command (to verify):

    bash

    mvn -v
    # OR
    gradle -v

➡️ Download Maven: https://maven.apache.org/download.cgi


💻 5. IDE (Optional but Recommended)

  • IntelliJ IDEA (with Spring plugin)

  • Eclipse with Spring Tools Suite (STS)

➡️ IntelliJ: https://www.jetbrains.com/idea/


🧪 6. Postman or curl

  • Why: To test REST endpoints exposed by your Kafka producer


🐳 Optional: Docker (for Kafka & Zookeeper)

  • If you want to avoid manual Kafka installation:

yaml

# docker-compose.yml (sample)
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

➡️ Docker: https://www.docker.com/products/docker-desktop/


🧰 Summary: What to Install

Tool              Purpose                            Required
Java JDK          Run Spring Boot apps               ✅ Required
Apache Kafka      Messaging system                   ✅ Required
Zookeeper         Kafka dependency                   ✅ Required
Spring Boot       Framework for building the app     ✅ Required
Maven/Gradle      Dependency management              ✅ Required
IDE (IntelliJ)    Code editing & debugging           🟡 Optional
Postman           Test REST APIs                     🟡 Optional
Docker            Run Kafka locally via container    🟡 Optional
---------------------------------------------------------

Event-Driven Architecture with Kafka in Java (Spring Boot)

🔍 What is Event-Driven Architecture (EDA)?

Event-Driven Architecture (EDA) is a design paradigm where services communicate by producing and consuming events, rather than making direct API calls. It's highly asynchronous, decoupled, and scalable.

Instead of saying "do this", services say "this happened" — and any interested component can respond.


🧩 Core Concepts in Kafka-Based EDA

Concept      Description
Event        A message representing a state change (e.g., "UserRegistered")
Producer     A service that sends (publishes) events to a Kafka topic
Consumer     A service that listens (subscribes) to events from a Kafka topic
Topic        A category/feed name to which records are sent
Broker       A Kafka server that stores and manages messages
Partition    A way to split topic data for scaling
Offset       Message ID used by consumers to track their reading position

🎯 Benefits of EDA with Kafka

Benefit           Description
Loose coupling    Services don't depend on each other directly
Scalability       Kafka handles millions of messages per second
Resilience        Failures in one service don't crash the others
Asynchronous      Non-blocking operations
Auditability      Events are durable and replayable

🔧 Example: User-Service ➡ Kafka ➡ Notification-Service

Scenario:

  • user-service emits a UserRegistered event

  • notification-service consumes this and sends a welcome email


✅ Step-by-Step with Spring Boot + Kafka

1. Add Dependencies

In pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Kafka Configuration (application.yml)

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: notification-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

3. Producer Code (UserService)

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendUserRegisteredEvent(String message) {
        kafkaTemplate.send("user-registered-topic", message);
    }
}

Usage:

java

producer.sendUserRegisteredEvent("{\"email\":\"test@example.com\"}");

4. Consumer Code (NotificationService)

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "user-registered-topic", groupId = "notification-group")
    public void consume(String message) {
        System.out.println("Received UserRegistered event: " + message);
        // send email or SMS
    }
}

5. Kafka Topic Configuration (Optional)

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("user-registered-topic")
                .partitions(3)
                .replicas(1)
                .build();
    }
}

🐳 Docker Compose for Kafka

yaml

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

✅ Summary

Component               Role
user-service            Publishes the UserRegistered event to Kafka
Kafka Broker            Queues and distributes the event
notification-service    Consumes the event and acts (e.g., sends email)

------------------------------

3) Methods in Apache Kafka

Apache Kafka provides several core APIs with different sets of methods for producing, consuming, administering, and streaming data. Here's a detailed breakdown of important methods across the main Kafka classes:


🔑 1. KafkaProducer API (Producer Methods)

Used to send records to Kafka topics.

Common Methods:

Method                                 Description
send(ProducerRecord<K,V>)              Sends a record to a topic asynchronously.
send(ProducerRecord<K,V>, Callback)    Sends a record with a callback to get the acknowledgment.
flush()                                Forces all buffered records to be sent.
close()                                Closes the producer gracefully.

Example:
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record);
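
A sketch of the callback variant from the table; Callback.onCompletion receives either the record metadata or an exception, so it can be written as a lambda:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();  // the send failed
    } else {
        // metadata carries the partition and offset assigned by the broker
        System.out.printf("partition=%d, offset=%d%n", metadata.partition(), metadata.offset());
    }
});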

📥 2. KafkaConsumer API (Consumer Methods)

Used to poll and read messages from Kafka topics.

Common Methods:

Method                                         Description
subscribe(Collection<String>)                  Subscribes to a list of topics.
poll(Duration)                                 Polls the broker for messages.
commitSync()                                   Commits the current offset synchronously.
commitAsync()                                  Commits the offset asynchronously.
seek(TopicPartition, long)                     Seeks to a specific offset in a partition.
close()                                        Closes the consumer.
assign(Collection<TopicPartition>)             Assigns specific partitions manually.
position(TopicPartition)                       Returns the current offset position.
seekToBeginning(Collection<TopicPartition>)    Seeks to the beginning of the partitions.
seekToEnd(Collection<TopicPartition>)          Seeks to the end of the partitions.

Example:
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.value());
}
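
A sketch of the manual-assignment methods from the table (assign(), seekToBeginning(), position()), assuming java.util.Collections and org.apache.kafka.common.TopicPartition are imported; the topic name and partition number are placeholders:

TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(partition));          // manual assignment, no group rebalancing
consumer.seekToBeginning(Collections.singletonList(partition)); // start reading from the earliest offset
System.out.println("Starting at offset: " + consumer.position(partition));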

🛠️ 3. AdminClient API (Admin Operations)

Used to manage Kafka topics, brokers, etc.

Common Methods:

Method                                Description
createTopics(Collection<NewTopic>)    Creates new topics.
listTopics()                          Lists all topics.
describeTopics(Collection<String>)    Describes one or more topics.
deleteTopics(Collection<String>)      Deletes topics.
describeCluster()                     Describes the Kafka cluster.

Example:
java
AdminClient admin = AdminClient.create(props);
admin.createTopics(Arrays.asList(new NewTopic("new-topic", 1, (short) 1)));
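
The admin methods return result objects wrapping KafkaFutures; a sketch of blocking on listTopics() (get() throws checked exceptions that a real caller must handle):

Set<String> topicNames = admin.listTopics().names().get();
topicNames.forEach(System.out::println);
admin.close();  // release the client's sockets and threads when done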

🔄 4. Streams API (KafkaStreams)

Used for real-time stream processing.

Common Methods:

Method       Description
start()      Starts stream processing.
close()      Stops stream processing.
cleanUp()    Cleans local state stores.
state()      Gets the current state of the KafkaStreams instance.
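
A minimal Streams sketch tying these methods together, assuming a local broker and imports from org.apache.kafka.streams.* and org.apache.kafka.common.serialization.Serdes; the application id and topic names are placeholders:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
// Read from input-topic, upper-case each value, write to output-topic
builder.<String, String>stream("input-topic")
       .mapValues(value -> value.toUpperCase())
       .to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.cleanUp();   // optional: wipe local state before starting
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));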

📦 5. Other Utility/Support Methods

ProducerRecord Methods:

Method         Description
topic()        Gets the topic name.
key()          Gets the message key.
value()        Gets the message value.
partition()    Gets the assigned partition (can be null).

ConsumerRecord Methods:

Method         Description
topic()        Gets the topic name.
key()          Gets the key.
value()        Gets the value.
offset()       Gets the offset.
partition()    Gets the partition number.
timestamp()    Gets the timestamp of the record.

✅ Summary by Component

Kafka Component    Key Classes                      Important Methods
Producer           KafkaProducer, ProducerRecord    send(), flush(), close()
Consumer           KafkaConsumer, ConsumerRecord    poll(), subscribe(), commitSync()
Admin              AdminClient, NewTopic            createTopics(), listTopics()
Streams            KafkaStreams                     start(), close(), state()

4) poll() in Kafka

In Apache Kafka, the poll() method is a key part of the Kafka Consumer API. It is used to fetch data from Kafka topics.

🔍 What is poll() in Kafka?

In Kafka, the consumer retrieves records from the broker using the poll() method. This is how the consumer gets new messages from the partitions it subscribes to.


📘 Syntax

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

  • consumer: An instance of KafkaConsumer.

  • poll(Duration): Tries to fetch data from Kafka for a given timeout (here, 100 milliseconds).

  • Returns a ConsumerRecords object, which is an iterable collection of ConsumerRecord.


🔍 Internals of poll()

  • Liveness: Modern clients send group heartbeats from a background thread, but the consumer must still call poll() regularly (within max.poll.interval.ms) to be considered alive.

  • Rebalance Participation: It ensures the consumer participates in group rebalancing.

  • Offset Management: Kafka tracks the offset of consumed messages, and poll() plays a key role in this.

  • Fetching messages: Pulls batches of messages from the topic partitions assigned to the consumer.


🛑 Important Notes

  1. Mandatory loop:
    You must call poll() continuously in a loop; if no poll happens within max.poll.interval.ms, Kafka considers the consumer dead and rebalances its partitions.

    java

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }

  2. Timeout value:

    • Shorter timeouts mean less waiting time but more CPU usage.

    • Longer timeouts might cause delays in rebalancing or consumer group heartbeats.

  3. Thread-safety:

    • KafkaConsumer (and therefore poll()) is not thread-safe.

    • Call it from only one thread per consumer instance.

  4. Auto Offset Commit:
    If enable.auto.commit=true, offsets are committed automatically during subsequent poll() calls (every auto.commit.interval.ms). For manual control, disable it and commit yourself, as in the sketch below.
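
A minimal manual-commit sketch, assuming enable.auto.commit=false and a hypothetical process() handler; commitSync() runs only after the batch is processed, giving at-least-once delivery:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);  // hypothetical application-level handler
    }
    if (!records.isEmpty()) {
        consumer.commitSync();  // commit offsets only after successful processing
    }
}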


🧠 Summary

Aspect                Description
Purpose               Fetch records from Kafka
Return Type           ConsumerRecords<K, V>
Needs Loop            Yes
Heartbeat to Kafka    Yes (liveness enforced via max.poll.interval.ms)
Timeout Param         Defines how long to wait for new data
Auto Offset Commit    Happens during poll() if enabled