php实现pipeline
PHP 实现 Pipeline 模式
Pipeline 模式是一种将任务分解为多个步骤并按顺序执行的架构模式。每个步骤处理输入并传递给下一个步骤。以下是 PHP 中实现 Pipeline 模式的几种方法。
基础实现:使用闭包和数组
通过闭包和数组构建一个简单的 Pipeline,逐步传递数据:
$pipeline = [
function ($input) { return $input * 2; },
function ($input) { return $input + 10; },
function ($input) { return $input / 3; }
];
$result = array_reduce(
$pipeline,
function ($carry, $step) { return $step($carry); },
5 // 初始值
);
echo $result; // 输出: (5 * 2 + 10) / 3 = 6.666...
面向对象实现:Pipeline 类
封装一个 Pipeline 类,支持动态添加步骤和链式调用:
class Pipeline
{
private $steps = [];
public function add(callable $step): self
{
$this->steps[] = $step;
return $this;
}
public function execute($input)
{
return array_reduce(
$this->steps,
function ($carry, $step) { return $step($carry); },
$input
);
}
}
// 使用示例
$pipeline = (new Pipeline())
->add(fn($x) => $x * 2)
->add(fn($x) => $x + 10)
->add(fn($x) => $x / 3);
echo $pipeline->execute(5); // 输出: 6.666...
中间件风格实现
结合中间件模式,支持步骤间更复杂的逻辑(如提前终止):
class MiddlewarePipeline
{
private $steps = [];
public function add(callable $step): self
{
$this->steps[] = $step;
return $this;
}
public function execute($input)
{
$next = function ($input) { return $input; }; // 默认终止函数
foreach (array_reverse($this->steps) as $step) {
$next = function ($input) use ($step, $next) {
return $step($input, $next);
};
}
return $next($input);
}
}
// 使用示例
$pipeline = (new MiddlewarePipeline())
->add(function ($input, $next) {
if ($input > 100) return "Too large";
return $next($input * 2);
})
->add(function ($input, $next) {
return $next($input + 10);
});
echo $pipeline->execute(50); // 输出: 110
echo $pipeline->execute(200); // 输出: "Too large"
Laravel 风格的 Pipeline
参考 Laravel 框架的实现方式,支持依赖注入和更灵活的处理器:

class LaravelPipeline
{
private $steps = [];
private $container;
public function __construct($container = null)
{
$this->container = $container;
}
public function through(array $steps): self
{
$this->steps = $steps;
return $this;
}
public function execute($input)
{
return array_reduce(
array_reverse($this->steps),
$this->carry(),
function ($input) { return $input; }
)($input);
}
private function carry()
{
return function ($stack, $step) {
return function ($input) use ($stack, $step) {
if (is_callable($step)) {
return $step($input, $stack);
} elseif (is_string($step) && $this->container) {
$step = $this->container->make($step);
}
return method_exists($step, 'handle')
? $step->handle($input, $stack)
: $step($input, $stack);
};
};
}
}
// 使用示例
$pipeline = (new LaravelPipeline())
->through([
function ($input, $next) { return $next($input * 2); },
function ($input, $next) { return $next($input + 10); },
]);
echo $pipeline->execute(5); // 输出: 20
关键点总结
- 闭包链式调用:通过
array_reduce实现步骤的连续执行 - 中间件支持:允许步骤控制是否继续传递(如权限检查)
- 依赖注入:可通过容器解析步骤的类实例
- 类型灵活性:支持闭包、类名或对象作为处理步骤
根据项目复杂度选择合适实现,简单场景用基础版本,复杂需求可参考 Laravel 风格实现。






