Rabbitmq的安装与使用
安装RabbitmqPHP操作使用rabbitmq发送邮件安装php-amqplib扩展使用php-amqplib将邮件发送的任务压入消息队列使用thinkphp5.1 生成消费者指令使用supervisor管理消费者进程
安装Rabbitmq
为快速使用rabbitmq进行项目的开发,这里直接使用了docker镜像。链接: rabbitmq docker使用。关于docker的使用,这里不作过多讲解。
首先拉取带管理功能的rabbitmq镜像: docker pull rabbitmq:3-management
查看本地镜像: docker images
可以看到本地存在的docker镜像,如图: 执行命令运行rabbitmq镜像: docker run -d --hostname my-rabbit --name my-rabbit -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3-management
需要注意,这里映射的宿主机端口,需要运营商后台启用该端口。
参数说明:
-p 端口映射-e RABBITMQ_DEFAULT_USER=user 设置用户名-e RABBITMQ_DEFAULT_PASS=123456 设置密码
运行成功后,可通过宿主机地址:15672访问rabbitmq控制面板,如图:
PHP操作使用rabbitmq发送邮件
安装php-amqplib扩展
首先安装对应的扩展php-amqplib,使用php依赖管理工具composer可轻松完成。
搜索php-amqplib: composer search php-amqplib
安装php-amqplib: composer require php-amqplib/php-amqplib
使用php-amqplib将邮件发送的任务压入消息队列
function email_queue($config, $arr)
{
// 设置交换机名
$exchangeName = $config['exchange_name'];
// 设置队列名
$queueName = $config['queue_name'];
// 设置路由关键字(可不设置)
$routeKey = $config['route_key'];
// 读取rabbitmq服务器配置并获取连接
$config = \think\facade\Config::get('rabbitmq');
$connection = new PhpAmqpLib\Connection\AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']);
try {
// 获取通道
$channel = $connection->channel();
// 声明交换机
$channel->exchange_declare($exchangeName, 'direct', false, true, false);
// 声明队列
$channel->queue_declare($queueName, false, false, false, false);
// 绑定队列与交换机
$channel->queue_bind($queueName, $exchangeName, $routeKey);
foreach ($arr as $value){
$msgBody = json_encode($value);
$msg = new PhpAmqpLib\Message\AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息
$channel->basic_publish($msg, $exchangeName, $routeKey); //推送消息到某个交换机
}
$channel->close();
$connection->close();
return ['status'=>true, 'msg'=>'success'];
} catch (\AMQPConnectionException $e) {
return ['status'=>false, 'msg'=>$e->getMessage()];
}
}
使用thinkphp5.1 生成消费者指令
<?php
namespace app\common\command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\Output;
use think\facade\Config;
class UserInvite extends Command
{
/**
* configure
*/
protected function configure()
{
$this->setName('user_invite')
->addArgument('name', Argument::OPTIONAL, "your name")
->setDescription('Invite User');
}
/**
* execute
*
* @param Input $input
* @param Output $output
* @return int|void|null
* @throws \ErrorException
*/
protected function execute(Input $input, Output $output)
{
// 与消息发送一致
$exchangeName = 'user_invite';
$queueName = 'user_invite';
$routeKey = 'user_invite';
$config = Config::get('rabbitmq');
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']);
$channel = $connection->channel();
$channel->exchange_declare($exchangeName, 'direct', false, true, false);
$channel->queue_declare($queueName, false, false, false, false);
$channel->queue_bind($queueName, $exchangeName, $routeKey);
$output->writeln(' [*] Waiting for messages. To exit press CTRL+C', "\n");
$channel->basic_qos(null, 1, null); // 设置消息推送至空闲的消费者
$channel->basic_consume($queueName, '', false, false, false, false, function ($message){ // 消费任务
$arr = json_decode($message->body, true);
$res = send_email($arr['address'], $arr['title'], $arr['content']); // 调用发送邮件的方法
if ($res['status'] === true){
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); // 销毁已消费的数据
}
});
register_shutdown_function(function ($channel, $connection){
$channel->close();
$connection->close();
}, $channel, $connection);
while ($channel ->is_consuming()) {
$channel->wait();
}
}
}
在应用目录下的command.php定义该指令:
<?php
return [
'user_invite' => 'app\common\command\UserInvite',
];
这样就可以在项目根目录下使用php think user_invite运行了。
使用supervisor管理消费者进程
supervisor是一个python开发的进程管理工具,首先需要安装supervisor,在linux下,可使用yum install supervisor或apt-get install supervisor快速安装。
开启服务:supervisord -c /etc/supervisord.conf
安装完成后,进入/etc/supervisor目录,找到conf.d目录,编写需要管理的子进程,如图: 配置参数说明: numprocs:启动进程数 autostart/autorestart :自启动 commnd:执行命令行 process_name:进程名 完成配置后,使用supervisorctl reload启动配置即可。 到此为止,就完成了rabbitmq的一个简单应用。但由于php-amqplib扩展是纯php编写的,在执行效率上会有一定的耗时,关于对此的优化,将在下一文章中阐述。