ThinkPHP6 消息队列 think-queue:从配置到高可用部署实战
1. 消息队列基础与ThinkPHP6集成消息队列是现代Web开发中不可或缺的组件特别是在电商系统这类高并发场景下。想象一下双11期间每秒上万笔订单涌入系统如果每个订单都同步处理库存扣减、优惠券核销、物流单生成等操作服务器瞬间就会崩溃。这时候消息队列就像个高效的订单分拣中心把请求先收进来排队后台慢慢处理。ThinkPHP6官方扩展think-queue提供了开箱即用的队列支持我实测下来发现它有几个明显优势配置简单相比直接使用Redis队列、支持多种驱动Redis/Database/Sync、内置失败重试机制。下面我们从一个电商订单处理的实际案例出发手把手搭建完整解决方案。先来看基础环境准备# 确保已安装Redis服务 sudo apt-get install redis-server redis-cli ping # 应该返回PONG # 在ThinkPHP6项目中安装队列扩展 composer require topthink/think-queue配置文件位于config/queue.php建议这样设置return [ default redis, // 默认使用Redis驱动 connections [ redis [ type redis, queue default, host env(redis.host, 127.0.0.1), port env(redis.port, 6379), password env(redis.password, ), select 0, // 使用Redis的0号数据库 timeout 0, // 不超时 persistent false, // 非持久化连接 ], ], failed [ // 失败任务记录 type database, table failed_jobs, ], ];2. 订单异步处理实战开发电商场景最典型的就是订单异步处理。我们创建一个订单处理任务类// app/job/OrderProcess.php namespace app\job; use think\queue\Job; use app\model\Order; class OrderProcess { public function fire(Job $job, $data) { $orderId $data[order_id]; try { // 1. 扣减库存 $this-reduceInventory($orderId); // 2. 核销优惠券 $this-useCoupon($orderId); // 3. 生成物流单 $this-createShipping($orderId); $job-delete(); // 任务成功则删除 // 可记录成功日志 } catch (\Exception $e) { // 失败处理逻辑 if ($job-attempts() 3) { $this-logFailedJob($orderId, $e); $job-delete(); return false; } // 10秒后重试 $job-release(10); } } // 其他业务方法... }在控制器中触发队列任务// app/controller/Order.php public function create() { $orderData input(); // 获取订单数据 // 立即持久化订单到数据库 $order Order::create($orderData); // 推送到队列异步处理后续逻辑 $isPushed Queue::push( app\job\OrderProcess, [order_id $order-id], order_queue // 指定队列名称 ); return $isPushed ? json([code200, msg订单创建成功]) : json([code500, msg系统繁忙请重试]); }3. 高级队列配置与优化实际生产环境中简单的队列配置远远不够。我们需要考虑以下几个关键点3.1 多队列优先级管理电商系统通常需要区分订单优先级比如VIP用户订单需要优先处理// 推送到高优先级队列 Queue::push(app\job\OrderProcess, $data, vip_order_queue); // 普通队列 Queue::push(app\job\OrderProcess, $data, normal_order_queue);启动不同优先级的消费者# 先处理VIP订单再处理普通订单 php think queue:listen --queue vip_order_queue,normal_order_queue3.2 延迟队列实现有些业务需要延迟处理比如30分钟内未支付自动取消订单// 30分钟后执行 Queue::later(1800, app\job\OrderCancel, $orderId, delay_queue);3.3 性能调优参数根据服务器配置调整消费者参数php think queue:work \ --queue order_queue \ --delay 5 \ # 失败后延迟5秒重试 --memory 256 \ # 内存限制256M --sleep 3 \ # 无任务时休眠3秒 --tries 3 \ # 最大重试次数 --timeout 120 # 单个任务最长执行时间建议的监控指标队列积压数量通过redis-cli LLEN queues:order_queue查看消费者进程内存占用任务平均处理时长4. 高可用部署方案线上环境必须保证队列服务的稳定性我分享几个实战经验4.1 Supervisor进程守护安装配置Supervisorsudo apt-get install supervisor创建配置文件/etc/supervisor/conf.d/queue.conf[program:think_queue] commandphp /var/www/your_project/think queue:work --queue order_queue --tries 3 directory/var/www/your_project userwww-data autostarttrue autorestarttrue startretries3 numprocs4 # 启动4个进程 process_name%(program_name)s_%(process_num)02d stderr_logfile/var/log/supervisor/queue_err.log stdout_logfile/var/log/supervisor/queue_out.log管理命令sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start think_queue:*4.2 失败任务处理think-queue支持失败任务记录首先创建数据表php think queue:failed-table php think migrate:run然后在配置中启用failed [ type database, table failed_jobs, ],可以定期检查失败任务并手动处理$faileds Db::name(failed_jobs)-select(); foreach ($faileds as $failed) { // 分析失败原因并处理 }4.3 Redis高可用方案生产环境建议使用Redis集群或哨兵模式配置示例redis [ type redis, host [ tcp://redis1:6379, tcp://redis2:6379?aliasslave, ], options [ replication sentinel, service mymaster, parameters [ password your_redis_password, database 0, ], ], ],5. 监控与报警机制完善的监控体系能提前发现问题我常用的方案Prometheus Grafana监控自定义指标导出器统计队列积压量设置消费者进程存活检测关键指标报警# 检查队列积压的Shell脚本 backlog$(redis-cli LLEN queues:order_queue) if [ $backlog -gt 1000 ]; then # 触发邮件/短信报警 fi日志分析ELK收集Supervisor日志分析任务处理时长分布监控异常错误频率在电商项目中我通常会为关键队列设置SLA比如普通订单处理延迟不超过5分钟VIP订单处理延迟不超过1分钟失败重试间隔采用指数退避策略这些经验都是在实际项目中踩坑总结出来的。记得第一次上线队列服务时因为没有设置进程守护半夜消费者挂了导致积压上万订单第二天运营直接炸锅。后来引入Supervisor监控才彻底解决问题。