Skip to content

第5章 Timeout / Retry / RateLimit:基础中间件的源码剖析

5.1 三个最小单元

读过前四章的铺垫之后,我们终于可以坐下来读一段真实、完整、跑在 Linkerd 代理和 Tonic gRPC 流量上的工业级中间件源码了。

本章挑三个最基础也是最常用的 Tower 中间件:

  • Timeout — "给这次调用 30 秒时间,到点没回就放弃。"
  • Retry — "失败了再试一次,最多三次,每次间隔不一样。"
  • RateLimit — "每秒最多 100 个请求,超了就排队。"

每一个单独看都简单,但它们在 Tower 源码里的实现各自走了不同的路:Timeout 用的是纯"两 future 赛跑"、Retry 用的是"状态机 + Service clone"、RateLimit 用的是"token bucket + 预分配 Sleep"。读完这三段代码,你会掌握 Tower 中间件的全部三种主要实现范式——后面章节里的 BufferLoadShedBalance 都是这三种范式的变奏。

所有的源码引用来自 tower 0.5.3(commit 251296d),读者可以在本地 git checkout 251296d 之后对照验证。

5.2 Timeout:两 future 赛跑

5.2.1 整体结构

Timeout 的全部源码只有 timeout/mod.rs(70 行)+ future.rs(53 行)+ layer.rs(24 行)+ error.rs(22 行)——加起来不到 200 行。拆下来就三件事:

  1. struct Timeout<T> 存 inner + duration。
  2. ResponseFuture<T> 把两个 future 捆在一起。
  3. TimeoutLayer 是工厂。
rust
// tower/src/timeout/mod.rs:18-22
#[derive(Debug, Clone)]
pub struct Timeout<T> {
    inner: T,
    timeout: Duration,
}

注意 #[derive(Clone)]——这意味着"Timeout 本身不阻止 Clone"。内层 T 能 clone,Timeout 就能 clone。这点重要,它让 Timeout 能被塞到需要 Service: Clone 的场景里(比如 Retry 或 Buffer 的 worker)。

5.2.2 call 只做一件事:构造 future

rust
// tower/src/timeout/mod.rs:64-69
fn call(&mut self, request: Request) -> Self::Future {
    let response = self.inner.call(request);
    let sleep = tokio::time::sleep(self.timeout);
    ResponseFuture::new(response, sleep)
}

call没有 await。它构造两个独立的 future:一个是业务的(self.inner.call(request)——这也是 future,不 await),一个是 tokio 的 sleep。然后把它们装进 ResponseFuture——这就是返回值。

这是 Tower 中间件最典型的惯用法——call 是"future 工厂",不做实际工作。所有真正的 await 都发生在返回的 future 里。原因是 call 的签名 &mut self 会对"在 call 里 await"造成严重束缚(第 2 章讨论过)。把 await 延迟到返回 future 里,&mut self 的借用在 call 返回的瞬间结束,后续调度完全自由。

5.2.3 ResponseFuture::poll:顺序决定语义

rust
// tower/src/timeout/future.rs:38-52
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let this = self.project();

    // First, try polling the future
    match this.response.poll(cx) {
        Poll::Ready(v) => return Poll::Ready(v.map_err(Into::into)),
        Poll::Pending => {}
    }

    // Now check the sleep
    match this.sleep.poll(cx) {
        Poll::Pending => Poll::Pending,
        Poll::Ready(_) => Poll::Ready(Err(Elapsed(()).into())),
    }
}

十五行代码,全部加起来。但每一行都值得琢磨。

先 poll response,再 poll sleep——顺序反过来就错了。为什么?如果先 poll sleep 返回 Ready(时间到了),那不管 response 是不是也恰好在这一纳秒完成,都直接报超时——业务成功的结果会被白白丢掉。先 poll response,给成功的路径最后一次机会。响应已经到了的话,直接 Ready 出去,sleep 变成"永远不会完成但也不会再被 poll"的 future 挂在那里,随 ResponseFuture 一起 drop。

