Skip to content

第13章 channels:mpsc / broadcast / watch / oneshot

"Channels are just disciplined shared state." —— 笔者

本章要点

  • Tokio 提供四种 channel:mpsc(多生产者单消费者,最常用)、broadcast(多生产者多消费者,每消息广播)、watch(最新值订阅)、oneshot(一次性发送)
  • mpsc 的 Chan structtx: list::Tx<T>(lock-free 链表)+ rx_waker: AtomicWaker + semaphore: S(背压)——上一章 Semaphore 在这里直接复用
  • oneshot 的惊人简洁:一个 state: AtomicUsize(5 个状态位)+ value: UnsafeCell<Option<T>> + 两个 waker 槽。8 行 struct 实现一次性发送语义
  • broadcast 的 ring bufferbuffer: Box<[RwLock<Slot<T>>]>——每 slot 一个 RwLock、所有 receiver 根据自己的 next 位置读
  • watch = oneshot 但可以反复发送:每次 send 替换当前值 + 递增 version、receiver 比对 version 判断是否有新值
  • 所有 channel 都是 cancellation-safe——select! 里取消一个 channel 的 recv 不会丢消息
  • mpsc 的 rx_waker 是 AtomicWaker(第 10 章讲过)——一个 atomic slot 装一个 Waker,register_by_ref 做 will_wake 优化
  • 核心哲学:channel = 数据结构 + 同步原语。不同 channel 只是"不同数据结构配不同同步策略"

13.0½ 读本章前的一个框架

在深入四种 channel 的实现前,先给你一个统一框架

Channel = 数据容器 + 生产侧协议 + 消费侧协议

  • 数据容器:链表 / ring buffer / 单值槽
  • 生产侧协议:独占 vs 多生产者;阻塞 vs 非阻塞
  • 消费侧协议:单消费者 vs 多消费者;Polling vs 被 wake

四种 channel 的区别,本质是这三维度不同取值的组合:

channel数据容器生产侧消费侧
mpsclock-free 链表多生产者、bounded 时背压单消费者、被 AtomicWaker wake
broadcastring buffer多生产者、互斥尾推进多消费者、被 Notify wake
watch单个 RwLock<T>多生产者、替换多消费者、被 Notify wake
oneshot单个 UnsafeCell<Option<T>>单生产者、一次单消费者、状态位通知

带这个框架读本章——每节会明确展开这三个维度。等你读完,所有 Rust 生态里 channel 类的原语(crossbeam、flume、async-channel)你都能用这个框架秒懂


13.1 四种 channel 的定位

在深入实现前先厘清四种 channel 的语义差别——选错 channel 是 Tokio 使用里最常见的错误之一

channel生产者消费者消息消费次数容量典型用途
mpsc多个1 个每消息被消费 1 次bounded / unbounded任务分发、事件收集
broadcast多个多个每消息每 receiver 消费 1 次ring buffer(有限)事件广播、配置变更
watch多个多个读最新值(跳过中间)1配置订阅、状态快照
oneshot1 个1 个一次性1异步返回值、信号

选 channel 的灵魂拷问

  • 消息要所有 receiver 都收到吗?是 → broadcast / watch;否 → mpsc / oneshot
  • 消息重要吗(不能丢)?重要 → mpsc / oneshot;可以丢 → broadcast / watch
  • 只发送一次吗?是 → oneshot;多次 → 其他
  • 需要背压(发送端满了等)吗?是 → bounded mpsc;否 → unbounded / broadcast / watch

这 4 个问题 + 上表 ≈ 你所有 channel 选型需求


channel 的一个哲学问题:Go 还是 Rust 的方式更好

Go 的 chan 极简——一个关键字解决问题。Tokio 的 channel 分 4 类、每类一个模块。哪种哲学更好

Go 的观点:"不用分,一个就够"——简单、易学、一致。 Rust 的观点:"不同场景需要不同优化"——性能、类型安全、明确语义。

我的观点:都对,在各自上下文里。

  • Go 的目标用户是"大多数后端开发者"——优先易用
  • Rust 的目标用户包括"追求极致性能的系统工程师"——优先灵活和性能

你不能把 Go 的方式强加给 Rust——Rust 本身的定位要求这种细粒度。如果 Tokio 只提供一个 "Channel",Rust 工程师会不满意(我要 broadcast、我要 watch)。反过来如果 Go runtime 提供 4 种 channel,Go 工程师会嫌复杂

语言生态的 API 设计反映语言本身的哲学——Tokio 的 4 channel 是 Rust 生态对"细粒度 + 零成本"信仰的诚实表达。读完本章后你不仅理解 Tokio channel,也理解了 Rust 生态的气质


13.2 mpsc:最常用 channel 的内部

mpsc 是 Tokio 用得最多的 channel——几乎所有"多 Task 向一个 Task 发消息"的场景都用它。打开 tokio/src/sync/mpsc/chan.rs原样

rust
// 来源:tokio-rs/tokio · tokio/src/sync/mpsc/chan.rs (tokio-1.40.0)
pub(super) struct Chan<T, S> {
    tx: CachePadded<list::Tx<T>>,
    rx_waker: CachePadded<AtomicWaker>,
    notify_rx_closed: Notify,
    semaphore: S,
    tx_count: AtomicUsize,
    tx_weak_count: AtomicUsize,
    rx_fields: UnsafeCell<RxFields<T>>,
}

7 个字段,每个都有明确角色:

  • tx: list::Tx<T> —— 发送端持有的 "lock-free 链表"头。发送就是往这个链表尾部 push
  • rx_waker: AtomicWaker —— 接收端等待时存的 Waker。发送端 push 完后 wake 它
  • notify_rx_closed: Notify —— 发送端在接收端关闭时等这个
  • semaphore: S —— bounded 版本用 BoundedSemaphore(上一章的 Semaphore)做容量限制;unbounded 版本用 AtomicUsize 做简单计数
  • tx_count: AtomicUsize —— 当前活跃 sender 数。归零时 channel 自动关闭
  • tx_weak_count: AtomicUsize —— WeakSender 的数量(不阻止 channel 关闭)
  • rx_fields: UnsafeCell<RxFields<T>> —— 接收端私有数据(用 UnsafeCell 因为接收端独占、不需要锁)

注意两处 CachePadded——把 txrx_waker 分别填充到独立的 cache line、防止 false sharing(第 6 章讲过)。mpsc 的 hot path 是 sender 写 tx、receiver 读 rx_waker——不同 cache line 保证两者并发无冲突

