rabbitmq配置
安装
nix-env -iA nixpkgs.rabbitmq-server
查看
rabbitmq-defaults
which rabbitmq-defaults
cat /Users/jcleng/.nix-profile/bin/rabbitmq-defaults
ls -l /Users/jcleng/.nix-profile/bin/rabbitmq-defaults
cd /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9
# 修改一下SYS_PREFIX
SYS_PREFIX=/nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9
# 新增PLUGINS_DIR,指定插件目录
PLUGINS_DIR="/nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/plugins"
# 一些必要的目录
sudo mkdir -p /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/
sudo chmod 777 /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/
sudo mkdir -p /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/data
sudo chmod 777 /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/data
sudo mkdir -p /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/logs
sudo chmod 777 /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/logs
touch /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/rabbitmq-env.conf
code /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/rabbitmq-env.conf
#配置文件
touch /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/rabbitmq.conf
code /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/rabbitmq.conf
# 编辑配置文件
# 编辑rabbitmq-env.conf
code rabbitmq-env.conf
RABBITMQ_MNESIA_BASE=/nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/data
RABBITMQ_LOG_BASE=/nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/logs
# RABBITMQ_PLUGINS_DIR=/nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/plugins
RABBITMQ_PID_FILE=/tmp/rabbitmq.pid
# 运行
rabbitmq-server
# 添加web管理插件
rabbitmq-plugins enable rabbitmq_management
# 访问: http://127.0.0.1:15672/
# 账户密码: guest/guest
新建管理用户
rabbitmqctl add_user leng leng123
rabbitmqctl set_user_tags leng administrator
客户端连接参考: RabbitMQ 配置 SSL/TLS
# 证书不对会提示: Broken pipe or closed connection in ...
# 服务端私钥和证书文件配置
ssl_options.cacertfile = /home/rabbitmq/etc/rabbitmq/ssl/rmqca/cacert.pem
ssl_options.certfile = /home/rabbitmq/etc/rabbitmq/ssl/server/cert.pem
ssl_options.keyfile = /home/rabbitmq/etc/rabbitmq/ssl/server/key.pem
# 有verify_none和verify_peer两个选项,verify_none表示完全忽略验证证书的结果,verify_peer表示要求验证对方证书
ssl_options.verify = verify_peer
# 若为true,服务端会向客户端索要证书,若客户端无证书则中止SSL握手;若为false,则客户端没有证书时依然可完成SSL握手
ssl_options.fail_if_no_peer_cert = true
php测试,安装包
"php-amqplib/php-amqplib": "^2.12",
<?php
include "./vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
// Producer example: declares the exchange/queue topology and publishes one
// persistent JSON message to the local broker.

// Broker connection settings.
$conf = [
    'host'  => '127.0.0.1',
    'port'  => 5672,
    'user'  => 'guest',
    'pwd'   => 'guest',
    'vhost' => '/',
];

$exchangeName = 'queue_default'; // exchange name
$queueName    = 'queue_message'; // queue name
$routingKey   = 'reg_message';   // routing key used for the binding and for publishing

// Open the connection to the broker, then a channel on top of it.
$conn = new AMQPStreamConnection(
    $conf['host'],
    $conf['port'],
    $conf['user'],
    $conf['pwd'],
    $conf['vhost']
);
$channel = $conn->channel();

// Declare a durable direct exchange and a durable queue, then bind the queue
// to the exchange with the routing key.
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);

// Build the message; the persistent delivery mode asks the broker to store it on disk.
$msgBody = json_encode(["name" => "iGoo", "age" => 22]);
$msgProperties = [
    'content_type'  => 'text/plain',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];
$msg = new AMQPMessage($msgBody, $msgProperties);

// Publish to the exchange; the routing key selects the bound queue.
$channel->basic_publish($msg, $exchangeName, $routingKey);

// Release the channel and the connection.
$channel->close();
$conn->close();
<?php
include "./vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
// Consumer example: declares the same topology as the producer and prints
// every message delivered to the queue until no consumer callback remains.

// Broker connection settings.
$conf = [
    'host'  => '127.0.0.1',
    'port'  => 5672,
    'user'  => 'guest',
    'pwd'   => 'guest',
    'vhost' => '/',
];

$exchangeName = 'queue_default'; // exchange name
$queueName    = 'queue_message'; // queue name
$routingKey   = 'reg_message';   // routing key used for the binding

// Open the connection to the broker, then a channel on top of it.
$conn = new AMQPStreamConnection(
    $conf['host'],
    $conf['port'],
    $conf['user'],
    $conf['pwd'],
    $conf['vhost']
);
$channel = $conn->channel();

