第三方队列（基于 anorgan/qutee）
来自 eyou 项目的 QueueThrd.php，并增加了信号处理
安装依赖：composer require anorgan/qutee
<?php
namespace app\common\service;
use Qutee\Queue;
use Qutee\Task;
use Qutee\Worker;
use think\Env;
/**
 * Database-backed task queue built on the anorgan/qutee library.
 *
 * Wraps a Qutee Queue with a PDO persistor and exposes two operations:
 * pushing a task (push) and running a long-lived consumer loop (workerProcess).
 *
 * @copyright jcleng
 * @author jcleng
 */
class QueueThrd
{
    /**
     * Queue instance configured with the PDO persistor in the constructor.
     *
     * @var Queue|null
     */
    private $queue = null;

    /**
     * Build the queue and attach a PDO persistor configured from the
     * DATABASE_* environment values and the configured table prefix.
     */
    public function __construct()
    {
        // Project helper that loads the composer autoloader; per the original
        // note it is equivalent to:
        // include_once ROOT_PATH . 'composerlib/vendor/autoload.php';
        composerlib();

        $host     = Env::get('DATABASE_HOSTNAME');
        $dbname   = Env::get('DATABASE_DATABASE');
        $username = Env::get('DATABASE_USERNAME');
        $password = Env::get('DATABASE_PASSWORD');

        $pdoParams = array(
            'dsn'        => "mysql:host=$host;dbname=$dbname;charset=utf8mb4",
            'username'   => $username,
            'password'   => $password,
            // Queue table name is "<configured prefix>queue".
            'table_name' => \think\Config::get('database.prefix') . 'queue',
        );

        $queuePersistor = new \Qutee\Persistor\Pdo();
        $queuePersistor->setOptions($pdoParams);

        $this->queue = new Queue();
        $this->queue->setPersistor($queuePersistor);
    }

    /**
     * Push a task onto the queue with high priority.
     *
     * @param string $class  Value stored as the task name
     * @param string $method Value stored as the task method name
     * @param array  $data   Payload stored with the task
     * @return mixed Whatever Queue::addTask() returns
     * @copyright jcleng
     * @author jcleng
     */
    public function push($class, $method, $data = [])
    {
        $task = new Task();
        $task
            ->setName($class)
            ->setMethodName($method)
            ->setData($data)
            ->setPriority(Task::PRIORITY_HIGH);

        return $this->queue->addTask($task);
    }

    /**
     * Consume the queue in an endless loop; several workers may run at once.
     *
     * Original author's notes: a task whose execution fails is still marked
     * as taken (is_taken). Started with: php public/think test -f queueRun
     *
     * Any failure raised while running a task is printed and followed by a
     * 5 second pause, after which the loop continues.
     *
     * @return void The loop has no exit condition of its own; termination is
     *              presumably triggered through Signal::checkExit() on Linux.
     * @copyright jcleng
     * @author jcleng
     */
    public function workerProcess()
    {
        $worker = new Worker;
        $worker
            ->setQueue($this->queue)
            ->setInterval(1)
            ->setPriority(Task::PRIORITY_HIGH);

        // Signal handling is only enabled on Linux; the OS cannot change while
        // the process runs, so evaluate the check once.
        $handleSignals = (PHP_OS === 'Linux');
        if ($handleSignals) {
            Signal::init();
        }

        while (true) {
            if ($handleSignals) {
                Signal::checkExit();
            }

            try {
                // run() yields the executed task, or null when nothing ran.
                if (null !== ($task = $worker->run())) {
                    echo 'Ran task: ' . $task->getName() . PHP_EOL;
                }
            } catch (\Exception $e) {
                $this->reportFailure($e);
            } catch (\Throwable $e) {
                // PHP 7+: \Error subclasses (TypeError, undefined method, ...)
                // do not extend \Exception. Without this branch a single bad
                // task would terminate the whole worker. On PHP 5 this branch
                // simply never matches.
                $this->reportFailure($e);
            }
        }
    }

    /**
     * Print a task failure and pause before the worker retries.
     *
     * @param \Exception|\Throwable $e Failure raised while running a task
     * @return void
     */
    private function reportFailure($e)
    {
        echo 'Error: ' . $e->getMessage() . PHP_EOL;
        sleep(5);
    }
}