springboot中的消息队列和用法

张开发
2026/4/12 1:51:49 15 分钟阅读

分享文章

springboot中的消息队列和用法
消息队列简介消息队列是一种异步通信机制允许应用程序通过发送和接收消息进行解耦。在Spring Boot中常用的消息队列实现包括RabbitMQ、Apache Kafka和ActiveMQ。集成RabbitMQRabbitMQ是一个开源消息代理支持多种消息协议。在Spring Boot中集成RabbitMQ需要添加依赖dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependency配置RabbitMQ连接信息spring.rabbitmq.hostlocalhost spring.rabbitmq.port5672 spring.rabbitmq.usernameguest spring.rabbitmq.passwordguest创建消息生产者Service public class MessageProducer { Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange, String routingKey, String message) { rabbitTemplate.convertAndSend(exchange, routingKey, message); } }创建消息消费者Component public class MessageConsumer { RabbitListener(queues queue.name) public void receiveMessage(String message) { System.out.println(Received message: message); } }集成Apache KafkaKafka是一个分布式流处理平台适合处理高吞吐量数据。添加Kafka依赖dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka/artifactId /dependency配置Kafka连接信息spring.kafka.bootstrap-serverslocalhost:9092 spring.kafka.consumer.group-idgroup_id spring.kafka.consumer.auto-offset-resetearliest创建Kafka生产者Service public class KafkaProducer { Autowired private KafkaTemplateString, String kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); } }创建Kafka消费者Component public class KafkaConsumer { KafkaListener(topics topic.name, groupId group_id) public void listen(String message) { System.out.println(Received message: message); } }消息队列使用场景消息队列适用于异步处理、应用解耦、流量削峰等场景。例如在电商系统中订单创建后通过消息队列通知库存系统减库存而不是直接调用库存服务。消息确认机制为了保证消息可靠传递Spring Boot支持消息确认机制。在RabbitMQ中配置生产者确认spring.rabbitmq.publisher-confirmstrue spring.rabbitmq.publisher-returnstrue在Kafka中配置手动提交偏移量spring.kafka.consumer.enable-auto-commitfalse消息序列化默认情况下Spring Boot使用SimpleMessageConverter进行消息序列化。可以自定义消息转换器Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); }错误处理实现ErrorHandler接口可以处理消息消费过程中的异常Bean public ErrorHandler errorHandler() { return (e, record) - { // 自定义错误处理逻辑 }; }性能优化对于高吞吐量场景可以调整消费者并发数量spring.kafka.listener.concurrency3RabbitMQ中可以预取消息数量spring.rabbitmq.listener.simple.prefetch10消息队列实现票务系统// 配置RabbitMQ连接 Configuration public class RabbitMQConfig { Value(${spring.rabbitmq.host}) private String host; Value(${spring.rabbitmq.port}) private int port; Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); return connectionFactory; } Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }// 定义票务消息DTO Data AllArgsConstructor NoArgsConstructor public class TicketMessage { private String ticketId; private String eventName; private String userId; private LocalDateTime purchaseTime; private int quantity; }// 创建消息生产者服务 Service public class TicketProducer { private static final String TICKET_QUEUE ticket.queue; Autowired private RabbitTemplate rabbitTemplate; public void sendTicketPurchase(TicketMessage ticketMessage) { rabbitTemplate.convertAndSend(TICKET_QUEUE, ticketMessage); System.out.println(Sent ticket purchase message: ticketMessage); } }// 创建消息消费者服务 Service public class TicketConsumer { private static final String TICKET_QUEUE ticket.queue; RabbitListener(queues TICKET_QUEUE) public void receiveTicketPurchase(TicketMessage ticketMessage) { System.out.println(Received ticket purchase message: ticketMessage); // 处理票务逻辑更新库存 processTicket(ticketMessage); } private void processTicket(TicketMessage ticket) { // 实现票务处理逻辑 减库存插数据 } }// 创建队列配置 Configuration public class TicketQueueConfig { Bean public Queue ticketQueue() { return new Queue(ticket.queue, true); } }异常处理// 添加异常处理器 Component public class RabbitMQErrorHandler implements RabbitListenerErrorHandler { Override public Object handleError(Message amqpMessage, org.springframework.messaging.Message? message, ListenerExecutionFailedException exception) { System.err.println(Error processing message: exception.getMessage()); // 可添加重试或死信队列逻辑 return null; } }测试代码// 测试消息发送 SpringBootTest public class TicketSystemTest { Autowired private TicketProducer ticketProducer; Test public void testSendTicketMessage() { TicketMessage message new TicketMessage( TICKET-12345, Concert, USER-1001, LocalDateTime.now(), 2 ); ticketProducer.sendTicketPurchase(message); } }

更多文章