Skip to content

第17章 Body 处理与流式响应

第 15、16 章讲 axum::serve 的连接层。这一章下沉一层——到数据层。Body 是 HTTP 请求和响应的"主要载体"——URL 和 header 只是元信息,真正的负载在 body。axum 里 Body 的类型设计、流式机制、缓冲工具是整个数据流的中枢。

前面章节零散提到过 Body:

  • 第 6 章 body 单次消费:FromRequest 消费 Body、FromRequestParts 借用 Parts
  • 第 8 章 Multipart:Body 按 stream 分 field 流式读
  • 第 10 章 SSE:Body 是 Stream<Item = Event>、Sse::new(stream) 包装
  • 第 10 章 文件下载:Body::from_stream(ReaderStream::new(file)) 流式返回
  • 第 15 章 serve:hyper 通过 Body 读字节、写字节

这一章把这些分散的使用统一起来——讲清楚 Body 的 API、类型结构、流式机制。读完之后你应该能回答:为什么 Body 能既是缓冲的又是流式的?Body::from_stream 内部做了什么?到底什么时候该 to_bytes 缓冲、什么时候该 stream?

为什么 body 是 HTTP server 框架的中枢

HTTP server 框架的工作本质上就是"字节进字节出"——所有路由、提取器、middleware 最终都是"对字节的某种处理"。body 是这些字节的 载体——一个框架对 body 的抽象质量基本决定了它能做什么。

axum 的 body 设计反映了几个成熟的工程取舍:

  • 不造新 trait:用 http_body::Body——避免生态分裂
  • 类型擦除但高效:UnsyncBoxBody + try_downcast——简单 API + 零重复包装开销
  • 流式和缓冲对称:from_stream / into_data_stream / to_bytes——用户按场景选
  • Bytes 零拷贝:整条路径共享引用、不做字节 memcpy

这些取舍让 axum 的 body 处理既语言好写(handler 里直接 Body::from(...))又运行时高效(零拷贝 + 流式)。不同于其他一些框架(Flask、Express)"body 先全缓冲到内存再交给 handler"的简单但低效策略——axum 的 body 是 first-class citizen。

Body 的两重身份

axum 的 Body 有两种"身份":

作为类型axum::body::Body):用户 handler 里见到的、响应和请求里携带的——就是这一个具体类型。没有泛型、没有 trait 对象、handler 签名里直接写 Body 即可。

作为协议http_body::Body trait):hyper 读写 HTTP body 的通用接口——poll_framesize_hintis_end_stream 三件套。所有 HTTP 相关 crate(axum、hyper、tonic、reqwest)都遵守这个 trait。

两个身份的关系:axum::Body(具体类型)实现了 http_body::Body(trait)。axum 提供一个"开箱即用的 body 类型"——背后是标准 trait 的具体包装。

这让 axum 的 body 处理 API 简单(就一个类型)但生态兼容(符合 http_body 规范)——两全其美。

Body:类型擦除的 http_body

Body 的定义在 axum-core/src/body.rs:39,依赖第 13 行的 BoxBody 类型别名:

rust
// axum-core/src/body.rs:13
type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;

// axum-core/src/body.rs:37-39
#[must_use]
#[derive(Debug)]
pub struct Body(BoxBody);

一个 newtype 包装 UnsyncBoxBody。层层分解:

http_body_util::UnsyncBoxBody<Bytes, Error>http_body::Body 的 trait object wrapper。http_body::Body 是 HTTP body 的标准 trait(hyper / tonic / reqwest 都基于它)——axum 用类型擦除存储任意实现者。

UnsyncBoxBody vs BoxBody:前者不要求 Sync、后者要求。axum 选 UnsyncBoxBody 因为:

  • 大多数 body 是从单个 task 处理的——Send 够、不需要 Sync
  • Sync bound 会把一些合法的 body 类型排除(比如第 10 章 SSE 章节提过的 SyncWrapper<Stream>

Bytes 作为 data typehttp_body::Body::Data 关联类型固定为 bytes::Bytes——最通用的字节容器、零拷贝共享、hyper / h2 等都用 Bytes。

Error = axum_core::Error:第 12 章讲过的 axum 通用错误 wrapper——BoxError 的 newtype。body 的任何错误统一成 axum::Error、downstream 代码只处理一种错误类型。

用户看到一个统一的 Body 类型——下层实际是多种具体 body 类型的类型擦除包装。

Body::new 的 try_downcast 优化

body.rs:43-49

rust
impl Body {
    pub fn new<B>(body: B) -> Self
    where
        B: http_body::Body<Data = Bytes> + Send + 'static,
        B::Error: Into<BoxError>,
    {
        try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
    }
}

try_downcast<T, K>(k: K) -> Result<T, K> 的实现在 body.rs:23-34

rust
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where T: 'static, K: Send + 'static,
{
    let mut k = Some(k);
    if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
        Ok(k.take().unwrap())
    } else {
        Err(k.unwrap())
    }
}

核心:尝试把 K downcast 成 T。如果 K 本身就是 T(TypeId 相等),直接返回——不做任何包装。

Body::new 的语境下:如果用户传入的 body 本来就是 Body 类型(比如中间件里 Body::new(existing_body)),try_downcast 直接返回那个 Body——不重新 box 一层。

这个优化避免了"不必要的类型擦除嵌套"——Body::new(Body::new(Body::new(inner))) 等价于 Body::new(inner)、不会层层 box。生产里中间件栈可能多次 wrap body——这个优化让 wrap 层数对 runtime cost 无影响。

Body::empty 的实现

body.rs:52-54

rust
pub fn empty() -> Self {
    Self::new(http_body_util::Empty::new())
}

http_body_util::Empty——一个 ZST,poll_frame 永远返 NoneBody::empty() 是零大小、零分配的空 body——适合作为"响应无 body"的默认(比如 204 No Content)。

From 各种基础类型

body.rs:93-111 的宏:

rust
macro_rules! body_from_impl {
    ($ty:ty) => {
        impl From<$ty> for Body {
            fn from(buf: $ty) -> Self {
                Self::new(http_body_util::Full::from(buf))
            }
        }
    };
}

