Skip to content

第18章 upgrade.rs:WebSocket / CONNECT / HTTP/2 承载

18.1 被严重低估的 407 行代码

hyper 的 src/upgrade.rs 只有 407 行。这 407 行让 hyper 承担了 Rust 整个 WebSocket、HTTP CONNECT 隧道、gRPC-Web 代理生态的"底层入口"。tungstenite、tokio-tungstenite、axum-extra 的 WebSocket、reqwest 的 CONNECT、hyper-socks 的 SOCKS via CONNECT、tonic 的 grpc-web——所有这些库都通过 hyper::upgrade::on() 这一个函数拿到底层 IO,然后按自己的协议继续跑。

这一章我们从源码角度拆开 upgrade.rs:**hyper 是如何让一条 HTTP 连接在握手完成后变成任意双工流的?**答案藏在四个类型里:OnUpgradePendingUpgradedParts——我们一个一个看。

读完本章,你会理解:

  • WebSocket 握手时 Upgrade: websocket 头收到后 hyper 内部发生了什么;
  • 为什么 Upgraded 要用 Rewind<Box<dyn Io + Send>> 而不是直接用 TcpStream;
  • HTTP/2 的 CONNECT(extended CONNECT / RFC 8441)如何复用同一套 Upgraded 抽象——即使底层完全不是 TCP;
  • 自己写一个 WebSocket 代理时应该怎么用 on() + spawn 组合。

源码版本以本书锁定的 hyper 1.9.0(commit 0d6c7d5)为准。本章涉及的两个核心文件:

  • hyper/src/upgrade.rs(407 行)—— 统一 upgrade API。
  • hyper/src/proto/h2/upgrade.rs(280 行)—— HTTP/2 对 Upgraded 的实现。

18.2 Upgrade 到底是什么

"upgrade" 这个词在 HTTP 里被用成了三种不同的机制,但 hyper 把它们包成一个统一 API。看源码开头的文档注释(upgrade.rs:1-40):

rust
//! HTTP Upgrades
//!
//! This module deals with managing HTTP Upgrades in hyper. Since
//! several concepts in HTTP allow for first talking HTTP, and then converting
//! to a different protocol, this module conflates them into a single API.
//! Those include:
//!
//! - HTTP/1.1 Upgrades
//! - HTTP `CONNECT`

注释里特意用了 "conflates"(合并/混同)——hyper 自己都承认这是把三种机制强行合到一个 API 里。这是一个值得品味的设计决策——放弃理论纯粹,换工程简便

18.2.1 三种"协议升级"的场景

场景一:HTTP/1.1 Upgrade(WebSocket)。客户端发:

GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

服务端回 101 Switching Protocols——之后同一条 TCP 不再说 HTTP,开始说 WebSocket frame。这是 RFC 6455 的标准协议升级。

场景二:HTTP CONNECT(TLS 隧道 / SOCKS-over-HTTP)。客户端发:

CONNECT example.com:443 HTTP/1.1
Host: example.com:443

服务端回 200 OK——之后同一条 TCP 作为原始字节通道,客户端就地做 TLS 握手,隧道透传。浏览器通过 HTTP 代理访问 HTTPS 网站就是这个机制。

场景三:HTTP/2 Extended CONNECT(WebSocket over HTTP/2)。RFC 8441 扩展——在 HTTP/2 连接上用 CONNECT + :protocol = websocket 伪 header 再跑一次 WebSocket。单条 HTTP/2 连接上可以同时跑多个 WebSocket stream。

三种机制协议差异极大:HTTP/1 Upgrade 改变整条 TCP 连接的协议;CONNECT 把 TCP 连接转为"字节管道";HTTP/2 extended CONNECT 只影响一个 stream。但从用户视角,它们都是"HTTP 握手成功后,给我一个 Read + Write 的 handle"——hyper 把这个共性抽出来做成统一 API。

18.2.2 统一 API 的入口点

从用户角度看,所有升级场景的入口都是一个函数(upgrade.rs:105-107):

rust
pub fn on<T: sealed::CanUpgrade>(msg: T) -> OnUpgrade {
    msg.on_upgrade()
}

输入可以是 Request<B>Response<B>、或两者的 &mut 版本——由 sealed trait CanUpgrade 约束。拿到 OnUpgrade 之后,.await 等待升级成功,拿到 Upgraded 句柄——这个句柄就是"协议升级后的双工流"。

典型用法:

rust
use hyper::upgrade;
use tokio::io::AsyncWriteExt;

// 服务端视角
async fn handle_ws(req: Request<Incoming>) -> Response<Empty<Bytes>> {
    let upgrade = upgrade::on(&mut req);
    
    tokio::spawn(async move {
        let upgraded = upgrade.await.unwrap();
        // 从这一刻起,upgraded 是一个 Read + Write 的异步流
        // 可以丢给 tungstenite 去跑 WebSocket
        handle_ws_frames(upgraded).await;
    });
    
    // 同时返回 101 Switching Protocols
    Response::builder()
        .status(101)
        .header("upgrade", "websocket")
        .header("connection", "upgrade")
        .body(Empty::new())
        .unwrap()
}

注意两件事并行:Response 告诉客户端"101 切换协议",spawn 出来的 task 等 upgrade 完成再处理字节流。这两条路径必须都走——Response 不发,客户端看不到切换信号;spawn 的 task 不跑,TCP 连接上堆满的字节没人消费。

18.3 OnUpgrade:异步等待的句柄

18.3.1 结构

rust
// hyper/src/upgrade.rs:72-75
#[derive(Clone)]
pub struct OnUpgrade {
    rx: Option<Arc<Mutex<oneshot::Receiver<crate::Result<Upgraded>>>>>,
}
  • rx: Option<...> —— None 表示这条 request/response 没有 pending upgrade;Some 里包一个 oneshot channel 的接收端。
  • Arc<Mutex<...>> —— 外面套 Arc<Mutex> 是为了可 Cloneoneshot::Receiver 本身不可 Clone(otherwise 多处 await 会冲突),但 hyper 把它包进 Mutex 让 OnUpgrade 可以被复制到多个地方——只有第一次 await 的那个拿到结果,其他的永远 pending
  • tokio::sync::oneshot 而非 tokio::sync::mpsc —— 因为 upgrade 只会 fire 一次(成功或失败),oneshot 语义刚好匹配。

18.3.2 Future 实现

rust
// hyper/src/upgrade.rs:224-241
impl Future for OnUpgrade {
    type Output = Result<Upgraded, crate::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.rx {
            Some(ref rx) => Pin::new(&mut *rx.lock().unwrap())
                .poll(cx)
                .map(|res| match res {
                    Ok(Ok(upgraded)) => Ok(upgraded),
                    Ok(Err(err)) => Err(err),
                    Err(_oneshot_canceled) => {
                        Err(crate::Error::new_canceled().with(UpgradeExpected))
                    }
                }),
            None => Poll::Ready(Err(crate::Error::new_user_no_upgrade())),
        }
    }
}

两层嵌套 Result

  • 外层:channel 是否正常关闭(oneshot::Receiver::poll 返回 Result<T, RecvError>)。
  • 内层:upgrade 本身的成功/失败(crate::Result<Upgraded>)。

三种 Err 路径对应三种真实情况:

  1. channel 正常关闭,upgrade 成功 → 返回 Upgraded
  2. channel 正常关闭,upgrade 失败 → 返回 upgrade 失败的具体错误(比如对端早关、协议异常)。
  3. channel 被 drop 了,upgrade 从未发生 → 返回 new_canceled().with(UpgradeExpected)——这是告诉用户"Connection future 没被 poll 到底"。这个错误很重要——它发生在用户写了 upgrade.await,但 Connection future 本身被 drop 了(比如主 task panic、或手动 abort)——底层连接根本没跑,自然升级不了。

第三种错误的错误信息用到了结构化 source chaining:

rust
// hyper/src/upgrade.rs:277-286
struct UpgradeExpected;

impl fmt::Display for UpgradeExpected {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("upgrade expected but not completed")
    }
}

impl StdError for UpgradeExpected {}

crate::Error::new_canceled().with(UpgradeExpected) 链起来——用户可以通过 e.source() 一路下钻看到具体原因。回想第 17 章讲过 KeepAliveTimedOut 也是同样的手法——hyper 的错误类型体系在这里是一以贯之的

