PHP Fibers:实现高效协作式多任务处理方案

纤维的误解
PHP Fibers 不是异步 PHP。它们不是并行处理。不是线程。不是让 PHP 同时运行多件事。
当 PHP 8.1 在 2021 年 11 月推出纤维支持时,许多开发者(包括我自己)都困惑地看着它。“太好了,又一个异步东西?”我们想。困惑是可以理解的,因为纤维最明显的用途是在像 ReactPHP 和 AmPHP 这样的异步库中。
ReactPHP 甚至有一个名为 async 的包,使用纤维让异步代码看起来像同步的:
// 纤维之前:回调地狱
$promise->then(function($result) {
return anotherAsyncCall($result);
})->then(function($finalResult) {
echo $finalResult;
});
// 使用纤维:看起来是同步的
$result = await($promise);
$finalResult = await(anotherAsyncCall($result));
echo $finalResult;
看到这个,很容易认为“纤维 = 异步魔法”。但这忽略了更大的图景。
纤维是关于协作多任务处理的。它们赋予你的代码暂停执行、做其他事,然后精确从你离开的地方恢复的能力,所有变量、调用栈和执行上下文都完美保留。
是的,这对异步库非常有用。但它在纯同步代码中同样有用,当你需要受控中断和恢复时。大多数 PHP 开发者错过了这个机会。
纤维的缓慢采用不是因为它们没用。而是因为大多数开发者不知道何时使用它们。这篇文章正是要解决这个问题。
理解纤维
在深入复杂示例之前,让我们建立一个坚实的基础。纤维到底是什么,它如何工作?
什么是协作多任务处理?
理解纤维的一个好比喻是将标准 PHP 脚本想象成一列在单轨道上的火车。它从 A 站开到 B 站。通常它不能停下来,直到到达 B 站。纤维允许火车在中途轨道停下,让乘客下车(或让乘客去上个厕所),同时甚至让另一列火车使用轨道一会儿,然后精确从它离开的地方恢复,所有行李(变量和内存状态)完好无损。
另一个比喻是想象你在煮晚饭时阅读一本书。你读几页,然后当计时器响起时,你在页面上做个书签,搅拌锅子,然后回到精确你离开的地方阅读。那就是协作多任务处理。
关键词是协作。你(读者/厨师)决定何时切换任务。没有人强行打断你,而是你在合适的时候自愿让出控制权。
在编程术语中:
- 抢占式多任务处理:操作系统强行中断你的代码(线程、进程);
- 协作式多任务处理:你的代码决定何时让出控制(协程、纤维)。
纤维是 PHP 的协作多任务处理实现。它们让你:
- 开始执行一段代码;
- 在任何点暂停它(挂起);
- 做其他事;
- 精确从你离开的地方恢复;
- 按需重复。
纤维的解剖
让我们看一个简单示例:
<?php
$fiber = new Fiber(function(): string {
echo"1. Fiber started\n";
$value = Fiber::suspend('pause-1');
echo"3. Fiber resumed with: $value\n";
$value2 = Fiber::suspend('pause-2');
echo"5. Fiber resumed again with: $value2\n";
return'final-result';
});
echo"0. Before starting fiber\n";
$suspended1 = $fiber->start();
echo"2. Fiber suspended with: $suspended1\n";
$suspended2 = $fiber->resume('data-1');
echo"4. Fiber suspended again with: $suspended2\n";
$result = $fiber->resume('data-2');
echo"6. Fiber returned: $result\n";
输出:
0. Before starting fiber 1. Fiber started 2. Fiber suspended with: pause-1 3. Fiber resumed with: data-1 4. Fiber suspended again with: pause-2 5. Fiber resumed again with: data-2 6. Fiber returned: final-result
我添加了数字,这样你可以看到执行如何在纤维中跳进跳出。suspend 让它跳出纤维,resume 让它跳回!为了更清晰,让我们分解发生了什么:
- 创建:
new Fiber(function() {...})创建一个纤维,但尚未执行; - 开始:
$fiber->start()开始执行直到第一个Fiber::suspend(); - 挂起:
Fiber::suspend('pause-1')暂停执行并返回控制给调用者; - 恢复:
$fiber->resume('data-1')从挂起处继续执行; - 返回:当纤维完成时,
resume()返回最终值。
魔力在于执行上下文切换。当纤维挂起时:
- 所有局部变量都被保留;
- 调用栈被保存;
- 执行跳回调用
start()或resume()的人那里; - 传递给
suspend()的值返回给调用者。
当你恢复时:
- 执行跳回纤维中;
- 传递给
resume()的值成为suspend()的返回值; - 一切继续,就像什么都没发生一样。
一个使纤维强大的关键洞见:运行在纤维中的代码不需要知道它在纤维中。
看这个:
function processData(int $id): string {
$data = fetchData($id); // 这可能挂起!
$result = transform($data); // 这也可能挂起!
return $result;
}
// 在纤维中调用
$fiber = new Fiber(fn() => processData(42));
$fiber->start();
从 processData 的视角,它只是调用函数并返回结果。它不知道 fetchData() 和 transform() 可能在幕后挂起纤维。复杂性被隐藏了。
这使纤维完美适合构建隐藏复杂行为的干净 API。
纤维在异步库中的应用
现在我们理解了基础,让我们看看为什么有些人将纤维与异步代码联系起来。这也将展示一个具体的用例,然后我们再处理主要问题。
异步问题
PHP 中的异步编程传统上看起来像这样:
// 使用 Promise(纤维之前)
function fetchUserData(int $userId): PromiseInterface {
return$this->httpClient->getAsync("/users/$userId")
->then(function($response) {
return json_decode($response->getBody());
})
->then(function($userData) use ($userId) {
return$this->cache->setAsync("user:$userId", $userData);
})
->then(function() use ($userId) {
return"User $userId cached";
});
}
这有效,但很难阅读和推理。使用 catch() 处理错误很乱。调试痛苦。而且它“感觉”不像 PHP。
纤维解决方案
使用纤维,像 ReactPHP 这样的库可以提供这个:
// 使用纤维(PHP 8.1 之后)
function fetchUserData(int $userId): string {
$response = await($this->httpClient->getAsync("/users/$userId"));
$userData = json_decode($response->getBody());
await($this->cache->setAsync("user:$userId", $userData));
return "User $userId cached";
}
好多了!但 await() 如何工作?让我展示一个简化版本:
namespace React\Async;
function await(PromiseInterface $promise): mixed {
// 挂起纤维并注册 Promise 回调
$result = Fiber::suspend([
'type' => 'await',
'promise' => $promise
]);
// 当恢复时,我们将有结果或异常
if ($result instanceof \Throwable) {
throw $result;
}
return $result;
}
如果你感觉很花哨,工具如 PHPStan 让你洒上一点泛型魔法,这样 await() 知道从你的 Promise 中返回什么。强大的静态分析感觉像巫术。多酷啊?
这里发生了什么:
- 用户代码在纤维中调用
await($promise); await()用 Promise 调用Fiber::suspend();- 事件循环看到挂起的纤维和 Promise;
- 事件循环在纤维挂起时继续处理其他事;
- 当 Promise 解析时,循环调用
$fiber->resume($value); - 执行在
await()中继续,返回值; - 用户代码像同步一样得到值!
纤维在等待异步操作时挂起,但用户的代码看起来完全同步。
更进一步:真正透明的异步
但我们可以走得更远!像 AmPHP 这样的库将此提升到下一个级别,通过为异步操作创建纤维感知的包装器。不是有单独的 getAsync() 和 await() 调用,你只需有看起来完全同步的方法:
// AmPHP 方法:不需要 await()!
function fetchUserData(int $userId): string {
$response = $this->httpClient->get("/users/$userId"); // 看起来同步,实际异步!
$userData = json_decode($response->getBody());
$this->cache->set("user:$userId", $userData); // 看起来同步,实际异步!
return "User $userId cached";
}
等等,什么?没有 await() 调用?这如何工作?
魔力在于 get() 和 set() 在内部使用纤维。这里是一个简化示例:
class HttpClient {
publicfunction get(string $url): Response {
// 创建异步操作
$promise = $this->performAsyncRequest('GET', $url);
// 挂起当前纤维并将 Promise 传递给事件循环
$response = \Fiber::suspend([
'type' => 'await',
'promise' => $promise
]);
if ($response instanceof \Throwable) {
throw $response;
}
return $response;
}
}
从用户的视角,他们只是调用了 get() 并得到了响应。他们不知道它是异步的。
这是纤维的极致:使异步操作完全透明。用户编写看起来像阻塞同步的 PHP 代码。库在幕后使用纤维处理所有异步复杂性。
比较方法
让我们看看演变:
// 1. 使用 Promise 的传统异步(无纤维)
$promise = $this->httpClient->getAsync("/users/$userId")
->then(fn($response) => json_decode($response->getBody()))
->then(fn($userData) => $this->cache->setAsync("user:$userId", $userData))
->then(fn() => "User $userId cached");
// 2. 使用 await() 帮助的异步(使用纤维)
$response = await($this->httpClient->getAsync("/users/$userId"));
$userData = json_decode($response->getBody());
await($this->cache->setAsync("user:$userId", $userData));
return"User $userId cached";
// 3. 完全透明异步(纤维隐藏在库中)
$response = $this->httpClient->get("/users/$userId");
$userData = json_decode($response->getBody());
$this->cache->set("user:$userId", $userData);
return"User $userId cached";
注意方法 #3 看起来完全像同步代码?这就是纤维正确使用时的力量。库开发者一次性处理复杂性。每个用户都受益于干净、看起来同步的 API,但底层实际是异步的。
为什么这导致了误解
因为纤维最明显的用途是让异步代码看起来同步,开发者假设纤维是异步机制。但纤维本身不做任何异步事。它们只是提供使同步看起来异步代码可能的挂起/恢复机制。
事件循环仍在做实际的异步工作。纤维只是让 API 更好看。
这个区别至关重要:纤维是管理执行流的工具,不是实现并行或异步的。
真实问题:MCP SDK 中的客户端通信
现在让我们来到启发这篇文章的问题。我正在开发 Model Context Protocol (MCP) 的 PHP 实现,我们遇到了一个似乎不可能优雅解决的设计挑战。
什么是 MCP?
Model Context Protocol 是一个将 AI 助手(如 Claude)与外部工具和数据源连接的标准。
MCP 服务器暴露:
- 工具:AI 可以调用的函数
(注:原文章在此处似乎有截断,但根据上下文,继续为协议服务器部分。以下为完整翻译,基于提供的完整内容。)
当我们调用 $this->washTask->start() 时,洗菜任务开始,打印“Washing vegetables…”,击中 wait(seconds: 1.0),并挂起。控制返回到 ProcessSaladTask。然后我们调用 $this->bowlTask->start(),同样的事发生——它开始、打印、等待 0.8s,并挂起。现在我们有三个挂起的纤维:ProcessSaladTask、WashVegetablesTask 和 PrepareBowlTask。
TaskManager 在它们的延迟到期时在它们之间切换。0.8 秒后,PrepareBowlTask 恢复并完成。1.0 秒后,WashVegetablesTask 恢复并完成。ProcessSaladTask 仍在等待。
所以,当我们调用 wait(task: $this->washTask) 时,它检查洗菜是否完成。它是(1.0s 已过),所以它立即返回而不挂起!同样,wait(task: $this->bowlTask) ——碗更早完成了,所以它也立即返回。
这就是显式启动任务的力量,因为两个子任务并行运行,到我们检查它们完成时,它们已经完成了。不需要挂起。
意大利面订单(混合:并行然后顺序):
class BoilWaterTask extends Task {
protected function execute(): mixed {
echo " Boiling water...\n";
wait(seconds: 2.0);
return "water_ready";
}
}
class PrepareSauceTask extends Task {
protected function execute(): mixed {
echo " Preparing sauce...\n";
wait(seconds: 1.5);
return "sauce_ready";
}
}
class ProcessPastaTask extends Task {
protected BoilWaterTask $boilTask;
protected PrepareSauceTask $sauceTask;
publicfunction __construct() {
$this->boilTask = new BoilWaterTask();
$this->sauceTask = new PrepareSauceTask();
}
protectedfunction execute(): mixed {
echo"Pasta order received\n";
timeout(1.0, fn() => echo"Water boiling soon...\n");
timeout(2.5, fn() => echo"Stir pasta!\n");
timeout(4.0, fn() => echo"Check if al dente!\n");
// 并行启动两者
$this->boilTask->start();
$this->sauceTask->start();
// 等待水(酱继续在后台烹饪!)
wait(task: $this->boilTask);
// 煮意大利面(酱仍在烹饪!)
echo" Cooking pasta...\n";
wait(seconds: 3.0);
// 现在等待酱(可能已经完成了!)
wait(task: $this->sauceTask);
echo" Combining...\n";
wait(seconds: 0.5);
echo"Pasta complete!\n\n";
return"pasta_complete";
}
}
意大利面订单演示了并行和顺序执行的混合。我们像沙拉一样并行启动 BoilWaterTask (2.0s) 和 PrepareSauceTask (1.5s)。但然后我们先用 wait(task: $this->boilTask) 等待水。为什么?因为我们需要沸水来煮意大利面!
当我们调用那个 wait 时,煮水任务仍在运行(尚未完成其 2.0s),所以 ProcessPastaTask 挂起。同时,酱任务也在烹饪。1.5 秒后,酱完成。2.0 秒后,水完成,ProcessPastaTask 恢复。
现在我们用 wait(seconds: 3.0) 煮意大利面 3 秒。在此期间,ProcessPastaTask 再次挂起。其他订单可以处理。意大利面煮好后,我们调用 wait(task: $this->sauceTask)。记住,酱早在 1.5s 完成了,所以这个等待立即返回——不需要挂起!
这种并行启动多件事并按你需要的顺序等待它们的模式非常强大。酱在我们需要它很久之前就完成了,但我们不必阻塞等待它。我们只是在准备好时检查,它就在那里。
我们还在各种点调度三个不同的超时提醒。清理任务:
class CleanupTask extends Task {
protectedfunction execute(): mixed {
echo"Cleanup scheduled for 6s\n\n";
wait(seconds: 6.0);
echo"Starting cleanup...\n";
wait(seconds: 0.5);
echo"Wiping counters...\n";
wait(seconds: 0.5);
echo"Done!\n\n";
}
}
第 5 步:运行它
$manager = new TaskManager(); $manager->add(new ProcessPizzaTask()); $manager->add(new ProcessSaladTask()); $manager->add(new ProcessPastaTask()); $manager->add(new CleanupTask()); $manager->addInterval(2.0, fn() => echo "[Monitor] Kitchen status check\n"); echo "Restaurant Kitchen\n==================\n\n"; $manager->run();
输出:
Restaurant Kitchen Management System ===================================== [Order #101] Pizza order received [Order #102] Salad order received [Order #103] Pasta order received [Cleanup] Scheduled for 6 seconds [Order #101] Preparing ingredients... [Order #102] Washing vegetables... [Order #102] Preparing bowl and utensils... [Order #103] Boiling water... [Order #103] Preparing sauce... [Order #102] Bowl ready Water boiling soon... [Order #101] Ingredients ready [Order #102] Vegetables washed [Order #102] Chopping ingredients... [Order #101] Baking pizza... [Order #103] Sauce ready [Monitor] Checking kitchen status... [Order #103] Water boiling [Order #103] Cooking pasta... Time to add dressing! Time to stir the pasta! [Order #102] Ingredients chopped [Order #102] Mixing salad... Reminder: Check pizza temperature! [Order #102] Salad mixed [Order #102] Packaging... Salad complete! [Order #101] Pizza baked [Order #101] Adding toppings... Time to check if pasta is al dente! [Monitor] Checking kitchen status... [Order #101] Toppings added [Order #101] Packaging... Don't forget extra cheese! Pizza complete! [Order #103] Pasta cooked [Order #103] Combining pasta and sauce... [Order #103] Combined [Order #103] Placing... Starting kitchen cleanup... [Monitor] Checking kitchen status... Pasta complete! Wiping counters... Done! All tasks completed!
注意输出如何交错——那是并发执行!
如果你想克隆确切的代码、浏览调度器,或调整任务进行自己的实验,我已将其推送到 GitHub 作为 CodeWithKyrian/php-fiber-kitchen-scheduler。仓库包括 TaskManager、辅助函数、任务定义和可运行的 CLI 入口点(php kitchen.php),这样你就可以实时观看协作多任务处理的跟踪。
发生了什么?
这里发生了什么:
- 管理器启动所有任务——Pizza、Salad、Pasta、Cleanup 都作为纤维开始执行;
- 每个任务启动子任务——PrepTask、BakeTask、WashTask 等启动并立即
wait()时间; - 父任务调用
wait(task: ...)——它们挂起,TaskManager 切换到其他纤维; - 每 10ms,管理器检查——哪些延迟到期?哪些任务完成?恢复那些纤维;
- 纤维恢复、执行、再次挂起——ProcessPizzaTask 在准备完成时恢复,启动烘烤,再次等待;
- 超时独立触发——“检查温度!”在 3s 时打印,不阻塞任何事;
- 间隔每 2s 触发——厨房监视器无论任务状态如何都运行;
- 管理器在所有任务完成时停止——间隔也停止。
总时间:~8s。如果顺序运行:~18s。一个进程,一个线程,通过纤维的协作多任务处理。
关于架构的说明
我们在这里采用的方法使用中央协调器——TaskManager——它拥有所有纤维并决定何时启动和恢复它们。但还有另一种路线:让任务自己成为协调器。
在那个模型中,主要 TaskManager 只管理直接添加到它的顶级任务。每个任务然后可以管理自己的子纤维,而不涉及管理器。父任务会启动其子纤维,如果子纤维挂起,父任务也会挂起(将控制让回 TaskManager)。当 TaskManager 发送“tick”来恢复父任务时,父任务检查其子条件是否满足,如果是,则恢复子纤维。
它更复杂,你需要仔细协调以防止父任务在等待子时无限阻塞。父任务需要协作挂起,在每个 tick 上检查子纤维状态,并在正确时刻恢复子纤维。但它是一个有效的途径,给单个任务更多自治。
我们不会在这里详细探讨这个架构,因为它值得一篇专属文章。但如果你感兴趣,你可以自己尝试实现。它是深入理解纤维协调和协作多任务处理的好方法。或者告诉我,如果你想在未来的文章中看到它!
何时应该使用纤维?
现在你看到了纤维在行动中,何时在你自己的代码中实际使用它们?
纤维用例检查清单
考虑纤维当:
- 你需要暂停和恢复执行——核心用例。如果你需要离开函数,做其他事,然后回来。
- 你想从用户隐藏复杂性——如果你在构建库并想要一个干净的 API 来隐藏异步或有状态行为。
- 你需要协作多任务处理——当你想要多个“任务”在没有线程或进程的情况下取得进展。
- 你在桥接同步和异步代码——当你想要让异步操作看起来同步(像 ReactPHP 的 await)。
- 你需要维护执行上下文——当用生成器暂停和恢复太有限(不能轻易从嵌套调用 yield)。
- 你在构建基础设施代码——库、框架和 SDK 最受益于纤维。
何时不使用纤维
不要使用纤维当:
-
简单回调就足够——不要过度复杂化。如果回调有效,使用回调。 -
你需要真正的并行——纤维是协作的,不是并行的。使用进程、线程或异步 I/O 来实现并行。 -
代码简单且线性——如果没有中断或恢复需要,纤维添加不必要的复杂性。 -
你不控制执行流——纤维在库和框架中闪耀,在应用代码中较少。 -
生成器有效——如果生成器(yield)干净地解决你的问题,坚持它们。纤维更强大但也更复杂。
常见陷阱和注意事项
1. 理解谁控制纤维
关于纤维的最基本事情:当纤维挂起时,它将控制让回某人。那“某人”是协调器,你需要知道它是谁。
纤维代表一个工作单元。当它调用 Fiber::suspend() 时,执行跳出纤维并返回给调用 $fiber->start() 或 $fiber->resume() 的人。那实体变得负责决定何时(如果)恢复纤维。
在我们的 MCP 传输中:
- StdioTransport:主循环(
while (!feof($input)))是协调器。它连续处理输入、管理纤维并刷新输出。 - StreamableHttpTransport:SSE 流的阻塞循环成为该请求生命周期的协调器。它阻塞整个进程并管理纤维直到完成。
- ReactPHP:事件循环是协调器。我们不阻塞它;相反,我们注册循环管理的定时器。
关键原则:协调器不得永久阻塞,否则你的纤维永不恢复。如果你编写挂起纤维的代码,确保接收控制的人有机制恢复它们。
另请注意:纤维可以嵌套。你可以在纤维内创建纤维。协调器可以是中央管理器(如我们的 TaskManager 示例),或父纤维可以自己充当其子纤维的协调器。只需明确谁管理谁,并确保协调器不无限阻塞。
2. 忘记你在纤维中
function myHandler() {
$client->sample("Generate text"); // 这挂起!
// 此处的任何代码在挂起和恢复**后**运行
}
记住挂起可能发生在调用栈深处。总是思考在挂起期间什么状态可能改变。
3. 资源生命周期
$lock = $mutex->acquire();
$client->sample("..."); // 纤维在此挂起
$lock->release(); // 这运行得晚得多!
小心跨越挂起点(锁、数据库事务、文件句柄)的资源。纤维可能挂起几秒或几分钟。
4. 跨越挂起的异常处理
try {
$result = $client->sample("..."); // 挂起
} catch (\Throwable $e) {
// 这捕获**纤维内**的异常
// 不是挂起期间的异常
// 除非用 `throw()` 恢复纤维
}
异常在纤维内正常工作,但挂起/恢复机制本身有单独的错误处理。所以理解异常不自动跨越纤维和协调器边界至关重要。
- 如果纤维抛出异常,它冒泡到协调器(通过
$fiber->start()或$fiber->resume())。 - 如果协调器抛出异常,它不自动进入纤维(因为异常可能与挂起的纤维无关)
你必须明确决定如何桥接那个差距。你想要协调器独立崩溃吗?你想要捕获错误并用失败对象 resume() 吗?还是用 throw() 抛入纤维?这些是架构决策,不是默认。
5. 全局状态
global $counter;
$counter++;
$client->log("Count: $counter"); // 挂起
$counter++; // 如果另一个纤维在挂起期间修改了 $counter 呢?
小心全局状态。其他代码(或其他纤维)可能在你挂起时修改它。
6. 纤维创建开销
创建纤维有小开销。不要创建数百万个。它们比线程轻量,但不是免费的。
结论
当你学习数据结构和算法时,你不只是记住定义和语法,你还学习何时使用它们。例如,双向链表如果不认识到你需要在两端快速插入/删除,就没用。
语言特性也一样。PHP 自 8.1 以来就有纤维,但大多数开发者不用它们,因为他们不认识到纤维解决的问题。所以,下次你面对涉及的问题时:
- 暂停和恢复执行;
- 在干净 API 后隐藏复杂性;
- 让异步代码感觉同步;
- 协作多任务处理.
问自己:“纤维能优雅解决这个吗?”
你可能会惊讶于答案有多经常是“是”。
以上关于PHP Fibers:实现高效协作式多任务处理方案的文章就介绍到这了,更多相关内容请搜索码云笔记以前的文章或继续浏览下面的相关文章,希望大家以后多多支持码云笔记。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 admin@mybj123.com 进行投诉反馈,一经查实,立即处理!
重要:如软件存在付费、会员、充值等,均属软件开发者或所属公司行为,与本站无关,网友需自行判断
码云笔记 » PHP Fibers:实现高效协作式多任务处理方案

微信
支付宝