初尝消息队列(2)

    xiaoxiao2022-07-02  108

    Rabbitmq的安装与使用

    安装RabbitmqPHP操作使用rabbitmq发送邮件安装php-amqplib扩展使用php-amqplib将邮件发送的任务压入消息队列使用thinkphp5.1 生成消费者指令使用supervisor管理消费者进程

    安装Rabbitmq

    为快速使用rabbitmq进行项目的开发,这里直接使用了docker镜像。链接: rabbitmq docker使用。关于docker的使用,这里不作过多讲解。

    首先拉取带管理功能的rabbitmq镜像: docker pull rabbitmq:3-management

    查看本地镜像: docker images

    可以看到本地存在的docker镜像,如图: 执行命令运行rabbitmq镜像: docker run -d --hostname my-rabbit --name my-rabbit -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3-management

    需要注意,这里映射的宿主机端口,需要运营商后台启用该端口。

    参数说明:

    -p 端口映射-e RABBITMQ_DEFAULT_USER=user 设置用户名-e RABBITMQ_DEFAULT_PASS=123456 设置密码

    运行成功后,可通过宿主机地址:15672访问rabbitmq控制面板,如图:

    PHP操作使用rabbitmq发送邮件

    安装php-amqplib扩展

    首先安装对应的扩展php-amqplib,使用php依赖管理工具composer可轻松完成。

    搜索php-amqplib: composer search php-amqplib

    安装php-amqplib: composer require php-amqplib/php-amqplib

    使用php-amqplib将邮件发送的任务压入消息队列

    function email_queue($config, $arr) { // 设置交换机名 $exchangeName = $config['exchange_name']; // 设置队列名 $queueName = $config['queue_name']; // 设置路由关键字(可不设置) $routeKey = $config['route_key']; // 读取rabbitmq服务器配置并获取连接 $config = \think\facade\Config::get('rabbitmq'); $connection = new PhpAmqpLib\Connection\AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']); try { // 获取通道 $channel = $connection->channel(); // 声明交换机 $channel->exchange_declare($exchangeName, 'direct', false, true, false); // 声明队列 $channel->queue_declare($queueName, false, false, false, false); // 绑定队列与交换机 $channel->queue_bind($queueName, $exchangeName, $routeKey); foreach ($arr as $value){ $msgBody = json_encode($value); $msg = new PhpAmqpLib\Message\AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息 $channel->basic_publish($msg, $exchangeName, $routeKey); //推送消息到某个交换机 } $channel->close(); $connection->close(); return ['status'=>true, 'msg'=>'success']; } catch (\AMQPConnectionException $e) { return ['status'=>false, 'msg'=>$e->getMessage()]; } }

    使用thinkphp5.1 生成消费者指令

    <?php namespace app\common\command; use PhpAmqpLib\Connection\AMQPStreamConnection; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\Output; use think\facade\Config; class UserInvite extends Command { /** * configure */ protected function configure() { $this->setName('user_invite') ->addArgument('name', Argument::OPTIONAL, "your name") ->setDescription('Invite User'); } /** * execute * * @param Input $input * @param Output $output * @return int|void|null * @throws \ErrorException */ protected function execute(Input $input, Output $output) { // 与消息发送一致 $exchangeName = 'user_invite'; $queueName = 'user_invite'; $routeKey = 'user_invite'; $config = Config::get('rabbitmq'); $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']); $channel = $connection->channel(); $channel->exchange_declare($exchangeName, 'direct', false, true, false); $channel->queue_declare($queueName, false, false, false, false); $channel->queue_bind($queueName, $exchangeName, $routeKey); $output->writeln(' [*] Waiting for messages. To exit press CTRL+C', "\n"); $channel->basic_qos(null, 1, null); // 设置消息推送至空闲的消费者 $channel->basic_consume($queueName, '', false, false, false, false, function ($message){ // 消费任务 $arr = json_decode($message->body, true); $res = send_email($arr['address'], $arr['title'], $arr['content']); // 调用发送邮件的方法 if ($res['status'] === true){ $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); // 销毁已消费的数据 } }); register_shutdown_function(function ($channel, $connection){ $channel->close(); $connection->close(); }, $channel, $connection); while ($channel ->is_consuming()) { $channel->wait(); } } }

    在应用目录下的command.php定义该指令:

    <?php return [ 'user_invite' => 'app\common\command\UserInvite', ];

    这样就可以在项目根目录下使用php think user_invite运行了。

    使用supervisor管理消费者进程

    supervisor是一个python开发的进程管理工具,首先需要安装supervisor,在linux下,可使用yum install supervisor或apt-get install supervisor快速安装。

    开启服务:supervisord -c /etc/supervisord.conf

    安装完成后,进入/etc/supervisor目录,找到conf.d目录,编写需要管理的子进程,如图: 配置参数说明: numprocs:启动进程数 autostart/autorestart :自启动 commnd:执行命令行 process_name:进程名 完成配置后,使用supervisorctl reload启动配置即可。 到此为止,就完成了rabbitmq的一个简单应用。但由于php-amqplib扩展是纯php编写的,在执行效率上会有一定的耗时,关于对此的优化,将在下一文章中阐述。

    最新回复(0)