18.3.3 "所有 msg 上都挂 OnUpgrade 吗"

sealed::CanUpgrade 的实现(upgrade.rs:317-355):

rust
impl<B> CanUpgrade for http::Request<B> {
    fn on_upgrade(mut self) -> OnUpgrade {
        self.extensions_mut()
            .remove::<OnUpgrade>()
            .unwrap_or_else(OnUpgrade::none)
    }
}

impl<B> CanUpgrade for &'_ mut http::Request<B> {
    fn on_upgrade(self) -> OnUpgrade {
        self.extensions_mut()
            .remove::<OnUpgrade>()
            .unwrap_or_else(OnUpgrade::none)
    }
}

几个设计细节:

  1. OnUpgrade 存在 http::Request::extensions——Rust 的 http crate 里,Request 有一个 Extensions(基于 TypeId 的 type-map)字段,hyper 把 OnUpgrade 塞进去。这样做不改变 http::Request 的公开字段结构,又能附加 hyper 特有的状态。
  2. remove::<OnUpgrade>() 是"取走"——语义是一次性消耗。不是 get 而是 remove。这防止了"同一个 request 两次调 upgrade::on()"的误用——第二次会拿到 None
  3. fallback 到 OnUpgrade::none()——如果 request 里根本没有 upgrade 机制(普通 GET /index),返回一个空的 OnUpgrade。用户调它的 .await 会立刻拿到 Err(new_user_no_upgrade())——明确告诉"这个 request 不支持升级"。

这三个细节组合起来,让 upgrade::on() 成为一个幂等友好、错误明确、无侵入的 API。

18.4 Pending:fulfill 与 manual

OnUpgrade 是接收端;发送端Pending

rust
// hyper/src/upgrade.rs:113-129
pub(super) struct Pending {
    tx: oneshot::Sender<crate::Result<Upgraded>>,
}

pub(super) fn pending() -> (Pending, OnUpgrade) {
    let (tx, rx) = oneshot::channel();
    (
        Pending { tx },
        OnUpgrade {
            rx: Some(Arc::new(Mutex::new(rx))),
        },
    )
}

Pending 只有一个字段——oneshot 发送端。它暴露两个方法:

rust
// hyper/src/upgrade.rs:255-269
impl Pending {
    pub(super) fn fulfill(self, upgraded: Upgraded) {
        trace!("pending upgrade fulfill");
        let _ = self.tx.send(Ok(upgraded));
    }

    pub(super) fn manual(self) {
        trace!("pending upgrade handled manually");
        let _ = self.tx.send(Err(crate::Error::new_user_manual_upgrade()));
    }
}
  • fulfill:成功升级,把 Upgraded 送过去。
  • manual:用户选择"手动管理升级"——常见于某些奇怪协议:用户绕过 hyper 的 upgrade 机制,自己从 Request::into_body() 转换的底层 IO 开始接管。发 new_user_manual_upgrade() 错误给 OnUpgrade,让用户明确知道"hyper 这边没做升级——你得自己处理"。

注意两处 let _ = self.tx.send(...)——发送结果被显式忽略oneshot::Sender::send 的返回值 Result<(), T> 表示"接收端已经 drop 了吗"——这里 hyper 不关心,用户可能根本没 await OnUpgrade,那就让它悄悄消失。这种"send 失败也没关系"的模式是对称的——OnUpgrade 被 clone 多份时,只有第一个 await 的拿到结果,其他的永远 pending 直到被 drop。

18.4.1 HTTP/1 dispatch 如何安装 OnUpgrade

proto/h1/dispatch.rs:285-305 展示了 server 侧的安装时机:

rust
// hyper/src/proto/h1/dispatch.rs:285-302 (节选)
let body = match body_len {
    DecodedLength::ZERO => IncomingBody::empty(),
    other => {
        let (tx, rx) = IncomingBody::new_channel(other, wants.contains(Wants::EXPECT));
        self.body_tx = Some(tx);
        rx
    }
};
if wants.contains(Wants::UPGRADE) {
    let upgrade = self.conn.on_upgrade();
    debug_assert!(!upgrade.is_none(), "empty upgrade");
    debug_assert!(
        head.extensions.get::<OnUpgrade>().is_none(),
        "OnUpgrade already set"
    );
    head.extensions.insert(upgrade);
}
self.dispatch.recv_msg(Ok((head, body)))?;

关键是 wants.contains(Wants::UPGRADE) 这个判定——hyper 的 HTTP/1 parser 在解析 header 时,看到 Connection: upgrade + Upgrade: <proto> 会在 Wants 里标记 UPGRADE 位。这个 flag 决定要不要创建 OnUpgrade——如果是普通请求就直接跳过,省掉 oneshot channel 的分配。

这一行 debug_assert! 是"幂等保护"——同一个 Request 里不应该已经有 OnUpgrade(如果有,说明 parse 逻辑或上层代码有 bug)。debug build 时崩溃,release build 时静默。

18.5 Upgraded:Read + Write 的透明封装

18.5.1 结构

rust
// hyper/src/upgrade.rs:65-67
pub struct Upgraded {
    io: Rewind<Box<dyn Io + Send>>,
}

只有一个字段,但嵌套了三层:Upgraded.io: Rewind<Box<dyn Io + Send>>

为什么要这三层?

  • 最里层 Box<dyn Io + Send>类型擦除。hyper 不能预测用户的 IO 是 TcpStreamUnixStream、rustls TlsStream、还是其他 wrapper——用 dyn trait object 统一。Io 是一个 sealed trait,定义在 upgrade.rs:290-296
rust
pub(super) trait Io: Read + Write + Unpin + 'static {
    fn __hyper_type_id(&self) -> TypeId {
        TypeId::of::<Self>()
    }
}

impl<T: Read + Write + Unpin + 'static> Io for T {}

任何 Read + Write + Unpin + 'static 类型自动实现 Io__hyper_type_id 方法保留了原类型信息——这是 downcast 的钥匙。

  • 中间层 Rewind<..>字节回放缓冲。为什么需要?因为 HTTP 解析器可能已经读过了一些字节,这些字节在 upgrade 发生时归属新协议而非 HTTP。Rewind 把这些字节缓存起来,下次 poll_read 时先发它们。比如 WebSocket 升级时,客户端的 Upgrade: header 后面可能紧接着跟了 WebSocket frame 的第一个字节——parser 已经读进来了,但 WebSocket 实现需要这些字节。

  • 最外层 Upgraded对外 API 的门面。对外只暴露 Read + Write,隐藏内部的 Rewind 和 Box——用户只看到"一个双工流"。

这种"三层嵌套"是 Rust 里"能力抽象 + 实现细节隔离"的经典模式。每一层承担单一职责,组合出一个对外极简的 API。

18.5.2 Read + Write 的透传

Read 和 Write trait 实现都是 pass-through(upgrade.rs:165-203):

rust
impl Read for Upgraded {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: ReadBufCursor<'_>,
    ) -> Poll<io::Result<()>> {
        Pin::new(&mut self.io).poll_read(cx, buf)
    }
}

impl Write for Upgraded {
    // ...所有方法都是直接 delegate 到 self.io...
}

这里用的是 hyper 自己的 Read / Write trait(hyper/src/rt/io.rs),不是 Tokio 的 AsyncRead / AsyncWrite。这是 hyper 1.x "runtime 无关" 原则的体现——hyper 不直接绑定 Tokio 的 IO trait,用自己的 trait 作 abstraction 点,让其他 runtime(比如 monoio、glommio)也能接入。

18.5.3 downcast:从 Box<dyn Io> 拿回原类型

某些场景下用户需要拿回具体类型——比如 WebSocket 库可能想做 TLS-specific 优化,需要知道底层是 TlsStream 还是原始 TcpStream。hyper 提供了 downcast

rust
// hyper/src/upgrade.rs:147-162
pub fn downcast<T: Read + Write + Unpin + 'static>(self) -> Result<Parts<T>, Self> {
    let (io, buf) = self.io.into_inner();
    match io.__hyper_downcast() {
        Ok(t) => Ok(Parts {
            io: *t,
            read_buf: buf,
        }),
        Err(io) => Err(Upgraded {
            io: Rewind::new_buffered(io, buf),
        }),
    }
}

