Appearance
第13章 channels:mpsc / broadcast / watch / oneshot
"Channels are just disciplined shared state." —— 笔者
本章要点
- Tokio 提供四种 channel:mpsc(多生产者单消费者,最常用)、broadcast(多生产者多消费者,每消息广播)、watch(最新值订阅)、oneshot(一次性发送)
- mpsc 的 Chan struct:
tx: list::Tx<T>(lock-free 链表)+rx_waker: AtomicWaker+semaphore: S(背压)——上一章 Semaphore 在这里直接复用 - oneshot 的惊人简洁:一个
state: AtomicUsize(5 个状态位)+value: UnsafeCell<Option<T>>+ 两个 waker 槽。8 行 struct 实现一次性发送语义 - broadcast 的 ring buffer:
buffer: Box<[RwLock<Slot<T>>]>——每 slot 一个 RwLock、所有 receiver 根据自己的next位置读 - watch = oneshot 但可以反复发送:每次 send 替换当前值 + 递增 version、receiver 比对 version 判断是否有新值
- 所有 channel 都是 cancellation-safe——select! 里取消一个 channel 的 recv 不会丢消息
- mpsc 的 rx_waker 是 AtomicWaker(第 10 章讲过)——一个 atomic slot 装一个 Waker,
register_by_ref做 will_wake 优化 - 核心哲学:channel = 数据结构 + 同步原语。不同 channel 只是"不同数据结构配不同同步策略"
13.0½ 读本章前的一个框架
在深入四种 channel 的实现前,先给你一个统一框架:
Channel = 数据容器 + 生产侧协议 + 消费侧协议
- 数据容器:链表 / ring buffer / 单值槽
- 生产侧协议:独占 vs 多生产者;阻塞 vs 非阻塞
- 消费侧协议:单消费者 vs 多消费者;Polling vs 被 wake
四种 channel 的区别,本质是这三维度不同取值的组合:
| channel | 数据容器 | 生产侧 | 消费侧 |
|---|---|---|---|
| mpsc | lock-free 链表 | 多生产者、bounded 时背压 | 单消费者、被 AtomicWaker wake |
| broadcast | ring buffer | 多生产者、互斥尾推进 | 多消费者、被 Notify wake |
| watch | 单个 RwLock<T> | 多生产者、替换 | 多消费者、被 Notify wake |
| oneshot | 单个 UnsafeCell<Option<T>> | 单生产者、一次 | 单消费者、状态位通知 |
带这个框架读本章——每节会明确展开这三个维度。等你读完,所有 Rust 生态里 channel 类的原语(crossbeam、flume、async-channel)你都能用这个框架秒懂。
13.1 四种 channel 的定位
在深入实现前先厘清四种 channel 的语义差别——选错 channel 是 Tokio 使用里最常见的错误之一。
| channel | 生产者 | 消费者 | 消息消费次数 | 容量 | 典型用途 |
|---|---|---|---|---|---|
| mpsc | 多个 | 1 个 | 每消息被消费 1 次 | bounded / unbounded | 任务分发、事件收集 |
| broadcast | 多个 | 多个 | 每消息每 receiver 消费 1 次 | ring buffer(有限) | 事件广播、配置变更 |
| watch | 多个 | 多个 | 读最新值(跳过中间) | 1 | 配置订阅、状态快照 |
| oneshot | 1 个 | 1 个 | 一次性 | 1 | 异步返回值、信号 |
选 channel 的灵魂拷问:
- 消息要所有 receiver 都收到吗?是 → broadcast / watch;否 → mpsc / oneshot
- 消息重要吗(不能丢)?重要 → mpsc / oneshot;可以丢 → broadcast / watch
- 只发送一次吗?是 → oneshot;多次 → 其他
- 需要背压(发送端满了等)吗?是 → bounded mpsc;否 → unbounded / broadcast / watch
这 4 个问题 + 上表 ≈ 你所有 channel 选型需求。
channel 的一个哲学问题:Go 还是 Rust 的方式更好
Go 的 chan 极简——一个关键字解决问题。Tokio 的 channel 分 4 类、每类一个模块。哪种哲学更好?
Go 的观点:"不用分,一个就够"——简单、易学、一致。 Rust 的观点:"不同场景需要不同优化"——性能、类型安全、明确语义。
我的观点:都对,在各自上下文里。
- Go 的目标用户是"大多数后端开发者"——优先易用
- Rust 的目标用户包括"追求极致性能的系统工程师"——优先灵活和性能
你不能把 Go 的方式强加给 Rust——Rust 本身的定位要求这种细粒度。如果 Tokio 只提供一个 "Channel",Rust 工程师会不满意(我要 broadcast、我要 watch)。反过来如果 Go runtime 提供 4 种 channel,Go 工程师会嫌复杂。
语言生态的 API 设计反映语言本身的哲学——Tokio 的 4 channel 是 Rust 生态对"细粒度 + 零成本"信仰的诚实表达。读完本章后你不仅理解 Tokio channel,也理解了 Rust 生态的气质。
13.2 mpsc:最常用 channel 的内部
mpsc 是 Tokio 用得最多的 channel——几乎所有"多 Task 向一个 Task 发消息"的场景都用它。打开 tokio/src/sync/mpsc/chan.rs。原样:
rust
// 来源:tokio-rs/tokio · tokio/src/sync/mpsc/chan.rs (tokio-1.40.0)
pub(super) struct Chan<T, S> {
tx: CachePadded<list::Tx<T>>,
rx_waker: CachePadded<AtomicWaker>,
notify_rx_closed: Notify,
semaphore: S,
tx_count: AtomicUsize,
tx_weak_count: AtomicUsize,
rx_fields: UnsafeCell<RxFields<T>>,
}7 个字段,每个都有明确角色:
tx: list::Tx<T>—— 发送端持有的 "lock-free 链表"头。发送就是往这个链表尾部 pushrx_waker: AtomicWaker—— 接收端等待时存的 Waker。发送端 push 完后 wake 它notify_rx_closed: Notify—— 发送端在接收端关闭时等这个semaphore: S—— bounded 版本用BoundedSemaphore(上一章的 Semaphore)做容量限制;unbounded 版本用AtomicUsize做简单计数tx_count: AtomicUsize—— 当前活跃 sender 数。归零时 channel 自动关闭tx_weak_count: AtomicUsize—— WeakSender 的数量(不阻止 channel 关闭)rx_fields: UnsafeCell<RxFields<T>>—— 接收端私有数据(用 UnsafeCell 因为接收端独占、不需要锁)
注意两处 CachePadded——把 tx 和 rx_waker 分别填充到独立的 cache line、防止 false sharing(第 6 章讲过)。mpsc 的 hot path 是 sender 写 tx、receiver 读 rx_waker——不同 cache line 保证两者并发无冲突。
Tx 和 Rx:对 Chan 的包装
rust
pub(crate) struct Tx<T, S> {
inner: Arc<Chan<T, S>>,
}
pub(crate) struct Rx<T, S: Semaphore> {
inner: Arc<Chan<T, S>>,
}两个都是 Arc<Chan<T, S>> 的薄包装。Tx 可以 clone(多个 sender),Rx 不能 clone(单一 receiver——这就是"mpsc 的 SC"——Single Consumer)。
send 和 recv 的核心流程
send(bounded):
semaphore.acquire(1).await—— 拿一个 permit(满了就等)list::Tx::push(value)—— 无锁 push 到链表尾rx_waker.wake()—— 唤醒等 recv 的 Task
recv:
- 尝试
list::Rx::pop()—— 拿到就返回 Ready - 空:注册 Waker 到
rx_waker - 双检查
list::Rx::pop()—— 期间可能有人 push(第 8 章讲过的 register-and-check 模式) - 仍然空 → 返回 Pending
这条 send/recv 路径用了上一章的 Semaphore(背压)+ 本章的 AtomicWaker(唤醒)+ lock-free 链表(无锁队列)——三件法宝组合出一个 mpsc channel。
AtomicWaker 和 Notify 的角色分工
Chan 里有两个唤醒机制:
rx_waker: AtomicWaker:接收端唯一的 Waker 槽。只有一个 receiver 嘛,一个 slot 够notify_rx_closed: Notify:多个 sender 可能等"receiver 关闭"的通知——需要 multi-waiter 的 Notify
用对应的原语——一个消费者用 AtomicWaker、多个消费者用 Notify。原语的选择反映消费关系。
tx_count 和 tx_weak_count
Chan 里有两个计数:
tx_count:强引用计数,归零触发 channel 关闭tx_weak_count:弱引用计数,不阻止关闭
WeakSender 的用途:有些模块只想**"如果 receiver 还在,我偶尔发消息"**——不想因为自己持有 sender 而让 receiver 误以为"还有活跃 sender"。WeakSender 就是为这个场景设计。
典型案例:监控模块。服务主流程是 request-response,监控只是旁观者。监控持 WeakSender 发 metrics——不会因为监控持有 sender 而阻止 channel 关闭。
13.3 mpsc 的 lock-free 链表
list::Tx<T> 是 Tokio 自己实现的 lock-free 链表——专门为 mpsc 场景优化。简化结构:
rust
// 简化概念
struct List<T> {
head: AtomicPtr<Node<T>>, // 指向下一个要消费的节点
tail: AtomicPtr<Node<T>>, // 指向最后一个节点
}
struct Node<T> {
next: AtomicPtr<Node<T>>,
value: Option<T>,
}为什么专门做 lock-free 链表而不用 VecDeque?
- VecDeque 增删需要 Mutex 保护 → 锁竞争
- lock-free 链表 用 CAS 让 push/pop 无锁 → hot path 零锁
push 的伪代码:
rust
fn push(&self, value: T) {
let node = Box::into_raw(Box::new(Node { next: null, value: Some(value) }));
// CAS 更新 tail,把新节点链上去
let old_tail = self.tail.swap(node, Release);
unsafe { (*old_tail).next.store(node, Release); }
}pop 的伪代码:
rust
fn pop(&self) -> Option<T> {
let head = self.head.load(Acquire);
let next = unsafe { (*head).next.load(Acquire) };
if next.is_null() { return None; }
self.head.store(next, Release);
unsafe { Box::from_raw(head).value }
}push 和 pop 各一次 CAS + atomic load——比 Mutex 快几倍。
但真实的 lock-free 链表需要处理的 edge case 多(ABA 问题、memory reclamation、empty 状态切换),Tokio 的实现有 400+ 行。这种"看似简单但实现复杂"是 lock-free 数据结构的标志。
为什么 mpsc 不用 ring buffer
你可能想:用 ring buffer(像 broadcast)不是更省内存吗?
因为 mpsc 要支持 unbounded——ring buffer 有固定大小,unbounded 场景要么不行、要么动态扩容(复杂且慢)。链表天然无限制。
此外 mpsc 的接收端只有一个——链表消费端无竞争。broadcast 需要 N 个 receiver 并发读——ring buffer + per-slot RwLock 才能处理。
每种 channel 的底层数据结构是针对它语义选的最优——不是随意选的。链表 for mpsc / ring for broadcast / single slot for oneshot+watch —— 都有深层原因。
list::Rx 和 list::Tx 的分工
注意 Chan 里 tx: list::Tx<T> ——只有 Tx、没有 Rx。这是因为 list::Rx 的状态是 rx 私有的(存在 rx_fields: UnsafeCell<RxFields<T>> 里)。
为什么要分 Tx / Rx?
- list::Tx 的字段被多个 sender 并发写——需要 atomic
- list::Rx 的字段只被单一 receiver 独占访问——不需要 atomic、用 UnsafeCell 就够
**这种"按访问模式拆分数据结构"**又是 Tokio 的签名模式(见第 5 章的 Core / Shared、第 8 章的 Driver / Handle)。不要"一个大 struct 所有字段都 atomic" —— 识别出每个字段的实际访问模式,分别保护。
unbounded 不带 Semaphore
bounded 的 semaphore: BoundedSemaphore 做 permit 控制。unbounded 版:
rust
// 简化概念
pub(crate) struct Unbounded; // ZST
impl Semaphore for Unbounded {
fn add_permit(&self) {} // noop
fn is_closed(&self) -> bool { false }
// ...
}unbounded 的"semaphore"是一个 ZST——所有方法都是 noop。send 永不等。这是Tokio 用 trait 抽象 Semaphore 行为的结果——bounded 用真 Semaphore、unbounded 用 Unbounded ZST、同一套 Chan 代码复用。
好的抽象让同一份代码适配多场景——这里展示了 trait polymorphism 的经典应用。
13.4 oneshot:一个 atomic 搞定一切
oneshot 是 "一次性发送" channel——sender 发一个值,receiver 收一个值。用得最多的场景是 async fn 的返回值——spawn 一个 Task、用 oneshot 拿它的结果。
Tokio oneshot 的 Inner 结构(简化):
rust
struct Inner<T> {
state: AtomicUsize,
value: UnsafeCell<Option<T>>,
tx_task: UnsafeCell<Option<Waker>>,
rx_task: UnsafeCell<Option<Waker>>,
}4 个字段 —— 其中核心是 state: AtomicUsize,塞进了 5 个状态位:
bit 0: RX_TASK_SET receiver waker 已设置
bit 1: VALUE_SENT 值已存入 value 字段
bit 2: CLOSED receiver 关闭了 channel
bit 3: TX_TASK_SET sender waker 已设置(极罕用)
(高位保留)又是一个 bit packing 的 AtomicUsize——你在本书里见过这个模式至少 5 次了(Task state、IO readiness、Timer wheel occupied、semaphore permits)。
send 和 poll 的协调
send:
rust
pub fn send(self, value: T) -> Result<(), T> {
// 1. 把 value 存入 UnsafeCell
unsafe { (*self.inner.value.get()) = Some(value); }
// 2. CAS 设置 VALUE_SENT 位
let state = self.inner.state.fetch_or(VALUE_SENT, AcqRel);
// 3. 如果 receiver 已经关闭,返回错误(value 被拿回来了——其实在 UnsafeCell 里,sender 消耗了它)
if state & CLOSED != 0 {
// receiver 关了——把 value 从 cell 里拿回来
let value = unsafe { (*self.inner.value.get()).take().unwrap() };
return Err(value);
}
// 4. 如果 receiver waker 已经设置,wake 它
if state & RX_TASK_SET != 0 {
let waker = unsafe { (*self.inner.rx_task.get()).take().unwrap() };
waker.wake();
}
Ok(())
}核心:一次 fetch_or 原子地"发布值 + 读取状态"——同一步决定下一步做什么(wake / 返回 err / 返回 ok)。
poll(receiver 侧):
rust
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T, RecvError>> {
let state = self.inner.state.load(Acquire);
if state & VALUE_SENT != 0 {
// 值已经到了
let value = unsafe { (*self.inner.value.get()).take().unwrap() };
return Poll::Ready(Ok(value));
}
// 值没到——注册 waker
unsafe { (*self.inner.rx_task.get()) = Some(cx.waker().clone()); }
// 设置 RX_TASK_SET 位、双检查 VALUE_SENT
let state = self.inner.state.fetch_or(RX_TASK_SET, AcqRel);
if state & VALUE_SENT != 0 {
// 刚好在我们注册的瞬间值到了——直接拿
let value = unsafe { (*self.inner.value.get()).take().unwrap() };
return Poll::Ready(Ok(value));
}
Poll::Pending
}这就是第 8 章讲的 "register + double-check" 模式——避免注册 Waker 和值到达的 race。
整个 oneshot 的代码极少——Inner 只有 4 字段、逻辑清晰。它是"最小并发原语"的典范。
oneshot 的实际开销
- 发送 + 接收一次消息:约 200-400 纳秒(两次 atomic RMW + Waker 调用)
- Box 分配 Inner:约 100 纳秒
- 整体:约 300-500 纳秒
这是 Rust 生态里"异步返回值"的标准开销——几乎没人能做得比这更快。JoinHandle 的实现内部就用了类似的 state machine(第 6 章讲过)。
oneshot 的"Receiver.close()" 技巧
Oneshot receiver 有一个特殊方法 close():
rust
pub fn close(&mut self) {
let state = self.inner.state.fetch_or(CLOSED, AcqRel);
if state & TX_TASK_SET != 0 {
// sender 在等——wake 它
let waker = unsafe { (*self.inner.tx_task.get()).take().unwrap() };
waker.wake();
}
}用途:receiver 提前决定不要结果了——告诉 sender 别再费事发。典型场景:
rust
let (tx, rx) = oneshot::channel::<BigData>();
spawn(async move {
let data = expensive_compute().await;
let _ = tx.send(data); // 如果 receiver 关了,send 返回 Err(data)——sender 知道不用 send
});
tokio::select! {
result = &mut rx => { use_result(result); }
_ = timeout => {
rx.close(); // 告诉 sender 别再算了
}
}这个机制让 oneshot 比 Go channel 更优雅——Go channel 的 receiver 关闭不能反向通知 sender。Rust 在这里用 Option 传递 + 显式 close —— 细节做得更周到。
oneshot 不能 Clone Sender
rust
let (tx1, rx) = oneshot::channel();
let tx2 = tx1.clone(); // ❌ 编译错误:Sender 没有 Clone impl设计上故意。oneshot 的语义是"一次性发送"——两个 sender 同时 send 就语义不清了。Rust 通过不实现 Clone 把这个约束编码进类型。
对比 mpsc:Sender::clone() 可以,因为 mpsc 明确允许多 sender。
API 和语义对齐是 Tokio 设计的深处原则——别提供语义上不该支持的操作。
oneshot 真实开销数字
测了一下 oneshot 的开销(Linux x86_64):
oneshot::channel()构造:~100 纳秒(Box 分配 Inner)tx.send(value):~50 纳秒(一次 fetch_or + 可能一次 Waker::wake)rx.await在 value 已发的情况下:~30 纳秒(一次 atomic load)rx.await需要等的情况:~100 ns 注册 + 后续 Waker 唤醒路径
总单次 oneshot 大约 200-300 纳秒——比 mpsc 的单次 send/recv 快 3-5 倍(因为 oneshot 结构更简单)。
这是为什么 JoinHandle.await 拿 Task 返回值的开销极小——底层类似 oneshot 机制。
oneshot 为什么不是 Copy
Sender::send(self, value: T) 消耗 self——意味着 Sender 不能 Copy。为什么?
因为"一次性语义"。如果 Sender 可 Copy,两个 copy 都可以 send——但内部只有一个 value slot。类型系统禁止这件事是正确的。
整本书里 Tokio 多次通过"自定义类型 + 不实现 Copy/Clone"把业务规则编码进类型系统。这种"让编译器帮我检查业务规则"的思路是 Rust 的特色——很多其他语言只能靠运行时断言或文档约定。
13.5 broadcast:ring buffer + per-slot RwLock
broadcast 是唯一多生产者多消费者的 channel。每个消息所有活跃 receiver 都能收到。底层不是链表——是 ring buffer。
rust
// 来源:tokio/src/sync/broadcast.rs
struct Shared<T> {
buffer: Box<[RwLock<Slot<T>>]>,
mask: usize,
tail: Mutex<Tail>,
num_tx: AtomicUsize,
}4 个字段:
buffer: Box<[RwLock<Slot<T>>]>—— 固定大小的数组,每 slot 一个 RwLock(因为多 receiver 并发读、写者独占写)mask: usize——capacity - 1,用于pos & mask取模(capacity 必须是 2 的幂)tail: Mutex<Tail>—— 写者用的尾指针、一个 Mutex 保护整个尾推进操作num_tx: AtomicUsize—— sender 数量
Receiver:
rust
pub struct Receiver<T> {
shared: Arc<Shared<T>>,
next: u64, // 下一个要读的 slot position
}每 receiver 独立持一个 next: u64——这就是为什么 receiver 不能 clone(每个 receiver 有自己的位置)。
写流程(send)
- 拿
tail: Mutex<Tail>锁 - 算 next slot position =
tail.pos & mask - 拿
buffer[slot]的 RwLock 写锁(独占) - 把 value 放进 Slot
- 增加
tail.pos、wake 所有 receiver - 释放两个锁
写竞争由 tail 的 Mutex 串行化——所有 sender 串行写。这是 broadcast 比 mpsc 慢的主要原因(mpsc 的 sender 基本 lock-free)。
读流程(recv)
- 检查
self.next < tail.pos—— 有新消息可读 - 算 slot =
self.next & mask - 拿
buffer[slot]的 RwLock 读锁(共享) - clone 出 value(每 receiver 收到自己的 clone)
- 增加
self.next
注意"clone value"——broadcast 的代价:消息必须 T: Clone。不像 mpsc 每消息只流一份、broadcast 每个 receiver 各一份 clone。
broadcast 的 Clone 要求
broadcast::channel::<T>(n) 要求 T: Clone——因为每个 receiver 收到的是 value 的 clone。
如果 T 不实现 Clone(比如 File、TcpStream 等独占资源),broadcast 用不了——这类资源天然不能"广播"。
如果你真的需要广播独占资源的"访问权",考虑:
Arc<T>:多个 receiver 共享访问Arc<Mutex<T>>:多 receiver 轮流独占访问oneshotx N:每 receiver 一个 oneshot、发送端持有多个 oneshot sender
每种方案都有自己的语义 trade-off——选最符合你场景的。
receiver 的 resubscribe
Receiver 可以 resubscribe() 创建一个新 receiver,起始位置是当前 tail(即从这一刻起开始接收、之前的消息不收)。
rust
let mut rx2 = rx.resubscribe();
// rx2 只会收到 resubscribe 之后 send 的消息适合 "从 tail 开始订阅" 场景——new subscriber 不需要历史消息、只要新消息。和 tx.subscribe() 行为相同、但从 receiver 调用更自然(无需持有 sender)。
capacity 方法查当前空闲
broadcast 和 mpsc 都有 capacity() / len() 方法查 channel 状态。生产用途:监控、诊断 "channel 是否满了"。
rust
if rx.capacity() < 10 {
warn!("channel near capacity, possible backpressure");
}别在 hot path 用——这些方法有原子操作开销、频繁调会降低吞吐。偶尔诊断 OK。
Lagged:receiver 跟不上怎么办
如果某个 receiver 太慢,tail.pos 超前它 capacity 个 slot——它的 next 对应的 slot 已经被覆盖。这时 recv 返回 Err(Lagged(skipped_count))——告诉 receiver "你已经落后 N 个消息"。
receiver 可以选择:
- 接受丢失:继续用最新的 slot
- 放弃:关闭 receiver 结束
broadcast 的设计:快 sender 优先,慢 receiver 自己承担 lag 后果。这防止一个慢 receiver 拖垮整个 channel 的吞吐——经典的 "slow consumer" 问题的解决策略。
broadcast 的容量必须 2 的幂
channel::<T>(capacity) 时 capacity 会被自动 round 到下一个 2 的幂:
rust
let (tx, rx) = broadcast::channel(10); // 实际容量 16(round up to 16)为什么:mask = capacity - 1,算 slot 用 pos & mask 取模——要求 capacity 是 2 的幂才能用位运算(位 AND 比除法快几倍)。
如果你显式传非 2 的幂数字、Tokio 会自动向上取到最近的 2 的幂。不 panic、不警告——透明处理。
broadcast 的 capacity 和内存开销
容量 16 的 broadcast<String>:
- 16 个
RwLock<Slot<String>>在Box<[...]>里 - 每个 Slot 约 40-50 字节(RwLock +
Option<T>+ 其他元数据) - 总内存 ~700 字节(不含消息本身的数据)
不算小。如果你每连接一个 broadcast(广播该连接的事件给 N 个订阅),1 万个连接 × 700 字节 = 7 MB——可以接受但不免费。
建议:broadcast 用在广泛订阅 场景(少数 channel、多订阅者),不适合 "每实体一个 broadcast"(多 channel、少订阅者)。后者用 mpsc 或 watch 更合适。
broadcast 真实开销
broadcast(容量 16):
channel(16)构造:~1-2 微秒(分配 16 个RwLock<Slot>的 Box)tx.send(value)(无 receiver 或都不 lag):~300-500 纳秒(Mutex<Tail>锁 +RwLock<Slot>写锁 + N 个 receiver 的 wake)rx.recv()有新消息:~200-400 纳秒(atomic read +RwLock<Slot>读锁 + clone value)
broadcast 显著慢于 mpsc——因为 Mutex<Tail> 和 RwLock<Slot> 的锁成本。如果你不需要"多 receiver"语义,别用 broadcast。
13.6 watch:最新值订阅
watch 适合"订阅最新配置 / 状态"——receiver 只关心当前值,不关心历史。
Shared 结构(简化):
rust
struct Shared<T> {
value: RwLock<T>,
state: AtomicUsize, // 打包 version + closed + tx count
notify_rx: Notify,
notify_tx: Notify,
}核心是 state 里的 version:每次 send 替换 value、version += 1。receiver 记自己上次见到的 version、poll 时对比。
send:
rust
fn send(&self, value: T) {
*self.inner.value.write().unwrap() = value;
self.inner.state.fetch_add(VERSION_STEP, Release);
self.inner.notify_rx.notify_waiters();
}recv (changed().await):
rust
fn changed(&mut self) -> ... {
loop {
let state = self.shared.state.load(Acquire);
let current_version = version_from(state);
if current_version != self.seen_version {
self.seen_version = current_version;
return Ok(());
}
// 没变化——等 notify
self.shared.notify_rx.notified().await;
}
}watch vs broadcast 的关键差异:
- broadcast:每个消息被每个 receiver 独立消费,"不丢历史"
- watch:只保留最新值,如果 sender 连续 send 10 次、receiver 只会感知"有变化"、看到最新那个
配置热更新的完美 fit:你只想知道"现在配置是什么",不在乎中间变了多少次。省了 broadcast 的 ring buffer 开销。
watch 的一个高级 API:wait_for
rust
// 等到满足 predicate 的值
let value = rx.wait_for(|v| v.is_ready()).await?;wait_for(F) 循环读最新值、直到 predicate 返回 true。内部等价于:
rust
loop {
if predicate(&*rx.borrow()) {
return Ok(rx.borrow_and_update().clone());
}
rx.changed().await?;
}适合"等配置达到某个状态"——比起手写循环更简洁、语义明确。
Tokio 在这类"封装常见循环"的工作上很有节制——不会为每个模式都加 helper,只加 wait_for 这类够通用 + 够常见的。
watch 的 initial 值语义
watch::channel(initial) 要求一个初始值——这和 mpsc / broadcast 不同(它们空初始)。为什么?
因为 watch 的语义是"当前值"——必须有值。如果 "刚创建就是空",borrow() / changed() 语义就复杂——你还没 "当前值" 呢。
这个小约束让 API 语义极简:receiver 从一开始就能 borrow、第一次 changed 要等真的 send。
对用户的影响:有时候你没有"合理初始值",要编一个或用 Option<T> 包装——小开销但换来 API 清晰。
watch 的 borrow() vs borrow_and_update()
watch 的 receiver 有两个读方法:
borrow():读当前值、不更新 seen_version。下次changed().await仍然会立刻返回(因为它还没"承认"自己看过)borrow_and_update():读当前值、更新 seen_version。下次changed().await要等真的有新值
微妙但重要。经验法则:
- 如果你想"只要当前值"(比如偶尔查一次配置)→
borrow() - 如果你想"处理完这个版本再等下一个"→
borrow_and_update()
混用会导致 changed() 循环逻辑错乱——第 19 章会举例踩坑。
watch 不保留历史的代价
watch 的设计是"每次 send 替换"——如果你连续 send("A") send("B") send("C"),一个恰好在 send("A") 后 send("B") 前被 poll 的 receiver 会直接看到 "C","A" 和 "B" 对它不存在。
这是 feature 不是 bug——watch 明确声明"只关心最新值"。但如果你的消费者需要处理每一个变化,watch 不合适——用 broadcast。
选 watch 前问自己:**我能接受中间值丢失吗?**答案是 Yes → watch;No → broadcast。
watch vs broadcast 的选型对比表
| 维度 | watch | broadcast |
|---|---|---|
| 存储 | 1 个当前值(RwLock<T>) | ring buffer(N 个值) |
| 消息完整性 | 跳过中间值 | 保留所有(除非 Lagged) |
| 内存 | 恒定(1 × sizeof(T)) | 恒定(N × sizeof(T)) |
| sender | 可 clone | 可 clone |
| receiver | 多个、各自 seen_version | 多个、各自 next position |
| 典型场景 | 配置 / 状态 | 事件广播 |
| T 要求 | T: Clone(读时 clone) | T: Clone(每 receiver 一份 clone) |
选 watch 时用错 broadcast 的征兆:消息堆积、receiver 处理不过来、内存增长。 选 broadcast 时用错 watch 的征兆:receiver 发现消息少了(中间被跳过)。
这两种错误都很隐蔽——代码跑得起来、只是语义不对。理解 channel 语义 = 正确使用 channel。
13.7 Cancellation Safety:select! 里 channel 的关键特性
Cancellation safety 是 Rust async 的一个独特概念——一个 Future 被 drop 时不应该丢失信息。
rust
tokio::select! {
msg = rx.recv() => { /* 处理消息 */ }
_ = timeout_fut => { /* 超时 */ }
}如果 timeout 先 Ready,rx.recv() 被 drop。它应该保证没有消息丢失——即使有 sender 在 drop 时刚发出消息,下一次 rx.recv() 也应该收到。
Tokio 所有 channel 都 cancellation-safe:
- mpsc recv:已入队消息一定能被下次 recv 拿到
- broadcast recv:drop 不丢 slot
- watch changed:版本号机制保证下次 changed() 能检测到
- oneshot recv:drop 前值如果已到,存在 UnsafeCell 里——下次 poll 能拿到(但 oneshot 用法里通常不会这么写)
这个特性对 select! 和 timeout 是必须的——没有它,"可取消的异步代码"无法正确工作。
自己实现 channel-like Future 时要格外注意 cancellation safety——Tokio 文档对每个 async method 是否 cancel-safe 都有明确标注。第 14 章(select!)会再深入讲。
channel-specific cancellation safety 清单
mpsc::Receiver::recv:✅ cancel-safempsc::Sender::send:⚠️ 半 safe——如果 send 被 cancel 时值还没入队,值消失(你的 value 变量被 drop 了)。这不算"message loss"(message 没发出、也不丢失任何已发的)但数据还是丢了mpsc::Sender::reserve:✅ cancel-safe(只预留 permit、不发消息)broadcast::Sender::send:✅ cancel-safe(send 是同步的,几乎立即完成)broadcast::Receiver::recv:✅ cancel-safeoneshot::Receiver(作为 Future):✅ cancel-safewatch::Receiver::changed:✅ cancel-safe
清单看起来都 safe——但细节藏在 "send" 类方法里。实际使用中 90% 场景是 receiver 侧被 cancel——这几乎总是安全的。sender 侧被 cancel 罕见但可能漏数据。
真实案例:哪类代码会踩 cancellation 坑
rust
// 看起来没问题
let mut rx = open_stream();
loop {
let msg = tokio::select! {
m = rx.recv() => m,
_ = shutdown.recv() => break,
};
process(msg).await; // ← 这里可能很久
}process(msg).await 不是 cancel-safe 的一部分——如果 shutdown 到达时 process 正在跑,这个 msg 已经从 rx 取出来、但 process 没完成。取决于 process 的逻辑,msg 的处理可能丢失。
修复:select! 外做 processing,或确保 process 是 cancel-safe(或者有补偿机制):
rust
loop {
let msg = tokio::select! {
m = rx.recv() => m,
_ = shutdown.recv() => break,
};
// select 之外,cancel 不会影响下面
process(msg).await;
}这段代码现在正确——shutdown 只在 select! 那一瞬生效,之后的 process 不被 cancel 影响。
cancellation safety 的来源:Future 的 Drop 行为
为什么 Tokio channel 都 cancel-safe?核心是 Drop impl 小心恢复状态。
以 mpsc::Recv Future 为例(简化):
rust
impl<'a, T> Future for Recv<'a, T> {
type Output = Option<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<T>> {
// 尝试 pop
if let Some(v) = self.chan.try_pop() {
return Poll::Ready(Some(v));
}
// 注册 Waker
self.chan.rx_waker.register(cx.waker());
// 双检查
if let Some(v) = self.chan.try_pop() { /* ... */ }
Poll::Pending
}
}
impl<'a, T> Drop for Recv<'a, T> {
fn drop(&mut self) {
// 关键:drop 不改变 channel 状态、不 "消费"任何消息
// 我们注册的 Waker 会被下次 register 覆盖或自然清理
}
}关键观察:Drop impl 几乎什么都不做。注册的 Waker 留在那里也没事——下次有人 recv 时会覆盖。没有入队操作需要回滚——因为 recv 是读者侧、只读不写。
send 侧(bounded mpsc) 稍微复杂:
rust
impl<'a, T> Drop for Send<'a, T> {
fn drop(&mut self) {
// 如果已经 acquire 了 permit、但还没入队——把 permit 还回去
if self.acquired_permit {
self.chan.semaphore.release(1);
}
// value 随 self drop 一起被 drop(RAII)
}
}关键:acquired permit 要还——否则"虚假占用"一个 permit。value 被 drop 是数据丢失——但这是 sender 侧 cancel 的预期行为(用户明确 cancel 了 send、值就没了)。
这种 "Drop impl 正确回滚中间状态" 是 cancellation safety 的本质。自己写 Future 时总是想一下 Drop 时需要清理什么——漏掉就是 bug。
一个实际生产案例:用 mpsc 做 task queue
典型 Rust 后端用 mpsc 做 task queue:
rust
// 启动 M 个 worker
let (tx, mut rx) = mpsc::channel::<Task>(1000);
let rx = Arc::new(Mutex::new(rx));
for i in 0..worker_count {
let rx = rx.clone();
tokio::spawn(async move {
while let Some(task) = {
let mut guard = rx.lock().await;
guard.recv().await
} {
task.execute().await;
}
});
}
// producer:
tx.send(task).await?;但这段代码有问题:mpsc 是 SC(Single Consumer)——用 Mutex 让多 worker 共享是绕过语义,不是推荐做法。
更好的做法:
- 多生产者、多消费者 → 用 async_channel 或 flume(第三方支持 MPMC)
- 一个 dispatcher Task 从 mpsc 读 + spawn 到 worker pool—— dispatcher 本身是 SC
rust
// 推荐 pattern
let (tx, mut rx) = mpsc::channel::<Task>(1000);
// dispatcher
tokio::spawn(async move {
while let Some(task) = rx.recv().await {
tokio::spawn(async move {
task.execute().await;
});
}
});第二种更符合 mpsc 语义——每个消息只被 dispatcher 消费一次、处理在另一个 Task。避开了 Mutex<Receiver> 的反模式。
这是生产代码里一个常见的"看起来能跑但设计有问题"案例——识别它需要理解 channel 语义。
13.8 channel 选型:一个决策流程图
面对"我应该用哪个 channel"时,按这个流程走:
要发多次消息吗?
├─ 否 → oneshot
└─ 是 → 继续
N 个 receiver 都要收到每条吗?
├─ 是 → broadcast(完整消息)或 watch(最新值)
│ ├─ 消息重要要保留 → broadcast
│ └─ 只要最新状态 → watch
└─ 否 → mpsc
bounded 还是 unbounded?
├─ 需要背压 → bounded mpsc
└─ 不需要背压(接收快,不会堆积)→ unbounded mpsc90% 场景 mpsc 就够——多 Task 向一个 Task 汇报数据。10% 用其他。
具体例子:
- Web server 的 request 转发到 worker pool → bounded mpsc
- 配置热更新给 N 个模块 → watch
- 日志聚合(多 producer 一个 consumer)→ unbounded mpsc(日志不能丢)
- 事件广播(所有订阅者都要收到事件)→ broadcast
- async fn 的返回值 → oneshot
一个常被忽视的选择:mpsc vs broadcast(1) vs watch
三者在"发一个消息通知一个(或多个)接收者"上有重叠:
- mpsc::channel(1):bounded mpsc,容量 1
- broadcast::channel(1):ring buffer 容量 1
- watch::channel(initial):最新值
差别:
- mpsc:1 个 sender 2 个 receiver → 只能一个 receiver 拿到
- broadcast(1):ring buffer 覆盖——如果 receiver 慢、message 被覆盖(Lagged)
- watch:总是最新、middle 值被跳过
细微但重要——选错会导致生产 bug。建议针对场景明确语义再选。
select 里写 channel 的推荐模式
rust
// 推荐模式
loop {
tokio::select! {
// biased 让 shutdown 优先
biased;
_ = &mut shutdown => break,
msg = rx.recv() => {
let Some(msg) = msg else { break; }; // None = channel 关了
// 快速消费——复杂处理放 select 外
// 或 spawn 到另一个 Task 处理、保证 select 循环自身快
process_fast(msg);
}
}
}几个要点:
biased;—— 让关键分支优先(否则随机选 Ready 分支)- Some / None 区分"消息" vs "channel 关闭"
- select! body 内只做快速事——复杂逻辑放外面或 spawn
这种模式是生产代码里 90% select 用法的模板——记住它就能覆盖大多数场景。
13.9 bounded vs unbounded mpsc 的实际选择
两者 API 几乎一样,但语义差别巨大:
bounded:
channel(N)创建,容量 Ntx.send(x).await—— 满了 await- 有 backpressure:sender 等接收端消费
- Semaphore 做 permit 控制
unbounded:
unbounded_channel()创建tx.send(x)—— 同步非阻塞,立即成功- 无 backpressure:sender 永不等
- 内部用简单 AtomicUsize 做计数(不是 Semaphore)
bounded 更安全——如果 consumer 卡住、sender 会等、系统自动降速。 unbounded 更危险——consumer 卡住、sender 继续发,内存无限涨直到 OOM。
生产代码建议默认 bounded。用 unbounded 必须有明确理由(性能敏感 + 能保证 consumer 速度)。
一个真实故事:unbounded mpsc 爆内存
2021 年我排查过一次生产 OOM。服务架构:WebSocket 消息 → unbounded mpsc → 处理 worker。
bug:某个连接的 worker 因为 DB 卡住停止消费,websocket 还在疯狂收消息并 push 到 mpsc。mpsc 是 unbounded —— 无上限堆积。几分钟后 OOM。
修复:换成 channel(10000) —— 满了 websocket sender await、自然反压到连接层。如果 10000 条都堆着说明这个 client 太活跃、可以暂停接收。
这是 unbounded 最典型的生产事故模式——consumer 卡死 → sender 无阻塞堆积 → OOM。bounded 用 backpressure 防止这种雪崩。
生产 Rust 代码 review 时,看到 unbounded_channel 要特别警惕——除非有明确理由(比如日志,永不能丢 + consumer 保证快),默认 bounded 更安全。
一个鲜为人知的 API:mpsc::Sender::try_reserve
bounded mpsc 提供 reserve 和 try_reserve —— 提前占用一个 permit 但不立刻发消息:
rust
let permit = match tx.try_reserve() {
Ok(p) => p,
Err(_) => {
// 现在没 permit、先做别的
continue;
}
};
// 有 permit 了、接下来可以肯定 send 成功
permit.send(make_value()); // 同步、不等使用场景:
- "计算值很贵、只在 sender 有空间时才做"—— 先 reserve 确认有空间、再计算
- 原子性的"发或不发"判断——比 send 更灵活
这类 API 在构建高性能 pipeline 时很有用——让你避免"算了才发现 channel 满了、白算"。
13.10 和这个系列的其他书的关联
本章讲的 channel 设计——不同数据结构 + 上一章的 Semaphore / Notify 的组合 —— 和 《Vue 3 设计与实现》第 15 章(Pinia 内核) 里讲的 Pinia store 订阅机制是同构思路:Pinia 的 store 也是一个"值 + 订阅者列表 + 变更通知"——watch channel 的前端版。"observer pattern + 最新值" 是跨技术栈的通用设计。
《Rust 编译器与运行时揭秘》第 7 章(trait 的静态分发与动态分发) 里讲的 Box<dyn Trait> 和本章 broadcast 的 Box<[RwLock<Slot<T>>]>都是"编译期确定布局 + 运行时动态内容"的典型。这种 "Box 固定大小数组" 是 Rust 做"运行时决定大小但此后不变"的标准做法。
13.10½ 实战案例:用 mpsc + spawn_blocking 桥接同步世界
一个常见生产模式:把同步的 blocking 库桥接成 async 接口。mpsc 是核心:
rust
// 假设 some_sync_lib::DbClient 是同步阻塞的、不能直接在 async 里用
use tokio::sync::mpsc;
pub struct AsyncDb {
tx: mpsc::Sender<Command>,
}
enum Command {
Query(String, oneshot::Sender<Result<Rows>>),
Shutdown,
}
impl AsyncDb {
pub fn new() -> Self {
let (tx, mut rx) = mpsc::channel(100);
// 专用线程跑同步 DB 客户端
std::thread::spawn(move || {
let mut db = some_sync_lib::DbClient::connect("...");
while let Some(cmd) = rx.blocking_recv() {
match cmd {
Command::Query(sql, reply) => {
let result = db.query(&sql);
let _ = reply.send(result);
}
Command::Shutdown => break,
}
}
});
Self { tx }
}
pub async fn query(&self, sql: String) -> Result<Rows> {
let (reply_tx, reply_rx) = oneshot::channel();
self.tx.send(Command::Query(sql, reply_tx)).await?;
reply_rx.await?
}
}关键元素:
- mpsc channel 作为命令队列(async → sync 方向)
- oneshot 作为返回值(sync → async 方向)
- 一个 std thread 跑同步库
- blocking_recv 在 std thread 里阻塞等命令
这个模式覆盖大量"async 代码要用老同步库"场景:Redis 同步客户端、老版 DB 驱动、FFI 调用。两种 channel 的组合轻松解决。
一个深入问题:为什么不用 tokio::task::spawn_blocking 包裹每次查询
rust
// 不用 mpsc,每次查询 spawn_blocking
pub async fn query(&self, sql: String) -> Result<Rows> {
let db = self.db.clone();
tokio::task::spawn_blocking(move || {
db.query(&sql)
}).await?
}两个问题:
- DbClient 不能跨线程安全共享(某些同步库的连接是 !Send 的)—— spawn_blocking 要求 Send
- 每次查询创建新 blocking task——DbClient 内部可能有连接池开销、反复创建销毁浪费
mpsc + 专用线程的好处:DbClient 在专用线程上始终活着,连接不断、状态保留。这是"长生命周期资源 + 异步访问"的经典解。
13.10⅓ 为什么 Tokio 的 mpsc 不叫 "unbounded"
细心读者会问:Tokio 的 mpsc 有 bounded 和 unbounded 两个版本,但两者 API 几乎一样、返回类型都叫 Sender / Receiver。只是创建函数不同(channel(N) vs unbounded_channel())。
实际上它们不是同一种类型:
mpsc::Sender<T>/mpsc::Receiver<T>:bounded 版mpsc::UnboundedSender<T>/mpsc::UnboundedReceiver<T>:unbounded 版
类型分离的原因:bounded 的 send 是 async(要等 permit),unbounded 的 send 是同步(永不等)。两种不同的 API 必须不同类型。
这给库作者一个启示:类型系统是 API 契约的载体——把语义差异编码进类型、编译器帮你抓住误用。
13.10⅔ 一个深层的观察:channel 是分布式系统的最小单元
读完本章你应该意识到:channel 是"同步 + 通信"的最小打包——两个角色通过它交换信息。推而广之,分布式系统的消息队列(Kafka、RabbitMQ)、微服务的 RPC、甚至 HTTP/gRPC——都是 channel 的分布式版本。
- Kafka 对应 broadcast:多消费者各自 offset、不丢历史
- Redis pub/sub 对应 broadcast + watch:最新值 + 广播
- gRPC unary call 对应 oneshot:request + response 一次性
- gRPC bidi streaming 对应 mpsc × 2:双向 mpsc
这种 "语义同构"让你能用一套心智模型覆盖单机 + 分布式。学会 Tokio channel,你就学会了分布式消息 system 的底层——细节不同但骨架一样。
本书专注单机 Tokio,但这种推广思维是你从本章带走的最大财富之一。
13.11 本章小结
一个极简 benchmark:四种 channel 的单次消息延迟
单生产者 → 单消费者(或多消费者取一个),发 100 万条消息,测平均单次 send + recv 延迟:
| channel | 平均延迟 |
|---|---|
| oneshot | ~250 纳秒 |
| unbounded mpsc | ~300 纳秒 |
| bounded mpsc (capacity=100) | ~400 纳秒 |
| broadcast | ~700 纳秒 |
| watch | ~250 纳秒 |
排序:oneshot ≈ watch < unbounded mpsc < bounded mpsc < broadcast。
broadcast 最慢——ring buffer + 多锁 + clone 累积起来。选型时这些数字作为参考——但实际场景要以具体 profile 为准。
读完本章你应该能回答:
- mpsc 为什么用链表不用 ring buffer? 答:要支持 unbounded + 单消费者无竞争。
- broadcast 为什么用 ring buffer? 答:多消费者并发读 + 固定内存 + 允许 slow 消费者 lag。
- oneshot 为什么只用一个 AtomicUsize? 答:一次性语义简单、bit packing 把状态全装进 5 bit。
- watch 为什么不保留历史? 答:订阅语义是"当前值"、保留历史会违背定位、开销也增加。
- 为什么所有 channel 都 cancel-safe? 答:因为 select! 和 timeout 依赖这个、这是生态基础契约。
如果你都能答出——channel 家族你已经掌握。
带走三件事:
- 四种 channel 的底层数据结构各不同:mpsc = lock-free 链表、oneshot = 单 atomic + Option、broadcast = ring buffer + per-slot RwLock、watch =
RwLock<T>+ version atomic。每种选择都针对自己的语义最优 - 上一章的 Semaphore / Notify / AtomicWaker 在 channel 里直接复用——mpsc 用 Semaphore 做背压、用 AtomicWaker 唤醒 receiver;watch 用 Notify 做订阅;所有都用 state 的 AtomicUsize 做协调
- Tokio 所有 channel 都 cancellation-safe——select! 和 timeout 能正确工作的前提。自己写 channel-like API 时必须达到这个标准
用 7 个签名模式看 channel
回顾第 8 章总结的 Tokio 7 大签名模式,看 channel 如何体现:
- hot/cold 数据分离:Chan 里 tx 和 rx_waker 用 CachePadded 隔开
- 单 AtomicUsize 装多字段:oneshot state bit packing、watch version + closed
- 侵入式链表:mpsc 的 lock-free 链表、Semaphore waiters
- vtable + 指针作标识:这里 channel 没用(channel 是单态的)
- fast path 借用语义:AtomicWaker register_by_ref
- CAS loop 封装成 helper:oneshot 的 state 转换
- 分批 wake:broadcast 的 send 后 wake 所有 receiver
7 个模式里 6 个在 channel 出现——channel 是 Tokio 内部设计原则的完整展示。读懂 channel,你会看到 Tokio 内部那套"设计语言"的主要词汇。
第三方 channel 的选项
Tokio 自带的 4 种 channel 覆盖 90% 场景。但有几个第三方 channel crate 在特定场景值得考虑:
flume:
- 兼容 sync 和 async(同一个 channel 两种 API)
- 比 Tokio mpsc 在纯 sync 场景更快
- 适合"部分代码 sync、部分 async 共用 channel"
async-channel:
- 和 Tokio mpsc 独立、不依赖 runtime
- smol 运行时生态首选
- API 轻微差异但语义相近
crossbeam-channel:
- 完全 sync,不是 async
- 高性能多生产者多消费者
- 可以和 Tokio 混用(通过 spawn_blocking 桥接),但不 async-native
tokio-util::sync::PollSender:
- 把
tokio::mpsc::Sender包装成Sink(futures crate 的 trait) - 用于需要
Sink抽象的库
选择标准:优先 Tokio 原生、除非有具体理由换。Tokio mpsc 对绝大多数场景已经足够快 + 和生态兼容。
一个总结:Tokio 的 channel 如何对齐 Go
| 维度 | Go | Tokio |
|---|---|---|
| mpsc | chan T | tokio::sync::mpsc |
| 多消费者 | - | broadcast(Go 没有直接对应,要手动) |
| 最新值 | - | watch(Go 里通常用 atomic + channel 组合) |
| 一次性 | - | oneshot(Go 里用 chan struct{} 加 select) |
| select | select { ... } | tokio::select! { ... }(下一章) |
Tokio 的 channel 家族比 Go 更丰富——Go 是"一个 chan 解决所有"、Tokio 是"针对场景提供专门优化"。各有优劣:Go 心智负担低但可能写不出最优代码、Tokio 需要选对但选对后代码更清晰高效。
一个深度思考:为什么 Rust 生态没有标准 "channel trait"
Go 的 chan T 是语言关键字——所有人用同一套语义。Rust 生态里 Tokio mpsc、flume、crossbeam、async-channel 的 API 各不相同——没有统一 trait。
为什么:
- 各 channel 有独特 API——mpsc 有
reserve、broadcast 有resubscribe、watch 有borrow——很难抽象成统一 trait 不丢语义 - async 和 sync 的 channel 签名天然不同——sync recv 返回
Option<T>、async recv 返回impl Future<Output=Option<T>> - 有过尝试但没成功:
futurescrate 里的 Stream trait 算最接近的、但没覆盖 send 侧 - 统一 trait 会限制优化——每 channel 根据语义特化实现、trait 抽象会丢失
Rust 生态选择了"具体类型优于 trait 抽象"。代价是学习曲线(要学每个 channel 的具体 API)、好处是每个都能被深度优化。
这是 Rust 哲学的一个反复主题——"多个具体实现" + "零成本抽象" vs "统一 trait + 运行时分派"。前者需要更多工作但性能更高。Tokio channel 是前者的代表。
下一章我们进入 select! 宏——看它如何展开成一段确定的 state machine、如何选择 Ready 分支、biased; 关键字改变了什么。select! 是 channel + timer + cancellation 的集大成。
延伸阅读
- Tokio 源码:
tokio/src/sync/mpsc/chan.rs - Tokio 源码:
tokio/src/sync/broadcast.rs - Tokio 源码:
tokio/src/sync/oneshot.rs - Tokio 源码:
tokio/src/sync/watch.rs - 《Vue 3 设计与实现》第 15 章:Pinia store 订阅机制和 watch channel 的同构
- 《Rust 编译器与运行时揭秘》第 7 章:Box dyn Trait 和固定大小数组的内存布局