Appearance
第12章 Connection Dispatcher 与 Role:请求分发状态机
12.1 字节之上的编排
上一章我们读完了 decode.rs / encode.rs / role.rs——Hyper 如何把一个请求的字节变成 http::Request、又把一个 Response 变回字节。但这只是 "单条消息" 的处理。
真实的 HTTP/1.1 连接不只是一条消息。一个连接上:
- 客户端可能发多个请求(keep-alive)。
- 服务端正处理上一个请求时,客户端可能已经发来下一个请求(pipelining——虽然 RFC 允许但实际很少开启)。
- 响应 body 可能在请求读完之前就开始写。
- 发生 Upgrade 时,整条连接要切换到另一个协议(WebSocket、HTTP/2)。
把所有这些协议事件编排成一个可以被 tokio::spawn(connection.await) 驱动的 future——这就是 conn.rs 和 dispatch.rs 的职责。整整 2339 行代码,是整个 HTTP/1 协议栈的指挥中心。
这一章我们把这个指挥中心拆开——Conn<I, B, T> 的状态机、Dispatcher<D, Bs, I, T> 的 poll loop、它们如何把 Decoder / Encoder / user Service / body channel 编排成一个可正确取消、可 keep-alive、可 upgrade 的完整连接。
源码锁定:hyper 1.9.0 / hyper/src/proto/h1/conn.rs (1531 行) / hyper/src/proto/h1/dispatch.rs (808 行)。
12.2 Conn<I, B, T>:一条连接的所有状态
rust
// hyper/src/proto/h1/conn.rs:38-42
pub(crate) struct Conn<I, B, T> {
io: Buffered<I, EncodedBuf<B>>,
state: State,
_marker: PhantomData<fn(T)>,
}三个字段:
io:buffered IO,把原始 IO(TCP stream)包上一层读写缓冲。下一章专题讲。state:连接的所有状态——下面详述。_marker:PhantomData<fn(T)>——T是Server或Client的 tag,编译期分支用。
12.2.1 三套状态:Reading / Writing / KeepAlive
State struct 里有几十个字段(源码 930-961 行),但最核心的是三个 enum:
rust
// hyper/src/proto/h1/conn.rs:963-977, 1022-1028
enum Reading {
Init,
Continue(Decoder),
Body(Decoder),
KeepAlive,
Closed,
}
enum Writing {
Init,
Body(Encoder),
KeepAlive,
Closed,
}
enum KA { // keep-alive status
Idle,
#[default]
Busy,
Disabled,
}三个 enum 是正交的——连接同时有一个"当前在读什么"的状态 + "当前在写什么"的状态 + "还打算复用吗"的状态。三者组合起来决定连接下一步该做什么。
12.2.2 Reading 状态意义
Init:还没收到 request header——等待客户端发第一个字节。Continue(Decoder):收到了带Expect: 100-continue的请求头——发100 Continue响应,等 body 开始。Body(Decoder):正在流式读 body——Decoder 按之前决定的长度语义(Length / Chunked / Eof)切帧。KeepAlive:body 读完了——如果连接允许复用,回到 Init 状态等下一个请求;否则转 Closed。Closed:不能再读了——要么收到了 FIN、要么协议错误、要么用户说别读了。
12.2.3 Writing 状态意义
Init:没有响应正在写——等待 user 产出 response。Body(Encoder):正在写 response body——Encoder 负责 chunk 格式化 / length enforcement。KeepAlive:响应写完了——等对应的 request 也读完就进入 Idle 阶段。Closed:不能再写了。
12.2.4 KeepAlive 状态意义
KA 是最微妙的一个。它记录"这条连接能不能在当前 message 完成之后被复用":
Busy:当前正在处理请求。默认状态。Idle:当前没有请求在跑,可以接收新的。Disabled:永远不复用——下一条 message 结束后必须关。原因可能是 HTTP/1.0、Connection: closeheader、或者协议错误。
源码有个有意思的小妙招:
rust
// conn.rs:1013-1019
impl std::ops::BitAndAssign<bool> for KA {
fn bitand_assign(&mut self, enabled: bool) {
if !enabled {
*self = KA::Disabled;
}
}
}给 KA 重载了 &= 运算符——state.keep_alive &= false; 等价于"如果 false 就设为 Disabled"。这允许代码里这样写:
rust
state.keep_alive &= headers.allows_keep_alive(); // 可能 disable
state.keep_alive &= !headers.has_connection_close(); // 又可能 disable任何一处 false 就永久 disable——这是一个"单向锁"的语义,用 &= 表达得比 if !x { disable() } 更紧凑、更有代数感。这种小运算符重载在很多工业级 Rust 代码里能见到。
12.3 Reading × Writing × KA 的合法转移
把三个 enum 的 product 算出来——3 × 4 × 4 = 48 种状态组合。但实际上大部分组合不合法。典型的生命周期是:
时刻 1:空连接
Reading=Init, Writing=Init, KA=Busy
时刻 2:收到 request header
Reading=Body(decoder), Writing=Init, KA=Busy
时刻 3:body 读完 + 开始写 response
Reading=KeepAlive, Writing=Body(encoder), KA=Busy
时刻 4:response 写完
Reading=KeepAlive, Writing=KeepAlive, KA=Busy
时刻 5:状态清理完,回到空 ←─ 这是 try_keep_alive 完成的
Reading=Init, Writing=Init, KA=Idle
时刻 6:收到下一个 request header
Reading=Body, Writing=Init, KA=Busy
...这套循环通过 State::try_keep_alive() 推动——当 Reading 和 Writing 同时是 KeepAlive 时,检查 KA 状态决定回到 Init 还是 Closed。Source code 在 conn.rs:1108-1130,其实很短:
rust
// 精简版
fn try_keep_alive(&mut self) {
match (&self.reading, &self.writing) {
(Reading::KeepAlive, Writing::KeepAlive) => {
if let KA::Busy = self.keep_alive.status() {
self.keep_alive.idle();
self.reading = Reading::Init;
self.writing = Writing::Init;
} else {
self.close();
}
}
...
}
}读懂这段就掌握了 "keep-alive 复用" 的本质——不是给 socket 起别的名字,而是把两个状态枚举重置回 Init。socket 还是那个 socket、缓冲区还是那个缓冲区——只是这条连接愿意为下一个请求再跑一遍状态机。
12.4 Dispatcher<D, Bs, I, T>:顶层驱动
Conn 只负责状态,driver 在 Dispatcher 里:
rust
// hyper/src/proto/h1/dispatch.rs:21-27
pub(crate) struct Dispatcher<D, Bs: Body, I, T> {
conn: Conn<I, Bs::Data, T>,
dispatch: D,
body_tx: Option<crate::body::Sender>,
body_rx: Pin<Box<Option<Bs>>>,
is_closing: bool,
}五个字段:
conn:上面讲的连接状态。dispatch:D: Dispatchtrait——Server / Client 的分支实现。这个后面细讲。body_tx:入站 body 的发送端——当连接读入 body 数据时,塞给这个 sender,让 user handler 能从对应的Incoming拿到数据。body_rx:出站 body 的接收端——user handler 产出的 Response body,由这里 poll frame 写回连接。is_closing:是不是正在关闭。
12.4.1 Dispatch trait:Server/Client 的分界
rust
// hyper/src/proto/h1/dispatch.rs:29-42
pub(crate) trait Dispatch {
type PollItem;
type PollBody;
type PollError;
type RecvItem;
fn poll_msg(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>)
-> crate::Result<()>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>>;
fn should_poll(&self) -> bool;
}四个方法:
poll_msg:产出"下一个要发送"的消息(含 body)。recv_msg:消费"刚收到"的消息(含 body)。poll_ready:底层是否能接受新请求?(映射到 Service::poll_ready)should_poll:有没有待做的工作?
这个抽象是 Server 和 Client 的对称点:
| Server 侧 | Client 侧 | |
|---|---|---|
PollItem(要发的) | StatusCode 响应行 | RequestLine 请求行 |
RecvItem(收到的) | RequestLine 请求行 | StatusCode 响应行 |
poll_msg 的来源 | 从 service.call(req).await 得到 | 从用户 API 队列得到 |
recv_msg 的目的 | 交给 service.call(req) | 交给等待响应的用户 |
两个 impl(Server 和 Client)在源码里分别占 dispatch.rs 500-600 行和 600-700 行。它们各自 handle user-facing 的 API 形状差异——Server 包一个 HttpService<B>、Client 包一个 mpsc receiver。
12.4.2 Server 侧 poll_msg
rust
// hyper/src/proto/h1/dispatch.rs:532-555 精简
fn poll_msg(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Result<(MessageHead<StatusCode>, S::ResBody), S::Error>>>
{
let mut this = self.as_mut();
let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
let resp = ready!(fut.as_mut().poll(cx));
// 拿到 user 的 Response<B>
let (parts, body) = resp?.into_parts();
let head = MessageHead {
version: parts.version,
subject: parts.status,
headers: parts.headers,
extensions: parts.extensions,
};
Ok((head, body))
} else {
unreachable!("poll_msg shouldn't be called if no inflight");
};
// 清空 in-flight slot
this.in_flight.set(None);
Poll::Ready(Some(ret))
}逻辑:
in_flight是Pin<Box<Option<S::Future>>>——这就是 user Service 产生的 future。poll它,等 user handler 完成。- 把 Response 拆成 head + body,head 交给 Conn 去写、body 留给 Dispatcher 的 body_rx 持续 poll。
12.4.3 Server 侧 recv_msg
rust
// 简化
fn recv_msg(&mut self, msg: crate::Result<(RequestHead, IncomingBody)>) -> crate::Result<()> {
let (head, body) = msg?;
let req = http::Request::from_parts(head_to_parts(head), body);
let fut = self.service.call(req);
self.in_flight.set(Some(fut));
Ok(())
}收到 request + body,构造 http::Request<Incoming>,调用 user Service 的 call——把返回的 future 存进 in_flight。
self.service.call(req) 就是 user handler 被实际调用的地方。整个 hyper 的所有工作——parse、decode、encode、state machine 管理——都是为了让这一行代码被调用。
12.5 poll_loop:一次 poll 做多少事
Dispatcher 的核心循环:
rust
// hyper/src/proto/h1/dispatch.rs:163-191
fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
// Limit the looping on this connection, in case it is ready far too
// often, so that other futures don't starve.
//
// 16 was chosen arbitrarily, as that is number of pipelined requests
// benchmarks often use.
for _ in 0..16 {
let _ = self.poll_read(cx)?;
let _ = self.poll_write(cx)?;
let _ = self.poll_flush(cx)?;
if !self.conn.wants_read_again() {
return Poll::Ready(Ok(()));
}
}
trace!("poll_loop yielding (self = {:p})", self);
task::yield_now(cx).map(|never| match never {})
}三件事,循环最多 16 次:
poll_read:从 IO 读 header 或 body。如果读到了 message,传给dispatch.recv_msg。poll_write:如果 user 产出了消息(从dispatch.poll_msg),把 header 写到 buffered IO;如果正在写 body,poll user body 的 frame,通过 encoder 写。poll_flush:把 buffered IO 的字节真正提交到底层 socket。
12.5.1 为什么是 16
rust
// 16 was chosen arbitrarily, as that is number of pipelined requests
// benchmarks often use. Perhaps it should be a config option instead.注释里自己说"arbitrary"——随便选的。但 16 有一定道理:
- HTTP/1.1 pipelining 的 benchmarks 一般开 16 个并发——刚好一次 poll 处理完一个 batch。
- 大多数小请求(header-only)一次 poll 就能完成。
- 如果真有超 16 个 message 堆在同一个 socket 上——最后一行
task::yield_now(cx)让出 task,让 runtime scheduler 有机会先跑别的 future。
这是一个典型的 cooperative yielding 模式——循环到某个上限后主动让贤,避免"饿死其他 task"。卷四《Tokio 源码深度解析》第 14 章(select! 与公平调度)里讨论过 Tokio runtime 的"Budget"机制——tokio 在 await 点会自动插入这类 yielding。Hyper 的 yield_now 是相同的思路——避免任何 future 独占 executor 太久。
12.5.2 wants_read_again 的退出条件
这是 poll_loop 的另一个智慧:循环会提前退出,除非 "读出新东西后状态又变得可读":
rust
if !self.conn.wants_read_again() {
return Poll::Ready(Ok(()));
}大多数时候一次 poll 只做一件事——读一次、写一次、flush 一次,然后返回 Pending 等 IO。只有在"读完一条消息、紧接着下一条又在 buffer 里"(pipelining 场景)时才继续循环。
这样做的好处:非 pipelining 场景(占 99%)没有额外开销,pipelining 场景又能利用缓存中的字节提升吞吐。按 common case 优化是工业级代码的气质。
12.6 poll_read:身兼数职
这是 Dispatcher 最复杂的一个方法(dispatch.rs:192-285,将近 100 行)。我们挑核心看:
rust
fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
loop {
if self.is_closing { return Poll::Ready(Ok(())); }
if self.conn.can_read_head() {
// 情况 1:还没读 header——读 header
ready!(self.poll_read_head(cx))?;
} else if let Some(mut body) = self.body_tx.take() {
if self.conn.can_read_body() {
// 情况 2:header 读过了、正在流式读 body
match body.poll_ready(cx) { ... }
match self.conn.poll_read_body(cx) {
Poll::Ready(Some(Ok(frame))) => {
if frame.is_data() {
body.try_send_data(frame.into_data().unwrap())
} else if frame.is_trailers() {
body.try_send_trailers(frame.into_trailers().unwrap())
}
...
}
...
}
}
} else {
// 情况 3:没有正在流的 body,可能是 keep-alive 等待
return Poll::Ready(Ok(()));
}
}
}读的三种情况:
- 还没读 header:调
poll_read_head—— 从 IO 读字节,调role::parse_headers解析。解析成功后构造 IncomingBody 的 channel(body_tx 这边存进 Dispatcher,另一边通过dispatch.recv_msg包进http::Request交给 Service)。 - 正在读 body:
body.poll_ready(cx)检查 body 接收者还想继续读(这就是第 10 章讨论的want_tx信号);检查通过后 poll decoder 拉一帧,送到 body_tx。 - 没活儿干:return Ready——让 poll_write 或者下一轮再试。
注意嵌套的三层 Poll 对话:
- dispatcher poll_read 问 Conn:能读 body 吗?
- Conn 问 decoder:decode 一帧吧。
- Decoder 问 buffered IO:要读字节。
- Buffered IO 问 underlying socket:有字节来吗?
任何一层 Pending,整条链挂起。任何一层有东西,信号向上反馈——一路到 dispatcher 把 frame 塞到 body_tx。
12.7 poll_write 与 body_rx 的互动
与 poll_read 对称的 poll_write(略):
- 如果 Writing::Init 且 dispatch 有 message——
poll_msg拿 head + body,encode_headers写 header,body_rx 记住;writing 切到 Body。 - 如果 Writing::Body——poll body_rx 下一帧,用 Encoder 格式化(chunked / length)写 buffered IO。
- body 写完——Writing::KeepAlive;try_keep_alive 检查是否可以复用。
核心循环模式相同:循环往 IO 推数据,直到 Pending。
12.7.1 body_rx 的 Pin<Box<Option<Bs>>>
这个类型很有意思:
rust
body_rx: Pin<Box<Option<Bs>>>,三层包装——Box 堆分配、Pin 固定地址、Option 允许清空。为什么要这么复杂?
Pin:user 的 Body 可能是!Unpin(典型例子是async_stream!宏生成的 Stream),需要 pin 才能 poll。Box:Pin<&mut Bs>需要 Bs 在内存中地址稳定——堆分配保证这一点。Option:body 写完之后要被 drop——set(None)清空 Box 的内容。
Pin<Box<Option<T>>> 这种三层包装是 Rust 异步代码里处理"可替换、可 pin、可 unpin 的 future/stream"的标准模板。理解它比去死记它来得实用——下次你遇到类似需求,知道怎么拼就行了。
12.8 错误、Upgrade、Shutdown 的正交处理
Dispatcher 还要处理三类"不走正常流程"的事件。
12.8.1 错误:poll_catch
rust
// dispatch.rs:122-142
fn poll_catch(&mut self, cx: &mut Context<'_>, should_shutdown: bool)
-> Poll<crate::Result<Dispatched>>
{
Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
// Be sure to alert a streaming body of the failure.
if let Some(mut body) = self.body_tx.take() {
body.send_error(crate::Error::new_body("connection error"));
}
self.dispatch.recv_msg(Err(e))?;
Ok(Dispatched::Shutdown)
}))
}任何 poll_inner 的错误都被 poll_catch 吞下:
- 如果有正在接收的 body——把错误发到 body_tx,让 handler 侧的
body.poll_frame看到Err。 - 告诉 dispatch(Server / Client)有错误发生——让它优雅退出,而不是 panic 或挂起。
- 返回
Dispatched::Shutdown——让上层知道连接该关了。
错误不会静默消失——它顺着 body channel 和 dispatch 接口一起传给 user。这是 Hyper 可靠性的一个基石。
12.8.2 Upgrade:特殊分支
当客户端发 Upgrade: websocket + 服务端响应 101 Switching Protocols,连接就脱离 HTTP/1 协议了——TCP stream 被交给 user 继续用。
rust
// dispatch.rs:144-164 简化
if self.is_done() {
if let Some(pending) = self.conn.pending_upgrade() {
self.conn.take_error()?;
return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
} else if should_shutdown {
ready!(self.conn.poll_shutdown(cx))...;
}
...
}pending_upgrade() 返回一个 Pending<Upgraded> ——这个 handle 会在未来的某一刻拿到 raw socket(通过 Upgraded::downcast_into())。Dispatcher 的工作到这里结束——它不会再读写 socket,让给 upgrade handler。
Upgrade 的完整机制是第 18 章的主题。这里只知道 Dispatcher 通过 Dispatched::Upgrade(pending) 这个 退出码 把"连接控制权"交回给上层。
12.8.3 Shutdown 的优雅顺序
rust
else if should_shutdown {
ready!(self.conn.poll_shutdown(cx))?;
}poll_shutdown 调到 AsyncWrite::poll_shutdown——在 TCP 上发一个 FIN,告诉对端"我不再发了"。对端会看到 EOF,自然关闭它的 read half。
这个机制是 TCP 半关闭——hyper 不会粗暴地 close 整个 socket。对端还能继续读缓冲里没被消费的字节(比如你的响应最后几个字节),直到正常 EOF。这是和"快速关"的关键差别——快速关会丢数据,半关保证完整。
12.9 Role 的 maybe_notify 小玩法
最后看一段鲜为人知但极其 clever 的代码。Conn state 里有一个字段:
rust
notify_read: bool,它是一个"我需要被再 poll 一次"的标记。当 Dispatcher 处于 "body 读完了但 KeepAlive 状态还没复用"的瞬间,如果已经有下一个 request 的字节在 buffer 里——不让 task 挂起、直接循环回 poll_read——这就是 wants_read_again 的意义。
rust
// dispatch.rs:186-190
if !self.conn.wants_read_again() {
return Poll::Ready(Ok(()));
}简单两行,但它是 hyper pipelining benchmark 里"吞吐不崩"的秘诀。没有这个检查,每个 pipelined request 要多一次 runtime wake-up 循环——在高并发下开销显著。
有注释特别提到:
Using this instead of task::current() and notify() inside the Conn is noticeably faster in pipelined benchmarks.
翻译:不通过 waker re-notify,直接把下一轮 loop 继续跑——在 pipelined benchmark 里明显更快。
这就是为什么读 hyper 源码你会一再看到"为 benchmark 调优"的痕迹——它是一个 benchmark-driven 的工程代码。每一处非朴素的代码背后几乎都有一个 benchmark 数字推动。
12.10 一张连接生命周期图
把整章讲的东西画成图:
TCP accept
↓
Conn::new(io) — state.reading=Init, writing=Init, KA=Busy
↓
Dispatcher::new(svc, conn)
↓
tokio::spawn(dispatcher) ─── 进入 poll_loop
│
├─ poll_read ───── role::parse ─── dispatch.recv_msg ─── svc.call(req)
│ │
├─ poll_write ───── dispatch.poll_msg ─── role::encode ───── writes header
│ │
│ └── body.poll_frame ─── encoder 写 body
│
├─ poll_flush ───── underlying socket
│
└─ wants_read_again? ─── 继续循环 / yield
事件发生:
- 读完 body → Reading::KeepAlive
- 写完 body → Writing::KeepAlive
- try_keep_alive → 若 KA=Busy,reset 到 Init(开始下一轮)
- 或 KA=Disabled → close(); poll_shutdown
- 或 Upgrade → Dispatched::Upgrade(pending),Dispatcher 退出这张图浓缩了 2339 行代码的指挥流。读懂它你就读懂了 hyper 1.x HTTP/1 的全部顶层逻辑。
12.11 和其他章节的呼应
- 第 11 章讲的 Decoder / Encoder 被 Conn::state 的 Reading / Writing 持有。
- 第 10 章讲的
Incoming::Chan { data_rx, want_tx }的 sender 端就是 Dispatcher 的body_tx——want_tx 的信号传到 Dispatcher 的 poll_read,决定是否 poll decoder。 - 第 13 章(下一章)讲的
hyper::Servicetrait 是被 Server::recv_msg 这一行调用:self.service.call(req)。 - 第 18 章会接回
pending_upgrade()的流程。
整本书到这里进入"hyper 核心最难的部分"——协议状态机。读完第 11、12 章你应该感到有些累——因为它们是全书最 intricate 的章节。但这也是收获最大的两章——这就是工业级 HTTP/1 实现,没有捷径。
12.12 落到你键盘上
- 用
RUSTFLAGS="--cfg hyper_unstable_tracing"编译 hyper 并开tracing——你会看到Reading::Init → Reading::Body → Reading::KeepAlive → Reading::Init这样的状态转移日志。感受状态机实时运转。 - 尝试手动触发 pipelined requests:用
nc手写两个连在一起的 HTTP/1.1 请求(GET /a HTTP/1.1\r\nHost: x\r\n\r\nGET /b HTTP/1.1\r\nHost: x\r\n\r\n),观察 hyper 怎么处理。你会看到wants_read_again的 loop 在工作。 - 读 dispatch.rs 的 Client impl(600-700 行)——它和 Server impl 对称,但承载的是客户端侧的异步队列。读完一遍两者,你会对"同一个 Dispatcher 如何泛化到 Server/Client"有感性认识。
下一章,我们回到 trait 层面——看 hyper 1.x 为什么定义了自己的 Service trait,以及那个看似 "一个字符" 的差别(&self vs &mut self)背后是怎样一段三年讨论。