map_err(Into::into):内层 error 被转换成 BoxErrorBox<dyn Error + Send + Sync + 'static>)。这是因为 Timeout 要和自己产生的 Elapsed 错误合成一个统一的错误类型——整个中间件对外承诺 type Error = BoxError。上一章讲 Layer 组合时提过这个权衡:错误类型擦除是中间件组合性的必要代价

pin_project! 的作用#[pin] response#[pin] sleep 两个字段在 self.project() 里被安全地拿出 Pin<&mut T>。这是因为 tokio::time::Sleep!Unpin 的(它内部持有一个 timer registration 的指针,地址必须稳定),不能被安全地移动。pin_project_lite 宏帮你生成正确的投影代码,让你以声明式的方式描述"哪些字段要 pin"。

这个机制的细节在卷三《Rust 编译器与运行时揭秘》第 10 章(Pin / Waker / Future)里讲透过。如果你没读过,现在的直觉是:pin_project!Pin 系统的"声明式模板",你告诉它"这些字段需要被 pin",它帮你生成 unsafe 代码保证安全。

5.2.4 为什么没有"Timer cancel"?

老手可能会问:tokio Sleep 是不是一个会注册到 Time Driver 全局时间轮的定时器?Timeout 如果业务先完成了,sleep 有没有被 cancel?

答案:sleep 的 cancel 是隐式的——ResponseFuture 被 drop 时,sleep 字段被 drop,tokio 的 Sleep::drop 会把自己从时间轮上摘掉。你不需要手动调用 sleep.cancel()sleep.stop() 之类的 API。

这又是 Rust 的 RAII 在起作用——资源清理是"通过析构自然发生的"。Tower 的代码里看不到任何显式的 cancel/release——但每一次 ResponseFuture 的 drop(包括因为超时丢弃、因为调用方放弃、因为父任务被取消),都会把 sleep 从 tokio Time Driver 上摘掉。卷四《Tokio 源码深度解析》第 11 章(Time Driver 与分层定时器轮)里讲过这个机制——定时器轮里每个 entry 是一个侵入式链表节点,drop 触发 unlink。

5.2.5 TimeoutLayer:把上面这堆包成一个 Layer

rust
// tower/src/timeout/layer.rs 精简版
#[derive(Debug, Clone, Copy)]
pub struct TimeoutLayer {
    timeout: Duration,
}

impl TimeoutLayer {
    pub const fn new(timeout: Duration) -> Self { TimeoutLayer { timeout } }
}

impl<S> Layer<S> for TimeoutLayer {
    type Service = Timeout<S>;
    fn layer(&self, service: S) -> Self::Service {
        Timeout::new(service, self.timeout)
    }
}

TimeoutLayer 是 Copy——DurationTimeoutLayer 都是 POD-like 结构,没有任何堆分配。这让它能在 ServiceBuilder 里自由移动、多次 clone,不产生任何运行时开销。

Timeout 的故事到这里完结。你如果把 timeout/ 目录下的四个文件连起来读一遍,不到 200 行代码构成一个生产级、零分配、正确处理取消语义的工业级超时中间件。这就是 Tower 的品味——每一行都精简到不能再少

5.3 Retry:状态机 + 请求克隆

Retry 比 Timeout 复杂得多。因为"重试"这件事本质上包含三种状态:

  1. Called:正在跑本次请求。
  2. Waiting:请求失败了,按 policy 等 N 毫秒再重试。
  3. Retrying:等够时间了,等 inner service 再次就绪,然后重新 call。

这是一个清晰的状态机,Tower 把它编码在一个 enum State