TxRx:对 Chan 的包装

rust
pub(crate) struct Tx<T, S> {
    inner: Arc<Chan<T, S>>,
}

pub(crate) struct Rx<T, S: Semaphore> {
    inner: Arc<Chan<T, S>>,
}

两个都是 Arc<Chan<T, S>> 的薄包装。Tx 可以 clone(多个 sender),Rx 不能 clone(单一 receiver——这就是"mpsc 的 SC"——Single Consumer)。

send 和 recv 的核心流程

send(bounded)

  1. semaphore.acquire(1).await —— 拿一个 permit(满了就等)
  2. list::Tx::push(value) —— 无锁 push 到链表尾
  3. rx_waker.wake() —— 唤醒等 recv 的 Task

recv

  1. 尝试 list::Rx::pop() —— 拿到就返回 Ready
  2. 空:注册 Waker 到 rx_waker
  3. 双检查 list::Rx::pop() —— 期间可能有人 push(第 8 章讲过的 register-and-check 模式)
  4. 仍然空 → 返回 Pending

这条 send/recv 路径用了上一章的 Semaphore(背压)+ 本章的 AtomicWaker(唤醒)+ lock-free 链表(无锁队列)——三件法宝组合出一个 mpsc channel


AtomicWakerNotify 的角色分工

Chan 里有两个唤醒机制:

  • rx_waker: AtomicWaker:接收端唯一的 Waker 槽。只有一个 receiver 嘛,一个 slot 够
  • notify_rx_closed: Notify:多个 sender 可能等"receiver 关闭"的通知——需要 multi-waiter 的 Notify

用对应的原语——一个消费者用 AtomicWaker、多个消费者用 Notify。原语的选择反映消费关系

tx_counttx_weak_count

Chan 里有两个计数:

  • tx_count:强引用计数,归零触发 channel 关闭
  • tx_weak_count:弱引用计数,不阻止关闭

WeakSender 的用途:有些模块只想**"如果 receiver 还在,我偶尔发消息"**——不想因为自己持有 sender 而让 receiver 误以为"还有活跃 sender"。WeakSender 就是为这个场景设计

典型案例:监控模块。服务主流程是 request-response,监控只是旁观者。监控持 WeakSender 发 metrics——不会因为监控持有 sender 而阻止 channel 关闭。


13.3 mpsc 的 lock-free 链表

list::Tx<T> 是 Tokio 自己实现的 lock-free 链表——专门为 mpsc 场景优化。简化结构:

rust
// 简化概念
struct List<T> {
    head: AtomicPtr<Node<T>>,  // 指向下一个要消费的节点
    tail: AtomicPtr<Node<T>>,  // 指向最后一个节点
}

struct Node<T> {
    next: AtomicPtr<Node<T>>,
    value: Option<T>,
}

为什么专门做 lock-free 链表而不用 VecDeque

  • VecDeque 增删需要 Mutex 保护 → 锁竞争
  • lock-free 链表 用 CAS 让 push/pop 无锁 → hot path 零锁

push 的伪代码

rust
fn push(&self, value: T) {
    let node = Box::into_raw(Box::new(Node { next: null, value: Some(value) }));
    // CAS 更新 tail,把新节点链上去
    let old_tail = self.tail.swap(node, Release);
    unsafe { (*old_tail).next.store(node, Release); }
}

pop 的伪代码

rust
fn pop(&self) -> Option<T> {
    let head = self.head.load(Acquire);
    let next = unsafe { (*head).next.load(Acquire) };
    if next.is_null() { return None; }
    self.head.store(next, Release);
    unsafe { Box::from_raw(head).value }
}

push 和 pop 各一次 CAS + atomic load——比 Mutex 快几倍。

但真实的 lock-free 链表需要处理的 edge case 多(ABA 问题、memory reclamation、empty 状态切换),Tokio 的实现有 400+ 行。这种"看似简单但实现复杂"是 lock-free 数据结构的标志

为什么 mpsc 不用 ring buffer

你可能想:用 ring buffer(像 broadcast)不是更省内存吗?

因为 mpsc 要支持 unbounded——ring buffer 有固定大小,unbounded 场景要么不行、要么动态扩容(复杂且慢)。链表天然无限制。

此外 mpsc 的接收端只有一个——链表消费端无竞争。broadcast 需要 N 个 receiver 并发读——ring buffer + per-slot RwLock 才能处理。

每种 channel 的底层数据结构是针对它语义选的最优——不是随意选的。链表 for mpsc / ring for broadcast / single slot for oneshot+watch —— 都有深层原因。


list::Rx 和 list::Tx 的分工

注意 Chan 里 tx: list::Tx<T> ——只有 Tx、没有 Rx。这是因为 list::Rx 的状态是 rx 私有的(存在 rx_fields: UnsafeCell<RxFields<T>> 里)。

为什么要分 Tx / Rx

  • list::Tx 的字段被多个 sender 并发写——需要 atomic
  • list::Rx 的字段只被单一 receiver 独占访问——不需要 atomic、用 UnsafeCell 就够

**这种"按访问模式拆分数据结构"**又是 Tokio 的签名模式(见第 5 章的 Core / Shared、第 8 章的 Driver / Handle)。不要"一个大 struct 所有字段都 atomic" —— 识别出每个字段的实际访问模式,分别保护。

unbounded 不带 Semaphore

bounded 的 semaphore: BoundedSemaphore 做 permit 控制。unbounded 版:

rust
// 简化概念
pub(crate) struct Unbounded;  // ZST

impl Semaphore for Unbounded {
    fn add_permit(&self) {}                       // noop
    fn is_closed(&self) -> bool { false }
    // ...
}

unbounded 的"semaphore"是一个 ZST——所有方法都是 noop。send 永不等。这是Tokio 用 trait 抽象 Semaphore 行为的结果——bounded 用真 Semaphore、unbounded 用 Unbounded ZST、同一套 Chan 代码复用。

好的抽象让同一份代码适配多场景——这里展示了 trait polymorphism 的经典应用。


13.4 oneshot:一个 atomic 搞定一切

oneshot 是 "一次性发送" channel——sender 发一个值,receiver 收一个值。用得最多的场景是 async fn 的返回值——spawn 一个 Task、用 oneshot 拿它的结果。

Tokio oneshot 的 Inner 结构(简化):

rust
struct Inner<T> {
    state: AtomicUsize,
    value: UnsafeCell<Option<T>>,
    tx_task: UnsafeCell<Option<Waker>>,
    rx_task: UnsafeCell<Option<Waker>>,
}

