1. What is Kafka?
Apache Kafka is a distributed messaging system for high-throughput, fault-tolerant, real-time data streaming. It uses topics to publish/subscribe messages and is widely used for event-driven architecture.
2. Key Components in Kafka (Spring Boot)
| Component | Role |
|---|---|
| Producer | Sends messages to Kafka topics |
| Consumer | Listens to Kafka topics |
| Topic | Logical name for a message stream |
| KafkaTemplate | Helper class to send messages |
| @KafkaListener | Annotation to consume messages |
| KafkaAdmin | Creates topics programmatically |
3. pom.xml
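The original snippet is not shown here; the dependency this guide relies on is Spring Kafka, whose version is managed by the Spring Boot parent:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```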
4. application.properties
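The original properties are not shown; a minimal sketch for a local broker on localhost:9092, matching the my-group consumer group used later:

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```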
5. KafkaAdmin (Topic Creation)
```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("my-topic")
                .partitions(3)
                .replicas(1)
                .build();
    }
}
```
6. Kafka Producer
```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
```
7. Kafka Consumer
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}
```
8. REST Controller to Test Producer
```java
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    private final KafkaProducerService producerService;

    public KafkaController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @GetMapping("/send")
    public String sendMessage(@RequestParam String message) {
        producerService.sendMessage("my-topic", message);
        return "Message sent!";
    }
}
```
9. Kafka Annotations in Detail
| Annotation | Purpose |
|---|---|
| @KafkaListener | Listens to a Kafka topic; binds a method to the topic |
| @EnableKafka | Enables detection of @KafkaListener annotations |
| @KafkaHandler | Used with @KafkaListener at the class level for multi-method listeners |
| @KafkaListeners | Container for multiple @KafkaListener annotations on one method |
| @SendTo | Sends a reply to another topic from the consumer |
| KafkaListenerContainerFactory | Bean (not an annotation) used to create custom listener containers for advanced settings |
| @Header | Accesses Kafka record headers (optional metadata) |
10. Kafka Listener with Reply
```java
@KafkaListener(topics = "request-topic", groupId = "my-group")
@SendTo("response-topic")
public String handleRequest(String message) {
    return "Response to: " + message;
}
```
Tips
- Use @EnableKafka in a config class if @KafkaListener doesn't seem to work.
- You can send JSON objects by customizing the serializers/deserializers (see the snippet below).
- Use a DLQ (Dead Letter Queue) for failed message handling.
- For transactions, set spring.kafka.producer.transaction-id-prefix=tx-.
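For the JSON tip, a minimal sketch of the property changes using Spring Kafka's JsonSerializer/JsonDeserializer (the trusted-packages wildcard is an assumption suitable only for local development):

```properties
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
```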
-----------------------------
11. Enable Kafka Transactions in Configuration
```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@Configuration
@EnableTransactionManagement
public class KafkaConfig {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setTransactionalIdPrefix("tx-");
        return kafkaTemplate;
    }
}
```
12. Using Kafka Transactions in Code
```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TransactionalKafkaService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public TransactionalKafkaService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Transactional
    public void sendMessages() {
        // Note: executeInTransaction runs in its own local transaction,
        // independent of the surrounding @Transactional context.
        kafkaTemplate.executeInTransaction(t -> {
            t.send("topic1", "message 1");
            t.send("topic2", "message 2");
            return true;
        });

        // OR just send directly inside a @Transactional method:
        // kafkaTemplate.send("topic1", "message 1");
        // kafkaTemplate.send("topic2", "message 2");
    }
}
```
Notes:
a) If any send fails in a transaction, all the others are rolled back.
b) The transaction-id-prefix must be unique per producer instance in a cluster.
c) Transactions require idempotent producer settings (handled automatically by Spring Boot when transaction-id-prefix is set).
-----------------------------------------------------------
Configurations on Windows:
✅ Official Zookeeper Binary Download for Windows
Download Zookeeper 3.8.4 (stable):
https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
How to Set Up Zookeeper on Windows
1. Download the archive: use the link above to get apache-zookeeper-3.8.4-bin.tar.gz.
2. Extract the archive to a folder like C:\zookeeper.
3. Create the configuration file: go to the conf folder, copy zoo_sample.cfg, and rename the copy to zoo.cfg.
4. Edit zoo.cfg: update or ensure the settings shown below are present.
5. Create the data directory: create the folder C:\zookeeper\data.
6. Start Zookeeper: open a Command Prompt and go to the Zookeeper bin directory (see the commands below).
7. Verify it's running: run the check below from a second Command Prompt.
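The config lines and commands referenced in steps 4, 6, and 7 are missing from the original; a minimal sketch, assuming the C:\zookeeper layout above (use forward slashes in zoo.cfg paths so Java reads them correctly):

```
# zoo.cfg (step 4)
tickTime=2000
dataDir=C:/zookeeper/data
clientPort=2181
```

```
:: Step 6: start the server
cd C:\zookeeper\bin
zkServer.cmd

:: Step 7: verify from a second prompt
zkCli.cmd -server 127.0.0.1:2181
```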
-----------------------------------------------
2. Apache Kafka
- Version: use a stable release such as Kafka 3.6.x
- Why: core message broker required for producing and consuming messages
➡️ Download: https://kafka.apache.org/downloads
✅ Kafka Setup (Manual Steps):
1. Extract the Kafka ZIP/TGZ.
2. Start Zookeeper.
3. Start the Kafka broker.
(Windows users use .bat files instead of .sh; see the commands below.)
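The start commands are not shown in the original; the standard scripts shipped with Kafka are:

```
# From the extracted Kafka directory (Linux/macOS)
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

:: On Windows
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
```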
3. Spring Boot
- Why: framework for building your REST API and Kafka integration
- Tool: use Spring Initializr to create the project
➡️ Create project: https://start.spring.io
Required Dependencies:
- Spring Web
- Spring Kafka
- Spring Boot DevTools (optional)
- Spring Boot Actuator (optional)
4. Apache Maven or Gradle
- Why: build and dependency management tool
- Command (to verify): see below
➡️ Download Maven: https://maven.apache.org/download.cgi
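The verification command is not shown in the original; the standard checks are:

```
mvn -v       # Maven: prints Maven, Java, and OS versions
gradle -v    # Gradle, if used instead
```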
5. IDE (Optional but Recommended)
- IntelliJ IDEA (with the Spring plugin)
- Eclipse with Spring Tools Suite (STS)
➡️ IntelliJ: https://www.jetbrains.com/idea/
6. Postman or cURL
- Why: to test the REST endpoints exposed by your Kafka producer
Optional: Docker (for Kafka & Zookeeper)
- Use it if you want to avoid a manual Kafka installation.
➡️ Docker: https://www.docker.com/products/docker-desktop/
Summary: What to Install
| Tool | Purpose | Required |
|---|---|---|
| Java JDK | Run Spring Boot apps | ✅ |
| Apache Kafka | Messaging system | ✅ |
| Zookeeper | Kafka dependency | ✅ |
| Spring Boot | Framework for building the app | ✅ |
| Maven/Gradle | Dependency management | ✅ |
| IDE (IntelliJ) | Code editing & debugging | Optional |
| Postman | Test REST APIs | Optional |
| Docker | Run Kafka locally via container | Optional |
---------------------------------------------------------
✅ Event-Driven Architecture with Kafka in Java (Spring Boot)
What is Event-Driven Architecture (EDA)?
Event-Driven Architecture (EDA) is a design paradigm where services communicate by producing and consuming events, rather than making direct API calls. It's highly asynchronous, decoupled, and scalable.
Instead of saying "do this", services say "this happened" — and any interested component can respond.
Core Concepts in Kafka-Based EDA
| Concept | Description |
|---|---|
| Event | A message representing a state change (e.g., "UserRegistered") |
| Producer | A service that sends (publishes) events to a Kafka topic |
| Consumer | A service that listens (subscribes) to events from a Kafka topic |
| Topic | A category/feed name to which records are sent |
| Broker | A Kafka server that stores and manages messages |
| Partition | A way to split topic data for scaling |
| Offset | Message ID used by consumers to track their reading position |
Benefits of EDA with Kafka
| Benefit | Description |
|---|---|
| Loose coupling | Services don't depend on each other |
| Scalability | Kafka handles millions of messages per second |
| Resilience | Failures in one service don't crash the others |
| Asynchronous | Non-blocking operations |
| Auditability | Events are durable and replayable |
Example: user-service ➡ Kafka ➡ notification-service
Scenario:
- user-service emits a UserRegistered event
- notification-service consumes this event and sends a welcome email
✅ Step-by-Step with Spring Boot + Kafka
1. Add Dependencies
In pom.xml:
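The original dependency list is not shown; for this scenario it would plausibly include Spring Web and Spring Kafka (versions managed by the Spring Boot parent):

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```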
2. Kafka Configuration (application.yml)
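The original YAML is not shown; a minimal sketch for a local broker (the group-id value is an assumption):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: notification-service
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```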
3. Producer Code (UserService)
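The original producer code is missing; a minimal sketch with a String payload and a hypothetical user-registered topic name:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class UserService {

    private static final String TOPIC = "user-registered"; // hypothetical topic name

    private final KafkaTemplate<String, String> kafkaTemplate;

    public UserService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Persist the user (omitted), then emit the UserRegistered event
    public void registerUser(String email) {
        kafkaTemplate.send(TOPIC, "UserRegistered:" + email);
    }
}
```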
Usage:
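The original usage snippet is missing; a plausible call site for the hypothetical registerUser method above:

```java
// e.g., invoked from a registration REST endpoint
userService.registerUser("alice@example.com");
```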
4. Consumer Code (NotificationService)
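The original consumer code is missing; a minimal sketch, assuming the same user-registered topic and a String payload (email sending is stubbed with a print):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class NotificationService {

    // React to UserRegistered events and send a welcome email
    @KafkaListener(topics = "user-registered", groupId = "notification-service")
    public void onUserRegistered(String event) {
        System.out.println("Sending welcome email for event: " + event);
    }
}
```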
5. Kafka Topic Configuration (Optional)
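The original block is omitted; it follows the same pattern as the KafkaTopicConfig bean in section 5, with the hypothetical user-registered topic name:

```java
@Bean
public NewTopic userRegisteredTopic() {
    return TopicBuilder.name("user-registered")
            .partitions(3)
            .replicas(1)
            .build();
}
```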
Docker Compose for Kafka
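The original compose file is missing; a minimal single-broker sketch using the Confluent community images (image tags and listener settings are assumptions for local development):

```yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```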
✅ Summary
| Component | Role |
|---|---|
| user-service | Publishes the UserRegistered event to Kafka |
| Kafka Broker | Queues and distributes the event |
| notification-service | Consumes the event and acts (e.g., sends a welcome email) |
------------------------------
3) Methods in Apache Kafka
Apache Kafka provides several core APIs with different sets of methods for producing, consuming, administering, and streaming data. Here's a detailed breakdown of important methods across the main Kafka classes:
1. KafkaProducer API (Producer Methods)
Used to send records to Kafka topics.
Common Methods:
| Method | Description |
|---|---|
| send(ProducerRecord<K,V>) | Sends a record to a topic asynchronously. |
| send(ProducerRecord<K,V>, Callback) | Sends a record with a callback to get the acknowledgment. |
| flush() | Forces all buffered records to be sent. |
| close() | Closes the producer gracefully. |
Example:
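The original example is missing; a minimal sketch of an asynchronous send with a callback, assuming a broker on localhost:9092:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Asynchronous send; the callback receives the acknowledgment
            producer.send(new ProducerRecord<>("my-topic", "key", "hello"),
                    (metadata, exception) -> {
                        if (exception == null) {
                            System.out.println("Sent to partition " + metadata.partition()
                                    + " at offset " + metadata.offset());
                        }
                    });
            producer.flush(); // force buffered records out before close
        }
    }
}
```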
2. KafkaConsumer API (Consumer Methods)
Used to poll and read messages from Kafka topics.
Common Methods:
| Method | Description |
|---|---|
| subscribe(Collection<String>) | Subscribes to a list of topics. |
| poll(Duration) | Polls the broker for messages. |
| commitSync() | Commits the current offset synchronously. |
| commitAsync() | Commits the offset asynchronously. |
| seek(TopicPartition, long) | Seeks to a specific offset in a partition. |
| close() | Closes the consumer. |
| assign(Collection<TopicPartition>) | Assigns specific partitions manually. |
| position(TopicPartition) | Returns the current offset position. |
| seekToBeginning(Collection<TopicPartition>) | Seeks to the beginning of the partitions. |
| seekToEnd(Collection<TopicPartition>) | Seeks to the end of the partitions. |
Example:
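The original example is missing; a minimal poll loop with manual offset commits, assuming the my-topic and my-group names used throughout this post:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, key=%s, value=%s%n",
                            record.offset(), record.key(), record.value());
                }
                consumer.commitSync(); // commit offsets after processing the batch
            }
        }
    }
}
```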
3. AdminClient API (Admin Operations)
Used to manage Kafka topics, brokers, etc.
Common Methods:
| Method | Description |
|---|---|
| createTopics(Collection<NewTopic>) | Creates new topics. |
| listTopics() | Lists all topics. |
| describeTopics(Collection<String>) | Describes one or more topics. |
| deleteTopics(Collection<String>) | Deletes topics. |
| describeCluster() | Describes the Kafka cluster. |
Example:
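The original example is missing; a minimal sketch that creates a topic and lists all topic names, assuming a local broker:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class AdminExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic with 3 partitions and replication factor 1
            admin.createTopics(Collections.singletonList(
                    new NewTopic("my-topic", 3, (short) 1))).all().get();

            // Print every topic name in the cluster
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}
```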
4. Streams API (KafkaStreams)
Used for real-time stream processing.
Common Methods:
| Method | Description |
|---|---|
| start() | Starts stream processing. |
| close() | Stops stream processing. |
| cleanUp() | Cleans the local state stores. |
| state() | Gets the current state of the KafkaStreams instance. |
5. Other Utility/Support Methods
ProducerRecord Methods:
| Method | Description |
|---|---|
| topic() | Gets the topic name. |
| key() | Gets the message key. |
| value() | Gets the message value. |
| partition() | Gets the assigned partition (can be null). |
ConsumerRecord Methods:
| Method | Description |
|---|---|
| topic() | Gets the topic name. |
| key() | Gets the key. |
| value() | Gets the value. |
| offset() | Gets the offset. |
| partition() | Gets the partition number. |
| timestamp() | Gets the timestamp of the record. |
✅ Summary by Component
| Kafka Component | Key Classes | Important Methods |
|---|---|---|
| Producer | KafkaProducer, ProducerRecord | send(), flush(), close() |
| Consumer | KafkaConsumer, ConsumerRecord | poll(), subscribe(), commitSync() |
| Admin | AdminClient, NewTopic | createTopics(), listTopics() |
| Streams | KafkaStreams | start(), close(), state() |