body_from_impl!(&'static [u8]);
body_from_impl!(std::borrow::Cow<'static, [u8]>);
body_from_impl!(Vec<u8>);
body_from_impl!(&'static str);
body_from_impl!(std::borrow::Cow<'static, str>);
body_from_impl!(String);
body_from_impl!(Bytes);

7 种基础类型自动 Into<Body>——http_body_util::Full 是"全内容一次性 body"的包装。让 Body::from("hello".to_string())Body::from(vec![1, 2, 3]) 都直接工作。

注意:都是 Full——一次返回整个 body——不是流式。字符串和字节数组天然是内存里的、不需要流。Full 在 poll_frame 里一次返 Frame::data(整个 Bytes)、下一次返 None——两次 poll 就结束。

try_downcast 的类型系统技巧

try_downcast 是 Rust 类型系统的小把戏。原理:

rust
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where T: 'static, K: Send + 'static,
{
    let mut k = Some(k);
    if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
        Ok(k.take().unwrap())
    } else {
        Err(k.unwrap())
    }
}

几个微妙点:

一、Option<T> 作为桥梁:downcast_mut::<Option<T>>(&mut k) 要求 k 的类型是 Option<T>——类型完全匹配。把 K 先 wrap 成 Option<K> 再 downcast——如果 K == T,downcast 成功,可以从 &mut Option<T>.take() 拿出值。

二、'static boundAny::downcast_mut 要求类型是 'static——运行时的类型信息(TypeId)基于静态类型、带引用的类型 TypeId 不确定。所以 Body 类型和所有相关的 body impl 都要求 'static——这也是 axum 所有地方 'static bound 如此普遍的原因。

三、零运行时类型信息:Rust 的 TypeId 是编译期确定的——运行时 downcast 只是比较 TypeId(一次整数相等检查)。成本极低。

这个 pattern 不只 axum 用——tower、reqwest 等库都有类似 "smart constructor"。目的都是一样:提供一个统一的 constructor API,但内部对已经是目标类型的情况做短路——避免不必要的包装。

Body::from_stream:流式构造

body.rs:59-68

rust
pub fn from_stream<S>(stream: S) -> Self
where
    S: TryStream + Send + 'static,
    S::Ok: Into<Bytes>,
    S::Error: Into<BoxError>,
{
    Self::new(StreamBody {
        stream: SyncWrapper::new(stream),
    })
}

接收一个 TryStream——Stream<Item = Result<Ok, Err>>——Ok 能变成 BytesErr 能变成 BoxError。输出一个可以作为 http_body::Body 使用的流式 body。

SyncWrapper 的作用:很多合法 Stream 实现不是 Sync——比如捕获了 CellRefCell 的 Stream。http_body::Body 本身不要求 Sync(UnsyncBoxBody 就是明确去掉 Sync 要求的版本),但 poll_frame 的签名需要 &mut self pin projection——需要一些额外机制让非 Sync 类型能 poll。SyncWrapper 用 "通过 &mut 才能访问 inner" 的限制绕过 Sync 要求——安全但不够灵活。

StreamBody 的 http_body::Body impl

body.rs:193-220

rust
pin_project! {
    struct StreamBody<S> {
        #[pin]
        stream: SyncWrapper<S>,
    }
}

impl<S> http_body::Body for StreamBody<S>
where
    S: TryStream, S::Ok: Into<Bytes>, S::Error: Into<BoxError>,
{
    type Data = Bytes;
    type Error = Error;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let stream = self.project().stream.get_pin_mut();
        match ready!(stream.try_poll_next(cx)) {
            Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
            Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
            None => Poll::Ready(None),
        }
    }
}

每次 hyper 调 poll_frame,StreamBody 就 poll 一次底层 Stream——把 Ok(chunk) 转成 Frame::data(chunk.into())Err(err) 包成 axum_core::Error

pin_project! { #[pin] stream }:需要 pin projection 让 self.stream 能作为 Pin<&mut S> 访问——try_poll_next 需要这个形式。

chunk.into()S::Ok 只要求 Into<Bytes>——比如可以是 Vec<u8>String&'static [u8]——在这里统一转成 Bytes。

这是 axum 把"任意 Rust Stream"接入"HTTP body 协议"的具体 bridge。类似于第 10 章 SSE 的 SseBody——但 SseBody 需要把 Event 编码成 SSE 帧格式,StreamBody 只是透明转发 Bytes。

Body::into_data_stream:反向转换

body.rs:76-78

rust
pub fn into_data_stream(self) -> BodyDataStream {
    BodyDataStream { inner: self }
}

BodyDataStream 的定义(body.rs:141-143)加 Stream impl(body.rs:145-168):

rust
pub struct BodyDataStream { inner: Body }

impl Stream for BodyDataStream {
    type Item = Result<Bytes, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
                Some(frame) => match frame.into_data() {
                    Ok(data) => return Poll::Ready(Some(Ok(data))),
                    Err(_frame) => {}  // 跳过非 data frame(比如 trailer)
                },
                None => return Poll::Ready(None),
            }
        }
    }
}

BodyStream<Item = Result<Bytes, Error>> 用——每次 poll 从 body 拿一帧、如果是 data frame 返回 bytes、其他类型帧(trailer)丢弃。

用 loop 是因为可能连续几个 non-data frame——要 skip 到第一个 data frame 才 return。

实际场景:

rust
async fn echo(request: Request) -> impl IntoResponse {
    let (parts, body) = request.into_parts();
    let stream = body.into_data_stream();
    // stream 是 Stream<Item = Result<Bytes, Error>>, 可以做各种 stream 操作
    let doubled = stream.map_ok(|bytes| {
        // 把每块数据大写
        Bytes::from(bytes.to_ascii_uppercase())
    });
    Response::new(Body::from_stream(doubled))
}

把请求 body 转 stream、每块数据转大写、重新包成响应 body。整个过程流式——不缓冲到内存——适合大 body。

from_streaminto_data_stream 是一对互逆操作——让 Body 在 http_body::Bodyfutures::Stream 两个生态间自由转换。

to_bytes:缓冲工具

流式 body 有时需要缓冲——比如签名验证、JSON 反序列化、文件写盘前的验证。axum 提供 to_bytesaxum/src/body/mod.rs):

rust
pub async fn to_bytes(body: Body, limit: usize) -> Result<Bytes, axum_core::Error> {
    Limited::new(body, limit)
        .collect()
        .await
        .map(|col| col.to_bytes())
        .map_err(axum_core::Error::new)
}

三行代码做三件事:

一、Limited::new(body, limit)http_body_util::Limited 包装 body、限制总字节数。超过 limit 后 poll_frame 返 Err——不继续读。

二、.collect().awaithttp_body_util::BodyExt::collect 的方法——一次性读完 body、返回 http_body_util::Collected(一个 bytes + 可选 trailer 的容器)。

三、.to_bytes():从 Collected 里拿 Bytes——所有 data frame 拼接成一块。

整个流程:流式 body → Limited wrap → 全部读完(受 limit 控制)→ 返回 Bytes。

limit 参数的重要性to_bytes(body, usize::MAX) 是"不限"——接受客户端发送的任意大 body。生产里这是 DoS 漏洞——攻击者发个 10GB 的 body 耗尽内存。正确做法:

rust
let bytes = to_bytes(body, 10 * 1024 * 1024).await?;  // 10MB 上限

到限失败后,axum 的错误是 http_body_util::LengthLimitError——handler 捕获返 413 Payload Too Large。

to_bytes 是 Json / Form / Multipart / Bytes 这些提取器的底层工具——所有"需要缓冲到内存再处理"的场景都走这个。

Limited 深入:防超大请求

http_body_util::Limited<B> 是一个 body wrapper——限制 body 总字节数。内部实现:

rust
// http_body_util::Limited (简化)
pub struct Limited<B> {
    inner: B,
    limit: usize,
    consumed: usize,  // 已读字节
}

impl<B: Body> Body for Limited<B> {
    type Data = B::Data;
    type Error = LimitedError<B::Error>;

    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>
    {
        // poll inner
        match ready!(self.inner.poll_frame(cx)) {
            Some(Ok(frame)) => {
                if let Some(data) = frame.data_ref() {
                    self.consumed += data.remaining();
                    if self.consumed > self.limit {
                        return Poll::Ready(Some(Err(LengthLimitError.into())));
                    }
                }
                Poll::Ready(Some(Ok(frame)))
            }
            // ...
        }
    }
}