__hyper_downcast 的实现(upgrade.rs:298-315)是一个手写版的 std::any::Any::downcast

rust
impl dyn Io + Send {
    fn __hyper_is<T: Io>(&self) -> bool {
        let t = TypeId::of::<T>();
        self.__hyper_type_id() == t
    }

    fn __hyper_downcast<T: Io>(self: Box<Self>) -> Result<Box<T>, Box<Self>> {
        if self.__hyper_is::<T>() {
            // Taken from `std::error::Error::downcast()`.
            unsafe {
                let raw: *mut dyn Io = Box::into_raw(self);
                Ok(Box::from_raw(raw as *mut T))
            }
        } else {
            Err(self)
        }
    }
}

为什么不用 Any 因为 Any 要求 'static + Send——Any 本身就行,但 hyper 想只在 Io trait 方法上做 downcast,不让外部利用 dyn Any 能力。用自己的 __hyper_type_id 方法把这个能力局限在 Io trait object 内部——这是一种 "私有 RTTI"。

好处:外部代码想 downcast 必须经过 hyper 的 downcast 方法——不能简单 Box<dyn Io>Box<dyn Any>。这是防止滥用的一个细节。

18.5.4 Parts:把原始 io 和 read_buf 分开

rust
// hyper/src/upgrade.rs:81-95
#[derive(Debug)]
#[non_exhaustive]
pub struct Parts<T> {
    pub io: T,
    pub read_buf: Bytes,
}

downcast 成功后返回 Parts——原始 IO 类型 + 已读缓冲分别暴露。为什么要分开?因为有些协议(比如某些自研二进制协议)知道自己不需要 rewind——他们可以丢弃 read_buf 直接用原 io,省一层 Rewind 的开销。

#[non_exhaustive] 标记让未来可以安全地给 Parts 加字段——用户 match Parts 时必须带 ..,新增字段不破坏用户代码。hyper 1.x 里大量类型打了 non_exhaustive——这是公共 API 稳定的保障手段

18.6 HTTP/2 的特殊战场:H2Upgraded

HTTP/1 的 upgrade 是"改变整条 TCP 连接"——直接把 TCP socket 交给用户。HTTP/2 不同——CONNECT 只影响一个 stream。hyper 为此写了一个专门的 H2Upgradedproto/h2/upgrade.rs:39-44):

rust
pub(super) struct H2Upgraded {
    ping: Recorder,
    send_stream: UpgradedSendStreamBridge,
    recv_stream: RecvStream,
    buf: Bytes,
}

关键字段:

  • send_stream: UpgradedSendStreamBridge—— 经过 mpsc channel 桥接的发送端。不是直接用 h2::SendStream,因为 h2::SendStream 的 API 是 reserve_capacity + send_data——不符合 tokio AsyncWrite 的"一次 write N 字节"语义。桥接层转换这两套 API。
  • recv_stream: RecvStream—— 直接用 h2 的 RecvStream,按 poll_data 返回 Bytes。
  • ping: Recorder—— 第 17 章讲过的 ping Recorder,用来更新 keep-alive 的 last_read_at。即使走 upgrade 路径,keep-alive 探活仍然生效
  • buf: Bytes—— 缓存从 RecvStream 读到但用户没消费完的字节。

18.6.1 UpgradedSendStreamTask:背景驱动任务

HTTP/2 的 upgrade 需要一个独立 task 驱动 SendStream 的流控(proto/h2/upgrade.rs:53-60):

rust
pin_project! {
    #[must_use = "futures do nothing unless polled"]
    pub struct UpgradedSendStreamTask<B> {
        #[pin]
        h2_tx: SendStream<SendBuf<B>>,
        #[pin]
        rx: mpsc::Receiver<Cursor<Box<[u8]>>>,
        error_tx: Option<oneshot::Sender<crate::Error>>,
    }
}

这个 task 做三件事(tick 方法,proto/h2/upgrade.rs:68-133):

  1. 预留容量reserve_capacity(1) + poll_capacity)——告诉 h2 "我要 1 字节的窗口",拿到实际可发量。
  2. 处理 RSTpoll_reset)—— 如果对端 RST 了这个 stream,立刻报错。
  3. 消费来自 Upgraded::poll_write 的字节rx.poll_next)—— 把用户的 write 调用转换成 h2::send_data。

注意这个 task 是 "async function written as manual Future"——用 loop + Poll::Pending / Poll::Ready 组合成一个状态机。为什么不用 async fn?因为 h2 的 reserve_capacity + poll_capacity API 是同步但 return Poll——不是 async fn 友好的签名。手动 Future 给了精确控制。

18.6.2 Read + Write:channel 桥接

H2Upgraded 实现 Readproto/h2/upgrade.rs:158-193)时,从 recv_stream.poll_data 拉 Bytes,切小到 buf 大小,剩余留在 self.buf 下次 poll。每次消费后还要调 flow_control().release_capacity(cnt) 主动增加 window——这是 HTTP/2 流控的端到端背压回传。

Write 实现(proto/h2/upgrade.rs:195-267)把字节推进 mpsc channel,等 UpgradedSendStreamTask 消费。这层间接是必要的——如果直接调 h2::SendStream::send_data,可能被流控卡住,但 AsyncWrite::poll_write 的语义是"写进去就返回"——必须有 buffer。mpsc channel 是这个 buffer。

18.6.3 错误处理:三方 race

H2Upgraded 的读写路径里到处是 "三方 race" 的处理——检查 SendStream 是否 RST、检查 UpgradedSendStreamTask 是否 drop、检查 mpsc 是否关闭。一段典型代码(proto/h2/upgrade.rs:205-219):

rust
match self.send_stream.tx.poll_ready(cx) {
    Poll::Ready(Ok(())) => {}
    Poll::Ready(Err(_task_dropped)) => {
        // if the task dropped, check if there was an error
        // otherwise i guess its a broken pipe
        return match Pin::new(&mut self.send_stream.error_rx).poll(cx) {
            Poll::Ready(Ok(reason)) => Poll::Ready(Err(io_error(reason))),
            Poll::Ready(Err(_task_dropped)) => {
                Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
            }
            Poll::Pending => Poll::Pending,
        };
    }
    Poll::Pending => return Poll::Pending,
}

这段代码处理"mpsc channel 的 sender side drop 了"的情况——先 poll error_rx(看后台 task 有没有发具体错误),再 fallback 到 broken pipe。这种"check specific error first, fallback to generic"的模式在 async Rust 里处理 drop 场景很常见——drop 可能由多种原因引起,需要下钻看具体是哪种

注释里的 "i guess its a broken pipe" 是作者的口语风格——实际意思是已排除其他情况后,默认按 broken pipe 处理。这种口语化注释在 hyper 源码里不罕见——它降低了代码的"不可错误"气场,反而更诚实。

18.6.4 对照 HTTP/1 与 HTTP/2 的 upgrade 心智差异

值得把两种协议下的升级流程并排看一眼:

HTTP/1.1 Upgrade 时序

Client → TCP: GET /ws HTTP/1.1
             Upgrade: websocket
             Connection: Upgrade
             Sec-WebSocket-Key: ...

Server → TCP: HTTP/1.1 101 Switching Protocols
             Upgrade: websocket
             Connection: Upgrade
             Sec-WebSocket-Accept: ...

   [从此 TCP 连接上不再有 HTTP 字节,全是 WebSocket frame]

Client → TCP: <WebSocket frame #1>
Server → TCP: <WebSocket frame #2>
...

TCP 连接整体易手——HTTP 状态机退出,WebSocket 状态机接管。hyper 在解析完 101 的 header 后立刻把 TCP 交给 UpgradedHTTP 的那一层彻底消失

HTTP/2 Extended CONNECT 时序

HTTP/2 Connection preface + SETTINGS 已协商完成
  (server 已通告 SETTINGS_ENABLE_CONNECT_PROTOCOL = 1)

Client → h2 stream 1: HEADERS  :method=CONNECT
                                :protocol=websocket
                                :scheme=https
                                :path=/ws
                                :authority=example.com
                                sec-websocket-version=13

Server → h2 stream 1: HEADERS  :status=200

   [stream 1 上开始传 WebSocket frame 作为 DATA frame payload]