// Declare a durable direct exchange and a durable queue, then bind them.
$channel->exchange_declare($exchangeName, 'direct', false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);

// Callback invoked for each delivered message.
$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

/**
 * basic_consume arguments, in order:
 *   queue:        $queueName // queue to consume from
 *   consumer_tag: ''         // consumer identifier; empty lets the broker generate one
 *   no_local:     false      // part of the AMQP standard, not implemented by RabbitMQ
 *   no_ack:       true       // messages count as consumed on delivery, no explicit ack
 *   exclusive:    false      // whether only this consumer may read from the queue
 *   nowait:       false      // do not wait for the broker's reply; must stay false when exclusive is on
 *   callback:     $callback  // message handler
 */
$channel->basic_consume($queueName, '', false, true, false, false, $callback);

/**
 * Closes the channel and the connection when the script terminates.
 *
 * @param mixed $channel    channel returned by $conn->channel()
 * @param mixed $connection the AMQPStreamConnection instance
 */
function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}

// The connection variable is $conn; passing an undefined $connection here
// would make the handler call close() on null at shutdown.
register_shutdown_function('shutdown', $channel, $conn);

// Block and dispatch deliveries for as long as a consumer callback is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}

// Normal exit path once consuming has stopped.
$channel->close();
$conn->close();
docker镜像
# 拉带有management的,加速: https://hub.daocloud.io/
docker pull daocloud.io/library/rabbitmq:3.7.28-management
# run, 指定密码和用户
docker run -idt --name rabbitmq -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password daocloud.io/library/rabbitmq:3.7.28-management
# 访问管理面板
http://172.17.0.3:15672/
tp6使用think-queue接入rabbitmq(基于enqueue/enqueue)
# 依赖
"require": {
"php": ">=7.1.0",
"topthink/framework": "^6.0.0",
"topthink/think-orm": "^2.0",
"enqueue/enqueue": "^0.10.15",
"enqueue/amqp-lib": "^0.10.9",
"enqueue/amqp-tools": "^0.10.9",
"topthink/think-queue": "^3.0"
},
Connector实现
<?php
namespace app\controller;
use app\BaseController;
use app\model\Test;
use Enqueue\AmqpLib\AmqpConnectionFactory;
use Enqueue\AmqpLib\AmqpContext;
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\Impl\AmqpBind;
use Interop\Amqp\AmqpTopic;
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Interop\Queue\Message;
use Interop\Queue\Consumer;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpMessage;
use think\queue\Connector;
use think\queue\InteractsWithTime;
use Interop\Queue\Context;
use app\controller\RabbitMqJob;
use PhpAmqpLib\Exception\AMQPIOException;
/**
 * think-queue connector that stores jobs in RabbitMQ through enqueue/amqp-lib.
 *
 * Every queue is declared durable and bound to one shared fanout exchange;
 * jobs are published to that exchange.
 *
 * NOTE(review): a fanout exchange ignores routing keys, so once several queues
 * are bound to it a job pushed for one queue is presumably copied into all of
 * them. Moving to a direct exchange would require re-declaring the exchange on
 * the broker, so the topology is intentionally left unchanged here.
 */
class RabbitMq extends Connector
{
    use InteractsWithTime;

    /**
     * Name of the shared exchange. The spelling is a runtime identifier on the
     * broker; renaming it would make the connector publish to a new exchange.
     */
    protected const EXCHANGE_NAME = 'tp_rabbbit';

    /**
     * Proxy object that forwards calls to the AMQP context.
     *
     * @var object
     */
    protected $rabbit;

    /**
     * The name of the default queue.
     *
     * @var string
     */
    protected $default;

    /**
     * The expiration time of a job.
     *
     * @var int|null
     */
    protected $retryAfter = 60;

    /**
     * The maximum number of seconds to block for a job.
     *
     * @var int|null
     */
    protected $blockFor = null;

    /**
     * @param object   $rabbit     proxy around the AMQP context (see __make())
     * @param string   $default    queue name used when none is given
     * @param int|null $retryAfter job expiration time in seconds
     * @param int|null $blockFor   maximum seconds pop() waits for a message
     */
    public function __construct($rabbit, $default = 'default', $retryAfter = 60, $blockFor = null)
    {
        $this->rabbit     = $rabbit;
        $this->default    = $default;
        $this->retryAfter = $retryAfter;
        $this->blockFor   = $blockFor;
    }

