从零开始:Java原生连接RabbitMQ完整流程(个人学习笔记001)
TOC)该文章仅用于个人复习与记录如有错误烦请指出非常感谢RabbitMQ 是一款开源的消息中间件也称为消息队列其核心作用是让不同的系统、服务或组件之间能够异步地传递数据。使用该中间件可以很好地实现业务异步处理、削峰填谷、服务解耦以及数据同步。本文主要讲解如何使用原生 Java 集成 RabbitMQ。由于主播目前仍处于学习阶段本文不涉及深层原理仅说明基本的使用方法。在进行学习和测试之前需要先通过 Maven 导入 RabbitMQ 的依赖并使用 Docker 拉取 RabbitMQ 的镜像以确保 Java 程序能够与 RabbitMQ 正常连接。Maven 依赖xmldependency groupIdcom.rabbitmq/groupId artifactIdamqp-client/artifactId version5.30.0/version /dependencyDocker 命令bash# 拉取镜像 docker pull rabbitmq:management # 创建并运行容器 docker run -d --name rabbitmq -p 5672:5672 rabbitmq:management接下来就可以愉快地学习 RabbitMQ 了。一、基本消息队列点对点模式RabbitMQ 中的组件并不多在本模式中只讲解以下几种生产者、消费者、队列。生产者消息的源头负责发送消息。消费者消息的终点负责接收并消费处理消息。队列消息实际存储的容器等待消费者将其取走。下面先给出一段最简单的生产者和消费者代码然后分段解释其作用。生产者代码javapublic class Send { private final static String QUEUE_NAME hello; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory new ConnectionFactory(); factory.setHost(localhost); try (Connection connection factory.newConnection()) { Channel channel connection.createChannel(); // 指定队列类型 MapString, Object map Map.of(x-queue-type, quorum); channel.queueDeclare(QUEUE_NAME, true, false, false, map); String message Hello,World!; // 发送消息 channel.basicPublish(, QUEUE_NAME, null, message.getBytes()); System.out.println(发送消息成功 message ); } } }代码分段解释javaConnectionFactory factory new ConnectionFactory(); factory.setHost(localhost);ConnectionFactory是用于创建 Java 程序与 RabbitMQ 之间连接的工厂类。setHost()方法用于指定 RabbitMQ 服务器的地址使 Java 程序能够连接到 RabbitMQ。此外还可以设置用户名、密码等其他属性。javatry (Connection connection factory.newConnection()) { Channel channel connection.createChannel(); // 指定队列类型 MapString, Object map Map.of(x-queue-type, quorum); channel.queueDeclare(QUEUE_NAME, true, false, false, map); String message Hello,World!; // 发送消息 channel.basicPublish(, QUEUE_NAME, null, message.getBytes()); System.out.println(发送消息成功 message ); }这里使用的是try-with-resources语法而非传统的try-catch。当退出try代码块时无论以何种方式Java 程序与 RabbitMQ 的连接会自动关闭。Connection代表 Java 程序与 RabbitMQ 之间的连接由factory直接创建。Channel是在Connection中开辟的通道一个连接中可以创建多个通道每个通道中也可以创建多个队列。Map.of()是一个静态工厂方法用于创建包含指定键值对的Map。这里用于指定队列的类型。队列类型分为classic和quorumRabbitMQ 官方推荐使用quorum。如果创建队列时不指定类型则默认创建classic队列。queueDeclare()方法用于在通道中声明一个队列其参数含义如下参数顺序含义说明1队列名称队列的唯一标识2是否持久化开启后RabbitMQ 重启后队列不会消失3是否独占开启后队列只能在当前连接中使用4是否自动删除开启后当队列中的最后一个消费者取消订阅时队列会被自动删除5扩展参数用于配置队列的高级特性queueDeclare()的幂等性如果队列已存在且参数一致则什么也不做。如果队列已存在但参数不一致则会抛出异常。channel.basicPublish()方法用于将消息发布到指定的队列中其参数含义如下参数顺序含义说明1交换机名称空字符串代表使用默认的无名交换机2队列名称或路由键消息要传递的目的地此处传入队列名称3消息属性例如持久化、优先级、过期时间等4消息体消息内容通常以字节数组形式传递生产者端的流程总结创建连接工厂 → 建立连接 → 创建通道 → 声明队列 → 发布消息消费者代码javapublic class Recv { private final static String QUEUE_NAME hello; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory new ConnectionFactory(); factory.setHost(localhost); Connection connection factory.newConnection(); Channel channel connection.createChannel(); // 指定队列类型 MapString, Object map Map.of(x-queue-type, quorum); channel.queueDeclare(QUEUE_NAME, true, false, false, map); System.out.println( [*] Waiting for messages. To exit press CTRLC); DeliverCallback deliverCallback (consumerTag, delivery) - { String message new String(delivery.getBody(), UTF-8); System.out.println([x] Received message ); }; boolean autoAck true; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag - {}); } }消费者代码解释第 14 行之前的代码与生产者部分相同。DeliverCallback是一个回调函数当消费者收到消息时会自动调用该函数执行 Lambda 表达式中的逻辑consumerTagRabbitMQ 为当前消费者生成的唯一标识符。delivery封装了消息所有信息的对象包含消息体但它本身不是消息体。getBody()从delivery对象中取出消息体字节数组。basicConsume()方法用于消费队列中的消息其参数含义如下参数顺序含义说明1队列名称从哪个队列消费消息2是否自动确认autoAck为true时RabbitMQ 在发送消息后立即将其标记为已确认无论消费者是否处理成功。生产环境不建议开启否则消息可能丢失3消息回调收到消息后执行的回调函数4取消回调消费者被取消时执行的回调消费者端的流程总结等待消息 → 从队列中取出消息 → 自动确认如果启用→ 消费消息至此最简单的点对点生产者-队列-消费者模式就介绍完了。二、工作消息队列点对多模式问题在生产环境中点对点的单生产者-单消费者模式性能过低需要改为单生产者对应多个消费者以提升性能。解决方案代码与点对点模式相同只需同时启动多个消费者即可。RabbitMQ 会以轮询的方式平均向各个消费者发送消息从而提高消息的消费效率。消息持久化要实现消息持久化需要同时开启队列持久化和消息持久化。由于涉及磁盘 I/O 操作持久化模式的性能相对较低。队列持久化javaboolean durable true; MapString, Object args Map.of(x-queue-type, quorum); channel.queueDeclare(hello, durable, false, false, args);消息持久化javachannel.basicPublish(, task_queue, MessageProperties.PERSISTENT_TEXT_PLAIN, // 持久化标志 message.getBytes());公平分发问题轮询分发不够公平正在忙碌的消费者仍然会收到新消息。解决方案使用basicQos方法并将prefetchCount设置为 1。javaint prefetchCount 1; channel.basicQos(prefetchCount);这行代码的作用是告知 RabbitMQ 每次只向一个消费者发送不超过一条消息。在消费者处理并确认前一条消息之前RabbitMQ 不会向其发送新消息而是将新消息派发给下一个空闲的消费者。简单理解“我一次只处理 1 条消息没处理完别给我发新的。”公平分发模式相比轮询模式效率更高。三、交换机在实际生产环境中简单的点对点和点对多模式无法满足多样化的业务需求。交换机可以根据不同的类型或匹配规则将消息发送到对应的队列再由队列转发给消费者从而适应复杂的业务逻辑。交换机有以下几种可用类型direct、topic、headers、fanout以及默认的无名交换机。无名交换机默认交换机javachannel.basicPublish(, QUEUE_NAME, null, message.getBytes());在前面的示例中我们并没有显式使用交换机但仍然能够向队列发送消息这正是无名交换机发力了。basicPublish()方法的第一个参数空字符串就代表无名交换机。当使用无名交换机时消息会被转发到与第二个参数名称匹配的队列。fanout 交换机首先创建一个fanout类型的交换机命名为ex。由于生产者不关心消息具体发往哪个队列因此在生产者代码中只需要声明交换机即可无需声明队列。javachannel.exchangeDeclare(ex, fanout);fanout交换机的功能非常简单它会将接收到的消息广播给所有与之绑定的队列。这种模式常用于搭建日志系统。有了交换机之后如何让交换机把消息转发到指定的队列呢这就需要用到绑定Binding。javachannel.queueBind(queueName, ex, );通过绑定ex交换机会将消息转发到queueName队列中。第三个参数空字符串是路由键将在其他类型的交换机中详细讲解。使用交换机发送消息javachannel.basicPublish(ex, , null, message.getBytes());参数说明交换机名称ex路由规则空字符串fanout类型忽略路由键消息属性null消息体消息内容可以看到与仅使用队列时相比发送消息的代码有以下变化参数 1 从无名交换机空字符串变成了我们自己声明的fanout交换机。参数 2 从队列名变成了路由规则。使用交换机接收并消费消息java// 创建交换机fanout类型 channel.exchangeDeclare(ex, fanout); // 让 RabbitMQ 自动生成一个随机的队列并获取这个队列的名字 String queueName channel.queueDeclare().getQueue(); // 绑定交换机与队列fanout交换机routingKey为空字符串 channel.queueBind(queueName, ex, ); int prefetchCount 1; channel.basicQos(prefetchCount); DeliverCallback deliverCallback (String consumerTag, Delivery delivery) - { String message new String(delivery.getBody(), UTF-8); System.out.println([x] Received message ); }; boolean autoAck true; channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag - {});消费者代码的变动主要体现在创建交换机绑定交换机与队列需要注意的是消费者仍然需要声明队列这里使用的是 RabbitMQ 自动生成的随机队列因为消费者只能从队列中取出消息进行消费。Direct 交换机在实际生产环境中可能需要根据消息的重要程度进行不同的转发处理这时就需要用到direct交换机。direct交换机会将消息转发给路由键完全匹配的队列然后由对应的消费者取出消息进行消费。javachannel.queueBind(queueName, ex, user); channel.basicPublish(ex, user, null, message.getBytes());在这段代码中队列以user作为路由键绑定到交换机。生产者发布消息时指定的路由键也是user。交换机将消息的路由键与队列的路由键进行匹配匹配成功后将消息转发给对应的队列。如果多个队列绑定了同一个路由键交换机会将消息转发给所有这些队列。Topic 交换机问题在生产环境中有时希望一个队列能够同时监听不同路由的消息。解决方案使用topic交换机。topic交换机支持通配符可以实现模糊匹配的路由规则。通配符说明*星号可恰好替换一个单词。#井号可替代零个或多个单词。示例场景假设单词按顺序表示速度.颜色.动物物种则上图中的绑定关系可以总结为Q1监听所有橙色动物的消息。Q2监听所有兔子的消息以及所有懒动物的消息。通过topic交换机一个队列可以监听多个不同路由的消息从而应对复杂的业务场景。headers 交换机headers交换机依靠哈希表和字典匹配的方式进行路由性能比direct和topic交换机低很多因此本文不作展开。四、其他功能(待补充)发布者确认问题生产者发送消息后无法确认消息是否成功到达 RabbitMQ。解决方案使用发布者确认机制。当消息到达 RabbitMQ 时RabbitMQ 会向生产者返回一条确认消息如果未到达则抛出异常。单条消息确认java// 发送消息 String message hello; channel.basicPublish(, QUEUE_NAME, null, message.getBytes()); // 等待确认5秒超时 // 如果没有抛出异常说明消息已到达 RabbitMQ channel.waitForConfirmsOrDie(5_000);批量消息确认java// 开启确认模式 channel.confirmSelect(); // 批量确认参数 int batchSize 100; int count 0; // 发送 1000 条消息每 100 条确认一次 for (int i 0; i 1000; i) { String message msg i; channel.basicPublish(, QUEUE_NAME, null, message.getBytes()); count; if (count batchSize) { // 等待这一批消息确认5秒超时 channel.waitForConfirmsOrDie(5_000); System.out.println(Confirmed batchSize messages (up to msg i )); count 0; } } // 确认最后一批不足 batchSize 的消息 if (count 0) { channel.waitForConfirmsOrDie(5_000); System.out.println(Confirmed last count messages); }waitForConfirmsOrDie()方法会等待从上一次确认点到当前确认点之间的所有消息被确认通过这种方式实现批量消息确认。手动确认机制前面介绍basicConsume()方法时提到过autoAck自动确认参数。在生产环境中不建议开启自动确认而是使用手动确认机制。手动确认只需一行代码javachannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);这行代码的作用是告诉 RabbitMQ 服务器“这条消息我已经处理完了你可以将它从队列中删除了。”使用手动确认机制后如果消费者在处理消息时发生错误队列中的消息依然存在消费者可以重新尝试消费从而避免消息丢失。感谢浏览