Client → h2 stream 1: DATA <websocket frame>
Server → h2 stream 1: DATA <websocket frame>
...
   [其他 HTTP/2 stream 继续正常跑]

关键差异:TCP 连接不改变协议——HTTP/2 frame 层继续运行,WebSocket frame 作为 DATA frame 的 payload 嵌套进去。同一条 TCP 上可以同时跑 10 个 WebSocket(10 个不同 stream)+ 20 个普通 HTTP 请求——多路复用能力得以保留。这是 extended CONNECT 相对于 HTTP/1 upgrade 的最大价值。

hyper 的 H2Upgraded 就是这个场景的 IO 视图——对用户来说仍然是 Read + Write,但底下跑的是 h2 stream 的 SendStream + RecvStream。

18.6.5 为什么 enable_connect_protocol() 是显式开关

上一章的 17.2 里提过 Builder::enable_connect_protocol()——这是 server 侧显式打开 RFC 8441 支持的开关。为什么不默认打开?

回到 RFC 8441 的机制——server 必须在 SETTINGS 里通告 SETTINGS_ENABLE_CONNECT_PROTOCOL = 1,否则客户端不会尝试发带 :protocol 伪 header 的 CONNECT。hyper 默认不通告,意味着:

  1. 普通 HTTP/2 server 不会被意外"激活"为 WebSocket endpoint——保持最小暴露面。
  2. 老版本客户端看到默认 SETTINGS,直接按"不支持"处理——向后兼容。
  3. 用户主动调用 .enable_connect_protocol() 才通告——显式 opt-in

这是 hyper 设计里反复出现的模式——能力默认关闭,显式打开。第 17 章讲过 keep_alive_interval 默认 None、max_pending_accept_reset_streams 默认 None——都是同一个原则。这背后的哲学是"库不应该替用户决定":每个能力都有代价(性能、复杂度、安全),由调用方按业务需要决定。

18.7 实战:写一个 WebSocket echo server

把所有抽象连起来,写一个最小 WebSocket echo server——用 hyper 接受 upgrade,用 tokio-tungstenite 处理 WebSocket frame:

rust
use hyper::{Request, Response, StatusCode};
use hyper::body::Incoming;
use hyper::service::service_fn;
use http_body_util::Empty;
use bytes::Bytes;
use tokio_tungstenite::{WebSocketStream, tungstenite::protocol::Role};

async fn handler(mut req: Request<Incoming>) -> Result<Response<Empty<Bytes>>, hyper::Error> {
    if !is_websocket_upgrade(&req) {
        return Ok(Response::builder()
            .status(400)
            .body(Empty::new())
            .unwrap());
    }
    
    let key = req.headers().get("Sec-WebSocket-Key").unwrap().clone();
    let accept = compute_accept(&key);
    
    // 提取 OnUpgrade——注意这一步"消耗"了 request extensions 里的 OnUpgrade
    let upgrade = hyper::upgrade::on(&mut req);
    
    // 在背景 task 里等 upgrade 完成
    tokio::spawn(async move {
        match upgrade.await {
            Ok(upgraded) => {
                // upgraded: hyper::upgrade::Upgraded
                // 需要转换成 tokio 的 AsyncRead+AsyncWrite
                let upgraded = hyper_util::rt::TokioIo::new(upgraded);
                let ws = WebSocketStream::from_raw_socket(
                    upgraded, Role::Server, None
                ).await;
                echo_loop(ws).await;
            }
            Err(e) => eprintln!("upgrade failed: {}", e),
        }
    });
    
    // 主路径返回 101
    Ok(Response::builder()
        .status(StatusCode::SWITCHING_PROTOCOLS)
        .header("connection", "upgrade")
        .header("upgrade", "websocket")
        .header("sec-websocket-accept", accept)
        .body(Empty::new())
        .unwrap())
}

几个关键点:

  1. upgrade::on(&mut req) 必须在 service_fn 返回 Response 之前调用——否则 request 被移动/消耗后 extensions 就没了。
  2. spawn + await 模式—— upgrade 在 response 回到客户端之后才真正发生(客户端收到 101 才可能开始发 WebSocket frame)。必须 spawn 到独立 task,不能 block 在 response 返回前。
  3. hyper_util::rt::TokioIo 包装—— Upgraded 用的是 hyper 自己的 Read + Write;tungstenite 需要 tokio 的 AsyncRead + AsyncWriteTokioIo 是 hyper-util 提供的 adapter(下一章详讲)——把两套 IO trait 桥起来。
  4. compute_accept—— WebSocket 握手的 Sec-WebSocket-Accept 需要按 RFC 6455 §4.1 计算(key + magic string → SHA-1 → base64)。hyper 不做这个——它是 WebSocket spec 的事,tokio-tungstenite 里有 handshake::derive_accept_key 可以用。

18.7.1 客户端视角的 upgrade:主动请求切换

上面讲的都是 server 接 upgrade。客户端视角稍微不一样——需要主动发 Upgrade 请求、然后从 Response 拿 OnUpgrade:

rust
use hyper::Request;
use hyper_util::client::legacy::Client;

async fn client_websocket() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::builder(TokioExecutor::new()).build_http();
    
    let req = Request::builder()
        .method("GET")
        .uri("http://example.com/ws")
        .header("Connection", "Upgrade")
        .header("Upgrade", "websocket")
        .header("Sec-WebSocket-Key", "dGhlIHNhbXBsZSBub25jZQ==")
        .header("Sec-WebSocket-Version", "13")
        .body(Empty::<Bytes>::new())?;
    
    let mut resp = client.request(req).await?;
    if resp.status() == 101 {
        let upgrade = hyper::upgrade::on(&mut resp);
        let upgraded = upgrade.await?;
        // 现在可以跑 WebSocket ...
    }
    Ok(())
}

几个和 server 对称的细节:

  • upgrade::on(&mut resp)——参数变成了 &mut Response<_>。客户端从 response 里拿 OnUpgrade,server 从 request 里拿。CanUpgrade sealed trait 同时实现在 Request / Response 两侧,这就是对称的根源。
  • 判定条件不同——server 看 wants.contains(Wants::UPGRADE)(从 request 的 Connection/Upgrade header 推断);client 看 response.status() == 101
  • 握手时机——client 发完 request 后阻塞等 response;response 回来才知道 upgrade 是否被 accept。如果对端是普通 HTTP server 不支持升级,会回 200 + 普通 body——客户端需要检查 status 做降级。

18.7.2 CONNECT 方法的特殊处理

HTTP CONNECT 稍微特殊——它不走 Upgrade header 机制,而是用 CONNECT host:port HTTP/1.1 这样的特殊 request line。hyper 对 CONNECT 的处理在 proto/h1/role.rs 里识别:CONNECT 方法自动打开 upgrade 通道——不需要 Upgrade header。server 在回 200 之后同一条 TCP 就是 raw byte tunnel。

这是代理服务器最常用的机制。Chrome → HTTP proxy → target server 的典型流程:

Chrome  → proxy: CONNECT target.com:443 HTTP/1.1
proxy   → target.com:443: TCP connect
proxy   → Chrome: HTTP/1.1 200 OK
Chrome  ↔ target via proxy: raw bytes (TLS handshake + 加密业务数据)

proxy 自己 spawn 两个 task:一个从 Chrome 的 Upgraded 往 target 的 TcpStream 写,一个反向。两个 spliced 的 AsyncRead/Write 就把字节管道双向打通。

这是为什么 hyper 的 upgrade 抽象统一了 HTTP/1 Upgrade 和 CONNECT——对用户来说,两者的产物都是 Upgraded,后续处理完全一致。Java 的 Jetty、Node.js 的 http module 也都遵循这个统一,但 Python aiohttp 把 CONNECT 独立出来——每种语言选择了不同边界。

18.7.3 为什么 Upgraded 要 Send

回看 Upgraded 的 inner 类型:Box<dyn Io + Send>Send bound 是必须的——因为用户几乎总会 tokio::spawn 处理 upgrade 后的流,spawn 需要 Send future。如果 Upgraded 不是 Send,spawn 编译不过。

