Skip to content

第20章 Connection pool 与 Connector 抽象

20.0 本章导读

凌晨两点被告警叫醒,你打开 Grafana 看见一张熟悉的图——P99 延迟从 30 ms 突然蹿到 1.8 s、QPS 腰斩、error rate 3%。抓包一看——客户端每一个请求都在走三次握手 + TLS 两次往返,也就是每次都在建新连接。再回头看发布 checklist,前一晚改了 reqwest 的 builder,把 .pool_idle_timeout(None) 删掉了,生产上默认值回到了 Some(Duration::from_secs(90))——上游是国外机房,RTT 120 ms 一次 TLS 握手光这一项就要烧 480 ms。于是刚 idle 出头的连接还没等到第二次复用就被踢出了池,复用率归零。这类故事每一年都会在不同团队不同 repo 里重演一次。

Connection Pool 不是一个"顺带做做"的小模块——它是 HTTP 客户端最核心的性能基础设施。hyper 把这块放在 hyper-util 这个外挂 crate 里(对应本书锁定版本 hyper 1.9、hyper-util 0.1.x),整个 legacy::pool 模块一共 1115 行代码,管着 Rust 生态 80% 以上 HTTP 客户端的"连接生命周期"。reqwest 用它、tonic 用它、aws-sdk-rust 用它、cloudflare 的 workers-rs 用它。你线上的每一个 HTTP client,背后都有这份代码。

读完本章你会理解:

  • 为什么 hyper 把 pool 放在 hyper-util 而不是 hyper 主 crate;
  • Pool<T, K> 那两个泛型参数到底是哪两个"可插拔维度"
  • Connection 的三种状态(idle、in-use、reserved)在源码里对应的具体代码路径
  • checkoutpooled 为什么一定要成对出现;
  • HTTP/2 连接为什么共享一条就够而 HTTP/1 必须 N 条——这个差别如何写进 Reservation 枚举;
  • Connector 抽象如何把 DNS / TCP / TLS 三层拆开,用户怎么换成自己的 tower::Service
  • Pool 满了、DNS 陈旧了、TIME_WAIT 堆积了——这些经典事故在代码的哪一行发生

本章涉及到的源码文件一共四个:

  • hyper-util/src/client/legacy/pool.rs(1115 行)—— Pool 主实现。
  • hyper-util/src/client/legacy/client.rs(1694 行)—— Client 把 pool 和 connector 拼起来。
  • hyper-util/src/client/legacy/connect/mod.rs(444 行)—— Connector trait + Connected metadata。
  • hyper-util/src/client/legacy/connect/http.rs(1449 行)—— 默认 HttpConnector,含 Happy Eyeballs。

源码版本:hyper 1.9.0 / hyper-util 0.1.x(与前面 17、18、19 章一致)。

20.1 为什么要 Connection Pool

20.1.1 一次"干净的 HTTPS 请求"代价清单

把手表按在浏览器上去访问 https://api.stripe.com,从发起到拿到第一个字节,中间发生了几件事?——我用一台国内机器、目标机房在弗吉尼亚的环境做过一次 tcpdump 切片计时,数据大致这样:

阶段动作耗时(RTT≈180 ms)
1DNS 解析(A/AAAA)~30 ms(命中本地缓存)/ ~180 ms(未命中)
2TCP 三次握手(SYN / SYN-ACK / ACK)~180 ms(1 RTT)
3TLS 1.3 ClientHello / ServerHello + Finished~180 ms(1 RTT)
4HTTP 请求发出、等待第一字节(TTFB)~180 ms(1 RTT)
合计connect() 到 first byte≈ 570 ms

整条链路在还没开始传业务数据之前就烧掉了将近 600 ms——其中 360 ms 是可以通过复用连接省掉的(TCP + TLS 握手)。如果你的接口 SLA 是 P99 < 500 ms,连一个单请求的预算都不够,更何谈做重试、做超时、做 backoff。

这就是 Connection Pool 存在的第一理由——把 TCP + TLS 握手的一次性成本摊薄到多次请求上。如果 pool 每次能复用,上表的第 2、3 项都消失,整条链路缩到大约 210 ms,P99 立刻跌回可用的范围。

20.1.2 不复用的第二个代价:TIME_WAIT 陷阱

很多新手只记住"不复用慢",但其实还有一个更隐蔽的代价——TCP TIME_WAIT。RFC 793 规定主动关闭方的 socket 进入 TIME_WAIT 状态会持续 2MSL(典型 Linux 内核 60 秒)。这期间这个 (local_ip, local_port, remote_ip, remote_port) 四元组不能被重用

换算一下——如果你的客户端每秒发 2000 次短连接,每次关闭后 60 秒不释放,相当于随时有 12 万个 TIME_WAIT 端口占着位置。默认 ip_local_port_range 在 Linux 上大概是 32768-60999,也就是 28231 个端口。理论上你每秒超过 470 次短连接,端口就会不够分connect() 返回 EADDRNOTAVAIL——错误信息里写的是"cannot assign requested address",但用户看到这行通常都会先怀疑 DNS 或者防火墙,排查半天才追到 TIME_WAIT。

Connection Pool 把"关闭连接"这件事从"每个请求做"变成"很少做",TIME_WAIT 的问题自然消失。这是一种副作用性质的治理——你不是为了治 TIME_WAIT 才做 pool,但做了 pool 就自然治了 TIME_WAIT。

20.1.3 第三个代价:拥塞窗口要从头涨

TCP 的拥塞控制算法(CUBIC、BBR、Reno)都有一个"慢启动"阶段——新连接的初始拥塞窗口(initcwnd)一般是 10 个 MSS(约 14 KB)。如果你要下载一个 200 KB 的响应,慢启动过程中需要几次 RTT 才能把 cwnd 涨到带宽满载。对比已经跑过若干轮、cwnd 早就涨到 500 KB 的旧连接——同样的 200 KB,一个 RTT 就拉完。

这是 Netflix / Google / Cloudflare 都在公开场合讲过的经验——"长连接是带宽的放大器"。Connection Pool 让客户端能吃到这份放大效应,不复用连接等于每次都从 "慢启动" 开始。

20.1.4 最后一个(但最容易踩):资源公平性

假设你跑一个爬虫,目标站点同时间段你要爬 100 个 URL。如果你的客户端不限制对同一 host 的并发连接数,100 个 goroutine / task 同时发起 100 个 connect,对方服务器看到的就是瞬间 100 个 SYN。大部分云厂的 WAF 会把你标记为攻击流量,轻则限速重则封 IP。

Pool 的 max_idle_per_host 配合应用层的并发限制就是一种柔性限流——单 host 永远只持有有限数量的连接,既避免把自己打爆,也避免对方把你标记成异常。我们下面会看到 hyper-util 里 max_idle_per_host 这个字段在 PoolInner 结构体里的位置(pool.rs:85)。

四个代价加起来——延迟、端口、带宽、公平性——任何一个拿出来都足以让 Connection Pool 成为生产 HTTP 客户端的必选项。hyper 的 API 设计哲学里也体现了这一点——你甚至不能直接用 hyper::client::conn::http1::handshake(那是单连接的底层 API)来做日常业务请求,官方推荐的入口就是带 pool 的 hyper_util::client::legacy::Client

20.2 Pool 的数据结构

20.2.1 从最外层的 Pool<T, K> 讲起

打开 pool.rs:25-28

rust
#[allow(missing_debug_implementations)]
pub struct Pool<T, K: Key> {
    // If the pool is disabled, this is None.
    inner: Option<Arc<Mutex<PoolInner<T, K>>>>,
}

这个结构非常朴素——一个可选的、用 Arc + Mutex 包起来的 PoolInner。两个设计意图一目了然:

  1. Option 表示pool 可以被禁用——当用户配 max_idle_per_host = 0 时,inner 就是 None,所有 pool 操作走 short-circuit 路径。这个设计让"不用 pool"也能走同一套 API,省了用户的条件分支。
  2. Arc<Mutex<_>> 表示pool 可以被多个 task 共享——典型场景是一个 Client 被 clone 到若干 axum handler 里,每个 handler 独立发请求。Arc 让 clone 廉价(Pool::clone 只复制 Arc,见 pool.rs:512-517),Mutex 保护内部的 HashMap。

泛型参数 TK 分别是"可插拔的连接类型"和"可插拔的 key 类型"——这两个维度的可插拔是整个 pool 抽象的灵魂

T 的约束写在 pool.rs:35-42

