Skip to content

第8章 Filter / MapRequest / Steer:请求路由与变换

8.1 另一种中间件

前面几章我们讲的中间件——Timeout、Retry、Buffer、LoadShed、Balance——都在"按规则控制请求的时序":什么时候放过、什么时候挂起、什么时候分发到哪个端点。本章我们看另一组中间件:它们不关心时序,关心请求本身

中间件干什么
Filter / AsyncFilter根据 predicate 决定"过不过",通过的才发给内层
MapRequestReq1 变成 Req2,然后发给内层
MapResponse / MapResult / MapErr对内层响应做变换
Steer多路分流——根据 predicate 选择发给哪个内层

这些中间件是"路由层"的构建块。Axum 的路由、Tonic 的方法分发、代理的 upstream 选择,本质上都是这些原语的组合。它们共享一个特征:只做纯粹的类型/值变换,不涉及异步等待、不涉及容量控制

这一章我们重点讲 FilterMapRequestSteer——三个最有代表性的。源码来自 tower 0.5.3tower/src/filter/tower/src/util/map_request.rstower/src/steer/

8.2 MapRequest:最纯粹的变换

从最简单的开始。MapRequest 就是一个 fn(Req1) -> Req2

rust
// tower/src/util/map_request.rs:6-10
pub struct MapRequest<S, F> {
    inner: S,
    f: F,
}

// 41-62
impl<S, F, R1, R2> Service<R1> for MapRequest<S, F>
where
    S: Service<R2>,
    F: FnMut(R1) -> R2,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    #[inline]
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
        self.inner.poll_ready(cx)
    }

    #[inline]
    fn call(&mut self, request: R1) -> S::Future {
        self.inner.call((self.f)(request))
    }
}

整整 20 行代码就是全部。call 一句 self.inner.call((self.f)(request))——把请求通过函数 f 变一下,转给内层。

这段代码里有一个非常容易错过的魔法:Service<R1>Service<R2> 是不同的 trait implMapRequest 在泛型上声明它 impl 的是 Service<R1>(外部看到的请求类型),但内层 S impl 的是 Service<R2>(变换后的请求类型)。MapRequest 本身改变了 trait impl 面的 "Request 参数"

换句话说:你有一个 Service<GenericRequest> 想暴露成一个 Service<HttpRequest>,只需要把从 HttpRequestGenericRequest 的映射函数套上 MapRequest。从外面看,这个 service 变成了 Service<HttpRequest> 能处理的东西。这是类型级别的协议转换器

8.2.1 一个真实场景

在 Axum 里你会看到这样的代码:

rust
let app = Router::new()
    .route("/foo", get(handler))
    .layer(MapRequestLayer::new(|req: Request| {
        // 附加一个 X-Request-Id 头
        let mut req = req;
        req.headers_mut().insert(
            "x-request-id",
            HeaderValue::from_str(&Uuid::new_v4().to_string()).unwrap(),
        );
        req
    }));

这类"请求预处理"——打 tag、改 header、插 extension——用 MapRequest 就够了。不需要自己写一个新的 struct + Service impl,一个闭包解决。

8.2.2 MapResponse / MapErr / MapResult

Tower 对称地提供了三个响应侧的变换:

  • MapResponse<S, F>:对成功响应做 F: FnMut(Resp1) -> Resp2
  • MapErr<S, F>:对错误做 F: FnMut(Err1) -> Err2
  • MapResult<S, F>:对 Result<Resp, Err> 做统一变换——可以把错误转成成功,反之亦然。

三者的实现结构和 MapRequest 几乎一致——"call 返回一个包着原 future 的 adapter future"。我们不重复代码了——它们就是沿着同一个模板铺出去的。

8.2.3 为什么这些都只是"工具",不配有章节

读者可能好奇:这么基础的变换为什么 Tower 不把它们做成"built-in 方法"——就像 Iterator::map?答案是:已经是了

ServiceExt trait(tower/src/util/mod.rs)定义了一组便利方法:

rust
pub trait ServiceExt<Request>: tower_service::Service<Request> {
    fn map_request<F, R>(self, f: F) -> MapRequest<Self, F>
    where Self: Sized, F: FnMut(R) -> Request { ... }
    fn map_response<F, Res>(self, f: F) -> MapResponse<Self, F> { ... }
    fn map_err<F, Err>(self, f: F) -> MapErr<Self, F> { ... }
    // ...
}