每次 poll 累加读到的字节数——超过 limit 立即 Err——上层停止读、body 不会真的完成缓冲。这让"10GB 上传"被立刻拒绝——而不是"等缓冲完 10GB 才发现超限"。

错误类型 LengthLimitError 能通过 downcast 识别:

rust
match to_bytes(body, 1024).await {
    Ok(bytes) => { /* ... */ }
    Err(err) => {
        let source = std::error::Error::source(&err).unwrap();
        if source.is::<http_body_util::LengthLimitError>() {
            return (StatusCode::PAYLOAD_TOO_LARGE, "body too large").into_response();
        }
        // 其他错误
    }
}

这让 handler 能区分"body 过大"和"其他 body 错误"——分别返 413 和 500。

Limited 和 DefaultBodyLimit

第 6 章讨论过 DefaultBodyLimit——请求 body 的默认大小限制(默认 2MB)。DefaultBodyLimit 是 Tower Layer——给所有经过的请求加一个 Limited wrap。to_byteslimit 参数和 DefaultBodyLimit 独立——在 to_bytes 调用前 body 已经被 Layer wrap 过一次。两层 limit 都生效——取更严格的那个。

http_body::Body trait 内部

axum::Body 的底层是 http_body::Body——HTTP body 的标准 trait(第三方 crate http-body):

rust
// 等价于 http_body crate 的定义
pub trait Body {
    type Data: Buf;
    type Error;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;

    fn is_end_stream(&self) -> bool { false }
    fn size_hint(&self) -> SizeHint { SizeHint::default() }
}

Frame:数据帧或 trailer

Frame<T> 是 http_body 的基本单元——可以是 数据帧(一块 bytes)或 trailer(HTTP/2 的尾部 header):

rust
enum FrameInner<T> {
    Data(T),
    Trailers(HeaderMap),
}

Data frame:body 的实际字节。HTTP/1.1 的 chunked encoding 每个 chunk 都是一个 data frame;HTTP/2 的 DATA frame 也是。

Trailer frame:HTTP/2 特有——body 结束后额外发一批 header(比如 gRPC 的 status code)。

http_body 统一两种形态——用户 poll 同一个方法、根据 Frame 类型分发。这个设计让 HTTP/1.1 和 HTTP/2 的 body 模型统一——上层代码不用分别处理。

axum::BodyDataStream 主动丢弃 trailer frame——因为大多数 axum handler 不关心 trailers。想要 trailer 的场景(比如 gRPC)要用 http_body_util::BodyStream

SizeHint:body 大小的预测

http_body::SizeHint 是 body 大小的(可能粗糙的)预估——lower: u64(最少多少字节)和 upper: Option<u64>(最多多少字节,None 表示未知)。

用途:

  • Content-Length header:如果 upper == lower、hyper 能自动加 Content-Length
  • 流量预估:客户端库(比如 reqwest)用 size_hint 判断是否能 stream
  • 内存预分配to_bytes 的 Collected 根据 size_hint 预先 alloc

axum 的 Full<Bytes> body 返回精确的 size_hint(等于 bytes 长度)——让响应自动带 Content-Length。StreamBody 返回默认的 SizeHint::default()(lower=0, upper=None)——因为 stream 的大小未知——响应用 chunked encoding 而不是 Content-Length。

is_end_stream:body 已经结束吗

is_end_stream(&self)——返回 true 表示"确定下一次 poll_frame 返 None"。用途是让 hyper 不 poll 已知结束的 body——小优化。

Empty 的 is_end_stream 返 true——零次 poll 就知道结束。Full 在发完内容后返 true。自定义 body 可以智能返回——精确实现让 hyper 行为更高效。

流式 vs 缓冲的具体差异

Body 既能缓冲也能流式——两种形态的性能和语义差异:

维度缓冲(Full / Vec<u8>流式(StreamBody)
内存占用O(body size)O(chunk size)
首字节延迟等到 body 构造完第一个 chunk 就绪就开始发
总耗时= 构造 + 传输= 构造和传输并行
Content-Length精确None(chunked)
适用场景小 body(几 KB 到几 MB)大 body(文件、日志流)
典型类型Json、Html 响应SSE、下载、proxy

内存优势:对 1GB 文件下载,流式版本 peak 内存只是一个 chunk(几 KB 到几 MB)——缓冲版本需要 1GB。

延迟优势:流式可以"边生成边发送"——客户端收到第一字节的时间减去服务端开始处理的时间更短(time-to-first-byte)。对 LLM streaming 这类 user-facing 场景尤其重要。

为什么默认缓冲:大多数 API 响应小(几 KB 的 JSON),缓冲更简单且有 Content-Length。只有明确是"大 body" 或 "需要实时推送"才用流式。

实战:三种流式 body 构造

实战一:文件下载(tokio File → Body)

rust
use tokio::fs::File;
use tokio_util::io::ReaderStream;
use axum::body::Body;

async fn download(Path(name): Path<String>) -> Result<Response, AppError> {
    let file = File::open(format!("/data/{}", sanitize(&name))).await?;
    let stream = ReaderStream::new(file);
    let body = Body::from_stream(stream);
    Ok((
        [
            ("content-type", "application/octet-stream"),
            ("content-disposition", &format!("attachment; filename=\"{name}\"")),
        ],
        body,
    ).into_response())
}

核心链条:

  1. File::open:tokio 的异步文件句柄——实现 AsyncRead
  2. ReaderStream::new(file):把 AsyncRead 转成 Stream<Item = io::Result<Bytes>>——tokio-util 提供
  3. Body::from_stream(stream):包装成 axum body

hyper 每次 poll_frame 时从 stream 拉一块,stream 从 file 读一块——完整的流式链。1GB 文件下载的 peak 内存只有 ReaderStream 的 buffer(默认几 KB)。

实战二:proxy 转发(upstream body → downstream body)

rust
async fn proxy(request: Request) -> Result<Response, AppError> {
    let client = reqwest::Client::new();
    let upstream_url = build_upstream_url(&request);

    // 转发请求到 upstream——body 流式传过去
    let (parts, body) = request.into_parts();
    let upstream_body = reqwest::Body::wrap_stream(body.into_data_stream());

    let resp = client
        .request(parts.method, &upstream_url)
        .headers(parts.headers)
        .body(upstream_body)
        .send()
        .await?;

    // 转发 response——body 流式回来
    let status = resp.status();
    let headers = resp.headers().clone();
    let downstream_body = Body::from_stream(resp.bytes_stream());

    Ok((status, headers, downstream_body).into_response())
}

两次 Stream ↔ Body 转换:

  1. 入站:axum Body → data stream → reqwest Body
  2. 出站:reqwest bytes_stream → axum Body from_stream

整条 proxy 路径完全流式——两边都不缓冲。1GB 文件经过 proxy 只占几 MB 内存(两边的 chunk buffer)。

实战三:定时事件源(interval → SSE → Body)

rust
use std::time::Duration;
use tokio_stream::wrappers::IntervalStream;

async fn heartbeat_stream() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let interval = tokio::time::interval(Duration::from_secs(1));
    let stream = IntervalStream::new(interval).map(|_| {
        Ok(Event::default()
            .event("heartbeat")
            .data(chrono::Utc::now().to_rfc3339()))
    });

    Sse::new(stream).keep_alive(KeepAlive::default())
}

