首页 > 编程知识 正文

tp6 队列,thinkphp5消息队列

时间:2023-05-04 13:54:21 阅读:242481 作者:2828

tp6中提供了很多种消息队列的链接方式,这里使用Redis数据库链接方式

免费小程序+公众号源码获取:点击获取

1.配置消息队列链接方式
首先配置/config/queue.php中的redis配置,这个配置可从.env当中读取如下图

2.下发消息队列
把tp原来的方法稍作调整,把延迟消息队列和正常的放在一起方便调用
整理出多任务和单任务的快捷使用方式

<?phpnamespace crmebutils;use crmebtraitsErrorTrait;use thinkfacadeConfig;use thinkfacadeQueue as QueueThink;/** * Class Queue * @package crmebutils * @method $this do(string $do) 设置任务执行方法 * @method $this job(string $job) 设置任务执行类名 * @method $this errorCount(int $errorCount) 执行失败次数 * @method $this data(...$data) 执行数据 * @method $this secs(int $secs) 延迟执行秒数 * @method $this log($log) 记录日志 */class Queue{ use ErrorTrait; /** * 任务执行 * @var string */ protected $do = 'doJob'; /** * 默认任务执行方法名 * @var string */ protected $defaultDo; /** * 任务类名 * @var string */ protected $job; /** * 错误次数 * @var int */ protected $errorCount = 3; /** * 数据 * @var array|string */ protected $data; /** * 任务名 * @var null */ protected $queueName = null; /** * 延迟执行秒数 * @var int */ protected $secs = 0; /** * 记录日志 * @var string|callable|array */ protected $log; /** * @var array */ protected $rules = ['do', 'data', 'errorCount', 'job', 'secs', 'log']; /** * @var static */ protected static $instance; /** * Queue constructor. */ protected function __construct() { $this->defaultDo = $this->do; } /** * @return static */ public static function instance() { if (is_null(self::$instance)) { self::$instance = new static(); } return self::$instance; } /** * 放入消息队列 * @param array|null $data * @return mixed */ public function push(?array $data = null) { if (!$this->job) { return $this->setError('需要执行的队列类必须存在'); } $res = QueueThink::{$this->action()}(...$this->getValues($data)); $this->clean(); return $res; } /** * 清除数据 */ public function clean() { $this->secs = 0; $this->data = []; $this->log = null; $this->queueName = null; $this->errorCount = 3; $this->do = $this->defaultDo; } /** * 获取任务方式 * @return string */ protected function action() { return $this->secs ? 'later' : 'push'; } /** * 获取参数 * @param $data * @return array */ protected function getValues($data) { $jobData['data'] = $data ?: $this->data; $jobData['do'] = $this->do; $jobData['errorCount'] = $this->errorCount; $jobData['log'] = $this->log; if ($this->do != $this->defaultDo) { $this->job .= '@' . Config::get('queue.prefix', 'eb_') . $this->do; } if ($this->secs) { return [$this->secs, $this->job, $jobData, $this->queueName]; } else { return [$this->job, $jobData, $this->queueName]; } } /** * @param $name * @param $arguments * @return $this */ public function __call($name, $arguments) { if (in_array($name, $this->rules)) { if ($name === 'data') { $this->{$name} = $arguments; } else { $this->{$name} = $arguments[0] ?? null; } return $this; } else { throw new RuntimeException('Method does not exist' . __CLASS__ . '->' . $name . '()'); } }}

3.放入任务

<?phpnamespace appadminapicontroller;use crmebutilsQueue;class Test{public function index(){//单例调用Queue::instance()->job(WechatTemplateJob::class)//执行任务类名->data()//执行任务需要的参数,可无限制传数//->secs(1) 延迟1秒后执行任务->push();//放入任务}}

4.写消费任务逻辑

<?php/** * @author: liaofei<136327134@qq.com> * @day: 2020/5/21 */namespace crmebjobs;use crmebbasicBaseJob;use crmebservicestemplateTemplate;use thinkfacadeRoute;/** * Class WechatTemplateJob * @package crmebjobs */class WechatTemplateJob{/** * @param $name * @param $arguments */ public function __call($name, $arguments) { $this->fire(...$arguments); } /** * @param Job $job * @param $data */ public function fire(Job $job, $data): void { try { $action = $data['do'] ?? 'doJob';//任务名 $infoData = $data['data'] ?? [];//执行数据 $errorCount = $data['errorCount'] ?? 0;//最大错误次数 $log = $data['log'] ?? null; if (method_exists($this, $action)) { if ($this->{$action}(...$infoData)) { //删除任务 $job->delete(); //记录日志 $this->info($log); } else { if ($job->attempts() >= $errorCount && $errorCount) { //删除任务 $job->delete(); //记录日志 $this->info($log); } else { //从新放入队列 $job->release(); } } } else { $job->delete(); } } catch (Throwable $e) { $job->delete(); Log::error('执行消息队列发成错误,错误原因:' . $e->getMessage()); } } /** * 打印出成功提示 * @param $log * @return bool */ protected function info($log) { try { if (is_callable($log)) { print_r($log() . "rn"); } else if (is_string($log) || is_array($log)) { print_r($log . "rn"); } } catch (Throwable $e) { print_r($e->getMessage()); } } /** * 任务失败执行方法 * @param $data * @param $e */ public function failed($data, $e) { } //可把面的几个方法写成另外一个类里面继承给当前类//doJob的参数由放入任务方法中的data决定public function doJob(){//执行逻辑//成功返回true,失败返回fasle,会再次加入队列。执行到最大错误后自动删除return true;}}

5.启动消息队列
windows用户可在项目根目录开启命令行输入命令

php think queue:listen --queue CRMEB

进行启动消息队列
Linux需要守护进程才可以,推荐使用Supervisor命令都是一样的

启动报错,按照提示解禁对应函数就行了

启动后就行看到队列加入和消费的日志

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。