但这也排除了一些场景——比如底层 IO 是 Rc<RefCell<..>> 包着的(非 Send 类型),就不能用 hyper 的 upgrade 路径。这些场景罕见,但值得知道边界。tokio::task::spawn_local + !Send future 是一条备选路径,但 hyper 的 API 不原生支持。

18.8 和其他生态的对照

不同语言栈处理 upgrade 的 API 风格:

upgrade API 风格控制细节
Node.js httprequest.on('upgrade', (req, socket, head) => { ... })事件回调,socket 是原始 TCP
Go net/httphttp.Hijacker interface: conn.(http.Hijacker).Hijack()主动 "抢占" 底层连接
Python aiohttpWebSocketResponse().prepare(request)专用 response 类型
Rust hyperupgrade::on(req) + UpgradedFuture + 统一 IO handle
Rust actix-web更高层的 ws::start(req, stream, actor)内嵌 actor 模型

hyper 的设计特征是"最小暴露、高度通用"——只给你 Read + Write,其他协议的事用户自己处理。这和 hyper 的整体哲学一致——它想当"基础设施",不当"应用框架"。actix-web 走的是反方向——把 WebSocket 抽到 actor 模型,开发体验更好,但对"奇怪协议"的支持不如 hyper 灵活。

这种区别回到第 1 章讨论过的"hyper 是 platform 还是 framework"——hyper 选了 platform 那条路,upgrade.rs 是典型案例。

18.8.1 为什么 "Rewind" 是本章的关键抽象

我们在 §18.5.1 里提过 Rewind<Box<dyn Io + Send>> 中间那层 Rewind。这里展开讲一次——为什么 Rewind 是 hyper 整个 upgrade 机制的隐形基石

HTTP/1 的握手阶段,parser 是按字节推进的。它读到 \r\n\r\n 时知道 header 结束——但在此之前,可能已经把下一个 WebSocket frame 的第一个字节也读进了 buffer。这是因为 TCP 层面 read 返回的是"到手多少算多少"——parser 只能从 buffer 拿,不能拒绝 buffer 里已有的字节。

这些"读多了"的字节在普通 HTTP 场景下属于下一个请求;但在 upgrade 场景下属于新协议。如果 hyper 把底层 TcpStream 直接交给用户(不经过 Rewind),那些字节就丢了——WebSocket 收到半截 frame、TLS 握手失败、你的 SOCKS 隧道一开始就错位。

Rewind 的实现很简单(hyper/src/common/io/rewind.rs 那 ~80 行):

struct Rewind<T> {
    pre: Option<Bytes>,   // parser 读剩下的字节
    inner: T,             // 原 IO
}

impl<T: Read> Read for Rewind<T> {
    fn poll_read(..., buf) -> Poll<io::Result<()>> {
        if let Some(prefix) = self.pre.take() {
            // 先把 prefix 发给用户
            buf.put_slice(&prefix);
            return Poll::Ready(Ok(()));
        }
        // prefix 空了之后才回到 underlying IO
        Pin::new(&mut self.inner).poll_read(cx, buf)
    }
}

"先发 prefix,后发 underlying"——一行代码挽救了字节边界。从 WebSocket 代理到 CONNECT 隧道,所有 upgrade 用户都依赖 Rewind 这一层的正确——即使他们从未听说过它

这是一种隐形抽象——好的基础库会把这种"脏活"内化,让用户写起来感觉"上帝给了我一个完美的双工流"。只有当你去读源码、追问"为什么 Upgraded 不是直接用 TcpStream",才能看到 Rewind 这一层的存在。这是本书特别关注的一种价值——把隐形抽象翻到明面上

18.8.2 Tokio oneshot 在这里到底扮演什么角色

回看 Pending/OnUpgrade 用了 tokio::sync::oneshot——这是一个值得单独讲的选择。

为什么是 oneshot 而非其他

  • oneshot 的 send 是无锁 + 一次性—— oneshot 内部是一个 AtomicUsize 状态 + 两个 Waker slot,比 mpsc 的 ringbuffer 轻得多。upgrade 只需要送一次 Upgraded 过去,过多能力反而是浪费。
  • oneshot 的 drop 语义明确—— sender 被 drop 会把 receiver 标为 closed,receiver 能立刻感知到;反之亦然。我们在 §18.3.2 看到的 new_canceled().with(UpgradeExpected) 就是利用这个机制。
  • oneshot 不需要 Send bound—— 实际上 oneshot 是 Send 的,但即使 hyper 里有些路径不是 Send,oneshot 也能工作(Sender 不必 Send 给别的线程发)。

OnUpgrade 包了一层 Arc<Mutex<receiver>>——为什么

oneshot::Receiver 本身不可 Clone。但用户经常需要把 OnUpgrade 多地方传递 + 偶尔 clone(比如在中间件里转发),所以 hyper 包一层 Arc<Mutex>。代价是:只有第一个 await 的 clone 拿到结果——其他 clone 会永远挂在那里直到被 drop。这是一个细微但重要的语义:OnUpgrade 的 clone 是"引用复制",不是"结果广播"。如果你想让多处代码都拿到 Upgraded,自己用 broadcast channel。

回到卷四《Tokio 源码深度解析》第 13 章 channels 里讲过 oneshot 的实现——如果你读过那一章再看 hyper 这里的用法,会觉得一切都很自然。通信原语选哪一个,往往决定了整个模块的 API 形状

18.9 一个实际的生产事故:升级后的超时

真实复盘——某团队用 hyper + tungstenite 做 WebSocket 网关。生产上发现部分连接在大约 60 秒后无端断开,但业务代码从未主动关。抓包显示:server 主动发了 FIN。

追查:问题在 tower::timeout 中间件。网关用了 axum + tower,整条 layer 里套了 .timeout(Duration::from_secs(60))。60 秒 timer 启动于 service.call(req) 被调——但 upgrade::on() 被 spawn 到独立 task 后,主 future(返回 Response 101)在一瞬间就 resolve 了——timer 不会 cancel?错——timer 是 tower::timeout 中间件跟随主 future 的,主 future resolve 后,timer 就释放了。所以 60 秒 timeout 按理不会影响 WebSocket 长连接。

但实际不是这样——tower::timeout 的某个版本(0.4.x)里,.timeout() layer 会 包住整个 request-response 生命周期,包括已经返回的 response 的 future。这在普通 HTTP 下无意义(response 返回后就结束),但 WebSocket upgrade 里 response 已经返回但底层连接还在跑——被 timer 的 drop 链路间接影响。

解决:把 timeout 中间件从 WebSocket 路径上拿掉——用 axum 的 per-route layer,只给非 upgrade 路由配 timeout。这个教训——upgrade 改变了 request lifecycle 的假设,常规中间件不一定适用。这是 hyper 组件互相耦合的边界坑之一——独立看每一层都正确,组合起来在某个场景下出现非预期行为。

18.9.1 读字节 + 写字节:手写一个最小 CONNECT 代理

为了让 upgrade 的全部路径具象化,写一个最小的 HTTP CONNECT 代理——这是真实世界里 upgrade 用得最频繁的场景之一。代码约 70 行,足够拿去做 MITM 调试工具或者 curl 代理:

rust
use http_body_util::Empty;
use hyper::{Request, Response, StatusCode, body::Incoming};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use bytes::Bytes;
use tokio::{io::copy_bidirectional, net::TcpStream};

async fn proxy(
    req: Request<Incoming>,
) -> Result<Response<Empty<Bytes>>, hyper::Error> {
    // 只处理 CONNECT method
    if req.method() != hyper::Method::CONNECT {
        return Ok(Response::builder()
            .status(StatusCode::METHOD_NOT_ALLOWED)
            .body(Empty::new())
            .unwrap());
    }
    
    // 解析 authority,格式是 "example.com:443"
    let authority = req.uri().authority().cloned();
    let Some(authority) = authority else {
        return Ok(Response::builder()
            .status(StatusCode::BAD_REQUEST)
            .body(Empty::new())
            .unwrap());
    };
    
    // spawn 一个 task,等 upgrade 完成后做隧道
    tokio::spawn(async move {
        // hyper::upgrade::on 拿 OnUpgrade
        match hyper::upgrade::on(req).await {
            Ok(upgraded) => {
                // 连上 target server
                match TcpStream::connect(authority.as_str()).await {
                    Ok(target) => {
                        let mut upgraded = TokioIo::new(upgraded);
                        let mut target = target;
                        // 双向 copy 直到任一侧关闭
                        match copy_bidirectional(&mut upgraded, &mut target).await {
                            Ok((bytes_c2s, bytes_s2c)) => {
                                tracing::info!("tunnel closed: {}↑ {}↓", bytes_c2s, bytes_s2c);
                            }
                            Err(e) => tracing::warn!("tunnel error: {}", e),
                        }
                    }
                    Err(e) => tracing::warn!("target connect failed: {}", e),
                }
            }
            Err(e) => tracing::warn!("upgrade failed: {}", e),
        }
    });
    
    // 主路径立刻回 200
    Ok(Response::builder()
        .status(StatusCode::OK)
        .body(Empty::new())
        .unwrap())
}