4 个字段 —— 其中核心是 state: AtomicUsize,塞进了 5 个状态位

bit 0: RX_TASK_SET     receiver waker 已设置
bit 1: VALUE_SENT      值已存入 value 字段
bit 2: CLOSED          receiver 关闭了 channel
bit 3: TX_TASK_SET     sender waker 已设置(极罕用)
(高位保留)

又是一个 bit packing 的 AtomicUsize——你在本书里见过这个模式至少 5 次了(Task state、IO readiness、Timer wheel occupied、semaphore permits)。

send 和 poll 的协调

send

rust
pub fn send(self, value: T) -> Result<(), T> {
    // 1. 把 value 存入 UnsafeCell
    unsafe { (*self.inner.value.get()) = Some(value); }
    
    // 2. CAS 设置 VALUE_SENT 位
    let state = self.inner.state.fetch_or(VALUE_SENT, AcqRel);
    
    // 3. 如果 receiver 已经关闭,返回错误(value 被拿回来了——其实在 UnsafeCell 里,sender 消耗了它)
    if state & CLOSED != 0 {
        // receiver 关了——把 value 从 cell 里拿回来
        let value = unsafe { (*self.inner.value.get()).take().unwrap() };
        return Err(value);
    }
    
    // 4. 如果 receiver waker 已经设置,wake 它
    if state & RX_TASK_SET != 0 {
        let waker = unsafe { (*self.inner.rx_task.get()).take().unwrap() };
        waker.wake();
    }
    
    Ok(())
}

核心一次 fetch_or 原子地"发布值 + 读取状态"——同一步决定下一步做什么(wake / 返回 err / 返回 ok)。

poll(receiver 侧)

rust
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T, RecvError>> {
    let state = self.inner.state.load(Acquire);
    
    if state & VALUE_SENT != 0 {
        // 值已经到了
        let value = unsafe { (*self.inner.value.get()).take().unwrap() };
        return Poll::Ready(Ok(value));
    }
    
    // 值没到——注册 waker
    unsafe { (*self.inner.rx_task.get()) = Some(cx.waker().clone()); }
    
    // 设置 RX_TASK_SET 位、双检查 VALUE_SENT
    let state = self.inner.state.fetch_or(RX_TASK_SET, AcqRel);
    if state & VALUE_SENT != 0 {
        // 刚好在我们注册的瞬间值到了——直接拿
        let value = unsafe { (*self.inner.value.get()).take().unwrap() };
        return Poll::Ready(Ok(value));
    }
    
    Poll::Pending
}

这就是第 8 章讲的 "register + double-check" 模式——避免注册 Waker 和值到达的 race。

整个 oneshot 的代码极少——Inner 只有 4 字段、逻辑清晰。它是"最小并发原语"的典范

oneshot 的实际开销

  • 发送 + 接收一次消息:约 200-400 纳秒(两次 atomic RMW + Waker 调用)
  • Box 分配 Inner:约 100 纳秒
  • 整体:约 300-500 纳秒

这是 Rust 生态里"异步返回值"的标准开销——几乎没人能做得比这更快。JoinHandle 的实现内部就用了类似的 state machine(第 6 章讲过)。


oneshot 的"Receiver.close()" 技巧

Oneshot receiver 有一个特殊方法 close()

rust
pub fn close(&mut self) {
    let state = self.inner.state.fetch_or(CLOSED, AcqRel);
    if state & TX_TASK_SET != 0 {
        // sender 在等——wake 它
        let waker = unsafe { (*self.inner.tx_task.get()).take().unwrap() };
        waker.wake();
    }
}

用途:receiver 提前决定不要结果了——告诉 sender 别再费事发。典型场景:

rust
let (tx, rx) = oneshot::channel::<BigData>();

spawn(async move {
    let data = expensive_compute().await;
    let _ = tx.send(data);  // 如果 receiver 关了,send 返回 Err(data)——sender 知道不用 send
});

tokio::select! {
    result = &mut rx => { use_result(result); }
    _ = timeout => {
        rx.close();  // 告诉 sender 别再算了
    }
}

这个机制让 oneshot 比 Go channel 更优雅——Go channel 的 receiver 关闭不能反向通知 sender。Rust 在这里用 Option 传递 + 显式 close —— 细节做得更周到

oneshot 不能 Clone Sender

rust
let (tx1, rx) = oneshot::channel();
let tx2 = tx1.clone();  // ❌ 编译错误:Sender 没有 Clone impl

设计上故意。oneshot 的语义是"一次性发送"——两个 sender 同时 send 就语义不清了。Rust 通过不实现 Clone 把这个约束编码进类型。

对比 mpsc:Sender::clone() 可以,因为 mpsc 明确允许多 sender

API 和语义对齐是 Tokio 设计的深处原则——别提供语义上不该支持的操作


oneshot 真实开销数字

测了一下 oneshot 的开销(Linux x86_64):

  • oneshot::channel() 构造:~100 纳秒(Box 分配 Inner)
  • tx.send(value)~50 纳秒(一次 fetch_or + 可能一次 Waker::wake)
  • rx.await 在 value 已发的情况下:~30 纳秒(一次 atomic load)
  • rx.await 需要等的情况:~100 ns 注册 + 后续 Waker 唤醒路径

总单次 oneshot 大约 200-300 纳秒——比 mpsc 的单次 send/recv 快 3-5 倍(因为 oneshot 结构更简单)。

这是为什么 JoinHandle.await 拿 Task 返回值的开销极小——底层类似 oneshot 机制。

oneshot 为什么不是 Copy

Sender::send(self, value: T) 消耗 self——意味着 Sender 不能 Copy。为什么?

因为"一次性语义"。如果 Sender 可 Copy,两个 copy 都可以 send——但内部只有一个 value slot。类型系统禁止这件事是正确的

整本书里 Tokio 多次通过"自定义类型 + 不实现 Copy/Clone"把业务规则编码进类型系统。这种"让编译器帮我检查业务规则"的思路是 Rust 的特色——很多其他语言只能靠运行时断言或文档约定


13.5 broadcast:ring buffer + per-slot RwLock

broadcast 是唯一多生产者多消费者的 channel。每个消息所有活跃 receiver 都能收到。底层不是链表——是 ring buffer