rust
pub trait Poolable: Unpin + Send + Sized + 'static {
    fn is_open(&self) -> bool;
    /// Reserve this connection.
    ///
    /// Allows for HTTP/2 to return a shared reservation.
    fn reserve(self) -> Reservation<Self>;
    fn can_share(&self) -> bool;
}

三个方法分别回答三个问题——"还活着吗?""怎么把自己交出去?""能被多路复用吗?"这三个方法就是 pool 对"可池化连接"的完整契约。任何类型只要实现这三个方法,就能被这套 pool 装——hyper-util 自家的 PoolClient<B>client.rs:766)是一个实现;理论上你自己给一个 UdpSessionSqlConnection 实现 Poolable,pool 也照样能管——模块的通用性远超它的实际使用范围

K 的约束更宽松(pool.rs:44-46):

rust
pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
impl<T> Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}

任何可哈希、可比较、可 clone 的类型都能当 key。hyper-util 自家在 client.rs:92 给出的实际 key 类型是:

rust
type PoolKey = (http::uri::Scheme, http::uri::Authority);

也就是 (scheme, authority) 二元组——等价于"https://api.stripe.com:443" 这种协议 + 主机 + 端口的组合。不同的 scheme / authority 各走各的桶——http://xhttps://x 不共享连接(因为 TLS 不通用),a.com:443a.com:8443 也不共享(不同端口本就是不同服务)。

把 key 留成泛型而不是写死成 (Scheme, Authority)——是一个前瞻性设计。未来如果 hyper-util 要支持"按 SNI 分桶""按 ALPN 分桶""按 SOCKS 代理分桶",只要换 key 类型就够,pool 内部完全不动。这是 Rust 基础库常见的"用 trait bound 替代硬编码"模式。

20.2.2 真正干活的 PoolInner

pool.rs:77-102

rust
struct PoolInner<T, K: Eq + Hash> {
    // A flag that a connection is being established, and the connection
    // should be shared. This prevents making multiple HTTP/2 connections
    // to the same host.
    connecting: HashSet<K>,
    // These are internal Conns sitting in the event loop in the KeepAlive
    // state, waiting to receive a new Request to send on the socket.
    idle: HashMap<K, Vec<Idle<T>>>,
    max_idle_per_host: usize,
    // These are outstanding Checkouts that are waiting for a socket to be
    // able to send a Request one. This is used when "racing" for a new
    // connection.
    waiters: HashMap<K, VecDeque<oneshot::Sender<T>>>,
    // A oneshot channel is used to allow the interval to be notified when
    // the Pool completely drops.
    idle_interval_ref: Option<oneshot::Sender<Infallible>>,
    exec: Exec,
    timer: Option<Timer>,
    timeout: Option<Duration>,
}

五个字段承担不同角色,一个一个拆开看:

  • connecting: HashSet<K>——"正在建连"集合。当某个 key 的 HTTP/2 连接正在建立,后续对这个 key 的请求不再启新 connect,而是等。这是为了避免同 host 建 N 条 HTTP/2 连接(HTTP/2 的卖点就是"一条顶 N 条",多建反而是反模式)。HTTP/1 不放进这个集合——因为 HTTP/1 每请求独占一条,并发发起多条是必要的。
  • idle: HashMap<K, Vec<Idle<T>>>——空闲连接的桶按 key 分桶,每桶一个 Vec<Idle<T>>。为什么是 Vec 而不是 VecDeque 或者 Heap?因为 IdlePopper::poppool.rs:299-338)永远从 vec 的尾部弹——"最新归还的先用",也就是 LIFO 策略。LIFO 在 idle timeout 存在时是最优的——最老的连接被冷置在底部,每次都优先抽底部旁边的新连接,"底部"的连接自然过期被逐出。
  • waiters: HashMap<K, VecDeque<oneshot::Sender<T>>>——排队等连接的那些 Checkout future。当 checkout() 被调但池里没空闲,checkout 会生成一个 oneshot::channel,sender 塞进这个 VecDeque,receiver 返回给 Checkout future 持有。等某条连接归还时(put 方法,pool.rs:348-412),先把 sender 从队首弹出发送 connection 过去——**"排队 + 转移"**的经典生产者消费者形态,用 oneshot 做跨 future 通信。
  • idle_interval_ref——定期扫描清理的后台 task 的"引用计数"通道。后面 §20.4 单独讲。
  • max_idle_per_hosttimeout——用户可配的参数。max_idle_per_host 默认 usize::MAX(见 client.rs:1034-1047),timeout 默认 Some(Duration::from_secs(90))(即 90 秒 idle 过期)。

这个数据结构极其简单——一个 HashMap、一个 HashSet、一个 HashMap of VecDeque——但它支撑了 hyper 整个异步 HTTP 客户端的连接生命周期。看到这里你应该有一种感觉——"基础设施代码的质量不体现在数据结构的花哨,而体现在数据结构刚好够用"。这是工业级库作者的共同美学。

20.2.3 连接的三种状态

回到本章开头提到的"三种状态"——idle、in-use、reserved——具体对应到源码里:

  • idle:空闲在 PoolInner::idle 的 Vec 里,包了 Idle<T> 结构pool.rs:588-591)。Idle<T> 带一个 idle_at: Instant 字段记录何时空闲——用来判断 idle timeout。
  • in-use:被用户代码持有,包在 Pooled<T, K>pool.rs:520-528)。Pooled 是一个RAII guard——它 Drop 的时候会自动尝试归还(pool.rs:560-580)。
  • reserved:HTTP/2 专属状态——一条连接同时被 pool 持有 + 被用户持有。这个语义通过 Reservation::Shared(T, T)pool.rs:63-72)表达——reserve 方法把 self 拆成两份,一份塞回 idle,一份交给 Pooled。你会问"一个 TCP 连接怎么能拆成两份"——答案在 client.rs:866-876PoolTx::Http2(tx.clone())——hyper 的 http2::SendRequest 本身是 Clone,因为它内部是一个 mpsc::Sender 指向真正的 connection task,clone 只是多一个发送端引用。这个设计本身就回答了"HTTP/2 连接如何被多路共享"这个问题——把 sender 抽象成 channel,所有用户拿到的都是 sender 副本

三种状态之间的转换都由同一把 Mutex 保护——Arc<Mutex<PoolInner>>。这是 hyper-util 的刻意选择——整个 pool 的并发原语只有一把锁。没有引入 lock-free 的 HashMap(dashmap / scc),没有分桶锁(striped lock),就是最朴素的 Mutex。对于一个 client 场景(大概几百到几千 QPS),这把锁的竞争根本不会成为瓶颈。库作者把**"简单可懂"的权重放到了"每秒多挤几百次锁"之前——这是工程成熟度的体现**。

20.3 Checkout 流程的源码走读

现在从"用户发起一次请求"这个入口,沿着代码走一遍完整的 checkout 过程。

20.3.1 入口:Pool::checkout

pool.rs:163-171

rust
pub fn checkout(&self, key: K) -> Checkout<T, K> {
    Checkout {
        key,
        pool: self.clone(),
        waiter: None,
    }
}

只创建一个 future,不立即 checkout——典型的"懒求值"。返回的 Checkout future 第一次被 poll 时才真正尝试拿连接。这符合 Rust async 的"future 被 poll 时才跑"的模型。

20.3.2 Checkout::poll 的三条分支

pool.rs:705-723

rust
impl<T: Poolable, K: Key> Future for Checkout<T, K> {
    type Output = Result<Pooled<T, K>, Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
            return Poll::Ready(Ok(pooled));
        }

        if let Some(pooled) = self.checkout(cx) {
            Poll::Ready(Ok(pooled))
        } else if !self.pool.is_enabled() {
            Poll::Ready(Err(Error::PoolDisabled))
        } else {
            // There's a new waiter, already registered in self.checkout()
            debug_assert!(self.waiter.is_some());
            Poll::Pending
        }
    }
}

三条路径分别对应三种场景:

  1. 已经在排队等(waiter 已注册)——调 poll_waiter 看 oneshot 有没有结果。有就 Ready;没有继续 Pending。
  2. pool 里有空闲——self.checkout(cx) 从 idle vec 里 pop 一个回来。
  3. pool 被禁用——直接返回 Error::PoolDisabled

关键在于第一次 poll 走到第二条分支——self.checkout() 里发生了什么?看 pool.rs:654-702

20.3.3 self.checkout() 的内部逻辑