几个细节值得体会:

  1. hyper::upgrade::on(req) 消耗了 req——这和我们前面看到的 &mut req 版本不一样。CONNECT 场景通常不需要读 req 的其他字段,所以直接消耗更简洁。
  2. copy_bidirectional 是 tokio 提供的"并发复制两个方向"工具——内部 select 两个 copy 方向,任一方向结束就退出。这比手写两个 spawn 更简洁、错误处理更统一。
  3. spawn 出的 task 里才真正做"upgrade 等待"——主路径已经回 200 让 client 开始发 TLS 握手。两端并行跑,没有阻塞点。
  4. TokioIo 适配——这是 hyper-util 的 adapter,把 hyper 自己的 Read/Write 转成 tokio 的 AsyncRead/AsyncWrite。下一章讲。

跑起来之后拿 curl 测试:curl -x http://localhost:3000 https://example.com —— curl 会发 CONNECT、收 200、做 TLS 握手、读 response——所有字节都经过这 70 行代码中转。一台笔记本能跑到几千并发。

这个例子覆盖了本章讲到的几乎所有 abstractions——OnUpgrade、Upgraded、downcast(隐式用到)、Rewind(隐式用到)、oneshot(隐式用到)。如果你把这 70 行代码彻底读懂,你就真的理解了 hyper upgrade 的每一根齿轮如何咬合

18.10 Upgrade 设计的关联与延伸

Upgrade 模块让 hyper 跨越了"HTTP 库"的边界——它让同一个库同时能做 HTTP 服务、WebSocket 服务、HTTP 代理、TLS 隧道。这种跨协议能力背后有一个更深的设计原则——"HTTP 是传输协议的起点,不是终点"

这个理念在 Upgraded 结构的选择里体现得很清楚——它不是一个 HTTP 对象,而是一个通用 IO。从 hyper 角度看,HTTP 只是一个握手协议——握手完了之后这条连接归用户用。tower 的 Service 抽象也有类似倾向——Service<Req> 不绑定 HTTP,能跑任何 Req 类型。hyper 和 tower 在"不绑协议"这件事上是同气连枝的

和《Serde 元编程》第 2 章 Data Model 里 Serde 的 Serializer/Deserializer 对照——Serde 把"数据结构 → 数据格式"抽象成 29 种原语,让每一种数据结构能输出到任意格式。hyper 的 upgrade 做的是同种事情——把"HTTP 握手 → 任意双工协议"抽象成 Upgraded 一个类型,让每一种 HTTP server/client 都能变成任意协议的端点。"多对多关系拆成中间层"——这是 Rust 生态库设计的共同模式,反复在不同层出现。

18.10.1 从 http crate 的 Extensions 看 upgrade 的嵌入

回头看 CanUpgrade::on_upgrade 里用的 extensions_mut().remove::<OnUpgrade>()——这个 Extensionshttp crate(见本书第 9 章)的产物,不是 hyper 的。hyper 把自己的 state 塞进标准 http::Request/Response 的扩展槽里,保持了与 http crate 的类型对齐

这个做法在 Rust 生态里非常常见:

  • http::Request::extensions 原本是给用户塞任意上下文数据用的(中间件 chain 里的状态)。
  • hyper 借这个机制把 OnUpgrade 塞进去,用户和中间件"看不到"这个 field(Extensions 基于 TypeId 查找,非它认识的 TypeId 看不到)。
  • tower 的中间件完全对 OnUpgrade 无感——它处理完 request 不管 extensions 里多一个类型。
  • 只有当用户调 upgrade::on(&mut req) 时,OnUpgrade 才被从 extensions 里拿出来。

这是一种 "类型命名空间" 作为扩展机制——比传统的 attributes: HashMap<String, Any> 优越得多:编译期类型检查零查找冲突中间件透明

对比 Go 的 context.Context——Go 的 context 用 WithValue(key, value) 把任意 k-v 塞进 context,每次取时要做类型断言。Rust 的 Extensions 用 TypeId 作 key——少一层 runtime 断言。这是 Rust 在 "静态 + 动态" 边界上做的一个小而具体的优势。

18.10.2 Upgrade 模块在 hyper 演进中的角色

翻 hyper 的 git history,会看到 upgrade.rs 文件最早出现于 2018 年,当时 hyper 还在 0.12.x 阶段。随后几年它经过了几轮重大改动:

  • 0.12 → 0.13:引入 OnUpgrade 的 future 形式,替代原来的回调风格。
  • 0.13 → 0.14:把 Upgraded::new 改为 pub(super),开始用 trait object 类型擦除。
  • 0.14 → 1.0:把 IO trait 从 tokio::io::AsyncRead/Write 切到 hyper 自己的 Read/Write trait——"runtime 无关"大跃进。

hyper 1.x 的核心设计决策之一就是 "解绑 Tokio"。upgrade.rs 是这个解绑工作里改得最多的模块之一——因为它暴露的 Upgraded最终要 hand off 给用户的对象,IO trait 选择直接决定生态兼容性。

"runtime 无关"到底值不值?这是个争议话题。支持者说:"这让 hyper 成为真正的 HTTP 基础库,能跑在 tokio、monoio、glommio 上";反对者说:"99% 用户都在 tokio 上,多加一层间接是无谓的复杂"。最终的事实是——hyper-util 这个 crate(下一章主题)把 tokio 兼容层集中到了一个外部 crate,hyper 核心保持 "纯粹",用户可选 tokio 绑定。所以 Upgraded 里的 Read + Write 是 hyper 自己的——我们在上面示例里看到的 TokioIo::new(upgraded) 就是从 hyper trait 桥到 tokio trait 的 adapter。

这段 API 演进史的教训是——库的 trait bound 是一个长期代价。一旦你的类型签名里写了 AsyncRead + AsyncWrite(tokio 的 trait),你就把自己绑死在 tokio 上。hyper 1.x 选择抽出自己的 trait——短期多写几百行代码,长期换来生态空间。这是基础库作者的典型长期主义决策。

18.10.3 读懂 upgrade 带给你的更广视野

本章讲的 "HTTP 是握手、之后是任意协议" 这个思路,在更大视野里是网络协议工程的重要模式

  • TLS 的 ClientHello + ServerHello → application data 是一样的——TLS 协议的前几个包是握手,之后变成加密字节管道。
  • SSH 的 version exchange + kex → session 也是同样——SSH 先协商 version + key,之后走加密隧道。
  • QUIC 的 0-RTT / 1-RTT handshake → stream data 仍然是——QUIC 前几个 frame 做握手,之后多路复用 stream。
  • gRPC 的 HEADERS + DATA + TRAILERS 可以看作是 HTTP/2 上的另一层协议——"HTTP/2 是一条管道,gRPC 在上面跑自己的语义"。

一旦你习惯了这个视角——"握手 + 隧道"—— 你就能快速理解任何新协议的设计。本章只是一个具体例证。

18.10.4 upgrade 与 body 的耦合:容易漏的一步

本章前面一直把 upgrade 和 body 分开讲,但它们在 hyper 内部是有物理交互的。看 proto/h1/dispatch.rs:285-302 的上下文——OnUpgrade 是在解析 body 的同时被安装的。如果 client 发了一个带 body 的 Upgrade 请求(比如 POST /ws ... Upgrade: websocket + 几个字节 body),hyper 的处理是:

  1. 解析 header,识别到 Upgrade。
  2. body 仍然按正常方式处理——创建 IncomingBody channel,把 body 字节流 dispatch 给用户 handler。
  3. 同时安装 OnUpgrade。
  4. 用户 handler 通常会消费掉 bodyreq.into_body().collect().await),然后回 101 response
  5. Response 写完之后——剩余的 body 字节(如果用户没读完)被丢弃;parser 状态机切到 upgrade 模式;OnUpgrade 被 fulfill。