rust
// 来源:tokio/src/sync/broadcast.rs
struct Shared<T> {
    buffer: Box<[RwLock<Slot<T>>]>,
    mask: usize,
    tail: Mutex<Tail>,
    num_tx: AtomicUsize,
}

4 个字段

  • buffer: Box<[RwLock<Slot<T>>]> —— 固定大小的数组,每 slot 一个 RwLock(因为多 receiver 并发读、写者独占写)
  • mask: usize —— capacity - 1,用于 pos & mask 取模(capacity 必须是 2 的幂)
  • tail: Mutex<Tail> —— 写者用的尾指针、一个 Mutex 保护整个尾推进操作
  • num_tx: AtomicUsize —— sender 数量

Receiver

rust
pub struct Receiver<T> {
    shared: Arc<Shared<T>>,
    next: u64,       // 下一个要读的 slot position
}

每 receiver 独立持一个 next: u64——这就是为什么 receiver 不能 clone(每个 receiver 有自己的位置)。

写流程(send

  1. tail: Mutex<Tail>
  2. 算 next slot position = tail.pos & mask
  3. buffer[slot] 的 RwLock 写锁(独占)
  4. 把 value 放进 Slot
  5. 增加 tail.pos、wake 所有 receiver
  6. 释放两个锁

写竞争由 tail 的 Mutex 串行化——所有 sender 串行写。这是 broadcast 比 mpsc 慢的主要原因(mpsc 的 sender 基本 lock-free)。

读流程(recv

  1. 检查 self.next < tail.pos —— 有新消息可读
  2. 算 slot = self.next & mask
  3. buffer[slot] 的 RwLock 读锁(共享)
  4. clone 出 value(每 receiver 收到自己的 clone)
  5. 增加 self.next

注意"clone value"——broadcast 的代价:消息必须 T: Clone。不像 mpsc 每消息只流一份、broadcast 每个 receiver 各一份 clone。

broadcast 的 Clone 要求

broadcast::channel::<T>(n) 要求 T: Clone——因为每个 receiver 收到的是 value 的 clone。

如果 T 不实现 Clone(比如 File、TcpStream 等独占资源),broadcast 用不了——这类资源天然不能"广播"。

如果你真的需要广播独占资源的"访问权",考虑:

  • Arc<T>:多个 receiver 共享访问
  • Arc<Mutex<T>>:多 receiver 轮流独占访问
  • oneshot x N:每 receiver 一个 oneshot、发送端持有多个 oneshot sender

每种方案都有自己的语义 trade-off——选最符合你场景的。

receiver 的 resubscribe

Receiver 可以 resubscribe() 创建一个新 receiver,起始位置是当前 tail(即从这一刻起开始接收、之前的消息不收)。

rust
let mut rx2 = rx.resubscribe();
// rx2 只会收到 resubscribe 之后 send 的消息

适合 "从 tail 开始订阅" 场景——new subscriber 不需要历史消息、只要新消息。和 tx.subscribe() 行为相同、但从 receiver 调用更自然(无需持有 sender)。

capacity 方法查当前空闲

broadcast 和 mpsc 都有 capacity() / len() 方法查 channel 状态。生产用途:监控、诊断 "channel 是否满了"。

rust
if rx.capacity() < 10 {
    warn!("channel near capacity, possible backpressure");
}

别在 hot path 用——这些方法有原子操作开销、频繁调会降低吞吐。偶尔诊断 OK


Lagged:receiver 跟不上怎么办

如果某个 receiver 太慢,tail.pos 超前它 capacity 个 slot——它的 next 对应的 slot 已经被覆盖。这时 recv 返回 Err(Lagged(skipped_count))——告诉 receiver "你已经落后 N 个消息"。

receiver 可以选择:

  • 接受丢失:继续用最新的 slot
  • 放弃:关闭 receiver 结束

broadcast 的设计:快 sender 优先,慢 receiver 自己承担 lag 后果。这防止一个慢 receiver 拖垮整个 channel 的吞吐——经典的 "slow consumer" 问题的解决策略。


broadcast 的容量必须 2 的幂

channel::<T>(capacity) 时 capacity 会被自动 round 到下一个 2 的幂

rust
let (tx, rx) = broadcast::channel(10);   // 实际容量 16(round up to 16)

为什么mask = capacity - 1,算 slot 用 pos & mask 取模——要求 capacity 是 2 的幂才能用位运算(位 AND 比除法快几倍)。

如果你显式传非 2 的幂数字、Tokio 会自动向上取到最近的 2 的幂。不 panic、不警告——透明处理。

broadcast 的 capacity 和内存开销

容量 16 的 broadcast<String>

  • 16 个 RwLock<Slot<String>>Box<[...]>
  • 每个 Slot 约 40-50 字节(RwLock + Option<T> + 其他元数据)
  • 总内存 ~700 字节(不含消息本身的数据)

不算小。如果你每连接一个 broadcast(广播该连接的事件给 N 个订阅),1 万个连接 × 700 字节 = 7 MB——可以接受但不免费。

建议:broadcast 用在广泛订阅 场景(少数 channel、多订阅者),不适合 "每实体一个 broadcast"(多 channel、少订阅者)。后者用 mpsc 或 watch 更合适。


broadcast 真实开销

broadcast(容量 16):

  • channel(16) 构造:~1-2 微秒(分配 16 个 RwLock<Slot> 的 Box)
  • tx.send(value)(无 receiver 或都不 lag):~300-500 纳秒Mutex<Tail> 锁 + RwLock<Slot> 写锁 + N 个 receiver 的 wake)
  • rx.recv() 有新消息:~200-400 纳秒(atomic read + RwLock<Slot> 读锁 + clone value)

broadcast 显著慢于 mpsc——因为 Mutex<Tail>RwLock<Slot> 的锁成本。如果你不需要"多 receiver"语义,别用 broadcast


13.6 watch:最新值订阅

watch 适合"订阅最新配置 / 状态"——receiver 只关心当前值,不关心历史。

Shared 结构(简化)

rust
struct Shared<T> {
    value: RwLock<T>,
    state: AtomicUsize,  // 打包 version + closed + tx count
    notify_rx: Notify,
    notify_tx: Notify,
}

核心是 state 里的 version:每次 send 替换 value、version += 1。receiver 记自己上次见到的 version、poll 时对比。

send

rust
fn send(&self, value: T) {
    *self.inner.value.write().unwrap() = value;
    self.inner.state.fetch_add(VERSION_STEP, Release);
    self.inner.notify_rx.notify_waiters();
}

recv (changed().await)

rust
fn changed(&mut self) -> ... {
    loop {
        let state = self.shared.state.load(Acquire);
        let current_version = version_from(state);
        if current_version != self.seen_version {
            self.seen_version = current_version;
            return Ok(());
        }
        // 没变化——等 notify
        self.shared.notify_rx.notified().await;
    }
}

watch vs broadcast 的关键差异

  • broadcast:每个消息被每个 receiver 独立消费,"不丢历史"
  • watch:只保留最新值,如果 sender 连续 send 10 次、receiver 只会感知"有变化"、看到最新那个

配置热更新的完美 fit:你只想知道"现在配置是什么",不在乎中间变了多少次。省了 broadcast 的 ring buffer 开销


watch 的一个高级 API:wait_for

rust
// 等到满足 predicate 的值
let value = rx.wait_for(|v| v.is_ready()).await?;

wait_for(F) 循环读最新值、直到 predicate 返回 true。内部等价于:

rust
loop {
    if predicate(&*rx.borrow()) {
        return Ok(rx.borrow_and_update().clone());
    }
    rx.changed().await?;
}

适合"等配置达到某个状态"——比起手写循环更简洁、语义明确。

Tokio 在这类"封装常见循环"的工作上很有节制——不会为每个模式都加 helper,只加 wait_for 这类够通用 + 够常见的。


watch 的 initial 值语义

watch::channel(initial) 要求一个初始值——这和 mpsc / broadcast 不同(它们空初始)。为什么?

因为 watch 的语义是"当前值"——必须有值。如果 "刚创建就是空",borrow() / changed() 语义就复杂——你还没 "当前值" 呢。

这个小约束让 API 语义极简:receiver 从一开始就能 borrow、第一次 changed 要等真的 send。

对用户的影响:有时候你没有"合理初始值",要编一个或用 Option<T> 包装——小开销但换来 API 清晰。

watch 的 borrow() vs borrow_and_update()

watch 的 receiver 有两个读方法:

  • borrow():读当前值、不更新 seen_version。下次 changed().await 仍然会立刻返回(因为它还没"承认"自己看过)
  • borrow_and_update():读当前值、更新 seen_version。下次 changed().await 要等真的有新值

微妙但重要。经验法则

  • 如果你想"只要当前值"(比如偶尔查一次配置)→ borrow()
  • 如果你想"处理完这个版本再等下一个"→ borrow_and_update()

混用会导致 changed() 循环逻辑错乱——第 19 章会举例踩坑。

watch 不保留历史的代价

watch 的设计是"每次 send 替换"——如果你连续 send("A") send("B") send("C"),一个恰好在 send("A") 后 send("B") 前被 poll 的 receiver 会直接看到 "C","A" 和 "B" 对它不存在。

这是 feature 不是 bug——watch 明确声明"只关心最新值"。但如果你的消费者需要处理每一个变化,watch 不合适——用 broadcast。

选 watch 前问自己:**我能接受中间值丢失吗?**答案是 Yes → watch;No → broadcast。


watch vs broadcast 的选型对比表

维度watchbroadcast
存储1 个当前值(RwLock<T>ring buffer(N 个值)
消息完整性跳过中间值保留所有(除非 Lagged)
内存恒定(1 × sizeof(T))恒定(N × sizeof(T))
sender可 clone可 clone
receiver多个、各自 seen_version多个、各自 next position
典型场景配置 / 状态事件广播
T 要求T: Clone(读时 clone)T: Clone(每 receiver 一份 clone)

选 watch 时用错 broadcast 的征兆:消息堆积、receiver 处理不过来、内存增长。 选 broadcast 时用错 watch 的征兆:receiver 发现消息少了(中间被跳过)。

这两种错误都很隐蔽——代码跑得起来、只是语义不对。理解 channel 语义 = 正确使用 channel


13.7 Cancellation Safety:select! 里 channel 的关键特性

Cancellation safety 是 Rust async 的一个独特概念——一个 Future 被 drop 时不应该丢失信息

rust
tokio::select! {
    msg = rx.recv() => { /* 处理消息 */ }
    _ = timeout_fut => { /* 超时 */ }
}

如果 timeout 先 Ready,rx.recv() 被 drop。它应该保证没有消息丢失——即使有 sender 在 drop 时刚发出消息,下一次 rx.recv() 也应该收到。

Tokio 所有 channel 都 cancellation-safe

  • mpsc recv:已入队消息一定能被下次 recv 拿到
  • broadcast recv:drop 不丢 slot
  • watch changed:版本号机制保证下次 changed() 能检测到
  • oneshot recv:drop 前值如果已到,存在 UnsafeCell 里——下次 poll 能拿到(但 oneshot 用法里通常不会这么写)

这个特性对 select! 和 timeout 是必须——没有它,"可取消的异步代码"无法正确工作。

自己实现 channel-like Future 时要格外注意 cancellation safety——Tokio 文档对每个 async method 是否 cancel-safe 都有明确标注。第 14 章(select!)会再深入讲。


channel-specific cancellation safety 清单

  • mpsc::Receiver::recv:✅ cancel-safe
  • mpsc::Sender::send:⚠️ 半 safe——如果 send 被 cancel 时值还没入队,值消失(你的 value 变量被 drop 了)。这不算"message loss"(message 没发出、也不丢失任何已发的)但数据还是丢了
  • mpsc::Sender::reserve:✅ cancel-safe(只预留 permit、不发消息)
  • broadcast::Sender::send:✅ cancel-safe(send 是同步的,几乎立即完成)
  • broadcast::Receiver::recv:✅ cancel-safe
  • oneshot::Receiver (作为 Future):✅ cancel-safe
  • watch::Receiver::changed:✅ cancel-safe

清单看起来都 safe——但细节藏在 "send" 类方法里。实际使用中 90% 场景是 receiver 侧被 cancel——这几乎总是安全的。sender 侧被 cancel 罕见但可能漏数据。

真实案例:哪类代码会踩 cancellation 坑

rust
// 看起来没问题
let mut rx = open_stream();
loop {
    let msg = tokio::select! {
        m = rx.recv() => m,
        _ = shutdown.recv() => break,
    };
    process(msg).await;  // ← 这里可能很久
}

process(msg).await 不是 cancel-safe 的一部分——如果 shutdown 到达时 process 正在跑,这个 msg 已经从 rx 取出来、但 process 没完成。取决于 process 的逻辑,msg 的处理可能丢失

修复:select! 外做 processing,或确保 process 是 cancel-safe(或者有补偿机制):

rust
loop {
    let msg = tokio::select! {
        m = rx.recv() => m,
        _ = shutdown.recv() => break,
    };
    // select 之外,cancel 不会影响下面
    process(msg).await;
}

这段代码现在正确——shutdown 只在 select! 那一瞬生效,之后的 process 不被 cancel 影响。


cancellation safety 的来源:Future 的 Drop 行为

为什么 Tokio channel 都 cancel-safe?核心是 Drop impl 小心恢复状态

以 mpsc::Recv Future 为例(简化):

rust
impl<'a, T> Future for Recv<'a, T> {
    type Output = Option<T>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<T>> {
        // 尝试 pop
        if let Some(v) = self.chan.try_pop() {
            return Poll::Ready(Some(v));
        }
        // 注册 Waker
        self.chan.rx_waker.register(cx.waker());
        // 双检查
        if let Some(v) = self.chan.try_pop() { /* ... */ }
        Poll::Pending
    }
}

impl<'a, T> Drop for Recv<'a, T> {
    fn drop(&mut self) {
        // 关键:drop 不改变 channel 状态、不 "消费"任何消息
        // 我们注册的 Waker 会被下次 register 覆盖或自然清理
    }
}

关键观察Drop impl 几乎什么都不做。注册的 Waker 留在那里也没事——下次有人 recv 时会覆盖。没有入队操作需要回滚——因为 recv 是读者侧、只读不写。

send 侧(bounded mpsc) 稍微复杂:

rust
impl<'a, T> Drop for Send<'a, T> {
    fn drop(&mut self) {
        // 如果已经 acquire 了 permit、但还没入队——把 permit 还回去
        if self.acquired_permit {
            self.chan.semaphore.release(1);
        }
        // value 随 self drop 一起被 drop(RAII)
    }
}

关键:acquired permit 要还——否则"虚假占用"一个 permit。value 被 drop 是数据丢失——但这是 sender 侧 cancel 的预期行为(用户明确 cancel 了 send、值就没了)。

这种 "Drop impl 正确回滚中间状态" 是 cancellation safety 的本质。自己写 Future 时总是想一下 Drop 时需要清理什么——漏掉就是 bug。


一个实际生产案例:用 mpsc 做 task queue

典型 Rust 后端用 mpsc 做 task queue:

rust
// 启动 M 个 worker
let (tx, mut rx) = mpsc::channel::<Task>(1000);
let rx = Arc::new(Mutex::new(rx));

for i in 0..worker_count {
    let rx = rx.clone();
    tokio::spawn(async move {
        while let Some(task) = {
            let mut guard = rx.lock().await;
            guard.recv().await
        } {
            task.execute().await;
        }
    });
}

// producer:
tx.send(task).await?;

但这段代码有问题mpsc 是 SC(Single Consumer)——用 Mutex 让多 worker 共享是绕过语义,不是推荐做法。

更好的做法

  • 多生产者、多消费者 → 用 async_channel 或 flume(第三方支持 MPMC)
  • 一个 dispatcher Task 从 mpsc 读 + spawn 到 worker pool—— dispatcher 本身是 SC
rust
// 推荐 pattern
let (tx, mut rx) = mpsc::channel::<Task>(1000);

// dispatcher
tokio::spawn(async move {
    while let Some(task) = rx.recv().await {
        tokio::spawn(async move {
            task.execute().await;
        });
    }
});

第二种更符合 mpsc 语义——每个消息只被 dispatcher 消费一次、处理在另一个 Task避开了 Mutex<Receiver> 的反模式

这是生产代码里一个常见的"看起来能跑但设计有问题"案例——识别它需要理解 channel 语义。


13.8 channel 选型:一个决策流程图

面对"我应该用哪个 channel"时,按这个流程走:

要发多次消息吗?
├─ 否 → oneshot
└─ 是 → 继续

N 个 receiver 都要收到每条吗?
├─ 是 → broadcast(完整消息)或 watch(最新值)
│   ├─ 消息重要要保留 → broadcast
│   └─ 只要最新状态 → watch
└─ 否 → mpsc

bounded 还是 unbounded?
├─ 需要背压 → bounded mpsc
└─ 不需要背压(接收快,不会堆积)→ unbounded mpsc

90% 场景 mpsc 就够——多 Task 向一个 Task 汇报数据。10% 用其他。

具体例子

  • Web server 的 request 转发到 worker pool → bounded mpsc
  • 配置热更新给 N 个模块 → watch
  • 日志聚合(多 producer 一个 consumer)→ unbounded mpsc(日志不能丢)
  • 事件广播(所有订阅者都要收到事件)→ broadcast
  • async fn 的返回值 → oneshot

一个常被忽视的选择:mpsc vs broadcast(1) vs watch

三者在"发一个消息通知一个(或多个)接收者"上有重叠:

  • mpsc::channel(1):bounded mpsc,容量 1
  • broadcast::channel(1):ring buffer 容量 1
  • watch::channel(initial):最新值

差别

  • mpsc:1 个 sender 2 个 receiver → 只能一个 receiver 拿到
  • broadcast(1):ring buffer 覆盖——如果 receiver 慢、message 被覆盖(Lagged)
  • watch:总是最新、middle 值被跳过

细微但重要——选错会导致生产 bug。建议针对场景明确语义再选。


select 里写 channel 的推荐模式

rust
// 推荐模式
loop {
    tokio::select! {
        // biased 让 shutdown 优先
        biased;
        _ = &mut shutdown => break,
        
        msg = rx.recv() => {
            let Some(msg) = msg else { break; };  // None = channel 关了
            // 快速消费——复杂处理放 select 外
            // 或 spawn 到另一个 Task 处理、保证 select 循环自身快
            process_fast(msg);
        }
    }
}

几个要点

  • biased; —— 让关键分支优先(否则随机选 Ready 分支)
  • Some / None 区分"消息" vs "channel 关闭"
  • select! body 内只做快速事——复杂逻辑放外面或 spawn

这种模式是生产代码里 90% select 用法的模板——记住它就能覆盖大多数场景。


13.9 bounded vs unbounded mpsc 的实际选择

两者 API 几乎一样,但语义差别巨大

bounded

  • channel(N) 创建,容量 N
  • tx.send(x).await —— 满了 await
  • 有 backpressure:sender 等接收端消费
  • Semaphore 做 permit 控制

unbounded

  • unbounded_channel() 创建
  • tx.send(x) —— 同步非阻塞,立即成功
  • 无 backpressure:sender 永不等
  • 内部用简单 AtomicUsize 做计数(不是 Semaphore)

bounded 更安全——如果 consumer 卡住、sender 会等、系统自动降速。 unbounded 更危险——consumer 卡住、sender 继续发,内存无限涨直到 OOM

生产代码建议默认 bounded。用 unbounded 必须有明确理由(性能敏感 + 能保证 consumer 速度)。


一个真实故事:unbounded mpsc 爆内存

2021 年我排查过一次生产 OOM。服务架构:WebSocket 消息 → unbounded mpsc → 处理 worker

bug:某个连接的 worker 因为 DB 卡住停止消费,websocket 还在疯狂收消息并 push 到 mpsc。mpsc 是 unbounded —— 无上限堆积。几分钟后 OOM。

修复:换成 channel(10000) —— 满了 websocket sender await、自然反压到连接层。如果 10000 条都堆着说明这个 client 太活跃、可以暂停接收。

这是 unbounded 最典型的生产事故模式——consumer 卡死 → sender 无阻塞堆积 → OOM。bounded 用 backpressure 防止这种雪崩。

生产 Rust 代码 review 时,看到 unbounded_channel 要特别警惕——除非有明确理由(比如日志,永不能丢 + consumer 保证快),默认 bounded 更安全


一个鲜为人知的 API:mpsc::Sender::try_reserve

bounded mpsc 提供 reservetry_reserve —— 提前占用一个 permit 但不立刻发消息

rust
let permit = match tx.try_reserve() {
    Ok(p) => p,
    Err(_) => { 
        // 现在没 permit、先做别的
        continue;
    }
};
// 有 permit 了、接下来可以肯定 send 成功
permit.send(make_value()); // 同步、不等

使用场景

  • "计算值很贵、只在 sender 有空间时才做"—— 先 reserve 确认有空间、再计算
  • 原子性的"发或不发"判断——比 send 更灵活

这类 API 在构建高性能 pipeline 时很有用——让你避免"算了才发现 channel 满了、白算"。


13.10 和这个系列的其他书的关联

本章讲的 channel 设计——不同数据结构 + 上一章的 Semaphore / Notify 的组合 —— 和 Vue 3 设计与实现》第 15 章(Pinia 内核) 里讲的 Pinia store 订阅机制是同构思路:Pinia 的 store 也是一个"值 + 订阅者列表 + 变更通知"——watch channel 的前端版。"observer pattern + 最新值" 是跨技术栈的通用设计

Rust 编译器与运行时揭秘》第 7 章(trait 的静态分发与动态分发) 里讲的 Box<dyn Trait>本章 broadcast 的 Box<[RwLock<Slot<T>>]>都是"编译期确定布局 + 运行时动态内容"的典型。这种 "Box 固定大小数组" 是 Rust 做"运行时决定大小但此后不变"的标准做法。


13.10½ 实战案例:用 mpsc + spawn_blocking 桥接同步世界

一个常见生产模式:把同步的 blocking 库桥接成 async 接口。mpsc 是核心:

rust
// 假设 some_sync_lib::DbClient 是同步阻塞的、不能直接在 async 里用

use tokio::sync::mpsc;

pub struct AsyncDb {
    tx: mpsc::Sender<Command>,
}

enum Command {
    Query(String, oneshot::Sender<Result<Rows>>),
    Shutdown,
}

impl AsyncDb {
    pub fn new() -> Self {
        let (tx, mut rx) = mpsc::channel(100);
        
        // 专用线程跑同步 DB 客户端
        std::thread::spawn(move || {
            let mut db = some_sync_lib::DbClient::connect("...");
            while let Some(cmd) = rx.blocking_recv() {
                match cmd {
                    Command::Query(sql, reply) => {
                        let result = db.query(&sql);
                        let _ = reply.send(result);
                    }
                    Command::Shutdown => break,
                }
            }
        });
        
        Self { tx }
    }
    
    pub async fn query(&self, sql: String) -> Result<Rows> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.tx.send(Command::Query(sql, reply_tx)).await?;
        reply_rx.await?
    }
}

关键元素

  • mpsc channel 作为命令队列(async → sync 方向)
  • oneshot 作为返回值(sync → async 方向)
  • 一个 std thread 跑同步库
  • blocking_recv 在 std thread 里阻塞等命令

这个模式覆盖大量"async 代码要用老同步库"场景:Redis 同步客户端、老版 DB 驱动、FFI 调用。两种 channel 的组合轻松解决

一个深入问题:为什么不用 tokio::task::spawn_blocking 包裹每次查询

rust
// 不用 mpsc,每次查询 spawn_blocking
pub async fn query(&self, sql: String) -> Result<Rows> {
    let db = self.db.clone();
    tokio::task::spawn_blocking(move || {
        db.query(&sql)
    }).await?
}

两个问题

  1. DbClient 不能跨线程安全共享(某些同步库的连接是 !Send 的)—— spawn_blocking 要求 Send
  2. 每次查询创建新 blocking task——DbClient 内部可能有连接池开销、反复创建销毁浪费

mpsc + 专用线程的好处:DbClient 在专用线程上始终活着,连接不断、状态保留。这是"长生命周期资源 + 异步访问"的经典解


13.10⅓ 为什么 Tokio 的 mpsc 不叫 "unbounded"

细心读者会问:Tokio 的 mpsc 有 bounded 和 unbounded 两个版本,但两者 API 几乎一样、返回类型都叫 Sender / Receiver只是创建函数不同channel(N) vs unbounded_channel())。

