第三方队列

来自 eyou 项目的 QueueThrd.php，并增加了信号处理

composer require anorgan/qutee
<?php

namespace app\common\service;

use Qutee\Queue;
use Qutee\Task;
use Qutee\Worker;
use think\Env;

/**
 * Database-backed task queue built on the anorgan/qutee library.
 *
 * Tasks are stored through Qutee's PDO persistor in the table
 * "<database.prefix>queue" of the MySQL database described by the
 * DATABASE_* environment values.
 *
 * @copyright jcleng
 * @author jcleng
 */
class QueueThrd
{
    /**
     * Qutee queue bound to the PDO persistor; assigned in the constructor.
     *
     * @var Queue|null
     */
    private $queue = null;

    /**
     * Load the Composer autoloader and wire a Qutee queue to a PDO persistor.
     *
     * Connection settings are read from the environment (DATABASE_HOSTNAME,
     * DATABASE_DATABASE, DATABASE_USERNAME, DATABASE_PASSWORD); the table name
     * is the configured database prefix followed by "queue".
     */
    public function __construct()
    {
        // Project helper; per the original note it is equivalent to:
        // include_once ROOT_PATH . 'composerlib/vendor/autoload.php';
        composerlib();

        $host = Env::get('DATABASE_HOSTNAME');
        $dbname = Env::get('DATABASE_DATABASE');
        $username = Env::get('DATABASE_USERNAME');
        $password = Env::get('DATABASE_PASSWORD');

        $pdoParams = [
            'dsn' => "mysql:host=$host;dbname=$dbname;charset=utf8mb4",
            'username' => $username,
            'password' => $password,
            'table_name' => \think\Config::get('database.prefix') . 'queue',
        ];

        $queuePersistor = new \Qutee\Persistor\Pdo();
        $queuePersistor->setOptions($pdoParams);

        $this->queue = new Queue();
        $this->queue->setPersistor($queuePersistor);
    }

    /**
     * Push a task onto the queue with high priority.
     *
     * @param string $class  Value stored as the task name
     * @param string $method Value stored as the task method name
     * @param array  $data   Payload attached to the task
     * @return mixed The result of Queue::addTask()
     * @copyright jcleng
     * @author jcleng
     */
    public function push($class, $method, $data = [])
    {
        $task = new Task();
        $task
            ->setName($class)
            ->setMethodName($method)
            ->setData($data)
            ->setPriority(Task::PRIORITY_HIGH);

        return $this->queue->addTask($task);
    }

    /**
     * Consume the queue in an endless loop; several workers may run at once.
     *
     * Per the original author's note, a task whose execution fails is still
     * flagged as taken (is_taken) in the queue table.
     *
     * Started from the command line with:
     *   php public/think test -f queueRun
     *
     * This method does not return normally; on Linux the loop relies on
     * Signal::checkExit() to stop the process.
     *
     * @return void
     * @copyright jcleng
     * @author jcleng
     */
    public function workerProcess()
    {
        $worker = new Worker;
        $worker
            ->setQueue($this->queue)
            ->setInterval(1)
            ->setPriority(Task::PRIORITY_HIGH);

        // Signal handling is only wired up on Linux; decide once instead of
        // re-checking the platform on every iteration.
        $handleSignals = PHP_OS === 'Linux';
        if ($handleSignals) {
            Signal::init();
        }

        while (true) {
            if ($handleSignals) {
                Signal::checkExit();
            }

            try {
                // run() yields the executed task, or null when nothing ran.
                $task = $worker->run();
                if (null !== $task) {
                    echo 'Ran task: ' . $task->getName() . PHP_EOL;
                }
            } catch (\Exception $e) {
                $this->reportFailure($e);
            } catch (\Throwable $e) {
                // PHP 7+: engine errors (TypeError, undefined method, ...) are
                // \Error, not \Exception. Without this clause a single broken
                // task would terminate the worker. On PHP 5 the class does not
                // exist and this clause simply never matches.
                $this->reportFailure($e);
            }
        }
    }

    /**
     * Print a failure message and pause before the worker loop continues.
     *
     * @param \Exception|\Throwable $e The failure raised while running a task
     * @return void
     */
    private function reportFailure($e)
    {
        echo 'Error: ' . $e->getMessage() . PHP_EOL;
        // Back off so a persistent failure does not spin the loop.
        sleep(5);
    }
}