rust
// tower/src/retry/future.rs:27-43
pin_project! {
    #[project = StateProj]
    #[derive(Debug)]
    enum State<F, P> {
        Called { #[pin] future: F },
        Waiting { #[pin] waiting: P },
        Retrying,
    }
}

三个变体分别对应三种状态:Called 持有本次 call 的 future、Waiting 持有 policy 返回的等待 future、Retrying 没字段(是瞬时状态,只负责做一次 poll_ready 然后再切回 Called)。

5.3.1 Policy trait:策略即接口

rust
// tower/src/retry/policy.rs:46-90
pub trait Policy<Req, Res, E> {
    type Future: Future<Output = ()>;

    fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>)
        -> Option<Self::Future>;

    fn clone_request(&mut self, req: &Req) -> Option<Req>;
}

两个方法定义了 Retry 的所有策略空间:

  • retry:拿到这次的 req 和结果,决定"要不要再试"。返回 None 表示"到此为止"、返回 Some(fut) 表示"等这个 future 完成之后重试"。
  • clone_request给 Retry 一个克隆请求的方式。有些请求能克隆(比如只读 GET),有些不能(比如流式上传),返回 None 表示"没法克隆"——Retry 就退化成"跑一次就返"。

第一个方法拿的是 &mut Req&mut Result——意味着 Policy 可以修改请求或结果。文档里举了例子:每次重试时给请求加一个 X-Retry-Count: 1 头(修改 req),或者在最后一次失败时把错误类型换成 RetriesExhausted(修改 result)。这种"可变访问"比只读更强大,是一个有意的设计。

5.3.2 为什么 Retry 需要 S: Clone

rust
// tower/src/retry/mod.rs:72-76
impl<P, S, Request> Service<Request> for Retry<P, S>
where
    P: Policy<Request, S::Response, S::Error> + Clone,
    S: Service<Request> + Clone,
{

S: Clone——inner service 必须能克隆。为什么?看 ResponseFuture 的字段:

rust
// tower/src/retry/future.rs:13-24
pub struct ResponseFuture<P, S, Request>
where P: Policy<Request, S::Response, S::Error>,
      S: Service<Request>,
{
    request: Option<Request>,
    #[pin] retry: Retry<P, S>,    // ← 整个 Retry 被存进 future
    #[pin] state: State<S::Future, P::Future>,
}

ResponseFuture 里存了一份完整的 Retry<P, S> 本身——也就是一份 S。为什么?因为重试的时候需要对 service poll_readycall 一次——调用方原来持有的 &mut Retry 在 call 返回后已经放开了,future 内部必须自己持有 service 才能后续再调用。

这就是 Retry 需要 S: Clone 的原因:每次 call 都要克隆一份 service 存进 future。如果 S 本身太大(比如持有大量配置),clone 成本会累计。Tower 对此的建议是在 Retry 外面再套一层 Buffer——Buffer 本身是 Clone(内部是 Arc+mpsc),把重的 Service 藏在 Arc 后面共享。

Retry::call 的实现完美对应这个思路:

rust
// tower/src/retry/mod.rs:88-93
fn call(&mut self, request: Request) -> Self::Future {
    let cloned = self.policy.clone_request(&request);
    let future = self.service.call(request);
    ResponseFuture::new(cloned, self.clone(), future)
}

policy.clone_request(&request) 尝试克隆请求(可能失败)、self.service.call(request) 启动第一次调用(使用原始请求)、然后构造 ResponseFuture(cloned_request, self.clone(), first_future)——整个 self(包括 policy 和 service)被 clone 进 future

5.3.3 状态机:ResponseFuture::poll

这是 Retry 最关键的 30 行:

rust
// tower/src/retry/future.rs:70-118 精简版
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut this = self.project();

    loop {
        match this.state.as_mut().project() {
            StateProj::Called { future } => {
                let mut result = ready!(future.poll(cx));
                if let Some(req) = &mut this.request {
                    match this.retry.policy.retry(req, &mut result) {
                        Some(waiting) => this.state.set(State::Waiting { waiting }),
                        None => return Poll::Ready(result),
                    }
                } else {
                    return Poll::Ready(result);   // 请求没法 clone,不重试
                }
            }
            StateProj::Waiting { waiting } => {
                ready!(waiting.poll(cx));
                this.state.set(State::Retrying);
            }
            StateProj::Retrying => {
                ready!(this.retry.as_mut().project().service.poll_ready(cx))?;
                let req = this.request.take()
                    .expect("retrying requires cloned request");
                *this.request = this.retry.policy.clone_request(&req);
                this.state.set(State::Called {
                    future: this.retry.as_mut().project().service.call(req),
                });
            }
        }
    }
}

一个 loop 包着一个 match——这是 Rust 状态机 future 的标准结构。每次外层 poll 被调用时:

  1. Called:poll 当前 future。如果还没完成(ready! 返回 Pending),直接返回 Pending;完成了,拿到 result,问 policy 要不要重试。
  2. Waiting:poll 等待 future。完成之后切到 Retrying。
  3. Retrying:先 poll_ready(拿许可),然后拿出保存的 request再克隆一份(因为重试后可能还需要再次重试,所以留一份在 request 字段),call(req) 启动新一次请求,切回 Called。

这个状态机最精巧的地方是:整个重试链在同一个 ResponseFuture 的生命周期里被驱动。调用方只看到一个 async fn retry_service.call(req).await——背后可能发生了十次网络失败、等待、重试,全部封闭在 future 内部。

5.3.4 poll_ready 的尴尬假设

Retry::poll_ready 的实现是:

rust
// tower/src/retry/mod.rs:81-86
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    // NOTE: the Future::poll impl for ResponseFuture assumes that
    // Retry::poll_ready is equivalent to Ready.service.poll_ready. ...
    self.service.poll_ready(cx)
}