rust
fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T, K>> {
    let entry = {
        let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
        let expiration = Expiration::new(inner.timeout);
        let now = inner.now();
        let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
            trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
            {
                let popper = IdlePopper { key: &self.key, list };
                popper.pop(&expiration, now)
            }
            .map(|e| (e, list.is_empty()))
        });
        // ...
        if entry.is_none() && self.waiter.is_none() {
            let (tx, mut rx) = oneshot::channel();
            trace!("checkout waiting for idle connection: {:?}", self.key);
            inner
                .waiters
                .entry(self.key.clone())
                .or_insert_with(VecDeque::new)
                .push_back(tx);
            // register the waker with this oneshot
            assert!(Pin::new(&mut rx).poll(cx).is_pending());
            self.waiter = Some(rx);
        }
        entry
    };
    entry.map(|e| self.pool.reuse(&self.key, e.value))
}

一步步看:

  1. 锁住 PoolInner,取出 timeout 做过期判断器。
  2. idle HashMap 里按 key 找桶——找不到返回 None;找到拿 &mut Vec<Idle<T>>
  3. IdlePopper::pop(下面 §20.3.4)从 vec 里 从尾部弹出第一个"还没过期、还没关闭"的 entry。
  4. 如果没拿到——且当前 Checkout 还没注册 waiter——创建一个 oneshot,sender 塞 waiters 队列末尾,receiver 留在 self.waiter注意倒数第二行的 assert!(Pin::new(&mut rx).poll(cx).is_pending())——这一行是把当前 task 的 waker 注册到 oneshot 上。如果没这一步,哪怕后面有连接归还,当前 task 也不会被唤醒。poll_waiter 也在下一次 poll 时能看到 waker 已绑定。这是 tokio oneshot 的标准惯用法——"先 poll 一次 Pending 来注册 waker"
  5. 拿到的连接用 pool.reuse(&self.key, e.value) 包装成 Pooled<T, K> 返回。

整个过程在锁里完成 idle 查找 + waiter 注册——一个关键步骤。如果 idle 查找完先释放锁再 push waiter,会出现经典竞态:其他 task 可能在这个缝隙里归还连接、遍历完当前 waiters 集合之后 push 到 idle,此时当前 task 把自己加到 waiters 但已经错过了刚刚那次归还的通知。单锁就避免了这个问题——朴素但正确

20.3.4 IdlePopper 的细节

pool.rs:299-338

rust
impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
    fn pop(self, expiration: &Expiration, now: Instant) -> Option<Idle<T>> {
        while let Some(entry) = self.list.pop() {
            if !entry.value.is_open() {
                trace!("removing closed connection for {:?}", self.key);
                continue;
            }
            if expiration.expires(entry.idle_at, now) {
                trace!("removing expired connection for {:?}", self.key);
                continue;
            }
            let value = match entry.value.reserve() {
                #[cfg(feature = "http2")]
                Reservation::Shared(to_reinsert, to_checkout) => {
                    self.list.push(Idle {
                        idle_at: now,
                        value: to_reinsert,
                    });
                    to_checkout
                }
                Reservation::Unique(unique) => unique,
            };
            return Some(Idle {
                idle_at: entry.idle_at,
                value,
            });
        }
        None
    }
}

这一段代码集中体现了HTTP/1 vs HTTP/2 在 pool 层的差异

  • HTTP/1 走 Reservation::Unique——pop 出来的连接整条交给 checkout,不 reinsert。因为 HTTP/1 一次只能承载一个请求,本条连接还没用完之前不能再借给别人。
  • HTTP/2 走 Reservation::Shared(to_reinsert, to_checkout)——pop 出来之后 立刻to_reinsert 放回 idle(注意 idle_at 被更新为 now,不是原来的时间,等于"续命"),同时 返回 to_checkout 给 checkout。这样下一次 checkout 来的时候仍然能从 idle 里拿到同一条 HTTP/2 连接——实现多路复用。

注意 pool.rs:308-313 附近的维护者注释提出过一个未落地优化:如果某个 idle entry 已经过期,按理说更早进入列表的条目也可能过期,于是可以提前跳出循环并丢掉整段列表。作者没有直接这样做,是因为 push_back 的顺序不一定严格等于真实过期顺序:put 时可能插回一个刚 reserve 的 connection,并把 idle_at 设为 now。读这段注释你能感受到一个成熟工业级代码库的自省力——作者清楚知道哪里可能有优化、哪些优化的前提条件现在不成立,并把推理留给后来者继续验证。

20.3.5 Client::one_connection_for 的"racing" 策略

光看 Pool::checkout 还不完整——它只负责**"从池里拿"。真实生产场景下,常常是池里没空闲、但我也不想干等**——我希望一边等池、一边自己 connect 一条新的,谁快用谁。这个策略叫 racing

client.rs:391-481one_connection_for

rust
async fn one_connection_for(
    &self,
    pool_key: PoolKey,
) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
    if !self.pool.is_enabled() {
        return self.connect_to(pool_key).await.map_err(ClientConnectError::Normal);
    }

    // This actually races 2 different futures to try to get a ready
    // connection the fastest, and to reduce connection churn.
    //
    // - If the pool has an idle connection waiting, that's used immediately.
    // - Otherwise, the Connector is asked to start connecting to the destination Uri.
    // - Meanwhile, the pool Checkout is watching to see if any other
    //   request finishes and tries to insert an idle connection.
    // - If a new connection is started, but the Checkout wins after
    //   (an idle connection became available first), the started
    //   connection future is spawned into the runtime to complete,
    //   and then be inserted into the pool as an idle connection.
    let checkout = self.pool.checkout(pool_key.clone());
    let connect = self.connect_to(pool_key);
    let is_ver_h2 = self.config.ver == Ver::Http2;

    match future::select(checkout, connect).await {
        Either::Left((Ok(checked_out), connecting)) => {
            if connecting.started() {
                let bg = connecting
                    .map_err(|err| { trace!("background connect error: {}", err); })
                    .map(|_pooled| { });
                self.exec.execute(bg);
            }
            Ok(checked_out)
        }
        Either::Right((Ok(connected), _checkout)) => Ok(connected),
        // ...
    }
}

future::select(checkout, connect) 同时 poll 两个 future,取先 Ready 的那个。两条分支都有工程美学:

  • Checkout 先赢——说明 pool 恰好有 idle 连接可用;此时已经启动的 connect future 别浪费——spawn 到后台继续跑完,建好之后自然被 Drop 归还到 pool(回顾 §20.2.3 的 Pooled Drop)。下一次别人来用就有了。
  • Connect 先赢——直接用新建的。此时 checkout future 被 drop(drop impl 会清 waiters 里的自己,见 pool.rs:725-734)。

这个 "已经起的就跑完、先到的用" 策略比"要么等池要么起新连接"的单选策略要冷静得多——对流量突增场景非常友好。突增期每个请求同时触发 checkout + connect,先到的用、后到的预热池——接下来一秒池就鼓起来了,峰值过后就全靠池跑。

20.3.6 HTTP/2 特殊处理:pool.connecting(&key, Ver::Http2)

racing 对 HTTP/2 有额外保护——client.rs:504-512

rust
let connecting = match pool.connecting(&pool_key, ver) {
    Some(lock) => lock,
    None => {
        let canceled = e!(Canceled);
        return Either::Right(future::err(canceled));
    }
};

pool.connectingpool.rs:175-199

rust
pub fn connecting(&self, key: &K, ver: Ver) -> Option<Connecting<T, K>> {
    if ver == Ver::Http2 {
        if let Some(ref enabled) = self.inner {
            let mut inner = enabled.lock().unwrap();
            return if inner.connecting.insert(key.clone()) {
                // ... 首次拿到 connecting lock
                Some(Connecting { key: key.clone(), pool: WeakOpt::downgrade(enabled) })
            } else {
                trace!("HTTP/2 connecting already in progress for {:?}", key);
                None
            };
        }
    }
    Some(Connecting { key: key.clone(), pool: WeakOpt::none() })
}

HTTP/2 第一次对某个 key connect 时,connecting.insert(key.clone()) 返回 true,拿到"建连许可"。第二次再来时 insert 返回 false——直接返回 None,上层把这次 connect 标为 Canceled——让 checkout 那条分支去等。最终 HTTP/2 一个 host 永远只跑一条 connect 任务。HTTP/1 则不触发这个锁(if ver == Ver::Http2 分支外的 else),可以并发起多条。

这个锁的释放Connecting::droppool.rs:754-763)——建连结束(无论成功失败)Connecting 被 drop,inner.connected(&self.key) 把 key 从 connecting 集合移除。如果正在排队的 waiters 没被通知到(说明这次 connect 失败),waiters.remove(key) 把它们全部取消——让上层看到 Canceled 错误后去重试。

