php 调用 rabbitmq
1、安装 rabbitmq
Rabbit MQ 依赖 Erlang,需要从 Rabbit MQ 提供的仓库安装以确保兼容性:
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
安装 Erlang:
sudo yum install erlang
添加 Rabbit MQ 仓库:
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
安装 Rabbit MQ:
sudo yum install rabbitmq-server
装完成后,启动 Rabbit MQ 服务并设置为开机自启:
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
启动之后,可以通过浏览器访问管理界面,地址为 http://IP:15672,默认用户名和密码均为 “guest”。建议为安全起见更改默认凭据。 如果无法访问,可能是没有启用管理插件,使用命令启动就可以了
rabbitmq-plugins enable rabbitmq_management
如果需要其他服务器访问,可以在防火墙上允许:5672 和 15672 端口
firewall-cmd --permanent --add-port=5672/tcp
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --reload
2、php 调用
这里使用的库是 workerman/rabbitmq,因为需要读取 rabbitmq 的队列,所以需要一直在后台开启一个 php 的进程,使用 workerman 比较方便 使用 composer 安装:composer require workerman/rabbitmq
发送一条消息到 rabbitmq
// sender.php
require_once __DIR__ . '/vendor/autoload.php';
use Bunny\Client;
use Bunny\Exception\BunnyException;
function sendRabbitMQMessage(string $message, string $queue = 'my_queue', array $config = []) {
$defaultConfig = [
'host' => '127.0.0.1',
'vhost' => '/',
'user' => 'user',
'password' => 'pwd',
'port' => 5672
];
$config = array_merge($defaultConfig, $config);
$client = null;
$channel = null;
try {
// 创建并连接客户端
$client = new Client($config);
$client->connect();
$channel = $client->channel();
// 不强制声明队列,仅在需要时发送消息
// 如果队列已存在,直接使用,避免参数冲突
$channel->publish($message, [], '', $queue);
// 正常关闭连接
$channel->close();
$client->disconnect();
return true;
} catch (BunnyException $e) {
exit("RabbitMQ Send Error: " . $e->getMessage());
// 确保在异常情况下清理资源
if ($channel) {
try {
$channel->close();
} catch (Exception $ce) {
exit("Channel close error: " . $ce->getMessage());
}
}
if ($client && $client->isConnected()) {
try {
$client->disconnect();
} catch (Exception $de) {
exit("Client disconnect error: " . $de->getMessage());
}
}
return false;
}
}
sendRabbitMQMessage('hello');
接收消息
// consumer.php
use Bunny\Channel;
use Bunny\Message;
use Workerman\Worker;
use Workerman\RabbitMQ\Client;
require __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->count = 10;
$worker->name = 'article_push';
$worker->onWorkerStart = function() {
function sendPostRequest($url,$data,$header='',$auth=''){
$ch = curl_init();
if($header==''){
$header = array("Accept-Charset"=>"utf-8");
}
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, FALSE);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, FALSE);
curl_setopt($ch, CURLOPT_HTTPHEADER, $header);
curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
curl_setopt($ch, CURLOPT_AUTOREFERER, 1);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, $data);
$temp = curl_exec($ch);
if($temp===false)trigger_error(curl_error($ch));
return $temp;
}
$options = array(
"host" => "127.0.0.1",
"port" => 5672,
"vhost" => "/",
"mechanism" => "AMQPLAIN",
"user" => "user",
"password" => "pwd",
"timeout" => 10,
"heartbeat" => 60,
"heartbeat_callback" => function(){},
);
(new Client($options))
->connect()
->then(function (Client $client) {
return $client->channel();
})
->then(function (Channel $channel) {
return $channel->queueDeclare('my_queue', false, false, false, false)->then(function () use ($channel) {
return $channel;
});
})
->then(function (Channel $channel) {
$id = $GLOBALS['pid'] = posix_getpid();
echo " [".$id."] Waiting for messages. To exit press CTRL+C", "\n";
$channel->consume(
function (Message $message, Channel $channel, Client $client) {
echo " 【".$GLOBALS['pid']."】 from rabbitmq: ", $message->content, "\n\n";
$load = json_decode($message->content, true);
$post_data = json_encode(array("data" => $load['data']));
echo "\e[32m 【".$GLOBALS['pid']."】from api:".sendPostRequest($load['api'], $post_data)." \e[0m \n";
},
'my_queue',
'',
false,
true
);
});
};
Worker::runAll();
consumer.php 需要一直运行,使用命令:php consumer.php start . mode -d 让它在后台运行,一直接收消息 功能: 接收到消息后调用消息里面的 url 地址,发送 data 参数