Appearance
第17章 Body 处理与流式响应
第 15、16 章讲 axum::serve 的连接层。这一章下沉一层——到数据层。Body 是 HTTP 请求和响应的"主要载体"——URL 和 header 只是元信息,真正的负载在 body。axum 里 Body 的类型设计、流式机制、缓冲工具是整个数据流的中枢。
前面章节零散提到过 Body:
- 第 6 章 body 单次消费:FromRequest 消费 Body、FromRequestParts 借用 Parts
- 第 8 章 Multipart:Body 按 stream 分 field 流式读
- 第 10 章 SSE:Body 是 Stream<Item = Event>、
Sse::new(stream)包装 - 第 10 章 文件下载:
Body::from_stream(ReaderStream::new(file))流式返回 - 第 15 章 serve:hyper 通过 Body 读字节、写字节
这一章把这些分散的使用统一起来——讲清楚 Body 的 API、类型结构、流式机制。读完之后你应该能回答:为什么 Body 能既是缓冲的又是流式的?Body::from_stream 内部做了什么?到底什么时候该 to_bytes 缓冲、什么时候该 stream?
为什么 body 是 HTTP server 框架的中枢
HTTP server 框架的工作本质上就是"字节进字节出"——所有路由、提取器、middleware 最终都是"对字节的某种处理"。body 是这些字节的 载体——一个框架对 body 的抽象质量基本决定了它能做什么。
axum 的 body 设计反映了几个成熟的工程取舍:
- 不造新 trait:用 http_body::Body——避免生态分裂
- 类型擦除但高效:UnsyncBoxBody + try_downcast——简单 API + 零重复包装开销
- 流式和缓冲对称:from_stream / into_data_stream / to_bytes——用户按场景选
- Bytes 零拷贝:整条路径共享引用、不做字节 memcpy
这些取舍让 axum 的 body 处理既语言好写(handler 里直接 Body::from(...))又运行时高效(零拷贝 + 流式)。不同于其他一些框架(Flask、Express)"body 先全缓冲到内存再交给 handler"的简单但低效策略——axum 的 body 是 first-class citizen。
Body 的两重身份
axum 的 Body 有两种"身份":
作为类型(axum::body::Body):用户 handler 里见到的、响应和请求里携带的——就是这一个具体类型。没有泛型、没有 trait 对象、handler 签名里直接写 Body 即可。
作为协议(http_body::Body trait):hyper 读写 HTTP body 的通用接口——poll_frame、size_hint、is_end_stream 三件套。所有 HTTP 相关 crate(axum、hyper、tonic、reqwest)都遵守这个 trait。
两个身份的关系:axum::Body(具体类型)实现了 http_body::Body(trait)。axum 提供一个"开箱即用的 body 类型"——背后是标准 trait 的具体包装。
这让 axum 的 body 处理 API 简单(就一个类型)但生态兼容(符合 http_body 规范)——两全其美。
Body:类型擦除的 http_body
Body 的定义在 axum-core/src/body.rs:39,依赖第 13 行的 BoxBody 类型别名:
rust
// axum-core/src/body.rs:13
type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
// axum-core/src/body.rs:37-39
#[must_use]
#[derive(Debug)]
pub struct Body(BoxBody);一个 newtype 包装 UnsyncBoxBody。层层分解:
http_body_util::UnsyncBoxBody<Bytes, Error>:http_body::Body 的 trait object wrapper。http_body::Body 是 HTTP body 的标准 trait(hyper / tonic / reqwest 都基于它)——axum 用类型擦除存储任意实现者。
UnsyncBoxBody vs BoxBody:前者不要求 Sync、后者要求。axum 选 UnsyncBoxBody 因为:
- 大多数 body 是从单个 task 处理的——
Send够、不需要Sync Syncbound 会把一些合法的 body 类型排除(比如第 10 章 SSE 章节提过的SyncWrapper<Stream>)
Bytes 作为 data type:http_body::Body::Data 关联类型固定为 bytes::Bytes——最通用的字节容器、零拷贝共享、hyper / h2 等都用 Bytes。
Error = axum_core::Error:第 12 章讲过的 axum 通用错误 wrapper——BoxError 的 newtype。body 的任何错误统一成 axum::Error、downstream 代码只处理一种错误类型。
用户看到一个统一的 Body 类型——下层实际是多种具体 body 类型的类型擦除包装。
Body::new 的 try_downcast 优化
body.rs:43-49:
rust
impl Body {
pub fn new<B>(body: B) -> Self
where
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
}
}try_downcast<T, K>(k: K) -> Result<T, K> 的实现在 body.rs:23-34:
rust
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where T: 'static, K: Send + 'static,
{
let mut k = Some(k);
if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
Ok(k.take().unwrap())
} else {
Err(k.unwrap())
}
}核心:尝试把 K downcast 成 T。如果 K 本身就是 T(TypeId 相等),直接返回——不做任何包装。
在 Body::new 的语境下:如果用户传入的 body 本来就是 Body 类型(比如中间件里 Body::new(existing_body)),try_downcast 直接返回那个 Body——不重新 box 一层。
这个优化避免了"不必要的类型擦除嵌套"——Body::new(Body::new(Body::new(inner))) 等价于 Body::new(inner)、不会层层 box。生产里中间件栈可能多次 wrap body——这个优化让 wrap 层数对 runtime cost 无影响。
Body::empty 的实现
body.rs:52-54:
rust
pub fn empty() -> Self {
Self::new(http_body_util::Empty::new())
}用 http_body_util::Empty——一个 ZST,poll_frame 永远返 None。Body::empty() 是零大小、零分配的空 body——适合作为"响应无 body"的默认(比如 204 No Content)。
From 各种基础类型
body.rs:93-111 的宏:
rust
macro_rules! body_from_impl {
($ty:ty) => {
impl From<$ty> for Body {
fn from(buf: $ty) -> Self {
Self::new(http_body_util::Full::from(buf))
}
}
};
}
body_from_impl!(&'static [u8]);
body_from_impl!(std::borrow::Cow<'static, [u8]>);
body_from_impl!(Vec<u8>);
body_from_impl!(&'static str);
body_from_impl!(std::borrow::Cow<'static, str>);
body_from_impl!(String);
body_from_impl!(Bytes);7 种基础类型自动 Into<Body>——http_body_util::Full 是"全内容一次性 body"的包装。让 Body::from("hello".to_string())、Body::from(vec![1, 2, 3]) 都直接工作。
注意:都是 Full——一次返回整个 body——不是流式。字符串和字节数组天然是内存里的、不需要流。Full 在 poll_frame 里一次返 Frame::data(整个 Bytes)、下一次返 None——两次 poll 就结束。
try_downcast 的类型系统技巧
try_downcast 是 Rust 类型系统的小把戏。原理:
rust
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where T: 'static, K: Send + 'static,
{
let mut k = Some(k);
if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
Ok(k.take().unwrap())
} else {
Err(k.unwrap())
}
}几个微妙点:
一、Option<T> 作为桥梁:downcast_mut::<Option<T>>(&mut k) 要求 k 的类型是 Option<T>——类型完全匹配。把 K 先 wrap 成 Option<K> 再 downcast——如果 K == T,downcast 成功,可以从 &mut Option<T> 里 .take() 拿出值。
二、'static bound:Any::downcast_mut 要求类型是 'static——运行时的类型信息(TypeId)基于静态类型、带引用的类型 TypeId 不确定。所以 Body 类型和所有相关的 body impl 都要求 'static——这也是 axum 所有地方 'static bound 如此普遍的原因。
三、零运行时类型信息:Rust 的 TypeId 是编译期确定的——运行时 downcast 只是比较 TypeId(一次整数相等检查)。成本极低。
这个 pattern 不只 axum 用——tower、reqwest 等库都有类似 "smart constructor"。目的都是一样:提供一个统一的 constructor API,但内部对已经是目标类型的情况做短路——避免不必要的包装。
Body::from_stream:流式构造
body.rs:59-68:
rust
pub fn from_stream<S>(stream: S) -> Self
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
Self::new(StreamBody {
stream: SyncWrapper::new(stream),
})
}接收一个 TryStream——Stream<Item = Result<Ok, Err>>——Ok 能变成 Bytes、Err 能变成 BoxError。输出一个可以作为 http_body::Body 使用的流式 body。
SyncWrapper 的作用:很多合法 Stream 实现不是 Sync——比如捕获了 Cell 或 RefCell 的 Stream。http_body::Body 本身不要求 Sync(UnsyncBoxBody 就是明确去掉 Sync 要求的版本),但 poll_frame 的签名需要 &mut self pin projection——需要一些额外机制让非 Sync 类型能 poll。SyncWrapper 用 "通过 &mut 才能访问 inner" 的限制绕过 Sync 要求——安全但不够灵活。
StreamBody 的 http_body::Body impl
body.rs:193-220:
rust
pin_project! {
struct StreamBody<S> {
#[pin]
stream: SyncWrapper<S>,
}
}
impl<S> http_body::Body for StreamBody<S>
where
S: TryStream, S::Ok: Into<Bytes>, S::Error: Into<BoxError>,
{
type Data = Bytes;
type Error = Error;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let stream = self.project().stream.get_pin_mut();
match ready!(stream.try_poll_next(cx)) {
Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
None => Poll::Ready(None),
}
}
}每次 hyper 调 poll_frame,StreamBody 就 poll 一次底层 Stream——把 Ok(chunk) 转成 Frame::data(chunk.into())、Err(err) 包成 axum_core::Error。
pin_project! { #[pin] stream }:需要 pin projection 让 self.stream 能作为 Pin<&mut S> 访问——try_poll_next 需要这个形式。
chunk.into():S::Ok 只要求 Into<Bytes>——比如可以是 Vec<u8>、String、&'static [u8]——在这里统一转成 Bytes。
这是 axum 把"任意 Rust Stream"接入"HTTP body 协议"的具体 bridge。类似于第 10 章 SSE 的 SseBody——但 SseBody 需要把 Event 编码成 SSE 帧格式,StreamBody 只是透明转发 Bytes。
Body::into_data_stream:反向转换
body.rs:76-78:
rust
pub fn into_data_stream(self) -> BodyDataStream {
BodyDataStream { inner: self }
}BodyDataStream 的定义(body.rs:141-143)加 Stream impl(body.rs:145-168):
rust
pub struct BodyDataStream { inner: Body }
impl Stream for BodyDataStream {
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
Some(frame) => match frame.into_data() {
Ok(data) => return Poll::Ready(Some(Ok(data))),
Err(_frame) => {} // 跳过非 data frame(比如 trailer)
},
None => return Poll::Ready(None),
}
}
}
}把 Body 当 Stream<Item = Result<Bytes, Error>> 用——每次 poll 从 body 拿一帧、如果是 data frame 返回 bytes、其他类型帧(trailer)丢弃。
用 loop 是因为可能连续几个 non-data frame——要 skip 到第一个 data frame 才 return。
实际场景:
rust
async fn echo(request: Request) -> impl IntoResponse {
let (parts, body) = request.into_parts();
let stream = body.into_data_stream();
// stream 是 Stream<Item = Result<Bytes, Error>>, 可以做各种 stream 操作
let doubled = stream.map_ok(|bytes| {
// 把每块数据大写
Bytes::from(bytes.to_ascii_uppercase())
});
Response::new(Body::from_stream(doubled))
}把请求 body 转 stream、每块数据转大写、重新包成响应 body。整个过程流式——不缓冲到内存——适合大 body。
from_stream 和 into_data_stream 是一对互逆操作——让 Body 在 http_body::Body 和 futures::Stream 两个生态间自由转换。
to_bytes:缓冲工具
流式 body 有时需要缓冲——比如签名验证、JSON 反序列化、文件写盘前的验证。axum 提供 to_bytes(axum/src/body/mod.rs):
rust
pub async fn to_bytes(body: Body, limit: usize) -> Result<Bytes, axum_core::Error> {
Limited::new(body, limit)
.collect()
.await
.map(|col| col.to_bytes())
.map_err(axum_core::Error::new)
}三行代码做三件事:
一、Limited::new(body, limit):http_body_util::Limited 包装 body、限制总字节数。超过 limit 后 poll_frame 返 Err——不继续读。
二、.collect().await:http_body_util::BodyExt::collect 的方法——一次性读完 body、返回 http_body_util::Collected(一个 bytes + 可选 trailer 的容器)。
三、.to_bytes():从 Collected 里拿 Bytes——所有 data frame 拼接成一块。
整个流程:流式 body → Limited wrap → 全部读完(受 limit 控制)→ 返回 Bytes。
limit 参数的重要性:to_bytes(body, usize::MAX) 是"不限"——接受客户端发送的任意大 body。生产里这是 DoS 漏洞——攻击者发个 10GB 的 body 耗尽内存。正确做法:
rust
let bytes = to_bytes(body, 10 * 1024 * 1024).await?; // 10MB 上限到限失败后,axum 的错误是 http_body_util::LengthLimitError——handler 捕获返 413 Payload Too Large。
to_bytes 是 Json / Form / Multipart / Bytes 这些提取器的底层工具——所有"需要缓冲到内存再处理"的场景都走这个。
Limited 深入:防超大请求
http_body_util::Limited<B> 是一个 body wrapper——限制 body 总字节数。内部实现:
rust
// http_body_util::Limited (简化)
pub struct Limited<B> {
inner: B,
limit: usize,
consumed: usize, // 已读字节
}
impl<B: Body> Body for Limited<B> {
type Data = B::Data;
type Error = LimitedError<B::Error>;
fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>
{
// poll inner
match ready!(self.inner.poll_frame(cx)) {
Some(Ok(frame)) => {
if let Some(data) = frame.data_ref() {
self.consumed += data.remaining();
if self.consumed > self.limit {
return Poll::Ready(Some(Err(LengthLimitError.into())));
}
}
Poll::Ready(Some(Ok(frame)))
}
// ...
}
}
}每次 poll 累加读到的字节数——超过 limit 立即 Err——上层停止读、body 不会真的完成缓冲。这让"10GB 上传"被立刻拒绝——而不是"等缓冲完 10GB 才发现超限"。
错误类型 LengthLimitError 能通过 downcast 识别:
rust
match to_bytes(body, 1024).await {
Ok(bytes) => { /* ... */ }
Err(err) => {
let source = std::error::Error::source(&err).unwrap();
if source.is::<http_body_util::LengthLimitError>() {
return (StatusCode::PAYLOAD_TOO_LARGE, "body too large").into_response();
}
// 其他错误
}
}这让 handler 能区分"body 过大"和"其他 body 错误"——分别返 413 和 500。
Limited 和 DefaultBodyLimit
第 6 章讨论过 DefaultBodyLimit——请求 body 的默认大小限制(默认 2MB)。DefaultBodyLimit 是 Tower Layer——给所有经过的请求加一个 Limited wrap。to_bytes 的 limit 参数和 DefaultBodyLimit 独立——在 to_bytes 调用前 body 已经被 Layer wrap 过一次。两层 limit 都生效——取更严格的那个。
http_body::Body trait 内部
axum::Body 的底层是 http_body::Body——HTTP body 的标准 trait(第三方 crate http-body):
rust
// 等价于 http_body crate 的定义
pub trait Body {
type Data: Buf;
type Error;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;
fn is_end_stream(&self) -> bool { false }
fn size_hint(&self) -> SizeHint { SizeHint::default() }
}Frame:数据帧或 trailer
Frame<T> 是 http_body 的基本单元——可以是 数据帧(一块 bytes)或 trailer(HTTP/2 的尾部 header):
rust
enum FrameInner<T> {
Data(T),
Trailers(HeaderMap),
}Data frame:body 的实际字节。HTTP/1.1 的 chunked encoding 每个 chunk 都是一个 data frame;HTTP/2 的 DATA frame 也是。
Trailer frame:HTTP/2 特有——body 结束后额外发一批 header(比如 gRPC 的 status code)。
http_body 统一两种形态——用户 poll 同一个方法、根据 Frame 类型分发。这个设计让 HTTP/1.1 和 HTTP/2 的 body 模型统一——上层代码不用分别处理。
axum::BodyDataStream 主动丢弃 trailer frame——因为大多数 axum handler 不关心 trailers。想要 trailer 的场景(比如 gRPC)要用 http_body_util::BodyStream。
SizeHint:body 大小的预测
http_body::SizeHint 是 body 大小的(可能粗糙的)预估——lower: u64(最少多少字节)和 upper: Option<u64>(最多多少字节,None 表示未知)。
用途:
- Content-Length header:如果 upper == lower、hyper 能自动加
Content-Length - 流量预估:客户端库(比如 reqwest)用 size_hint 判断是否能 stream
- 内存预分配:
to_bytes的 Collected 根据 size_hint 预先 alloc
axum 的 Full<Bytes> body 返回精确的 size_hint(等于 bytes 长度)——让响应自动带 Content-Length。StreamBody 返回默认的 SizeHint::default()(lower=0, upper=None)——因为 stream 的大小未知——响应用 chunked encoding 而不是 Content-Length。
is_end_stream:body 已经结束吗
is_end_stream(&self)——返回 true 表示"确定下一次 poll_frame 返 None"。用途是让 hyper 不 poll 已知结束的 body——小优化。
Empty 的 is_end_stream 返 true——零次 poll 就知道结束。Full 在发完内容后返 true。自定义 body 可以智能返回——精确实现让 hyper 行为更高效。
流式 vs 缓冲的具体差异
Body 既能缓冲也能流式——两种形态的性能和语义差异:
| 维度 | 缓冲(Full / Vec<u8>) | 流式(StreamBody) |
|---|---|---|
| 内存占用 | O(body size) | O(chunk size) |
| 首字节延迟 | 等到 body 构造完 | 第一个 chunk 就绪就开始发 |
| 总耗时 | = 构造 + 传输 | = 构造和传输并行 |
| Content-Length | 精确 | None(chunked) |
| 适用场景 | 小 body(几 KB 到几 MB) | 大 body(文件、日志流) |
| 典型类型 | Json、Html 响应 | SSE、下载、proxy |
内存优势:对 1GB 文件下载,流式版本 peak 内存只是一个 chunk(几 KB 到几 MB)——缓冲版本需要 1GB。
延迟优势:流式可以"边生成边发送"——客户端收到第一字节的时间减去服务端开始处理的时间更短(time-to-first-byte)。对 LLM streaming 这类 user-facing 场景尤其重要。
为什么默认缓冲:大多数 API 响应小(几 KB 的 JSON),缓冲更简单且有 Content-Length。只有明确是"大 body" 或 "需要实时推送"才用流式。
实战:三种流式 body 构造
实战一:文件下载(tokio File → Body)
rust
use tokio::fs::File;
use tokio_util::io::ReaderStream;
use axum::body::Body;
async fn download(Path(name): Path<String>) -> Result<Response, AppError> {
let file = File::open(format!("/data/{}", sanitize(&name))).await?;
let stream = ReaderStream::new(file);
let body = Body::from_stream(stream);
Ok((
[
("content-type", "application/octet-stream"),
("content-disposition", &format!("attachment; filename=\"{name}\"")),
],
body,
).into_response())
}核心链条:
File::open:tokio 的异步文件句柄——实现AsyncReadReaderStream::new(file):把AsyncRead转成Stream<Item = io::Result<Bytes>>——tokio-util提供Body::from_stream(stream):包装成 axum body
hyper 每次 poll_frame 时从 stream 拉一块,stream 从 file 读一块——完整的流式链。1GB 文件下载的 peak 内存只有 ReaderStream 的 buffer(默认几 KB)。
实战二:proxy 转发(upstream body → downstream body)
rust
async fn proxy(request: Request) -> Result<Response, AppError> {
let client = reqwest::Client::new();
let upstream_url = build_upstream_url(&request);
// 转发请求到 upstream——body 流式传过去
let (parts, body) = request.into_parts();
let upstream_body = reqwest::Body::wrap_stream(body.into_data_stream());
let resp = client
.request(parts.method, &upstream_url)
.headers(parts.headers)
.body(upstream_body)
.send()
.await?;
// 转发 response——body 流式回来
let status = resp.status();
let headers = resp.headers().clone();
let downstream_body = Body::from_stream(resp.bytes_stream());
Ok((status, headers, downstream_body).into_response())
}两次 Stream ↔ Body 转换:
- 入站:axum Body → data stream → reqwest Body
- 出站:reqwest bytes_stream → axum Body from_stream
整条 proxy 路径完全流式——两边都不缓冲。1GB 文件经过 proxy 只占几 MB 内存(两边的 chunk buffer)。
实战三:定时事件源(interval → SSE → Body)
rust
use std::time::Duration;
use tokio_stream::wrappers::IntervalStream;
async fn heartbeat_stream() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let interval = tokio::time::interval(Duration::from_secs(1));
let stream = IntervalStream::new(interval).map(|_| {
Ok(Event::default()
.event("heartbeat")
.data(chrono::Utc::now().to_rfc3339()))
});
Sse::new(stream).keep_alive(KeepAlive::default())
}Stream 来源是 IntervalStream (tokio-stream crate)——每秒 tick 一次。map 转成 Event。Sse::new 把 Event stream 包装成 SSE 格式的 Body。
这展示了 axum 流式生态的组合力:任何 Stream 都能作为 Body 源——定时、broadcast channel、数据库游标、LLM token 流——只要能产生 Bytes(或能转成 Bytes 的事件类型)。
实战四:LLM token 流式输出
LLM 应用最经典的流式场景——模型一个 token 一个 token 输出、前端一个一个展示:
rust
use axum::body::Body;
use futures::StreamExt;
async fn chat(Json(req): Json<ChatRequest>) -> Result<Response, AppError> {
let llm_stream = llm_client.stream_completion(req.messages).await?;
// llm_stream: Stream<Item = Result<Token, LlmError>>
let sse_stream = llm_stream.map(|token_result| {
let data = match token_result {
Ok(token) => format!("data: {}\n\n", serde_json::to_string(&token).unwrap()),
Err(e) => format!("event: error\ndata: {}\n\n", e),
};
Ok::<_, std::convert::Infallible>(data.into_bytes())
});
Ok((
[
("content-type", "text/event-stream"),
("cache-control", "no-cache"),
("x-accel-buffering", "no"), // 禁用 Nginx buffering
],
Body::from_stream(sse_stream),
).into_response())
}几个生产细节:
一、x-accel-buffering: no:Nginx 默认对响应做 proxy buffering——SSE 流会被 Nginx 缓冲等完再一次性发给客户端——失去流式意义。这个 header 告诉 Nginx 不要缓冲。Cloudflare 也识别这个 header。
二、直接用 Body::from_stream 而不是 Sse::new:第 10 章的 Sse<S> helper 专门做 SSE 语义(Event + KeepAlive)——但 LLM 流通常自己格式化(OpenAI 格式 data: {...}\n\n),Body::from_stream 更直接。
三、Error 的 Infallible:stream 内部 map 已经把 llm error 转成 SSE event: error 字节——再无其他错误。用 Infallible 告诉编译器这个 stream 永不真正失败。
四、token JSON 格式:和 OpenAI API 的 SSE 格式兼容——方便客户端 SDK(比如 openai 的 JS 客户端)直接复用。
这个模式是每个 LLM 应用的基础——从 stream 来源(模型 backend)到 stream 响应(前端 SSE)的直接链。流式性让用户感觉到"模型在工作"——首 token 延迟比等完整响应好太多。
实战五:chunked upload 与部分下载
大文件上传时用 multipart/form-data(第 8 章讨论过)是一种选——另一种是chunked transfer encoding——客户端分块发 body,axum 流式接收:
rust
async fn upload(request: Request) -> Result<Response, AppError> {
let (_, body) = request.into_parts();
let mut stream = body.into_data_stream();
let path = format!("/uploads/{}", uuid::Uuid::new_v4());
let mut file = tokio::fs::File::create(&path).await?;
let mut total = 0usize;
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
total += chunk.len();
if total > 100 * 1024 * 1024 { // 100MB 上限
let _ = tokio::fs::remove_file(&path).await;
return Err(AppError::PayloadTooLarge);
}
file.write_all(&chunk).await?;
}
file.flush().await?;
Ok(Json(json!({"path": path, "size": total})).into_response())
}边读边写——内存只占单个 chunk 的大小。100MB 文件的 peak 内存几 KB(tokio chunk buffer)+ file OS cache——可控。
对应的断点续传(客户端用 Range header 请求部分):
rust
async fn download_with_range(
Path(name): Path<String>,
headers: HeaderMap,
) -> Result<Response, AppError> {
let path = format!("/files/{}", sanitize(&name));
let file = File::open(&path).await?;
let metadata = file.metadata().await?;
let size = metadata.len();
// parse Range: bytes=X-Y
let range = headers.get("range").and_then(|v| parse_range(v.to_str().ok()?, size));
match range {
Some((start, end)) => {
let mut file = file;
file.seek(std::io::SeekFrom::Start(start)).await?;
let limited = file.take(end - start + 1); // 只读部分
let stream = ReaderStream::new(limited);
Ok((
StatusCode::PARTIAL_CONTENT,
[
("content-range", format!("bytes {}-{}/{}", start, end, size)),
("content-length", (end - start + 1).to_string()),
],
Body::from_stream(stream),
).into_response())
}
None => Ok((
[("content-length", size.to_string())],
Body::from_stream(ReaderStream::new(file)),
).into_response()),
}
}客户端挂机重启时用 Range header 续传——节省网络。这对大视频流或大文件下载场景必要。
CompressionLayer 与 body 的互动
tower-http::compression::CompressionLayer 自动给响应 body 压缩(gzip / brotli / deflate)——透明、用户不用改 handler:
rust
let app = Router::new()
.route("/", get(handler))
.layer(CompressionLayer::new());CompressionLayer 内部的工作:
- 检查客户端的
Accept-Encodingheader - 包装响应 body——把每个 chunk 经过压缩 encoder
- 设
Content-Encoding: gzip、删除 Content-Length(压缩后长度变了)
和 axum::Body 的互动:body 本身不参与压缩——CompressionLayer 用 body 的 poll_frame 读原始 chunk、压缩、写进它自己的 stream body。对 axum::Body 来说完全透明——它不关心数据是否被压缩。
这层解耦是 http_body trait 设计的价值——body 只管"产出字节"、压缩只管"变换字节"——各自独立、组合工作。想自己写 encryption layer、encoding adapter 都能按相同思路做。
Frame 的 trailer 细节
HTTP/2 的 trailer 是body 之后的额外 header——gRPC 用它传 status code、metadata。轮廓:
text
HTTP/2 DATA frame (body chunk)
HTTP/2 DATA frame (body chunk)
...
HTTP/2 HEADERS frame (trailer, end_stream=true)axum 的 Body 能产出 trailer——但 handler 的典型用法不涉及:
rust
use http_body::{Body, Frame};
use http::HeaderMap;
struct MyBody { /* ... */ }
impl Body for MyBody {
// ...
fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Result<Frame<Bytes>, Error>>>
{
if self.done_sending_data() {
// 发一个 trailer
let mut trailers = HeaderMap::new();
trailers.insert("x-status", "ok".parse().unwrap());
Poll::Ready(Some(Ok(Frame::trailers(trailers))))
} else {
// 发 data
Poll::Ready(Some(Ok(Frame::data(self.next_chunk()))))
}
}
}自定义 Body 产出 Frame::trailers(...)——hyper 会在 HTTP/2 上正确编码。BodyDataStream 过滤掉 trailer——into_data_stream 的 user 不用看到。想要 trailer 用 http_body_util::BodyStream。
生产里 axum handler 几乎不产出 trailer——只有 gRPC(tonic)需要。tonic 在 axum 上的集成依赖这个能力——把 gRPC status 通过 trailer 传出去。
HttpBody 和 axum Body 的互操作
axum::body::HttpBody 直接 re-export http_body::Body——axum 用户不需要单独 import http-body crate。但底层类型互相兼容:
rust
use axum::body::{Body, HttpBody};
// 任何 http_body::Body 都能变成 axum Body
fn from_any_body<B>(b: B) -> Body
where B: HttpBody<Data = Bytes> + Send + 'static, B::Error: Into<BoxError>,
{
Body::new(b)
}
// 反过来——axum Body 本身实现了 HttpBody
fn body_is_http_body(body: Body) -> impl HttpBody<Data = Bytes> {
body // axum::Body 实现了 http_body::Body
}这种"双向兼容"让 axum 能接收任何 http_body::Body 实现——hyper 的 Incoming、tonic 的 gRPC body、自定义 body 都能塞进 Body::new。反过来 axum Body 也能作为 http_body 交给下游(hyper、reqwest)。
深入:Body 的 pin_project 结构
Body 本身是一个 newtype:
rust
pub struct Body(BoxBody);没有 pin_project——UnsyncBoxBody 内部处理 pin。但 axum 用 Pin::new(&mut self.0) 直接 pin Box 的 inner(body.rs:113-134):
rust
impl http_body::Body for Body {
type Data = Bytes;
type Error = Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut self.0).poll_frame(cx)
}
}BoxBody 内部是 Pin<Box<dyn Body>>——pin 已经通过 Box 实现了。Pin::new(&mut self.0) 直接拿 Pin<&mut BoxBody>——因为 BoxBody 是 Unpin(它内部的 Pin<Box> 是 Unpin)。这让代码不需要 pin_project——简洁。
同样的技巧用在 BodyDataStream:内部的 Body 是 Unpin、所以外层不需要 pin_project。只有 StreamBody 的 Stream(可能 !Unpin)需要 pin_project。
SizeHint 深入
SizeHint 是 body 对"我有多少字节"的预估——http_body::SizeHint 有 lower: u64 和 upper: Option<u64>。几种典型情况:
| Body 类型 | lower | upper | 含义 |
|---|---|---|---|
Empty | 0 | Some(0) | 精确空 |
Full<Bytes> | n | Some(n) | 精确 n 字节 |
StreamBody<S> | 0 | None | 未知——任意大 |
Limited<B> | inner.lower | min(inner.upper, limit) | 受限上界 |
| 自定义 counting body | n | None | 至少 n 字节 |
SizeHint 影响:
HTTP/1.1 的 Content-Length 或 chunked:精确 size 用 Content-Length、不精确用 chunked transfer encoding。chunked 每块前加长度字节、略有开销;Content-Length 更高效但要求预知。
预分配优化:to_bytes 读完整 body 前会根据 upper hint 预 alloc 一块 BytesMut——避免多次 realloc。SizeHint 准确能省 alloc 次数。
内存压力估计:客户端库可以用 upper 判断是否值得缓冲——太大的响应选择流式处理而非 to_bytes。
自己写 Body impl 时,尽量提供准确的 SizeHint——让 hyper 和上层代码做出更好的 I/O 决策。
Body 的 FAQ
Q:handler 里先 to_bytes 再手动 parse JSON 对不对?
不对——应该用 Json<T> 提取器。to_bytes + 手动 parse 失去了 Json 提取器的 Content-Type 校验、错误分类(422 vs 400)、body limit 等。只有在"需要自定义行为"时才绕过提取器。
Q:Body::from_stream 的 stream 已 exhausted 后再 poll 会怎样?
poll_frame 返 None。如果再继续 poll,行为取决于 Stream 实现——大多数情况下持续返 None。不应该依赖这个——FusedStream 明确定义 exhausted 后的行为。
Q:Body 能 clone 吗?
不能——Body 的 UnsyncBoxBody 不是 Clone(body 可能是一次性的 stream)。想"body 复用"需要先 to_bytes 缓冲成 Bytes、然后 Bytes 能 clone(Arc 语义)、再各自 Body::from(bytes.clone())。
Q:Body 支持 Seek 吗?
不支持。http_body::Body 是 forward-only 的——只能按顺序 poll_frame。想要 seek(比如断点续传)要在 body 外层处理——比如文件层用 file.seek 再包 body。
Q:返回超大 body 会导致什么?
hyper 按 poll_frame 速率发——backpressure 自动工作。客户端慢、TCP buffer 满——hyper 停止 poll body——body 停止生成。内存不会爆。唯一需要注意是 body 的生成本身不要提前"产出一大堆待发"——应该按 poll 节奏生成。
Q:Body 在 HTTP/2 上有什么不同?
HTTP/2 的 body 经过 flow control——客户端宣告 window size、服务端发到上限就停。这在 hyper 层处理、对 axum::Body 完全透明。body 该怎么写就怎么写——HTTP/1.1 和 HTTP/2 行为一致。
body 在 axum 内部流转
HTTP 请求从 TCP 到 handler、响应从 handler 到 TCP 的完整 body 流转:
几个关键点:
- hyper::Incoming → axum::Body 在
serve/mod.rs:594的req.map(Body::new)——一次性类型统一 - handler 有两条路径:to_bytes 缓冲 + 流式 map
- hyper 根据 SizeHint 选编码:精确 size → Content-Length;未知 → chunked transfer encoding
整条路径保持类型一致(都是 axum::Body),但实际数据流可以是全缓冲或全流式——取决于 body 内部的具体实现。
Content-Length 与 Transfer-Encoding 的交互
body 在 HTTP wire 上有两种编码:
Content-Length:头里声明 body 长度——客户端读 N 字节就知道结束。要求提前知道长度。
Transfer-Encoding: chunked:body 按 chunk 发送——每 chunk 前是长度字节。不需要提前知道总长度。
axum body 的 SizeHint 决定用哪种——hyper 读 SizeHint:
- 精确(
upper == Some(n)且 lower == n)→ Content-Length: n - 不精确 → Transfer-Encoding: chunked
手动覆盖:
rust
// 强制 Content-Length(如果精确)
async fn explicit_length() -> impl IntoResponse {
let bytes: Bytes = generate_data().await;
let len = bytes.len();
([("content-length", len.to_string())], Body::from(bytes))
}
// 强制 chunked(流式)
async fn chunked() -> impl IntoResponse {
let stream = generate_stream();
Body::from_stream(stream) // SizeHint 是 None, hyper 自动用 chunked
}客户端偏好:现代 HTTP client 都支持 chunked——但有些极老系统(嵌入式、L4 proxy)偏好 Content-Length。实际看场景选。
HTTP/2 没有这个区分:HTTP/2 全部用 DATA frame——Content-Length 作为 header 传但不影响传输机制。上面的讨论主要是 HTTP/1.1。
Body 的生命周期
一个请求响应周期里 body 的状态变化:
关键 state transitions:
- Received → AxumBody:第 15 章讲的
req.map(Body::new)——一次类型转换 - HandlerRead → Buffered:
to_bytes+ 处理 bytes——常见简单路径 - HandlerRead → Streamed:
into_data_stream+ map/filter——高级流式路径 - ResponseBody → Sent:hyper 的 connection task poll_frame 一个一个 chunk 写 TCP
整个过程body 永远是一次消费——不能读完了回到 Received 重读。这条约束深植于 HTTP 协议本身(第 6 章深入讨论过)。
实战六:tar 流式打包
把多个文件打包成 tar 流式下载——不缓冲整个 tar 到内存:
rust
use axum::body::Body;
use futures::stream::{self, StreamExt};
use tokio::io::AsyncWriteExt;
async fn download_tar(Path(dir): Path<String>) -> Result<Response, AppError> {
let entries = list_files(&dir).await?;
// 构造 async stream: 对每个文件产出 tar header + content
let stream = async_stream::stream! {
let mut tar = TarWriter::new(Vec::new());
for entry in entries {
let path = entry.path;
let metadata = tokio::fs::metadata(&path).await?;
// 先 yield tar header
let header = tar.write_header(&path, metadata.len()).await?;
yield Ok(Bytes::from(header));
// 再 yield 文件内容
let mut file = tokio::fs::File::open(&path).await?;
let mut buf = vec![0u8; 64 * 1024];
loop {
let n = file.read(&mut buf).await?;
if n == 0 { break; }
yield Ok(Bytes::copy_from_slice(&buf[..n]));
}
}
// tar 结束 marker
yield Ok(Bytes::from(tar.finish()));
};
Ok((
[("content-type", "application/x-tar"),
("content-disposition", "attachment; filename=\"archive.tar\"")],
Body::from_stream(stream),
).into_response())
}核心:用 async_stream::stream! 宏构造一个 generator-like 的 stream——每次 yield 一块 bytes。文件内容按 64KB 块流式读、写——总内存占用恒定、几 GB 的 tar 都能流。
这种 pattern 对"按需聚合"的下载场景(zip / tar / 合并多个响应)很有用——所有都遵循 "generate → bytes chunk → yield → body" 的套路。
body 的 DoS 防线
body 处理是很多 DoS 攻击的目标——送大 body 耗尽内存、慢速 body 占用连接、分块 body 里藏非法数据。axum 的防御:
一、DefaultBodyLimit:默认 2MB 请求 body 限制。所有 Bytes::from_request、Json::from_request 等都经过这层——超过直接 413。
二、to_bytes 显式 limit:自己调 to_bytes 时第二参数要写合理值——不用 usize::MAX。
三、hyper 的 body read timeout:第 15 章提到的 header_read_timeout——防止 slowloris(送 body 很慢、拖住连接)。axum 默认启用。
四、request body limit layer (tower-http):全局限制 request body——比 DefaultBodyLimit 更 strict(不依赖提取器触发)。
五、Content-Length 验证:hyper 会校验 Content-Length 的 body 不超过头声明的大小。
这五层配合防御几乎所有 body-level DoS。生产的 checklist:
- 配合前置 LB(nginx / ALB)做第一层 body size 限制(通常几十 MB 硬上限)
- axum 层用 DefaultBodyLimit 或 RequestBodyLimitLayer 做第二层(按业务上限)
- handler 里 to_bytes 调用带合理 limit
- 监控 413 响应数——异常增加可能是攻击
body 的性能维度
Body 的各种形态有不同的性能 profile。几个常见场景的量化:
小 JSON 响应(几 KB):
- Json(data).into_response() → Body::from(bytes) →
Full<Bytes> - 开销:serde_json 序列化(几 µs 到几十 µs)、Body::from 零成本、响应传输几 ns
- bottleneck: serde_json
大文件下载(100 MB):
- Body::from_stream(ReaderStream::new(file))
- 开销:每次 poll 从 file 读一块(默认 buffer 8KB)、转成 Bytes、返回
- bottleneck: 磁盘 I/O,内存 = 单 chunk 大小
SSE 推送:
- Body::from_stream(event_stream)
- 开销:每次 event 一次 poll、格式化成 bytes、返回
- bottleneck: event 生成速率
proxy 转发:
request.body.into_data_stream()→ reqwest body → upstream- 开销:两次 data_stream ↔ body 转换(零拷贝)、网络 I/O
- bottleneck: 上游延迟
关键观察:body 本身的开销通常不是 bottleneck——要么是序列化、要么是 I/O。优化方向应该针对这些——Body 层只要"不乱加开销"就够。axum 的 body 设计确实做到了——UnsyncBoxBody 的 vtable 调用只有几 ns、from_stream 的包装零分配(除了 SyncWrapper 的栈空间)。
body 的零拷贝路径
Bytes 的核心特性是零拷贝共享——底层 Arc<[u8]> 或类似结构,clone 只是引用计数加一。Body 内部的 data frame 都是 Bytes——意味着:
- hyper 从网络收到字节进 Bytes buffer——不拷贝
- Bytes 经过提取器、handler、响应 body——只传引用
- hyper 把 Bytes 写到网络 socket——不拷贝
整条路径零拷贝。只有在用户代码里 .clone() 或 .to_vec() 时才真正复制字节。这让大 body 的处理 CPU 成本低——瓶颈在网络和业务逻辑、不在 byte shuffling。
proxy body 的精细控制
前面的 proxy 例子是"流式转发"——还有更精细的控制场景。
场景:过滤 / 修改 body
proxy 时想对 body 做修改(比如加请求签名、改 URL references)——需要读完修改再转发:
rust
async fn signed_proxy(request: Request) -> Result<Response, AppError> {
let (parts, body) = request.into_parts();
// 缓冲 body (受 limit)
let bytes = to_bytes(body, 10 * 1024 * 1024).await?;
// 加签名
let signature = hmac_sign(&bytes, &SIGNING_KEY);
let mut headers = parts.headers.clone();
headers.insert("x-signature", signature.parse().unwrap());
// 转发
let resp = client.post(upstream)
.headers(headers)
.body(bytes)
.send()
.await?;
// 响应流式回来 (这部分还是流式)
Ok(axum_response_from_reqwest(resp))
}这种 pattern 的 tradeoff:
- 收益:能对 body 做任何处理
- 成本:request body 全缓冲——1GB upload 要 1GB 内存
生产上 hybrid:对 small body(<1MB)缓冲再修改、对 large body(>1MB)流式不修改(或者用 streaming-capable 加密库边流边签)。具体策略看业务——axum 的 body API 足够灵活支持各种组合。
场景:response body 的条件修改
某些场景想根据 response 的内容决定是否透传——比如只 proxy 成功响应、失败替换为自定义错误页:
rust
async fn conditional_proxy(request: Request) -> Result<Response, AppError> {
let upstream_resp = client.send(request).await?;
let status = upstream_resp.status();
if status.is_success() {
// 成功——流式转发
let body = Body::from_stream(upstream_resp.bytes_stream());
Ok(Response::builder().status(status).body(body).unwrap())
} else {
// 失败——用自定义错误替换
let err_body = upstream_resp.text().await.unwrap_or_default();
tracing::warn!(status = %status, body = %err_body, "upstream error");
Ok(Response::builder()
.status(502)
.body(Body::from("upstream unavailable"))
.unwrap())
}
}upstream 成功路径流式、失败路径缓冲 error body 后生成新 response——按需选择策略。
Body 的 StreamBody 内部细节
StreamBody 的 pin_project + SyncWrapper 组合值得深入:
rust
pin_project! {
struct StreamBody<S> {
#[pin]
stream: SyncWrapper<S>,
}
}为什么用 SyncWrapper?因为底层 Stream S 可能不是 Sync——但 http_body::Body 用在 UnsyncBoxBody 里不强制 Sync。SyncWrapper 的作用是标注类型——让包了它的 struct 是 Sync——即便 inner S 不是。
SyncWrapper 是怎么做到安全的?通过 API 设计:
- 构造:
SyncWrapper::new(s)- 任何s都能包 - 访问:必须通过
&mut self才能拿到 inner&mut S——不能通过&self(单线程访问保证)
共享是对 &self 的——SyncWrapper 阻断这条路径——所以即便 S 非 Sync 也安全。代价是有 &mut self 的地方才能用——对 http_body 的 poll_frame(Pin<&mut self>) 签名完美兼容(&mut self)。
这是一种"用 API 限制换 trait bound 放松"的典型 Rust 技巧——牺牲一点灵活(必须 mut 访问)、获得范围更大的类型兼容性。
跨书关联:http_body 规范
http_body crate 是 Rust HTTP 生态的标准 body trait——hyper、axum、tonic、reqwest、actix-web(部分)都基于它。定义简洁但经过几次演进:
- http_body 0.x:两个方法——
poll_data和poll_trailers分别返回 data 和 trailer - http_body 1.0:合并成
poll_frame返回Frame<T>——统一接口。axum 0.7+ 用 1.0 版本
axum 紧跟 http_body 的最新版本——因为 hyper 1.0 也迁移到了这版。整个生态协同演进——但偶尔有版本兼容性问题(老项目用 http-body 0.4 会和新 crate 冲突)。
《Hyper 与 Tower:工业级 HTTP 栈》第 10 章专门讨论 http_body——从 trait 设计到常见实现(Full、Empty、Limited、Combinators)。读那一章后再看 axum 的 Body,会发现 axum 只是在 http_body 上做最薄的 wrapper——UnsyncBoxBody 类型擦除 + 几个便利 constructor。
测试 body 的常见 pattern
测试 handler 的响应 body 几种写法:
rust
use axum::body::to_bytes;
use tower::ServiceExt;
#[tokio::test]
async fn returns_expected_json() {
let app = Router::new().route("/data", get(data_handler));
let response = app
.oneshot(Request::builder().uri("/data").body(Body::empty()).unwrap())
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
// 把 body 转成 bytes
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
let parsed: MyData = serde_json::from_slice(&body).unwrap();
assert_eq!(parsed.field, expected_value);
}几个要点:
to_bytes(body, usize::MAX):测试里不怕 body 大、用 MAXserde_json::from_slice:bytes 直接 parse,省一次 String 转换oneshot:直接跑 Service 不启 TCP、更快
测试流式 body:
rust
#[tokio::test]
async fn streams_tokens() {
let app = Router::new().route("/stream", get(stream_handler));
let response = app.oneshot(Request::new(Body::empty())).await.unwrap();
let mut stream = response.into_body().into_data_stream();
// 逐块验证
let chunk1 = stream.next().await.unwrap().unwrap();
assert_eq!(&chunk1[..], b"event: start\n\n");
let chunk2 = stream.next().await.unwrap().unwrap();
assert!(chunk2.starts_with(b"data: "));
// 最终结束
assert!(stream.next().await.is_none());
}对 SSE / 分块响应,验证每个 chunk 符合预期——比 to_bytes 一次性更精细。
与其他 body 类型的 conversion
实际项目会涉及多种 body:
hyper::body::Incoming:hyper 服务端收到的 request body(从网络 parse 出来)reqwest::Body:reqwest 客户端用于发请求/接响应的 bodyhttp_body_util::Full<Bytes>:全缓冲http_body_util::Empty:空axum::body::Body:axum 用的
它们的 conversion:
rust
// hyper::Incoming → axum::Body
let axum_body = Body::new(incoming); // try_downcast 识别, 零成本
// Vec<u8> → axum::Body
let axum_body = Body::from(vec); // From impl, Full wrap
// Stream → axum::Body
let axum_body = Body::from_stream(stream); // StreamBody wrap
// axum::Body → reqwest::Body (for proxy)
let reqwest_body = reqwest::Body::wrap_stream(axum_body.into_data_stream());
// reqwest::Response → axum::Body
let axum_body = Body::from_stream(response.bytes_stream());每对转换的具体 API 都在 axum / reqwest / http_body_util 的文档里——规则都很直白。只要两边都遵守 http_body 的 trait、conversion 通常是一行。
http_body 的版本演进
http_body 作为整个 Rust HTTP 生态的基础 trait,经过几轮大改:
- http_body 0.3:最早期、API 简陋
- http_body 0.4:加了
SizeHint、poll_data/poll_trailers分开 - http_body 1.0:合并 poll_data/poll_trailers 成
poll_frame+Frame<T>枚举——当前稳定版
axum 的版本对应:
- axum 0.5/0.6:用 http_body 0.4
- axum 0.7+:用 http_body 1.0(和 hyper 1.0 同步)
这次重大升级(0.4 → 1.0)让 axum 0.7 有大量不兼容改动——老项目升级 axum 要同时更新 http-body 依赖。好处是整个生态用同一个版本、互操作不用 adapter。
版本演进的经验:HTTP 生态的底层 crate(hyper、http、http-body)升级会波及所有上层。axum 团队非常谨慎地跟进——通常 hyper 发 major 版本后几个月内 axum 跟上、打包完整的升级指南。
本章的几个心得
回头看本章内容可以总结几个关键 insight:
一、Body 是类型擦除的艺术:UnsyncBoxBody<Bytes, Error> 让一个具体类型承载多种 body 形态——用户 API 简单、生态兼容性好
二、流式和缓冲的选择是语义而非技术:axum 两种都支持、API 对称。选哪个看业务需求——小 body 缓冲简单、大 body 流式必要
三、pin 和 SyncWrapper 是 Rust 异步 HTTP 的底层术语:pin_project 处理 !Unpin future 的 safe access、SyncWrapper 处理 !Sync 类型的 across-thread 兼容。两者都是为了让更多的 Rust 类型能接入 HTTP body 接口。
四、零拷贝通过 Bytes 实现:整条 body 处理路径 Bytes 共享引用计数——不做实际字节拷贝。I/O 路径的 CPU 开销极低。
五、http_body 是生态级的 trait:axum 的 Body 是 http_body 的"友好封装"——不替代、不分叉、完全兼容。这让 axum 的 body 处理能和 tonic、reqwest、hyper 等其他 HTTP crate 无缝互操作。
和第 10 章 SSE 的回顾
第 10 章讲 SSE 时提到 SseBody<S> 是自定义 http_body::Body 实现。现在可以完整理解它:
rust
// 第 10 章 sse.rs:110-135 (简化)
pin_project! {
struct SseBody<S> {
#[pin]
event_stream: SyncWrapper<S>,
}
}
impl<S, E> HttpBody for SseBody<S>
where S: Stream<Item = Result<Event, E>>,
{
type Data = Bytes;
type Error = E;
fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>
{
// poll stream, 每个 Event finalize 成 Bytes 再 Frame::data
}
}和本章讨论的 StreamBody 结构一模一样——都是 SyncWrapper<S> + pin_project。差异在 SseBody 要把 Event 编码成 SSE 帧格式(data: ...\n\n)、StreamBody 直接用 Bytes。同一套基础结构支持不同协议——是 http_body 设计的强大之处。
小结:Body 的两个维度
Body 在 axum 里承担两个维度的抽象:
一、类型擦除:UnsyncBoxBody<Bytes, Error> 隐藏具体 body 类型——用户只看到 Body,不用管是 Full、Empty、StreamBody 还是 Incoming。try_downcast 优化让重复 wrap 零成本。
二、流式/缓冲统一:同一个 Body 类型可以内部是缓冲(Full)或流式(StreamBody)——上层代码(hyper、handler)用 poll_frame 统一 API——SizeHint 告知具体形态。
两个维度合起来让 body 处理既类型简单(就一个 Body 类型)又功能灵活(能覆盖从空响应到 TB 级流式响应)。这是 axum 简洁 API 下的强大能力——关键都在 trait 设计和类型擦除的巧妙应用。
Body FAQ 补充
Q:能在 response 里同时返回 body 和 trailer 吗?
可以——自定义 Body 在 poll_frame 里先 yield 多个 data frame、最后 yield 一个 trailer frame。hyper 会在 HTTP/2 上正确发送 trailer。HTTP/1.1 的 trailer 支持有限、大多数场景不建议用。
Q:hyper 看到 upper: Some(n) 会严格 enforce n 字节吗?
不会——hyper 把它当 hint 用来设 Content-Length。如果 body 实际 yield 了超过 n 字节 data,hyper 会发现并 panic 或 truncate(具体行为版本依赖)。所以 body 实现要保证 SizeHint 准确或者 lower/upper 不精确。
Q:BodyDataStream 丢弃 trailer——有没有 stream 全帧类型?
http_body_util::BodyStream 保留所有 frame 类型——适合需要处理 trailer 的场景(比如 gRPC 实现)。axum 的 BodyDataStream 是"简化版"——大多数 HTTP handler 用它够。
Q:Full<Bytes> 和 Full<BytesMut> 哪个好?
都可以——Full 泛型到 T: Buf。Bytes 是 Arc-based 零拷贝、BytesMut 是可修改版本。构造 body 用 Bytes;内部还要修改用 BytesMut。最后 BytesMut::freeze() 变 Bytes 进 body。
Q:Body 能在多个 handler 间共享吗?
不能——Body 不是 Clone。每个 Body 是单次消费的。想"共享" body 数据要先 to_bytes 缓冲成 Bytes——Bytes 能 clone(引用计数)、各 handler 各自 Body::from(bytes.clone()) 独立消费。
Q:body 的 chunk 大小能控制吗?
由生成侧决定——ReaderStream::new(file) 默认 chunk 8KB、ReaderStream::with_capacity(file, 64 * 1024) 改成 64KB。大 chunk 减少 poll 次数、小 chunk 减少 per-chunk 延迟。根据网络条件权衡——几 KB 到几 MB 都合理。
Q:body 生成失败了怎么办?
Body::from_stream 的 Stream 返回 Err 会让 body 的 poll_frame 返 Err——hyper 看到 Err 会断连(TCP 关)。客户端看到 TCP reset 或 "incomplete message"。如果 body 生成可能失败——应该尽早检查(在 body 构造前)、失败返回 5xx、成功后再流式发送。一旦开始流式发送后就很难"优雅失败"了——HTTP 没有"响应开始后改主意"的机制。
Q:能从 stream 得到 size hint 吗?
不能自动——Stream trait 的 size_hint 是 (lower, upper) 元素数、不是字节数。需要自己的 body 实现手动计算并返回 SizeHint。比如对 File 可以先查 metadata 得到大小、然后 body 的 SizeHint 返回 (size, Some(size))——让响应自动带 Content-Length。
一个容易忽略的细节:body 的 Drop 行为
当 handler 早早 return(不读完 body)、或者客户端中途断开连接——body 被 drop。drop 会发生什么?
对缓冲 body(Full / Bytes):drop 只是释放内存——没副作用。
对流式 body(StreamBody<S>):drop 时内部 Stream 被 drop——Stream 内部的资源(File handle、tokio::sync receiver、数据库游标)随之释放。Rust 的 RAII 保证这层自动清理。
对来自 hyper 的 Incoming body:drop 通知 hyper "client 的后续字节我不读了"——hyper 继续从 TCP 丢弃(或按 HTTP/2 的 RST_STREAM 终止该 stream)——不会卡住连接。
这意味着用户不需要手动处理 "handler 没读完 body 怎么办"——Drop 完全兜底。只需要确保业务逻辑正确——不要让 body 被读一半再继续用(那会 panic 或 error)。
这条 Drop 语义也让第 10 章讲的 SSE 连接清理能工作——客户端断开时 body 被 drop、SSE 的 Stream 被 drop、它里面的 broadcast receiver / 数据库查询都随之释放。Rust 的所有权链让"资源清理"成为自动动作——不用写 try-finally 之类的显式清理。
几个工程级最佳实践
最后给几个生产级的 body 处理建议:
一、默认流式、特殊情况缓冲。对文件下载、SSE、proxy 等天然流式场景——不要用 to_bytes 缓冲。只有业务明确需要"整体访问"才缓冲(比如 HMAC 签名、JSON 反序列化)——并且总带合理 limit。
二、to_bytes 的 limit 按接口设置。不同 handler 的 body 合理上限不同:/api/chat 可能 16KB 够、/upload 可能 100MB。不要全局一刀切——在每个 handler 的 to_bytes 里写具体上限。
三、监控 body 相关指标:HTTP 413 响应数(超限)、body 读取失败数、body 平均大小——这些指标能发现 DDoS 尝试、upstream 异常、业务字段膨胀。
四、流式响应加超时:生产的流式 handler(SSE / download)要设上限 tokio::timeout——防止客户端挂起不消费耗 server 资源。30 秒 / 几分钟是常见上限,具体看场景。
五、测试覆盖两路径:handler 的单元测试至少覆盖"小 body 成功"和"超限失败"两条路径——真实客户端会发两种都——测试里别只测前者。
做到这五点,body 层基本不会成为 axum 服务的稳定性问题源。剩下的注意力可以放到业务逻辑、数据库、外部调用上——那些是真正的挑战。
下一章讲 State 管理与 FromRef 子状态——第 7 章简单介绍过 State 提取器、第 18 章会深入类型状态、FromRef 的实际架构价值、多 state 项目的组织模式。那一章把 axum 的"依赖注入"完整机制讲清楚。