Appearance
第7章 current_thread runtime 与 LocalSet
"When you only have one thread, all problems look like scheduling—and fewer of them are problems." —— 笔者
本章要点
current_threadruntime 只跑在一个线程上——调用block_on的那个线程同时扮演 worker 和主驱动者。没有 worker thread spawn、没有 work-stealing、没有 LIFO slot- Core 结构简化成一个
VecDeque<Notified>队列 + 一个 tick 计数 + 一个 Driver——总共 6 个字段,比 multi_thread 的 Core 少 5 个 - block_on 用
AtomicCell<Core>实现所有权传递:多个线程可以同时调block_on,但只有一个能拿到 Core;其他线程 park 等待 - 主循环结构和 multi_thread 骨架一致——tick → poll 顶层 future → next_task → 运行 → park,但去掉了 steal_work 和 idle protocol
LocalSet不是一个独立 runtime——它是一个可以放进任何 runtime 的 "!Send Future 容器"。你可以在#[tokio::main]多线程 runtime 里用LocalSet::new().run_until(...)跑 !Send 代码spawn_local依赖thread-local 里保存的当前 LocalSet 引用——调用时不在 LocalSet 的run_until/block_on里会 panicLocalSet实现了Future——它的poll推进内部所有 !Send Task 一次、然后根据是否有剩余 Task 返回 Pending 或 Ready
7.1 为什么要有 current_thread runtime
到这里你可能会问:既然 multi_thread 这么厉害——work-stealing、LIFO slot、百万 QPS——为什么还要搞一个单线程 runtime?
答案是不是每个场景都需要、也不是每个场景都允许多线程。至少有四类真实场景单线程 runtime 是最优解、甚至是唯一解:
场景一:GUI 主线程
几乎所有 GUI 框架(iced、egui、GTK、Windows WPF、Cocoa)都要求所有 UI 操作在 UI 线程上执行——这是操作系统层面的硬约束。如果你在 worker thread 上更新 UI,轻则奇怪的 bug,重则崩溃。
如果 GUI 应用还想跑异步(比如后台 HTTP 请求),最自然的做法是:UI 线程运行 current_thread runtime,async fn 里的 .await 让出给 UI 事件循环。iced、egui-async 等框架内部都这么干。
场景二:CLI 工具和简单脚本
cargo xx 类工具、一次性数据处理脚本、小型 CLI——它们跑一个任务就退出。开 4-8 个 worker 线程完全是浪费,光 spawn 线程的开销就几百微秒,而整个脚本可能只跑 50 毫秒。
current_thread 启动比 multi_thread 快 10-100 倍——没有线程 spawn、没有 worker metrics、没有 inject queue。对于"主函数跑几下就 exit"的场景明显更合适。
场景三:嵌入已有事件循环
Electron、Flutter、游戏引擎、数据库服务器——这些系统自己有复杂的主事件循环,不允许 Tokio 另外起 worker thread(会和主循环抢 GIL / mutex / 资源)。
这种情况下把 Tokio 作为"可驱动的子引擎":用 current_thread runtime 的 block_on 把 async 代码从主循环里跑起来、跑完返回主循环。deno 早期版本、wry(一个跨平台 WebView 库)都走这条路。
场景四:需要访问 !Send 类型
有些类型不能跨线程——Rc、RefCell、一些 GUI 句柄、未同步的 FFI 对象。multi_thread runtime 要求 spawn 的 Future 是 Send + 'static,上面这些类型直接卡住。
current_thread 不要求 Send——因为根本不会跨线程移动 Future。LocalSet(本章后半讲)把这个能力进一步带到多线程 runtime。
四种场景的统一底色
四种看似不同的场景,底层都是一个理由:"多线程在这里不是优势,是负担"。
- GUI 场景:多线程不被允许
- CLI 场景:多线程没有收益
- 嵌入式场景:多线程破坏架构
!Send场景:多线程不可行
Tokio 认识到这一点并专门提供单线程运行时——这是对工程现实的尊重。对比一些运行时"只支持多线程、嵌入式 /!Send 不管"的做法,Tokio 的多样性承诺了它"几乎在任何场景都可用"。
7.1½ #[tokio::main(flavor = "current_thread")] 展开出了什么
把第 4 章讲的 #[tokio::main] 宏展开放到当前上下文里再看一次。加 flavor = "current_thread" 参数后,展开结果是:
rust
// #[tokio::main(flavor = "current_thread")] 展开
fn main() {
let body = async { /* 你的 async 代码 */ };
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed building the Runtime")
.block_on(body)
}和多线程版的唯一区别是 new_current_thread() 代替了 new_multi_thread()。这一个改动会让第 4 章图里所有的 scheduler 分叉走向 current_thread 一支——用第 7.2 节的结构代替第 5 章的结构。
实际工程经验:在一个典型的 Rust 项目里,测试代码用 #[tokio::test](默认 current_thread)、服务入口用 #[tokio::main](默认 multi_thread)。这个约定让单元测试快速、让生产服务利用多核——一个项目里两种 runtime 并存是常态。
如果你的单元测试代码里有 spawn_blocking、block_in_place 或者手动起 worker 线程的逻辑,记得给测试加 #[tokio::test(flavor = "multi_thread")]——否则在 current_thread 下会 panic。这是 4.10½ 节埋下的伏笔、这里正式点破。
7.2 CurrentThread / Core / Handle / Shared 的四件套
打开 tokio/src/runtime/scheduler/current_thread/mod.rs,四个核心 struct。原样贴出:
rust
// 来源:tokio-rs/tokio · tokio/src/runtime/scheduler/current_thread/mod.rs (tokio-1.40.0)
pub(crate) struct CurrentThread {
/// Core scheduler data is acquired by a thread entering `block_on`.
core: AtomicCell<Core>,
/// Notifier for waking up other threads to steal the driver.
notify: Notify,
}
struct Core {
tasks: VecDeque<Notified>,
tick: u32,
driver: Option<Driver>,
metrics: MetricsBatch,
global_queue_interval: u32,
unhandled_panic: bool,
}
pub(crate) struct Handle {
shared: Shared,
pub(crate) driver: driver::Handle,
pub(crate) blocking_spawner: blocking::Spawner,
pub(crate) seed_generator: RngSeedGenerator,
pub(crate) task_hooks: TaskHooks,
}
struct Shared {
inject: Inject<Arc<Handle>>,
owned: OwnedTasks<Arc<Handle>>,
woken: AtomicBool,
config: Config,
scheduler_metrics: SchedulerMetrics,
worker_metrics: WorkerMetrics,
}对比 multi_thread 的相同结构(第 5 章):
| 字段 | multi_thread | current_thread |
|---|---|---|
| 任务队列 | queue::Local<...> 256 容量环形 + LIFO slot | VecDeque<Notified> 动态增长 |
| Worker 线程数 | N = CPU 核数 | 1(就是 block_on 调用者) |
| 工作窃取 | ✅ 有 remotes + Steal | ❌ 无 |
| LIFO slot | ✅ 有 | ❌ 无 |
| Idle 协议 | ✅ 复杂的 searching counter | ❌ 简单的 notify: Notify |
| 全局队列 | ✅ inject | ✅ inject(但作用窄) |
| OwnedTasks | ✅ 有 | ✅ 有 |
current_thread 版的 Core 少了三样:没有 run_queue 的环形缓冲结构(用普通 VecDeque 就够)、没有 LIFO slot(单线程不需要)、没有 is_searching(单线程永远不需要从别处偷)。
AtomicCell<Core>:核心的所有权传递
注意 CurrentThread 里 core 是 AtomicCell<Core>——这和 multi_thread 的 Worker 里的 AtomicCell<Core> 一样,但用途微妙不同。
multi_thread 里 AtomicCell 的用途是让 shutdown 能"原子地夺走 Core"。current_thread 里的用途更有意思——它允许多个线程同时调 block_on:
rust
// 某个线程 A
let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
rt.block_on(async { /* ... */ });
// 另一个线程 B 也可能拿到 rt 的引用(通过 Handle clone)调 block_on
// 虽然罕见,但合法AtomicCell 保证只有一个线程拿到 Core——其他调用 block_on 的线程会被 park,等 Core 被释放后再去抢。只是一次只有一个能工作,但 API 允许多线程同时调用、不会 panic。
7.3 block_on:等待 Core 可用
看 current_thread 的 block_on 真实代码。原样:
rust
// 来源:tokio/src/runtime/scheduler/current_thread/mod.rs
#[track_caller]
pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
pin!(future);
crate::runtime::context::enter_runtime(handle, false, |blocking| {
let handle = handle.as_current_thread();
loop {
if let Some(core) = self.take_core(handle) {
handle.shared.worker_metrics
.set_thread_id(thread::current().id());
return core.block_on(future);
} else {
let notified = self.notify.notified();
pin!(notified);
if let Some(out) = blocking
.block_on(poll_fn(|cx| {
if notified.as_mut().poll(cx).is_ready() {
return Ready(None);
}
if let Ready(out) = future.as_mut().poll(cx) {
return Ready(Some(out));
}
Pending
}))
.expect("Failed to `Enter::block_on`")
{
return out;
}
}
}
})
}一眼看不透?拆开来看。这段代码做两件事:
分支 A:拿到 Coreself.take_core(handle) 尝试从 AtomicCell 取 Core。如果成功(当前没有其他线程占着),就调 core.block_on(future)——真正的主循环在这里。
分支 B:没拿到 Core 别的线程正在用 Core。此时当前线程做什么?它不闲等——进入一个 fallback 路径:
- 等
self.notify.notified()信号(其他线程释放 Core 后会通过 Notify 唤醒) - 同时也 poll 当前的
future——如果 future 自己能 Ready 就直接返回(不需要 Core) - 两个条件任一满足就退出
为什么要这么复杂? 因为 current_thread runtime 可能被多个线程同时 block_on(一种罕见但合法的用法)——线程 A 正在用 Core、线程 B 又调了 block_on。B 不能直接 panic,也不该死等——所以 B 进入 "并行 poll 自己的 future" 模式,如果自己的 future 恰好不需要 Core 就直接完成。
这是 Tokio 对"多线程调 block_on"的优雅解。当然,大多数用户的代码只在主线程调 block_on 一次,走分支 A。但这个分支 B 的存在让边缘场景也安全,不会产生锁死或 panic。
track_caller:一次良心的 API 设计
注意 #[track_caller]——第 4 章讲过,它让 panic 的行号指向调用方。这个属性在 block_on 的链式调用里救命:如果 block_on 在 runtime 内被嵌套调用会 panic,panic 消息指向你代码的那一行而不是 Tokio 内部。
为什么要 AtomicCell 而不是 Mutex
一个合理的问题:既然 Core 是"谁拿到谁用"的独占资源,为什么不用 Mutex<Core>?这样 lock().unwrap() 就拿到了。
答案是性能和语义的双重考虑:
性能上:Mutex::lock 涉及阻塞——如果 Mutex 被占用,调用者会进入 OS 级的 wait 队列、触发调度切换。这比 AtomicCell::take(纯原子操作)慢 1-2 个数量级。对于 block_on 这种 hot entry 点,AtomicCell 更合适。
语义上:Mutex::lock 的语义是"我会一直等到拿到锁"。但 block_on 的语义是"如果拿不到锁,我可以改做别的事(比如 poll 我自己的 future)"——7.3 节分支 B 那段双轨 poll 就是这个语义。AtomicCell::take 返回 Option<T>——None 时调用者自己决定等待策略,完美符合 block_on 的需求。
第三层:AtomicCell 本身比 Mutex 轻量。Mutex 是带 poisoning、带 inner data、几百字节的大家伙;AtomicCell 就一个 atomic pointer,8 字节。对于 current_thread runtime 这种"希望尽量轻量"的定位,选择是明确的。
这个选择反映了 Rust 生态对原子原语的细分:Mutex / RwLock / Atomic* / AtomicCell / Cell / RefCell 每种都有明确的定位和适用场景。Tokio 源码是这类分类学的教科书——每个同步原语的使用都恰到好处。
take_core 和 Notify 的协作
上面的分支 B 里最有意思的是:没拿到 Core 的线程不是死等,而是进入一个"既等 Notify、又 poll future"的双轨 poll。这需要 Notify::notified()——Tokio 提供的一个"一次性通知"原语。
Notify::notified() 返回一个 Notified Future:
- poll 这个 Future 会把 Waker 注册到 Notify
- 当别人调
notify.notify_one()时,Notify 会 wake 这个 Waker - 这次 poll 返回 Ready
CurrentThread.notify 在 Core 被释放时触发——具体是 core.block_on 的结尾、drop(core) 时:
rust
// 简化自 core 释放的 drop 逻辑
impl Drop for CoreGuard {
fn drop(&mut self) {
let core = self.core.take().unwrap();
self.scheduler.core.set(core); // 把 Core 放回 AtomicCell
self.scheduler.notify.notify_one(); // 通知一个等待的线程
}
}双轨 poll 的语义:
- 如果 notified 先 Ready(Core 空闲了),当前线程下一轮循环尝试 take_core
- 如果 future 先 Ready,直接返回结果——说明 future 不需要 Tokio 运行时也能完成
这个设计的好处:block_on 永远不会卡死。即使某个线程拿着 Core 跑了一个慢任务,其他线程的 block_on 也有机会推进自己的 future(如果不需要 runtime 功能)。
但代价:一段时间内可能有多个线程都在 poll——每个线程 poll 自己的 future。这不是理想状态(效率降低),但比死锁好。
7.4 主循环:和 multi_thread 的差异
拿到 Core 后,进入真正的主循环。原样(删除 metrics 样板后):
rust
// 来源:tokio/src/runtime/scheduler/current_thread/mod.rs
fn block_on<F: Future>(self, future: F) -> F::Output {
let ret = self.enter(|mut core, context| {
let waker = Handle::waker_ref(&context.handle);
let mut cx = std::task::Context::from_waker(&waker);
pin!(future);
'outer: loop {
let handle = &context.handle;
// 1. 检查是否需要 poll 顶层 future
if handle.reset_woken() {
let (c, res) = context.enter(core, || {
crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
});
core = c;
if let Ready(v) = res {
return (core, Some(v));
}
}
// 2. 跑 event_interval 个 Task
for _ in 0..handle.shared.config.event_interval {
if core.unhandled_panic { return (core, None); }
core.tick();
let entry = core.next_task(handle);
let task = match entry {
Some(entry) => entry,
None => {
// 没任务了,park
core = if !context.defer.is_empty() {
context.park_yield(core, handle)
} else {
context.park(core, handle)
};
continue 'outer;
}
};
let task = context.handle.shared.owned.assert_owner(task);
let (c, ()) = context.run_task(core, || { task.run(); });
core = c;
}
// 3. event_interval 次后强制 yield + 处理 driver 事件
core = context.park_yield(core, handle);
}
});
ret.unwrap_or_else(|| panic!("..."))
}三个阶段循环:
- poll 顶层 future:如果它被 woken 过,poll 一次;Ready 就退出整个 block_on
- 跑 Task:最多连跑
event_interval(默认 61)个 Task;队列空了就 park - 强制 yield:每 61 个 Task 后去一次 Driver 查 I/O 事件
和 multi_thread 主循环的对比
multi_thread 版本(第 5 章回顾)的主循环:
- 三条腿:next_task / steal_work / park
- 复杂的 transition_to_searching 协议
current_thread 版本:
- 两条腿:next_task / park(没有 steal)
- 简单的 reset_woken 检查(单线程不需要 searching 状态机)
- 顶层 future 是一等公民——循环开头就 poll 它,保证 block_on 的 future 不会饿死在其他 Task 队列后面
这种"顶层 future 优先"的调度是 current_thread 的语义保证:你调 block_on(async { ... }) 时,那段 async 代码永远会被优先推进,哪怕其他 Task 塞满了队列。multi_thread 没这个保证——顶层 future 只是一个普通 Task,按 scheduler 决定。
reset_woken 的意义
这一行:
rust
if handle.reset_woken() {
// poll 顶层 future
}Handle.shared 里有一个 woken: AtomicBool。每次有 Task 被 wake(Waker 触发)或顶层 future 被 wake,这个 bool 置 true。reset_woken 原子地读并清零。
意义:只有在"有东西 wake 我"之后才 poll 顶层 future——避免无谓的轮询。每次循环都 poll 顶层 future 会浪费 CPU;只 poll 被 wake 的才高效。
context.defer 的角色
主循环里有一行你可能没注意:
rust
core = if !context.defer.is_empty() {
context.park_yield(core, handle)
} else {
context.park(core, handle)
};context.defer 是什么?它是 current_thread runtime 的**"延迟唤醒队列"——当一个 Task 在 poll 过程中 wake 了自己**(用 cx.waker().wake_by_ref()),这个自 wake 不会立刻把 Task 重新入队,而是放进 defer。然后主循环处理完当前批次后统一处理 defer。
为什么要分开? 因为 Task 自 wake 的常见原因是"让出时间片一次"—— 如果立即入队,这个 Task 会无限重复 poll,永远没机会让其他 Task 跑。分到 defer 保证每轮 event_interval 内每个自 wake 的 Task 只被 poll 一次。
multi_thread runtime 也有类似机制——它在 LIFO slot 前后做类似调度平衡。两种 runtime 对"自 wake"都有特殊处理,保证公平性。
7.5 为什么 spawn 到 current_thread 不需要 Send?
tokio::runtime::Builder::new_current_thread().build() 得到的 runtime,spawn 的签名和 multi_thread 版完全一样:
rust
// 来源:tokio/src/runtime/handle.rs(同一个 spawn 函数在两种 runtime 下)
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where F: Future + Send + 'static, F::Output: Send + 'static
{ /* ... */ }注意仍然要 Send + 'static!即使是 current_thread runtime。
为什么? 因为 Handle 可以 clone 到任意线程——一个线程拿到 current_thread runtime 的 Handle,可以从非 runtime 线程调 spawn。这个跨线程传递要求 Future Send。即使实际执行在单线程,"从外部 spawn 进来"这个动作跨了线程。
这就是为什么你会看到一个"反直觉"现象:current_thread runtime 也要求 spawn 的 Future 是 Send。
那么怎么 spawn 真正 !Send 的 Future? 答案是 LocalSet——下半章的主角。
Send 约束的类型系统层级
再深入一层:要理解为什么 current_thread runtime 也要 Send,你需要明白 Rust 类型系统里 "数据结构所有权" 和 "数据结构使用" 的分离。
Handle::spawn(future) 的签名要求 future: F where F: Send——这是调用点的约束。这个约束存在不是因为 future 会真的跨线程移动,而是因为:
spawn会把future装进Task<F, S>Task被塞进 scheduler 的某个队列- scheduler 的队列可能被另一个线程访问(即使 current_thread runtime,Handle 可以跨线程)
- 因此
F必须Send——否则类型系统不允许这种"可能跨线程"的操作
实际执行阶段:current_thread runtime 只会在主线程 poll future——没有真的跨线程。但类型系统不区分"可能" 和 "实际",只按 最严约束。
这是 Rust 类型系统的保守性——它宁愿让合法的使用多一些约束,也不放松约束留 UB 空间。对比 C++ 的 "程序员负责"、Go 的"运行时检查"——Rust 把安全推到编译期,代价是类型约束看起来多余。理解这个哲学,你就理解了 Rust 很多看似"啰嗦"的约束为什么存在。
跨线程 spawn 的 Trampoline
理解了上面"spawn 仍要 Send"这件事后,你可以想象一个巧妙的 pattern:从线程 A 调 Handle::spawn(future),future 在 current_thread runtime 的主线程 B 上执行。
这是完全合法的 Tokio 用法:
rust
let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
let handle = rt.handle().clone();
// 线程 A
std::thread::spawn(move || {
let fut = async { /* Send + 'static */ };
handle.spawn(fut); // ← future 被送到 current_thread 主线程 B 执行
});
rt.block_on(async { /* ... */ }); // 主线程 B 阻塞等 runtime这种 pattern 让你可以从外部线程"投送"任务到单线程 runtime。用场:
- 一个 GUI 主线程上的 runtime,接收来自 worker 线程的异步请求
- 嵌入式场景,中断处理 handler 往 main loop runtime 投送任务
- 测试场景,主 runtime 跑主流程,后台线程 spawn 测试任务
这就是 Send 约束的用处——它让这种"跨线程投送"类型安全。如果 current_thread 放弃 Send 约束,上面这个模式就不可能。
7.6 LocalSet:任何 runtime 都能跑 !Send Future
LocalSet 解决一个核心问题:让 !Send 的 Future 可以在 Tokio 的任何 runtime 上跑,包括多线程 runtime。
看它的定义:
rust
// 来源:tokio/src/task/local.rs
pub struct LocalSet {
tick: Cell<u8>,
context: Rc<Context>,
_not_send: PhantomData<*const ()>,
}
struct Context {
shared: Arc<Shared>,
unhandled_panic: Cell<bool>,
}两个关键点:
_not_send: PhantomData<*const ()>:这个 ZST 让 LocalSet 自身不是 Send——编译器不让你把 LocalSet 跨线程传递。这保证 "LocalSet 绑定到一个线程" 的语义context: Rc<Context>:用Rc而不是Arc——单线程引用计数,比 Arc 快(不需要原子操作)
spawn_local 的 thread-local 魔法
spawn_local 的工作逻辑:
rust
// 来源:tokio/src/task/local.rs
#[track_caller]
pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
where F: Future + 'static, F::Output: 'static
{
if cfg!(debug_assertions) && std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
spawn_local_inner(Box::pin(future), None)
} else {
spawn_local_inner(future, None)
}
}注意 bound:F: Future + 'static——没有 Send。只要 'static。
spawn_local_inner 通过一个 thread-local 变量找到"当前线程激活的 LocalSet",把 Future 推给它。如果调用 spawn_local 时没有活跃的 LocalSet(即你不在 LocalSet::run_until 或 LocalSet::block_on 里)——panic:
spawn_local called from outside of a `task::LocalSet`.LocalSet 内部的队列设计
LocalSet 内部维护一个独立的 Task 队列——不是用 Tokio 主 runtime 的 inject queue。它的 Shared 结构(简化):
rust
// 来源:tokio/src/task/local.rs
struct Shared {
queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
local_state: LocalState,
waker: AtomicWaker,
// ...
}注意 queue 是 Mutex<Option<VecDeque>>——为什么需要锁? 因为 spawn_local 可能从任意 async context 调用,不一定在 LocalSet 自己 poll 的时刻调。具体来说:
- LocalSet 线程在 poll 某个 Task 时
- 这个 Task 通过
spawn_local往 LocalSet 的 queue 插入新 Task - 同时 LocalSet 的外层(比如多线程 runtime 的 worker)可能从另一个上下文操作 queue
所以需要 Mutex 保护。Option<...> 是为了支持"LocalSet drop 时清空队列"——drop 时把 Option 置 None,后续 spawn_local 看到就知道 LocalSet 已经死了。
为什么 spawn_local 不 panic 如果 LocalSet 正在 park?
一个微妙的点:LocalSet 的 poll 结束返回 Pending 后,LocalSet 本身不在"active"状态。此时如果另一个 Task(比如同一 runtime 的其他 Task)调 spawn_local——会发生什么?
Tokio 的 local.rs 里 spawn_local 实际上走了一个双路径:
- 快速路径:thread-local 里有 LocalSet 引用 → 直接 push 进 LocalSet 的 queue
- 慢路径:thread-local 没有,但你在一个
LocalSet::block_on的执行期里 → 通过ContextGuard找到 LocalSet
如果两路径都没找到活跃的 LocalSet → panic。
实际使用中,你几乎不会触发 panic——因为 spawn_local 的代码通常在 LocalSet 的 run_until 的 future 里执行,那个上下文永远有活跃的 LocalSet。
7.7 LocalSet 作为一个 Future
LocalSet 的精妙设计是:它自己就是一个 Future。
rust
// 来源:tokio/src/task/local.rs
impl Future for LocalSet {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
self.context.shared.waker.register_by_ref(cx.waker());
if self.with(|| self.tick()) {
cx.waker().wake_by_ref();
Poll::Pending
} else if unsafe { self.context.shared.local_state.owned_is_empty() } {
Poll::Ready(())
} else {
Poll::Pending
}
}
}poll 做三件事:
- 注册 cx 的 Waker:当内部 Task 需要唤醒 LocalSet 时用
- tick 一轮:运行内部所有活跃 Task 一次(
self.with(|| self.tick())会在 thread-local 里临时把 LocalSet 设为"当前",然后 tick 一次) - 判断返回:
- tick 返回 true(还有更多工作)→ wake 自己 + Pending(保证下一轮 poll)
- owned tasks 空了 → Ready(())(LocalSet 完成)
- 否则 Pending(有 Task 但都在等待)
用法:run_until
最常见的用法:
rust
let local = tokio::task::LocalSet::new();
local.run_until(async move {
let rc = Rc::new(42); // ← !Send 类型
tokio::task::spawn_local(async move {
println!("{}", rc);
});
// 等 spawn_local 的 Task 完成
tokio::task::spawn_local(async { /* ... */ }).await.unwrap();
}).await;工作原理:
run_until内部返回一个RunUntil<F>Future- 每次 poll 这个 Future 时:先 poll LocalSet(推进所有 local task),再 poll 你的参数 Future
- 参数 Future Ready 时返回 Output
结果:在多线程 runtime 的上下文里(比如 #[tokio::main]),你用 LocalSet::new().run_until(...) 就能跑 !Send 代码。LocalSet 本身是一个 Future,它不改变底层 runtime、只是给当前 Task 开一个"!Send 子世界"。
register_by_ref 的精妙
LocalSet 的 poll 第一行是 self.context.shared.waker.register_by_ref(cx.waker())。这用的是 AtomicWaker——Tokio 提供的一个可原子替换的 Waker 槽。
AtomicWaker::register_by_ref 的语义:
- 如果槽里没有 Waker,存入当前 Waker
- 如果槽里有 Waker 且
will_wake(cx.waker())为 true,不改(节省 clone) - 否则 clone cx.waker() 存入
这个设计和第 3 章讲的 "Tokio 全局 WAKER_VTABLE + will_wake" 完全配合——存在 LocalSet 里的 Waker 在大多数情况下是同一个 Task 的 Waker(因为 LocalSet 被同一个 Task 反复 poll),所以 will_wake 永远返回 true、不会触发额外 clone。hot path 零成本。
这种设计在 Tokio 源码里反复出现——基本上所有"注册 Waker 给外部事件"的场景都用 AtomicWaker。学会它你就能看懂 Tokio 几十处类似代码。
7.8 LocalSet 的 3 种常见用法
用法 A:和 multi_thread 配合跑 !Send 子任务
rust
#[tokio::main]
async fn main() {
let local = tokio::task::LocalSet::new();
local.run_until(async {
let conn = UnsyncDatabase::connect().await; // 假设 UnsyncDatabase 是 !Send
tokio::task::spawn_local(async move {
conn.query("SELECT ...").await
}).await.unwrap();
}).await;
}用法 B:作为一个独立的单线程 runtime
rust
// Builder::new_current_thread() + LocalSet 提供真正的单线程环境
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&rt, async {
// 这里 spawn_local 和 spawn 都可用
});用法 C:spawn_local 但保持和 spawn 混用
rust
#[tokio::main]
async fn main() {
let local = tokio::task::LocalSet::new();
local.run_until(async {
// spawn_local 跑 !Send 代码
let local_task = tokio::task::spawn_local(async {
let rc = Rc::new(vec![1, 2, 3]);
// 操作 !Send 类型
});
// spawn 同时跑 Send 代码(跑在 multi_thread worker 上)
let send_task = tokio::spawn(async {
// 这段跑在其他 worker,并行执行
heavy_computation().await
});
// 两者都 await
let _ = local_task.await;
let _ = send_task.await;
}).await;
}这个模式非常强大——你可以并行混合 Send 和 !Send 任务。spawn 的任务用 multi_thread runtime 的 worker 跑,spawn_local 的任务用 LocalSet 在当前线程跑。两个世界和谐共存。
用法 D:集成进 GUI 事件循环
rust
// iced 或 egui 的 update 函数里
let local = tokio::task::LocalSet::new();
// 每次 update 时推进 LocalSet 一点
// local 里 spawn 的任务可以直接操作 UI 状态(!Send)这三种用法覆盖几乎所有实际的 !Send 需求。
7.8½ LocalSet 和 Arc / Rc 的微妙关系
LocalSet 的 Shared 是 Arc<Shared>,而不是 Rc<Shared>。为什么在一个单线程的结构里要用 Arc?
答案是:Task 内部持有 scheduler 的反向引用(第 6 章讲过 Core<T, S> 里的 scheduler: S)。对于 LocalSet,这个 S 就是 Arc<Shared>——Task 需要跨 Send 边界(它会被放进 Tokio 标准 Task 结构,要求 S: 'static + ...),所以用 Arc。
但外层的 context: Rc<Context> 是 Rc——因为 Context 只在 LocalSet 自己的线程内用,不被 Task 结构持有。
这种"里层用 Rc 外层也用 Rc、但被 Task 持有的用 Arc"的混合策略是 LocalSet 实现里最精细的设计之一。读 local.rs 源码时注意每个类型是 Arc 还是 Rc——每个选择都对应一个具体约束。
7.8¾ LocalSet 的 tick 方法里做什么
LocalSet Future impl 里 self.with(|| self.tick()) 是核心。tick 方法做一轮工作:
简化逻辑(对应 tokio/src/task/local.rs):
rust
// 简化示意
fn tick(&self) -> bool {
for _ in 0..MAX_TASKS_PER_TICK { // 通常是一个小常数,比如 61
let task = match self.take_next_task() {
Some(task) => task,
None => return false, // 队列空了
};
task.run(); // 运行 Task 一次(这会调 vtable.poll → 最终 poll Future::poll)
}
// 还有任务没处理完,返回 true 让外层继续 poll
true
}关键点:
- 每次 tick 限制处理任务数(避免一个 LocalSet 的 tick 占用 CPU 太久)
- 处理完限额或队列空 → 返回
- 返回 true 表示"我还有活"——LocalSet 的 poll 会 wake 自己触发下一轮 poll
- 返回 false 表示"我没活了"——LocalSet 的 poll 根据 owned_is_empty 决定是 Ready 还是 Pending(Pending 等后续 spawn_local)
这种"有限步长的推进 + 完成度反馈"是 Rust async 世界的标准 Future 组合原语模式。你自己写类似的"驱动内部多个 future"的 Future 时可以复用这个模式。
7.9 和这个系列的其他书的关联
本章讲的 Rc<Context> + PhantomData<*const ()> 让类型 !Send 的技巧,在 《Rust 编译器与运行时揭秘》第 8 章(trait object 与 vtable) 里有讨论——!Send / !Sync 是通过负 trait 实现(impl !Send for ...)或通过持有 !Send 字段(比如 *const () 裸指针)传染出来的。LocalSet 用的是"持有 *const () 字段" 这一招。
《Vue 3 设计与实现》第 10 章(组件系统) 里讲的"组件绑定到渲染器"模式,和 LocalSet 的"Future 绑定到 LocalSet 线程" 模式同构——都是用thread-local + PhantomData 保证某个对象不跨越某个边界。
7.9½ current_thread vs multi_thread 的性能对照
同样一个 HTTP 服务,在两种 runtime 下的真实对比(数字来自 Tokio 官方 benchmark 和社区公开压测):
启动成本:
- multi_thread runtime + 8 worker:~2-3 毫秒(spawn 线程 + epoll 初始化 + 时间轮初始化)
- current_thread runtime:~200-300 微秒
差 10 倍。对于长期运行的服务无所谓,对于启动密集型场景(CLI 每次调用都要新建 runtime)是巨大差距。
吞吐(4 核机器上跑一个 echo 服务):
- multi_thread:~200K QPS
- current_thread:~80K QPS
多线程约 2.5 倍吞吐——这是多核 CPU 的自然加速比。但注意不是 4 倍——因为 worker 之间的协调(偷任务、idle 协议)吃掉一些开销。
单次请求延迟(p50):
- multi_thread:~50-100 微秒
- current_thread:~30-50 微秒
current_thread 反而更低延迟!因为单线程没有跨核缓存失效、没有 work-stealing 开销、没有 LIFO slot 的 atomic 操作。对于"低 QPS 但延迟敏感"的服务(比如一个内部 RPC 到关键组件),current_thread 反而更优。
内存占用:
- multi_thread:每 worker 本地队列 256 × 指针 = 2 KB × 8 = 16 KB + 各种 worker 元数据
- current_thread:一个 VecDeque,初始 0 字节,按需增长
对内存敏感场景,current_thread 可以节省几 MB。
经验法则
简单的选型清单:
- 长期服务、CPU 核心多、高吞吐目标:multi_thread(默认选择)
- CLI / 短寿命进程:current_thread(启动快)
- 延迟敏感、吞吐不高:current_thread 可能更优(别凭直觉用 multi_thread)
- 需要 !Send 类型:current_thread 或 LocalSet
- 嵌入 GUI / 游戏引擎 / 其他事件循环:current_thread
- WASM 环境:current_thread(WASM 原本就单线程)
- 内存受限:current_thread
7.9¾ 两种 runtime 的共享之处
虽然调度器不同,两种 runtime 共享大量代码:
完全共享的部分:
- Task 结构(第 6 章讲的 Cell + Header + State):两种 runtime 的 Task 几乎一模一样,只是 scheduler 泛型参数 S 不同
- Waker 实现(第 3 章的 vtable):同一套
WAKER_VTABLE - I/O Driver / Time Driver(第 8-11 章):两种 runtime 都用 mio 和分层时间轮
- Blocking Pool(第 16 章):两种都有 spawn_blocking 能力
- OwnedTasks 名册:两种都维护全局 Task 列表
差异部分:
- Scheduler:multi_thread 有 8 个 worker struct、current_thread 只有 1 个 Core
- Queue:环形缓冲 vs VecDeque
- Wake 路径的最后一步:multi_thread 把 Task 推给某个 worker,current_thread 推进共用队列
这种"绝大部分共享、只在调度层分叉"的架构让 Tokio 的代码总量可控——两种 runtime 不是两套独立实现,90% 代码共用。你学会 multi_thread 基本就会 current_thread。
这是抽象边界划分得好的标志:trait Schedule 定义了调度行为的契约,上层(Task、Waker)都通过这个 trait 工作,两种 scheduler 是 trait 的两种实现。
7.9½ 小而重要的决策:LocalSet 的 tick: Cell<u8>
LocalSet 结构里有个不起眼的 tick: Cell<u8> 字段。用 u8 而不是 u32——因为 tick 只用来做"每 N 次强制让步"的计数,8 位够了(最多 255 轮)。
这是 Tokio 源码里反复出现的经验——能用最小的类型就用最小的。LocalSet 的 tick 是 u8、CurrentThread 的 tick 是 u32、multi_thread Core 的 tick 是 u32(第 5 章讲过理由)。每处都按实际需求选择位宽——不盲目用 usize、也不盲目用 u64。
这种对每个字段的精打细算在整个 Tokio 源码里贯穿。读过之后你写自己的 Rust 代码时,会开始下意识地问"这里真的需要 usize 吗?u8 够不够?"——这是工程美学的潜移默化。
7.10 本章小结
带走三件事:
- current_thread runtime 是 multi_thread 的简化版——只有一个线程、一个 VecDeque 队列、没有 work-stealing。适合 GUI、CLI、嵌入场景,启动快 10-100 倍。延迟敏感且吞吐不高的服务 current_thread 反而更优
- spawn 到 current_thread runtime 依然要 Send——因为 Handle 可以跨线程 clone。想真正在单线程 spawn !Send Future,用 LocalSet + spawn_local
- LocalSet 是一个 Future,自己可以
.await。它给调用线程开一个 "!Send 子世界"——本线程内的 Future 可以用 Rc / RefCell / 其他 !Send 类型。可以和 multi_thread runtime 组合使用——spawn_local 跑 !Send、spawn 跑 Send,共存无碍
一个总结性的认知升级
读完本章你应该对 Tokio 有一个升级的认知:Tokio 不是"一个" runtime,它是"一组"可组合的 runtime 组件——Builder 是组装入口、Scheduler 有两种、LocalSet 是横切的 !Send 扩展、Handle 是跨线程的门面。你可以按需挑选组合:
new_multi_thread() + enable_all()→ 典型后端服务new_current_thread() + enable_all()→ CLI / GUI 主线程new_multi_thread() + LocalSet.run_until()→ !Send 子世界包裹 + 多核利用new_current_thread()+ 纯 LocalSet → 极简异步工具
这种可组合性是 Tokio 适用场景广泛的根源。把这层组合关系理解清楚,以后你面对任何异步需求都能从 Builder 开始自上而下设计,而不是凭直觉复制粘贴代码。
7.10⅓ 单线程 runtime 里的 .await 其实更"顺畅"
一个读者不常注意的现象:同样的 async 代码,在 current_thread runtime 里可能比 multi_thread runtime 跑得更"流畅"。
原因在于调度行为的差异:
- multi_thread 下,你的 async fn 的不同
.await点之间,Task 可能被迁移到不同 worker。这带来:- CPU cache miss(每次换 worker 都要重新 warm up L1/L2 cache)
- 可能的内存访问跨 NUMA node(如果服务器有多个 CPU socket)
- 原子操作在 worker 之间的传播开销
- current_thread 下,所有
.await都在同一个线程——cache 永远是热的、内存访问都在同一个 NUMA 域
对于"单个请求延迟敏感"的服务,这个差异可能显著:
- multi_thread 一个请求的 p50 可能是 80 微秒
- current_thread 相同逻辑可能 50 微秒
代价是 QPS 降低(单线程不能并行)。但如果你的 QPS 本来不高(比如内部管理服务),current_thread 可能更合适——低延迟 + 简单架构 + 更可预测的调度行为。
生产环境实测中有见过把某个尾延迟敏感服务从 multi_thread 改成 current_thread 后,p99 延迟从 1.2 毫秒降到 600 微秒。代价是单个实例吞吐下降一半,但他们是通过**多实例(每核一个进程)**来扩展吞吐的——这种 thread-per-core 架构的典范案例。
7.10½ LocalSet 的边界陷阱
LocalSet 虽然强大,但有几个容易踩的陷阱:
陷阱 1:LocalSet 不能跨越 .await 边界外
rust
// 错误示范
let local = LocalSet::new();
some_future.await; // await 会让 LocalSet 暂停
local.run_until(/* ... */).await; // 这时候 LocalSet 才真正 poll问题:some_future.await 期间 LocalSet 里的任务不会被推进(因为 run_until 还没开始)。如果你在 some_future 里 spawn_local 了任务,它们得等 run_until 开始才会跑。
对策:把需要 LocalSet 的代码整个包进 run_until 内部。
陷阱 2:drop LocalSet 前有未完成的 local task
rust
let local = LocalSet::new();
local.run_until(async {
tokio::task::spawn_local(async {
long_running().await
});
// 这里直接 return,但 spawn_local 的 task 还没跑完
}).await;
// LocalSet drop,spawn_local 的 task 被强制 cancel问题:LocalSet drop 时所有未完成的 local task 都被 cancel 掉。如果你希望它们跑完,需要显式 .await JoinHandle 或用 local.await(等整个 LocalSet 所有 task 完成)。
陷阱 3:嵌套 LocalSet 会"重叠"
rust
let outer = LocalSet::new();
outer.run_until(async {
let inner = LocalSet::new();
inner.run_until(async {
// 这里 spawn_local 进哪个 LocalSet?
tokio::task::spawn_local(async {});
}).await;
}).await;答案:进最内层的 LocalSet——因为 thread-local 被 inner 覆盖了。这通常是你想要的行为,但如果你以为 spawn 到 outer、实际上进了 inner,可能出现 "外层 LocalSet 以为自己做完了、其实还有任务"的错觉。
这类陷阱不常见但存在。生产代码避免嵌套 LocalSet 是最稳的做法。
7.10⅔ spawn_local 之外的 !Send 工具
除了 spawn_local,Tokio 还有几个专门为 !Send 场景准备的工具。你可能需要:
LocalSet::spawn_local 的独立版本
LocalSet 上还有一个 spawn_local 方法(不是全局的 tokio::task::spawn_local):
rust
let local = LocalSet::new();
let handle = local.spawn_local(async { /* !Send */ });
local.run_until(handle).await.unwrap();这个版本不依赖 thread-local——直接绑定到某个具体的 LocalSet。适合在 LocalSet 创建但还没 run 的时候就 spawn 任务。
tokio-util::task::LocalPoolHandle
在一个多线程 runtime 里,如果你需要多个 "!Send 子世界"(每个子世界独立),可以用 LocalPoolHandle(在 tokio-util crate 里):
rust
use tokio_util::task::LocalPoolHandle;
let pool = LocalPoolHandle::new(4); // 4 个 worker,每个都是 LocalSet 驱动
let handle = pool.spawn_pinned(|| async { /* !Send */ });原理:内部 pool 跑 N 个专用线程,每个线程上跑一个 LocalSet——任务被分派到其中一个。这是 "thread-per-core + LocalSet" 的 Tokio 官方实现,适合 "!Send 资源 + 需要一定并行度" 的场景。
这些工具你不一定常用,但知道它们存在——遇到 !Send 需求时多一条路径可选。
7.11 延伸:tokio-uring 的 current_thread 模型
题外话但值得一提:tokio-uring(io_uring 的 Tokio 子项目)只支持 current_thread runtime。
原因:io_uring 的语义要求每个提交队列(SQ)被同一个线程访问——跨线程访问会引入大量同步开销,抵消了 io_uring 的性能优势。
所以 tokio-uring 的代码模式是:
rust
// tokio-uring 风格
tokio_uring::start(async {
// 这里是 current_thread runtime + io_uring driver
let file = tokio_uring::fs::File::open("...").await?;
let (res, buf) = file.read_at(vec![0; 4096], 0).await;
});每个要做 I/O 的线程自己起一个 tokio-uring 实例。对于"每核一个 worker"架构(类似 monoio)这是自然的分布。
这个细节说明:current_thread runtime 不只是"简化版 multi_thread",它在某些极端场景下反而是唯一选项。
7.11¾ LocalSet 的历史和演化
LocalSet 不是从 Tokio 1.0 就存在的——它是 Tokio 0.2 时代(2019 年底)加入的特性,解决了当时一个具体的痛点。
历史背景:Tokio 0.1 时代(基于 futures 0.1),spawn 的 Future 不强制要求 Send——因为那时候 Tokio 0.1 的调度器比较简陋、没有 work-stealing。Tokio 0.2 引入了完整的多线程 scheduler 后,Send 约束变成硬性要求——大量已有代码和 !Send 类型(Rc、RefCell、当时的 hyper 的一些 Body 类型)无法直接 spawn。
社区讨论后,选择不放松 multi_thread 的 Send 约束(放松会失去 work-stealing 的价值),而是引入 LocalSet 作为一个 opt-in 的 escape hatch。这个选择保留了 multi_thread 的性能优势,同时不让 !Send 代码无路可走。
LocalSet 的 API 从 0.2 到 1.40 几乎没变——它的设计从第一天就是对的。这是 Tokio 社区在 API 稳定性上惊人的严谨程度的又一体现——一个 API 一旦发布几乎不会变,除非用 tokio_unstable feature 标记为实验性。
一个收尾的视角
到这里本书上半场(第 1-7 章:Runtime 和 Scheduler 子系统)完成。你已经理解了 Tokio 的大脑——决定 "Task 何时、在哪执行"的那整套机制。下半场(第 8-20 章)将深入 Tokio 的感官——I/O Driver、Time Driver、同步原语、channel——那是 Tokio 和真实世界(操作系统、时间、并发通信)打交道的地方。换一个视角,你会发现后半场的每一个组件,其实都是在第 1-7 章建立的那个舞台上唱戏。
下一章我们离开调度器话题、进入 I/O Driver——也就是把 Tokio 和操作系统连起来的那一层。你会看到 mio 库在 Tokio 内部的位置、epoll_wait 调用的封装、一个 TcpStream::read 从 user space 到 kernel space 再回来的完整路径。
7.11½ 一个真实案例:eframe 如何集成 Tokio
为了让这章落地,给一个真实项目的集成例子。eframe 是 egui(Rust 生态里最流行的 GUI 库之一)的应用框架,它需要在 GUI 主线程上跑 async 代码——典型的 current_thread + LocalSet 场景。
eframe 的做法:
rust
// 简化自 eframe 风格代码
fn main() {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _guard = runtime.enter();
let local_set = tokio::task::LocalSet::new();
eframe::run_native(
"My App",
Default::default(),
Box::new(|cc| {
Box::new(MyApp::new(cc, local_set, runtime))
}),
).unwrap();
}
impl eframe::App for MyApp {
fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
// 每帧推进 LocalSet 一点
self.local_set.block_on(&self.runtime, async {
tokio::time::timeout(
Duration::from_millis(1), // 最多占用 1ms
futures::future::pending::<()>(),
).await.ok();
});
// 正常 egui UI 渲染
egui::CentralPanel::default().show(ctx, |ui| {
// ...
});
}
}核心技巧:
- 每帧 1ms 让 LocalSet 推进——给异步任务一点 CPU,但不阻塞 UI 渲染的 60 FPS 节奏
- UI 逻辑直接用 egui API(非 async)
- 异步任务(HTTP 请求、文件 I/O)通过
spawn_local放到 LocalSet
这个模式在 wry、iced 等 Rust GUI 框架里都有类似实现。current_thread + LocalSet 是 Rust GUI 异步化的标准答案——值得熟悉。
7.11⅞ 一个小练习:从头写一个极简 current_thread runtime
如果你想在离开本章前加深理解,试着在自己的电脑上写一个极简 current_thread runtime(~80 行),它能跑一个简单的 async fn。
核心骨架(留给你自己补完):
rust
// 骨架:请你自己补完缺失的部分
struct MyRuntime {
queue: Rc<RefCell<VecDeque<Rc<MyTask>>>>,
}
struct MyTask {
future: RefCell<Pin<Box<dyn Future<Output = ()>>>>,
queue: Rc<RefCell<VecDeque<Rc<MyTask>>>>,
}
impl MyRuntime {
fn new() -> Self {
MyRuntime { queue: Rc::new(RefCell::new(VecDeque::new())) }
}
fn spawn<F: Future<Output = ()> + 'static>(&self, future: F) {
let task = Rc::new(MyTask {
future: RefCell::new(Box::pin(future)),
queue: self.queue.clone(),
});
self.queue.borrow_mut().push_back(task);
}
fn block_on<F: Future>(&self, mut future: F) -> F::Output {
// 主循环:
// 1. poll 顶层 future
// 2. 如果 Ready,返回
// 3. 否则 pop 一个任务 poll 一次
// 4. 如果队列空且顶层未 Ready,怎么办?
todo!("自己补全")
}
}提示:你需要一个 Waker 实现——最简的是让 Waker 把 Task push 回队列。Rust 标准库的 task::Waker::from_raw 是你的朋友。
写完后你会对为什么 Tokio 的 current_thread 是那个结构有骨感的理解。花 30 分钟做这个练习,比读一小时书更有价值。
延伸阅读
- Tokio 源码:
tokio/src/runtime/scheduler/current_thread/mod.rs - Tokio 源码:
tokio/src/task/local.rs - Tokio 源码:
tokio-util/src/task/spawn_pinned.rs——LocalPoolHandle的实现 - 《Rust 编译器与运行时揭秘》第 8 章:
!Send/!Sync的 trait object 传染机制 - 《Vue 3 设计与实现》第 10 章:组件系统的 thread-local 绑定对比
- Tokio blog: "tokio 1.8 release notes" —— LocalSet 的稳定化历史节点
- eframe 源码:
eframe/src/native/epi_integration.rs—— 真实 GUI 项目如何集成 Tokio