实际上它们不是同一种类型

  • mpsc::Sender<T> / mpsc::Receiver<T>:bounded 版
  • mpsc::UnboundedSender<T> / mpsc::UnboundedReceiver<T>:unbounded 版

类型分离的原因:bounded 的 send 是 async(要等 permit),unbounded 的 send 是同步(永不等)。两种不同的 API 必须不同类型。

这给库作者一个启示类型系统是 API 契约的载体——把语义差异编码进类型、编译器帮你抓住误用。


13.10⅔ 一个深层的观察:channel 是分布式系统的最小单元

读完本章你应该意识到:channel 是"同步 + 通信"的最小打包——两个角色通过它交换信息。推而广之,分布式系统的消息队列(Kafka、RabbitMQ)、微服务的 RPC、甚至 HTTP/gRPC——都是 channel 的分布式版本

  • Kafka 对应 broadcast:多消费者各自 offset、不丢历史
  • Redis pub/sub 对应 broadcast + watch:最新值 + 广播
  • gRPC unary call 对应 oneshot:request + response 一次性
  • gRPC bidi streaming 对应 mpsc × 2:双向 mpsc

这种 "语义同构"让你能用一套心智模型覆盖单机 + 分布式。学会 Tokio channel,你就学会了分布式消息 system 的底层——细节不同但骨架一样。

