php 调用 rabbitmq

1、安装 rabbitmq

Rabbit MQ 依赖 Erlang,需要从 Rabbit MQ 提供的仓库安装以确保兼容性:

curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash

安装 Erlang:

sudo yum install erlang

添加 Rabbit MQ 仓库:

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

安装 Rabbit MQ:

sudo yum install rabbitmq-server

装完成后,启动 Rabbit MQ 服务并设置为开机自启:

sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

启动之后,可以通过浏览器访问管理界面,地址为 http://IP:15672,默认用户名和密码均为 “guest”。建议为安全起见更改默认凭据。 如果无法访问,可能是没有启用管理插件,使用命令启动就可以了

rabbitmq-plugins enable rabbitmq_management

如果需要其他服务器访问,可以在防火墙上允许:5672 和 15672 端口

firewall-cmd --permanent --add-port=5672/tcp
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --reload

2、php 调用

这里使用的库是 workerman/rabbitmq,因为需要读取 rabbitmq 的队列,所以需要一直在后台开启一个 php 的进程,使用 workerman 比较方便 使用 composer 安装:composer require workerman/rabbitmq

发送一条消息到 rabbitmq

// sender.php
require_once __DIR__ . '/vendor/autoload.php';

use Bunny\Client;
use Bunny\Exception\BunnyException;

function sendRabbitMQMessage(string $message, string $queue = 'my_queue', array $config = []) {
    $defaultConfig = [
        'host' => '127.0.0.1',
        'vhost' => '/',
        'user' => 'user',
        'password' => 'pwd',
        'port' => 5672
    ];
    $config = array_merge($defaultConfig, $config);
    $client = null;
    $channel = null;
    try {
        // 创建并连接客户端
        $client = new Client($config);
        $client->connect();
        $channel = $client->channel();
        // 不强制声明队列,仅在需要时发送消息
        // 如果队列已存在,直接使用,避免参数冲突
        $channel->publish($message, [], '', $queue);
        // 正常关闭连接
        $channel->close();
        $client->disconnect();
        return true;
    } catch (BunnyException $e) {
        exit("RabbitMQ Send Error: " . $e->getMessage());
        // 确保在异常情况下清理资源
        if ($channel) {
            try {
                $channel->close();
            } catch (Exception $ce) {
                exit("Channel close error: " . $ce->getMessage());
            }
        }
        if ($client && $client->isConnected()) {
            try {
                $client->disconnect();
            } catch (Exception $de) {
                exit("Client disconnect error: " . $de->getMessage());
            }
        }
        return false;
    }
}

sendRabbitMQMessage('hello');

接收消息

// consumer.php

use Bunny\Channel;
use Bunny\Message;
use Workerman\Worker;
use Workerman\RabbitMQ\Client;

require __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->count = 10;
$worker->name = 'article_push';

$worker->onWorkerStart = function() {
  function sendPostRequest($url,$data,$header='',$auth=''){
    $ch = curl_init();
    if($header==''){
      $header = array("Accept-Charset"=>"utf-8");
    }
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, FALSE);
    curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, FALSE);
    curl_setopt($ch, CURLOPT_HTTPHEADER, $header);
    curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
    curl_setopt($ch, CURLOPT_AUTOREFERER, 1);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_POSTFIELDS, $data);
    $temp = curl_exec($ch);
    if($temp===false)trigger_error(curl_error($ch));
    return $temp;
  }

  $options = array(
    "host" => "127.0.0.1",
    "port" => 5672,
    "vhost" => "/",
    "mechanism" => "AMQPLAIN",
    "user" => "user",
    "password" => "pwd",
    "timeout" => 10,
    "heartbeat" => 60,
    "heartbeat_callback" => function(){},
  );
  (new Client($options))
  ->connect()
  ->then(function (Client $client) {
    return $client->channel();
  })
  ->then(function (Channel $channel) {
    return $channel->queueDeclare('my_queue', false, false, false, false)->then(function () use ($channel) {
      return $channel;
    });
  })
  ->then(function (Channel $channel) {
    $id = $GLOBALS['pid'] = posix_getpid();
    echo " [".$id."]  Waiting for messages. To exit press CTRL+C", "\n";
    $channel->consume(
      function (Message $message, Channel $channel, Client $client) {
        echo " 【".$GLOBALS['pid']."】 from rabbitmq: ", $message->content, "\n\n";
        $load = json_decode($message->content, true);
        $post_data = json_encode(array("data" => $load['data']));
        echo "\e[32m 【".$GLOBALS['pid']."】from api:".sendPostRequest($load['api'], $post_data)." \e[0m \n";
      },
      'my_queue',
      '',
      false,
      true
    );
  });
};
Worker::runAll();

consumer.php 需要一直运行,使用命令:php consumer.php start . mode -d 让它在后台运行,一直接收消息 功能: 接收到消息后调用消息里面的 url 地址,发送 data 参数