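Implementing MapReduce in PHP
The basic idea behind MapReduce
MapReduce is a distributed computing model whose core idea is to split a job into two phases: map, which transforms each input record independently, and reduce, which combines the mapped results. In PHP, a simplified version can be built with multiple processes or by simulating data shards.
Simulating MapReduce with PHP processes
PHP's pcntl_fork (from the pcntl extension, available on Unix-like systems under the CLI) creates child processes, which can stand in for the sharded processing of a distributed system: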
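To make the two phases concrete before any processes are involved, here is a minimal single-process sketch. The name `simpleMapReduce` and the group-by-key step between map and reduce are illustrative assumptions, not part of any library: the mapper emits `[key, value]` pairs, the pairs are grouped by key, and the reducer folds each group into one value.

```php
// Hypothetical helper: runs map, group-by-key, and reduce in one process.
function simpleMapReduce(array $records, callable $mapper, callable $reducer): array
{
    // Map phase: each record yields a list of [key, value] pairs.
    $groups = [];
    foreach ($records as $record) {
        foreach ($mapper($record) as [$key, $value]) {
            // Group values by key (the "shuffle" step in a real cluster).
            $groups[$key][] = $value;
        }
    }
    // Reduce phase: fold each key's values into a single result.
    $output = [];
    foreach ($groups as $key => $values) {
        $output[$key] = $reducer($key, $values);
    }
    return $output;
}

// Usage: total word length per first letter.
$lengths = simpleMapReduce(
    ['apple', 'avocado', 'banana'],
    fn(string $word) => [[$word[0], strlen($word)]],
    fn($key, array $values) => array_sum($values)
);
print_r($lengths); // a => 12, b => 6
```

Because each record is mapped independently and each key is reduced independently, both loops are the parts that a multi-process or distributed version can run in parallel.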
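```php
function mapReduce(array $data, callable $mapper, callable $reducer) {
    $processes = [];
    // Split the input into at most 4 shards. array_chunk() needs an
    // integer size of at least 1, so guard against an empty input.
    $chunkSize = max(1, (int) ceil(count($data) / 4));
    $chunks = array_chunk($data, $chunkSize);
    foreach ($chunks as $chunk) {
        // One socket pair per shard: the child writes, the parent reads.
        $pipe = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
        $pid = pcntl_fork();
        if ($pid == -1) {
            die("Could not fork");
        } elseif ($pid) { // parent process
            fclose($pipe[0]);
            $processes[$pid] = $pipe[1];
        } else { // child process
            fclose($pipe[1]);
            $result = array_map($mapper, $chunk);
            fwrite($pipe[0], serialize($result));
            fclose($pipe[0]); // signals EOF to the parent
            exit(0);
        }
    }
    // Collect the results
    $mapped = [];
    foreach ($processes as $pid => $pipe) {
        // Read to EOF before waiting: a child with a large result blocks
        // on a full socket buffer, and waiting first would deadlock.
        $mapped = array_merge($mapped, unserialize(stream_get_contents($pipe)));
        fclose($pipe);
        pcntl_waitpid($pid, $status);
    }
    return $reducer($mapped);
}
```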
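If a child dies before writing, the parent reads an empty string and `unserialize` returns `false`, which then breaks `array_merge`. A minimal sketch of a defensive decoder follows. The function name `decodeChildPayload` is hypothetical. The `allowed_classes` option is a standard `unserialize` option (PHP 7.0+) that prevents object instantiation from the payload.

```php
// Hypothetical helper: validates the bytes read from one child's pipe.
function decodeChildPayload(string $raw): array
{
    if ($raw === '') {
        // The child exited (or crashed) without writing anything.
        throw new RuntimeException('child produced no output');
    }
    // Suppress the notice on malformed input and check the result instead.
    $value = @unserialize($raw, ['allowed_classes' => false]);
    if (!is_array($value)) {
        throw new RuntimeException('child output is not a serialized array');
    }
    return $value;
}

// Usage: a valid payload decodes, a truncated one is reported.
print_r(decodeChildPayload(serialize(['hello' => 1])));
try {
    decodeChildPayload('a:1:{s:5:"hel');
} catch (RuntimeException $e) {
    echo $e->getMessage(), PHP_EOL;
}
```

In the collection loop, the call `unserialize(stream_get_contents($pipe))` could be swapped for `decodeChildPayload(stream_get_contents($pipe))`, with the caller deciding whether to retry the shard or abort.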
Example: word frequency count
```php
$texts = ["hello world", "hello php", "mapreduce test"];

// Map: count the words within a single text.
$mapper = function($text) {
    $words = explode(' ', strtolower($text));
    return array_count_values($words);
};

// Reduce: sum the per-text counts into one total per word.
$reducer = function($mapped) {
    $result = [];
    foreach ($mapped as $counts) {
        foreach ($counts as $word => $count) {
            $result[$word] = ($result[$word] ?? 0) + $count;
        }
    }
    return $result;
};

print_r(mapReduce($texts, $mapper, $reducer));
```
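Note that `explode(' ', ...)` produces empty-string "words" when the text contains repeated spaces, and it leaves punctuation attached to words. A hedged alternative mapper is sketched below. The name `$tokenizingMapper` is hypothetical, and treating every non-letter, non-digit run as a separator is an assumption about what counts as a word.

```php
// Hypothetical replacement for $mapper: splits on any run of characters
// that are not letters or digits, and drops empty tokens.
$tokenizingMapper = function (string $text): array {
    $words = preg_split('/[^\p{L}\p{N}]+/u', strtolower($text), -1, PREG_SPLIT_NO_EMPTY);
    return array_count_values($words);
};

print_r($tokenizingMapper("Hello,  hello world!")); // hello => 2, world => 1
```

It returns the same shape as the original mapper (word => count), so the reducer does not need to change.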
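Using the Gearman distributed job system
For production environments, Gearman is recommended as a way to spread the work across real worker processes and machines: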

1. Install the Gearman extension and server
```bash
pecl install gearman
sudo apt-get install gearman-job-server
```
2. Worker code
```php
$worker = new GearmanWorker();
$worker->addServer(); // defaults to 127.0.0.1:4730

// Map: count the words in one text and return the counts as JSON.
$worker->addFunction("wordcount_map", function($job) {
    $text = $job->workload();
    $words = explode(' ', strtolower($text));
    return json_encode(array_count_values($words));
});

// Reduce: merge a JSON list of partial counts into one total per word.
$worker->addFunction("wordcount_reduce", function($job) {
    $counts = json_decode($job->workload(), true);
    $result = [];
    foreach ($counts as $partial) {
        foreach ($partial as $word => $cnt) {
            $result[$word] = ($result[$word] ?? 0) + $cnt;
        }
    }
    return json_encode($result);
});

// Process jobs until work() reports a failure.
while ($worker->work());
```
3. Client code
```php
$client = new GearmanClient();
$client->addServer();
$texts = ["hello world", "hello php", "mapreduce test"];

// Background jobs never return their result to the client, so the map
// jobs are queued as foreground tasks and collected through a callback.
$partials = [];
$client->setCompleteCallback(function (GearmanTask $task) use (&$partials) {
    $partials[] = json_decode($task->data(), true);
});
foreach ($texts as $text) {
    $client->addTask("wordcount_map", $text);
}
// Runs all queued tasks in parallel and blocks until they finish.
$client->runTasks();

// doNormal() replaces the deprecated do().
$result = $client->doNormal("wordcount_reduce", json_encode($partials));
print_r(json_decode($result, true));
```
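Notes
- The multi-process PHP approach suits single-machine, pseudo-distributed scenarios
- For a genuinely distributed environment, a dedicated framework such as Hadoop or Spark is recommended
- Manage memory carefully: large datasets should be processed in shards rather than loaded whole
- Error handling and job monitoring are essential in production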
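The memory note can be sketched with a generator that yields one shard at a time, so the full dataset is never held in memory at once. The helper name `readShards` and the shard size are hypothetical. In practice the input would more likely be a file handle or database cursor than the in-memory iterator used here.

```php
// Hypothetical helper: lazily groups any iterable into shards of $size items.
function readShards(iterable $records, int $size): Generator
{
    if ($size < 1) {
        throw new InvalidArgumentException('shard size must be at least 1');
    }
    $shard = [];
    foreach ($records as $record) {
        $shard[] = $record;
        if (count($shard) === $size) {
            yield $shard;
            $shard = [];
        }
    }
    if ($shard !== []) {
        yield $shard; // final, possibly smaller shard
    }
}

// Usage: fold each shard into a running total instead of keeping
// every partial result around.
$lines = new ArrayIterator(['a b', 'b c', 'c d', 'd e', 'e a']);
$totals = [];
foreach (readShards($lines, 2) as $shard) {
    foreach ($shard as $line) {
        foreach (explode(' ', $line) as $word) {
            $totals[$word] = ($totals[$word] ?? 0) + 1;
        }
    }
}
print_r($totals); // every word appears twice
```

Only one shard plus the running totals are live at any moment, which bounds memory by the shard size and the number of distinct keys.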