直接透传到 inner。看起来合理,但注释里藏了一个微妙问题:

the Future::poll impl for ResponseFuture assumes that Retry::poll_ready is equivalent to Ready.service.poll_ready.

翻译:ResponseFuture::pollRetrying 状态里,直接 poll 了 service 的 poll_ready(第 106 行 this.retry.as_mut().project().service.poll_ready(cx))——而不是 poll 了整个 Retry::poll_ready。之所以这样写,是因为 Retry::poll_ready(&mut self) 要求 self: &mut Retry,而在 Future::poll 里你只有 Pin<&mut ResponseFuture>,想拿到 &mut Retry 需要 Retry: Unpin——但 Retry 被 #[pin] 标注了。

解决办法是"绕过 Retry 的 poll_ready,直接 poll 底层 service 的 poll_ready"——这在语义上正确当且仅当 Retry::poll_readyself.service.poll_ready 的透传。注释里这个 NOTE 是在告诉未来的维护者:"如果你改 Retry::poll_ready 的行为,要记得同步改 ResponseFuture::poll"。

这是一个工程细节——当你跨越 Pin 做状态机时,常常需要在某些方法上做"绕过"。Tower 的代码里坦白记录了假设前提,让 future 的维护者不会踩坑。写自己的 !Unpin future 时,这是值得借鉴的文档习惯。

5.4 RateLimit:token bucket + 预分配 Sleep

RateLimit 又是另一种实现范式。它要做的事情听起来很简单:"每秒最多 100 次"——但真要写对、写快、不抖动,需要一些技巧。

5.4.1 Rate 的定义

rust
// tower/src/limit/rate/rate.rs:1-29
#[derive(Debug, Copy, Clone)]
pub struct Rate {
    num: u64,
    per: Duration,
}

impl Rate {
    pub const fn new(num: u64, per: Duration) -> Self {
        assert!(num > 0);
        assert!(per.as_nanos() > 0);
        Rate { num, per }
    }
    pub fn num(&self) -> u64 { self.num }
    pub fn per(&self) -> Duration { self.per }
}

