Skip to content

第10章 http-body 与 Body trait:frame、trailers、size_hint

10.1 "body 到底是什么"这个被忽视的问题

上一章我们读完了 http crate——整个 HTTP 数据模型的骨架。但骨架里有一个地方故意留空:body。

Request<T>Response<T> 里的 T 是什么?答案是"随便什么"——StringVec<u8>()BytesIncomingBoxBody……任何类型都可以放进去。这不是 API 设计偷懒——body 本身就是一个独立的抽象问题,它值得一个独立的 crate。

这就是 http-body 的角色。它定义了一个很短的 trait:

rust
// http-body/src/lib.rs:38-88
pub trait Body {
    type Data: Buf;
    type Error;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;

    fn is_end_stream(&self) -> bool { false }
    fn size_hint(&self) -> SizeHint { SizeHint::default() }
}

三个方法,十几行。但它是整个 Rust HTTP 生态"body 作为流"的公共语言。这一章我们把它、以及它的小伙伴 Frame / SizeHint / http-body-util / hyper::body::Incoming 全部读一遍。

源码锁定:http-body 1.0.1 / http-body-util 0.1.3(commit c8cb37f)。

10.2 为什么要把 body 做成"流"

Naive 的 HTTP 服务器会把整个 body 读进一个 Vec<u8> 再交给 handler 处理。这种做法在小请求(JSON API)上合理,但在三种场景下会灾难:

  1. 上传大文件:100MB 的文件请求,Vec 化之后 100MB 都在服务进程的堆上。1000 个并发上传 = 100GB 内存 = OOM。
  2. 流式响应:服务端一边生成数据一边响应(SSE / 流式 JSON / 视频)。等生成完才发——用户感觉卡死。
  3. HTTP/2 stream:HTTP/2 的流天生是分帧的,一个 frame 一个 frame 到,如果要求组装成 Vec 会阻塞流控。

所以 body 必须是一个惰性、增量、双向驱动的流——调用方 poll_frame 拉一帧、用完释放、再拉下一帧。http-body::Body 正是这个抽象。

10.2.1 type Data: Buf 而不是 Vec<u8>

Body 的数据类型是 type Data: Buf——关联类型,约束到 bytes::Buf trait。

为什么不是具体的 BytesVec<u8>?因为不同来源的 body 产出的类型不同:

  • HTTP/1 解析器从内部环形缓冲读一段出来——那是一个 Bytes(引用计数切片)。
  • 用户从 file 读一段——可能是 Vec<u8>
  • gRPC codec 产生的是预先对齐的 protobuf Bytes
  • 测试代码里可能是 &'static [u8]

要求所有 body 都先转成 Vec 或 Bytes 会强制一次 copy。解决办法是让 Data 类型保持泛型——只约束"能像一个 byte cursor 那样被读取",这正是 bytes::Buf 的契约。

Buf trait 本身是 bytes crate 的核心抽象——提供 chunk() -> &[u8]advance(n)remaining() -> usize 等方法。一个类型能做到这些,就能被当作"可推进的字节序列"使用——不管它底下是单 slice、多 slice 的 chain、还是离散的链表。

这种"不绑定具体容器类型"的 trait 设计在 Rust 生态很普遍。卷四《Tokio 源码深度解析》第 8 章(I/O Driver)里 AsyncRead 里也有类似的 "read into anywhere that implements BufMut" 的思路——把数据移入/移出的"目标类型"延迟到具体场景,是 Rust 高性能库的共同气质。

10.3 poll_frame:一次拉一帧

