Apache Kafka Apache Kafka 是一個分佈式流處理平台,主要用於建立實時的數據管道和流應用程序。它具有高吞吐量、可擴展性和容錯性等特點。Spring 提供了對 Kafka 的支持,通過 Spring Kafka,可以在 Spring 應用中方便地使用 Kafka 進行消息的生產和消費
配置 pom.xml 要在 Spring 應用中使用 Kafka,首先需要在 pom.xml
文件中添加相關的依賴:
1 2 3 4 5 <dependency > <groupId > org.springframework.kafka</groupId > <artifactId > spring-kafka</artifactId > </dependency >
application.properties 1 2 3 4 5 6 7 8 9 10 11 spring.kafka.bootstrap-servers =localhost:9092 spring.kafka.consumer.group-id =my-group spring.kafka.consumer.auto-offset-reset =earliest spring.kafka.consumer.key-deserializer =org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer =org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer =org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer =org.apache.kafka.common.serialization.StringSerializer
Kafka Topic 主題 Kafka 主題(Topic)是消息的分類單位,用於將消息發送到不同的主題中。可以通過 Kafka 命令行工具創建主題:
1 2 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
也可以通過 Spring Kafka 在應用中創建主題:
1 2 3 4 5 6 7 8 9 10 11 12 13 import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class KafkaTopicConfig { @Bean public NewTopic myTopic () { return new NewTopic ("my-topic" , 1 , (short ) 1 ); } }
Kafka Producer 生產者 Kafka 生產者用於發送消息到 Kafka 集群中的主題(Topic)。Spring Kafka 提供了 KafkaTemplate
類,用於發送消息到 Kafka 集群中的主題。以下是一個簡單的 Kafka 生產者示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Service;@Service public class KafkaProducerService { private final KafkaTemplate<String, String> kafkaTemplate; public KafkaProducerService (KafkaTemplate<String, String> kafkaTemplate) { this .kafkaTemplate = kafkaTemplate; } public void sendMessage (String message) { kafkaTemplate.send("my-topic" , message); } }
Kafka Consumer 消費者 1 2 3 4 5 6 7 8 9 10 11 import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Component public class KafkaConsumer { @KafkaListener(topics = "my-topic", groupId = "my-group") public void listen (String message) { System.out.println("Received Message: " + message); } }
Controller 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestController public class MessageController { private final KafkaProducerService producerService; public MessageController (KafkaProducerService producerService) { this .producerService = producerService; } @GetMapping("/send") public String sendMessage (@RequestParam("message") String message) { producerService.sendMessage(message); return "Message sent to Kafka" ; } }
原理解析 KafkaTemplate
Spring 提供的高級抽象,用於簡化 Kafka 消息的發送。
支持同步和異步的消息發送方式。
@KafkaListener
用於標註方法,讓其成為 Kafka 消費者。
可以指定主題、消費者組等參數。
序列化和反序列化
Kafka 中的消息是以 byte[] 的形式存儲的,因此需要將對象序列化為 byte[],或將 byte[] 反序列化為對象。
Spring Kafka 提供了一些內置的序列化器和反序列化器,如 StringSerializer
、StringDeserializer
等。
主題(Topic)管理
可以通過 Kafka 命令行工具或 Spring Kafka 在應用中創建主題。
可以通過 Spring 的配置自動創建主題。