rabbitmq配置

  • 安装

nix-env -iA nixpkgs.rabbitmq-server
  • 查看rabbitmq-defaults

which rabbitmq-defaults
cat /Users/jcleng/.nix-profile/bin/rabbitmq-defaults
ls -l /Users/jcleng/.nix-profile/bin/rabbitmq-defaults
cd /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9


# 编辑上面查看的 rabbitmq-defaults 文件,把 SYS_PREFIX 改为实际安装目录
SYS_PREFIX=/nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9
# 新增PLUGINS_DIR,指定插件目录
PLUGINS_DIR="/nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/plugins"


# 一些必要的目录
sudo mkdir -p /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/
sudo chmod 777 /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/

sudo mkdir -p /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/data
sudo chmod 777 /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/data

sudo mkdir -p /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/logs
sudo chmod 777 /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/logs

touch /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/rabbitmq-env.conf
code /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/rabbitmq-env.conf

#配置文件
touch /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/rabbitmq.conf
code /nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/rabbitmq.conf


# 编辑配置文件
# 编辑rabbitmq-env.conf
code rabbitmq-env.conf
RABBITMQ_MNESIA_BASE=/nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/data
RABBITMQ_LOG_BASE=/nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/etc/rabbitmq/logs
# RABBITMQ_PLUGINS_DIR=/nix/store/9ri2lqg5q1rinxiyyg3dcnb3lm06ack1-rabbitmq-server-3.8.9/plugins
RABBITMQ_PID_FILE=/tmp/rabbitmq.pid

# 运行
rabbitmq-server
# 启用web管理插件(rabbitmq_management)
rabbitmq-plugins enable rabbitmq_management

# 访问: http://127.0.0.1:15672/
# 账户密码: guest/guest
  • 新建管理用户

rabbitmqctl add_user leng leng123
rabbitmqctl set_user_tags leng administrator
# 以下 ssl_options.* 配置项写入 rabbitmq.conf
# 证书不对时连接会提示: Broken pipe or closed connection in ...
# 服务端私钥和证书文件配置
ssl_options.cacertfile = /home/rabbitmq/etc/rabbitmq/ssl/rmqca/cacert.pem
ssl_options.certfile = /home/rabbitmq/etc/rabbitmq/ssl/server/cert.pem
ssl_options.keyfile = /home/rabbitmq/etc/rabbitmq/ssl/server/key.pem

# 有verify_none和verify_peer两个选项,verify_none表示完全忽略验证证书的结果,verify_peer表示要求验证对方证书
ssl_options.verify = verify_peer
# 若为true,服务端会向客户端索要证书,若客户端无证书则中止SSL握手;若为false,则客户端没有证书时依然可完成SSL握手
ssl_options.fail_if_no_peer_cert = true
  • php测试: 先用 composer 安装依赖 "php-amqplib/php-amqplib": "^2.12",下面依次是生产者脚本和消费者脚本

<?php
include "./vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

// Connection settings for the local broker.
$conf = [
    'host' => '127.0.0.1',
    'port' => 5672,
    'user' => 'guest',
    'pwd' => 'guest',
    'vhost' => '/',
];
$exchangeName = 'queue_default'; // exchange name
$queueName = 'queue_message';    // queue name
$routingKey = 'reg_message';     // routing key used for the binding and for publishing

// Open the connection between this producer and the broker.
$conn = new AMQPStreamConnection(
    $conf['host'],
    $conf['port'],
    $conf['user'],
    $conf['pwd'],
    $conf['vhost']
);
// Open a channel on top of the established connection.
$channel = $conn->channel();

try {
    // Declare the exchange (passive=false, durable=true, auto_delete=false).
    $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);
    // Declare the queue (passive=false, durable=true, exclusive=false, auto_delete=false).
    $channel->queue_declare($queueName, false, true, false, false);
    // Bind the queue to the exchange under the routing key.
    $channel->queue_bind($queueName, $exchangeName, $routingKey);

    $msgBody = json_encode(["name" => "iGoo", "age" => 22]);
    // The body is JSON, so it is labelled as such; the message is marked persistent.
    $msg = new AMQPMessage($msgBody, [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    ]);
    // Publish to the exchange; basic_publish has no return value worth keeping.
    $channel->basic_publish($msg, $exchangeName, $routingKey);
} finally {
    // Release the channel and connection even when declaring or publishing fails.
    $channel->close();
    $conn->close();
}
<?php
include "./vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

// Connection settings for the local broker.
$conf = [
    'host' => '127.0.0.1',
    'port' => 5672,
    'user' => 'guest',
    'pwd' => 'guest',
    'vhost' => '/',
];
$exchangeName = 'queue_default'; // exchange name
$queueName = 'queue_message';    // queue name
$routingKey = 'reg_message';     // routing key used for the binding

