Appearance
第4章 poll_ready 与 backpressure:显式容量信号的工程意义
4.1 一次真实事故
先讲一段真实发生过的事故。
某家公司在做一个新版 API 网关,Rust 编写,Axum 做路由,上游接一个慢的认证服务,所有请求都要经过认证。测试环境一切正常——吞吐、延迟、错误率漂亮。上线当天的凌晨一切也正常。第二天上午十点,业务高峰到来,内存曲线以一条近乎完美的直线向上爬。十一点半,服务被 OOM killer 杀掉,重启后几分钟又爬到天花板。监控看得见的连接数不高、CPU 不高、磁盘 IO 不高——只有内存在涨。
团队花了半天才定位到原因:他们在中间件层写了这样一段伪代码:
rust
async fn auth_middleware(req: Request, svc: AuthService) -> Response {
let resp = svc.call(req).await?;
next.call(resp).await
}AuthService 上游是一个能接 100 QPS 的认证服务。业务高峰每秒来 3000 个请求,每个请求都 spawn 一个 future、每个 future 都调用 svc.call(req)——然后被认证服务的 100ms 延迟阻塞。3000 个 futures 堆在内存里等着。认证服务越来越慢(因为它也在吃 3000 个 RPC),请求堆得更多,内存曲线继续爬。
这类事故在分布式系统里有一个专门的名字:unbounded queue collapse,无界队列坍塌。它的根源不在于慢的服务本身——任何系统都有 throughput 上限,慢本身不是错。问题在于调用方没有感知到对端已经满了。所有请求被无差别地接下来、排进一个(物理上或逻辑上的)无限队列,直到系统耗尽内存或者延迟彻底发散。
这就是 Tower 给 Service trait 配上 poll_ready 的根本理由——它让 Service 的容量状态被调用方可见,让背压(backpressure)从一个"好心的运维约定"变成类型系统里显式存在的信号。
这一章我们把 poll_ready 放到显微镜下看:它在问什么、它怎么工作、它为什么被设计成"先问再做"的两步协议、它和 tokio 生态其他"能容量感知"的原语(Semaphore、mpsc、permit)是什么关系。
4.2 "接得住"是一件不平凡的事
回到最基本的抽象问题。一个 Service 定义上是"async fn(Request) -> Response"——这句话里有一个隐藏的假设:Service 永远能接受一个新的 Request。
但真实世界里这个假设不成立。一个服务可能:
- 上游缓慢:它依赖一个数据库、一个下游 RPC、一个缓存;任意一个变慢,它也慢。
- 资源有限:只有 N 条 DB 连接、M 个 GPU slot、K 个 shared buffer。
- 节流规则:业务规则(配额、限速、防刷);合规规则(每分钟最多 60 个短信)。
- 内部排队:worker 队列已满、消息已积压。
"能不能接新请求"是一个真实、连续、动态变化的状态。它不是布尔值——更像是一个需要异步询问、可能当前不知道答案的信号量。
Tower 的回答是:让 Service 自己报告容量。
rust
fn poll_ready(&mut self, cx: &mut Context<'_>)
-> Poll<Result<(), Self::Error>>;翻译成人话:
- 返回
Poll::Ready(Ok(())):我现在有空位,你可以call。 - 返回
Poll::Pending:我暂时没空位,请挂起你的 task,等我有空了我会通过 waker 通知你。 - 返回
Poll::Ready(Err(_)):我永久坏了,不要再发 call 过来,把我扔了。
这是一个异步、事件驱动、零轮询的容量信号。调用方 await 它的时候,tokio runtime 把 task 挂起到 waker 队列里;服务恢复时 waker 被触发,task 被唤醒。整个过程没有 busy-loop、没有 sleep-retry、没有虚假的"先发再看"——完全由 Rust 异步运行时的唤醒机制驱动。
4.3 ConcurrencyLimit:看清两步协议
把概念落到代码上。我们读一段 100 行的真实工业级实现:tower::limit::concurrency::ConcurrencyLimit(tower/src/limit/concurrency/service.rs)。它的作用是把任何 Service 限制在最多 N 个 in-flight 请求。
4.3.1 状态机
rust
pub struct ConcurrencyLimit<T> {
inner: T,
semaphore: PollSemaphore,
permit: Option<OwnedSemaphorePermit>,
}三个字段:
inner:被包裹的 Service。semaphore:tokioSemaphore的 poll-friendly 包装,初始化时容量为max。permit:当前是否持有许可。Some意味着"我已经预扣了一个槽位,可以call";None意味着"还没扣到许可"。
permit 是这段代码的灵魂——它让"poll_ready 阶段预扣资源"这件事变成了类型系统里显式存在的状态。
4.3.2 poll_ready:先拿许可,再问内层
rust
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// 1. 还没拿许可?试着从 semaphore 里拿一个
if self.permit.is_none() {
self.permit = ready!(self.semaphore.poll_acquire(cx));
debug_assert!(
self.permit.is_some(),
"ConcurrencyLimit semaphore is never closed, so `poll_acquire` \
should never fail",
);
}
// 2. 许可到手,继续问内层 Service 是否就绪
self.inner.poll_ready(cx)
}八行代码,三个关键决定:
- "预扣"发生在 poll_ready。一旦 semaphore 返回
Ready,permit字段从None变成Some(许可)。这个许可被放进 struct 持有——semaphore 的内部计数器实际上已经扣减了。 ready!宏处理 Pending 传播。semaphore.poll_acquire(cx)返回Poll<Option<Permit>>——ready!宏把Pending直接 return 出去;Ready(permit)则把 permit 取出来赋值。这是 Rust 异步编程里的标准控制流宏(std::task::ready!)。- 两层 poll_ready 串行。先拿 semaphore 许可,再问 inner 是否就绪。任何一步 Pending,整个
poll_ready就 Pending。只有两步都 Ready,才向调用方报告 Ready。
关键问题:如果 inner 还没就绪,许可已经预扣了怎么办?答案:那就让许可持有在 self.permit 里,不要释放。下一次 poll_ready 调用时,分支 if self.permit.is_none() 为假,直接跳过 semaphore 步骤,继续 poll inner。这避免了"每次 poll 都反复 acquire/release 许可"带来的抖动。
4.3.3 call:消费许可
rust
fn call(&mut self, request: Request) -> Self::Future {
let permit = self
.permit
.take()
.expect("max requests in-flight; poll_ready must be called first");
let future = self.inner.call(request);
ResponseFuture::new(future, permit)
}self.permit.take() 把 Option 清空并返回里面的值——这意味着许可的所有权从 struct 移出来,进入即将构造的 ResponseFuture。
.expect(...) 这一行的字面意思是"没拿许可就 call 的话我 panic 给你看"——这是 Tower 协议的强制条款(上一章讨论过)。调用方必须先 poll_ready 返回 Ready 才能 call,否则这里会 panic,程序崩溃。看起来严厉,但它对 protocol correctness 是必要的。
ResponseFuture::new(future, permit) 把许可和业务 future 绑在一起。许可的生命周期现在跟着 future 走:
rust
// tower/src/limit/concurrency/future.rs 精简
pin_project! {
pub struct ResponseFuture<T> {
#[pin] inner: T,
#[allow(unused)]
permit: OwnedSemaphorePermit,
}
}
impl<F, T, E> Future for ResponseFuture<F>
where F: Future<Output = Result<T, E>>,
{
type Output = Result<T, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}ResponseFuture 只是对内层 future 的透传。许可被持有在 struct 字段里,字段命名是 permit,标注了 #[allow(unused)]——因为它只靠 Drop 起作用:当 future 完成(或被取消),ResponseFuture 被 drop,permit 跟着被 drop,tokio OwnedSemaphorePermit 的 Drop 实现会自动把计数器加回去。
许可的 acquire、consume、release 是通过 Rust 类型系统和 RAII 机制自动完成的,不需要 try/finally、不需要手动 permit.release()、不需要担心 panic unwind 的资源泄漏。这是 Rust 相对其他语言的典型优势。
4.3.4 Clone 的陷阱
看这段 Clone impl:
rust
impl<T: Clone> Clone for ConcurrencyLimit<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
semaphore: self.semaphore.clone(),
permit: None, // 关键:新 clone 不继承许可
}
}
}注意 permit: None——新 clone 出来的服务不共享旧实例的许可。它们共享同一个 semaphore(因为 PollSemaphore 内部是 Arc<Semaphore>),但每个实例独立 acquire 自己的许可。
这件事如果写错了是灾难性的。想象一下:如果 Clone 直接 derive,所有 clone 共享一个 OwnedSemaphorePermit——这在类型系统上不允许(permit 不是 Clone);但如果你手写成 permit: self.permit.clone(),就会在运行时造成"多个实例都认为自己独占一个许可",semaphore 的计数控制瞬间瓦解。
源码里的注释把这个权衡写得很清楚:
Since we hold an
OwnedSemaphorePermit, we can't deriveClone. Instead, when cloning the service, create a new service with the same semaphore, but with the permit in the un-acquired state.
这引出了一条通用规则——凡是在 poll_ready 里预扣了资源的 Service,Clone 时都必须重置资源状态。你在 Tower 里看到的 Buffer、Retry、Balance 的 Clone 都遵守这条规则。
4.4 poll_ready 在 async 世界的惯用法
手写 cx.waker().wake_by_ref() 是运行时代码,业务很少直接这么写。Tower 提供了 ServiceExt::ready,让你能用 .await 直接等待 Service 就绪:
rust
// tower/src/util/mod.rs:77-88 概念摘录
pub trait ServiceExt<Request>: tower_service::Service<Request> {
fn ready(&mut self) -> Ready<'_, Self, Request>
where Self: Sized {
Ready::new(self)
}
fn ready_oneshot(self) -> ReadyOneshot<Self, Request>
where Self: Sized {
ReadyOneshot::new(self)
}
}Ready<'a, T, R> 是一个 future,它内部 poll 的是 self.poll_ready(cx):
rust
// tower/src/util/ready.rs
pub struct Ready<'a, T, Request>(ReadyOneshot<&'a mut T, Request>);
impl<'a, T, Request> Future for Ready<'a, T, Request>
where T: Service<Request>,
{
type Output = Result<&'a mut T, T::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}
// ReadyOneshot 的核心:
impl<T, Request> Future for ReadyOneshot<T, Request>
where T: Service<Request>,
{
type Output = Result<T, T::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(self.inner.as_mut().expect("poll after Poll::Ready").poll_ready(cx))?;
Poll::Ready(Ok(self.inner.take().expect("poll after Poll::Ready")))
}
}看清楚 Ready::poll 的语义:反复 poll 内部 Service 的 poll_ready,直到返回 Ready;然后把 Service 的可变引用还给调用者。
典型用法:
rust
let response = svc.ready().await?.call(req).await?;这一行读作:
svc.ready()返回一个Ready<'_, _, _>future;.await驱动这个 future,内部调用poll_ready——Pending时当前 task 被挂起、waker 被注册;Ready(Ok)时返回&mut svc。?处理可能的 error(service 永久坏掉)。.call(req)在确认就绪之后立即调用。.await等待响应。
这是 Tower 协议在 async 代码里的标准用法,Tonic 的 generated client code、reqwest 的 dispatcher、tower-http 的多个中间件内部都是这一招。如果你在业务里想"手动调用一个 Service"——无论是 HTTP client、gRPC client 还是自写的 Redis client——请记住这个组合:.ready().await?.call(req).await?,不要直接 svc.call(req).await。
ready_oneshot 和 ready 的区别:ready 借用 &mut self、返回后可以连续 call 多次;ready_oneshot 消费 self、只能 call 一次,但避免了生命周期缠绕,适合跨 await 点传递。Tower 里的 Oneshot future 就把"ready + call"打包成一个 future:
rust
let response = svc.oneshot(req).await?; // ready + call 合一一行搞定,但失去了 Service 的所有权,不能再次使用。
4.5 "backpressure" 在三个层次穿透
poll_ready 真正的威力在于传递性。请你仔细看一个栈:
ConcurrencyLimit<Timeout<HttpClient<ConnPool>>>从外向内:ConcurrencyLimit(限并发)→ Timeout(超时) → HttpClient(负责 HTTP) → ConnPool(连接池)。
每一层的 poll_ready 实现:
| 层 | poll_ready 的含义 |
|---|---|
ConcurrencyLimit | 有空闲许可 且 内层就绪 |
Timeout | 内层就绪(Timeout 本身无容量限制) |
HttpClient | 有可用连接(从 pool 拿) |
ConnPool | 总连接数 < 上限 |
最外层的 poll_ready 返回 Ready 的前提是:所有层都 Ready——许可有、内层 HTTP 有连接、连接池没满。任何一层 Pending,整个栈 Pending,调用方被正确挂起。
这就是我说的"背压是可以穿透抽象的"。只要每一层都遵守 Tower 协议,背压信号会沿着 trait 边界向外传播,最终让调用方感知到任何层次上的容量限制。
对比 Go 的 http.Handler 模型:
go
// Go 没有 poll_ready
func(w http.ResponseWriter, r *http.Request) {
// 没有显式机会说"我还没准备好"
}Go 通过 goroutine + channel 解决并发,但 Go runtime 不会把"慢服务"的反压传回调用方——goroutine 会一直 spawn,直到内存爆掉(章节开头那种事故的 Go 版本更常见)。Node.js 有 stream.backpressure(),但只对 byte-level 的 stream 有效;对业务级的"请求 - 响应"语义没法直接表达。
Rust 的 poll_ready 把容量信号抽象成一个 trait 方法,让它可以被中间件组合、传播、响应。这种"把运行时行为具象成类型系统里的 API"的做法,是 Rust 工程哲学最核心的一招。
4.6 常见错误:三种反模式
掌握协议之后,我们列三种常见错误,帮你在自己的代码里识别。
4.6.1 错误 1:跳过 poll_ready
rust
// WRONG
svc.call(req).await?直接 call,没有 poll_ready。这在大多数 Service 上可能"凑巧能跑"——但任何使用了 ConcurrencyLimit、Buffer、RateLimit、Balance 之类的中间件就会 panic,因为那些中间件的 call 实现里有 expect("poll_ready must be called first")。
正确写法是前面提到的 svc.ready().await?.call(req).await? 或者 svc.oneshot(req).await?。
4.6.2 错误 2:poll_ready 后 clone 再 call
这是 tower-service 文档里明确警告的经典错误。假代码:
rust
// WRONG
impl<S, R> Service<R> for Wrapper<S>
where S: Service<R> + Clone + 'static, R: 'static,
{
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx) // 在原始 self.inner 上 poll_ready
}
fn call(&mut self, req: R) -> Self::Future {
let mut inner = self.inner.clone(); // 克隆一个新的!
Box::pin(async move {
inner.call(req).await // 在 clone 上 call——clone 并没 ready!
})
}
}问题:poll_ready 是在原始 self.inner 上问的——它 acquire 了那个原始实例的 permit。但 call 的时候克隆了一个新实例,新实例的 permit 状态是 None,直接 call 会 panic。
正确的写法(文档里推荐):
rust
// CORRECT
fn call(&mut self, req: R) -> Self::Future {
let clone = self.inner.clone();
let mut inner = std::mem::replace(&mut self.inner, clone);
// 原 inner 是就绪的,已 own;self.inner 被替换为新 clone,下次 poll_ready 会重新准备
Box::pin(async move { inner.call(req).await })
}std::mem::replace 把"就绪的"inner 搬出 self,换上一个新 clone。新 clone 不 ready——下次调用 poll_ready 时它会重新准备。这是一个细微但重要的模式,几乎所有需要"在 call 里 spawn async task"的 Service 实现都应该遵守。
4.6.3 错误 3:长期持有许可
rust
// WRONG
async fn handler(mut svc: ConcurrencyLimit<Backend>, reqs: Vec<Req>) {
for req in reqs {
svc.ready().await.unwrap(); // 拿许可
svc.call(req).await.unwrap(); // 用许可
// 这次调用的 permit 被消费掉了——OK
// 但下次循环再 .ready().await 会 acquire 新的许可
}
}这段看起来正确。但如果你写成:
rust
// WRONG
let ready_svc = svc.ready().await.unwrap();
// 这里持有了 &mut svc 的许可很久……
do_something_slow().await;
// 此时许可还没被消费,但 semaphore 的 slot 被占着
ready_svc.call(req).await.unwrap();poll_ready 返回之后,许可已经被 struct 占住了。如果你在 call 之前做了大量的异步工作,那个许可会一直占用 semaphore 的 slot,其他 clone 的实例可能因此被饿死。
更隐蔽的版本:.ready() 返回的 Ready<'_, T, _> future 持有 &mut T 借用。如果你把它 spawn 到一个 task 里延迟执行,意味着许可被延迟消费——系统的有效并发度被降低。
经验规则:拿到 ready 之后立即 call。不要在 ready 和 call 之间插入任何异步等待。如果确实需要跨越 await 点,考虑用 oneshot(req) 或者把 ready 和 call 放进同一个 async move { ... }。
4.7 和 tokio Semaphore / mpsc 的桥梁
你可能会想:既然 tokio 已经有 Semaphore::acquire().await、mpsc::Sender::send().await,它们本身就是异步"等容量"——为什么 Tower 还要再发明 poll_ready?
两个原因。
第一,Tower 的抽象层次更高。Semaphore 只管"一个许可池"、mpsc 只管"一个 channel"——它们是 raw 原语。poll_ready 是所有需要容量管理的 Service 共享的一个 trait 方法——既能表达 Semaphore,也能表达 mpsc buffer,也能表达 connection pool,甚至能表达一个"看起来不忙但定时去拒绝请求"的 circuit breaker。
第二,poll_ready 能嵌套 + 透传。Semaphore::acquire 拿到的许可是一个具体对象;mpsc 的 send 是一次明确的送入。它们不会把"容量信号"自动向外传。而 poll_ready 通过 Service trait 的组合(Stack),把每一层的容量信号汇聚成栈顶那一个 poll_ready。只有栈顶就绪,整条链才就绪——这是单一原语做不到的。
实操上,二者配合得很好:
- Tower 中间件在实现 poll_ready 时,内部往往用 tokio Semaphore/mpsc 做资源管理。
ConcurrencyLimit用 Semaphore,Buffer用 mpsc——我们上面刚读过。 - 业务代码想做"底层资源 + 容量管理"的时候,优先考虑能不能装成 Service + Layer。这样你获得了整个 Tower 生态。
从卷四《Tokio 源码深度解析》第 12 章(异步 Mutex / RwLock / Semaphore)里我们已经看过 tokio Semaphore 的 poll_acquire 是怎么和 Waker 合作的——Tower 的 PollSemaphore 就是把那个 API 包成 Tower 惯用的 poll_ready 形状。
4.8 poll_ready 在 hyper 1.0 里"消失"的故事
剧透一次(第 13 章会完整展开):
hyper 1.x 自己定义的 Service trait 没有 poll_ready——只有 call(&self, req) -> Self::Future。这是 hyper 1.0 做的一个有争议的决定。
原因是 HTTP/2 多路复用场景下的语义困境:一个 Connection<T, S> 可能同时接收几十个 stream 的请求,如果每个请求都要先走一次 poll_ready,并且 poll_ready 的语义是"预扣资源"——那要怎么表示"十个 stream 同时在请求"?按顺序 poll_ready?那就退化成串行,多路复用失去意义。并行 poll_ready?那每个许可到底归谁?
hyper 团队的结论是:HTTP 层不需要 poll_ready。HTTP/1 是天然串行的(单连接上一个时刻只有一个请求);HTTP/2 的流控由 h2 层在连接级别处理,不是 per-stream 级别。业务层的背压(比如限并发)应该由用户在 Service 内部用 Semaphore 之类的工具自己处理,或者在 hyper 外层套一个 Tower 中间件栈。
这个选择的后果:
- hyper 1.x 的 Service trait 简洁得多——只有 call,没有 poll_ready,天然支持并发调用(因为签名用
&self)。 - Tower 的
ConcurrencyLimit等中间件还是能用——但必须套在 hyper 之外、在你的路由层;hyper_util::service::TowerToHyperService桥接适配器会在进入 hyper 时吞掉poll_ready。 - 复杂场景(比如 Axum)仍然用
poll_ready——Axum 的Router内部是 Tower Service,整个背压机制完好。只是"Axum → hyper"那一层桥接是背压信号的断点。
第 13 章会一字不落地读这段代码,你会看到"桥接适配器如何处理 tower 的 poll_ready"——简短得让人失望:它直接把 poll_ready 的结果丢掉。原因是 hyper 的 Service 没地方接这个信号。这是 hyper 1.0 设计的一个已知代价。
换句话说:你的 Tower 栈的背压,到 hyper 这一层就止步了。如果你依赖全链路背压,你需要在 Service 内部做自限流(Semaphore),或者把 ConcurrencyLimit 套在 axum 的 Router 外面(而不是 hyper 之内)。这不是 bug,是一个架构权衡。
4.9 一段对照:Python / Node.js / Go 里的"背压"都在哪
为了让你对 Rust 的选择有更深的理解,对照三大语言看一遍。
Python:asyncio 的 asyncio.Semaphore 提供容量管理。但 Python 没有 trait 机制——你没法让"各种服务"共享一个统一的背压 API。FastAPI 用 asyncio.Queue 自己搞一套;aiohttp 的 client 用 ClientSession._connector._limit 私有字段。每一个库的做法都不一样。代价:生态碎片化,中间件无法真正可复用。
Node.js:Stream 层有 highWaterMark 和 pause()/resume(),但只管字节流。Express/Fastify 处理请求时没有统一的"服务忙"信号——它们假设 handler 总能被调用。需要限流就用 express-rate-limit 之类的库,这些库的行为是直接拒绝(返回 429),不是挂起。
Go:http.Handler.ServeHTTP(w, r) 是同步签名,中间件靠 closure 套 handler。没有 poll_ready 概念。限流方法:
- 用 middleware 在入口就拒绝(类 429 逻辑);
- 用
semaphore.Weighted在 handler 里 acquire、处理完 release——阻塞住 goroutine; - 用 channel worker pool 自己实现背压。
Go 有 channel 这把大锤,能解决大多数问题,但"一个请求在进入 handler 之前等到服务有空为止"这个语义要自己实现——没有统一的协议可循。
Rust + Tower 是这几个语言里少数把"backpressure-aware service"做成协议级抽象的。这件事的工程红利:任何实现了 Service 的组件——HTTP handler、DB driver、gRPC client、WebSocket server——都能用同一套中间件管容量。这不是"Rust 比别的语言快",而是"Rust 的类型系统支持这件事、其他语言的类型系统支持不了"。
4.10 小结:落到你键盘上
本章的结论:
poll_ready不是样板,是协议核心——它把"服务能不能接新请求"变成异步事件驱动的信号,让调用方不必轮询、不必先发再后悔。- "两步协议"(poll_ready → call)是可传导的背压——每一层 Service 可以在
poll_ready里预扣资源、在call里消费、在Drop里释放。这个协议在 Stack 嵌套下自然传播。 - Clone 需要特别小心——凡是
poll_ready里扣了资源的 Service,Clone 实现必须重置扣资源状态。 - hyper 1.0 故意不带
poll_ready——这是对 HTTP/2 多路复用的一个架构妥协,第 13 章会展开。 - 实操里几乎总是
svc.ready().await?.call(req).await?——这是 Tower Service 被正确调用的惯用形式。
落到你键盘上的三件事:
- 写一段小代码测试
ConcurrencyLimit。创建一个 max=2 的ConcurrencyLimit,spawn 5 个并发任务都调用.ready().await.unwrap().call(...)。用tracing或println打印每个任务 acquire 和 release 许可的时间点。你会清晰看到"背压等待 - 触发 - 释放"的节奏。 - 把错误写法实验一次。故意跳过
poll_ready直接call——观察在ConcurrencyLimit下的 panic 消息。这一次"踩坑"会让你永远记住协议。 - 浏览 tower-http crate 里几个常见中间件的
poll_ready实现——Trace,Compression,Cors。你会发现大部分 HTTP 中间件的poll_ready就是透传 inner、不做任何自己的容量判断。这非常合理:HTTP 语义级的中间件不负责容量,容量是 Tower 核心中间件的职责。
下一章我们开始读 Tower 的中间件源码——从最基础的三个 Timeout、Retry、RateLimit 开始。