    /**
     * Builds the connector from a configuration array.
     *
     * Recognised keys: host, port, vhost, user, pass, persisted, queue,
     * retry_after, block_for. Every key is optional.
     *
     * @param array $config
     * @return self
     */
    public static function __make($config)
    {
        // Thin proxy around the AMQP context: forwards every call and
        // re-creates the context when the connection reports it "went away".
        $rabbit = new class($config)
        {
            protected $config;
            protected $client;

            public function __construct($config)
            {
                $this->config = $config;
                $this->client = $this->createClient();
            }

            /**
             * Creates a fresh AMQP context from the stored configuration.
             */
            protected function createClient()
            {
                $config  = $this->config;
                $factory = new AmqpConnectionFactory([
                    'host'      => $config['host'] ?? '',
                    'port'      => $config['port'] ?? '5672',
                    'vhost'     => $config['vhost'] ?? '/',
                    'user'      => $config['user'] ?? 'guest',
                    'pass'      => $config['pass'] ?? 'guest',
                    'persisted' => $config['persisted'] ?? false,
                ]);

                return $factory->createContext();
            }

            /**
             * Forwards the call to the context. On a "went away" I/O error the
             * context is rebuilt for later calls and the error is re-thrown.
             */
            public function __call($name, $arguments)
            {
                try {
                    return $this->client->{$name}(...$arguments);
                } catch (AMQPIOException $e) {
                    if (strpos($e->getMessage(), 'went away') !== false) {
                        $this->client = $this->createClient();
                    }
                    throw $e;
                }
            }
        };

        return new self(
            $rabbit,
            // Fall back to the constructor default instead of failing on a missing key.
            $config['queue'] ?? 'default',
            $config['retry_after'] ?? 60,
            $config['block_for'] ?? null
        );
    }

    /**
     * Queue size is not reported by this connector.
     *
     * @param string|null $queue
     * @return null
     */
    public function size($queue = null)
    {
        return null;
    }

    /**
     * Publishes a job for immediate processing.
     *
     * @param mixed       $job
     * @param mixed       $data
     * @param string|null $queue
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

    /**
     * Publishes an already serialised payload.
     *
     * @param string      $payload
     * @param string|null $queue
     * @param array       $options unused
     * @return void
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $topic   = $this->declareTopicAndQueue($queue);
        $context = $this->rabbit;

        $context->createProducer()
            ->send($topic, $context->createMessage($payload));
    }

    /**
     * Publishes a job that becomes available after $delay seconds.
     *
     * @param int|float   $delay seconds
     * @param mixed       $job
     * @param mixed       $data
     * @param string|null $queue
     * @return mixed
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        return $this->laterRaw($delay, $this->createPayload($job, $data), $queue);
    }

    /**
     * Publishes an already serialised payload with a delivery delay, using the
     * dead-letter-exchange delay strategy.
     *
     * @param int|float   $delay seconds
     * @param string      $payload
     * @param string|null $queue
     * @return void
     */
    protected function laterRaw($delay, $payload, $queue = null)
    {
        $topic   = $this->declareTopicAndQueue($queue);
        $context = $this->rabbit;

        $context->createProducer()
            ->setDelayStrategy(new RabbitMqDlxDelayStrategy())
            // The producer takes the delay in milliseconds.
            ->setDeliveryDelay((int) ($delay * 1000))
            ->send($topic, $context->createMessage($payload));
    }

    /**
     * Declares the shared fanout exchange and the durable queue, binds them,
     * and returns the exchange to publish to.
     *
     * @param string|null $queue
     * @return AmqpTopic
     */
    protected function declareTopicAndQueue($queue)
    {
        $context = $this->rabbit;

        $topic = $context->createTopic(self::EXCHANGE_NAME);
        $topic->setType(AmqpTopic::TYPE_FANOUT);
        $context->declareTopic($topic);

        $amqpQueue = $context->createQueue($this->getQueue($queue));
        $amqpQueue->addFlag(AmqpQueue::FLAG_DURABLE);
        $context->declareQueue($amqpQueue);

        $context->bind(new AmqpBind($topic, $amqpQueue));

        return $topic;
    }

    /**
     * Receives the next message from the queue and wraps it in a RabbitMqJob.
     *
     * When $blockFor is set, waits at most that many seconds and returns null
     * if nothing arrived. When it is null (or 0) the call blocks until a
     * message is available.
     *
     * @param string|null $queue
     * @return RabbitMqJob|null
     */
    public function pop($queue = null)
    {
        $context  = $this->rabbit;
        $consumer = $context->createConsumer(
            $context->createQueue($this->getQueue($queue))
        );

        // receive() takes milliseconds; 0 means "wait indefinitely".
        $timeoutMs = $this->blockFor === null ? 0 : (int) ($this->blockFor * 1000);
        $message   = $consumer->receive($timeoutMs);
        if ($message === null) {
            return null;
        }

        $job = [
            'message'  => $message,
            'consumer' => $consumer,
        ];

        $reserved = false;
        $body     = $message->getBody();
        if ($body) {
            $decoded = json_decode((string) $body, true);
            if (!is_array($decoded)) {
                $decoded = [];
            }
            // Count this delivery as an attempt and store the counter in the
            // message body so it is carried along if the body is republished.
            $decoded['attempts'] = ($decoded['attempts'] ?? 0) + 1;
            $reserved = json_encode($decoded);
            $message->setBody($reserved);
        }

        return new RabbitMqJob($this->app, $this, $job, $reserved, $this->connection, $queue);
    }