Rate 就是"N 个请求 / duration"。没有 "per second" 这种硬编码——你可以 Rate::new(100, Duration::from_secs(1))(100/s)也可以 Rate::new(5, Duration::from_millis(100))(50/s 但 burst 更紧)。这个分离是有意识的——有些场景需要平滑节流(小周期),有些场景接受突发(大周期)。

5.4.2 状态机:Ready vs Limited

rust
// tower/src/limit/rate/service.rs:13-25
pub struct RateLimit<T> {
    inner: T,
    rate: Rate,
    state: State,
    sleep: Pin<Box<Sleep>>,
}

enum State {
    Limited,
    Ready { until: Instant, rem: u64 },
}

两个状态:

  • Ready { until, rem }:当前周期到 until,还剩 rem 个许可。
  • Limited:当前周期许可用完,正在等下一个周期开始。

注意那个 sleep: Pin<Box<Sleep>> 字段——它在构造时就被预分配好:

rust
// tower/src/limit/rate/service.rs:29-45
pub fn new(inner: T, rate: Rate) -> Self {
    let until = Instant::now();
    let state = State::Ready { until, rem: rate.num() };
    RateLimit {
        inner, rate, state,
        // The sleep won't actually be used with this duration, but
        // we create it eagerly so that we can reset it in place rather than
        // `Box::pin`ning a new `Sleep` every time we need one.
        sleep: Box::pin(tokio::time::sleep_until(until)),
    }
}

注释解释了一个性能优化Sleep!Unpin 的,每次用都得 Box::pin 一个新的——那会频繁分配堆内存。作者选择在构造时预分配一个,后面需要重置时调用 Sleep::reset(new_deadline) 原地改截止时间,avoiding allocation hot path。

tokio Sleep::reset 的实现是 O(1) 的——它会把当前的 timer entry 从时间轮里摘下来、改 deadline、重新插入。这比 Box::pin(new_sleep) 快得多(不经过堆分配器,也不经过时间轮的创建开销)。这种"预分配 + 原地重置"是 hot-path 中间件的通用技巧。

5.4.3 poll_ready 的逻辑

rust
// tower/src/limit/rate/service.rs:71-88
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    match self.state {
        State::Ready { .. } => return self.inner.poll_ready(cx),
        State::Limited => {
            if Pin::new(&mut self.sleep).poll(cx).is_pending() {
                tracing::trace!("rate limit exceeded; sleeping.");
                return Poll::Pending;
            }
        }
    }

    // 走到这里意味着: 要么本来就 Ready,要么 Limited 状态里 sleep 到期了
    self.state = State::Ready {
        until: Instant::now() + self.rate.per(),
        rem: self.rate.num(),
    };

    self.inner.poll_ready(cx)
}

逻辑:

  • Ready 状态:直接透传到 inner。不减 remainder——减 remainder 发生在 call 里。
  • Limited 状态:poll 那个预分配的 sleep;没到时间返回 Pending。到时间了,周期开始——重置 state 为 Ready,rem 重新变为 rate.num(),然后继续 poll inner。

注意这里有一个小缺陷:如果 Limited 状态下 sleep 刚好到期,此时会立刻进入新周期——但新周期的"起点"被设成 Instant::now(),不是原来的 until。这意味着如果你从 Limited 走出来的时刻是上一周期结束后 500ms(tokio scheduling 有延迟),新周期的 500ms 损失不会补回——rate 会稍稍"漏"一点。这在实践中基本不可感知,但理论上 rate limiter 不是数学严格的 N/duration

5.4.4 call 的逻辑

rust
// tower/src/limit/rate/service.rs:90-117
fn call(&mut self, request: Request) -> Self::Future {
    match self.state {
        State::Ready { mut until, mut rem } => {
            let now = Instant::now();

            // 周期过期了?开新周期
            if now >= until {
                until = now + self.rate.per();
                rem = self.rate.num();
            }

            if rem > 1 {
                rem -= 1;
                self.state = State::Ready { until, rem };
            } else {
                // 用完这个许可,下一次 call 前必须等 sleep
                self.sleep.as_mut().reset(until);
                self.state = State::Limited;
            }

            self.inner.call(request)
        }
        State::Limited => panic!("service not ready; poll_ready must be called first"),
    }
}