// Open the connection between this consumer and the broker.
$conn = new AMQPStreamConnection(
    $conf['host'],
    $conf['port'],
    $conf['user'],
    $conf['pwd'],
    $conf['vhost']
);
// Open a channel on top of the established connection.
$channel = $conn->channel();

// Declare the same exchange, queue and binding as the producer, so the
// consumer can be started first.
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);

// Callback invoked for every delivered message.
$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

/**
 * basic_consume arguments, in order:
 * queue:        $queueName  // queue to consume from
 * consumer_tag: ''          // consumer identifier; an empty string is passed here
 * no_local:     false       // part of the AMQP standard, not implemented by RabbitMQ
 * no_ack:       true        // a delivered message counts as consumed without an explicit ack
 * exclusive:    false       // when true, only this consumer may consume from the queue
 * nowait:       false       // when true, do not wait for the broker's reply
 * callback:     $callback   // message handler
 */
$channel->basic_consume($queueName, '', false, true, false, false, $callback);

/**
 * Close the channel and the connection when the script ends.
 *
 * @param object $channel    channel returned by $conn->channel()
 * @param object $connection connection the channel belongs to
 */
function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}
// The connection variable is $conn; the shutdown handler is the single place
// where the channel and connection are closed.
register_shutdown_function('shutdown', $channel, $conn);

// Block and dispatch deliveries for as long as a consumer callback is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}
  • docker镜像

# 拉带有management的,加速: https://hub.daocloud.io/
docker pull daocloud.io/library/rabbitmq:3.7.28-management

# run, 指定密码和用户
docker run -idt --name rabbitmq -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password daocloud.io/library/rabbitmq:3.7.28-management

# 访问管理面板(上面的 run 没有用 -p 映射端口,所以通过容器IP访问;容器IP可用 docker inspect rabbitmq 查看)
http://172.17.0.3:15672/
  • tp6 使用 think-queue 接入 rabbitmq(基于 enqueue/enqueue)

# 依赖
"require": {
    "php": ">=7.1.0",
    "topthink/framework": "^6.0.0",
    "topthink/think-orm": "^2.0",
    "enqueue/enqueue": "^0.10.15",
    "enqueue/amqp-lib": "^0.10.9",
    "enqueue/amqp-tools": "^0.10.9",
    "topthink/think-queue": "^3.0"
},

Connector实现

<?php

namespace app\controller;

use app\BaseController;
use app\model\Test;
use Enqueue\AmqpLib\AmqpConnectionFactory;
use Enqueue\AmqpLib\AmqpContext;
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\Impl\AmqpBind;
use Interop\Amqp\AmqpTopic;
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Interop\Queue\Message;
use Interop\Queue\Consumer;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpMessage;
use think\queue\Connector;
use think\queue\InteractsWithTime;
use Interop\Queue\Context;
use app\controller\RabbitMqJob;
use PhpAmqpLib\Exception\AMQPIOException;

/**
 * think-queue connector that stores jobs in RabbitMQ through the enqueue
 * AMQP context.
 *
 * Every queue is declared durable and bound to one shared fanout exchange;
 * jobs are published to that exchange.
 *
 * NOTE(review): a fanout exchange delivers each message to every queue bound
 * to it, so once more than one queue name is in use each job would land in
 * all of them — confirm this is intended before using several queues.
 */
class RabbitMq extends Connector
{
    use InteractsWithTime;

    /**
     * Name of the exchange all queues are bound to. The spelling is kept
     * exactly as it was so publishers keep using the same exchange name.
     */
    private const EXCHANGE_NAME = 'tp_rabbbit';

    /**
     * Proxy object around the enqueue AMQP context (see __make()).
     *
     * @var object
     */
    protected $rabbit;

    /**
     * The name of the default queue.
     *
     * @var string
     */
    protected $default;

    /**
     * The expiration time of a job.
     *
     * @var int|null
     */
    protected $retryAfter = 60;

    /**
     * The maximum number of seconds to block for a job.
     * Null keeps pop() waiting until a message arrives.
     *
     * @var int|null
     */
    protected $blockFor = null;

    /**
     * @param object   $rabbit     context proxy used for all broker calls
     * @param string   $default    queue name used when none is given
     * @param int|null $retryAfter job expiration time in seconds
     * @param int|null $blockFor   seconds pop() may wait for a message
     */
    public function __construct($rabbit, $default = 'default', $retryAfter = 60, $blockFor = null)
    {
        $this->rabbit     = $rabbit;
        $this->default    = $default;
        $this->retryAfter = $retryAfter;
        $this->blockFor   = $blockFor;
    }