rust
fn poll_frame(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;

返回类型层层嵌套,但语义清晰:

  • Poll::Pending:下一帧还没准备好(等网络、等上游、等磁盘)。
  • Poll::Ready(None):流结束,再也没东西了。
  • Poll::Ready(Some(Ok(frame))):有一帧可以消费。
  • Poll::Ready(Some(Err(e))):读取错误。

三种"有东西"的情况都塞进一个返回类型——Pending 等待、Ready+None 结束、Ready+Some 产出。这是 Rust Stream trait 的标准惯用形式。事实上如果你看 futures::Stream

rust
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
    -> Poll<Option<Self::Item>>;

一模一样的签名——只是把 Item 具体化成了 Result<Frame<Data>, Error>Body 本质就是一个"会产出 Frame 的 Stream",只不过带了一些 HTTP 特有的 helper 方法。

10.3.1 为什么不直接用 Stream trait

这是个好问题,有两个答案。

第一,Stream trait 曾经不稳定futures::Stream 在 futures 0.3 时代是第三方 trait,进入 core 的过程持续了很多年。http-body 不想依赖一个外部 crate 的 trait——于是自己定义。

第二,语义更精确Stream 是通用"连续产出 Item"的抽象;Body 明确表达"这是一个 HTTP body"。两个 helper 方法(is_end_streamsize_hint)是 HTTP 特有的——HTTP/2 需要 is_end_stream 提示来决定是否发 END_STREAM flag;size_hint 影响 Content-Length header。把这些放在独立 trait 上比做 Stream::size_hint(不存在)更合适。

这是另一个 "基础语义 + 领域 helper"的 trait 拆分例子,和第 2 章 Service trait 与 Serializer trait 一脉相承。

10.4 Frame<T>:两种可能

rust
// http-body/src/frame.rs:3-17
#[derive(Debug)]
pub struct Frame<T> {
    kind: Kind<T>,
}

enum Kind<T> {
    // The first two variants are "inlined" since they are undoubtedly
    // the most common. This saves us from having to allocate a
    // boxed trait object for them.
    Data(T),
    Trailers(HeaderMap),
    //Unknown(Box<dyn Frameish>),
}

一个 frame 有两种可能的内容:

  • Data(T):字节数据,T: Buf
  • Trailers(HeaderMap):HTTP/2 的 trailing headers——body 结束后附带的 header。

10.4.1 Trailers 是 HTTP/2 / gRPC 的必需品

初学 HTTP 的人可能没接触过 trailers——它是 HTTP/1.1 存在但很少用、HTTP/2 普遍用、gRPC 必须用的功能。

典型用途:gRPC 把状态码放在 trailer 里。因为 gRPC 的响应体是一个 protobuf 流——请求成功还是失败、错误细节是什么——都不能放在响应头里(响应头要在 body 之前发,那时还不知道 body 处理结果)。解决方案:响应 200 OK、body 里是 protobuf 流、流结束后再发一个 trailer grpc-status: 0。所以 gRPC 客户端必须等到 trailer 才知道请求是否真的成功。

http-body 把 data 和 trailer 统一成一个 Frame——调用方 poll_frame 读到的东西可能是这两种之一:

rust
loop {
    match body.poll_frame(cx).await {
        Some(Ok(frame)) => {
            if let Some(data) = frame.data_ref() {
                // 处理字节数据
            } else if let Some(trailers) = frame.trailers_ref() {
                // 处理 trailing headers
            }
        }
        Some(Err(e)) => return Err(e),
        None => break, // end of stream
    }
}

10.4.2 那个被注释掉的 Unknown variant

rust
//Unknown(Box<dyn Frameish>),

这一行是留给未来的。作者早期考虑过给 frame 加第三种 variant——"未知类型的帧",让扩展协议可以携带非 Data/Trailers 的内容。但这会引入 trait object 和堆分配——破坏 http-body 的零成本气质。最终决定注释掉——但留着这个"曾经考虑过"的痕迹,作为未来讨论的起点。

这也是读工业级开源代码的一个乐趣——注释里的"—"比代码本身还透露信息。你看到这行注释,就知道 http-body 的作者是有意决定保持 Frame 闭合——任何需要更多 frame 种类的协议都要自己扩展,不能通过 trait object 硬塞进 Frame。

10.4.3 map_data:只变换数据

rust
pub fn map_data<F, D>(self, f: F) -> Frame<D>
where F: FnOnce(T) -> D,
{
    match self.kind {
        Kind::Data(data) => Frame { kind: Kind::Data(f(data)) },
        Kind::Trailers(trailers) => Frame { kind: Kind::Trailers(trailers) },
    }
}

变换数据类型,trailers 原样透传。这让你能在中间件里"把所有 Data frame 的类型从 Bytes 变成 String"而不影响 trailers。是 Frame 上最常用的 combinator。

10.5 SizeHint:一个 lower/upper bound 对

rust
// http-body/src/size_hint.rs:7-11
pub struct SizeHint {
    lower: u64,
    upper: Option<u64>,
}

表达"body 至少有多少字节、最多有多少字节(可能未知)"。

用途:

  • 服务端决定要不要发 Content-Length: N header。如果 size_hint.exact() 返回 Some(N)——设这个 header;否则用 Transfer-Encoding: chunked
  • 客户端决定 body buffer 要预分配多大。取 upper 作为预分配量,避免多次 realloc。
  • HTTP/2 的 flow control 决策:知道 body 有多大,可以提前预算流控窗口。

10.5.1 set_lower / set_upper 的 panic 保护

rust
pub fn set_lower(&mut self, value: u64) {
    assert!(value <= self.upper.unwrap_or(u64::MAX));
    self.lower = value;
}

pub fn set_upper(&mut self, value: u64) {
    assert!(value >= self.lower, "`value` is less than than `lower`");
    self.upper = Some(value);
}

维持不变式 lower <= upper——违反就 panic。这是一个小而严肃的 defensive programming——与其容忍不合理状态传播,不如早死早超生

10.5.2 一个被写进单元测试的数学证明

http-bodysize_hint.rs 里有一段相当罕见的东西——一个把数学证明写进单元测试的 block(文件 100-173 行):

rust
#[test]
fn size_hint_addition_proof() {
    // assuming addition itself is perfect, there are 3 distinct states:
    // (_, Some(_)) + (_, Some(_)) => (_ + _, Some(_ + _))
    // (_, Some(_)) + (_, None) => (_ + _, None)
    // (_, None) + (_, None) => (_ + _, None)
    //
    // we can assert this in the typesystem! (and name them for our tests)
    match (to_parts(SizeHint::new()), to_parts(SizeHint::new())) {
        ((_, Some(_)), (_, Some(_))) => {} // 1
        ((_, None), (_, None)) => {}       // 2
        ((_, Some(_)), (_, None)) => {} // 3
        ((_, None), (_, Some(_))) => {}
    }
    ...
}

这段代码用 Rust 的 match exhaustiveness 来证明"SizeHint 加法的四种 upper 状态都被覆盖到了"。如果未来有人改 SizeHint 数据结构加了新 variant,这个 match 会编译失败——强制修改者更新证明。

这比把证明写在注释里强得多——证明变成了可执行的、会随代码变化自动 rechecking 的东西。这是 Rust 作者圈子里的一种美学——用类型系统辅助文档化不变式,既是注释也是检查。

10.6 http-body-util:常用实现 + combinators

trait 定义完了,谁来实现?答:http-body-util 这个 sibling crate。它提供了四类实现:

  1. 静态 bodyEmpty<D>(没 body)、Full<D>(一整块 body)。
  2. 流式 bodyStreamBody<S>(包装一个 Stream)、BodyDataStream<B>(反向:Body 适配成 Stream)。
  3. 通道 bodychannel() 返回 (Sender, Body),生产者写、消费者读。
  4. 修饰 bodyLimited(限制总字节数)、BoxBody(类型擦除)、MapErrMapFrame

以及一个 BodyExt trait——给 Body 添加一堆 combinator 方法(map_errmap_framecollect()boxed())。

10.6.1 Full<D>:最简单的 Body 实现

rust
// http-body-util/src/full.rs:10-74 精简
pub struct Full<D> {
    data: Option<D>,
}

impl<D: Buf> Body for Full<D> {
    type Data = D;
    type Error = Infallible;

    fn poll_frame(mut self: Pin<&mut Self>, _cx: &mut Context<'_>)
        -> Poll<Option<Result<Frame<D>, Self::Error>>>
    {
        Poll::Ready(self.data.take().map(|d| Ok(Frame::data(d))))
    }

    fn is_end_stream(&self) -> bool { self.data.is_none() }

    fn size_hint(&self) -> SizeHint {
        self.data.as_ref()
            .map(|data| SizeHint::with_exact(u64::try_from(data.remaining()).unwrap()))
            .unwrap_or_else(|| SizeHint::with_exact(0))
    }
}

20 行代码——这是一个完整的 Body 实现。分解:

  • Option<D> 存数据:Some(d) 表示"还没发",None 表示"已经发完"。
  • 第一次 poll_frametake() 把数据拿出来,构造一个 Frame::data(d) 返回。
  • 第二次 poll_frameself.data 已经是 None,take() 返回 None,整个 Poll 变成 Ready(None)——结束。
  • size_hint() 返回精确的大小——因为我们事先就知道 body 有多少字节。
  • type Error = Infallible:这种 body 不可能产生错误。

Infallible 是 Rust 标准库里"表示永远不会发生的错误"的类型——它是一个空 enum,没有任何 variant,所以任何返回 Result<T, Infallible> 的函数实际只可能返回 Ok。Rust 编译器对这种类型有特殊优化,不会生成不可达的错误处理代码。

10.6.2 BoxBody<D, E>:类型擦除的逃生舱

rust
// http-body-util/src/combinators/box_body.rs 精简
pub struct BoxBody<D, E> {
    inner: Pin<Box<dyn Body<Data = D, Error = E> + Send>>,
}

impl<D: Buf, E> Body for BoxBody<D, E> {
    type Data = D;
    type Error = E;
    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Result<Frame<D>, E>>>
    {
        self.project().inner.as_mut().poll_frame(cx)
    }
}

与第 3 章讨论的 BoxService 思路一致——用一次堆分配 + 一次虚方法调用换类型擦除。当你的 body 链路复杂到类型名变成一堆 <<<>>> 时,装进 BoxBody 就把类型统一——route 表就能有同样的 Body 类型。

Axum 的 Router::service 返回的 body 就是 BoxBody<Bytes, Error>——这就是为什么你在 Axum handler 的返回类型里经常看到 BoxBodyAxum 内部 route 分发需要类型统一,而用户的各种 handler(返回 String、Vec、stream、None)可能产出形形色色的 body——全部统一装 box。

10.6.3 Limited:强制限制 body 大小

这是一个非常重要的安全中间件。

rust
// http-body-util/src/limited.rs 精简
pub struct Limited<B> {
    inner: B,
    remaining: usize,
}

impl<B: Body> Body for Limited<B> {
    type Data = B::Data;
    type Error = Box<dyn Error + Send + Sync + 'static>;

    fn poll_frame(...) -> Poll<Option<Result<Frame<B::Data>, Self::Error>>> {
        // 从 inner 拉一帧
        let frame = std::task::ready!(inner.poll_frame(cx));
        match frame {
            Some(Ok(f)) => {
                if let Some(d) = f.data_ref() {
                    let len = d.remaining();
                    if len > self.remaining {
                        // 超限——产出错误
                        return Poll::Ready(Some(Err(LengthLimitError.into())));
                    }
                    self.remaining -= len;
                }
                Poll::Ready(Some(Ok(f)))
            }
            // ...
        }
    }
}

逻辑:读一帧 → 检查 remaining → 扣除 → 返回帧(或超限错误)

这是 HTTP 服务端的必备防护——没有 Limited 的 body 就是 DoS 攻击向量。攻击者可以发一个 Content-Length: 99999999999 的请求(或者 chunked encoding 不断发数据),你的服务端如果 naively 全读进内存就挂了。Axum、Actix-web、warp 等 HTTP 框架默认会把 body 套 Limited——通常 2MB 或 4MB 的上限,可配置。

10.7 hyper::body::Incoming:把三种来源统一成 Body

现在我们来看 Hyper 这一端——当一个 HTTP 请求的 body 真实从网络上到达 Hyper 时,它是什么类型?答:hyper::body::Incoming

rust
// hyper/src/body/incoming.rs:52-75
pub struct Incoming {
    kind: Kind,
}

enum Kind {
    Empty,
    Chan {
        content_length: DecodedLength,
        want_tx: watch::Sender,
        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
        trailers_rx: oneshot::Receiver<HeaderMap>,
    },
    H2 {
        content_length: DecodedLength,
        data_done: bool,
        ping: ping::Recorder,
        recv: h2::RecvStream,
    },
    #[cfg(feature = "ffi")]
    Ffi(crate::ffi::UserBody),
}

Incoming 是一个多态结构——它可能是三种 body 来源之一:

  • Empty:没有 body(GET 请求、204 响应、HEAD)。
  • Chan:HTTP/1 路径——hyper 的 HTTP/1 解析器把 body chunks 通过 mpsc 送来,want_tx 是一个反向信号(告诉解析器调用方希望继续读)。
  • H2:HTTP/2 路径——直接持有 h2::RecvStream,带 flow control 和 ping 逻辑。
  • Ffi:FFI 用户 body(C API 写的)。

所有这些"来源"对外统一暴露成一个 Body impl——用户只看到 Incoming.poll_frame() 之后返回 frame。内部是哪条路径调用方根本不知道、也不需要知道。

这是 Hyper 1.x 对多协议的优雅处理:协议实现的差别藏在 enum variant 里,对外的 trait impl 把它们统一。看 Incoming 的 Body impl(文件后半段):

rust
impl Body for Incoming {
    type Data = Bytes;
    type Error = crate::Error;

    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Result<Frame<Bytes>, crate::Error>>>
    {
        match self.kind {
            Kind::Empty => Poll::Ready(None),
            Kind::Chan { ref mut data_rx, ref mut trailers_rx, .. } => {
                // 从 mpsc 拉数据;结束后看 oneshot 拿 trailers
                ...
            }
            Kind::H2 { ref mut recv, .. } => {
                // 调 h2 的 RecvStream
                ...
            }
            ...
        }
    }
}

三种实现、同一个 trait、同一个对外 API。这是协议多态最好的例证。

10.7.1 Chan 的反向信号:want_tx

Chan variant 里有一个字段你可能没见过:want_tx: watch::Sender

作用:告诉 HTTP/1 parser "调用方现在还想继续读 body"。Hyper HTTP/1 parser 不会无脑从 TCP 读数据填 mpsc——它会等调用方 poll_frame 时发出"想要"信号。这是一种 body-level 的背压:如果 handler 处理 body 慢,parser 不会把内存填爆。

HTTP/2 不需要这个 mechanism——h2 crate 自己的 flow control window 处理了背压。HTTP/1 因为没有线路级 flow control,需要应用层配合。

这是hyper 工程上最被忽视但至关重要的一块——它让 HTTP/1 上的大文件流式上传在 Rust 里也能正确应用背压。第 11 章读 HTTP/1 wire 时会把 want_tx / want_rx 的协议完整讲清楚。

10.8 一段完整链路

把所有这些串起来——从 TCP 字节到 handler 消费 body——是这样:

TCP stream

hyper proto::h1::Dispatcher 解析 header

构造 Request<Incoming>

Incoming::Chan { data_rx, want_tx, ... }
    ↓                       ↑
    └── mpsc 发数据 ←── h1 parser 读 TCP
    
handler 拿到 Request<Incoming>

while let Some(frame) = body.frame().await?

    frame.data()  /  frame.trailers()

    handler 处理

上链路里每一个箭头都是一次 .await——整条链是惰性驱动的。handler 不消费,parser 不发;parser 不发,TCP 不读。这就是"端到端的流式处理"在 Rust 里的落地形态。

10.9 实战:一个最小 JSON 流解析器

我们用本章和前几章的知识写一个"流式 JSON 数组解析器"——不把整个 body 读进内存,一边读一边解析。

rust
use http_body_util::BodyExt;
use hyper::body::Incoming;
use serde_json::Deserializer;

async fn stream_parse(body: Incoming) -> Result<Vec<Item>, BoxError> {
    let mut items = Vec::new();
    let mut buffer = bytes::BytesMut::new();

    let mut body = std::pin::pin!(body);
    while let Some(frame) = body.as_mut().frame().await {
        let frame = frame?;
        if let Some(data) = frame.data_ref() {
            buffer.extend_from_slice(data.chunk());

            // 尝试增量解析
            let de = Deserializer::from_slice(&buffer).into_iter::<Item>();
            let mut consumed = 0;
            for result in de {
                match result {
                    Ok(item) => {
                        items.push(item);
                        consumed = de.byte_offset();
                    }
                    Err(e) if e.is_eof() => break,  // 数据不足,等下一帧
                    Err(e) => return Err(e.into()),
                }
            }
            buffer.advance(consumed);
        }
    }
    Ok(items)
}

这段代码真实可用,关键点:

  • body.frame().await 来自 BodyExt——把 poll_frame 包成 async fn
  • BytesMut 做增量缓冲——每收到一帧追加,serde 消费多少就 advance 多少。
  • 解析出 EOF 错误不当失败——是"数据不足"的信号,等下一帧。
  • 永远不把整个 body 读进内存——memory footprint ≈ 一帧 + 未消费字节 + 一个 JSON 对象。

这是 Rust 流式处理的"标准配方"。结合卷四《Serde 元编程》第 4 章(Deserializer 与 Visitor)——你会发现 serde 的 Visitor 设计天然适合这种"增量输入、增量产出"场景。http-body 的流和 serde 的流可以在同一条 async 链上共存。

10.10 小结与落到你键盘上

本章要点:

  1. Body traitpoll_frame 把"HTTP body"抽象成"Frame 流"——每帧可能是 Data 或 Trailers。
  2. Frame<T> + SizeHint 是小而精确的数据原语,甚至在单元测试里把加法的数学证明写进 match。
  3. http-body-util 提供常用 impl:EmptyFullStreamBodyBoxBodyLimitedLimited 是安全防护,生产 HTTP 服务端必配。
  4. hyper::body::Incoming 用 enum 把 HTTP/1 mpsc、HTTP/2 recv stream、FFI body 统一成一个 Body impl。
  5. Chan variant 的 want_tx 在 HTTP/1 上提供 body-level 背压——hyper 工程细节里很少被讲的关键一环。

落到你键盘上:

  • http-body-util/src/channel.rs——它实现了 (Sender, Body) 对,是做 SSE / 流式响应时最实用的工具。
  • 给 Axum 代码加 Limited——如果你还没做,立即加上 tower_http::limit::RequestBodyLimitLayer::new(2 * 1024 * 1024)
  • 实验 BodyExt::collect()——把一个 Incoming 一次性 collect 成 Vec<u8>。跑一次之后读它的源码(大约 50 行),你会明白所有 body 的 .await 链本质上是在做什么。

下一章开始 Hyper HTTP/1 的硬核部分——我们读 httparseproto::h1::Encoderproto::h1::Decoder,看字节流如何被解析成你熟悉的那些 http:: 类型。

基于 VitePress 构建