有了 ServiceExt,你在业务代码里可以直接:

rust
let svc = my_service.map_request(|req| modify(req))
                    .map_response(|resp| enrich(resp))
                    .map_err(|e| MyError::from(e));

这就像 Iterator 上的 .map().filter().collect()——一个链式的数据流改写。Tower 的意图很明显:让 Service 链式编程感觉像 Iterator 一样自然。读过卷四《Serde 元编程》第 3 章里关于 Serializer 链式方法的讨论——这种组合式 API 在 Rust 生态是一个通用模式:trait 定义核心、trait 扩展方法定义惯用法

8.3 Filter:带 Predicate 的守门人

FilterMapRequest 进了一步:它不止变换,还能拒绝

rust
// tower/src/filter/mod.rs:45-48
pub struct Filter<T, U> {
    inner: T,
    predicate: U,
}

两个类型参数:T 是内层 service、U 是 predicate。

8.3.1 Predicate trait

rust
// tower/src/filter/predicate.rs:24-38
pub trait Predicate<Request> {
    type Request;
    fn check(&mut self, request: Request)
        -> Result<Self::Request, BoxError>;
}

这个 trait 有两个关联类型——不,只有一个(Request)。但它接收一个泛型 Request——这个泛型是 predicate 的输入类型;关联类型 Request 是 predicate 的输出类型

读这两个名字很容易混:

  • trait 的泛型参数 Request(比如 impl Predicate<RawHttpRequest>)——外部传进来的原始请求类型。
  • 关联类型 type Request——predicate 接受之后、传给内层 Service 的类型。

所以 Predicate 本质是:(Request) → Result<Self::Request, Error>——它既可能拒绝(Err),也可能变换(映射到不同类型)。

实际上,MapRequest 是 Filter 的特例:如果 predicate 永远不返回 Err,那 Filter 退化成一个 MapRequest。Tower 没把它们强行统一,因为在语义和 API 清晰度上区分它们更直观——MapRequest 是"我一定要改"、Filter 是"我可能拒绝"。

8.3.2 Predicate 是 blanket impl 覆盖的

rust
// tower/src/filter/predicate.rs:55-65
impl<F, T, R, E> Predicate<T> for F
where F: FnMut(T) -> Result<R, E>, E: Into<BoxError>,
{
    type Request = R;
    fn check(&mut self, request: T) -> Result<Self::Request, BoxError> {
        self(request).map_err(Into::into)
    }
}

任何形如 FnMut(T) -> Result<R, E> 的闭包或函数,都自动满足 Predicate trait。你写 filter 时不需要造一个 struct——直接闭包:

rust
let auth_filter = Filter::new(inner, |req: Request| {
    if req.headers().get("authorization").is_some() {
        Ok(req)
    } else {
        Err(BoxError::from("unauthorized"))
    }
});

看到了吗?auth 中间件就这么几行。"有 authorization header 就通过,没有就拒绝"——就是一个 Predicate。

8.3.3 Filter::call:把两种路径合成一个 future

rust
// tower/src/filter/mod.rs:114-119
fn call(&mut self, request: Request) -> Self::Future {
    ResponseFuture::new(match self.predicate.check(request) {
        Ok(request) => Either::Right(self.inner.call(request).err_into()),
        Err(e) => Either::Left(std::future::ready(Err(e))),
    })
}

Eitherfutures-util 的一个双路径 future——它要么 poll 左边要么 poll 右边:

  • Ok:predicate 通过,返回内层 service 产生的 future(通过 err_into() 把错误类型转成 BoxError)。
  • Err:predicate 拒绝,返回一个立即 Ready 的错误 future(std::future::ready(Err(e)))。

对外统一成一个 ResponseFuture 类型——调用方不需要关心内部走的是哪条路径。这是典型的"用 Either 统一两个异构 future"的模式,在 Tower 里反复出现。

8.3.4 AsyncFilter:异步 predicate 和 Clone 的微妙处理

有时候 predicate 本身需要异步查询——比如"查 Redis 判断这个 token 有没有被吊销"。Tower 为此提供了 AsyncFilterAsyncPredicate