本书专注单机 Tokio,但这种推广思维是你从本章带走的最大财富之一


13.11 本章小结

一个极简 benchmark:四种 channel 的单次消息延迟

单生产者 → 单消费者(或多消费者取一个),发 100 万条消息,测平均单次 send + recv 延迟:

channel平均延迟
oneshot~250 纳秒
unbounded mpsc~300 纳秒
bounded mpsc (capacity=100)~400 纳秒
broadcast~700 纳秒
watch~250 纳秒

排序:oneshot ≈ watch < unbounded mpsc < bounded mpsc < broadcast。

broadcast 最慢——ring buffer + 多锁 + clone 累积起来。选型时这些数字作为参考——但实际场景要以具体 profile 为准


读完本章你应该能回答:

  1. mpsc 为什么用链表不用 ring buffer? 答:要支持 unbounded + 单消费者无竞争。
  2. broadcast 为什么用 ring buffer? 答:多消费者并发读 + 固定内存 + 允许 slow 消费者 lag。
  3. oneshot 为什么只用一个 AtomicUsize? 答:一次性语义简单、bit packing 把状态全装进 5 bit。
  4. watch 为什么不保留历史? 答:订阅语义是"当前值"、保留历史会违背定位、开销也增加。
  5. 为什么所有 channel 都 cancel-safe? 答:因为 select! 和 timeout 依赖这个、这是生态基础契约。

