Appearance
第7章 Balance / Discover / ready_cache:负载均衡抽象
7.1 从"一个服务"到"一群服务"
前六章我们讨论的所有中间件——Timeout、Retry、Buffer、LoadShed、ConcurrencyLimit——都围绕一个隐含假设:底层只有一个 Service。到这里我们要打破这个假设。
真实的分布式系统几乎总有多个同等能力的后端。你调用 user-service 其实是调用一个由十几个 Pod 组成的集群中的某一个;你的 gRPC 客户端背后是一个经过 DNS 轮询或 service mesh 发现的服务端点池。
这就引出三个相互咬合的问题:
- 怎么表达"后端集合"?Pod 可能随时被调度、新 Pod 可能随时被加进来、旧 Pod 可能因为 health check 失败被剔除——这是动态的。
- 怎么选一个后端?随机?轮询?按负载?按延迟?
- 怎么保证选到的那个后端"现在"能接请求?已经满了的后端要跳过,已经挂了的要不再选。
Tower 给这三件事提供了三个抽象:
| 抽象 | 作用 |
|---|---|
Discover trait | 动态后端集合的"变更流" |
ReadyCache<K, S, Req> | 维护"就绪 + 待就绪"两个池子 |
Balance<D, Req> | 在 ready 池子里选一个(默认 P2C 算法) |
这一章我们把三者串起来读。真实源码全在 tower/src/discover/、tower/src/ready_cache/、tower/src/balance/p2c/,版本 0.5.3。
7.2 Discover:服务集合的"变更流"
rust
// tower/src/discover/mod.rs:55-74
pub trait Discover: Sealed<Change<(), ()>> {
type Key: Eq;
type Service;
type Error;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Change<Self::Key, Self::Service>, Self::Error>>>;
}语义:poll_discover 每次 poll 返回一个"集合变更事件"。它不是 "Vec<Service>",也不是 "HashMap<Key, Service>"——而是流式的增量:
rust
// tower/src/discover/mod.rs:100-107
pub enum Change<K, V> {
Insert(K, V), // 新来了一个 K 对应的服务
Remove(K), // K 对应的服务消失了
}为什么这样设计?因为服务发现在现代系统里从来不是"一次性拿到全量列表"的。Kubernetes 的 Endpoints API 用 watch 模式返回一串增量事件;Consul 的 service discovery 用 long polling;DNS SRV 记录有 TTL 要不断重查。把这些异构来源抽象成"一个流",下游(balancer)就不用关心"这些变化从哪来"——它只看到 Insert / Remove 的连续事件。
Discover trait 的另一个妙处是:它通过 blanket impl 自动覆盖了所有 TryStream<Ok = Change<K, S>>:
rust
// tower/src/discover/mod.rs:83-98
impl<K, S, E, D: ?Sized> Discover for D
where D: TryStream<Ok = Change<K, S>, Error = E>, K: Eq,
{
type Key = K;
type Service = S;
type Error = E;
fn poll_discover(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Result<D::Ok, D::Error>>>
{
TryStream::try_poll_next(self, cx)
}
}这意味着你只要有一个能产生 Stream<Item = Result<Change, Error>> 的东西——无论是 async_stream! 包的 coroutine、是 mpsc receiver、是从 Kubernetes API watch 出来的事件流——都自动满足 Discover trait。这种"把一个新抽象嫁接到已经成熟的 Stream 生态上"的设计非常体贴:你不需要为 Tower 专门实现什么,现成的东西就能用。
7.2.1 最简单的 Discover:ServiceList
如果你不需要真正的动态发现——只是想把一个固定列表喂给 balancer——tower 提供了 ServiceList:
rust
// tower/src/discover/list.rs 精简版
pub struct ServiceList<T> { inner: std::vec::IntoIter<T>, i: usize }
impl<T> ServiceList<T> {
pub fn new<I: IntoIterator<Item = T>>(services: I) -> Self {
Self { inner: services.into_iter().collect::<Vec<_>>().into_iter(), i: 0 }
}
}它的 poll_discover 就是不停地 Insert(i, next_svc),遍历完之后 stream 结束——对 balancer 来说等价于"这 N 个端点进来,之后永不变化"。适合配置写死的多实例、或者测试场景。
7.3 ReadyCache:两个池子的协同
Balance 真正干活的地方在 ReadyCache——整整 500 行代码。但它的核心思想可以用一张图说明:
┌──────────────────────────────────────────────┐
│ ReadyCache<K, S, Req> │
│ │
│ ┌──────────────┐ ┌────────────────┐ │
│ │ pending │ ---> │ ready │ │
│ │ │ │ │ │
│ │ K→Pending<S> │ │ K→(S, cancel) │ │
│ └──────────────┘ └────────────────┘ │
│ ↑ ↓ │
│ push(key, svc) call_ready_index() │
│ │
└──────────────────────────────────────────────┘两个池子:
pending:FuturesUnordered<Pending<K, S, Req>>——每一个 pending 是一个 future,它内部反复 poll 自己那个 service 的poll_ready,直到 Ready 了就"晋升"到 ready 池子。ready:IndexMap<K, (S, CancelPair)>——已经就绪的 service,按 key 索引。pending_cancel_txs:IndexMap<K, CancelTx>——每个 pending service 有一个 cancel 通道,可以在它还在 pending 期间把它剔除。
这里有两个数据结构值得拎出来讲。
7.3.1 FuturesUnordered:不分先后的并发 poll
FuturesUnordered 是 futures-util 的一个核心工具(回想卷四《Tokio》第 14 章讨论 select! 时也出现过类似的思想)。它把多个 future 装到同一个集合里,每次 poll 会同时推进所有内部 future——哪个先 Ready 就先产生 output。
对 ReadyCache 来说,这完美贴合"推动多个 pending service 的 poll_ready"这个需求——一次 pending.poll_next(cx) 就能让所有还在 pending 的 service 各被 poll 一次,任何一个 Ready 了就被产出。这是很典型的"数据结构即设计"——选对集合类型,算法几乎自动成立。
7.3.2 indexmap::IndexMap:保持插入顺序的 HashMap
ready 字段是 IndexMap<K, (S, CancelPair)> 而不是 HashMap——关键差别是 IndexMap 既支持按 key 查,也支持按 index 查,而且插入顺序稳定。
为什么需要 by-index?因为 P2C 算法要"随机挑两个端点"——最高效的做法是 rng.gen_range(0..n) 两次生成两个 index,然后按 index 直接拿。HashMap 没法按 index 取,需要先 .values().nth(i) 线性扫描。IndexMap 的 get_index(i) 是 O(1)。
这是一个小到不能再小的数据结构选型决定,但它让整个 balancer 的 hot-path 复杂度从 O(n) 降到 O(1)。一本好书会让你在这种细节处停一停——工业级代码的优秀往往不在算法,而在数据结构。
7.4 P2C:Power of Two Choices 为什么能行
Balance<D, Req> 默认用的算法叫 P2C(Power of Two Choices):
- 从 ready 池随机选两个端点 A、B;
- 比较它们的当前 load;
- 把请求发给 load 较小的那个。
rust
// tower/src/balance/p2c/service.rs:158-183
fn p2c_ready_index(&mut self) -> Option<usize> {
match self.services.ready_len() {
0 => None,
1 => Some(0),
len => {
let [aidx, bidx] = sample_floyd2(&mut self.rng, len as u64);
let aload = self.ready_index_load(aidx as usize);
let bload = self.ready_index_load(bidx as usize);
let chosen = if aload <= bload { aidx } else { bidx };
Some(chosen as usize)
}
}
}简单到让人怀疑它真的有用。事实上它好得惊人。
7.4.1 为什么两个就够
P2C 这个算法来自 Mitzenmacher 等人 1996 年的一篇论文《The power of two choices in randomized load balancing》。论文证明了一个反直觉的结论:从 N 个桶里随机选两个放最小的那个,得到的最大负载期望是 O(log log N);而纯随机选一个的最大负载期望是 O(log N / log log N)。
换成人话:有 100 个端点、100 个请求要分配——
- 纯随机:最繁忙的那个端点大概会接
log(100)/log(log(100)) ≈ 3个请求。 - P2C:最繁忙的那个端点大概只接
log(log(100)) ≈ 2个请求——几乎是最优的均匀分布。 - 理论最优:每个端点正好 1 个。
这个"用 2 换一次 log-reduction"的效果在负载很高、请求数远大于端点数时更显著。Linkerd、Finagle、Envoy 的默认负载均衡策略都是 P2C——不是因为它简单,而是因为它在现实世界里是最好的选择。
7.4.2 为什么不是"选最小的"
朴素直觉会说"直接扫描所有 ready 端点选 load 最小的那个不就行了吗?" 两个原因:
- 成本:n 个端点就是 n 次 load() 调用、n 次比较。1000 个 endpoint 的集群上,每请求一次 O(1000) 的开销在 hot path 上可观。
- 羊群效应(herd effect):如果总选最小的,所有并发调用者会同时把请求打到当前最小的那个端点——等到它 load() 更新后为时已晚。P2C 的随机性天然消解了这种 herd。
2 这个数字是"理论上最优"的妥协——把 2 增加到 3 或 4 收益微乎其微。P2C 就叫 "Power of Two"——不是凑数。
7.4.3 Load trait:让端点自己报告负载
P2C 需要比较 load——但 load 是什么?Tower 把这件事抽象成 Load trait:
rust
// tower/src/load/mod.rs
pub trait Load {
type Metric: PartialOrd;
fn load(&self) -> Self::Metric;
}metric 只需要可比较,不需要是数字——可以是 Duration(用 EWMA 延迟)、usize(in-flight 计数)、自定义 struct(综合指标)。Tower 内置了两种最常见的:
PendingRequests:当前 in-flight 请求数。最便宜的 metric,几乎免费。PeakEwma:延迟的 EWMA 加权移动平均。对抖动敏感,但能检测"快慢端点"的差异。
在真实系统里,PeakEwma 用得多——因为 "in-flight count" 对延迟不敏感,可能所有端点 in-flight 都是 10,但其中一个实际 p99 是 500ms。Linkerd 公开的数据里 PeakEwma 是默认选项。
7.5 Balance::poll_ready:把三者串起来
rust
// tower/src/balance/p2c/service.rs:208-250
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let _ = self.update_pending_from_discover(cx)?;
self.promote_pending_to_ready(cx);
loop {
if let Some(index) = self.ready_index.take() {
match self.services.check_ready_index(cx, index) {
Ok(true) => {
self.ready_index = Some(index);
return Poll::Ready(Ok(()));
}
Ok(false) => { /* no longer ready, try another */ }
Err(Failed(_, error)) => { /* endpoint failed, try another */ }
}
}
self.ready_index = self.p2c_ready_index();
if self.ready_index.is_none() {
return Poll::Pending;
}
}
}四件事:
- 更新 discover:把最近的 Insert/Remove 吸收到 pending/ready 池子里。
- 把 pending 提升到 ready:对所有 pending service 做一次 poll(通过 FuturesUnordered),就绪的进入 ready 池。
- check/select:如果上一次已经选了一个 ready_index,确认它还就绪;否则用 P2C 选新的。
- 结论:有可用的——Ready;一个都没——Pending(waker 已经被 FuturesUnordered 和 discover 注册好了,服务就绪时会被唤醒)。
注意第 3 步是一个 loop——check 失败时不直接 Pending,而是继续 P2C 找下一个。这保证即使某些 ready endpoints 刚刚变成不可用,只要 ready 池里还有其他可用的,poll_ready 就能报告 Ready。这是 fault tolerance 的细节——一次 poll_ready 调用里可以自动跳过多个坏 endpoint。
7.5.1 call 的简洁
rust
// tower/src/balance/p2c/service.rs:252-257
fn call(&mut self, request: Req) -> Self::Future {
let index = self.ready_index.take().expect("called before ready");
self.services
.call_ready_index(index, request)
.map_err(Into::into)
}两行:拿出之前 poll_ready 选好的 index,对那个 service 发起 call。
最神奇的地方在self.services.call_ready_index(index, request)——它内部把那个 service 从 ready 池子 take 出来、调用 .call(request)、把 service 送回 pending 池子等下次 poll_ready。这是 Tower ReadyCache 的核心不变式:每个 service 每次 call 后都要重新 ready——因为它的 poll_ready 可能会因为这次 call 消费了某个 permit 而变回 pending。
这个"call → 重进 pending → FuturesUnordered 自动 poll_ready → 重进 ready"的循环,由 ReadyCache 透明地驱动。你写 Balance 代码时看到的是"选一个、用一个",背后是整个一套高效的后端状态管理。
7.6 取消:CancelPair 的设计
ReadyCache 里有一对非常小但设计精巧的类型:CancelTx 和 CancelRx(tower/src/ready_cache/cache.rs:79-91)。
rust
#[derive(Debug)]
struct Cancel {
waker: AtomicWaker,
canceled: AtomicBool,
}
#[derive(Debug)]
struct CancelRx(Arc<Cancel>);
struct CancelTx(Arc<Cancel>);
type CancelPair = (CancelTx, CancelRx);一对 Tx/Rx 共享一个 Arc<Cancel>。AtomicWaker 存"谁在等 cancel",AtomicBool 存"已经 cancel 了"。
这是比 oneshot 更轻量的替代品——oneshot::channel 内部有一个 Arc<State> 加一个 Mutex,开销大。ReadyCache 的 CancelPair 只用两个原子变量,在高频 insert/evict 场景下几乎零开销。
它的用法:每个 pending service 带一个 CancelRx,它的 Pending future 在每次 poll 里检查 canceled——一旦上层 evict(&key) 调用了对应的 CancelTx,canceled 被置 true,waker 被触发,pending future 下次被 poll 时发现已 cancel,产出一个 PendingError::Canceled(key),FuturesUnordered 把它剔除出集合。
.ready 里的 service 也绑着一个 CancelPair,evict 时同样是立即标记——但 ready 的 service 不是 future,直接从 IndexMap 里移除即可。CancelPair 的设计统一了 pending 和 ready 两套的 evict 路径。
这段代码的美感不在算法,而在用 Rust 的所有权和原子原语把"动态集合 + 取消"这件看似复杂的事情压缩到极简。如果你写过 Go 里类似的 service discovery + pool,会知道那一般需要几百行状态机代码。Tower 的这一套总共不到 50 行。
7.7 组合出一个完整的客户端
把这一章讲的抽象和前面几章串起来,典型的一个"gRPC 客户端组合"长这样:
rust
use std::time::Duration;
use tower::{ServiceBuilder, balance::p2c::Balance, discover::ServiceList, load::PendingRequests};
// 1. 有一组 endpoints
let endpoints = vec![grpc_client_a, grpc_client_b, grpc_client_c];
// 2. 把每个 endpoint 包成"带 load metric"的 service
let endpoints = endpoints.into_iter()
.map(PendingRequests::new)
.collect::<Vec<_>>();
// 3. 把它们打包成一个静态 Discover
let discover = ServiceList::new(endpoints);
// 4. 用 Balance 做 P2C 负载均衡
let balanced = Balance::new(discover);
// 5. 在外面套上常规中间件
let svc = ServiceBuilder::new()
.concurrency_limit(500)
.timeout(Duration::from_secs(10))
.layer(RetryLayer::new(ExponentialPolicy::new(3, Duration::from_millis(200))))
.service(balanced);请求流:
caller → concurrency_limit → timeout → retry → balance → p2c_pick(A,B) → endpoint.call(req)任何一个端点临时变慢、不可用,P2C 会自然绕开它;新的 endpoint 通过 discovery 进来后,几毫秒内就能开始接请求;失败的 endpoint 触发 Retry 自动换一个。整条链条是可编译期单态化的类型链,运行时零 vtable、零堆分配(除了 Box<dyn Rng>——可以替换成具体 RNG 类型消除这个分配)。
这就是 Rust 后端生态最优雅的一段组合拳。
7.8 和其他系统对照
Envoy 的 P2C 实现在 C++ 里,大约 300 行代码,核心思想和 Tower 完全一致——因为论文就是那个论文。差别:Envoy 的 endpoint 负载 metric 是跨 upstream 集群归一化的,Tower 允许你自由定义 Load::Metric。
Finagle(Scala)的 P2C 是 Tower 的直接源头——@carllerche 就是从 Finagle 的 LoadBalancer 得到启发。Finagle 的 P2C 包含 aperture(只在"一部分"端点里做 P2C),Tower 暂时没实现。
Go grpc-go 默认用 round-robin(RR),而不是 P2C。为什么?Go 生态普遍把负载均衡视为"基础设施问题"——让 Envoy/Linkerd 做——所以 grpc-go 的内置实现选了更简单的 RR。Rust 生态相反:Tower 把 P2C 做成库级能力,让应用代码直接获得工业级负载均衡——不需要外挂 service mesh。这是两种生态哲学的差异。
7.9 关于 Unpin 的一个迂回
细心的读者可能已经注意到:Balance 的 Service impl 里有一条 D: Unpin 约束(源码第 194 行)。注释明确解释:
[
Balance] requires that the [Discover] you use is [Unpin] in order to implement [Service]. This is because it needs to be accessed from [Service::poll_ready], which takes&mut self.
原因:Service::poll_ready(&mut self) 拿到的是普通的 &mut self,不是 Pin<&mut Self>。但 Discover::poll_discover 需要 Pin<&mut D>。要把 &mut self.discover 转成 Pin<&mut D>,要么 D 是 Unpin(可以用 Pin::new(&mut self.discover) 零成本转换),要么 Balance 自己持有 Pin<Box<D>>(堆分配)。
Tower 选择前者——要求用户自己把 Discover 包成 Pin<Box<D>> 或者保证它 Unpin。这是一个 "把复杂性推到用户侧" 的决定,换来 Balance 内部实现的简洁。issue #319 有完整讨论。
这个现象在卷三《Rust 编译器与运行时揭秘》第 10 章(Pin / Waker / Future)里已经埋下伏笔——Pin 的传染性在 trait 设计里经常成为需要权衡的焦点。Tower 1.0 如果未来切到 async trait,这类约束可能会彻底简化。
7.10 落到你键盘上
读完这一章,你能做的事:
- 阅读
tower/src/load/下的 PendingRequests 和 PeakEwma 实现。两者都不长,但 PeakEwma 有一个时间衰减的精巧计算——读它你会学到如何用原子更新做"移动平均"。 - 实验一个动态 Discover。用 tokio
mpscchannel 做一个手写 Discover,用tokio::spawn模拟"每 5 秒随机增删一个端点",用tracing::debug!看 Balance 如何响应。 - 对照阅读 Linkerd2-proxy 的
linkerd-stack-discover模块(它在 GitHub 上开源)。Linkerd 的生产级负载均衡器建立在 Tower 的Balance之上,加了很多策略(aperture、退避、health check)——是学习"工业级应用如何扩展 Tower 抽象"的最好参考。
下一章我们读 Tower 的一组更 low-level 的中间件:Filter、MapRequest、Steer——它们不涉及容量管理,专注于"请求变换"。这些是路由层中间件的基础。