    /**
     * Build the connector from a queue connection config array.
     *
     * Recognised keys: host, port, vhost, user, pass, persisted, queue,
     * retry_after, block_for.
     *
     * @param array $config
     * @return self
     */
    public static function __make($config)
    {
        // Thin proxy that owns the AMQP context and forwards every call to it.
        $rabbit = new class($config)
        {
            protected $config;
            protected $client;

            public function __construct($config)
            {
                $this->config = $config;
                $this->client = $this->createClient();
            }

            // Create a fresh AMQP context from the stored config.
            protected function createClient()
            {
                $config  = $this->config;
                $factory = new AmqpConnectionFactory([
                    'host' => $config['host'] ?? '',
                    'port' => $config['port'] ?? '5672',
                    'vhost' => $config['vhost'] ?? '/',
                    'user' => $config['user'] ?? 'guest',
                    'pass' => $config['pass'] ?? 'guest',
                    'persisted' => $config['persisted'] ?? false,
                ]);

                return $factory->createContext();
            }

            // Forward to the context; on a "went away" I/O error a new context
            // is created for later calls and the error is still rethrown.
            public function __call($name, $arguments)
            {
                try {
                    return $this->client->{$name}(...$arguments);
                } catch (AMQPIOException $e) {
                    if (strpos($e->getMessage(), 'went away') !== false) {
                        $this->client = $this->createClient();
                    }

                    throw $e;
                }
            }
        };

        // Fall back to the constructor's own default when "queue" is not configured.
        return new self(
            $rabbit,
            $config['queue'] ?? 'default',
            $config['retry_after'] ?? 60,
            $config['block_for'] ?? null
        );
    }

    /**
     * Queue size is not reported by this connector.
     *
     * @param string|null $queue
     * @return null
     */
    public function size($queue = null)
    {
        return null;
    }

    /**
     * Push a job for immediate processing.
     *
     * @param mixed       $job
     * @param mixed       $data
     * @param string|null $queue
     * @return mixed whatever pushRaw() returns
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

    /**
     * Publish an already encoded payload.
     *
     * @param string      $payload
     * @param string|null $queue
     * @param array       $options unused
     * @return void
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $topic   = $this->declareDestination($queue);
        $message = $this->rabbit->createMessage($payload);

        $this->rabbit->createProducer()
            ->send($topic, $message);
    }

    /**
     * Push a job that becomes available after $delay seconds.
     *
     * @param int         $delay seconds
     * @param mixed       $job
     * @param mixed       $data
     * @param string|null $queue
     * @return mixed whatever laterRaw() returns
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        return $this->laterRaw($delay, $this->createPayload($job, $data), $queue);
    }

    /**
     * Publish an already encoded payload with a delivery delay.
     *
     * @param int         $delay seconds; converted to milliseconds for the producer
     * @param string      $payload
     * @param string|null $queue
     * @return void
     */
    protected function laterRaw($delay, $payload, $queue = null)
    {
        $topic   = $this->declareDestination($queue);
        $message = $this->rabbit->createMessage($payload);

        $this->rabbit->createProducer()
            ->setDelayStrategy(new RabbitMqDlxDelayStrategy())
            ->setDeliveryDelay($delay * 1000)
            ->send($topic, $message);
    }

    /**
     * Fetch the next job from the queue.
     *
     * The attempts counter inside the payload is incremented and written back
     * to the message body before the job object is built.
     *
     * @param string|null $queue
     * @return RabbitMqJob|null null when no message arrived within block_for
     */
    public function pop($queue = null)
    {
        $context  = $this->rabbit;
        $consumer = $context->createConsumer(
            $context->createQueue($this->getQueue($queue))
        );

        // block_for is configured in seconds, receive() takes milliseconds.
        // A timeout of 0 (block_for not set) waits until a message arrives.
        $timeoutMs = (int) (($this->blockFor ?? 0) * 1000);
        $message   = $consumer->receive($timeoutMs);
        if ($message === null) {
            return null;
        }

        $job = [
            'message' => $message,
            'consumer' => $consumer,
        ];

        $reserved = false;
        $body     = $message->getBody();
        if ($body) {
            $decoded = json_decode((string) $body, true);
            if (!is_array($decoded)) {
                // Invalid or scalar JSON: start from an empty payload so the
                // counter update below cannot fail.
                $decoded = [];
            }

            $decoded['attempts'] = ($decoded['attempts'] ?? 0) + 1;
            $reserved = json_encode($decoded);
            $message->setBody($reserved);
        }

        return new RabbitMqJob($this->app, $this, $job, $reserved, $this->connection, $queue);
    }

