Appearance
第8章 高级提取器:WebSocket、Multipart、ConnectInfo
第 7 章讲的四个提取器——Path/Query/State/Json——覆盖了大多数 HTTP API 场景。但 HTTP 之上还有几类"协议上的协议"或"运行时附着信息"需要 Axum 接住:双向帧流的 WebSocket、流式多部分表单、以及底层 TCP 连接的远端地址等。这一章把三个典型例子拆开,看 Axum 的提取器框架怎么把它们塞进 FromRequestParts / FromRequest 的同一模子里。
它们各自的关键特点:
三者在"消费 body"这条轴上的位置各不相同:WebSocket 完全不消费 body(升级握手是 HTTP/1.1 方式或 HTTP/2 CONNECT,body 留给后续升级协议);Multipart 消费 body 但只以"流"的方式,不缓冲;ConnectInfo 和 body 完全无关——它的数据来自 tokio 的连接监听阶段。这三种场景都被 axum 塞到了同一个提取器框架里,trait 没破例。
WebSocketUpgrade:握手在 parts 层完成
WebSocket 协议(RFC 6455 / RFC 8441)规定:HTTP/1.1 客户端发 GET + Upgrade: websocket + Connection: Upgrade + Sec-WebSocket-Key: <base64> + Sec-WebSocket-Version: 13;HTTP/2 客户端发 CONNECT + :protocol: websocket(扩展 CONNECT)。服务端校验这些头,如果都合法,返回 101 Switching Protocols 带 Sec-WebSocket-Accept: <SHA1-base64(key+guid)>;从此 TCP 连接不再跑 HTTP,变成裸的 WebSocket 帧流。
Axum 的 WebSocketUpgrade 提取器承担的是头验证部分,响应体的 101 返回和升级后的帧流处理都往后推迟。它实现 FromRequestParts——因为只需要看头部——这让它可以和其他提取器共存在 handler 参数列表。
from_request_parts:六种头校验
axum/src/extract/ws.rs:442-517 是 WebSocket 升级验证的全部逻辑:
rust
// axum/src/extract/ws.rs:442-517(节选,HTTP/1.1 分支)
impl<S> FromRequestParts<S> for WebSocketUpgrade<DefaultOnFailedUpgrade>
where S: Send + Sync,
{
type Rejection = WebSocketUpgradeRejection;
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let sec_websocket_key = if parts.version <= Version::HTTP_11 {
if parts.method != Method::GET {
return Err(MethodNotGet.into());
}
if !header_contains(&parts.headers, &header::CONNECTION, "upgrade") {
return Err(InvalidConnectionHeader.into());
}
if !header_eq(&parts.headers, &header::UPGRADE, "websocket") {
return Err(InvalidUpgradeHeader.into());
}
Some(parts.headers.get(header::SEC_WEBSOCKET_KEY)
.ok_or(WebSocketKeyHeaderMissing)?.clone())
} else {
// HTTP/2 分支:CONNECT + :protocol=websocket
if parts.method != Method::CONNECT { return Err(MethodNotConnect.into()); }
// ... 检查 hyper::ext::Protocol 扩展头
None
};
if !header_eq(&parts.headers, &header::SEC_WEBSOCKET_VERSION, "13") {
return Err(InvalidWebSocketVersionHeader.into());
}
let on_upgrade = parts.extensions.remove::<hyper::upgrade::OnUpgrade>()
.ok_or(ConnectionNotUpgradable)?;
// ... 处理 Sec-WebSocket-Protocol 子协议列表
Ok(Self { /* ... */ })
}
}逐段拆解其中的工程细节:
1. HTTP/1.1 与 HTTP/2 分流:parts.version <= Version::HTTP_11 是分支条件。HTTP/1.1 走传统升级路径——GET + Upgrade: websocket;HTTP/2 走 RFC 8441 扩展 CONNECT——CONNECT + :protocol: websocket(在 hyper 里通过 hyper::ext::Protocol extension 暴露)。Axum 同时支持两条路径,code path 清晰分开
2. 头比较用了两种 helper(ws.rs:519-537):header_eq 做大小写不敏感的完整等值比较(用于 Upgrade: websocket、Sec-WebSocket-Version: 13 这类值固定的头);header_contains 做大小写不敏感的子串包含比较(用于 Connection: Upgrade, keep-alive 这种可能有多个 token 的头)。两种方式分别对应 HTTP 规范里不同头的语法规则
3. parts.extensions.remove::<hyper::upgrade::OnUpgrade>():这是 Axum 和 hyper 协作的关键。hyper 在 HTTP 解析时,如果识别到连接可以升级(看到了 Upgrade: 头并允许升级),会往请求的 extensions 里塞一个 OnUpgrade future——这个 future 在未来 await 时会返回一个可以双向读写的 hyper::upgrade::Upgraded。WebSocketUpgrade 从 extensions 里把它拿走(.remove 不是 .get)——后续的 on_upgrade 调用需要它,只能被一个提取器拿走一次
4. Sec-WebSocket-Protocol 子协议收集(ws.rs:497-506):允许客户端指定多个候选子协议(逗号分隔),服务端选一个。get_all 获取所有同名头、逐项 split 逗号、trim、收集。这种对"多值头"的处理在各种 HTTP 头里常见
校验失败后返回 WebSocketUpgradeRejection——它是一个 composite rejection,不同原因对应不同 HTTP 状态码(通常是 400 Bad Request 或 405 Method Not Allowed)。
on_upgrade:返回 101 + spawn 任务
from_request_parts 完成后,handler 拿到一个 WebSocketUpgrade。真正的协议升级动作是调 .on_upgrade(callback)——ws.rs:346-410:
rust
// axum/src/extract/ws.rs:346-410(节选)
pub fn on_upgrade<C, Fut>(self, callback: C) -> Response
where
C: FnOnce(WebSocket) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
F: OnFailedUpgrade,
{
let on_upgrade = self.on_upgrade;
let protocol = self.protocol.clone();
tokio::spawn(async move {
let upgraded = match on_upgrade.await { /* ... */ };
let upgraded = TokioIo::new(upgraded);
let socket = WebSocketStream::from_raw_socket(
upgraded, protocol::Role::Server, Some(config)
).await;
let socket = WebSocket { inner: socket, protocol };
callback(socket).await;
});
// 构造 101 响应
let mut response = if let Some(sec_websocket_key) = &self.sec_websocket_key {
Response::builder()
.status(StatusCode::SWITCHING_PROTOCOLS)
.header(header::CONNECTION, UPGRADE)
.header(header::UPGRADE, WEBSOCKET)
.header(
header::SEC_WEBSOCKET_ACCEPT,
sign(sec_websocket_key.as_bytes()), // SHA1 + base64
)
.body(Body::empty()).unwrap()
} else {
// HTTP/2: 2XX + 空 body
Response::new(Body::empty())
};
response
}关键工程动作两件:
一、tokio::spawn 一个独立任务跑升级后的握手和用户回调。hyper::upgrade::OnUpgrade 这个 future 必须在 Response 被发送之后才会 resolve——因为 hyper 需要先把 101 响应写出去,然后把底层 TCP 连接的控制权交给上层。如果在同一个 future 里先 await on_upgrade 再返回响应,两边互相等对方,死锁。spawn 一个后台任务是标准解法——handler 立即返回 101 响应,后台任务里 await OnUpgrade 拿到 Upgraded,用 tungstenite 封成 WebSocketStream,然后调用户回调
二、sign(key) 计算 Sec-WebSocket-Accept。RFC 6455 规定:Accept = base64(sha1(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))——这个固定 GUID 是协议规范定的。客户端收到响应后会自己算一遍,对比不上就认为是错误的服务端。签名不是为了安全(攻击者也能算),是为了防中间缓存误把 WebSocket 请求当普通 HTTP 请求缓存——GUID 让响应无法被假造
完整数据流:
WebSocket:Sink + Stream 的双向接口
ws.rs:543-546 定义的 WebSocket:
rust
pub struct WebSocket {
inner: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
protocol: Option<HeaderValue>,
}两个字段:内部的 WebSocketStream(tokio-tungstenite 提供)和选中的子协议。WebSocket 实现了 futures::Stream(接收帧)和 futures::Sink<Message>(发送帧)——ws.rs:577-619。recv / send 方法是便利包装:
rust
// ws.rs:552-562
pub async fn recv(&mut self) -> Option<Result<Message, Error>> { self.next().await }
pub async fn send(&mut self, msg: Message) -> Result<(), Error> {
self.inner.send(msg.into_tungstenite()).await.map_err(Error::new)
}设计上不新增抽象,直接复用 futures 生态的 Stream / Sink——这让用户可以用 StreamExt / SinkExt 的所有组合子:socket.map(...)、socket.filter(...)、socket.forward(...)、split 分成独立的 sink 和 stream 等。Axum 不自己定义 WebSocket 事件循环,把这件事交给标准 async 生态。
WebSocket 的安全边界:Origin 检查
WebSocket 协议本身不强制 Origin 校验——服务端接受来自任何源的升级请求是默认行为。这是一个常见的 WebSocket 安全坑:"Cross-Origin WebSocket Hijacking"——恶意网站诱导用户浏览器发起对你服务的 WebSocket 升级,利用浏览器自动带的 Cookie 完成登录态利用。
Axum 本身不强制 Origin 检查——你必须自己校验。标准做法:
rust
use axum::http::HeaderValue;
async fn secure_ws(ws: WebSocketUpgrade, headers: HeaderMap) -> Result<Response, StatusCode> {
let origin = headers.get("origin")
.and_then(|h| h.to_str().ok())
.ok_or(StatusCode::FORBIDDEN)?;
if !["https://app.example.com", "https://admin.example.com"].contains(&origin) {
return Err(StatusCode::FORBIDDEN);
}
Ok(ws.on_upgrade(|socket| async move { /* ... */ }))
}注意 WebSocketUpgrade 的 from_request_parts 不校验 Origin——它只校验 Upgrade/Connection/Sec-WebSocket-* 这些协议必需头。业务级的 Origin / 认证令牌 / Cookie 合法性全部需要 handler 自己写。这是框架"刻意不越权"的体现——安全策略因业务而异,框架默认放开、让应用侧决定。
同理,WebSocket 的认证也不能走浏览器自动 Cookie——最佳实践是客户端在升级 URL 里带 token (wss://host/ws?token=xxx),或者走 Sec-WebSocket-Protocol 头塞 token。前者简单但 token 会出现在日志里;后者更规范但客户端代码需要支持设置 protocol。哪种都是业务选择,Axum 不替你决定。
Message 类型与帧分片
tokio-tungstenite 定义的 Message enum(Axum 重新导出)有几种 variant:
| Variant | 用途 | 帧类型 |
|---|---|---|
Text(Utf8Bytes) | UTF-8 文本 | data frame, opcode=0x1 |
Binary(Bytes) | 任意字节 | data frame, opcode=0x2 |
Ping(Bytes) | 心跳 ping | control frame, opcode=0x9 |
Pong(Bytes) | 心跳响应 | control frame, opcode=0xA |
Close(Option<CloseFrame>) | 关闭连接 | control frame, opcode=0x8 |
Text 的 payload 是 Utf8Bytes——ws.rs:621-640 定义的一个类型,它是 Bytes + UTF-8 保证。为什么不直接用 String?因为 String 必须是堆上 Vec<u8>——每次从网络收到文本帧都要拷贝到新的 String 分配。Utf8Bytes 内部是 Bytes,支持零拷贝共享(Bytes 是共享的引用计数字节缓冲)。在高频 WebSocket 消息流下这个差异会显著——几千消息每秒级别时,节省的分配能让 p99 从毫秒降到微秒。
WebSocket 帧在协议层有"分片"(fragmentation)——大消息可以拆成多个 Continuation 帧。tokio-tungstenite 默认把分片帧在内部合并成一个完整的 Message,用户只看到完整消息。你可以配置 WebSocketConfig 关闭自动合并、或者设置最大帧大小——ws.rs:156-300 的 WebSocketUpgrade::max_frame_size / max_message_size / accept_unmasked_frames 等方法调整。默认配置已经合适大多数场景。
Ping/Pong 的用途不是应用层心跳——是 WebSocket 协议层的 keep-alive。RFC 6455 规定:对方的 Ping 必须被回一个 Pong(payload 相同)。tokio-tungstenite 自动处理 Ping→Pong 的回复,用户代码通常不需要关心——除非你想做应用层心跳逻辑(比如超时检测 "30 秒没收到任何消息就断"),那时可以主动调 socket.send(Message::Ping(...)) 并观察是否收到 Pong。
WebSocket 典型 handler 模式
把三件事拼起来——升级、发消息、收消息。一个最小 echo 服务:
rust
use axum::extract::WebSocketUpgrade;
use axum::extract::ws::Message;
use axum::response::Response;
async fn echo(ws: WebSocketUpgrade) -> Response {
ws.on_upgrade(|mut socket| async move {
while let Some(Ok(msg)) = socket.recv().await {
match msg {
Message::Text(t) => {
let _ = socket.send(Message::Text(t)).await;
}
Message::Binary(b) => {
let _ = socket.send(Message::Binary(b)).await;
}
Message::Close(_) => break,
_ => {} // 忽略 Ping/Pong(自动处理)
}
}
})
}广播场景:多客户端订阅、有人发消息时全员收到。用 tokio::sync::broadcast 配合:
rust
use tokio::sync::broadcast;
use futures::{SinkExt, StreamExt};
#[derive(Clone)]
struct AppState {
tx: broadcast::Sender<String>,
}
async fn chat(ws: WebSocketUpgrade, State(state): State<AppState>) -> Response {
ws.on_upgrade(|socket| async move {
let (mut sink, mut stream) = socket.split(); // 分成 sink / stream
let mut rx = state.tx.subscribe();
// 下行:从广播 channel 往 client 发
let send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if sink.send(Message::Text(msg.into())).await.is_err() { break; }
}
});
// 上行:从 client 收到消息广播出去
let recv_task = tokio::spawn({
let tx = state.tx.clone();
async move {
while let Some(Ok(Message::Text(t))) = stream.next().await {
let _ = tx.send(t.to_string());
}
}
});
tokio::select! {
_ = send_task => (),
_ = recv_task => (),
}
})
}关键点:
socket.split()分 sink / stream:futures::StreamExt::split要求Stream + Sink双实现——Axum 的WebSocket正好满足。拆开后两个独立任务一个收一个发,互不阻塞tokio::select!:任一边断开时整个连接终止——客户端关闭时 recv_task 先返回,select!立即触发,send_task 被 drop(底层 future 取消)broadcast::Sender的多生产多消费:任何客户端收到的消息tx.send会进入所有 subscriber——包括自己(所以上面的代码自己会收到自己发的消息;生产中可能想过滤)
这种"spawn 两个方向任务 + select 合并"的模式是 axum WebSocket handler 的范本。第 15 章讲 Serve 的优雅关闭时还会看到它如何和 shutdown 信号配合。
消息格式选择:JSON、MessagePack 还是自定义二进制
WebSocket 只传递字节帧(Text 或 Binary)——帧内内容的序列化格式是应用自己的决策。三种主流选择:
JSON:最简单、最兼容——浏览器里 JSON.stringify / JSON.parse 无依赖;调试方便,抓包就能看内容。但 UTF-8 字符串、字段名重复、数字转字符串等都增加开销。适合低频消息、高调试需求、异构客户端场景
MessagePack / CBOR:二进制 serde 格式——比 JSON 小 30-50%、快 2-3×。但需要客户端支持(浏览器里得用 msgpack-lite、cbor-x 这类库)。适合消息频繁、带宽敏感但仍需 schema 灵活的场景
自定义二进制(bincode、手写 struct-pack):最紧凑、最快,但是 schema 硬编码——版本演进复杂。适合内部服务间 WebSocket、固定协议、需要极致性能的场景
Axum 不为任何一种格式做特殊支持——Message::Text(...) / Message::Binary(...) 里装什么字节是应用自己决定。但生产实践里建议:定义一个顶层 ClientMessage / ServerMessage enum,带 tag 字段(serde 的 #[serde(tag = "type")]),所有通信都走这个 enum 的序列化结果。这让协议演进可审计、类型化错误可检查,远优于散着 match json["type"] 的 handler。
WebSocket 错误处理的几条工程直觉
三个常见错误场景和处理建议:
客户端异常断开:网络掉线、客户端崩溃、socket.recv().await 返回 Some(Err(_)) 或 None。代码应该优雅退出——不要 unwrap、不要 panic、不要 log "unexpected close"(这是常态不是异常)。在广播场景下,广播 sender 的 drop 会自动触发所有 subscriber 收到 RecvError::Closed,清理流自然发生
慢消费者:某个客户端的上行 sink 在某次 .send() 时超时或失败——可能是 TCP buffer 满、客户端处理慢、网络拥塞。默认行为是卡住——send 会一直 await。生产里建议用 tokio::time::timeout 包裹 send:timeout(Duration::from_secs(30), sink.send(msg)),超时就断开这个连接,否则慢消费者会积压内存
协议违规:客户端发来非法 WebSocket 帧(不合法的 opcode、超大 frame、错误的 mask)——tokio-tungstenite 会返回 Error::Protocol。通常的响应是以合适的 close code 关闭(1002 Protocol error),然后 drop socket
这三类错误都不是 axum 层面的——axum 只在 WebSocketUpgrade 阶段做头校验,升级后的帧层错误全部通过 Message / Error 暴露给 handler。这保持了 axum 自己的代码简单——升级后的问题域移交给 tokio-tungstenite。
Multipart:流式解析的设计考虑
multipart/form-data 是 HTML form 上传文件时用的格式——RFC 7578 定义。请求体被 --<boundary> 分隔成多个字段,每个字段有自己的 Content-Disposition 和可选的 Content-Type:
http
POST /upload HTTP/1.1
Content-Type: multipart/form-data; boundary=---XYZ
-----XYZ
Content-Disposition: form-data; name="title"
My Photo
-----XYZ
Content-Disposition: form-data; name="photo"; filename="cat.jpg"
Content-Type: image/jpeg
<二进制字节>
-----XYZ--文件字段可能很大——几十 MB、几百 MB 都常见。把整个 body 缓冲到 Bytes 再解析是灾难,必须流式处理——边读网络字节边切字段、字段本身可以继续流式消费。这决定了 Multipart 的 API 形态和 Json 完全不同。
from_request:把 body 转 stream 包给 multer
axum/src/extract/multipart.rs:68-82:
rust
// axum/src/extract/multipart.rs:68-82
impl<S> FromRequest<S> for Multipart
where S: Send + Sync,
{
type Rejection = MultipartRejection;
async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
let boundary = content_type_str(req.headers())
.and_then(|content_type| multer::parse_boundary(content_type).ok())
.ok_or(InvalidBoundary)?;
let stream = req.with_limited_body().into_body();
let multipart = multer::Multipart::new(stream.into_data_stream(), boundary);
Ok(Self { inner: multipart })
}
}三步:
1. 从 Content-Type 头解析 boundary。multipart/form-data; boundary=---XYZ 这样的头,multer::parse_boundary 从里面抠出 ---XYZ 字符串——用 multer crate 的解析器,axum 不重写
2. req.with_limited_body().into_body():把 body 套一层 body limit(默认 2MB,第 6 章讨论过的 DefaultBodyLimit)再拆出来。注意这里不把 body 缓冲到内存——into_body() 返回的是仍在流式状态的 body
3. multer::Multipart::new(stream.into_data_stream(), boundary):multer 拿到 body stream 和 boundary,构造一个 Multipart 对象。内部不 eagerly 读任何字节——解析是按需进行的,当用户调 next_field() 时才往前推进
Field 的生命周期强制约束
multipart.rs:106-124 的 next_field:
rust
// axum/src/extract/multipart.rs:106-124
impl Multipart {
pub async fn next_field(&mut self) -> Result<Option<Field<'_>>, MultipartError> {
let field = self.inner.next_field().await.map_err(MultipartError::from_multer)?;
if let Some(field) = field {
Ok(Some(Field {
inner: field,
_multipart: self,
}))
} else {
Ok(None)
}
}
}
pub struct Field<'a> {
inner: multer::Field<'static>,
// multer 运行时要求同时最多只有一个活的 Field, Axum 把这个要求编译期化
_multipart: &'a mut Multipart,
}Field<'a> 有一个 _multipart: &'a mut Multipart 字段——它不被读取,只用来借用外层 Multipart。这让类型系统强制:"同一时刻最多只有一个活的 Field"。想拿下一个 field,上一个必须先 drop——next_field 的签名 &mut self 要求独占访问,而第一个 Field 已经持有了 &mut self 借用,直到它 drop。
这个设计来自 multer crate 的底层约束——multer::Multipart 内部维护一个共享的字节流位置指针,两个 Field 同时活着会让指针解释冲突、产生错乱的字节切片。multer crate 本身用运行时检查(Err(Error::FieldBorrowed))来防止;Axum 用生命周期把这个检查提前到编译期,从此不可能出 runtime 错误。这是一种典型的"把运行时约束编译化"的工程模式——和第 6 章讲 body 消费单一性的路子一样。
Field:自身也是 Stream
multipart.rs:135-143:
rust
impl Stream for Field<'_> {
type Item = Result<Bytes, MultipartError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx).map_err(MultipartError::from_multer)
}
}Field 实现了 Stream<Item = Result<Bytes, _>>——可以流式消费单个字段的内容。一个 1 GB 的上传文件不需要缓冲,而是用 while let Some(chunk) = field.next().await { write_to_disk(chunk).await; } 按块处理。
对于小字段(文本),Field::bytes() 方法(对应 TryStreamExt::try_concat)会把所有块累积成一个 Bytes:
rust
async fn upload(mut mp: Multipart) {
while let Some(mut field) = mp.next_field().await.unwrap() {
let name = field.name().unwrap_or("(未命名)").to_string();
if let Some(_filename) = field.file_name() {
// 有 filename → 是文件字段,流式写盘
while let Some(chunk) = field.next().await {
let chunk = chunk.unwrap();
/* 写入存储 */
}
} else {
// 没 filename → 是文本字段,累积读
let data = field.bytes().await.unwrap();
println!("{} = {:?}", name, data);
}
}
}两种消费模式对应两种字段:文件字段流式、文本字段累积。选错了会出问题——把 1 GB 文件 .bytes() 掉会 OOM 或者被 body limit 拦截;把短文本按 stream 轮询则额外开销大。
multer 的工作方式:状态机驱动的流式解析
multer crate 是 multipart 解析的核心——Axum 完全委托它做真正的字节流处理。了解一下它的工作机制有助于把握 Multipart 的真实开销。
multer 内部维护一个简单的状态机:
关键挑战是"如何检测 boundary":body 是字节流,boundary 可能跨 chunk 边界出现。比如一个 chunk 结尾是 ...data\r\n--boun,下一个 chunk 开头是 dary——单独看任一 chunk 都认不出 boundary,必须前后拼起来。multer 的做法是缓冲"最后 (boundary.len()+2) 字节"作为 lookbehind,下次 poll 时加上新 chunk 再 scan。这种 lookbehind 是所有流式 multipart 解析器的标配技术。
boundary 冲突处理:如果上传文件的字节流里恰好包含和 boundary 相同的字节序列,解析会错乱。为了避免这个问题,HTTP/multipart 协议要求 boundary 足够随机(通常是 30+ 字节的随机字符串,客户端生成)。客户端保证了 boundary 不会出现在任何字段内容里——multer 只需按 boundary 字面扫,不用做复杂的冲突检测。生成不够随机的 boundary(比如 ----WebKitFormBoundary)是客户端 bug,multer 不负责兜底。
空文件/空字段:有些客户端上传"空文件"(用户没选文件就点提交),生成的 multipart 里会有一个 Content-Disposition 包含 filename 但 data 部分是空字节流。multer 正确处理——field.next().await 立即返回 None,不会卡住。handler 代码要判断 field.file_name().is_some() && field.has_data() 决定业务行为(通常空文件字段应当被忽略或报错)。
为什么不用 serde
Json/Query/Form 都走 serde,Multipart 不走。原因有三:
一、字段顺序不保证:multipart 的 field 可以任意顺序发送;结构体的字段有固定顺序。serde 的 Deserializer::deserialize_struct 假定字段按某个可预测的方式呈现(或者都在一个 map 里),multipart 的"流式出现"语义很难塞进这个模型——要么强制把全部 fields 缓冲到 map 再 serde,失去流式优势;要么 serde 支持"按名字乱序 visit",但这是大改标准协议
二、字段内容可能是大二进制:serde 的 visitor 要求类型可反序列化,但文件字段的"正确类型"是什么?Vec<u8>?&[u8]?io::Read?都不理想——前两个必须整体缓冲,后者不是 serde 支持的类型。Multipart 直接暴露 Field: Stream<Item = Bytes>,用户按自己需要决定怎么消费
三、typing 复杂度不匹配:文本字段和文件字段有质性差异,塞进同一个结构体类型容易设计错。直接暴露 while let Some(field) 循环让用户看到"流式顺序",显式区分文件和文本,反而更清晰
这是"不是所有提取都适合 serde"的典型场景。Axum 的提取器框架不强制 serde——trait 只关心"能从 Request 产出 Self",怎么解析由实现者自定。Multipart 是这种灵活性的充分利用。
Multipart 实战:文件上传 + 安全考虑
生产里写文件上传 handler 要兼顾几件事:
rust
use axum::extract::{Multipart, DefaultBodyLimit};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
async fn upload(mut mp: Multipart) -> Result<Json<UploadResult>, AppError> {
let mut title = None;
let mut file_path = None;
while let Some(mut field) = mp.next_field().await? {
match field.name() {
Some("title") => {
title = Some(field.text().await?); // 短文本, 累积
}
Some("photo") => {
let fname = sanitize_filename(field.file_name().ok_or(AppError::NoFilename)?);
let path = format!("/var/uploads/{fname}");
let mut f = File::create(&path).await?;
let mut total = 0usize;
while let Some(chunk) = field.next().await {
let chunk = chunk?;
total += chunk.len();
if total > 50 * 1024 * 1024 { // 单字段 50MB 上限
return Err(AppError::TooLarge);
}
f.write_all(&chunk).await?;
}
file_path = Some(path);
}
_ => {} // 忽略未知字段
}
}
Ok(Json(UploadResult { title, file_path }))
}
// 路由
Router::new()
.route("/upload", post(upload))
.layer(DefaultBodyLimit::max(100 * 1024 * 1024)); // 总 body 100MB几点:
sanitize_filename:客户端给的 filename 可能是../../../etc/passwd或foo\0.jpg——必须过滤。推荐用sanitize-filenamecrate,或者直接 reject 含/、\、\0、..的 filenameDefaultBodyLimit::max:总 body 限制。Axum 默认 2MB 对文件上传远远不够,必须显式放大。但不要盲目设超大——先算清楚"最差情况每个用户能上传多大"- 单字段大小校验:循环里累积统计
total,超限主动返回错误。DefaultBodyLimit是 body 整体限制,不限单字段——文件上传场景通常还想限制"单个文件最大 50MB" - content-type 白名单:
field.content_type()返回客户端声明的 MIME type。如果应用只接受图片,应该校验image/jpeg/image/png等。注意这只是客户端声明——真实类型还要二次检查(比如读 magic bytes 确认是 JPEG) - 空间管理:
File::create后如果后续出错,文件留在磁盘上垃圾。drop guard模式或?后 cleanup——这是写流式上传的老坑
Multipart 不自动防范任何一项——它只暴露原始字段流。安全责任完全在用户。和 Json 反序列化自动校验类型相比,Multipart handler 要写更多防御代码。
ConnectInfo:来自底层连接的信息
客户端的 IP 地址不在 HTTP 请求里——它来自 TCP 层(TcpStream::peer_addr)。但 handler 可能想知道,比如限流、地理位置识别、访问日志。Axum 的 ConnectInfo<T> 解决这个问题,代价是改变 Router 的暴露方式。
数据流:into_make_service_with_connect_info
普通 Axum 应用是 axum::serve(listener, app) 或 axum::serve(listener, app.into_make_service())。想要 ConnectInfo 时,换成 into_make_service_with_connect_info::<SocketAddr>():
rust
let app = Router::new().route("/", get(handler));
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
).await;
async fn handler(ConnectInfo(addr): ConnectInfo<SocketAddr>) {
println!("connected from {addr}");
}底层工作流在 axum/src/extract/connect_info.rs:99-118 的 Service impl:
rust
// axum/src/extract/connect_info.rs:99-118
impl<S, C, T> Service<T> for IntoMakeServiceWithConnectInfo<S, C>
where
S: Clone,
C: Connected<T>,
{
type Response = AddExtension<S, ConnectInfo<C>>;
type Error = Infallible;
// ...
fn call(&mut self, target: T) -> Self::Future {
let connect_info = ConnectInfo(C::connect_info(target));
let svc = Extension(connect_info).layer(self.svc.clone());
ResponseFuture::new(ready(Ok(svc)))
}
}IntoMakeServiceWithConnectInfo 是一个 MakeService——它的 call 接收一个连接 target(比如 IncomingStream),返回一个针对该连接的 Service。在 call 里:
C::connect_info(target):从 target 里提取连接信息(通过Connected<T>trait),包装成ConnectInfo<C>Extension(connect_info).layer(self.svc.clone()):给底层 service 加一个AddExtensionlayer,这个 layer 在每次请求处理前把ConnectInfo<C>塞进req.extensions- 返回这个包装后的 service:对该连接所有请求统一用它
关键:每个连接一个 Service 实例——MakeService::call 在每次新连接建立时被调用。所以 ConnectInfo<SocketAddr> 是"按连接注入"的,不是按请求。同一个连接的后续请求(HTTP/1.1 keep-alive 或 HTTP/2 多路复用)共享同一个 ConnectInfo。
Connected<T> trait:适配不同的监听器
connect_info.rs:74-77 的 Connected trait:
rust
pub trait Connected<T>: Clone + Send + Sync + 'static {
fn connect_info(stream: T) -> Self;
}T 是 stream 类型(比如 IncomingStream<'_, L> 或 SocketAddr)。connect_info 返回 Self——要放进 extensions 的连接信息。connect_info.rs:83-97 为 SocketAddr 实现了两种 Connected:
rust
// 从 IncomingStream 拿远端地址
impl<L> Connected<serve::IncomingStream<'_, L>> for SocketAddr
where L: serve::Listener<Addr = Self>,
{
fn connect_info(stream: serve::IncomingStream<'_, L>) -> Self {
*stream.remote_addr()
}
}
// 恒等变换,方便手动注入
impl Connected<Self> for SocketAddr {
fn connect_info(remote_addr: Self) -> Self { remote_addr }
}这个设计允许用户用自定义的连接类型——如果你有一个自定义 Listener 暴露出非 SocketAddr 的 peer info(Unix domain socket 的 PID / UID、TLS 握手的 SNI / cert 等),只要给自己的类型 impl Connected,就能走同一套 ConnectInfo 抽象。
FromRequestParts:从 extensions 取回
connect_info.rs:138-154:
rust
impl<S, T> FromRequestParts<S> for ConnectInfo<T>
where S: Send + Sync, T: Clone + Send + Sync + 'static,
{
type Rejection = <Extension<Self> as FromRequestParts<S>>::Rejection;
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
match Extension::<Self>::from_request_parts(parts, state).await {
Ok(Extension(connect_info)) => Ok(connect_info),
Err(err) => match parts.extensions.get::<MockConnectInfo<T>>() {
Some(MockConnectInfo(connect_info)) => Ok(Self(connect_info.clone())),
None => Err(err),
},
}
}
}两步回退:
- 首选:
Extension<Self>::from_request_parts从 extensions 里取出之前AddExtension放进去的ConnectInfo - 备用:如果没找到,检查是否有
MockConnectInfo<T>(测试用 Layer),有就用它
MockConnectInfo 是为测试设计的——单元测试里没有真实 TCP 连接,但 handler 可能依赖 ConnectInfo。app.layer(MockConnectInfo(addr)) 在 Router 级别注入一个假的——connect_info.rs:158-196 文档里有完整示例。这种"生产路径 + 测试路径"双支持是 axum 对可测试性的有意投入。
自定义 Connected<T>:Unix socket 认证 / TLS SNI
Connected<T> 的真正价值在扩展。示例:Unix domain socket 下想获得对端进程的 pid / uid(UCred):
rust
use tokio::net::UnixStream;
use std::os::unix::net::UCred;
#[derive(Clone)]
struct UnixConnectInfo {
pid: Option<i32>,
uid: u32,
}
// 自定义 Listener 实现:略,要求 IncomingStream::remote_addr 返回 UnixStream
impl<L> Connected<serve::IncomingStream<'_, L>> for UnixConnectInfo
where
L: serve::Listener<Addr = UnixStream>, // 简化声明
{
fn connect_info(stream: serve::IncomingStream<'_, L>) -> Self {
let peer_cred = stream.remote_addr().peer_cred().ok();
Self {
pid: peer_cred.and_then(|c| c.pid()),
uid: peer_cred.map(|c| c.uid()).unwrap_or(0),
}
}
}
// handler 端
async fn who(ConnectInfo(info): ConnectInfo<UnixConnectInfo>) -> String {
format!("uid={}, pid={:?}", info.uid, info.pid)
}类似地,TLS server 可以把握手阶段拿到的 client certificate 或 SNI 塞进自定义 ConnectInfo<TlsInfo>。Axum 自身不内置 TLS——需要 axum-server 或 rustls 配合——但 Connected<T> 的抽象让 TLS 扩展crate能以一致的方式暴露 TLS 元信息给 handler,用户不需要学第二套 API。
这是"依赖倒置"的漂亮例子:Connected<T> 不关心 T 是什么,只要求 T 能从 listener 提供的 stream 产出自己。具体的 T 类型由用户或扩展 crate 定义——axum 的 SocketAddr impl 只是最常用的默认。
反向代理场景:X-Forwarded-For 与真实 IP
生产部署里客户端和 axum server 之间通常隔着反向代理(Nginx、ALB、CloudFlare)。这时 ConnectInfo<SocketAddr> 拿到的是代理的 IP,不是真实客户端 IP。客户端真实 IP 在代理转发请求时加进 HTTP 头里——常见的是 X-Forwarded-For(一串 IP,逗号分隔)或 Forwarded(RFC 7239 标准头)。
axum 本身不内置解析——你需要自己从 headers 读:
rust
use axum::http::HeaderMap;
use std::net::IpAddr;
fn extract_client_ip(headers: &HeaderMap, connect_info: SocketAddr) -> IpAddr {
// 优先用 X-Forwarded-For 的第一个(最靠近客户端的)
if let Some(xff) = headers.get("x-forwarded-for") {
if let Ok(xff) = xff.to_str() {
if let Some(first) = xff.split(',').next() {
if let Ok(ip) = first.trim().parse::<IpAddr>() {
return ip;
}
}
}
}
connect_info.ip() // fallback 到 TCP 层
}安全警告:只在可信代理后面用 X-Forwarded-For——任何人都可以伪造这个头。如果你的 server 直接暴露在 Internet 上,读 X-Forwarded-For 等于让客户端自称 IP,任何限流和日志都失效。正确做法:
- 在可信代理后面:解析
X-Forwarded-For,信任代理加的那部分 - 直接暴露:只用
ConnectInfo<SocketAddr>,无视 X-Forwarded-For - 混合环境(部分流量有代理部分没有):代码上显式配置"这个 Router 的请求全部来自代理" / "那个 Router 不经过代理"
把这个逻辑做成一个自定义提取器 ClientIp 能让 handler 代码更干净——axum-client-ip crate 就是这样的第三方实现。
性能开销:三种提取器的量级对比
| 提取器 | 提取阶段开销 | 运行期开销 | 主要成本来源 |
|---|---|---|---|
WebSocketUpgrade | ~1 µs | 几乎零 | 头比较 + SHA1 签名;真正通信成本在后续帧流处理 |
Multipart | ~1 µs | 取决于字段数 + 字节量 | boundary 解析、stream 迭代开销 |
ConnectInfo<T> | < 100 ns | 零 | 一次 extensions HashMap 查找 + T::clone() |
三个开销几乎都微不足道——WebSocket 的 SHA1 签名是对 24 字节的计算,现代 CPU 纳秒级;Multipart 的 boundary 扫描用 memmem-style 搜索也很快。真实瓶颈都在协议层面而不是提取器本身:
- WebSocket:单连接的每帧 RTT、消息的 serde 成本(如果你在 handler 里 serde_json 每个消息)、tokio 任务调度
- Multipart:网络上行带宽、磁盘写入速度——如果 handler 要把文件落盘,这才是瓶颈
- ConnectInfo:完全可以忽略——它就是一次 HashMap 读 + 一次 Arc clone 级别
这三个提取器的框架开销远低于它们对应业务的复杂度。优化方向永远是"对应业务怎么优化"——WebSocket 是消息协议设计(比如二进制 vs JSON),Multipart 是存储写入策略(流式写文件而非先缓冲),ConnectInfo 一般不需要优化。
与 Tokio / Hyper 的连接生命周期协作
三个提取器都对 HTTP/Tokio/hyper 的底层有不同程度的依赖,梳理一下这些依赖关系能帮助理解 axum 架构的分层。
WebSocket:依赖 hyper 的 hyper::upgrade::OnUpgrade 机制——第 5 章讲过 hyper 在识别出 Upgrade 头时会往 extensions 放 OnUpgrade future。Axum 从 extensions 取出它,在响应发出后 await 得到 Upgraded TCP stream,交给 tokio-tungstenite 封装成 WebSocket。整条链上 axum 只是"粘合剂"——hyper 提供协议级升级、tokio-tungstenite 提供 WebSocket 帧编解码、tokio 提供异步运行时。这是《Hyper 与 Tower:工业级 HTTP 栈》第 12 章讨论的"hyper 如何处理协议升级"的具体应用场景。
Multipart:依赖 hyper 的 Body trait 作为数据源——多亏 Body::poll_frame 是流式的,multer 可以按需拉取字节,不必等全部 body 到齐。这是《Hyper 与 Tower》第 10 章讨论 http-body crate 的 poll-based body 模型的直接收益——如果 body 是"整体返回"的模型,multipart 的流式优势无从谈起。
ConnectInfo:依赖 tokio 的 TCP listener 能 peer_addr()——这是 tokio::net::TcpStream 暴露的基本能力。IntoMakeServiceWithConnectInfo 在每次 accept 时调 peer_addr,塞进每条连接的 extension 层。这是《Tokio 源码深度解析》第 8 章讨论 TcpListener 的一个上层应用。
三个提取器各自踩着不同的底层抽象:协议层(hyper upgrade)、数据流层(http-body)、连接层(tokio TcpListener)。axum 的提取器框架能把这三种差别极大的机制都装进同一个 trait,是因为 trait 本身是最一般化的契约——只要求"能从 Parts/Request 产出 Self",至于数据从哪儿来、是不是流、是不是独立任务,全交给实现自行决定。
协议性质对比:WebSocket vs SSE vs long polling
实时双向通信有三种主流模式,选型会影响 handler 结构:
| 模式 | 协议 | axum 支持 | 特点 |
|---|---|---|---|
| WebSocket | RFC 6455 / RFC 8441 | WebSocketUpgrade | 全双工、二进制或文本、服务端可主动推送、升级后脱离 HTTP |
| SSE (Server-Sent Events) | EventStream | axum::response::sse::Sse | 服务端→客户端单向推送、纯文本、走 HTTP 长连接、浏览器原生支持 |
| Long polling | 普通 HTTP | 普通 handler | 客户端轮询、服务端挂起直到有数据或超时、简单但开销大 |
选型原则:
- 需要全双工 + 高频消息(游戏、实时协作编辑)→ WebSocket
- 只有服务端→客户端推送(通知、实时日志、股票行情)→ SSE(第 10 章会讲)
- 极简场景或无法 WebSocket 的环境(某些企业代理过滤 WebSocket)→ long polling
WebSocket 和 SSE 在 axum 里实现形态截然不同:WebSocket 是提取器(WebSocketUpgrade 接到 handler 后接管连接),SSE 是响应(handler 返回一个 Sse 对象,axum 把它编码成 text/event-stream)。这反映了两者的协议本质差异——WebSocket 是"HTTP 握手 + 独立协议",SSE 是"HTTP 响应 + 长流式 body"。axum 对两种形态的 API 设计精确反映了底层协议差异,没有硬塞进同一个抽象里。
小结:提取器框架的包容性
三个提取器呈现的机制看起来差异巨大:
| 提取器 | 消费 body | 数据来源 | 关键机制 |
|---|---|---|---|
WebSocketUpgrade | 否 | headers + extensions::<OnUpgrade> | 协议升级握手 + tokio::spawn |
Multipart | 是(流式) | body 被转为 byte stream | 流式解析 + Field 生命周期约束 |
ConnectInfo<T> | 否 | extensions(由 MakeService 注入) | 连接级注入 + Connected<T> trait |
但它们都没有突破 FromRequest / FromRequestParts 的接口。WebSocket 走 FromRequestParts(只读 parts),Multipart 走 FromRequest(消费 body),ConnectInfo 走 FromRequestParts(只读 extensions)。Rejection 都实现 IntoResponse,都能和其他提取器共存。这证明了提取器框架的一条重要性质:trait 本身不做假设,它只定义"从 Request / &mut Parts 能产出 Self"的最一般契约。具体实现可以走 serde、可以走流式解析、可以依赖外部注入的 extensions——框架无所谓。
从设计角度看,这三个提取器展示了提取器框架三种不同的扩展方式:
一、WebSocket 是"HTTP 协议级扩展"的提取。它依赖 hyper 提前往 extensions 塞了 OnUpgrade,而这个 OnUpgrade 是 HTTP 协议层的概念(HTTP/1.1 升级、HTTP/2 扩展 CONNECT)。Axum 把这层协议能力封装成"头校验提取器 + 延后的 on_upgrade 方法",保持 handler 签名干净。类似的扩展还有 Server-Sent Events——本章没细讲,但它的机制也是类似的:"handler 返回一个特殊类型的 Response,Axum 在响应层把 EventStream 变成流式 body"
二、Multipart 是"body 协议级扩展"。它不按常规反序列化模式解析 body,而是暴露底层 stream。这种"让用户拿到 stream 自己处理"的设计不是偷懒——是对文件上传这种场景的诚实回应。axum 完全可以做一个 TypedMultipart<T> 把字段按 serde 反序列化到结构体(axum-extra 里就有这样的第三方提取器),但基础 Multipart 保持 stream 接口,把选择权给用户
三、ConnectInfo 是"运行时上下文级扩展"。它的数据根本不在 HTTP 请求里,而是来自运行时(tokio 的 TCP accept)。Axum 通过"MakeService 在连接建立时注入 extensions"的机制,把非请求信息塞进提取器框架。这种模式可以推广到任何"每个连接有独特上下文"的场景——TLS 证书、Unix socket credentials、自定义 metadata
这三种扩展方式加上第 7 章讨论的标准 serde 提取器,几乎覆盖了所有"我想在 handler 里拿到 X"的合理需求。
三个提取器的工程反直觉点
本章讨论的三个提取器各自有一个"初看反直觉、想明白后觉得必然"的设计点,放在一起对比能看到 Axum 的一致思路。
WebSocket 的反直觉点:from_request_parts 并不真正升级连接,它只做头校验和 OnUpgrade future 的转移——真正的协议升级和 WebSocket 帧处理都延迟到 on_upgrade 方法里 tokio::spawn 的独立任务。这违反了"提取器提取完所有参数、handler 就能直接使用"的朴素直觉。但这是唯一合理的设计——响应必须在 handler 返回 Response 时同步发出,升级后的流式交互必须异步独立任务,两者不能塞进同一个 future
Multipart 的反直觉点:Field<'a> 有生命周期参数。朴素想法是"为什么 Field 不是 owned 类型?我想把它存到 Vec 里顺序处理"——你不能。一次只能有一个活的 Field,这是类型系统强制的。但这是唯一合理的设计——multipart 的字节流是一个共享游标,两个 Field 同时活着必然读取冲突
ConnectInfo 的反直觉点:ConnectInfo<T> 作为 handler 参数使用时,必须换 serve 入口到 into_make_service_with_connect_info::<T>()。普通 app.into_make_service() 下 ConnectInfo<SocketAddr> 在 handler 里会提取失败返回 500。这违反了"提取器是插拔的"的朴素期待。但这是唯一合理的设计——ConnectInfo 的数据来自连接建立阶段,必须在 MakeService 层注入,Router 内部看不到那一层
三个反直觉点共享同一个底层原因:Rust 类型系统把"物理约束"和"运行时不变量"强制表达出来。WebSocket 升级必须分两段(响应 + 流处理),Multipart 字节流必须独占使用,ConnectInfo 必须在 MakeService 层注入——这些都不是 Axum 设计者的偏好,而是底层协议 / 数据结构 / 运行时架构的真实约束。Axum 不隐藏这些约束,而是让用户在 API 层面看到它们。代价是初学时要多想一步,收益是一旦理解就不会再踩坑。
下一章开始我们从提取器(输入侧)转向响应(输出侧)——IntoResponse trait 是 handler 返回类型的统一抽象,它的设计同样经过深思。