Stream 来源是 IntervalStream (tokio-stream crate)——每秒 tick 一次。map 转成 EventSse::new 把 Event stream 包装成 SSE 格式的 Body。

这展示了 axum 流式生态的组合力:任何 Stream 都能作为 Body 源——定时、broadcast channel、数据库游标、LLM token 流——只要能产生 Bytes(或能转成 Bytes 的事件类型)。

实战四:LLM token 流式输出

LLM 应用最经典的流式场景——模型一个 token 一个 token 输出、前端一个一个展示:

rust
use axum::body::Body;
use futures::StreamExt;

async fn chat(Json(req): Json<ChatRequest>) -> Result<Response, AppError> {
    let llm_stream = llm_client.stream_completion(req.messages).await?;

    // llm_stream: Stream<Item = Result<Token, LlmError>>
    let sse_stream = llm_stream.map(|token_result| {
        let data = match token_result {
            Ok(token) => format!("data: {}\n\n", serde_json::to_string(&token).unwrap()),
            Err(e) => format!("event: error\ndata: {}\n\n", e),
        };
        Ok::<_, std::convert::Infallible>(data.into_bytes())
    });

    Ok((
        [
            ("content-type", "text/event-stream"),
            ("cache-control", "no-cache"),
            ("x-accel-buffering", "no"),  // 禁用 Nginx buffering
        ],
        Body::from_stream(sse_stream),
    ).into_response())
}

几个生产细节:

一、x-accel-buffering: no:Nginx 默认对响应做 proxy buffering——SSE 流会被 Nginx 缓冲等完再一次性发给客户端——失去流式意义。这个 header 告诉 Nginx 不要缓冲。Cloudflare 也识别这个 header。

二、直接用 Body::from_stream 而不是 Sse::new:第 10 章的 Sse<S> helper 专门做 SSE 语义(Event + KeepAlive)——但 LLM 流通常自己格式化(OpenAI 格式 data: {...}\n\n),Body::from_stream 更直接。

三、Error 的 Infallible:stream 内部 map 已经把 llm error 转成 SSE event: error 字节——再无其他错误。用 Infallible 告诉编译器这个 stream 永不真正失败。

四、token JSON 格式:和 OpenAI API 的 SSE 格式兼容——方便客户端 SDK(比如 openai 的 JS 客户端)直接复用。

这个模式是每个 LLM 应用的基础——从 stream 来源(模型 backend)到 stream 响应(前端 SSE)的直接链。流式性让用户感觉到"模型在工作"——首 token 延迟比等完整响应好太多。

实战五:chunked upload 与部分下载

大文件上传时用 multipart/form-data(第 8 章讨论过)是一种选——另一种是chunked transfer encoding——客户端分块发 body,axum 流式接收:

rust
async fn upload(request: Request) -> Result<Response, AppError> {
    let (_, body) = request.into_parts();
    let mut stream = body.into_data_stream();

    let path = format!("/uploads/{}", uuid::Uuid::new_v4());
    let mut file = tokio::fs::File::create(&path).await?;
    let mut total = 0usize;

    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        total += chunk.len();
        if total > 100 * 1024 * 1024 {  // 100MB 上限
            let _ = tokio::fs::remove_file(&path).await;
            return Err(AppError::PayloadTooLarge);
        }
        file.write_all(&chunk).await?;
    }

    file.flush().await?;
    Ok(Json(json!({"path": path, "size": total})).into_response())
}

边读边写——内存只占单个 chunk 的大小。100MB 文件的 peak 内存几 KB(tokio chunk buffer)+ file OS cache——可控。

对应的断点续传(客户端用 Range header 请求部分):

rust
async fn download_with_range(
    Path(name): Path<String>,
    headers: HeaderMap,
) -> Result<Response, AppError> {
    let path = format!("/files/{}", sanitize(&name));
    let file = File::open(&path).await?;
    let metadata = file.metadata().await?;
    let size = metadata.len();

    // parse Range: bytes=X-Y
    let range = headers.get("range").and_then(|v| parse_range(v.to_str().ok()?, size));

    match range {
        Some((start, end)) => {
            let mut file = file;
            file.seek(std::io::SeekFrom::Start(start)).await?;
            let limited = file.take(end - start + 1);  // 只读部分
            let stream = ReaderStream::new(limited);
            Ok((
                StatusCode::PARTIAL_CONTENT,
                [
                    ("content-range", format!("bytes {}-{}/{}", start, end, size)),
                    ("content-length", (end - start + 1).to_string()),
                ],
                Body::from_stream(stream),
            ).into_response())
        }
        None => Ok((
            [("content-length", size.to_string())],
            Body::from_stream(ReaderStream::new(file)),
        ).into_response()),
    }
}

客户端挂机重启时用 Range header 续传——节省网络。这对大视频流或大文件下载场景必要。

CompressionLayer 与 body 的互动

tower-http::compression::CompressionLayer 自动给响应 body 压缩(gzip / brotli / deflate)——透明、用户不用改 handler:

rust
let app = Router::new()
    .route("/", get(handler))
    .layer(CompressionLayer::new());

CompressionLayer 内部的工作:

  1. 检查客户端的 Accept-Encoding header
  2. 包装响应 body——把每个 chunk 经过压缩 encoder
  3. Content-Encoding: gzip、删除 Content-Length(压缩后长度变了)

和 axum::Body 的互动:body 本身不参与压缩——CompressionLayer 用 body 的 poll_frame 读原始 chunk、压缩、写进它自己的 stream body。对 axum::Body 来说完全透明——它不关心数据是否被压缩。

这层解耦是 http_body trait 设计的价值——body 只管"产出字节"、压缩只管"变换字节"——各自独立、组合工作。想自己写 encryption layer、encoding adapter 都能按相同思路做。

Frame 的 trailer 细节

HTTP/2 的 trailer 是body 之后的额外 header——gRPC 用它传 status code、metadata。轮廓:

text
HTTP/2 DATA frame (body chunk)
HTTP/2 DATA frame (body chunk)
...
HTTP/2 HEADERS frame (trailer, end_stream=true)

axum 的 Body 能产出 trailer——但 handler 的典型用法不涉及:

rust
use http_body::{Body, Frame};
use http::HeaderMap;

struct MyBody { /* ... */ }

impl Body for MyBody {
    // ...
    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Result<Frame<Bytes>, Error>>>
    {
        if self.done_sending_data() {
            // 发一个 trailer
            let mut trailers = HeaderMap::new();
            trailers.insert("x-status", "ok".parse().unwrap());
            Poll::Ready(Some(Ok(Frame::trailers(trailers))))
        } else {
            // 发 data
            Poll::Ready(Some(Ok(Frame::data(self.next_chunk()))))
        }
    }
}

自定义 Body 产出 Frame::trailers(...)——hyper 会在 HTTP/2 上正确编码。BodyDataStream 过滤掉 trailer——into_data_stream 的 user 不用看到。想要 trailer 用 http_body_util::BodyStream

