Are you looking to integrate a message queue with a Spring Boot application? Many messaging systems are available; the ones listed below are used frequently.
- Apache Kafka – It is a popular distributed streaming platform and fault-tolerant stream processing system.
- RabbitMQ – It is message-queueing software, also known as a message broker or queue manager. Simply put, it is software in which queues are defined and to which applications connect to transfer messages.
- Spring AMQP (Advanced Message Queuing Protocol) – It provides a “template” as a high-level abstraction for sending and receiving messages. It also provides support for message-driven POJOs with a “listener container”.
- Azure Service Bus – It is an asynchronous cloud messaging platform that lets you send data between decoupled systems. Microsoft offers it as a service, which means that you don’t need to host your own infrastructure to use it.
- Amazon Simple Queue Service (Amazon SQS) – It allows you to send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available.
This article provides a step-by-step guide for integrating Apache Kafka into a Spring Boot application.
Prerequisites
To start building a Spring Boot and Kafka application, you will need the following:
- Java Development Kit (JDK) 8 or later installed.
- Basic knowledge of Maven and Spring Boot.
- A suitable IDE, such as STS (Spring Tool Suite), Eclipse, or IntelliJ IDEA.
- Apache Kafka and ZooKeeper installed and running on your local machine. (To install and run Kafka, please refer to the guide here.)
Setting Up Your Spring Boot Project
In this example, we will create a Spring Boot application using Maven.
You can create the project using Spring Initializr, or you can manually add the following dependencies to your Maven pom.xml file.
<!-- Dependency for spring starter web -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Dependency for Apache Kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<!-- Dependency for spring and Kafka integration -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Configuring Kafka in Spring Boot
This application assumes that the Kafka server was started with its default configuration and that no server ports were changed.
Add the property spring.kafka.bootstrap-servers to the application.properties file and set it to the address of your Kafka broker:
spring.kafka.bootstrap-servers=localhost:9092
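The configuration above assumes a broker listening on localhost:9092. If you want to confirm that before starting the application, a quick check could look like the sketch below. BrokerCheck is a hypothetical helper written for this article, not part of Spring or Kafka; it uses only the JDK and merely tests whether a TCP connection can be opened, so it does not prove that the listener is actually Kafka.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Spring or Kafka): checks that something is
// listening at each address listed in spring.kafka.bootstrap-servers.
final class BrokerCheck {

    // Splits a comma-separated "host:port" list into unresolved socket addresses.
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            addresses.add(InetSocketAddress.createUnresolved(host, port));
        }
        return addresses;
    }

    // Returns true if a TCP connection can be opened within the timeout.
    static boolean isReachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Same value as spring.kafka.bootstrap-servers in application.properties.
        for (InetSocketAddress address : parse("localhost:9092")) {
            System.out.println(address + " reachable: " + isReachable(address, 1000));
        }
    }
}
```

If the check reports that the address is not reachable, start Kafka (or correct the port) before running the Spring Boot application.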
Configuring Kafka Producer
To produce and send messages to a Kafka broker (server), we need to create a Kafka producer.
To create a Kafka producer, follow these steps.
- Create a new package called com.kafka.example.config
- Create the class KafkaProducerConfig as shown below.
- ProducerFactory sets the strategy for creating the Kafka Producer instances that publish messages (events).
- KafkaTemplate wraps a Producer instance and provides convenience methods for sending messages to Kafka topics.
package com.kafka.example.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
// @Configuration (not @Configurable) registers this class and its @Bean methods with Spring
@Configuration
public class KafkaProducerConfig {
    // Broker address read from spring.kafka.bootstrap-servers in application.properties
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    // Producer settings: idempotent, fully acknowledged writes with String keys and values
    public Map<String, Object> producerConfig() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return properties;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    // Convenience method: publishes a message to test-topic through the shared KafkaTemplate bean.
    // Inside a @Configuration class, calling kafkaTemplate() returns the singleton bean.
    public void sendMessage(String msg) {
        kafkaTemplate().send("test-topic", msg);
    }
}
Producer instances are thread-safe, so sharing a single instance throughout an application context gives better performance than creating one per use. Likewise, KafkaTemplate instances are thread-safe, and using one instance is recommended.
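The ProducerConfig constants used in producerConfig() are plain string keys from Kafka's producer configuration. The sketch below writes out the same settings with the literal property names, which can help when you compare the Java code with Kafka's documentation or with a properties file. ProducerSettings is an illustrative class name, and the block uses only the JDK, so it builds the map but does not create a producer.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the producer settings from this article written with the
// literal Kafka property names that the ProducerConfig constants stand for.
final class ProducerSettings {

    static Map<String, Object> producerConfig(String bootstrapServers) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", bootstrapServers); // BOOTSTRAP_SERVERS_CONFIG
        properties.put("enable.idempotence", "true");          // ENABLE_IDEMPOTENCE_CONFIG
        properties.put("acks", "all");                         // ACKS_CONFIG
        // Serializer classes may be given as fully qualified class names.
        properties.put("key.serializer",                       // KEY_SERIALIZER_CLASS_CONFIG
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",                     // VALUE_SERIALIZER_CLASS_CONFIG
                "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void main(String[] args) {
        producerConfig("localhost:9092")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Note that the two reliability settings belong together: when idempotence is enabled explicitly, Kafka expects acks to be set to all.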
Configuring Kafka Consumer
To receive and consume messages from the Kafka Broker (Server), we need to create a Kafka Consumer.
- We need to configure a ConsumerFactory and a KafkaListenerContainerFactory.
- Once these beans are available in the Spring bean factory, POJO-based consumers can be configured using the @KafkaListener annotation.
- The @EnableKafka annotation is required on the configuration class to enable detection of the @KafkaListener annotation on Spring-managed beans.
To create a Kafka consumer, follow these steps.
- Create a new package called com.kafka.example.config (skip this step if the package already exists).
- Create the class KafkaConsumerConfig as shown below.
package com.kafka.example.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
// @Configuration (not @Configurable) makes this class a Spring bean so the listener is detected
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    // Broker address read from spring.kafka.bootstrap-servers in application.properties
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    // Consumer settings: String keys and values, read with Kafka's StringDeserializer
    public Map<String, Object> consumerConfig() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return properties;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
    // Called for every message published to test-topic
    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void listener(String data) {
        System.out.println("Listener received: " + data);
    }
}
We can implement multiple listeners for a topic, each with a different group ID. Furthermore, one consumer can listen for messages from various topics:
@KafkaListener(topics = {"test-topic", "test-topic2"}, groupId = "test-group")
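To make the role of the group ID more concrete, here is a small in-memory model of the delivery rule: every consumer group subscribed to a topic receives its own copy of each message. This is plain Java written for illustration, not the Kafka or Spring API; GroupFanOutModel and the group name audit-group are hypothetical, and the model ignores partitions and how messages are divided among the members of one group.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model (not Kafka code): a topic that delivers each published
// message once to every subscribed consumer group.
final class GroupFanOutModel {

    // Messages delivered so far, keyed by group ID.
    private final Map<String, List<String>> deliveredByGroup = new LinkedHashMap<>();

    // Registers a group; subscribing the same group ID twice has no extra effect.
    void subscribe(String groupId) {
        deliveredByGroup.putIfAbsent(groupId, new ArrayList<>());
    }

    // Delivers one copy of the message to each subscribed group.
    void publish(String message) {
        for (List<String> delivered : deliveredByGroup.values()) {
            delivered.add(message);
        }
    }

    // Returns the messages a group has received, or an empty list for unknown groups.
    List<String> delivered(String groupId) {
        return deliveredByGroup.getOrDefault(groupId, Collections.emptyList());
    }

    public static void main(String[] args) {
        GroupFanOutModel topic = new GroupFanOutModel();
        topic.subscribe("test-group");
        topic.subscribe("audit-group"); // hypothetical second group
        topic.publish("hello");
        System.out.println("test-group received: " + topic.delivered("test-group"));
        System.out.println("audit-group received: " + topic.delivered("audit-group"));
    }
}
```

In the real application, the equivalent would be two @KafkaListener methods on the same topic with different groupId values, each of which receives every message.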
Testing the Kafka Integration
Finally, let’s test our Kafka integration by sending and receiving messages. Create a new REST controller in the com.kafka.example.controller package:
package com.kafka.example.controller;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(path = "/topicSender")
public class TopicSenderController {
    // Injected by Spring; creating configuration classes with "new" would bypass the container
    private final KafkaTemplate<String, String> kafkaTemplate;
    public TopicSenderController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    // Publishes a fixed test message to test-topic
    @PostMapping("/sendTopic")
    public void sendTopic() {
        kafkaTemplate.send("test-topic", "This is a test message sent by the sender");
    }
}
Run your Spring Boot application and use a tool like Postman to send a POST request to http://localhost:8080/topicSender/sendTopic. The message will be sent to the Kafka topic, and the consumer will receive and print the message to the console.
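If you would rather trigger the endpoint from code than from Postman, the sketch below sends the same POST request with the JDK's built-in HTTP client. It assumes Java 11 or later (the java.net.http package is not available on JDK 8) and that the application is running on the default port 8080; SendTopicClient is a hypothetical class name.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical test client: calls the /topicSender/sendTopic endpoint.
final class SendTopicClient {

    // Builds a POST request with an empty body for the controller endpoint.
    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/topicSender/sendTopic"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                buildRequest("http://localhost:8080"),
                HttpResponse.BodyHandlers.ofString());
        // A 200 status means the controller accepted the request.
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

After the request completes, the application console should show the line printed by the listener.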
Conclusion
In this article, we demonstrated how to integrate Apache Kafka into a Spring Boot application by configuring a producer and a consumer to send and receive messages, with sample code and step-by-step instructions.