rust
// tower/src/filter/predicate.rs:5-23
pub trait AsyncPredicate<Request> {
    type Future: Future<Output = Result<Self::Request, BoxError>>;
    type Request;
    fn check(&mut self, request: Request) -> Self::Future;
}

返回 Future 而不是 Result——检查本身是异步的。

实现里有一个值得细品的Clone 处理

rust
// tower/src/filter/mod.rs:176-189
fn call(&mut self, request: Request) -> Self::Future {
    use std::mem;

    let inner = self.inner.clone();
    // In case the inner service has state that's driven to readiness and
    // not tracked by clones (such as `Buffer`), pass the version we have
    // already called `poll_ready` on into the future, and leave its clone
    // behind.
    let inner = mem::replace(&mut self.inner, inner);

    // Check the request
    let check = self.predicate.check(request);

    AsyncResponseFuture::new(check, inner)
}

这就是第 4 章警告过的"poll_ready 后 clone 再 call 的陷阱"的正确解法。AsyncFilter 之所以需要这么做,是因为 predicate 是异步的——check 过程中调用方可能并发地 call 另一个请求,这时候 self.inner 不能被独占。方法是:先 clone 一个备用 inner 放回 self.inner,然后把 "已经 poll_ready 过的原 inner" 搬到 future 里

这段代码的注释写得极好,直接告诉你"为什么"(in case the inner service has state that's driven to readiness and not tracked by clones (such as Buffer))——是整本 tower 源码里对这个经典模式最清晰的解释

8.4 Steer:多个 Service 的"分流器"

Steer 让你把多个 Service 组合成一个,按某种规则把请求分流到其中一个。

rust
// tower/src/steer/mod.rs:106-111
pub struct Steer<S, F, Req> {
    router: F,
    services: Vec<S>,
    not_ready: VecDeque<usize>,
    _phantom: PhantomData<Req>,
}

字段:

  • services: Vec<S>——所有内层 service。
  • router: F——picker 函数,fn(&Req, &[S]) -> usize 返回选中的 index。
  • not_ready: VecDeque<usize>——还没就绪的 service index。

8.4.1 Picker trait

rust
// tower/src/steer/mod.rs:74-87
pub trait Picker<S, Req> {
    fn pick(&mut self, r: &Req, services: &[S]) -> usize;
}

impl<S, F, Req> Picker<S, Req> for F
where F: Fn(&Req, &[S]) -> usize,
{
    fn pick(&mut self, r: &Req, services: &[S]) -> usize {
        self(r, services)
    }
}

任何闭包都是 Picker。典型用法来自 Tower 官方文档里的例子:

rust
let mut svc = Steer::new(
    vec![root_service, not_found_service],
    |req: &Request<String>, _services: &[_]| {
        if req.method() == Method::GET && req.uri().path() == "/" {
            0  // root
        } else {
            1  // not_found
        }
    },
);

这就是一个最原始的 HTTP router——按 method + path 选 service。

8.4.2 poll_ready:head-of-line blocking

这里是 Steer 最有争议也最微妙的一段:

rust
// tower/src/steer/mod.rs:138-155
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    loop {
        if self.not_ready.is_empty() {
            return Poll::Ready(Ok(()));
        } else {
            if self.services[self.not_ready[0]]
                .poll_ready(cx)?
                .is_pending()
            {
                return Poll::Pending;
            }
            self.not_ready.pop_front();
        }
    }
}

关键行为:Steer 在 poll_ready 时要求"所有 service 都 Ready"才报告 Ready

Why?因为 Steer 在收到请求之前不知道 picker 会选哪个 service——所以必须保证每一个候选都能接。不然就会出现 "picker 选了 2 号但 2 号 not ready 要挂起" 的尴尬——而 Tower 协议里 poll_ready 一旦 Ready 就必须能立刻 call。

这个设计带来了一个明显问题:head-of-line blocking。如果你有 10 个 service,一个慢,一个快 9 个快;哪怕所有请求都是要发给 9 个快的,Steer 也必须等那个慢的 ready 才行。文档里明确警告:

This will cause head-of-line blocking unless paired with a [Service] that does buffer items indefinitely, and thus always returns [Poll::Ready]. For example, wrapping each component service with a [Buffer] with a high enough limit ... will prevent head-of-line blocking.