    /**
     * Resolve the broker-side queue name.
     *
     * @param string|null $queue falls back to the default queue when empty
     * @return string name in the form "{queues:NAME}"
     */
    public function getQueue($queue)
    {
        $queue = $queue ?: $this->default;
        return "{queues:{$queue}}";
    }

    /**
     * Republish a job. Left as a placeholder to be implemented by the user.
     *
     * @param string $queue
     * @param StdClass $job
     * @param int $delay
     * @return mixed
     * @throws \Exception always
     */
    public function release($queue, $job, $delay)
    {
        throw new \Exception('自行实现');
    }

    /**
     * Declare the shared exchange and the durable queue, bind them, and return
     * the exchange so the caller can publish to it.
     *
     * @param string|null $queue
     * @return AmqpTopic
     */
    private function declareDestination($queue)
    {
        $context = $this->rabbit;

        // Exchange
        $topic = $context->createTopic(self::EXCHANGE_NAME);
        $topic->setType(AmqpTopic::TYPE_FANOUT);
        $context->declareTopic($topic);

        // Queue
        $amqpQueue = $context->createQueue($this->getQueue($queue));
        $amqpQueue->addFlag(AmqpQueue::FLAG_DURABLE);
        $context->declareQueue($amqpQueue);

        $context->bind(new AmqpBind($topic, $amqpQueue));

        return $topic;
    }
}

job实现

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <[email protected]>
// +----------------------------------------------------------------------

namespace app\controller;

use think\App;
use app\controller\RabbitMq as RabbitMqQueue;
use think\queue\Job;

/**
 * A job popped from RabbitMQ by the RabbitMq connector.
 */
class RabbitMqJob extends Job
{

    /**
     * The connector that popped this job; used to republish it on release.
     *
     * @var RabbitMqQueue
     */
    protected $rabbit;

    /**
     * The broker delivery, as built by RabbitMq::pop():
     * ['message' => received message, 'consumer' => consumer that received it].
     *
     * @var array
     */
    protected $job;

    /**
     * The JSON decoded version of the message body.
     *
     * @var array
     */
    protected $decoded;

    /**
     * The payload with the incremented attempts counter, as passed in by
     * RabbitMq::pop(); false when the message body was empty.
     *
     * @var string|false
     */
    protected $reserved;

    /**
     * @param App           $app
     * @param RabbitMqQueue $rabbit     connector that popped the job
     * @param array         $job        ['message' => ..., 'consumer' => ...]
     * @param string|false  $reserved   payload with the incremented attempts counter
     * @param string        $connection connection name
     * @param string|null   $queue      queue name as it was given to pop()
     */
    public function __construct(App $app, RabbitMqQueue $rabbit, $job, $reserved, $connection, $queue)
    {
        $this->app        = $app;
        $this->job        = $job;
        $this->queue      = $queue;
        $this->connection = $connection;
        $this->rabbit     = $rabbit;
        $this->reserved   = $reserved;

        $this->decoded = $this->payload();
    }

    /**
     * Get the job identifier.
     *
     * @return string|null
     */
    public function getJobId()
    {
        return $this->decoded['id'] ?? null;
    }

    /**
     * Get the number of times the job has been attempted.
     *
     * RabbitMq::pop() increments the counter and writes it into the message
     * body before this object is created, so the decoded value already counts
     * the current attempt; adding one again would report 2 on the first run.
     * A job that is being processed has been attempted at least once.
     *
     * @return int
     */
    public function attempts()
    {
        return max(1, (int) ($this->decoded['attempts'] ?? 0));
    }

    /**
     * Get the raw body string for the job.
     * @return string
     */
    public function getRawBody()
    {
        return $this->job['message']->getBody();
    }

    /**
     * Delete the job: mark it deleted and acknowledge the delivery.
     * @return void
     */
    public function delete()
    {
        parent::delete();
        $this->job['consumer']->acknowledge($this->job['message']);
    }

    /**
     * Put the job back on its queue.
     *
     * The job is republished first and the original delivery is acknowledged
     * afterwards, so a failed publish does not lose the job.
     *
     * @param int $delay seconds to wait before the job becomes available again
     * @return void
     */
    public function release($delay = 0)
    {
        parent::release($delay);

        if ($delay > 0 && isset($this->decoded['job'])) {
            // NOTE(review): the connector exposes no public "delayed raw publish",
            // so the job is rebuilt through later(); this creates a fresh payload
            // and the attempts counter starts over — confirm this is acceptable.
            $this->rabbit->later(
                $delay,
                $this->decoded['job'],
                $this->decoded['data'] ?? '',
                $this->queue
            );
        } else {
            // The body already carries the incremented attempts counter.
            $this->rabbit->pushRaw($this->getRawBody(), $this->queue);
        }

        $this->job['consumer']->acknowledge($this->job['message']);
    }
}