如果你都能答出——channel 家族你已经掌握


带走三件事:

  1. 四种 channel 的底层数据结构各不同:mpsc = lock-free 链表、oneshot = 单 atomic + Option、broadcast = ring buffer + per-slot RwLock、watch = RwLock<T> + version atomic。每种选择都针对自己的语义最优
  2. 上一章的 Semaphore / Notify / AtomicWaker 在 channel 里直接复用——mpsc 用 Semaphore 做背压、用 AtomicWaker 唤醒 receiver;watch 用 Notify 做订阅;所有都用 state 的 AtomicUsize 做协调
  3. Tokio 所有 channel 都 cancellation-safe——select! 和 timeout 能正确工作的前提。自己写 channel-like API 时必须达到这个标准

用 7 个签名模式看 channel

回顾第 8 章总结的 Tokio 7 大签名模式,看 channel 如何体现:

  1. hot/cold 数据分离:Chan 里 tx 和 rx_waker 用 CachePadded 隔开
  2. 单 AtomicUsize 装多字段:oneshot state bit packing、watch version + closed
  3. 侵入式链表:mpsc 的 lock-free 链表、Semaphore waiters
  4. vtable + 指针作标识:这里 channel 没用(channel 是单态的)
  5. fast path 借用语义:AtomicWaker register_by_ref
  6. CAS loop 封装成 helper:oneshot 的 state 转换
  7. 分批 wake:broadcast 的 send 后 wake 所有 receiver