"HTTP/2 只跑一条建连 + 多个 stream 共享"这条语义——在这一整套代码里体现得淋漓尽致。你会看到 Rust 把复杂语义写成类型签名的能力在这里的价值——Reservation::Shared(T, T) 这一个枚举变体把整个语义表达在类型系统里,而不是注释里。

20.4 归还与 idle 管理

20.4.1 归还入口:Pooled::drop

Pool 的"归还"机制用 RAII——用户持有 Pooled<T, K> 期间算 in-use,Drop 时自动归还pool.rs:560-580

rust
impl<T: Poolable, K: Key> Drop for Pooled<T, K> {
    fn drop(&mut self) {
        if let Some(value) = self.value.take() {
            if !value.is_open() {
                // If we *already* know the connection is done here,
                // it shouldn't be re-inserted back into the pool.
                return;
            }

            if let Some(pool) = self.pool.upgrade() {
                if let Ok(mut inner) = pool.lock() {
                    inner.put(self.key.clone(), value, &pool);
                }
            } else if !value.can_share() {
                trace!("pool dropped, dropping pooled ({:?})", self.key);
            }
            // Ver::Http2 is already in the Pool (or dead), so we wouldn't
            // have an actual reference to the Pool.
        }
    }
}

这段逻辑里三个细节:

  • is_open() 检查——如果连接在 in-use 期间变 dead(远端发了 FIN、本端收到 RST、HTTP/2 GOAWAY 被触发),不归还。这个判断避免把坏连接塞回池。这也是为什么 hyper 的 PoolClient::is_open 定义是**!is_poisoned() && is_ready()**(client.rs:854-856)——两个条件同时成立才算"还能用"。
  • Weak 引用升级——pool: WeakOpt<Mutex<PoolInner<T, K>>>,归还时要 upgrade。如果 pool 本身已经被 drop(Client 已销毁),upgrade 返回 None,这条 drop 不归还——避免悬垂引用。Weak 是 Rust 里 GC-free 做"回调可能目标已死"的标准工具。
  • HTTP/2 的注释——"already in the Pool"——HTTP/2 的 Reservation::Shared 已经在 checkout 时 reinsert 了一份到 idle,这条 Pooled 其实是 shared 的那个"to_checkout" 副本,它 drop 时不需要归还(本来就有一份在池里)。所以 poolWeakOpt::none(),upgrade 得 None,走的是 else 分支。

20.4.2 PoolInner::put 的"先喂等待者再塞 idle"

pool.rs:348-412

rust
fn put(&mut self, key: K, value: T, __pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
    if value.can_share() && self.idle.contains_key(&key) {
        trace!("put; existing idle HTTP/2 connection for {:?}", key);
        return;
    }
    trace!("put; add idle connection for {:?}", key);
    let mut remove_waiters = false;
    let mut value = Some(value);
    if let Some(waiters) = self.waiters.get_mut(&key) {
        while let Some(tx) = waiters.pop_front() {
            if !tx.is_canceled() {
                let reserved = value.take().expect("value already sent");
                let reserved = match reserved.reserve() {
                    #[cfg(feature = "http2")]
                    Reservation::Shared(to_keep, to_send) => {
                        value = Some(to_keep);
                        to_send
                    }
                    Reservation::Unique(uniq) => uniq,
                };
                match tx.send(reserved) {
                    Ok(()) => {
                        if value.is_none() { break; } else { continue; }
                    }
                    Err(e) => { value = Some(e); }
                }
            }
            trace!("put; removing canceled waiter for {:?}", key);
        }
        remove_waiters = waiters.is_empty();
    }
    // ...
    match value {
        Some(value) => {
            let idle_list = self.idle.entry(key.clone()).or_default();
            if self.max_idle_per_host <= idle_list.len() {
                trace!("max idle per host for {:?}, dropping connection", key);
                return;
            }
            idle_list.push(Idle { value, idle_at: now });
            self.spawn_idle_interval(__pool_ref);
        }
        None => trace!("put; found waiter for {:?}", key),
    }
}

几个要点:

  1. HTTP/2 连接如果 idle 里已有——直接丢弃这次 put(value.can_share() && idle.contains_key)。这防止 idle 里累计 N 条 HTTP/2 连接(同 host 我们永远只需要 1 条即可多路复用)。
  2. 优先喂 waiters——从 waiters VecDeque 队首开始 pop,每个取消过的 sender 跳过;没取消的就给它 send 一份。HTTP/2 的 Reservation::Shared 把 value 拆成 to_keep(留着继续喂下一个等待者)+ to_send(送给当前等待者)——"一条 HTTP/2 够喂所有人"
  3. 喂完还剩就塞 idle——检查 max_idle_per_host,如果这个 key 桶已经装满,丢弃(而不是 panic)。这是 pool 的上界保护——单 host 空闲连接数永远不会无限涨。
  4. spawn_idle_interval——第一次有 idle 连接时启动后台扫描任务,下面单独讲。

"先喂等待者、后塞 idle"——这个顺序是避免延迟叠加的关键。如果先塞 idle 再通知 waiter,waiter 还得再走一遍 checkout 从 idle 拿,多一次锁 + 一次 pop——直接 send 过去一次锁搞定。

20.4.3 定期扫描:IdleTask

pool.rs:781-820

rust
struct IdleTask<T, K: Key> {
    timer: Timer,
    duration: Duration,
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
    pool_drop_notifier: oneshot::Receiver<Infallible>,
}

impl<T: Poolable + 'static, K: Key> IdleTask<T, K> {
    async fn run(self) {
        use futures_util::future;
        let mut sleep = self.timer.sleep_until(self.timer.now() + self.duration);
        let mut on_pool_drop = self.pool_drop_notifier;
        loop {
            match future::select(&mut on_pool_drop, &mut sleep).await {
                future::Either::Left(_) => { break; }
                future::Either::Right(((), _)) => {
                    if let Some(inner) = self.pool.upgrade() {
                        if let Ok(mut inner) = inner.lock() {
                            trace!("idle interval checking for expired");
                            inner.clear_expired();
                        }
                    }
                    let deadline = self.timer.now() + self.duration;
                    self.timer.reset(&mut sleep, deadline);
                }
            }
        }
        trace!("pool closed, canceling idle interval");
    }
}

后台 task 干两件事——定期清 expiredpool 被 drop 就退出future::select 这一对 future 很精彩:

  • on_pool_drop: oneshot::Receiver<Infallible>——Infallible 意味着这个 sender 永远不会 send 任何东西,它只作为"pool 还活着"的标志存在。sender 在 PoolInner 结构体里(idle_interval_ref: Option<oneshot::Sender<Infallible>>)。当整个 PoolInner 被 drop(也就是 Pool 的最后一个 Arc 被 drop),sender 也跟着 drop,receiver 收到 Err(Canceled)——select 的 Left 分支触发,IdleTask break 退出。
  • sleep: timer.sleep_until(...)——到点就 reset 再睡。每一轮醒来调 clear_expiredpool.rs:481-509)遍历所有 idle,retain 丢掉 not-open 或超 timeout 的连接。

这是一个**"永远活到 pool 死"** 的后台 task——没有计数器、没有引用计数、没有心跳,只靠 oneshot 的 drop 语义做生命周期绑定。比你自己写一个 Arc<AtomicBool> 外加 Weak 简洁得多。

这里的 oneshot 用法和本书第 18 章 upgrade.rs 里 OnUpgrade 的 oneshot 是同一个模式——都是利用 "sender drop → receiver Canceled" 做"外部信号"。而它又和卷四《Tokio 源码深度解析》第 13 章 channels 里讲过的"oneshot 的取消语义"形成闭环——整个 Rust async 生态都在反复用这个模式。第一次看可能不起眼,多用几次你会发现它替代了很多场景里的 AtomicBool + Waker 手写实现

20.4.4 spawn_idle_interval 的"首次启动"保护

pool.rs:425-461 里这段代码值得仔细读:

rust
fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
    if self.idle_interval_ref.is_some() { return; }
    let dur = if let Some(dur) = self.timeout { dur } else { return; };
    if dur == Duration::ZERO { return; }
    let timer = if let Some(timer) = self.timer.clone() { timer } else { return; };

    const MIN_CHECK: Duration = Duration::from_millis(90);
    let dur = dur.max(MIN_CHECK);

    let (tx, rx) = oneshot::channel();
    self.idle_interval_ref = Some(tx);
    let interval = IdleTask { timer, duration: dur, pool: WeakOpt::downgrade(pool_ref), pool_drop_notifier: rx };
    self.exec.execute(interval.run());
}