生产里 axum handler 几乎不产出 trailer——只有 gRPC(tonic)需要。tonic 在 axum 上的集成依赖这个能力——把 gRPC status 通过 trailer 传出去。

HttpBody 和 axum Body 的互操作

axum::body::HttpBody 直接 re-export http_body::Body——axum 用户不需要单独 import http-body crate。但底层类型互相兼容:

rust
use axum::body::{Body, HttpBody};

// 任何 http_body::Body 都能变成 axum Body
fn from_any_body<B>(b: B) -> Body
where B: HttpBody<Data = Bytes> + Send + 'static, B::Error: Into<BoxError>,
{
    Body::new(b)
}

// 反过来——axum Body 本身实现了 HttpBody
fn body_is_http_body(body: Body) -> impl HttpBody<Data = Bytes> {
    body  // axum::Body 实现了 http_body::Body
}

这种"双向兼容"让 axum 能接收任何 http_body::Body 实现——hyper 的 Incoming、tonic 的 gRPC body、自定义 body 都能塞进 Body::new。反过来 axum Body 也能作为 http_body 交给下游(hyper、reqwest)。

深入:Body 的 pin_project 结构

Body 本身是一个 newtype:

rust
pub struct Body(BoxBody);

没有 pin_project——UnsyncBoxBody 内部处理 pin。但 axum 用 Pin::new(&mut self.0) 直接 pin Box 的 inner(body.rs:113-134):

rust
impl http_body::Body for Body {
    type Data = Bytes;
    type Error = Error;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        Pin::new(&mut self.0).poll_frame(cx)
    }
}

BoxBody 内部是 Pin<Box<dyn Body>>——pin 已经通过 Box 实现了。Pin::new(&mut self.0) 直接拿 Pin<&mut BoxBody>——因为 BoxBody 是 Unpin(它内部的 Pin<Box> 是 Unpin)。这让代码不需要 pin_project——简洁。

同样的技巧用在 BodyDataStream:内部的 Body 是 Unpin、所以外层不需要 pin_project。只有 StreamBody 的 Stream(可能 !Unpin)需要 pin_project。

SizeHint 深入

SizeHint 是 body 对"我有多少字节"的预估——http_body::SizeHintlower: u64upper: Option<u64>。几种典型情况:

Body 类型lowerupper含义
Empty0Some(0)精确空
Full<Bytes>nSome(n)精确 n 字节
StreamBody<S>0None未知——任意大
Limited<B>inner.lowermin(inner.upper, limit)受限上界
自定义 counting bodynNone至少 n 字节

SizeHint 影响:

HTTP/1.1 的 Content-Length 或 chunked:精确 size 用 Content-Length、不精确用 chunked transfer encoding。chunked 每块前加长度字节、略有开销;Content-Length 更高效但要求预知。

预分配优化to_bytes 读完整 body 前会根据 upper hint 预 alloc 一块 BytesMut——避免多次 realloc。SizeHint 准确能省 alloc 次数。

内存压力估计:客户端库可以用 upper 判断是否值得缓冲——太大的响应选择流式处理而非 to_bytes。

自己写 Body impl 时,尽量提供准确的 SizeHint——让 hyper 和上层代码做出更好的 I/O 决策。

Body 的 FAQ

Q:handler 里先 to_bytes 再手动 parse JSON 对不对?

不对——应该用 Json<T> 提取器。to_bytes + 手动 parse 失去了 Json 提取器的 Content-Type 校验、错误分类(422 vs 400)、body limit 等。只有在"需要自定义行为"时才绕过提取器。

Q:Body::from_stream 的 stream 已 exhausted 后再 poll 会怎样?

poll_frame 返 None。如果再继续 poll,行为取决于 Stream 实现——大多数情况下持续返 None。不应该依赖这个——FusedStream 明确定义 exhausted 后的行为。

Q:Body 能 clone 吗?

不能——Body 的 UnsyncBoxBody 不是 Clone(body 可能是一次性的 stream)。想"body 复用"需要先 to_bytes 缓冲成 Bytes、然后 Bytes 能 clone(Arc 语义)、再各自 Body::from(bytes.clone())。

Q:Body 支持 Seek 吗?

不支持。http_body::Body 是 forward-only 的——只能按顺序 poll_frame。想要 seek(比如断点续传)要在 body 外层处理——比如文件层用 file.seek 再包 body。

Q:返回超大 body 会导致什么?

hyper 按 poll_frame 速率发——backpressure 自动工作。客户端慢、TCP buffer 满——hyper 停止 poll body——body 停止生成。内存不会爆。唯一需要注意是 body 的生成本身不要提前"产出一大堆待发"——应该按 poll 节奏生成。

Q:Body 在 HTTP/2 上有什么不同?

HTTP/2 的 body 经过 flow control——客户端宣告 window size、服务端发到上限就停。这在 hyper 层处理、对 axum::Body 完全透明。body 该怎么写就怎么写——HTTP/1.1 和 HTTP/2 行为一致。

body 在 axum 内部流转

HTTP 请求从 TCP 到 handler、响应从 handler 到 TCP 的完整 body 流转:

几个关键点:

  1. hyper::Incoming → axum::Bodyserve/mod.rs:594req.map(Body::new)——一次性类型统一
  2. handler 有两条路径:to_bytes 缓冲 + 流式 map
  3. hyper 根据 SizeHint 选编码:精确 size → Content-Length;未知 → chunked transfer encoding

整条路径保持类型一致(都是 axum::Body),但实际数据流可以是全缓冲或全流式——取决于 body 内部的具体实现。

Content-Length 与 Transfer-Encoding 的交互

body 在 HTTP wire 上有两种编码:

Content-Length:头里声明 body 长度——客户端读 N 字节就知道结束。要求提前知道长度。

Transfer-Encoding: chunked:body 按 chunk 发送——每 chunk 前是长度字节。不需要提前知道总长度。

axum body 的 SizeHint 决定用哪种——hyper 读 SizeHint:

  • 精确upper == Some(n) 且 lower == n)→ Content-Length: n
  • 不精确 → Transfer-Encoding: chunked

手动覆盖:

rust
// 强制 Content-Length(如果精确)
async fn explicit_length() -> impl IntoResponse {
    let bytes: Bytes = generate_data().await;
    let len = bytes.len();
    ([("content-length", len.to_string())], Body::from(bytes))
}

// 强制 chunked(流式)
async fn chunked() -> impl IntoResponse {
    let stream = generate_stream();
    Body::from_stream(stream)  // SizeHint 是 None, hyper 自动用 chunked
}

客户端偏好:现代 HTTP client 都支持 chunked——但有些极老系统(嵌入式、L4 proxy)偏好 Content-Length。实际看场景选。

HTTP/2 没有这个区分:HTTP/2 全部用 DATA frame——Content-Length 作为 header 传但不影响传输机制。上面的讨论主要是 HTTP/1.1。

Body 的生命周期

一个请求响应周期里 body 的状态变化:

关键 state transitions:

  • Received → AxumBody:第 15 章讲的 req.map(Body::new)——一次类型转换
  • HandlerRead → Bufferedto_bytes + 处理 bytes——常见简单路径
  • HandlerRead → Streamedinto_data_stream + map/filter——高级流式路径
  • ResponseBody → Sent:hyper 的 connection task poll_frame 一个一个 chunk 写 TCP

