php实现推送消息推送消息
PHP 实现消息推送的方法
使用 WebSocket 实现实时推送
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,适合实现实时消息推送。
安装 Ratchet 库(WebSocket 实现):
composer require cboden/ratchet
创建 WebSocket 服务器:
use Ratchet\Server\IoServer;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;
use MyApp\Chat;
require dirname(__DIR__) . '/vendor/autoload.php';
$server = IoServer::factory(
new HttpServer(
new WsServer(
new Chat()
)
),
8080
);
$server->run();
创建聊天处理类:
namespace MyApp;
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;
class Chat implements MessageComponentInterface {
protected $clients;
public function __construct() {
$this->clients = new \SplObjectStorage;
}
public function onOpen(ConnectionInterface $conn) {
$this->clients->attach($conn);
}
public function onMessage(ConnectionInterface $from, $msg) {
foreach ($this->clients as $client) {
$client->send($msg);
}
}
public function onClose(ConnectionInterface $conn) {
$this->clients->detach($conn);
}
public function onError(ConnectionInterface $conn, \Exception $e) {
$conn->close();
}
}
使用 Server-Sent Events (SSE)
SSE 是 HTML5 标准的一部分,允许服务器向浏览器推送更新。
PHP 服务端实现:
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
header('Connection: keep-alive');
function sendMsg($id, $msg) {
echo "id: $id" . PHP_EOL;
echo "data: $msg" . PHP_EOL;
echo PHP_EOL;
ob_flush();
flush();
}
while(true) {
$serverTime = time();
sendMsg($serverTime, 'server time: ' . date("h:i:s", $serverTime));
sleep(1);
}
客户端 JavaScript 代码:
const evtSource = new EventSource("sse.php");
evtSource.onmessage = function(e) {
console.log(e.data);
};
使用第三方推送服务
Firebase Cloud Messaging (FCM) 实现:
function sendFCMNotification($token, $title, $body) {
$url = 'https://fcm.googleapis.com/fcm/send';
$fields = [
'to' => $token,
'notification' => [
'title' => $title,
'body' => $body
]
];
$headers = [
'Authorization: key=YOUR_SERVER_KEY',
'Content-Type: application/json'
];
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_POST, true);
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($fields));
$result = curl_exec($ch);
curl_close($ch);
return $result;
}
使用长轮询技术
PHP 长轮询实现:
// poll.php
$lastEventId = isset($_GET['lastEventId']) ? $_GET['lastEventId'] : 0;
while(true) {
$newData = checkForNewData($lastEventId);
if($newData) {
echo json_encode($newData);
break;
}
sleep(2);
}
function checkForNewData($lastId) {
// 查询数据库或其他数据源
// 返回比$lastId新的数据
}
客户端 JavaScript:
function longPoll() {
fetch('poll.php?lastEventId=' + lastId)
.then(response => response.json())
.then(data => {
// 处理数据
lastId = data.id;
longPoll();
});
}
longPoll();
使用 MQTT 协议
安装 PHP MQTT 客户端:
composer require bluerhinos/phpmqtt
发布消息示例:
require("phpMQTT.php");
$mqtt = new phpMQTT("mqtt.example.com", 1883, "phpClient");
if ($mqtt->connect()) {
$mqtt->publish("topic", "message", 0);
$mqtt->close();
}
数据库触发推送
结合数据库和 WebSocket 的推送方案:
// 当数据库有新记录时
function onNewDatabaseRecord($record) {
$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'pusher');
$socket->connect("tcp://localhost:5555");
$socket->send(json_encode($record));
}
WebSocket 服务器端接收:
$context = new ZMQContext();
$receiver = $context->getSocket(ZMQ::SOCKET_PULL);
$receiver->bind("tcp://0.0.0.0:5555");
while(true) {
$msg = $receiver->recv();
foreach($this->clients as $client) {
$client->send($msg);
}
}
每种方法都有其适用场景,选择取决于项目需求、实时性要求和基础设施支持。WebSocket 适合高实时性场景,SSE 适合简单推送需求,第三方服务适合移动应用推送,长轮询兼容性最好但效率较低,MQTT 适合物联网场景。