防重入、防零 timeout、防无 timer——三重守门MIN_CHECK = 90ms 是一个有趣的地板值——"即使你配了 10ms idle timeout 也不会比 90ms 扫得更快"——避免高频扫描把锁竞争推到不合理。这类"合理下限"的工程判断在成熟库里非常常见——类似 OpenJDK 的 G1GC 里 MinRegionSize、Linux 的 HZ 常数——都是从"物理极限"反推出的参数

20.5 Connector 抽象:DNS + TCP + TLS 三层分离

20.5.1 Connector 到底是什么

hyper-util/src/client/legacy/connect/mod.rs:9-14 的文档注释直接把 Connector 的定义写死:

A "connector" is a [Service] that takes a [Uri] destination, and
its Response is some type implementing [Read], [Write], and [Connection].

一个 Connector 就是一个 tower::Service<Uri, Response = impl Read + Write + Connection>。没有别的神秘。你可以把它当成**"给我一个 Uri,还我一个可读可写的 IO,并附带 metadata"**的一个抽象。

形式上的约束在 mod.rs:324-351——Connect trait(sealed)要求 Service<Uri>,错误类型要能转成 Box<dyn StdError + Send + Sync>,future 要 Unpin + Send,连接必须实现 Read + Write + Connection + Unpin + Send + 'static。sealed 是为了**"你不能手动 impl Connect"**——你只能 impl tower::Service<Uri>,剩下的由 blanket impl 给你。

20.5.2 三层分工

hyper-util 把 Connector 的组装分成三层——DNS、TCP、TLS——每一层都是一个独立可替换的小组件。

DNS 层connect/dns.rs。核心 trait 是 Resolve(在 sealed 模块里),公开类型 GaiResolverdns.rs:42-46)用 getaddrinfo 做同步解析,调用时 tokio::task::spawn_blocking 扔到阻塞线程池——保证不阻塞 runtime。用户可以替换成 hickory-resolver、自建 DoH 客户端,接口只需 Service<Name, Response = impl Iterator<Item = SocketAddr>>——"给一个域名,还我一组 IP"

TCP 层connect/http.rsHttpConnector<R> 定义在 http.rs:32-36

rust
#[derive(Clone)]
pub struct HttpConnector<R = GaiResolver> {
    config: Arc<Config>,
    resolver: R,
}

R 默认是 GaiResolver,但可以是任何实现 Resolve 的类型。Config 包含一堆 TCP 层参数:connect_timeouthappy_eyeballs_timeouttcp_keepalive_confignodelaysend_buffer_sizerecv_buffer_sizelocal_address_ipv4/ipv6tcp_user_timeout(Linux 专属)——这一共十几个 knob 覆盖了 Linux socket API 的主要 tunable。每个参数都有 setter(http.rs:262-400 左右),返回 &mut self 做链式调用。

默认值见 http.rs:228-254

rust
HttpConnector {
    config: Arc::new(Config {
        connect_timeout: None,
        enforce_http: true,
        happy_eyeballs_timeout: Some(Duration::from_millis(300)),
        // ...
        nodelay: false,
        // ...
    }),
    resolver,
}

两个值得留意的默认值:

  • connect_timeout: None——默认没有 connect 超时。也就是说如果对端不响应 SYN,你的 connect()一直等到内核 SYN 重传超时(Linux 默认 ~63 秒)。生产上必须显式配这个值,否则在对端 IP 黑洞、防火墙 DROP 这类故障下你的请求会僵死一分多钟。我们在 §20.7.1 会复盘一个真实事故。
  • happy_eyeballs_timeout: Some(300ms)——Happy Eyeballs 是 RFC 8305 定义的双栈 connect 策略——IPv6 先上,300ms 不通 fallback 到 IPv4 并行跑。这个默认值是 Chromium 团队经验值,业界普遍采用。

TLS 层:hyper-util 不包。生态里用 hyper-rustlshyper-tls 做"把 HttpConnector wrap 一层 TLS"。典型写法:

rust
let http_connector = HttpConnector::new();
let tls_connector = HttpsConnectorBuilder::new()
    .with_native_roots()?
    .https_or_http()
    .enable_http1()
    .wrap_connector(http_connector);
let client = Client::builder(TokioExecutor::new()).build(tls_connector);

wrap_connector 是一个经典的 decorator 模式——HttpsConnector 持有一个 HttpConnector,Service<Uri>::call 里先让 HttpConnector 拿到 TcpStream,然后在 TcpStream 上做 TLS handshake,返回 MaybeHttpsStream<TcpStream>。整条链路的 DNS、TCP、TLS 参数分别归属到各自的 connector 上,互不干扰。

20.5.3 Connector 就是 tower::Service:真正的"可插拔"

mod.rs:339-351 给所有 tower::Service<Uri> 实现了 blanket Connect

rust
impl<S, T> Connect for S
where
    S: tower_service::Service<Uri, Response = T> + Send + 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    S::Future: Unpin + Send,
    T: Read + Write + Connection + Unpin + Send + 'static,
{
    type _Svc = S;
    fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> {
        crate::service::Oneshot::new(self, dst)
    }
}

这句话的威力——"用户自己写一个 tower::ServiceFn 闭包就能当 Connector 用"。官方文档给了最精简的例子(mod.rs:20-24):

rust
let connector = tower::service_fn(|_dst| async {
    tokio::net::TcpStream::connect("127.0.0.1:1337")
});

一行!所有 Uri 都 connect 到 localhost:1337 的 Connector——这在本地测试、mock server、gRPC sidecar 场景下极其常用。你不用从 HttpConnector 的一堆配置里去掉 DNS、去掉 Happy Eyeballs、去掉 keepalive——直接写一个闭包就够。

这就是 tower 的 "Service 是一切" 哲学的力量——DNS 是 Service、Resolver 是 Service、Connector 是 Service、最终的 HTTP Client 也是 Service——所有层都用同一个 trait 抽象,用户在任何一层都能接入自己的实现。这点我们在第 2 章 Service trait、第 7 章 balance / discover 已经反复讲过,此处再次体验到"抽象一致性"给工程带来的收益。

20.5.4 Connected metadata:连接的"额外信息"

mod.rs:100-106

rust
#[derive(Debug)]
pub struct Connected {
    pub(super) alpn: Alpn,
    pub(super) is_proxied: bool,
    pub(super) extra: Option<Extra>,
    pub(super) poisoned: PoisonPill,
}

alpn 告诉 pool "ALPN 协商结果是什么"——如果是 H2,即使用户只请求了 HTTP/1.1,pool 也会把连接当 HTTP/2 处理(见 client.rs:523-538 的 alpn_h2 分支)。is_proxied 告诉 HTTP/1 writer "要写 absolute-form 还是 origin-form URI"——这个细节在代理场景下至关重要(CONNECT 隧道除外)。

poisonedPoisonPillmod.rs:108-138)——一个 Arc<AtomicBool>。外部代码(比如检测到协议级异常)可以调 connected.poison()——下次 is_open() 就返回 false,pool 会丢弃这条连接。这是 hyper 给用户的一个 "手动投毒" 能力——"虽然 TCP 还连着,但我知道它已经坏了,别再给别人用"——比如业务 level 检测到返回数据被篡改、或者中间件识别到异常 header 时,可以主动投毒。

Extra 是一个 type-erased 的 extension 机制(mod.rs:140、265-301 行),允许 Connector 给 Response 附加任意类型的 metadata。HttpInfohttp.rs:62-65)就是一个典型用法——HttpConnector 在连接建立后把 {remote_addr, local_addr} 塞进 Extra,res.extensions().get::<HttpInfo>() 就能在任意地方读到对端地址。

这种"Connector 的副产物可以按类型注入到 Response"的机制——也是 tower/hyper 生态的一个共同模式。类似的思路在《Serde 元编程》第 5 章 Deserializer 的 access traits 里也有——"数据流过过程中可以携带任意辅助信息,但辅助信息对不关心它的消费者不可见"

20.5.5 Happy Eyeballs 的一个小展示

顺手看一下 http.rs:711-739ConnectingTcp::new

rust
impl<'a> ConnectingTcp<'a> {
    fn new(remote_addrs: dns::SocketAddrs, config: &'a Config) -> Self {
        if let Some(fallback_timeout) = config.happy_eyeballs_timeout {
            let (preferred_addrs, fallback_addrs) = remote_addrs
                .split_by_preference(config.local_address_ipv4, config.local_address_ipv6);
            if fallback_addrs.is_empty() {
                return ConnectingTcp {
                    preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout),
                    fallback: None,
                    config,
                };
            }
            ConnectingTcp {
                preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout),
                fallback: Some(ConnectingTcpFallback {
                    delay: tokio::time::sleep(fallback_timeout),
                    remote: ConnectingTcpRemote::new(fallback_addrs, config.connect_timeout),
                }),
                config,
            }
        } else {
            // ...
        }
    }
}

