Appearance
第8章 Reactor 与 I/O Driver 架构
"An async runtime's heart beats on the rhythm of epoll_wait." —— 笔者
本章要点
- Tokio I/O Driver 是 mio 库的薄封装——
Driverstruct 只有 3 个字段:signal_ready: bool、events: mio::Events、poll: mio::Poll Handle里管mio::Registry(注册 fd 的接口)、RegistrationSet(所有活跃注册)、mio::Waker(让 turn 从外部被唤醒)park()的本质就是一次mio::Poll::poll系统调用 + 事件分发——每个 mio event 对应 Tokio 的一个 ScheduledIo- Token 编码:
mio::Token(n)在 Tokio 里被设计成直接存ScheduledIo的堆地址——let ptr: *const ScheduledIo = token.0 as *const _。这避免了 Hash map 查找 ScheduledIo是每个 fd 的 per-resource 结构:readiness: AtomicUsize(又一个 bit packing:Tick + Readiness + Shutdown)+waiters: Mutex<Waiters>poll_readiness和wake两个方法是 Future 侧和 Driver 侧的对接点——poll_readiness让 Task 返回 Pending 并注册 Waker,wake在 epoll 事件到达时唤醒所有满足条件的 Waker- 一次 TcpStream::read.await 的完整 8 步走位:用户调
.await→ poll_readiness → Pending → 主 Task park → epoll_wait → 内核 返回 → 匹配 Token 找到 ScheduledIo → set_readiness + wake → Task 被重新 schedule → 再次 poll → Ready
8.0½ 开始前的"前置小测验"
这一章是本书的分水岭——前 7 章都在讲"Task 怎么被调度",从本章起讲"Task 怎么和真实世界(OS、时间、I/O)对接"。读之前做个自测,看看你已经有多少心智模型:
1. 一个 TcpStream::connect("1.2.3.4:80").await 的 .await 让步那一刻,Waker 被存到哪里? 答:被存到某个 ScheduledIo 的 waiters.writer(因为 connect 完成的标志是"fd 可写")
2. 如果 ScheduledIo 已经 drop 了,Tokio 还会用第 3 章讲的 vtable 去 wake 吗? 答:不会——drop ScheduledIo 前会 deregister fd,epoll 以后不会再报事件
3. 一个 fd 的 ScheduledIo 从哪里分配? 答:一般是 Box::new(ScheduledIo { ... }) 分配在堆上,地址被塞进 mio::Token
4. epoll_wait 等事件时,Tokio 的 worker 线程在做什么? 答:该线程没事做——它进入 OS 级的阻塞等待(不占 CPU),直到 epoll 有事件
5. 为什么 multi_thread runtime 的 scheduler 章节讲 LIFO slot 和 work-stealing 非常复杂,但 I/O 这一章看起来相对简单? 答:因为 I/O Driver 本质上委托给 mio + OS。Tokio 只负责"把 mio 事件翻译成 Waker 调用"——大部分复杂逻辑在 OS 内核里(TCP/IP 栈、epoll 实现)。Scheduler 是 Tokio 自己的"原创",代码复杂度自然高
如果这 5 题你能答上 3 题以上,你已经是合格的 Tokio 使用者了。读完本章后应该能全部答对,并且能解释"为什么这样"。
8.1 I/O Driver 在 Tokio 里的定位
前 7 章讲完了"任务如何被调度"——Scheduler 和 Task 的故事。但还有个硬核问题没回答:tokio::net::TcpStream::read().await 这一行代码,具体怎么做到"有数据时唤醒任务、没数据时挂起"的?
答案是 I/O Driver——Tokio 运行时的"外部事件管道"。它的职责极其明确:
- 接收 Future 的"关注请求":某个 Task 在
.await一个 socket 可读 → Driver 记下"这个 Task 在等这个 fd 的读事件" - 主动等 OS 事件:调
epoll_wait(Linux)/kqueue(macOS)/IOCP(Windows)等系统调用 - 翻译事件:OS 说"fd X 现在可读了"→ Driver 找到"在等 fd X 的 Task"→ 调 Waker 唤醒
整个 Tokio 的 I/O Driver 是 mio 库的薄封装。mio 做跨平台的 poll 抽象(epoll/kqueue/IOCP 统一成一套 API),Tokio 做"从 mio 事件到 Task Waker"的那一层桥接。
"Reactor" 是个老派的设计模式名字——它指的就是这种"等事件 + 分发到关注者"的架构。Tokio 用了这个名字是向传统致敬,核心逻辑是几十年来 event-driven 编程的集大成。Node.js 的 libuv、Python asyncio、Netty、libevent——底层都是这个模式。
和常见的 Reactor 教程讲法的区别
市面上大多数 "写个异步 runtime" 教程会这样开头:
首先我们需要一个 Reactor,它监听 I/O 事件、通知 Executor ...
然后开始展开这个 Reactor。这种讲法会让你误以为 Reactor 是 runtime 的独立模块。
实际上 Tokio 的 I/O Driver 不是一个独立的 thread / service,它是某个 worker 线程在空闲时顺手做的事。每个 worker 的主循环(第 5 章)在 park 阶段会调 I/O Driver 的 turn——turn 里才真的调 epoll_wait。I/O Driver 是一个被动组件,不是主动的线程。
这个认知差异对理解 Tokio 很重要:
- Tokio 的线程数 = worker 数(multi_thread 是 N 个,current_thread 是 1 个)
- 没有"专门的 I/O 线程"
- I/O 事件处理的延迟 = "某个空闲 worker 什么时候调到 turn"
这种设计让 Tokio 的线程资源开销最小化——不像某些事件驱动系统有"一个 I/O 线程 + N 个 worker 线程"的额外开销。
8.2 Driver 与 Handle 两件套
打开 tokio/src/runtime/io/driver.rs,核心结构只有两个 struct——极简。原样贴出:
rust
// 来源:tokio-rs/tokio · tokio/src/runtime/io/driver.rs (tokio-1.40.0)
pub(crate) struct Driver {
/// True when an event with the signal token is received
signal_ready: bool,
/// Reuse the `mio::Events` value across calls to poll.
events: mio::Events,
/// The system event queue.
poll: mio::Poll,
}
pub(crate) struct Handle {
/// Registers I/O resources.
registry: mio::Registry,
/// Tracks all registrations
registrations: RegistrationSet,
/// State that should be synchronized
synced: Mutex<registration_set::Synced>,
/// Used to wake up the reactor from a call to `turn`.
#[cfg(not(target_os = "wasi"))]
waker: mio::Waker,
pub(crate) metrics: IoDriverMetrics,
}两个 struct 的分工:
Driver 是 "主动的轮询循环者"——持有 mio::Poll 实例,每次 park() 都会用它调 epoll_wait。3 个字段:
poll: mio::Poll——封装 epoll/kqueue/IOCP 的 handleevents: mio::Events——复用的事件缓冲区(避免每次 poll 都分配 1024 个 Event 的内存)signal_ready: bool——是否有 signal 事件待处理(signal handling 是 I/O Driver 的兼职工作)
Handle 是 "注册管理员"——对外提供注册新 fd、唤醒 Driver、查统计的接口。4-5 个字段:
registry: mio::Registry——从 poll clone 出的注册 handle(可以跨线程注册新 fd)registrations: RegistrationSet——Tokio 自己维护的所有注册记录synced: Mutex<Synced>——需要互斥的注册状态waker: mio::Waker——关键一手:一个特殊的注册项,让其他线程可以通过调waker.wake()打断 Driver 正在运行的epoll_waitmetrics——可观测性
一个典型的部署模式
Runtime 启动时:
Driver::new(nevents)构造出一对(Driver, Handle)- Driver 被藏在某个 worker 线程里(multi_thread runtime 选一个 worker 专门跑 park,其他 worker 跑 Task;current_thread runtime 就是主线程自己跑 park)
- Handle 被 clone 到所有需要注册 fd 的地方——user code、Task 内部、其他系统(Time Driver、Signal Driver)
这种 "独占 Driver + 可共享 Handle" 的分离和 Tokio 其他地方完全一致——Runtime vs Handle、Scheduler Core vs Shared 都是同一套思路。
一个工程层面的观察:为什么 Driver 不持有 Handle
注意 Driver 和 Handle 是两个独立的 struct,不是 Driver 嵌套 Handle。这是一个有意识的架构决策:
- Driver 独占使用——永远只有一个地方(某个 worker 或主线程)在 turn
- Handle 可以 clone + 跨线程——任何代码都能通过 Handle 注册新 fd、wake Driver
如果 Driver 持有 Handle 的字段,Driver 就必须要能 Send + Sync——这会破坏 Driver 的"独占语义"。分离出 Handle 让 Driver 可以持有 mio::Poll 这种不是 Send 的资源。
这种**"分离可变量和不变量、分离独占和共享"**的模式在 Rust 并发数据结构里反复出现——Tokio 源码每层几乎都这么组织。学会辨认它,你读任何复杂 Rust 并发代码都会顺畅。
8.3 Token:mio 和 Tokio 之间的桥
mio::Poll::register(&source, token, interest) 的 token: Token 是一个 usize wrapper——它是你在调用 register 时给 mio 的"标签",mio 在 events 里返回事件时会把这个 token 还给你,让你找到"这是哪个 fd 的事件"。
其他 mio 用户(比如 pre-async Rust 时代的 rotor crate) 通常这么用:
rust
// 传统 mio 用法
struct MyServer {
connections: HashMap<Token, TcpStream>, // 用 HashMap 做 token → 资源的映射
next_token: Token,
}
fn on_event(&mut self, event: Event) {
let stream = self.connections.get(&event.token()).unwrap();
// ...
}每次 event 处理都有一次 HashMap 查找——这对 hot path 来说不优。
Tokio 用了一个更激进的技巧:把 ScheduledIo 的堆地址直接塞进 Token。
看 turn 方法里的这段:
rust
// 来源:tokio/src/runtime/io/driver.rs
for event in events.iter() {
let token = event.token();
// ...
} else {
let ready = Ready::from_mio(event);
let ptr: *const ScheduledIo = token.0 as *const _; // ← 关键:token 就是指针
let io: &ScheduledIo = unsafe { &*ptr };
io.set_readiness(Tick::Set, |curr| curr | ready);
io.wake(ready);
// ...
}
}token.0 as *const _——一次类型转换,不查表、不 hash、零额外开销。mio 的 Token 被 Tokio 用作**"指针的不透明包装"**,编码方案是:
- 注册一个 fd 时,先
Box::new(ScheduledIo { ... })分配在堆上 - 取这个 Box 的地址,转成 usize,打包成
mio::Token(addr) - 调
registry.register(&source, token, interest) - 后续 epoll 事件返回时,从 token 里直接拿地址,还原成
&ScheduledIo
为什么这安全?
两个前提保证了 unsafe 的正确性:
- ScheduledIo 的地址必须稳定——Tokio 用
Box<ScheduledIo>分配、永远不 move(直到资源 drop)。所以地址在整个注册期内合法 - 事件返回时 ScheduledIo 必须还活着——
RegistrationSet跟踪所有活跃注册,resource drop 时会从 mio deregister,epoll 之后不会再对这个 token 报事件
这是 Tokio 源码最典型的 "用类型系统 + 生命周期管理换性能" 的例子——一次 usize 到指针的转换省掉了 HashMap 查找,单次 I/O 事件省几百纳秒。在高 QPS 下累积可观。
8.4 park() 与 turn():一次 epoll_wait 的完整循环
Driver 的核心方法是 park 和 turn。原样:
rust
// 来源:tokio/src/runtime/io/driver.rs
pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();
self.turn(handle, None);
}
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
handle.release_pending_registrations();
let events = &mut self.events;
match self.poll.poll(events, max_wait) {
Ok(()) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
#[cfg(target_os = "wasi")]
Err(e) if e.kind() == io::ErrorKind::InvalidInput => {}
Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e),
}
let mut ready_count = 0;
for event in events.iter() {
let token = event.token();
if token == TOKEN_WAKEUP {
// mio::Waker 被调用,唤醒这个 turn——不需要做其他事
} else if token == TOKEN_SIGNAL {
self.signal_ready = true; // 标记有 signal 事件
} else {
let ready = Ready::from_mio(event);
let ptr: *const ScheduledIo = token.0 as *const _;
let io: &ScheduledIo = unsafe { &*ptr };
io.set_readiness(Tick::Set, |curr| curr | ready);
io.wake(ready);
ready_count += 1;
}
}
handle.metrics.incr_ready_count_by(ready_count);
}这 30 行是 Tokio 的 I/O 心脏。每一次调用做四件事:
1. release_pending_registrations——先处理"等待注册"的队列。其他线程注册新 fd 时不直接调 mio(可能有锁竞争),而是放进一个 pending 队列;Driver 在 park 开始时批量处理。这是延迟批量处理的典型做法。
2. poll.poll(events, max_wait)——真正的系统调用。max_wait = None 表示无限等(直到有事件);Some(duration) 表示最多等多久(Time Driver 用的)。
底层 mio 在 Linux 上调 epoll_wait(2)、macOS 调 kevent(2)、Windows 调 GetQueuedCompletionStatusEx。这条系统调用是 Tokio 运行时唯一"能阻塞"的地方——其他所有地方(Mutex、Semaphore、channel)都是"挂到 Waker 上然后让出"。
3. Interrupted 错误处理——epoll_wait 可能被信号打断返回 EINTR。这不是错,继续跑就行。这是 Unix 系统调用编程的基础约定,Tokio 很标准地处理了。
4. 事件分发循环:
TOKEN_WAKEUP= 0,TOKEN_SIGNAL= 1——这两个 token 是保留的- 其他 token 都是
*const ScheduledIo地址——每个都对应一个活跃注册
对于普通事件:
- 从 mio event 拿到 ready flags(readable/writable/error 等)
- token 转指针 → ScheduledIo 引用
set_readiness更新 AtomicUsize 里的 readiness 位wake唤醒正在等这个 fd 的所有 Waker
一次 turn 就是一个"等-分发"的闭环。multi_thread runtime 里,某个 worker 在 park 自己时会顺带调这个 turn 函数(第 5 章的主循环 maintenance 环节);current_thread runtime 里,主线程的 block_on 循环在没事做时调 turn。
8.5 ScheduledIo:每个 fd 的状态机
打开 tokio/src/runtime/io/scheduled_io.rs:
rust
// 来源:tokio/src/runtime/io/scheduled_io.rs
pub(crate) struct ScheduledIo {
pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,
readiness: AtomicUsize,
waiters: Mutex<Waiters>,
}
#[derive(Debug, Default)]
struct Waiters {
list: WaitList,
reader: Option<Waker>,
writer: Option<Waker>,
}3 个字段:
linked_list_pointers——让 ScheduledIo 可以放进RegistrationSet的侵入式链表(第 6 章 Task 的 Trailer.owned 同款设计)readiness: AtomicUsize——当前资源的就绪状态,用一个 AtomicUsize 编码多种信息(下节展开)waiters: Mutex<Waiters>——等待这个 fd 的 Waker 们
Waiters 结构:
list: WaitList——通用的 "等待者链表",用于多个读者 / 多个写者的情况(比如几个 Task 同时关注同一个 fd 的不同 Ready 事件)reader: Option<Waker>——等读就绪的 Waker(fast path,单一 reader 时避免进 list)writer: Option<Waker>——等写就绪的 Waker
为什么 reader/writer 是独立字段而不是放进 list?因为最常见的模式是"单 reader + 单 writer"(一个 Task 专门 read、另一个 Task 专门 write)。给它们独立的 fast-path slot 避免进通用 list、避免 Box 分配。hot path 零额外开销,冷场景(多 waiter)fallback 到 list。
Mutex 的角色:必须但被最小化
waiters: Mutex<Waiters> 是 ScheduledIo 里唯一需要锁的字段。为什么这里非用 Mutex 不可?
因为 Waiters 是变长数据:
- WaitList 可能有几百个 waiter
Option<Waker>要原子地 take 出来
Atomic 变量处理不了变长链表——Mutex 是不可避免的。但 Tokio 做了两件事最小化锁的代价:
- 锁持有时间极短——任何进锁的代码路径都是"几行 load/store",不做任何可能阻塞的事
- fast path 绕开锁——poll_readiness 第一次检查 AtomicUsize(无锁),只有"没 ready 需要注册 Waker"才进 Mutex
在典型工作负载下,poll_readiness 里进 Mutex 的频率远小于 "直接返回 Ready"——Mutex 不是 hot path。这让 ScheduledIo 的整体性能几乎等同于无锁方案。
这种"尽量无锁,必须锁的地方把锁的范围压到最小"是 Tokio 到处都有的设计哲学。
8.6 readiness 的 bit packing:又一个 AtomicUsize
readiness 是 AtomicUsize,里面编码了三种信息:
- Tick:一个递增的时钟(用来检测 "我保存的 Ready 是不是已经过期了")
- Readiness flags:READABLE / WRITABLE / READ_CLOSED / WRITE_CLOSED 等位
- Shutdown 标志:资源是否关闭
具体位分布:
rust
// 简化自 scheduled_io.rs 的常量定义
const SHUTDOWN: BitPack = ...; // 最高 1 bit
const TICK: BitPack = ...; // 中间 N bits(比如 8 bit)
const READINESS: BitPack = ...; // 低位存 ready flagsset_readiness 的 CAS 循环(原样):
rust
// 来源:tokio/src/runtime/io/scheduled_io.rs
pub(super) fn set_readiness(&self, tick: Tick, f: impl Fn(Ready) -> Ready) {
let mut current = self.readiness.load(Acquire);
loop {
let current_readiness = Ready::from_usize(current);
let new = f(current_readiness);
let new_tick = match tick {
Tick::Set => {
let current = TICK.unpack(current);
current.wrapping_add(1) % (TICK.max_value() + 1)
}
Tick::Clear(t) => {
if TICK.unpack(current) as u8 != t {
return; // tick 已经变过,说明有新事件,不要清
}
t as usize
}
};
let next = TICK.pack(new_tick, new.as_usize());
match self.readiness.compare_exchange(current, next, AcqRel, Acquire) {
Ok(_) => return,
Err(actual) => current = actual,
}
}
}这段代码精巧在哪:
Tick 的作用——避免"过期 Ready"。想象这个序列:
- Task A poll → readiness 显示 fd readable → A 去 read
- A read 到一半,fd 被内核标记 not-readable(buffer 空了)
- 但 Driver 在此期间又收到 READABLE 事件(又来新数据)
- A 继续 read 发现确实有新数据
如果没有 Tick,A 可能误以为"我看到的 Readable 已经过期"从而错过本次事件。Tick 提供一个单调递增的序列号——A 保存自己看到的 tick,下次 poll 时对比,知道是否有新事件发生过。
每次 set_readiness(由 Driver 在收到事件时调):tick += 1 (wrapping)。 每次 clear_readiness(由 Future 在 read 后调,表示"我消费了这次 Ready"):如果 tick 变过,不要清(因为还有新事件没处理)。
这是一个完整的 "producer-consumer 版本号协议"——比你想象的 "一个 atomic readiness flag" 精细得多。这类精细度是 Tokio 在"看似简单的 I/O 状态"上能达到工业级正确性的原因。
8.7 wake:把 mio 事件翻译成 Waker 调用
Driver 调 io.wake(ready) 时,具体做什么?原样:
rust
// 来源:tokio/src/runtime/io/scheduled_io.rs
pub(super) fn wake(&self, ready: Ready) {
let mut wakers = WakeList::new();
let mut waiters = self.waiters.lock();
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
wakers.push(waker);
}
}
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
wakers.push(waker);
}
}
'outer: loop {
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
while wakers.can_push() {
match iter.next() {
Some(waiter) => {
let waiter = unsafe { &mut *waiter.as_ptr() };
if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
wakers.push(waker);
}
}
None => { break 'outer; }
}
}
drop(waiters);
wakers.wake_all();
waiters = self.waiters.lock();
}
drop(waiters);
wakers.wake_all();
}5 步逻辑:
- 取 reader / writer slot:如果 ready 包含 readable → 取 reader 的 Waker;类似对 writer
- 批量扫描 list 里的其他 waiter:过滤出"兴趣匹配"的 waiter、取出它们的 Waker
- 关键优化:分批 wake——
WakeList有容量限制(通常 32),积累一批就 drop lock 去 wake。避免在 Mutex 锁内调用外部代码(Waker 可能触发任意副作用) - drop 锁后调 wake_all——所有 Waker 被调用,对应的 Task 被推回 scheduler
- 循环直到 list 遍历完
为什么要分批:第 6 章讲过的一个原则——原子操作和锁内不能有副作用。Waker::wake 可能触发 schedule、spawn、甚至别的 wake——这些操作可能反过来需要我们现在锁着的 Mutex。在锁内调 wake = 可能死锁。
分批模式解决这个问题:收集 N 个 Waker → drop 锁 → wake_all → 重新拿锁继续扫。死锁隐患消失,性能也好(锁的持有时间短)。
这又是一个可迁移模式:写任何"锁内要触发外部回调"的代码时,把回调先收集到 list、drop 锁、再触发。这个模式在整个 async 生态(不只 Tokio)反复出现。
8.8 poll_readiness:Future 侧的对接
Driver 负责"外部事件进来",Future 负责"读 Ready 状态"。两者的对接点是 poll_readiness:
rust
// 来源:tokio/src/runtime/io/scheduled_io.rs
pub(super) fn poll_readiness(
&self,
cx: &mut Context<'_>,
direction: Direction,
) -> Poll<ReadyEvent> {
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
if ready.is_empty() && !is_shutdown {
// 还没就绪,注册 Waker
let mut waiters = self.waiters.lock();
let slot = match direction {
Direction::Read => &mut waiters.reader,
Direction::Write => &mut waiters.writer,
};
match slot {
Some(existing) => {
if !existing.will_wake(cx.waker()) {
existing.clone_from(cx.waker());
}
}
None => {
*slot = Some(cx.waker().clone());
}
}
// 双重检查:上锁期间可能事件已经来了
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
// ...
if ready.is_empty() {
Poll::Pending
} else {
Poll::Ready(ReadyEvent { /* ... */ })
}
} else {
Poll::Ready(ReadyEvent { /* ... */ })
}
}双检查模式(double-checked):
- 第一次检查:无锁读 readiness。如果已经 ready → 直接返回 Poll::Ready
- 如果没 ready:上锁,注册 Waker
- 注册完立刻再读一次 readiness:因为上锁期间可能有事件到来(事件到来 → Driver 调 wake → 尝试拿锁 → 但我们持着锁 → Driver 挂起等锁)
- 如果第二次检查发现 ready 了:直接返回 Ready,不用等 wake(因为我们刚 miss 掉了)
这是"register & check" 模式的经典形态。Linux 的 futex、C++ 的 std::atomic CAS loop、Java 的 park/unpark——都是这个模式。没有这个双检查,你就有竞态:在"看见未 ready + 注册 Waker"之间,事件到了,Waker 就永远不会被触发(因为它注册之前事件已经消费了)。
和第 3 章的 will_wake 呼应
看 existing.will_wake(cx.waker()) 这行——第 3 章讲过的 Tokio 全局 VTABLE 设计让这个比对高效。没有全局 VTABLE,这里每次 poll 都会 clone 一次 Waker——TcpStream 每秒几千次 poll,每次多一次 atomic inc,日积月累就是显著开销。
架构决策的连锁收益:第 3 章的设计在第 8 章显示了它的价值。
一个漏诊的 bug:你以为 fast path 的开销为零,其实
上面那段 existing.will_wake(cx.waker()) 看似只是"比较两个指针",实际它依赖第 3 章的全局 VTABLE 设计才能真正省 clone。
反面案例:有个读者 2022 年报告了一个"Tokio 比 smol 慢 15%"的 issue。调查后发现:
- 他的 benchmark 频繁 spawn 任务 + 频繁 poll I/O
- 他测试的 Tokio 版本早于全局 VTABLE 优化(那个优化在 Tokio 1.6 前后上线)
- 没有全局 VTABLE,每次 poll I/O 都会 clone 一次 Waker
- 累计几百万次 poll = 几百万次原子 ref_inc
修完(升级到有全局 VTABLE 的版本)后差距消失。
这个故事的教训:性能 bug 往往隐藏在"看似免费"的地方。fast path 的每一个操作都值得审视——尤其是看起来 O(1) 但实际涉及原子操作、memory barrier、cache miss 的地方。
Tokio 的作者们比任何人都明白这点——他们在第 3 章、第 8 章、接下来的第 11 章反复展示"无锁 fast path + 精细 atomic 使用" 的范式。
8.9 一次 TcpStream::read.await 的完整 8 步走位
现在把前面所有东西拼起来。一次典型的 read:
rust
let n = stream.read(&mut buf).await?;Step 1:TcpStream::read 构造 Future 内部返回一个 Read<'a> Future,它的 poll 调 stream.poll_read(cx, buf)。
Step 2:poll_read 检查 readiness 内部调 self.scheduled_io.poll_readiness(cx, Direction::Read)。第一次检查 readiness AtomicUsize—— READABLE bit 没置位(socket 当前没数据)。
Step 3:注册 Waker 到 waiters.readerpoll_readiness 进入 "not ready" 分支,拿 waiters Mutex 锁,把 cx.waker().clone() 存到 waiters.reader。
Step 4:双重检查 + 返回 Pending 注册完再读一次 readiness——依然没 READABLE。返回 Poll::Pending。
Step 5:Task 让出,Scheduler 找下一个 Task 或 park Task 的 poll 结束,Task 被标记为 idle(state RUNNING → Idle)。如果 worker 没其他 Task,它会尝试 Driver::park(可能自己调,可能让独立 Driver 线程调)。
Step 6:park 里 epoll_wait 被触发 Driver 的 self.poll.poll(events, None) 阻塞在 epoll_wait 系统调用上。CPU 让出给 OS。
Step 7:内核有数据 远端发数据到来 → 内核 TCP 栈把数据放进 socket 的 receive buffer → 内核在 epoll 的就绪列表里标记这个 fd READABLE → epoll_wait 返回。
Step 8:Driver 的分发循环
events.iter()拿到 mio Eventtoken.0 as *const ScheduledIo拿到 ScheduledIo 引用io.set_readiness(Set, |curr| curr | READABLE)更新 readinessio.wake(READABLE)拿 Mutex 锁 → 取出 waiters.reader 的 Waker → drop 锁 → 调waker.wake()- 调到 waker.wake() 的幕后(第 3 章路径)→ Task 被重新入 scheduler 队列
Step 9(回到 Task 侧):Task 被重新 poll Worker 从队列拿到 Task → transition_to_running → 再次 poll Read Future → poll_read → poll_readiness → 这次 readiness 包含 READABLE → 返回 Ready → Future 继续,调 recv(fd, buf) 真正读数据 → 返回字节数。
整个流程 9 步——每一步都有具体的 Tokio 或内核代码对应。下次你写 .await 时,就是这 9 步在背后跑。
这条路径上的每一步延迟
把第 8.10⅔ 节的数字套到这 9 步上:
- Step 1-5(用户代码 + poll_readiness 注册 + Task idle):~200 纳秒
- Step 6(epoll_wait 阻塞等):从几纳秒到无限(取决于 socket 什么时候有数据)
- Step 7(内核 TCP 栈 + epoll 唤醒):~1-10 微秒(内核侧延迟)
- Step 8(Driver 分发 + wake):~500 纳秒
- Step 9(Task 重 poll + read):~500 纳秒
从 wake 触发到 Task 拿到数据的总开销:~2-11 微秒。
和传统阻塞 I/O 对比:
- 阻塞 read:kernel 唤醒线程 + context switch + 应用代码继续 → ~5-20 微秒
- 差距不大,但 Tokio 让一个线程能维持几万个连接 idle
什么情况会让这条路径变慢
- Step 3 拿 Mutex 锁时遇到竞争:多个 Task 关注同一个 fd → Mutex 锁开销从 30 纳秒升到几微秒
- Step 8 wake_all 触发大量回调:如果一个 fd 被几百个 Task 关注(极端场景),wake 时要迭代所有 waiter,可能几十微秒
- Step 9 Task 被调度到另一个 worker:跨核 cache miss → 几百纳秒开销
但这些都是边缘情况。正常使用下这条路径是极快的。
8.9½ 写自己的 AsyncRead:对接 poll_readiness 的实战模板
如果你想写自己的 AsyncRead 实现(比如一个自定义 I/O 资源),poll_readiness 是核心对接点:
rust
// 典型 AsyncRead 实现模板
use tokio::io::{AsyncRead, ReadBuf, Interest};
impl AsyncRead for MyResource {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let this = self.get_mut();
loop {
// 1. 尝试读取(不等)
match this.inner.try_read(buf.initialize_unfilled()) {
Ok(n) => {
buf.advance(n);
return Poll::Ready(Ok(()));
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// 2. 没数据,注册 Waker
match this.scheduled_io.poll_readiness(cx, Direction::Read) {
Poll::Ready(ready) => {
// readiness 已经有 READABLE,重试读
continue;
}
Poll::Pending => return Poll::Pending,
}
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}
}关键点:
- 先 try_read(非阻塞尝试)
- WouldBlock → poll_readiness 注册 Waker
- poll_readiness 返回 Ready 时说明有新 readiness,loop 回去重试 try_read
- poll_readiness 返回 Pending → 向上返回 Pending,等 wake
这个模板覆盖了几乎所有 Tokio I/O 类型的 poll_read / poll_write 实现——TcpStream、UnixStream、File、ChildStdout 全都是这个模式。学会写它,你就能给任何自定义 I/O 资源做异步包装。
8.10 和这个系列的其他书的关联
本章讲的 "把堆地址塞进 usize Token" 技巧,和 《Rust 编译器与运行时揭秘》第 5 章(内存布局与 Niche 优化) 里讲的指针 ↔ 整数相互转换的 unsafe 规则直接相关。看过那章后你知道 *const T as usize + usize as *const T 的往返在 provenance 意义上是安全的(只要原指针指向有效内存)——Tokio 这里就是利用这个规则。
readiness 的 Tick + Readiness bit packing,和第 6 章(Task State)的 bit layout 是同一个家族的模式。两章对比读,你会发现 Tokio 源码里"单 atomic 装多字段 + CAS 循环"这个模式至少出现 4-5 次——Task state、IO readiness、AtomicWaker、channel state ... 学会这个模式对整个源码的理解价值巨大。
《Vue 3 设计与实现》第 12 章(生命周期与调度) 讲 Vue 响应式系统的 "标记 dirty → 微任务 flush"——和 Tokio 的 "Driver set_readiness → wake → scheduler poll"同构。两边都是"producer 标记状态 → 延迟分发给 consumer"的模式。
8.9½ RegistrationSet:跟踪所有活跃的 fd
Handle 里的 registrations: RegistrationSet 是什么?它是 Tokio 自己维护的一个 "活跃 ScheduledIo 链表"——记录所有当前注册到 mio 的 fd。
为什么需要这个?mio 本身不提供"列出所有注册"的 API——注册是单向的(注册了就是注册了,mio 记在它的内部数据结构)。Tokio 需要一个外部视图来管理:
- Shutdown 时遍历所有 Registration,deregister + drop ScheduledIo
- Metrics 统计活跃注册数
- 调试和 tracing
RegistrationSet 内部是一个侵入式双向链表(用 linked_list_pointers 字段链接 ScheduledIo)——不需要额外分配,每个 ScheduledIo 既是数据又是链表节点。和第 6 章 OwnedTasks、第 7 章 LocalSet 的 owned list 是同一个家族。
为什么用侵入式链表而不是 Vec 或 HashMap
Vec 的问题:删除某个元素需要 O(n) 或 swap_remove(乱序)。Tokio 的 ScheduledIo 频繁增删,Vec 不合适。
HashMap 的问题:需要一个 key(id)——但 ScheduledIo 本身没有自然 id,如果为 HashMap 造一个 ID 还要维护 id → ScheduledIo 的映射,反而复杂。而且 HashMap 每次 insert / remove 都有 hash 计算 + 可能的 rehash,代价不低。
侵入式链表的优势:
- 增:push 到链表尾部,O(1)
- 删:通过已有指针 unlink,O(1)
- 遍历:顺序遍历,O(n) 但对 shutdown 这种稀有操作够用
- 零额外分配(链表节点直接嵌在 ScheduledIo 里)
这是 C 风格高性能数据结构在 Rust 里的标准做法。第 6 章、第 7 章、第 8 章三次看到侵入式链表——说明这是 Tokio 源码的 signature pattern。
一个微妙的内存安全问题
侵入式链表涉及 UnsafeCell<Pointers<Self>> + 大量 unsafe 代码——因为 "同一个 ScheduledIo 既被持有指针又被链表引用"。Rust 的借用规则原本不允许——但通过生命周期不变式 + unsafe 代码 + Loom 测试,Tokio 保证了它的正确性。
这类 unsafe 代码是 Tokio 源码里最不容易读懂的部分——初学者很难一眼看懂为什么它对。但如果你带着"链表节点的整个生命周期都在 RegistrationSet 持有的 Mutex 保护下"这个约束去看,会发现代码的每一步都合理。
本书不深入这段 unsafe——第 20 章(设计模式)会专门讲 Tokio 里侵入式链表的安全边界。
8.10½ 把 epoll readiness 模型和 Rust async 对齐
上面讲的 Tokio I/O Driver 有一个预设前提:epoll 的 readiness-based 模型和 Rust async 的 pull-based 模型天然对齐。这一节展开一下为什么。
epoll 的语义:
- "告诉我什么时候 fd X 变成可读 / 可写"
- 但 "可读"只是一个信号,不代表真的能读(可能内核提示多了、读实际返回 EAGAIN)
- 应用需要主动
recv/read——epoll 只是提示
Rust async 的语义:
- Future 被 poll 时要么 Ready 要么 Pending
- Pending 意味着"我注册了 Waker,事件到了会叫醒我"
- 被唤醒后再次 poll,还得重新尝试操作(因为状态可能变了)
两者天然匹配:
- epoll "fd 可读" → wake Task
- Task 被 poll → 尝试 read → 如果真读到就 Ready,如果 EAGAIN 就再 Pending(重新注册)
完美的阻抗匹配。
对比 IOCP(Windows 的 completion 模型):
- "提交一个 read 请求,告诉我什么时候完成"
- 完成时数据已经在你的 buffer 里了,不用你再 read
这和 Rust async 的 pull 模型就不那么匹配——IOCP 完成时实际上是 "数据已经准备好",更像一个 push 模型。mio 在 Windows 上模拟 readiness 语义(把 IOCP 完成包装成"可以 read"事件),但这有额外开销。
这也是为什么 monoio 在 Linux 上用 io_uring 比 Tokio 默认的 epoll 更快——io_uring 是 completion 模型,和 IOCP 一致,但可以用 completion 模式更激进地优化(比如零拷贝)。不过 Tokio 选择 readiness + epoll 作为默认,是为了跨平台一致性。
这是 Tokio 的一个主动的技术权衡:用 epoll 方案牺牲在 Linux 上的最后一点性能,换取 Windows / BSD / macOS 也能同样跑。对于它的"通用运行时"定位,这是对的选择。
8.10⅔ 用数字感受 I/O Driver 的开销
为了让抽象的"I/O Driver 很快"具象化,给几个真实数字(基于 Tokio 在 Linux x86_64 上的基准测试):
注册一个 fd(TcpStream::connect 后 register):
mio::Registry::register调用 → 大约 200-500 纳秒(一次 epoll_ctl 系统调用)Box<ScheduledIo>分配 → ~100 纳秒- 总共:~400-700 纳秒
一次 epoll_wait(没有事件、阻塞等):
- 用户态部分:~50 纳秒
- 内核态(阻塞等):无限(取决于有没有事件)
- 如果是非阻塞调用(带 timeout 0):~200-500 纳秒 纯 syscall 开销
一次 epoll_wait 返回 N 个事件 + Tokio 分发 + wake Task:
- epoll_wait 用户态部分:~50 纳秒
- 每个事件处理:~100 纳秒(unpack token + set_readiness 的 CAS + wake 的 mutex + Waker 调用)
- 假设 10 个事件:总共 ~1 微秒
poll_readiness(未 ready,注册 Waker):
- Atomic load:~1 纳秒
- Waiters mutex lock + Waker store + unlock:~30-50 纳秒
- 双检查:~5 纳秒
- 总共:~40-60 纳秒
poll_readiness(已 ready,直接返回):
- Atomic load + bit mask:~1 纳秒
- 总共:~1-5 纳秒(快到可以忽略)
对比传统同步 I/O(比如一个 std::thread 调 recv):
- 线程 spawn:~5 微秒
recv阻塞调用本身:~200-500 纳秒- 线程 context switch(OS 调度另一个线程):~1-5 微秒
Tokio 的优势在"大量并发 I/O + 少量真实 CPU"场景:
- 1 万个并发连接在 1 个线程上跑——用 std::thread 要 1 万个线程、栈开销 80 GB(每个 8 MB);用 Tokio 十几 KB 的 ScheduledIo 就够了
- 单次 I/O 的微秒级延迟 Tokio 和 std 接近;但 Tokio 可以同时维持几万个连接 idle 而不付出任何 CPU / 内存代价
理解这些数字让你在性能调优时有基线参考——"我的服务比这慢很多"就是 bug、"比这快很多"基本不可能。
8.10¾ mio::Waker 的跨线程唤醒机制
Handle 里有一个 waker: mio::Waker 字段。这是 mio 提供的一个特殊 "事件源"——允许其他线程通过它触发 Driver 的 epoll_wait 返回。
具体工作原理(Linux 下):
- mio::Waker 底层是一个
eventfd(2)或pipe(2) - 注册到 mio::Poll 时用
TOKEN_WAKEUP - 其他线程调
waker.wake()→ 往 eventfd 写一个字节 → epoll 立刻发现这个 fd 可读 →epoll_wait返回
为什么需要这个?几种场景:
- 主线程 spawn 新 Task:新 Task 可能需要 Driver 注册 I/O。如果 Driver 正在 epoll_wait 等(永久 block),新 Task 得不到响应——需要 waker 打断
- Time Driver 超时:Timer 到期时需要检查所有定时器——用 waker 打断 I/O Driver 让它出来处理
- Shutdown:关闭 runtime 时 Driver 需要停下来——用 waker + shutdown 标志位
mio::Waker 和 std::task::Waker 名字相似但完全不同。前者是 I/O Driver 的内部唤醒机制,后者是 async 的公开 API。容易混淆但不能混。
TOKEN_WAKEUP = 0 的分支
回到 turn 的事件循环:
rust
if token == TOKEN_WAKEUP {
// 什么都不做
}对 WAKEUP event 什么都不做。因为唤醒的目的已经达到——Driver 从 epoll_wait 返回了。上层 loop 会继续处理 Tokio 调度的其他事情(比如新 Task 的注册、定时器的处理)。token 本身就是信号,不需要 event 携带数据。
这种 "事件即信号、无 payload" 的设计在 OS 事件系统里极常见——比较类似 Unix signal handler。
8.10⅓ 上下游的对接:I/O Driver 和 Scheduler 的协作
前面主要讲 I/O Driver 自己。下面看它和 Scheduler 怎么配合。
multi_thread runtime 的对接:
- 每个 worker 在主循环的 "park" 阶段(第 5 章)调 I/O Driver 的 park
- 但只有一个 worker 在任意时刻真的 epoll_wait——这是通过一个内部 leader/follower 协议保证的
- 其他 worker 在 park 时去"等"那个 leader,一旦事件回来 leader 处理完释放,follower 可能接手成为新 leader
current_thread runtime 的对接:
- 主线程在主循环末尾(event_interval 满或队列空)调 park
- park 直接走 I/O Driver 的 turn
- 没有 leader/follower 问题——只有一个线程
Time Driver 和 I/O Driver 的协作:
- Time Driver 有一个 "下一个定时器到期时间"
- I/O Driver 的
turn(handle, max_wait)接受 max_wait 参数——由 Time Driver 提供 - 意思是:"我要 epoll_wait,但最多等 X 毫秒"——保证 X 毫秒内定时器到期能被处理
这种"I/O 事件和时间事件统一成一次 epoll_wait"的设计是现代事件循环的标准做法——避免 I/O 线程和 Time 线程分开(那样会增加同步开销)。
8.10⅙ 处理"信号"事件
看到 TOKEN_SIGNAL 那段了吗?
rust
} else if token == TOKEN_SIGNAL {
self.signal_ready = true;
}Tokio 支持异步处理 Unix 信号(SIGINT、SIGTERM、SIGUSR1 等)。Signal Driver 往 I/O Driver 里注册一个特殊的 fd(通过 signalfd(2) 或类似机制),事件 token 固定为 TOKEN_SIGNAL。
为什么要固定 token?因为只有一个 Signal Driver、只用一个 fd。直接硬编码 token = 1 简单清晰。对比 ScheduledIo 的动态指针 token,SIGNAL 是静态的。
signal_ready = true 而不是立刻处理——典型的"把外部事件转化为内部标志位,主循环统一处理"模式。实际处理在 Signal Driver 的单独代码路径里,和 I/O Driver 不耦合。
8.10⅞ I/O Driver 代码演化史简述
Tokio I/O Driver 不是一蹴而就的。简要演化:
Tokio 0.1 时代(2017-2018):基于 futures 0.1 + mio 0.6。I/O Driver 结构比现在复杂、使用一个中心化的 "Reactor" 对象、每个资源有 Token 索引。有一些性能问题(特别是注册 fd 时的锁竞争)。
Tokio 0.2-1.0(2019-2020):重构 I/O Driver。引入 ScheduledIo 的概念、Token 作为指针、分离 Driver 和 Handle。这一轮改造让 fd 注册延迟从几微秒降到几百纳秒。
Tokio 1.x(2020-2026):持续细节优化。引入分批 wake_all(减少 Mutex 锁的持有时间)、AtomicWaker + will_wake 的集成、metrics 增强。核心架构没变——说明 1.0 的设计是扎实的。
未来方向:
- io_uring 集成:Tokio 1.x 主分支已经有
tokio-uring子 crate(实验)。未来可能合并到主干作为可选 driver 后端 - 更细粒度的 metrics:tokio-console 的 I/O 可视化持续迭代
- 更好的跨平台:Windows 上的 IOCP 性能和 Linux epoll 不是完全对等,持续打磨
读这段历史的意义:软件系统的"最终设计"不是一天得到的。你今天看到的 Tokio I/O Driver 是 10 年打磨的产物。如果你自己做一个类似系统,第一版做得像 Tokio 1.0 是几乎不可能的——允许自己从简单版本开始,逐步优化。
8.11 本章小结
带走三件事:
- I/O Driver 是 mio 的薄封装——Driver struct 3 个字段、Handle struct 4-5 个字段。核心方法
turn()就是一次mio::Poll::poll+ 事件分发 - Token 编码为 ScheduledIo 堆地址——避开 HashMap 查找,一次事件处理只用 usize→指针的转换。这是 Tokio 性能关键秘密之一
- 每个 fd 有一个 ScheduledIo:readiness(AtomicUsize,Tick + Ready + Shutdown 的 bit packing)+ waiters(
Mutex<Waiters>,单 reader/writer fast path + list fallback)。Future 通过 poll_readiness 注册 Waker、Driver 通过 wake 唤醒 Waker
一个更大的 takeaway
读完前 8 章,你会注意到 Tokio 源码里反复出现几个签名模式:
- 分离独占和共享(Driver/Handle、Core/Shared、Task/JoinHandle)
- 单 AtomicUsize 装多字段(Task State、IO Readiness、Waker refcount)
- 侵入式链表(OwnedTasks、LocalSet tasks、RegistrationSet)
- vtable + 指针作标识(Task header、Waker vtable、Token)
- fast path 的"借用语义"版本(WakerRef、ScheduledIo 的 reader/writer slot)
- CAS loop 封装成 helper(fetch_update_action、set_readiness)
- 分批 wake 避开锁内副作用(Harness wake_all、ScheduledIo wake)
这七个模式合起来就是 Tokio 的"语法"。一旦你识别它们,Tokio 源码里每一段新代码都变成"我之前见过的某个模式的变体"。这种模式库比任何单个功能的理解更值钱——它能让你在下一个运行时、下一个并发系统、下一个性能敏感的项目里,直接取出对应工具。
读完本章再去看一眼 tokio-console
tokio-console 提供了 I/O 资源的可视化视图——打开它运行你的应用,在 "Resources" 标签页里能看到:
- Total I/O events: 累计 epoll 事件数
- Events per turn: 每次 turn 平均处理多少事件
- Registered resources: 当前活跃 fd 数量
- Reader/writer stats: 各 fd 的读写 Waker 触发次数
第一次看可能看不出问题,但当你的服务遇到性能问题时——比如"为什么某个 TcpStream 的 read 延迟越来越高"——tokio-console 能告诉你 "它被 wake 了 1000 次但 poll 后大多返回 Pending"(说明有 readiness 误报或代码 bug)。
第 17 章会更详细讲 tokio-console 的使用。本章先有个印象。
下一章我们深入下一层——mio 本身是什么。你会看到 mio 如何把 epoll / kqueue / IOCP 抽象成统一 API、mio::Poll::poll 的内部实现、为什么 Windows 的 IOCP 是"completion"语义而 Linux 的 epoll 是"readiness"语义——两种根本不同的 I/O 模型如何在 mio 里统一。
延伸阅读
- Tokio 源码:
tokio/src/runtime/io/driver.rs - Tokio 源码:
tokio/src/runtime/io/scheduled_io.rs - mio 仓库:
tokio-rs/mio - 《Rust 编译器与运行时揭秘》第 5 章:指针 ↔ usize 转换的 provenance 规则
- 《Vue 3 设计与实现》第 12 章:响应式调度与 Tokio I/O 调度的同构对比