整个过程body 永远是一次消费——不能读完了回到 Received 重读。这条约束深植于 HTTP 协议本身(第 6 章深入讨论过)。

实战六:tar 流式打包

把多个文件打包成 tar 流式下载——不缓冲整个 tar 到内存:

rust
use axum::body::Body;
use futures::stream::{self, StreamExt};
use tokio::io::AsyncWriteExt;

async fn download_tar(Path(dir): Path<String>) -> Result<Response, AppError> {
    let entries = list_files(&dir).await?;

    // 构造 async stream: 对每个文件产出 tar header + content
    let stream = async_stream::stream! {
        let mut tar = TarWriter::new(Vec::new());

        for entry in entries {
            let path = entry.path;
            let metadata = tokio::fs::metadata(&path).await?;

            // 先 yield tar header
            let header = tar.write_header(&path, metadata.len()).await?;
            yield Ok(Bytes::from(header));

            // 再 yield 文件内容
            let mut file = tokio::fs::File::open(&path).await?;
            let mut buf = vec![0u8; 64 * 1024];
            loop {
                let n = file.read(&mut buf).await?;
                if n == 0 { break; }
                yield Ok(Bytes::copy_from_slice(&buf[..n]));
            }
        }

        // tar 结束 marker
        yield Ok(Bytes::from(tar.finish()));
    };

    Ok((
        [("content-type", "application/x-tar"),
         ("content-disposition", "attachment; filename=\"archive.tar\"")],
        Body::from_stream(stream),
    ).into_response())
}

核心:用 async_stream::stream! 宏构造一个 generator-like 的 stream——每次 yield 一块 bytes。文件内容按 64KB 块流式读、写——总内存占用恒定、几 GB 的 tar 都能流。

这种 pattern 对"按需聚合"的下载场景(zip / tar / 合并多个响应)很有用——所有都遵循 "generate → bytes chunk → yield → body" 的套路。

body 的 DoS 防线

body 处理是很多 DoS 攻击的目标——送大 body 耗尽内存、慢速 body 占用连接、分块 body 里藏非法数据。axum 的防御:

一、DefaultBodyLimit:默认 2MB 请求 body 限制。所有 Bytes::from_requestJson::from_request 等都经过这层——超过直接 413。

二、to_bytes 显式 limit:自己调 to_bytes 时第二参数要写合理值——不用 usize::MAX

三、hyper 的 body read timeout:第 15 章提到的 header_read_timeout——防止 slowloris(送 body 很慢、拖住连接)。axum 默认启用。

四、request body limit layer (tower-http):全局限制 request body——比 DefaultBodyLimit 更 strict(不依赖提取器触发)。

五、Content-Length 验证:hyper 会校验 Content-Length 的 body 不超过头声明的大小。

这五层配合防御几乎所有 body-level DoS。生产的 checklist:

  • 配合前置 LB(nginx / ALB)做第一层 body size 限制(通常几十 MB 硬上限)
  • axum 层用 DefaultBodyLimit 或 RequestBodyLimitLayer 做第二层(按业务上限)
  • handler 里 to_bytes 调用带合理 limit
  • 监控 413 响应数——异常增加可能是攻击

body 的性能维度

Body 的各种形态有不同的性能 profile。几个常见场景的量化:

小 JSON 响应(几 KB)

  • Json(data).into_response() → Body::from(bytes) → Full<Bytes>
  • 开销:serde_json 序列化(几 µs 到几十 µs)、Body::from 零成本、响应传输几 ns
  • bottleneck: serde_json

大文件下载(100 MB)

  • Body::from_stream(ReaderStream::new(file))
  • 开销:每次 poll 从 file 读一块(默认 buffer 8KB)、转成 Bytes、返回
  • bottleneck: 磁盘 I/O,内存 = 单 chunk 大小

SSE 推送

  • Body::from_stream(event_stream)
  • 开销:每次 event 一次 poll、格式化成 bytes、返回
  • bottleneck: event 生成速率

proxy 转发

  • request.body.into_data_stream() → reqwest body → upstream
  • 开销:两次 data_stream ↔ body 转换(零拷贝)、网络 I/O
  • bottleneck: 上游延迟

关键观察:body 本身的开销通常不是 bottleneck——要么是序列化、要么是 I/O。优化方向应该针对这些——Body 层只要"不乱加开销"就够。axum 的 body 设计确实做到了——UnsyncBoxBody 的 vtable 调用只有几 ns、from_stream 的包装零分配(除了 SyncWrapper 的栈空间)。

body 的零拷贝路径

Bytes 的核心特性是零拷贝共享——底层 Arc<[u8]> 或类似结构,clone 只是引用计数加一。Body 内部的 data frame 都是 Bytes——意味着:

  1. hyper 从网络收到字节进 Bytes buffer——不拷贝
  2. Bytes 经过提取器、handler、响应 body——只传引用
  3. hyper 把 Bytes 写到网络 socket——不拷贝

整条路径零拷贝。只有在用户代码里 .clone().to_vec() 时才真正复制字节。这让大 body 的处理 CPU 成本低——瓶颈在网络和业务逻辑、不在 byte shuffling。

proxy body 的精细控制

前面的 proxy 例子是"流式转发"——还有更精细的控制场景。

场景:过滤 / 修改 body

proxy 时想对 body 做修改(比如加请求签名、改 URL references)——需要读完修改再转发:

rust
async fn signed_proxy(request: Request) -> Result<Response, AppError> {
    let (parts, body) = request.into_parts();

    // 缓冲 body (受 limit)
    let bytes = to_bytes(body, 10 * 1024 * 1024).await?;

    // 加签名
    let signature = hmac_sign(&bytes, &SIGNING_KEY);
    let mut headers = parts.headers.clone();
    headers.insert("x-signature", signature.parse().unwrap());

    // 转发
    let resp = client.post(upstream)
        .headers(headers)
        .body(bytes)
        .send()
        .await?;

    // 响应流式回来 (这部分还是流式)
    Ok(axum_response_from_reqwest(resp))
}

这种 pattern 的 tradeoff:

  • 收益:能对 body 做任何处理
  • 成本:request body 全缓冲——1GB upload 要 1GB 内存

生产上 hybrid:对 small body(<1MB)缓冲再修改、对 large body(>1MB)流式不修改(或者用 streaming-capable 加密库边流边签)。具体策略看业务——axum 的 body API 足够灵活支持各种组合。

场景:response body 的条件修改

某些场景想根据 response 的内容决定是否透传——比如只 proxy 成功响应、失败替换为自定义错误页:

rust
async fn conditional_proxy(request: Request) -> Result<Response, AppError> {
    let upstream_resp = client.send(request).await?;
    let status = upstream_resp.status();

    if status.is_success() {
        // 成功——流式转发
        let body = Body::from_stream(upstream_resp.bytes_stream());
        Ok(Response::builder().status(status).body(body).unwrap())
    } else {
        // 失败——用自定义错误替换
        let err_body = upstream_resp.text().await.unwrap_or_default();
        tracing::warn!(status = %status, body = %err_body, "upstream error");
        Ok(Response::builder()
            .status(502)
            .body(Body::from("upstream unavailable"))
            .unwrap())
    }
}