翻译:要么每个 service 是"永远 Ready"的,要么用 Buffer 包住每个 service——让 Buffer 把背压吞掉,Steer 看到的就是"永远 Ready"的 service 列表。

这是一个对 Tower 协议有意的牺牲:Steer 本可以在内部维护一个"队列到哪个还在忙着的 service"的机制——但那就变成了 Balance。Steer 选择了简洁——保证它永远不偷偷替你排队,所有决策都由你在 picker 里显式做出。

8.4.3 call:按 picker 分发

rust
// tower/src/steer/mod.rs:157-167
fn call(&mut self, req: Req) -> Self::Future {
    assert!(
        self.not_ready.is_empty(),
        "Steer must wait for all services to be ready. Did you forget to call poll_ready()?"
    );

    let idx = self.router.pick(&req, &self.services[..]);
    let cl = &mut self.services[idx];
    self.not_ready.push_back(idx);
    cl.call(req)
}

三件事:

  1. 断言所有 service 都是 ready(否则 panic)。
  2. 调 picker 选 index。
  3. 把这个 index 加入 not_ready(call 之后这个 service 可能又变回 pending),调 call。

注意self.not_ready.push_back(idx)——这个 service 被 call 之后就再次进入 not_ready 队列。下一次 poll_ready 又要 poll 它一次。这保证了 steer 的状态一致性——每 call 一次、标脏一次、下次 poll_ready 再确认 ready。

8.5 三者的组合实例

我们来看一个真实应用,把这些原语串起来:构造一个 authN + routing 的中间件链。

场景:HTTP 服务端,需求是:

  1. 所有请求必须带 Authorization header,否则 401(auth filter)。
  2. URL path 包含 /admin 的请求打上 X-Admin: true(map_request)。
  3. Path 以 /api 开头的走 API service,以 /web 开头的走 Web service,其他走 fallback(steer)。
rust
use tower::{filter::FilterLayer, util::MapRequestLayer, steer::Steer, ServiceBuilder, ServiceExt};
use http::{Request, Response};
use http_body_util::Full;

// 1. API / Web / Fallback 三个内层 service
let api_svc = /* ... */;
let web_svc = /* ... */;
let fallback_svc = /* ... */;

// 2. 用 Steer 组成一个"按 path 选 service"的组合
let routed = Steer::new(
    vec![api_svc, web_svc, fallback_svc],
    |req: &Request<_>, _svcs: &[_]| {
        if req.uri().path().starts_with("/api") { 0 }
        else if req.uri().path().starts_with("/web") { 1 }
        else { 2 }
    },
);

// 3. 在外面套上 map_request + filter
let app = ServiceBuilder::new()
    .layer(FilterLayer::new(|req: Request<_>| {
        if req.headers().get("authorization").is_some() {
            Ok(req)
        } else {
            Err(BoxError::from("401 unauthorized"))
        }
    }))
    .layer(MapRequestLayer::new(|mut req: Request<_>| {
        if req.uri().path().contains("/admin") {
            req.headers_mut().insert("X-Admin", HeaderValue::from_static("true"));
        }
        req
    }))
    .service(routed);

请求流:

req → Filter(auth)  → MapRequest(admin tag) → Steer(path router) → api|web|fallback → resp
         │              │                        │
         ↓ 401 err      ↓ 变形                    ↓ 分发

整条链是静态类型、零运行时选择、零分配。编译器把它全部单态化成一个具体的 struct。Axum 的 handler 分发、Tonic 的 method 分发,最底层用的就是类似的组合思路(虽然 Axum 自己实现了更复杂的 Router——因为要支持路径参数、动态方法、嵌套路由等,不是朴素的 Steer 能覆盖的)。

8.6 与卷五 Vue 3 的 h() 函数对照

有一个类比也许会让你会心一笑。我们读过卷五《Vue 3 设计与实现》关于响应式组件的章节——Vue 的 h(Component, props, children) 也是一种"用变换构造"的模式

javascript
h('div',
  { class: 'wrapper' },
  [ h(MyComponent, { prop: value }),
    h('span', {}, ['text']) ])

Tower 的 Steer::new(vec![svc_a, svc_b], picker) 从接口角度看几乎是一样的——"给我一堆组件(services)+ 一个选择函数,我给你一个统一的组合"。