漏洞场景:用户忘记消费 body 就直接升级——结果那几个字节永远留在 Rewind buffer 里,被当作 WebSocket 的前几字节吐给 tokio-tungstenite——解析错误

实际遇到这个 bug 的调试方式是:打开 RUST_LOG=hyper=trace,观察 Rewind 的 prefix 字节有没有异常膨胀。生产中这个 bug 极少——因为 WebSocket 握手请求通常没 body。但自定义协议里可能踩到——"协商时带一些参数"的设计者要注意。

防御:在 handler 里先消费 body、再发 response、再 spawn upgrade 处理。这个顺序保证了 hyper 状态机不会在 upgrade 时卡住。

18.10.5 源码的阅读建议

如果你只有半小时读 upgrade 模块,建议这样走:

  1. 10 分钟读 hyper/src/upgrade.rs——所有公开 API 在这 407 行里。重点看 OnUpgrade::Future::pollCanUpgrade sealed trait 实现。
  2. 10 分钟读 hyper/src/common/io/rewind.rs——上面提到的那个隐形基石,80 行
  3. 10 分钟读 hyper/examples/upgrades.rs——官方示例,从用户视角走一遍所有 API。

如果你有 3 小时——追加以下文件:

  1. hyper/src/proto/h1/dispatch.rs:285-305——HTTP/1 server 如何安装 OnUpgrade
  2. hyper/src/proto/h1/conn.rs:163-...——pending_upgrade 如何被触发。
  3. hyper/src/proto/h2/upgrade.rs——HTTP/2 的 H2Upgraded 全套

这个顺序从"用户 API"到"内部实现"逐步下钻——最能体会 hyper 的"分层清晰、边界明确"风格。

18.10.6 一次性 vs 长期性:API 形状的深层讨论

看到这里,应该对 OnUpgrade 有一个直观的理解——它是"一次性事件 + 最终产物交接"这个模式的模板实现。这个模式在 Rust 库设计里反复出现:

  • tokio::sync::oneshot::Receiver —— 一次性通知,收到 T 就结束。
  • JoinHandle<T> —— spawn 出去的 task 的"最终结果"句柄,一次性。
  • tonic::Response<T> —— gRPC 的响应,一次性。
  • http_body::Body::frame —— 单个 frame,链式调用直到结束。
  • OnUpgrade —— upgrade 完成后的产物句柄,一次性。

这些 API 的共同特征:

  1. Output = Result<T, Error> 的 Future——不是 Stream、不是 Iterator。
  2. 一旦 resolve 不能再用——第二次 await 通常挂起或报错。
  3. 错误类型有语义分类——不是一个 Box<dyn Error>,而是明确的 enum / 结构化 Error。
  4. Clone 语义要明确——可以 Clone 的话,是"多方引用同一个"还是"广播复制"必须在文档里写清楚。

反模式——如果某个库给了你一个 OnUpgrade-like 的对象但允许多次 poll 每次返回不同值——你应该警觉。这违反了 Future 的契约(Future::poll 返回 Ready 之后行为未定义)。

设计自己的 API 时,如果你的事件是"一次性 + 最终产物",照搬 OnUpgrade 的形状基本总是对的:

rust
pub struct YourHandle {
    rx: Option<Arc<Mutex<oneshot::Receiver<Result<YourThing, YourError>>>>>,
}

impl Future for YourHandle { /* 和 OnUpgrade 一模一样 */ }

这个习语级的复用非常普遍——你已经在 tonic、axum-extra、reqwest 的内部看到类似结构——都是这一套。

18.10.7 错误消息的人性化

最后讲一个很细节的点——upgrade 模块的错误消息非常人性化。看 upgrade.rs:280-286

rust
impl fmt::Display for UpgradeExpected {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("upgrade expected but not completed")
    }
}

"upgrade expected but not completed"——一句话说清三件事:

  1. upgrade 本应发生——用户确实配置了 upgrade 路径。
  2. 但没完成——具体原因要看 source chain。
  3. 是个 expected but not fulfilled 的状态——不是 panic,是状态未达成。

对比如果写 "upgrade error" 或 "channel closed"——信息少、迷茫度高。这种精确到场景语义的错误消息,是 hyper 生态好口碑的一个来源——你在 Cargo check 或运行时 trace 里看到 hyper 的错误,大部分能不看源码就猜出问题出在哪里。

自己写库时在 Display 实现里花 5 分钟考虑"用户看到这行会怎么想"——这是长期回报最大的时间投入之一。

18.10.8 为什么 on 是自由函数而非方法

仔细看看 upgrade 的入口:hyper::upgrade::on(req)。这是一个自由函数(free function),不是 req.upgrade() 方法。为什么?

几种候选方案:

  1. req.upgrade() inherent method——放在 http::Request 上。但 http crate 不是 hyper 所有,hyper 不能给它加方法。
  2. req.upgrade() via extension trait——例如 HyperRequestExt::upgrade(&mut self) -> OnUpgrade。用户必须 use HyperRequestExt 才能用,增加认知负担。
  3. hyper::upgrade::on(req) 自由函数——hyper 里任何地方 use hyper::upgrade; 后调 upgrade::on(req)。无需 trait import。

hyper 选了第 3 种——模块级命名空间 + 自由函数。好处是:

  • 用户在 IDE 里输入 hyper::upgrade:: 立刻看到所有相关 API(onUpgradedOnUpgradeParts)——一个明确的"功能域"。
  • 无需额外 trait import——import 少一行,编译错误少一种。
  • 在 hyper 的 API surface 里占的位置明确——"upgrade 这个词就属于 hyper"。

反对意见是——"方法链更流畅"。但对 upgrade 这种不常用 API(绝大多数用户永远不会碰)来说,可见性比流畅更重要。这是一次在"API 流畅性 vs 可发现性"之间的权衡,hyper 选了后者。

类似的决策在 hyper 其他地方也反复出现——hyper::rt::*hyper::body::*hyper::service::*——都是模块级命名空间组织 API,而非方法链。这是 hyper 整体 API 风格的一个侧影。

18.10.9 和 Service/Layer 抽象的交叉点

上一部分讨论了 hyper::upgrade::on 的自由函数选择。更往深挖——它的背后是整个 hyper/tower 生态对状态和类型归属的哲学。

回想第 13 章讲过 hyper::Servicetower::Service 的差异。hyper::Service<Request> 接受一个 Request,返回 Future<Output = Response>。中间件用 tower::Layer 组合多个 Service——每一层都可能给 Request 做变换(加 header、包装 body、改 extensions)。

upgrade 和这个 Service 链有一个微妙耦合——OnUpgrade 存在 req.extensions 里。中间件链里任何一步如果into_parts / from_parts 重建了 Request,可能会丢掉 extensions——包括 OnUpgrade。这意味着中间件作者需要小心地保留 extensions,否则 upgrade 就在中间件链里消失了。

Tower 的 Service/Layer 设计本身不知道这件事——它处理的是通用 Req → Res。是 hyper 的 upgrade 机制借用了 extensions。如果中间件作者用 req.into_parts().4 忘了带 extensions,就悄悄坏了 upgrade。这是一个跨模块契约——没有编译期保证,只能在文档 / code review 里传承。

这也是 Tower 生态长期的一个争议点——Service 的 Request/Response 完全泛型,没有办法在类型层面保证"OnUpgrade 在 extensions 里"。解决方案要么是中间件自觉,要么是在更高层加约束类型(比如 axum 把 Request 收窄到自己的 Request 类型)。

这个跨模块耦合是 "统一抽象的代价" 的典型样本——Tower 选择不绑 HTTP 换来通用性,代价是 HTTP 特有的语义(Upgrade、body tailers 等)需要靠文档约定保护。权衡总是存在。

18.10.10 落到类型之前:几个常见的 upgrade 使用错误

最后收一个"常见错误清单",这些都是我在做 code review 时看过的真实坑:

错误 1:在 Response 写出之前 await 了 upgrade