DNS 返回一组 IP(典型情况下会有 v4 和 v6 各一到几个)之后,split_by_preference 按用户偏好把地址分两组——首选组 + fallback 组。首选组立即 connect,fallback 组等 happy_eyeballs_timeout(默认 300ms)后并行 connect。谁先返回用谁。这就是 RFC 8305 的工程实现。

注意 connect_timeout均分到每个 IP(http.rs:754):

rust
let connect_timeout = connect_timeout.and_then(|t| t.checked_div(addrs.len() as u32));

如果 DNS 返回 4 个地址、用户配了 10s connect_timeout,每个地址实际超时是 2.5s。这样总耗时不会超过用户的 connect_timeout——公平但偶尔被吐槽:"我明明配了 10 秒,为什么单 IP 只等 2.5 秒就认为失败?"这是一个微妙的 API 语义选择,文档里没特别突出,靠读源码才能明白。

20.6 HTTP/1 vs HTTP/2 的 pool 语义差异

前面几节零散提到了 HTTP/2 在 pool 里的特殊性。这一节集中梳理一次,给你一个"为什么要这么区别对待"的清晰画面。

20.6.1 语义对照表

维度HTTP/1.1HTTP/2
单连接并发1(不支持 pipelining 的情况下)多(默认 100 streams)
单 host 需要几条连接N(按并发数)1 即可
归还时机请求 + 响应完成stream 结束,连接不动
Reservation 类型Unique(T)Shared(T, T)
建连去重不需要需要(connecting HashSet)
idle timeout 的含义无请求期间多久关无 stream 期间多久关(少见,主要靠 PING)
Pool 里的副本数N永远 1

20.6.2 PoolClient::reserve 的枚举切换

client.rs:858-878

rust
fn reserve(self) -> pool::Reservation<Self> {
    match self.tx {
        #[cfg(feature = "http1")]
        PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
            conn_info: self.conn_info,
            tx: PoolTx::Http1(tx),
        }),
        #[cfg(feature = "http2")]
        PoolTx::Http2(tx) => {
            let b = PoolClient {
                conn_info: self.conn_info.clone(),
                tx: PoolTx::Http2(tx.clone()),
            };
            let a = PoolClient {
                conn_info: self.conn_info,
                tx: PoolTx::Http2(tx),
            };
            pool::Reservation::Shared(a, b)
        }
    }
}

fn can_share(&self) -> bool {
    self.is_http2()
}

Reservation::Shared(a, b) 的两个副本——a 给到 checkout 用户,b 留在 pool。tx.clone() 可行是因为 hyper::client::conn::http2::SendRequest 是 Clone——clone 等于把 mpsc sender 复制一份,背后指向同一个 connection task。

这种设计——"把多路复用的成本在 pool 层摊到 Clone 一个 channel sender"——简洁得令人瞠目。如果你在 Go 里要做同样的事情,你得手写一个 ConnPoolmap[host]*h2Conn、每个 h2Conn 里维护 stream counter、加锁检查是不是 max_concurrent_streams 到顶——几百行代码。Rust 这里一个 Reservation::Shared(T, T)T: Clone 就表达完了。类型系统把语义吃进去是 Rust 基础库设计里非常常见的成就。

20.6.3 ALPN 升级的"事后改派"

有一种场景特别有趣——用户请求了 HTTP/1.1,但服务端 ALPN 协商到 HTTP/2。怎么办?client.rs:523-538

rust
let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
    match connecting.alpn_h2(&pool) {
        Some(lock) => {
            trace!("ALPN negotiated h2, updating pool");
            lock
        }
        None => {
            let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
            return Either::Right(future::err(canceled));
        }
    }
} else {
    connecting
};

Connecting::alpn_h2pool.rs:744-751)尝试从 HTTP/1 的建连锁升级到 HTTP/2 的建连锁

rust
pub fn alpn_h2(self, pool: &Pool<T, K>) -> Option<Self> {
    debug_assert!(self.pool.0.is_none(), "Connecting::alpn_h2 but already Http2");
    pool.connecting(&self.key, Ver::Http2)
}

"连接建好之后才发现要升级,此时如果同 host 已经有 HTTP/2 连接在建,就放弃这条(让 pool 的已有 HTTP/2 连接去用)"——防止浪费一个握手。这是一个非常漂亮的乐观假设 + 事后修正的设计——绝大多数情况走 HTTP/1 路径,遇到 ALPN 升级时切换到 HTTP/2 路径。

20.6.4 为什么 HTTP/2 pool 不 reinsert 时 also_put

你可能会问——HTTP/2 的 shared 连接永远活在 pool 的 idle 里,那它什么时候被清?put 函数一开始就有一句保护:

rust
if value.can_share() && self.idle.contains_key(&key) {
    trace!("put; existing idle HTTP/2 connection for {:?}", key);
    return;
}

所以 HTTP/2 连接只有第一次会被塞进 idle——后续归还都是 no-op。真正把它从 idle 移除的路径有两条:

  1. IdlePopper::pop 发现它 is_open() == false——PoolClient::is_open 在 HTTP/2 连接被 GOAWAY 或底层 TCP 出错时变 false(client.rs:854),下次 checkout 时被丢弃。
  2. IdleTask::clear_expired 定期清理——如果 idle_timeout 到了,即使连接还活着也会被清。这个在 HTTP/2 场景下要小心——idle_timeout 设短了,连接会被当 HTTP/1 那样频繁清除,HTTP/2 的复用优势就消失了。实际建议 HTTP/2 配更长的 idle_timeout 或者直接 None

实践里我见过一个团队用 hyper + tonic 做 gRPC 客户端,QPS 不高但长期运行,配了 pool_idle_timeout(60s)——结果过 60 秒池里没 stream 就被清了,下次请求重建,整个应用变成了"每分钟握一次手"。改成 pool_idle_timeout(None) 后 CPU 降了 15%。

20.7 典型事故:pool 耗尽与 DNS 陈旧

讲了这么多机制,收尾之前必须来几个真实事故让所有抽象落到生产的尘土里

20.7.1 事故一:connect_timeout = None 加对端 IP 黑洞

某团队的图片处理服务走 hyper 调用上游图像 CDN。一天下午 CDN 某区域故障,转入流量到冗余区域但DNS 没同步更新,依然返回旧区域的 A 记录。旧区域的负载均衡器被拔了电源,SYN 有去无回。

现象:所有发去该 CDN 的请求全部挂起,QPS 从几千瞬间降到个位数。更糟的是——因为 pool 的 checkout + connect racing 策略(§20.3.5),一次用户请求会同时发起 checkout 和 connect;池里没连接所以 checkout 等;connect 走到 ConnectingTcp::connect 里对 IP 发 SYN,因为没配 connect_timeout,一直等内核 SYN 超时(/proc/sys/net/ipv4/tcp_syn_retries = 6,大概 127 秒)。128 秒!整个 task 卡住,axum::Server 的 handler 池被耗尽、新请求进来没 worker 跑、上游雪崩。

事后复盘根本问题——没有 connect_timeoutHttpConnector::new() 默认值是 None——意味着"用内核默认"。但内核默认(127 秒)对任何生产服务都太长。修复只需一行:

rust
let mut connector = HttpConnector::new();
connector.set_connect_timeout(Some(Duration::from_secs(3)));

3 秒对人类来说很长,对 HTTP 客户端来说是合理的上限——超过 3 秒都连不上对端,绝大多数情况下不是"正在重试能通"而是"对端已经挂了"。

这个事故的教训是——hyper-util 的默认值是"能让新手跑通"的默认值,不是"能在生产上稳跑"的默认值。Builder 上的每个参数都值得花 10 分钟读文档+源码,决定你的业务需要的值。

20.7.2 事故二:pool 耗尽(指 waiters 队列爆炸)

另一个团队把 pool_max_idle_per_host 配得太小——1。原意是"我希望同一个下游不要被我打爆,一个 host 最多一条连接"。正常时段一切看起来 OK,因为请求是一条来一条走,正好一条连接够用。

问题出在一次小爆量——业务上游下发了一批任务,客户端需要对同一下游连续发 50 次请求。按理说这 50 次会排队串行,延迟叠加到一两秒。但实际现象是——延迟正常,但对下游的 TCP 连接数瞬间飙到 20+。为什么?