upstream 成功路径流式、失败路径缓冲 error body 后生成新 response——按需选择策略。

Body 的 StreamBody 内部细节

StreamBody 的 pin_project + SyncWrapper 组合值得深入:

rust
pin_project! {
    struct StreamBody<S> {
        #[pin]
        stream: SyncWrapper<S>,
    }
}

为什么用 SyncWrapper?因为底层 Stream S 可能不是 Sync——但 http_body::Body 用在 UnsyncBoxBody 里不强制 Sync。SyncWrapper 的作用是标注类型——让包了它的 struct 是 Sync——即便 inner S 不是。

SyncWrapper 是怎么做到安全的?通过 API 设计:

  • 构造:SyncWrapper::new(s) - 任何 s 都能包
  • 访问:必须通过 &mut self 才能拿到 inner &mut S——不能通过 &self(单线程访问保证)

共享是对 &self 的——SyncWrapper 阻断这条路径——所以即便 S 非 Sync 也安全。代价是有 &mut self 的地方才能用——对 http_body 的 poll_frame(Pin<&mut self>) 签名完美兼容(&mut self)。

这是一种"用 API 限制换 trait bound 放松"的典型 Rust 技巧——牺牲一点灵活(必须 mut 访问)、获得范围更大的类型兼容性。

跨书关联:http_body 规范

http_body crate 是 Rust HTTP 生态的标准 body trait——hyper、axum、tonic、reqwest、actix-web(部分)都基于它。定义简洁但经过几次演进:

  • http_body 0.x:两个方法——poll_datapoll_trailers 分别返回 data 和 trailer
  • http_body 1.0:合并成 poll_frame 返回 Frame<T>——统一接口。axum 0.7+ 用 1.0 版本

axum 紧跟 http_body 的最新版本——因为 hyper 1.0 也迁移到了这版。整个生态协同演进——但偶尔有版本兼容性问题(老项目用 http-body 0.4 会和新 crate 冲突)。

Hyper 与 Tower:工业级 HTTP 栈》第 10 章专门讨论 http_body——从 trait 设计到常见实现(Full、Empty、Limited、Combinators)。读那一章后再看 axum 的 Body,会发现 axum 只是在 http_body 上做最薄的 wrapper——UnsyncBoxBody 类型擦除 + 几个便利 constructor。

测试 body 的常见 pattern

测试 handler 的响应 body 几种写法:

rust
use axum::body::to_bytes;
use tower::ServiceExt;

#[tokio::test]
async fn returns_expected_json() {
    let app = Router::new().route("/data", get(data_handler));
    let response = app
        .oneshot(Request::builder().uri("/data").body(Body::empty()).unwrap())
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::OK);

    // 把 body 转成 bytes
    let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
    let parsed: MyData = serde_json::from_slice(&body).unwrap();

    assert_eq!(parsed.field, expected_value);
}

几个要点:

  • to_bytes(body, usize::MAX):测试里不怕 body 大、用 MAX
  • serde_json::from_slice:bytes 直接 parse,省一次 String 转换
  • oneshot:直接跑 Service 不启 TCP、更快

测试流式 body:

rust
#[tokio::test]
async fn streams_tokens() {
    let app = Router::new().route("/stream", get(stream_handler));
    let response = app.oneshot(Request::new(Body::empty())).await.unwrap();

    let mut stream = response.into_body().into_data_stream();

    // 逐块验证
    let chunk1 = stream.next().await.unwrap().unwrap();
    assert_eq!(&chunk1[..], b"event: start\n\n");

    let chunk2 = stream.next().await.unwrap().unwrap();
    assert!(chunk2.starts_with(b"data: "));

    // 最终结束
    assert!(stream.next().await.is_none());
}

对 SSE / 分块响应,验证每个 chunk 符合预期——比 to_bytes 一次性更精细。

与其他 body 类型的 conversion

实际项目会涉及多种 body:

  • hyper::body::Incoming:hyper 服务端收到的 request body(从网络 parse 出来)
  • reqwest::Body:reqwest 客户端用于发请求/接响应的 body
  • http_body_util::Full<Bytes>:全缓冲
  • http_body_util::Empty:空
  • axum::body::Body:axum 用的

它们的 conversion:

rust
// hyper::Incoming → axum::Body
let axum_body = Body::new(incoming);  // try_downcast 识别, 零成本

// Vec<u8> → axum::Body
let axum_body = Body::from(vec);  // From impl, Full wrap

// Stream → axum::Body
let axum_body = Body::from_stream(stream);  // StreamBody wrap

// axum::Body → reqwest::Body (for proxy)
let reqwest_body = reqwest::Body::wrap_stream(axum_body.into_data_stream());

// reqwest::Response → axum::Body
let axum_body = Body::from_stream(response.bytes_stream());

每对转换的具体 API 都在 axum / reqwest / http_body_util 的文档里——规则都很直白。只要两边都遵守 http_body 的 trait、conversion 通常是一行。

http_body 的版本演进

http_body 作为整个 Rust HTTP 生态的基础 trait,经过几轮大改:

  • http_body 0.3:最早期、API 简陋
  • http_body 0.4:加了 SizeHintpoll_data / poll_trailers 分开
  • http_body 1.0:合并 poll_data/poll_trailers 成 poll_frame + Frame<T> 枚举——当前稳定版

axum 的版本对应:

  • axum 0.5/0.6:用 http_body 0.4
  • axum 0.7+:用 http_body 1.0(和 hyper 1.0 同步)

这次重大升级(0.4 → 1.0)让 axum 0.7 有大量不兼容改动——老项目升级 axum 要同时更新 http-body 依赖。好处是整个生态用同一个版本、互操作不用 adapter。

版本演进的经验:HTTP 生态的底层 crate(hyper、http、http-body)升级会波及所有上层。axum 团队非常谨慎地跟进——通常 hyper 发 major 版本后几个月内 axum 跟上、打包完整的升级指南。

本章的几个心得

回头看本章内容可以总结几个关键 insight:

一、Body 是类型擦除的艺术UnsyncBoxBody<Bytes, Error> 让一个具体类型承载多种 body 形态——用户 API 简单、生态兼容性好

二、流式和缓冲的选择是语义而非技术:axum 两种都支持、API 对称。选哪个看业务需求——小 body 缓冲简单、大 body 流式必要

三、pin 和 SyncWrapper 是 Rust 异步 HTTP 的底层术语:pin_project 处理 !Unpin future 的 safe access、SyncWrapper 处理 !Sync 类型的 across-thread 兼容。两者都是为了让更多的 Rust 类型能接入 HTTP body 接口。

四、零拷贝通过 Bytes 实现:整条 body 处理路径 Bytes 共享引用计数——不做实际字节拷贝。I/O 路径的 CPU 开销极低。

五、http_body 是生态级的 trait:axum 的 Body 是 http_body 的"友好封装"——不替代、不分叉、完全兼容。这让 axum 的 body 处理能和 tonic、reqwest、hyper 等其他 HTTP crate 无缝互操作。

和第 10 章 SSE 的回顾