更深一层:两者都解决"如何在不破坏类型的前提下把多路选择表达出来"。Vue 用 VNode 做运行时多态(每个 h() 产生一个 VNode 对象,在 diff 时决定行为),Tower 用 Picker + index 做编译期多态(每个 service 单态化保留其原始类型,picker 决定走哪条)。

这是一个有意思的"语言哲学影响抽象实现方式"的案例。JS 动态类型让 VNode 可以是任何东西、h() 返回值是同一个类型——这是动态语言的方便。Rust 静态类型要求你在 Steer 外面用 BoxService 擦除每一个 service 的类型(或者保证它们类型一致)——这是静态语言的约束。但两套系统表达的核心思想是一致的:组合 + 选择

8.7 一些容易踩的坑

8.7.1 Steer 里不能混 Service 类型

你不能直接 Steer::new(vec![http_svc, grpc_svc], ...)——因为 Vec 要求所有 service 类型一致。Tower 官方例子里总是先 BoxService::new(svc) 把每个 service 擦除类型,再塞进 Vec。这意味着每次请求分发都付一次 vtable 查找——但这是必须的代价。

8.7.2 AsyncFilter 的延迟累积

AsyncFilter 每次 call 都会异步查一次 predicate。如果你 10 个中间件都是 AsyncFilter,每个都查 Redis——单次请求就有 10 次 Redis round trip。AsyncFilter 只用在真的需要异步查询的场景,能用同步 Predicate 就用同步。

8.7.3 Filter 错误被 BoxError 包裹

Filter 的 type Error = BoxError——即使你内层 service 用了具体的错误类型 MyError,经过 Filter 之后就变成 BoxError。Axum 这类框架的顶层要 downcast_ref::<MyError> 才能识别业务错误。详见第 3 章关于 Layer 错误擦除的讨论。

8.8 和 tower-http 的关系

有一个关键的区分:tower 的 Filter / MapRequest 是完全协议无关的——它们只看 Service<Req>,不知道 Req 具体是什么。如果你要做HTTP 专用的变换(比如加 Authorization header、判断方法、解析路径参数),应该用 tower-http crate 里的专用中间件:

tower-http 中间件作用
SetRequestHeaderLayer给每个请求加一个固定 header
SetResponseHeaderLayer给每个响应加一个固定 header
ValidateRequestHeaderLayer校验某个 header 必须符合 predicate
CorsLayerCORS 中间件
TraceLayer自动把请求方法、路径、状态码记进 tracing span
AuthorizeLayerBearer/Basic token 鉴权

这些中间件底层都是 Tower 的 Filter / MapRequest 组合——只是它们带了 HTTP 语义,实现里能直接操作 http::Request 的 headers / method / uri。在生产代码里优先用 tower-http 的版本,不要手搓——除非你真的需要一个它没有提供的 predicate。

tower-http 和 tower 的分层在这里很清楚:

  • tower:协议无关的中间件"原子"。
  • tower-http:把原子组合成 HTTP 语义常用套件。

读一遍 tower-http/src/trace/make_span.rstower-http/src/auth/require_authorization.rs 的源码,你会发现它们基本都是 FilterMapRequest 的 http 特化——语法糖,核心在 tower 里。

8.9 落到你键盘上

本章读完:

  • 浏览 tower/src/util/ 下所有 map_ 文件*——map_err.rsmap_response.rsmap_result.rsthen.rsand_then.rs。一个接一个读,整套"Service 的 combinators"就清楚了。
  • 给自己的项目写一个 Filter——一个简单的 Content-Type 检查,或者一个 User-Agent 解析。用 Filter::new + 闭包,5 行代码解决,你会体会到"纯函数路由层"的干净。
  • 读 tower-http 的 TraceLayer 源码——它用 MapRequest + MapResponse 的思路构造了一个自动记录 tracing 的中间件,是 tower-http 里最优雅的中间件之一,不到 300 行。

第二部分(Tower 中间件源码实录)到这里结束。下一章开始我们离开 Tower,进入 HTTP 数据模型层——http crate 和 http-body。它们是 Hyper 和整个 Rust HTTP 生态的"数据语言"。

基于 VitePress 构建