# 聊聊aio-pika异步消息队列的Python实现先说说这个库的来由。写过Python的人都知道RabbitMQ是个老朋友了之前大家用pika这个同步库用得也挺顺手。但随着异步编程在Python生态里越来越普及直接用同步库去调RabbitMQ就显得有点尴尬了——你这边跑着asyncio的事件循环那边一个blocking的函数调用直接就把整个循环卡住了。aio-pika就是专门解决这个问题的它基于pika的底层协议实现但完全走上了异步的道路。aio-pika究竟是什么说白了aio-pika就是RabbitMQ的异步客户端库。但它跟pika的那种异步不太一样pika虽然提供了所谓的异步模式但本质上是基于回调的写起来更像Node.js的感觉一旦逻辑复杂起来回调地狱就找上门了。而aio-pika完全拥抱了asyncio用的是async/await语法读起来跟同步代码的流程感差不多。这里有个比较容易踩的坑很多人以为aio-pika只是把pika的方法简单地包了一层async其实不是。两者虽然都是通过AMQP协议跟RabbitMQ通信但内部实现完全不同。pika是单线程的事件驱动模型aio-pika则是直接跑在asyncio的事件循环上这意味着如果你在现有异步项目里集成RabbitMQaio-pika的接入成本会低很多。它能解决什么实际问题既然说到实际问题就举一个场景假如你在做一个电商后台用户下单后需要做几件事写订单数据库、扣库存、发通知给仓库、给用户发邮件。这些事情如果同步做用户得等到所有操作完成才能看到下单成功的提示体验非常糟糕。用aio-pika就能解决这个问题下单后只需要把订单信息扔到消息队列里就返回用户下单成功然后让后台的几个消费者分别去处理数据库、仓库、邮件这些事情。这种做法既快又解耦仓库系统挂了也不会影响用户下单。另外一个常见场景是做削峰。双十一大促流量巨大直接打到数据库肯定扛不住。用aio-pika接住请求让消费者慢慢处理就能避免系统被瞬间冲垮。具体怎么上手使用先装包pipinstallaio-pika最基本的生产者就是这样importasyncioimportaio_pikaasyncdefsend():connectionawaitaio_pika.connect_robust(amqp://guest:guestlocalhost/)asyncwithconnection:channelawaitconnection.channel()# 声明队列durable表示消息持久化到磁盘queueawaitchannel.declare_queue(task_queue,durableTrue)awaitchannel.default_exchange.publish(aio_pika.Message(bodybHello World,delivery_modeaio_pika.DeliveryMode.PERSISTENT),routing_keytask_queue)消费者这边稍微要注意的是消息确认机制asyncdefprocess_message(message:aio_pika.IncomingMessage):asyncwithmessage.process():# 处理消息的逻辑如果这里抛出异常消息会被重入队列print(f收到消息:{message.body.decode()})# 模拟耗时的业务处理awaitasyncio.sleep(1)# 退出with块时自动发送ackasyncdefconsume():connectionawaitaio_pika.connect_robust(amqp://guest:guestlocalhost/)asyncwithconnection:channelawaitconnection.channel()# 设置prefetch每次只拿一条消息处理完再拿下一条awaitchannel.set_qos(prefetch_count1)queueawaitchannel.declare_queue(task_queue,durableTrue)asyncwithqueue.iterator()asqueue_iter:asyncformessageinqueue_iter:awaitprocess_message(message)有个细节值得提一下message.process()这个上下文管理器会自动处理ack/nack但如果你在上下文管理器外面用了message.ack()之类的操作很容易出问题流程不够清晰时就容易把连接搞乱。建议统一用async with message.process()的模式。实际项目中积累的一些经验消息持久化非常重要。很多人在开发初期图省事觉得消息不持久化也无所谓结果RabbitMQ一重启队列里的消息全没了。生产环境里一定要把delivery_mode设为PERSISTENT同时队列也要声明为durableTrue。关于连接管理connect_robust比普通的connect好很多。生产环境里网络抖动是常态connect_robust会自动重连适合长时间运行的生产者或消费者。但要注意一点重连之后之前在channel上做的一些设置比如exchange声明、queue绑定会丢失所以需要在重连后重新初始化。一个比较常见的做法是写个包装类在检测到连接断开时重新设置这些。另外一个容易忽视的是prefetch的设置。如果不设置prefetch_countRabbitMQ会默认把队列里所有消息一次性推给消费者如果你的消费者处理得慢内存很容易被撑爆。一般设置成1就够用但如果是批量处理的场景比如10条消息一起处理做批量写入可以适当调大。跟同类工具的取舍跟aio-pika功能类似的有aioamqp和pamqp三者都支持异步但各有特点aioamqp实现更轻量代码量小但功能相对少一些有些高级特性比如死信队列、延时队列支持得不够完善。pamqp更底层基本就是裸的AMQP协议解析适合需要做定制化开发的场景但直接用在业务代码里成本太高。aio-pika算是站在了它们中间。它封装得比较完整该有的特性都有交换机类型、绑定、死信、优先级、RPC等同时API设计得比较符合Python的习惯。另外aio-pika有个优势是文档写得清楚示例代码也完整这点对新手比较友好。如果要说不好的地方aio-pika对RabbitMQ 3.8以后新增的一些特性支持得不太及时。比如流式队列Stream Queue这种新功能aio-pika的支持进度就比官方客户端慢一拍。不过大部分场景下基础的exchange/queue/binding机制就足够用了。最后说一句如果你本身不是重度使用RabbitMQ的场景可能会觉得aio-pika的抽象层级有点厚。但一旦业务复杂到需要处理各种交换机路由、死信重投、延迟消息aio-pika提供的这些抽象就能帮你省很多事。