因为 max_idle_per_host = 1 不是 "max concurrent per host = 1"。它限制的是idle 数量,不是in-use 数量。50 个请求并发进来,checkout 池里只有 1 条 idle,pop 走之后池空了——剩下 49 个进入 waiters 队列——同时每个也都在 racing 一条新 connect!因为 pool 的 Either::Left(checkout) or Either::Right(connect) 策略(§20.3.5)——checkout 没拿到就等池也在等新连接建好。新连接建好之后塞回池,喂下一个 waiter——但 50 个 connect 同时发起就是 50 条新 TCP。

结果——下游 WAF 把这个客户端标记为异常流量,封了 5 分钟 IP。

修正是两步:一是应用层做限流(tower 的 ConcurrencyLimit),把对同一下游的并发控制在 5 以下;二是max_idle_per_host 放宽到 10 让 pool 能蓄一些空闲。max_idle_per_host 不是并发限流器——这条一定要记住。

这个故事里 max_idle_per_host 这个参数的命名很有误导性——实际语义是"idle 桶容量上限",不是"同 host 并发上限"。即使看过 doc,也容易在脑子里混淆。源码里 pool.rs:396-399 这行才把语义说清:

rust
if self.max_idle_per_host <= idle_list.len() {
    trace!("max idle per host for {:?}, dropping connection", key);
    return;
}

——当 idle 桶满了,归还的连接直接扔掉

20.7.3 事故三:DNS 陈旧

这是一个 GaiResolver 的结构性问题。hyper-util 的 HttpConnector 只在建连时查 DNS——idle 连接永久持有之前查到的 IP。问题是——如果对端 DNS 切换到新 IP(发布、故障迁移、DNS TTL 过期),pool 里的旧连接依然连在旧 IP 上,直到它被关闭或 idle timeout。

典型场景:某团队把后端从 AWS 迁移到 GCP,DNS TTL 30 秒,按理应该 30 秒内前端客户端全部切流。但他们看到大概 10% 的请求持续打到旧 AWS(经过 CloudWatch 日志回溯),维持了好几个小时。

原因是——客户端 pool 里的 idle 连接没在 DNS 切换之前断掉。这些连接持续被复用,持续把请求发到老 IP(老机房仍然在线但没流量),形成"长连接尾巴"。

修正方案有几种:

  1. 短的 idle_timeout——比如 30 秒。让 idle 连接定期被清,重新建连时重新 DNS。代价是连接复用率下降。
  2. TTL 感知的 resolver——替换 GaiResolver 为 hickory-resolver 或自研 DNS 客户端,让 resolver 基于 DNS TTL 决定什么时候强制 refresh。但 pool 里的连接仍然不会主动断——只影响新建的连接。
  3. 主动 poison——在监控发现异常时调 connected.poison() 把所有 in-use 或相关的连接标记为坏。
  4. rolling restart——最暴力但最有效。DNS 切换时把客户端应用重启一遍,pool 从零开始建。

hyper-util 目前没有内置 TTL-aware pool 失效机制——这是社区里长期讨论但没定论的 feature。实际上连 reqwest 都没内置——大家的共识是这件事应该在更上层(应用层 / 配置层)解决,而不是在 pool 里。

这个事故引出一个更深的思考——pool 是一个双刃剑:它提高了性能,但也让"变化"传播变慢。DNS 变更、后端切换、灰度发布——所有"数据平面感知"的动作,对持有长连接的客户端都有延迟。设计系统时必须把"pool 陈旧"作为一个已知变量,要么用主动失效、要么用TTL 感知、要么用rolling 重启——不能假装这件事不存在。

20.7.4 事故四:keep-alive 陷阱(上游主动关,客户端不知道)

HTTP/1 的 Connection: keep-alive 本身是隐式的——双方默默约定"这条连接可以复用"。但 上游 可能因为maxKeepaliveRequests(Apache/Nginx 默认 100)或者 keepAliveTimeout(Nginx 默认 75s)主动关连接。上游发 FIN 之后,客户端下一次 read 才会发现——但如果客户端在那之前再发一个请求上去,写到这条"半关闭"的连接上——请求在 TCP 层被接受、但应用层收不到响应,拿到的是 connection: close 或者干脆 RST。

hyper 如何防御这种情况?PoolClient::is_openclient.rs:854)在 HTTP/1 场景下是 tx.is_ready() —— 如果底层 dispatcher 发现连接关闭会把 is_ready 变 false。但这个检测有竞态——上游 FIN 可能在 is_open() 返回 true 之后、send_request() 之前的那几纳秒里到达——结果是 send_request 成功发出但响应永远收不到。

这就是为什么 Client 有一个 retry_canceled_requests 参数(client.rs:1577)——如果请求被"canceled"(包括"发送时连接刚关"这种情况),如果连接是 reused 的(不是新建的),就重试一次client.rs:257-260

rust
if !self.config.retry_canceled_requests || !connection_reused {
    return Err(err);
}
trace!("unstarted request canceled, trying again (reason={:?})", reason);
continue;

"只对 reused connection 重试"——新建连接如果 canceled,很可能是业务级错误,重试没意义;reused connection 的 cancel 几乎肯定是"刚好撞到上游 keep-alive 关闭窗口"——重试一次就能拿到新建的连接发成功。这是 "幂等重试 vs 非幂等重试" 的经典判断——只在"明确可安全重试"的场景重试,避免业务端出现"写入两次"的副作用。

这个参数默认 trueclient.rs:1034)——hyper-util 默认帮你处理 keep-alive 关闭窗口的问题。绝大多数用户不知道这个参数存在,但它每天在你看不见的地方做正确的事。

20.8 与其他语言 HTTP 客户端的对照

把 hyper-util 的 pool 设计放到更宽的视野里看——理解**"为什么 Rust 这样做"**的最好方式是看其他语言怎么做、权衡在哪里。

20.8.1 vs Go net/http

Go 的 http.Transport 里带 pool,内部结构叫 idleConn + idleConnCh——功能差不多。关键差异:

  • Go 的 Transport 是"一个 struct 两把锁"——idleMuconnsPerHostMu 分开——hyper-util 只有一把锁。两种选择各有得失:Go 锁多但临界区短;Rust 锁少但临界区略大。单机 QPS 到几万级别都感觉不出差别。
  • Go 的 connection status 没有 Reservation 的概念——HTTP/2 连接的共享由 Transport 里另一条路径 altProtocol 处理。Rust 用类型系统把这个语义做成了 Shared(T, T) 变体——代码更对称
  • Go 默认 MaxIdleConnsPerHost = 2(很小!)——这是一个业界公认的不合理默认值,导致很多 Go 用户在高并发场景下意外碰到 TIME_WAIT。hyper-util 默认是 usize::MAX(更保守——不限量)——虽然可能引起内存膨胀,但至少不会有"突然降速"的诡异现象。

20.8.2 vs Python requests / urllib3

urllib3.HTTPConnectionPool 是一个最简单的实现——一个 Queue,put / get 两个方法。没有 idle_timeout 扫描(依赖 TCP keepalive)、没有 HTTP/2 区分、没有 racing。这对 Python 本来就单线程的场景合适——"简单 → 省心 → 够用"。

但这个简单也带来问题——Python 生态里连接陈旧是长期话题,很多项目不得不在应用层写"定期 recycle pool"。hyper-util 这些机制写得费劲,但长期维护成本更低——库作者花了几千行代码让用户永远不用关心。

20.8.3 vs Node.js http.Agent

Node 的 Agent 模型有意思——每个 http.Agent 实例独立一个 pool,用户可以在不同请求用不同 Agent 来隔离 pool。这种"多 pool"模型在某些场景下比"一个全局 pool"更方便——比如你想让某个特殊下游独占 5 条连接、不和其他下游抢。

hyper-util 怎么做?——Client 实例。一个 Client 背后一个 Pool。用户想隔离就 clone 一个新的 Client(但这样 pool 也独立了)或者用不同 builder 参数构造不同 Client。概念上等价于 Node 的 Agent,只是名字换了。

20.8.4 vs Java HttpClient(Java 11+)

Java 的 java.net.http.HttpClient 设计比 Apache HttpClient 现代得多——immutable + builderHTTP/2 first-classasync via CompletableFuture。pool 内部实现(jdk.internal.net.http.ConnectionPool)和 hyper-util 高度类似——idle: Map<CacheKey, LinkedList<HttpConnection>>expirer 定时任务、reserveConnection racing 策略。