第 10 章讲 SSE 时提到 SseBody<S> 是自定义 http_body::Body 实现。现在可以完整理解它:

rust
// 第 10 章 sse.rs:110-135 (简化)
pin_project! {
    struct SseBody<S> {
        #[pin]
        event_stream: SyncWrapper<S>,
    }
}

impl<S, E> HttpBody for SseBody<S>
where S: Stream<Item = Result<Event, E>>,
{
    type Data = Bytes;
    type Error = E;

    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>
    {
        // poll stream, 每个 Event finalize 成 Bytes 再 Frame::data
    }
}

和本章讨论的 StreamBody 结构一模一样——都是 SyncWrapper<S> + pin_project。差异在 SseBody 要把 Event 编码成 SSE 帧格式(data: ...\n\n)、StreamBody 直接用 Bytes。同一套基础结构支持不同协议——是 http_body 设计的强大之处。

小结:Body 的两个维度

Body 在 axum 里承担两个维度的抽象:

一、类型擦除UnsyncBoxBody<Bytes, Error> 隐藏具体 body 类型——用户只看到 Body,不用管是 Full、Empty、StreamBody 还是 Incoming。try_downcast 优化让重复 wrap 零成本。

二、流式/缓冲统一:同一个 Body 类型可以内部是缓冲(Full)或流式(StreamBody)——上层代码(hyper、handler)用 poll_frame 统一 API——SizeHint 告知具体形态。

两个维度合起来让 body 处理既类型简单(就一个 Body 类型)又功能灵活(能覆盖从空响应到 TB 级流式响应)。这是 axum 简洁 API 下的强大能力——关键都在 trait 设计和类型擦除的巧妙应用。

Body FAQ 补充

Q:能在 response 里同时返回 body 和 trailer 吗?

可以——自定义 Body 在 poll_frame 里先 yield 多个 data frame、最后 yield 一个 trailer frame。hyper 会在 HTTP/2 上正确发送 trailer。HTTP/1.1 的 trailer 支持有限、大多数场景不建议用。

Q:hyper 看到 upper: Some(n) 会严格 enforce n 字节吗?

不会——hyper 把它当 hint 用来设 Content-Length。如果 body 实际 yield 了超过 n 字节 data,hyper 会发现并 panic 或 truncate(具体行为版本依赖)。所以 body 实现要保证 SizeHint 准确或者 lower/upper 不精确。

Q:BodyDataStream 丢弃 trailer——有没有 stream 全帧类型?

http_body_util::BodyStream 保留所有 frame 类型——适合需要处理 trailer 的场景(比如 gRPC 实现)。axum 的 BodyDataStream 是"简化版"——大多数 HTTP handler 用它够。

Q:Full<Bytes>Full<BytesMut> 哪个好?

都可以——Full 泛型到 T: Buf。Bytes 是 Arc-based 零拷贝、BytesMut 是可修改版本。构造 body 用 Bytes;内部还要修改用 BytesMut。最后 BytesMut::freeze() 变 Bytes 进 body。

Q:Body 能在多个 handler 间共享吗?

不能——Body 不是 Clone。每个 Body 是单次消费的。想"共享" body 数据要先 to_bytes 缓冲成 Bytes——Bytes 能 clone(引用计数)、各 handler 各自 Body::from(bytes.clone()) 独立消费。

Q:body 的 chunk 大小能控制吗?

由生成侧决定——ReaderStream::new(file) 默认 chunk 8KB、ReaderStream::with_capacity(file, 64 * 1024) 改成 64KB。大 chunk 减少 poll 次数、小 chunk 减少 per-chunk 延迟。根据网络条件权衡——几 KB 到几 MB 都合理。

Q:body 生成失败了怎么办?

Body::from_stream 的 Stream 返回 Err 会让 body 的 poll_frame 返 Err——hyper 看到 Err 会断连(TCP 关)。客户端看到 TCP reset 或 "incomplete message"。如果 body 生成可能失败——应该尽早检查(在 body 构造前)、失败返回 5xx、成功后再流式发送。一旦开始流式发送后就很难"优雅失败"了——HTTP 没有"响应开始后改主意"的机制。

Q:能从 stream 得到 size hint 吗?

不能自动——Stream trait 的 size_hint 是 (lower, upper) 元素数、不是字节数。需要自己的 body 实现手动计算并返回 SizeHint。比如对 File 可以先查 metadata 得到大小、然后 body 的 SizeHint 返回 (size, Some(size))——让响应自动带 Content-Length。

一个容易忽略的细节:body 的 Drop 行为

当 handler 早早 return(不读完 body)、或者客户端中途断开连接——body 被 drop。drop 会发生什么?

对缓冲 body(Full / Bytes):drop 只是释放内存——没副作用。

对流式 body(StreamBody<S>:drop 时内部 Stream 被 drop——Stream 内部的资源(File handle、tokio::sync receiver、数据库游标)随之释放。Rust 的 RAII 保证这层自动清理。

对来自 hyper 的 Incoming body:drop 通知 hyper "client 的后续字节我不读了"——hyper 继续从 TCP 丢弃(或按 HTTP/2 的 RST_STREAM 终止该 stream)——不会卡住连接。

这意味着用户不需要手动处理 "handler 没读完 body 怎么办"——Drop 完全兜底。只需要确保业务逻辑正确——不要让 body 被读一半再继续用(那会 panic 或 error)。

这条 Drop 语义也让第 10 章讲的 SSE 连接清理能工作——客户端断开时 body 被 drop、SSE 的 Stream 被 drop、它里面的 broadcast receiver / 数据库查询都随之释放。Rust 的所有权链让"资源清理"成为自动动作——不用写 try-finally 之类的显式清理。

几个工程级最佳实践

最后给几个生产级的 body 处理建议:

一、默认流式、特殊情况缓冲。对文件下载、SSE、proxy 等天然流式场景——不要用 to_bytes 缓冲。只有业务明确需要"整体访问"才缓冲(比如 HMAC 签名、JSON 反序列化)——并且总带合理 limit。

二、to_bytes 的 limit 按接口设置。不同 handler 的 body 合理上限不同:/api/chat 可能 16KB 够、/upload 可能 100MB。不要全局一刀切——在每个 handler 的 to_bytes 里写具体上限。

三、监控 body 相关指标:HTTP 413 响应数(超限)、body 读取失败数、body 平均大小——这些指标能发现 DDoS 尝试、upstream 异常、业务字段膨胀。

四、流式响应加超时:生产的流式 handler(SSE / download)要设上限 tokio::timeout——防止客户端挂起不消费耗 server 资源。30 秒 / 几分钟是常见上限,具体看场景。

五、测试覆盖两路径:handler 的单元测试至少覆盖"小 body 成功"和"超限失败"两条路径——真实客户端会发两种都——测试里别只测前者。

做到这五点,body 层基本不会成为 axum 服务的稳定性问题源。剩下的注意力可以放到业务逻辑、数据库、外部调用上——那些是真正的挑战。

下一章讲 State 管理与 FromRef 子状态——第 7 章简单介绍过 State 提取器、第 18 章会深入类型状态、FromRef 的实际架构价值、多 state 项目的组织模式。那一章把 axum 的"依赖注入"完整机制讲清楚。

基于 VitePress 构建