rust
// 错
async fn bad(req: Request<Incoming>) -> Response<...> {
    let upgraded = hyper::upgrade::on(&mut req).await?;  // 卡死在这里!
    // 永远走不到 response
    Response::builder().status(101).body(...).unwrap()
}

upgrade 要等 Response 写到 socket 之后 才能 fulfill——因为只有 response 出去了,parser 才会切到 upgrade 模式。在 response 返回之前 await upgrade 是死锁。

正确做法是 spawn 独立 task 处理 upgrade,主 future 直接返回 response。

错误 2:忘了处理 OnUpgrade::none() 的情况

rust
// 错
async fn maybe_bad(mut req: Request<Incoming>) {
    let upgraded = hyper::upgrade::on(&mut req).await.unwrap();  // 对非 upgrade 请求会 panic
}

普通请求没有 OnUpgrade,.await 会返回 Err(new_user_no_upgrade())——.unwrap() 会 panic。应该先检查 header 或 status 再决定是否 await。

错误 3:在中间件链里 rebuild Request 丢了 extensions

rust
// 错——extensions 没保留
fn some_middleware(req: Request<B>) -> Request<B2> {
    let (parts, body) = req.into_parts();
    let new_body = transform_body(body);
    let mut new_parts = parts;
    // 这里重建 Request 时 extensions 可能被覆盖或丢失
    Request::from_parts(new_parts, new_body)
}

这种 "走一圈 Parts" 的中间件看起来没问题,但实际 parts.extensions 里可能有 OnUpgrade——如果你在这期间调用了 parts.extensions.clear() 或直接替换了 extensions,upgrade 就断了。

防御:中间件作者对 http::Extensions 保持"只增不删"的原则。要加新状态用 extensions.insert(my_state),不要 clear。

错误 4:对 Upgraded::downcast 的类型 mismatch

rust
// 可能 Err
let parts = upgraded.downcast::<TcpStream>()
    .expect("should be a tcp stream");

问题是:如果底层是 hyper_util::rt::TokioIo<TcpStream>rustls::ServerConnection<TcpStream>,downcast 到 TcpStream失败——类型不匹配。你必须 downcast 到实际创建时的确切类型

这个约束让 downcast 在类型路径较深时不太好用——你得一直追到最底。实用建议是:只对自己控制的简单 IO 类型 downcast,对用户可能换 TLS 的场景避免 downcast。

错误 5:upgrade 之后没做流控

upgrade 之后的 Upgraded 就是一个无流控的 Read/Write——如果你不自己做背压,恶意对端可以爆你内存。典型做法是给 Upgraded 外层套一个 tokio::io::BufReader / BufWriter 限制 buffer size,或者在应用协议层做 frame-level 限流。

这些错误合起来提醒一件事:upgrade 是强大的工具,但它把很多协议层的保护脱下来。你重新负责字节流的安全——没有 body parser、没有 header limit、没有 connection timeout——一切自己来。工具越强,责任越重

18.10.11 本章小结

hyper 的 upgrade 模块是一个"小而巧"的例子——只有 407 行核心代码,却让整个 Rust HTTP 生态可以在 hyper 基础上自由扩展到任意协议。关键设计要点:

  1. 三种 upgrade 机制(HTTP/1 Upgrade、CONNECT、HTTP/2 extended CONNECT)统一到 Upgraded 一个类型——降低用户认知负担。
  2. OnUpgrade 是 oneshot-backed Future——一次性事件,Clone 允许多路引用。
  3. Upgraded 内部的 Rewind + Box<dyn Io> 两层——回放缓冲 + 类型擦除,换来 API 的极简与通用。
  4. Pending/OnUpgrade 对子通过 oneshot::channel 通信——最轻量的单值传输原语。
  5. hyper::upgrade::on 作为自由函数——模块级命名空间 + 无 trait import 的入口。

以及更深一层——它验证了一个设计哲学:HTTP 只是一个握手协议,协商完交出字节流给应用层。这个视角让 hyper 有能力成为 Rust 所有协议扩展的"公共门厅",而不局限于 HTTP 本身。这种"低层库只做自己的事,更高层由生态组装"的分工思路——会在本书的后续章节里反复看到,特别是下一章的 hyper-util。

18.10.12 实测:§18.1"407 行"在 hyper 1.9.0 一字不差 + 生态对比

§18.1 标题 "被严重低估的 407 行代码" 是个具体数字——把它和 hyper 1.9.0 实测对照——

文件实测行章节引用
hyper-1.9.0/src/upgrade.rs407§18.1 标题精确实测 ✓
hyper-1.9.0/src/proto/h2/upgrade.rs280§18.6 主角
hyper upgrade 相关合计687

生态层 WebSocket 实现规模对照(同样实测 ~/.cargo/registry)——

crate角色
tungstenite-0.24.0/src/4847同步纯 Rust WebSocket 协议实现(握手 / frame / mask / close)
tokio-tungstenite-0.24.0/src/1233把 tungstenite 桥接到 tokio AsyncRead/AsyncWrite
WebSocket 协议实现合计6080

两条值得记住的物理事实——

  1. hyper 自己的 upgrade 仅 687 行 vs WebSocket 协议生态 6080 行 = 1:9——印证 §18.1 标题 "407 行" 的工程含义:hyper 只负责"协议升级握手"这件最小的事——握手完成后把字节流交给应用层(如 tungstenite),WebSocket 协议本身(frame parser / mask / ping-pong / close handshake)由生态承担——和 ch10 §10.11.1 测得的 Future trait 仅 135 行 + 生态(tokio/async-std/smol)数万行同款 "核心抽象薄、生态承担重" 模式
  2. upgrade.rs 407 行 + proto/h2/upgrade.rs 280 行——HTTP/1 vs HTTP/2 两套实现 HTTP/1 比 HTTP/2 多 45%——印证 §18.6 标题 "HTTP/2 的特殊战场" 在工程量上是真实的特殊战场:HTTP/2 帧多路复用 + extended CONNECT + UpgradedSendStreamTask 背景驱动 + 三方 race(§18.6.3)——280 行专门处理这些复杂性

串联 ch02 §2.8.1 实测的 tower-service 整 crate 仅 390 行 + ch03 §3.10.1 tower-layer 655 行 + ch09 §9.9.3 http crate 14395 行 + 本节 hyper upgrade 687 行 = 16127 行——是 Rust HTTP 协议栈"Service 抽象 + HTTP 数据结构 + Upgrade 机制" 总工程量;hyper upgrade 仅 4.3% 印证它确实是 §18.1 标题"被严重低估"的小工程量。

18.11 落到你键盘上

  • hyper/examples/upgrades.rs—— 官方示例把 HTTP/1 Upgrade 用到了极致,客户端和服务端完整流程。看完你能写出 WebSocket 代理、CONNECT 隧道、自定义协议升级。
  • 跑一次 Upgraded::downcast—— 手写一个小 server:upgrade.await? 之后 .downcast::<TokioIo<TcpStream>>() 试试。成功拿到 Parts { io: TcpStream, read_buf }——然后手动用 TcpStream 做一些 raw socket 级别操作(比如 setsockopt)。这会让你对"类型擦除的代价和回收"有物理感受。
  • 比较 HTTP/1 upgrade 和 HTTP/2 CONNECT—— 在同一个服务里支持两种协议承载 WebSocket。配 enable_connect_protocol() 让 HTTP/2 能收 extended CONNECT。观察单 TCP 连接 vs 多 TCP 连接在客户端侧的行为差别——HTTP/2 extended CONNECT 让"10 个 WebSocket 只占 1 条 TCP"成为可能。
  • proto/h2/upgrade.rstick 方法—— 这是一个经典的"手写 select"范本。手写成 loop { poll_a(); poll_b(); poll_c(); } 而非 tokio::select!——研究 hyper 为什么选择手写,你会对 Future 手动组合有更深理解。

下一章我们离开 hyper 自己的代码,看 hyper-util——这个看似"杂物箱"的 crate。它其实承担了 hyper 1.x 架构里一个关键角色:桥接 hyper::Service 与 tower::Service。这两个 Service 长得像但有一处关键差异,hyper-util 是专门为了这个差异而存在的——也是"为什么 hyper 1.0 不直接用 tower::Service"的答案。

基于 VitePress 构建