几个小细节:

  • if now >= until:这一段是"周期自动续"——如果 poll_ready 后很久才调用 call,原周期已经过去,直接开一个新周期。这对"偶发流量"场景友好——偶尔来一个请求永远不会被 rate limit 误伤。
  • if rem > 1:注意是 >1,不是 >0。这一次 call 消费掉的就是最后一个,状态切到 Limited。如果写成 >0,就会先减到 0 再在下一次 call 时发现没许可 panic——逻辑上等价但边界比较诡异,作者选择了更清晰的"发现快用完就提前切态"。
  • panic!("service not ready; poll_ready must be called first"):协议违反的标准处理——callLimited 状态下永远不应该被触达(poll_ready 会挂起调用方),如果真的发生,说明上游代码没按协议走。

5.4.5 RateLimit vs ConcurrencyLimit:概念边界

两个名字长得像,实际上是完全不同的约束

ConcurrencyLimitRateLimit
关心的维度同时 in-flight 请求数单位时间请求数
典型用途保护 CPU/内存资源保护依赖的下游(API quota、第三方限制)
背压机制Semaphore 许可周期计数 + Sleep
慢请求影响占住许可直到完成不影响(只看到达速率)
快请求影响可以并发 N 个可以突发一整个 rate.num

实操建议:两个一起用.concurrency_limit(10).rate_limit(100, Duration::from_secs(1)) 意味着"每秒最多 100 个新请求、同时 in-flight 不超过 10 个"。前者保护下游不被速率打爆,后者保护自己不被慢请求堆爆。

5.5 三种范式的总结

我们刚刚读的三个中间件其实对应三种不同的实现范式

中间件范式关键技术
Timeout双 future 赛跑pin_project + 顺序 poll
Retry状态机 + service cloneenum State + Loop+Match + S: Clone
RateLimit状态机 + 预分配 timerBox::pin once + Sleep::reset

Tower 里后面出现的中间件几乎都能归到这三种:

  • Buffer:状态机 + mpsc(类似 Retry 范式)
  • LoadShed:立即决定(Timeout 范式的退化版,只 poll 一次)
  • ConcurrencyLimit:双阶段 Permit(是 RateLimit 的简化——没有时间维度)
  • Balance:多 service + ready cache(Retry 范式的扩展)

当你给自己的代码写 Tower 中间件时,第一步不是"抄一个 Timeout 的模板"——而是先想**"我要做的事属于哪一种范式"**。范式决定了你的 type Future 要不要做状态机、要不要 clone inner、要不要持有 timer。

5.6 和卷四《Tokio》的连接

这三个中间件所有"异步等待"的底层能力都来自 tokio。Timeout 用 tokio::time::sleep、RateLimit 用 tokio::time::sleep_until——两者都建立在卷四《Tokio 源码深度解析》第 11 章讲的分层定时器轮上。

rust
tokio::time::sleep(Duration::from_secs(30))

// tokio::time::Sleep::new_timeout

// 向 Time Driver 注册一个 deadline

// Time Driver 以 "hierarchical timing wheel" 组织所有定时器

// Waker 到期触发,挂起的 task 被唤醒

理解了卷四讲的时间轮,你就知道为什么 tokio::time::sleep(Duration::from_millis(1))tokio::time::sleep(Duration::from_secs(3600)) 的开销几乎一样——注册代价是 O(1)、到期检查是 O(wheel_slots)。这就是为什么 RateLimit 即使设的是 Rate::new(1000, Duration::from_millis(1))(1M QPS)性能也不会崩——每个 Sleep 的注册只是把一个 entry 挂到时间轮的某个 bucket 上。