    /**
     * Maps a logical queue name to the name used on the broker.
     *
     * @param string|null $queue falls back to the default queue when empty
     * @return string
     */
    public function getQueue($queue)
    {
        return sprintf('{queues:%s}', $queue ?: $this->default);
    }

    /**
     * Republishes a job so it is attempted again.
     *
     * Only publishes the payload; acknowledging the original message is left
     * to the caller.
     *
     * @param string|null   $queue logical queue name
     * @param object|string $job   job exposing getRawBody(), or the raw payload itself
     * @param int|float     $delay seconds before the job becomes available again
     * @return mixed
     * @throws \InvalidArgumentException when no raw payload can be obtained
     */
    public function release($queue, $job, $delay)
    {
        $payload = is_object($job) && method_exists($job, 'getRawBody')
            ? $job->getRawBody()
            : $job;

        if (!is_string($payload) || $payload === '') {
            throw new \InvalidArgumentException('Cannot release a job without a raw payload.');
        }

        if ($delay > 0) {
            return $this->laterRaw($delay, $payload, $queue);
        }

        return $this->pushRaw($payload, $queue);
    }
}
job实现
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <[email protected]>
// +----------------------------------------------------------------------
namespace app\controller;
use think\App;
use app\controller\RabbitMq as RabbitMqQueue;
use think\queue\Job;
/**
 * think-queue job wrapping a message received from RabbitMQ.
 */
class RabbitMqJob extends Job
{
    /**
     * The connector that produced this job.
     *
     * @var RabbitMqQueue
     */
    protected $rabbit;

    /**
     * The received message together with the consumer it came from:
     * ['message' => ..., 'consumer' => ...].
     *
     * @var array
     */
    protected $job;

    /**
     * The decoded job payload, as returned by payload().
     *
     * @var array|null
     */
    protected $decoded;

    /**
     * The JSON payload with the updated attempt counter, or false when the
     * message body was empty.
     *
     * @var string|false
     */
    protected $reserved;

    /**
     * @param App           $app
     * @param RabbitMqQueue $rabbit     connector that produced the job
     * @param array         $job        ['message' => ..., 'consumer' => ...]
     * @param string|false  $reserved   payload with the updated attempt counter
     * @param mixed         $connection connection name
     * @param string|null   $queue      logical queue name the job was popped from
     */
    public function __construct(App $app, RabbitMqQueue $rabbit, $job, $reserved, $connection, $queue)
    {
        $this->app        = $app;
        $this->job        = $job;
        $this->queue      = $queue;
        $this->connection = $connection;
        $this->rabbit     = $rabbit;
        $this->reserved   = $reserved;
        $this->decoded    = $this->payload();
    }

    /**
     * Get the job identifier.
     *
     * @return string|null
     */
    public function getJobId()
    {
        return $this->decoded['id'] ?? null;
    }

    /**
     * Get the number of times the job has been attempted, including the
     * current attempt.
     *
     * The connector's pop() already increments "attempts" in the message body
     * before this job is constructed, and payload() presumably decodes that
     * same body (via getRawBody()), so the stored value is returned as-is.
     * Adding 1 again would report one attempt too many. A payload without the
     * counter counts as the first attempt.
     *
     * @return int
     */
    public function attempts()
    {
        return (int) ($this->decoded['attempts'] ?? 1);
    }

    /**
     * Get the raw body string for the job.
     *
     * @return string
     */
    public function getRawBody()
    {
        return $this->job['message']->getBody();
    }

    /**
     * Delete the job by acknowledging the underlying message.
     *
     * @return void
     */
    public function delete()
    {
        parent::delete();
        $this->job['consumer']->acknowledge($this->job['message']);
    }

    /**
     * Release the job back onto the queue.
     *
     * Delegates to the connector's release($queue, $job, $delay). The original
     * message is acknowledged only after that call returns, so if the
     * connector throws, the message stays unacknowledged instead of being lost.
     *
     * @param int $delay seconds before the job becomes available again
     * @return void
     */
    public function release($delay = 0)
    {
        parent::release($delay);
        $this->rabbit->release($this->queue, $this, $delay);
        $this->job['consumer']->acknowledge($this->job['message']);
    }
}