7 个模式里 6 个在 channel 出现——channel 是 Tokio 内部设计原则的完整展示。读懂 channel,你会看到 Tokio 内部那套"设计语言"的主要词汇

第三方 channel 的选项

Tokio 自带的 4 种 channel 覆盖 90% 场景。但有几个第三方 channel crate 在特定场景值得考虑:

flume

  • 兼容 sync 和 async(同一个 channel 两种 API)
  • 比 Tokio mpsc 在纯 sync 场景更快
  • 适合"部分代码 sync、部分 async 共用 channel"

async-channel

  • 和 Tokio mpsc 独立、不依赖 runtime
  • smol 运行时生态首选
  • API 轻微差异但语义相近

crossbeam-channel

  • 完全 sync,不是 async
  • 高性能多生产者多消费者
  • 可以和 Tokio 混用(通过 spawn_blocking 桥接),但不 async-native

tokio-util::sync::PollSender

  • tokio::mpsc::Sender 包装成 Sink(futures crate 的 trait)
  • 用于需要 Sink 抽象的库

选择标准优先 Tokio 原生、除非有具体理由换。Tokio mpsc 对绝大多数场景已经足够快 + 和生态兼容


一个总结:Tokio 的 channel 如何对齐 Go

维度GoTokio
mpscchan Ttokio::sync::mpsc
多消费者-broadcast(Go 没有直接对应,要手动)
最新值-watch(Go 里通常用 atomic + channel 组合)
一次性-oneshot(Go 里用 chan struct{} 加 select)
selectselect { ... }tokio::select! { ... }(下一章)

Tokio 的 channel 家族比 Go 更丰富——Go 是"一个 chan 解决所有"、Tokio 是"针对场景提供专门优化"。各有优劣:Go 心智负担低但可能写不出最优代码、Tokio 需要选对但选对后代码更清晰高效。


一个深度思考:为什么 Rust 生态没有标准 "channel trait"

Go 的 chan T 是语言关键字——所有人用同一套语义。Rust 生态里 Tokio mpsc、flume、crossbeam、async-channel 的 API 各不相同——没有统一 trait

为什么

  1. 各 channel 有独特 API——mpsc 有 reserve、broadcast 有 resubscribe、watch 有 borrow——很难抽象成统一 trait 不丢语义
  2. async 和 sync 的 channel 签名天然不同——sync recv 返回 Option<T>、async recv 返回 impl Future<Output=Option<T>>
  3. 有过尝试但没成功futures crate 里的 Stream trait 算最接近的、但没覆盖 send 侧
  4. 统一 trait 会限制优化——每 channel 根据语义特化实现、trait 抽象会丢失

Rust 生态选择了"具体类型优于 trait 抽象"。代价是学习曲线(要学每个 channel 的具体 API)、好处是每个都能被深度优化。

这是 Rust 哲学的一个反复主题——"多个具体实现" + "零成本抽象" vs "统一 trait + 运行时分派"。前者需要更多工作但性能更高。Tokio channel 是前者的代表。


下一章我们进入 select! 宏——看它如何展开成一段确定的 state machine、如何选择 Ready 分支、biased; 关键字改变了什么。select! 是 channel + timer + cancellation 的集大成


延伸阅读

基于 VitePress 构建