Retry 的 policy 返回的 Future 也可以利用 tokio time——一个典型的 "exponential backoff" policy 就是每次失败返回一个越来越长的 tokio::time::sleep。Tower 提供了一个辅助 retry::backoff 模块,里面就是这套模式的标准实现。

5.7 落到你键盘上:写一个自己的组合

给一个实战场景:你在写一个对第三方 OCR API 的调用层。约束是:

  • 这个 API 最多 5 QPS(第三方限制);
  • 失败(包括 429/500/504)要重试最多 3 次,每次间隔翻倍;
  • 每次调用最多等 10 秒,超了就放弃;
  • 全链路最多同时跑 20 个任务(保护自己的内存)。

正确的 Tower 栈:

rust
use std::time::Duration;
use tower::ServiceBuilder;

let ocr_service = ServiceBuilder::new()
    .concurrency_limit(20)                           // 最外层:保护自己
    .timeout(Duration::from_secs(10))                // 每次调用超时
    .layer(RetryLayer::new(ExponentialPolicy::new(   // 重试 + backoff
        3,
        Duration::from_millis(200),
    )))
    .rate_limit(5, Duration::from_secs(1))           // 最内层:对齐第三方限流
    .service(ocr_client);

这一串是有顺序的——外层先处理:先过 concurrency_limit 限总并发、再应用 timeout(整条链的 deadline)、再进 retry(可能重试多次)、最后 rate_limit(保证每次去打第三方都不超 5/s)。

几个细节值得品:

  • Rate_limit 放在最内层。放外层的话,rate_limit 把请求"吃进"后 timeout 就开始计时——但请求还得等 rate_limit 的 sleep,可能等完 sleep 就超时了。放内层让 timeout 包住"重试 + 限流"整个 window。
  • Retry 在 timeout 里面。第一次失败后等 200ms、第二次失败后等 400ms——这些等待时间都算在外层 timeout 里。超过 10 秒直接整条放弃,不会卡在第 3 次重试的长 backoff 里。
  • Concurrency_limit 放最外层。它是"系统容量"保护——不管业务做得多复杂,总并发超过 20 就挂起后续调用方。

这种"正确的组合顺序"是 Tower 工程的精髓——每一层放在哪里,决定了"超时从什么时候开始算"、"重试是否穿越其他中间件"、"背压信号是在哪一层被吸收"。没有绝对的最优解,但有跟工程目的匹配的合理组合。

5.8 小结

这一章把三个最常用的 Tower 中间件读到底:

  1. Timeout——双 future 赛跑,全部源码 < 200 行。
  2. Retry——状态机 + Service clone,Policy trait 抽象了策略空间。
  3. RateLimit——token bucket + 预分配 Sleep,减少 hot-path 分配。

每一个都是一种可借鉴的中间件实现范式。

落到你键盘上:

  • 打开 tower/src/retry/backoff.rs 读 Budget 策略。这是 Linkerd 生产环境用的 retry budget 算法,比朴素 ExponentialBackoff 复杂得多,但能防止重试风暴失控——真实世界的 retry 几乎都要和 budget 配合。
  • 实验 rate_limit vs concurrency_limit 的组合。在本地起一个 mock 下游服务(固定延迟 200ms),front 挂上 ServiceBuilder::new().rate_limit(5,1s).concurrency_limit(10).service(...) vs 反过来的 .concurrency_limit(10).rate_limit(5,1s).service(...)——观察它们在高并发 / 低并发 / burst 情况下的差异。
  • 读 tokio 的 Sleep::reset 实现tokio-1.46/tokio/src/time/driver/sleep.rs)。只有几十行,能看到它怎么把 entry 从时间轮摘下来、改 deadline、插回去。

下一章我们读 BufferLoadShed——它们代表 Tower 处理"过载"的两种截然不同的哲学:排队 vs 立即拒绝。

基于 VitePress 构建