最大的差异在线程模型——Java 用 OS 线程池 + CompletableFuture;Rust 用 Future + 用户态调度器。连接归还的 call site 在 Java 是 try-with-resources 或显式 connection.returnToCache(),在 Rust 是 Pooled::drop 自动调用。RAII 比显式归还更不容易出错——Rust 这里有明显的人机工程学优势。

20.8.5 小结:各自的"不变式"

四种语言四种实现,但共同的不变式只有几条:

  1. pool 按"协议 + host + port"分桶
  2. HTTP/1 每条连接同时只跑一个请求、HTTP/2 一条连接多路复用
  3. idle 超时的连接要清除——否则池会滞留僵尸连接。
  4. 不配 idle timeout 会陷阱(Go 的 MaxIdleConns=0、Rust 的 pool_idle_timeout=None 都是经典陷阱)。
  5. pool 不知道对端 IP 换了——这件事在任何语言里都需要上层感知

这些不变式跨语言存在——因为它们是TCP + HTTP 协议本身的物理约束带来的。Rust 的贡献是把它们写得更明确、更可读、更不容易写错——这也是我们花一整章看 hyper-util pool 的价值。

20.8.6 §20.0 文件清单的实测对齐 + 三条默认值真相

§20.0 列了"本章涉及到的源码文件一共四个",给的行数有一处偏差。按 hyper-util 0.1.20 当前 HEAD 实测:

文件章节正文实测偏差
legacy/pool.rs111511150 ✓
legacy/client.rs16941670-24
legacy/connect/mod.rs4444440 ✓
legacy/connect/http.rs144914490 ✓
合计47024678-24

client.rs 的 1694 → 1670 这一条偏差,与卷三 ch19 §19.10.5(hyper-util 12693 行账本)+ ch21 §21.7.5(client 子树 6619 行账本)给的同口径数字完全对齐——读者跨章交叉校对时不会再被同一文件三个数字搞晕。

三条本章正文没有点出来、但读者最该记住的"默认值真相"——全部从源码直接读出,可以剪切到生产 audit 清单里:

旋钮hyper-util 0.1.20 默认值源码位置§20.9 对应建议
pool_max_idle_per_hostusize::MAX(无上限)client.rs:1045 / 1092 doc + pool.rs:885 test§20.9 第三件——"只管 idle 桶,不管并发"
HttpConnector::connect_timeoutNone(无超时!)connect/http.rs:229 Config::default§20.9 第一件——绝对不要用默认 None
HttpConnector::happy_eyeballs_timeoutSome(Duration::from_millis(300))connect/http.rs:231双栈环境 IPv6→v4 fallback 仅 300 ms

第二条"connect_timeout = None" 是 §20.9 第一件 checklist 的真正源码依据——文档没有显式写出"如果不配会怎样",但 Config::default 里硬编码 None 就是答案:hyper 默认会让你的 connect() 等到内核 SYN retry 超时(约 127 秒)。这一行配置漏掉,你的 SLA 表立刻多一个 127s 长尾。

第三条 happy_eyeballs 300ms 是个很容易被忽略的隐藏旋钮——在双栈(IPv6 + IPv4)环境,preferred AAAA address 如果 300 ms 内没建立 TCP,hyper 会fallback 到 A address 并行尝试(RFC 6555 算法,http.rs:351 注释明确点名 RFC 6555)。如果你的 IPv6 链路偶尔慢一点(比如某些 ISP 的 v6 路由质量差),300ms 可能太短——结果是所有请求都默默走 v4,v6 链路的能力浪费掉。生产部署如果开了 v6,这条值要根据线路实测调(典型给 1000 ms)。

20.8.7 三个 Drop 撑起整个 pool 的 RAII

§20.4 那段"checkoutpooled 必须成对出现"的论述,背后真正落地的是 pool.rs三个 impl Drop——这是整章正文没有展开的"RAII 三件套":

类型Drop 位置触发动作
Pooled<T, K>pool.rs:560-561把租出去的连接还回 idle 桶(除非被 poison)
Checkout<T, K>pool.rs:725-726取消还在排队的 checkout(用户 cancel 了请求)
Connecting<T, K>pool.rs:754-755取消正在建立的连接(避免 connecting 锁泄漏)

三个 Drop 串成一个完整的取消传播链——任何一层用户 future 被 drop(比如 tokio::time::timeout(1ms, client.request(...)) 触发的取消),下面的 Pooled / Checkout / Connecting 都沿着 stack 自动释放。这条 RAII 链是 §20.8.4 对比 Java HttpClient 时强调的"人机工程学优势"的具体落地——Java 必须 try-with-resources、Go 必须 defer、Python 必须 with,Rust 是 drop 自动跑。三个 Drop 实现合起来不到 80 行代码,撑起了 pool 的"永远不会泄漏连接"承诺。

补一个 §20.9 第六件遗漏的源码位置:PoisonPillconnect/mod.rs:113-125——impl PoisonPill 一共 12 行,对外暴露 pill()poisoned() 两个方法。中间件捕获到业务级异常后调 conn_info.poison(),下次这条连接进 pool 时被 Pooled::drop 检测到 poison 标记直接丢弃,不进 idle 桶。这条精细控制几乎所有 Rust HTTP 中间件(reqwest-middleware / tower-http retry policy)都用得上,但没几个人知道源码在哪——本节算把它点亮。

20.9 落到你键盘上

读了整章源码、事故、对比——真正走到你手边的是几个可操作的 checklist:

第一件——一定要显式配 connect_timeout

rust
use hyper_util::client::legacy::connect::HttpConnector;
use std::time::Duration;

let mut connector = HttpConnector::new();
connector.set_connect_timeout(Some(Duration::from_secs(3)));

3 秒只是建议值。对 intra-DC 调用可以缩到 500ms;对跨境 public API 可以放到 10 秒。但绝对不要用默认的 None

第二件——理解 pool_idle_timeout 是"idle 期多久清",不是"整条连接能活多久"

  • HTTP/1.1 跨境请求:建议 60~90 秒(默认 90 秒即可)。
  • HTTP/2 长跑业务:建议 None(或者很长,比如 24 小时)。连接本身由 PING / GOAWAY 管,不要让 idle timeout 提前干掉它。
  • DNS 切换频繁的场景:短到 30 秒或更短,接受复用率下降的代价。

第三件——理解 pool_max_idle_per_host 只管 idle 桶,不管并发

rust
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;

let client = Client::builder(TokioExecutor::new())
    .pool_max_idle_per_host(10)
    .pool_idle_timeout(Duration::from_secs(90))
    .build(connector);

并发控制请用 tower 的 ConcurrencyLimit(见本书第 5 章),不是 pool 参数。

第四件——不要在一个 pool 里混 HTTP/1 和 HTTP/2 Host。虽然 pool key 里有 scheme,但 api.a.comapi.b.com 一个走 h1 一个走 h2,配 pool_idle_timeout 很难两边都最优——不如用两个 Client 实例。

第五件——生产上养成用 HttpInfo 读 remote_addr 的习惯

rust
if let Some(info) = response.extensions().get::<HttpInfo>() {
    tracing::info!(remote = %info.remote_addr(), "response from");
}

这一行在"DNS 陈旧"事故里能救命——你会看到自己的请求打在哪台机器上,对比 DNS 应有的地址。

第六件——了解 PoisonPill 的存在。中间件检测到业务级异常(签名失败、数据格式错乱)时,别等 pool 自己清——立刻 conn_info.poison(),下次这条连接就会被主动丢弃。这是 hyper-util 给你的"精细控制"武器。

第七件——retry_canceled_requests = true(默认)对绝大多数场景是对的。除非你的业务强非幂等(比如金融交易的下单),否则别关它——它在你睡觉时为你挡住 keep-alive 关闭窗口的故障。

第八件——拿到一份真实流量,用 RUST_LOG=hyper_util=trace 跑一遍看日志。pool 的每一次 checkout、put、idle clear、connecting lock 都有 trace——你能直接看到复用率、idle 桶大小、是否频繁建连。这份日志是最可靠的 pool 行为观测,比任何 dashboard 都精确。


下一章(第 21 章 请求分发、取消与背压)我们走进 client.rssend_requesttry_send_request 路径——看请求从 Client::request 到真正写上 TCP 的每一步发生了什么、取消怎么传播、HTTP/2 的 flow-control 背压如何向上反馈到用户。本章讲"拿连接",下章讲"用连接"。合起来构成 hyper 异步 HTTP 客户端的完整心智模型

基于 VitePress 构建