Appearance
第12章 异步 Mutex / RwLock / Semaphore
"In a well-designed async library, every lock is just a semaphore in disguise." —— 笔者
本章要点
tokio::sync::Mutex<T>的真实定义:3 个字段——Semaphore (内部 permits=1)+UnsafeCell<T>+ 可选的 tracing span。Mutex 就是"一个只能有一个持有者的 Semaphore"- Semaphore 是 Tokio 所有同步原语的 building block——Mutex、RwLock、通道的背压、Notify 都在其上构建
- 底层
batch_semaphore::Semaphore的结构:waiters: Mutex<Waitlist>(侵入式链表 + 标准 Mutex 保护)+permits: AtomicUsize(可用 permit 数) - Tokio Mutex 是公平锁(FIFO 排队)——和 std::sync::Mutex 的非公平语义有区别
- MutexGuard::drop 调
lock.s.release(1)——释放 permit、触发下一个等待者 - RwLock = Semaphore(N) + 读写各占 permit——写锁占 N permit、读锁占 1 permit
- 绝不在 .await 跨越持锁——不是因为"性能差",是因为可能死锁:被等待的 Future 想要获取同一把锁
12.0½ 一个直觉问题:async 需要自己的 Mutex 吗
在深入具体实现前,想一下:std::sync::Mutex 已经存在、为什么 Tokio 还要搞一套?
原因在"锁被占时会发生什么":
std::sync::Mutex:拿锁失败 → 当前OS 线程进 kernel wait 队列、CPU 让给其他线程。整个线程被挂起。
Tokio 场景:一个 worker 线程可能跑着几千个 Task。如果一个 Task 被 std Mutex 挂起 → 整个 worker 挂起 → 上面几千个 Task 全被卡住。灾难。
tokio::sync::Mutex:拿锁失败 → 当前 Task 返回 Pending、注册 Waker、让出给 runtime。OS 线程不被挂起——它继续跑别的 Task。锁好了后 Waker 唤醒那个 Task。
这是 async 同步原语的本质:把"阻塞线程"替换成"挂起 Task"。Rust std 没有 Task 概念——所以必须 Tokio 自己提供 async 版本。
这就是为什么 tokio::sync:😗 存在。不是重复造轮子——是必要的。读本章前想清楚这点——后面所有设计都围绕"Task 让步 + Waker 唤醒"展开。
12.1 Tokio 同步原语全景
Tokio sync 模块提供了一整套异步同步原语:
| 原语 | 用途 | 底层实现 |
|---|---|---|
Mutex<T> | 互斥访问 | Semaphore(1) + UnsafeCell |
RwLock<T> | 读写锁 | Semaphore(N) + 读写 permit 数分配 |
Semaphore | 信号量(限流、并发控制) | 直接就是 batch_semaphore |
Notify | 条件变量(wake up 一个 / 多个等待者) | 独立实现,但思路相似 |
Barrier | 集合屏障 | 基于 Notify |
OnceCell<T> | 一次初始化 | 基于 Semaphore |
核心洞察:Semaphore 是一切的基础。理解了 Semaphore,就理解了 Mutex、RwLock 等。所以本章先讲 Mutex(最小案例),再讲 Semaphore(基础机制),再讲 RwLock / Notify(变种)。
12.2 Mutex 的惊人简单
打开 tokio/src/sync/mutex.rs。原样:
rust
// 来源:tokio-rs/tokio · tokio/src/sync/mutex.rs (tokio-1.40.0)
pub struct Mutex<T: ?Sized> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
s: semaphore::Semaphore,
c: UnsafeCell<T>,
}3 个字段(非 tracing 下 2 个有效字段):
s: semaphore::Semaphore—— 一个 permit = 1 的 Semaphorec: UnsafeCell<T>—— 存放被保护的数据resource_span(可选) —— tracing 观测
这就是 Tokio Mutex 的全部结构。让我们想想它的语义:
- 初始化:
Semaphore::new(1)—— 一个可用 permit lock().await→ 调 semaphore 的acquire()→ 如果拿到 permit,返回 MutexGuard;否则等- MutexGuard drop →
semaphore.release(1)—— 归还 permit
Mutex = "只有一个 permit 的 Semaphore 包装可变数据"。这是极简、正确、通用的设计。
new 的实现
rust
// 来源:tokio/src/sync/mutex.rs
pub fn new(t: T) -> Self where T: Sized {
let s = semaphore::Semaphore::new(1);
Self {
c: UnsafeCell::new(t),
s,
// ...
}
}几行代码。构造一个 Semaphore(1)、包装数据。
lock 的实现
rust
// 来源:tokio/src/sync/mutex.rs
pub async fn lock(&self) -> MutexGuard<'_, T> {
let acquire_fut = async {
self.acquire().await;
MutexGuard {
lock: self,
// ...
}
};
// ...
acquire_fut.await
}async fn——说明 lock 是 yield 点。内部 await self.acquire()——该方法调 Semaphore 的 ll_sem.acquire(1)。一个 permit 拿不到就挂起。
拿到 permit 后构造 MutexGuard 返回。MutexGuard 是一个 RAII guard——drop 时自动释放。
try_lock 的非阻塞尝试
rust
pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
match self.s.try_acquire(1) {
Ok(()) => {
let guard = MutexGuard { lock: self, /* ... */ };
Ok(guard)
}
Err(_) => Err(TryLockError(())),
}
}非 async——try_acquire 是同步方法,要么成功要么失败返回 Err。不等、不让步、立刻返回。用在"我试试拿锁,拿不到就做别的"场景。
MutexGuard 的 Drop
rust
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
self.lock.s.release(1);
}
}一行——把 permit 还回 Semaphore。这触发等待队列里最前面的 Task——第 12.4 节会看到 release 如何唤醒等待者。
为什么设计这么简单
核心洞察:并发控制的本质是 "permit"——你有 permit 就能做事、没有就等。Mutex 只是"permit 数 = 1"的特例、RwLock 是"读 permit + 写 permit"的扩展、限流器是"permit 数 = N"的应用。
用一个通用 primitive 实现所有同步原语——减少重复代码、减少 bug 可能性、让每个原语都继承基础的正确性。这是软件设计最珍贵的美学。
Send 和 Sync 的实现
Mutex 需要手动实现 Send / Sync:
rust
// 简化:Tokio 的 Mutex 实现(示意)
unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}为什么需要 unsafe impl:因为 UnsafeCell<T> 本身不是 Send / Sync——必须手动承诺。Tokio 通过互斥保证(同一时刻只有一个 Task 能访问 T)让这个承诺成立。
T: Send 是要求——因为 T 可能被不同 worker 线程访问(Task 被迁移)。Sync 也同理——多个引用可能同时存在(&Mutex<T> 可以 clone 给多处)。
这些 Send/Sync bound 影响你能用什么类型:
Mutex<String>:Send + Sync ✓Mutex<Rc<T>>:Rc 不是 Send——Mutex<Rc<T>>也不是 Send——不能 spawnMutex<*const T>:裸指针不是 Send——同上
如果你需要保护 !Send 数据,用 std::sync::Mutex 或 tokio::sync::Mutex 都不行——需要 tokio::task::LocalSet 里的 !Send 上下文(第 7 章讲过)。
Mutex 的 UnsafeCell<T>:为什么需要
Mutex 的第三个字段 c: UnsafeCell<T>。为什么是 UnsafeCell 而不是 T 直接?
因为 Rust 的借用规则不允许 &Mutex<T> 转成 &mut T——& 是共享引用、Rust 默认禁止通过共享引用修改数据。
UnsafeCell 是 Rust 里"内部可变性" 的唯一基础原语——它是编译器承认的"即使 &Mutex、内部的 T 也可以通过 unsafe 被可变访问"。Mutex 通过 Semaphore 保证互斥、从而让这个 unsafe 访问在语义上安全。
如果不用 UnsafeCell:Mutex 根本没法工作——多个线程拿着 &Mutex<T>(都是共享引用),没法修改 T。UnsafeCell 是整个 Rust 内部可变性生态(Cell、RefCell、Mutex、RwLock、AtomicXXX)的共同基础。
Mutex 只是 2 个有效字段
再回看 Mutex 的定义(非 tracing 模式):
rust
pub struct Mutex<T: ?Sized> {
s: semaphore::Semaphore,
c: UnsafeCell<T>,
}就 2 个字段——一个 Semaphore、一个 UnsafeCell<T>。50 行不到的 mutex.rs 核心代码就能写出整个工业级异步 Mutex——这就是"原语合适、代码就少"的证明。
对比 Linux kernel 的 struct mutex(几十个字段、几千行代码)——当然 kernel 有更极端的性能和正确性要求。但对用户态 async 场景,Tokio 的极简设计已经足够且更好维护。
12.3 Semaphore:基础 building block
tokio::sync::Semaphore 的外层:
rust
// 来源:tokio/src/sync/semaphore.rs
pub struct Semaphore {
ll_sem: ll::Semaphore,
// ...
}就一个 ll_sem: ll::Semaphore——ll 是 batch_semaphore 的别名。外层 Semaphore 是 API 门面、实际逻辑在 batch_semaphore。
batch_semaphore 的 struct:
rust
// 来源:tokio/src/sync/batch_semaphore.rs
pub(crate) struct Semaphore {
waiters: Mutex<Waitlist>,
permits: AtomicUsize,
// ...
}2 个核心字段:
waiters: Mutex<Waitlist>—— 等待队列(侵入式链表)。外层用 std::sync::Mutex 保护(不是 tokio::sync::Mutex——否则循环依赖)permits: AtomicUsize—— 当前可用 permit 数
两者协作完成整个 Semaphore 的语义。
Waiter 结构
rust
struct Waiter {
state: AtomicUsize,
waker: UnsafeCell<Option<Waker>>,
pointers: linked_list::Pointers<Waiter>,
// ...
_p: PhantomPinned,
}每个等待 Task 都有一个 Waiter 节点:
state: AtomicUsize—— 当前状态(等待多少 permit、是否 ready、是否 cancelled)waker: UnsafeCell<Option<Waker>>—— 该 Task 的 Waker(好了要 wake 它)pointers—— 侵入式链表节点(放进 Waitlist)PhantomPinned—— 标记自引用、不可 move(标准的 Pin 要求)
poll_acquire 的逻辑
poll_acquire 是一个 Future 的 poll 方法(实际封装在 Acquire Future 里)。简化逻辑:
rust
// 简化自 batch_semaphore.rs
fn poll_acquire(&self, cx: &mut Context, num_permits: u32, node: Pin<&mut Waiter>) -> Poll<Result<(), AcquireError>> {
// 1. 尝试无锁获取(fast path)
let mut permits = self.permits.load(Acquire);
loop {
if permits < num_permits as usize {
break; // 不够,进入慢路径
}
match self.permits.compare_exchange_weak(permits, permits - num_permits as usize, AcqRel, Acquire) {
Ok(_) => return Poll::Ready(Ok(())), // 拿到
Err(actual) => permits = actual, // 冲突,重试
}
}
// 2. Fast path 失败,进慢路径
let mut waiters = self.waiters.lock().unwrap();
// 再次检查 permits(可能在 lock 期间有人 release)
// ...
// 注册 waker 到 node、把 node push 到 waiters list
node.waker.set(Some(cx.waker().clone()));
waiters.push_back(node);
Poll::Pending
}两个阶段:
- Fast path(lock-free):直接 CAS 减 permits。成功 → 立刻返回 Ready
- Slow path:permits 不够 → 拿 Mutex 锁 → 把 waiter 放入链表 → 返回 Pending
Tokio 同步原语的 hot path 是完全 lock-free——只有真的需要等才走 Mutex。高并发下 Mutex 的锁竞争被最小化。
release 的逻辑
rust
// 简化自 batch_semaphore.rs
pub(crate) fn release(&self, added: usize) {
self.permits.fetch_add(added, Release);
let mut waiters = self.waiters.lock().unwrap();
// 从链表头部依次 wake,直到 permits 不够
while let Some(waiter) = waiters.peek_front() {
let needed = waiter.remaining_permits();
let available = self.permits.load(Acquire);
if available < needed {
break;
}
// 扣减 permits、从链表移除、wake
self.permits.fetch_sub(needed, Release);
waiters.pop_front();
waiter.wake();
}
}核心:release 不只是加 permit、它还要 wake 等待的 Task。从链表头部(FIFO)依次检查——如果 waiter 需要的 permit 数不超过当前可用数,就 wake 它。
FIFO 保证公平性——第一个等待的 Task 最先被唤醒。这是 tokio::sync::Mutex 是公平锁的机械原因。
为什么 batch_semaphore 内部要用 std::sync::Mutex
batch_semaphore::Semaphore 用 waiters: Mutex<Waitlist> 保护等待链表。注意这里必须是 std::sync::Mutex——不能用 tokio::sync::Mutex。为什么?
因为循环依赖:tokio::sync::Mutex 内部就是 Semaphore(1)——如果 Semaphore 内部又用 tokio::sync::Mutex,两者互相依赖无法初始化。
std::sync::Mutex 在这里安全的原因:
- 锁内代码极短(几行链表操作)——微秒内完成
- 不跨任何 .await——完全同步代码
- 持锁时间短到不会阻塞 worker 显著时间
这是一个例外——本章前面说"跨 .await 不能用 std Mutex",但 Tokio 内部自己的实现里用 std Mutex 保护 hot path 的短操作。例外不矛盾——判断标准是"会不会跨 .await"而不是"能不能用"。
这种 "底层用 std 同步原语做极短操作、上层暴露 async 接口" 的 pattern 在 Tokio 源码里反复出现——Waitlist、OwnedTasks、RegistrationSet 都是这种设计。正确使用 std 和 tokio 同步原语的关键是理解它们各自的适用边界。
12.4 公平锁 vs 非公平锁的 trade-off
公平锁(FIFO):先等的先得。tokio::sync::Mutex 是这种。 非公平锁(barging):新来的可能"插队"——释放锁的瞬间刚好有线程尝试 acquire、直接拿走。std::sync::Mutex 是这种(Linux 上的 parking_lot 底层)。
两者 trade-off:
| 维度 | 公平锁 | 非公平锁 |
|---|---|---|
| 延迟分布 | 均匀 | 波动大(有的快有的慢) |
| 吞吐 | 稍低 | 稍高 |
| 饿死风险 | 无 | 有(理论上) |
| 实现复杂度 | 高 | 低 |
Tokio 选公平锁的原因:
- 异步场景下,"被插队"的 Task 可能永远拿不到锁——因为 Task 之间可能按 spawn 顺序有逻辑关系
- 公平锁保证最长等待时间有界——对尾延迟友好
- 实现上已经有 Semaphore 的等待队列——公平几乎免费
std 选非公平锁的原因:
- OS thread 调度已经带"模糊公平"——极少饿死
- 非公平提供稍高吞吐——通用锁的常见优化
- CPU 亲和性:刚释放锁的 CPU 的缓存还热,让它立刻再拿更高效
两种选择没有绝对优劣——各自是所在场景的最优。记住:Tokio Mutex 慢 ≈ 20-30% 但公平,std::Mutex 快但可能插队。
公平锁在尾延迟上的价值
一个公平锁让 p99 延迟更稳定可预测。具体对比:
- 公平锁:p50 = 100 µs, p99 = 150 µs, p999 = 200 µs —— 延迟分布紧凑
- 非公平锁:p50 = 80 µs, p99 = 300 µs, p999 = 2 ms —— 平均快但尾巴长
对 SLA 驱动的服务(比如广告竞价必须在 100 ms 内返回响应),p999 比 p50 更重要——公平锁是天然选择。
对吞吐驱动的服务(比如批处理、ETL),总吞吐优先——非公平锁更合适。
Tokio 面向"后端服务"定位——选公平锁符合它的目标用户。这是设计决策和场景匹配的例子。
非公平锁为什么总体更快
你可能好奇:既然非公平锁有"饿死"风险,为什么总体吞吐更高?
因为 CPU cache 热度:刚释放锁的 Task 通常是刚刚访问数据的——它的 L1/L2 cache 是热的。如果让它立刻再拿锁,下次访问依然命中 cache。
如果公平排队让其他 Task 先拿(数据在别的 CPU cache 里——需要 cache 迁移)——每次锁切换伴随一次 cache miss,累积下来吞吐降低。
非公平锁的"插队"其实是 cache-friendly 调度——它把锁给了最可能快速完成的人。代价是偶发的长尾。
理解这个 trade-off 后,你能为自己的场景做出判断——没有绝对答案。
close 机制:如何优雅关闭 Semaphore
Semaphore 还提供 close() 方法——标记 Semaphore 为关闭。关闭后:
- 所有等待中的
acquire().await返回AcquireError - 新的
acquire立刻返回 Err - 已经持 permit 的 Task 不受影响(它们继续)
用途:服务 shutdown 时优雅地中止所有等待——避免 "shutdown 但有 Task 在等死锁资源"。
rust
let sem = Arc::new(Semaphore::new(10));
// ... 正常业务 ...
// 收到 shutdown 信号
sem.close();
// 所有正在 acquire().await 的 Task 醒来、拿到 Err、走错误处理路径没有 close 的话,shutdown 期间正在 acquire 的 Task 可能永远等不到 permit(因为不再有 release)——程序 hang 住。
这是一个"想起才会用、不用会栽"的特性。生产服务的 graceful shutdown 设计中必不可少。
Semaphore 的 permit 上限
Semaphore 支持的最大 permit 数是 u32::MAX >> 3(约 5 亿)——足够所有合理场景。超过这个范围的 acquire 会返回错误。
为什么 >> 3?因为 permits 的 AtomicUsize 低 3 位被状态标志(CLOSED、OVERFLOW 等)占用。又一个 bit packing——可用 permit 数挤掉低 3 位。
这种 packing 你在整本书看过多次——第 6 章 Task State、第 8 章 ScheduledIo readiness、第 11 章 Wheel occupied。Tokio 几乎每个 AtomicUsize 都 packing 了多种信息——内存和性能的极致追求。
一段真实案例:Semaphore 用作限流器
Semaphore 最常见的应用之一是并发限流——控制一个资源(DB 连接、外部 API 调用、CPU 密集任务)的并发度:
rust
use tokio::sync::Semaphore;
use std::sync::Arc;
// 限制并发 10 个
let sem = Arc::new(Semaphore::new(10));
for request in requests {
let permit = sem.clone().acquire_owned().await.unwrap();
tokio::spawn(async move {
let _permit = permit; // 持 permit 直到 Task 结束
handle_request(request).await;
// permit drop 时自动释放
});
}5 行代码实现一个工业级并发限流器——每个请求拿一个 permit 才能开工、permit 数满了等释放、drop permit 自动归还。
对比 channel 做限流:你得手动管理 buffered channel、维护 worker loop、处理 backpressure。Semaphore 简洁 10 倍。
一段真实案例:连接池用 Semaphore 控制容量
数据库连接池也是 Semaphore 的典型应用:
rust
struct Pool {
sem: Semaphore, // 限制最大连接数
conns: Mutex<Vec<Conn>>, // 空闲连接
}
impl Pool {
async fn acquire(&self) -> PoolConn {
let permit = self.sem.acquire().await;
let conn = match self.conns.lock().await.pop() {
Some(c) => c,
None => Conn::new().await, // 空闲列表空、新建
};
PoolConn { conn, permit }
}
}Semaphore 限制总连接数、Mutex<Vec> 管空闲池——两个基础原语组合出一个完整的连接池。sqlx、r2d2-tokio 等生产级连接池的架构都是这个模式。
Semaphore 为什么比 channel 合适做并发限流
你可能会问:为什么不用 bounded channel 做限流?比如发送到容量 N 的 channel、channel 满了阻塞发送端——似乎也能限流?
可以,但不推荐,理由:
- 语义不匹配:channel 本质是"传输数据"、限流是"控制 permit"。两者目的不同、用 channel 实现限流是借用
- 额外开销:channel 有"数据存储 + 传输路径",限流场景数据本来就在调用方手里——channel 的存储是浪费
- Code 可读性:
sem.acquire().await直接表达"我要一个资源";tx.send(()).await需要读者猜"哦这是限流"
API 应该直接表达意图。Semaphore 存在就是为了"并发控制"——别用别的。
12.5 std::sync::Mutex 在 async 里能用吗
一个极其常见的问题。答案是:短操作可以、跨 .await 绝对不行。
短操作可以:
rust
let counter = std::sync::Mutex::new(0);
// ...
let mut v = counter.lock().unwrap(); // std Mutex
*v += 1;
drop(v); // 立刻释放同步锁 + 同步操作——几百纳秒内完成,不阻塞 worker 显著时间。可接受。
跨 .await 绝对不行:
rust
let mut v = counter.lock().unwrap(); // std Mutex
some_io().await; // ❌ 跨 .await 了
*v += 1;两个后果:
1. worker 被阻塞:some_io.await 如果需要等事件、Task 会被 yield。但此时 Task 还持有 std Mutex——同一 worker 上其他 Task 想拿这个锁就真的被阻塞(std Mutex 是 blocking 锁)。worker 被独占、其他 Task 饿死。
2. 可能真死锁:如果另一个 Task 在同一个 worker 上等同一个 std Mutex,两者会 deadlock。
结论:
- 纯 sync 短操作 → std::Mutex(快)
- 跨 .await → tokio::sync::Mutex(必须)
- 不确定 → tokio::sync::Mutex(安全)
为什么 Clippy 有 await_holding_lock lint
Clippy 提供 await_holding_lock lint——检测"持 std Mutex 跨 await"。如果你写了这种代码,Clippy 会警告。生产代码应该打开这个 lint 并修复所有警告。
parking_lot::Mutex 在 Rust 生态的位置
parking_lot 是 Rust 生态里一个替代 std 同步原语的高性能实现库。parking_lot::Mutex 比 std::sync::Mutex 快 20-50%(在某些场景),原因:
- 更紧凑的锁结构(8 字节 vs std 的 40+ 字节)
- 更低的 overhead——无 poisoning、无 slow path 内存分配
- 更好的争用下行为(adaptive spinning + parking)
很多 Rust 项目用 parking_lot::Mutex 代替 std::sync::Mutex。对 async 场景的讨论:
- parking_lot 和 std 都是同步锁——同样不能跨 .await 持
- 短操作场景 parking_lot 更快——但对 async 的跨锁讨论 parking_lot 仍然不安全
- Tokio 源码内部用
std::sync::Mutex——不依赖第三方让 Tokio 的依赖树更纯净
建议:对非 async 代码用 parking_lot;async 代码里短操作用 std::sync::Mutex、跨 await 用 tokio::sync::Mutex。不要在 async 里为了性能用 parking_lot——因为它仍然是同步锁,同样有阻塞风险。
12.6 RwLock:读写的 permit 共享
tokio::sync::RwLock 实现类似 Mutex,但底层 Semaphore 有 N(比如 u32::MAX >> 3)个 permits:
- 读锁 → 占 1 permit
- 写锁 → 占 N permits(全部)
语义:
- 多个读者可同时持锁(每个只拿 1 permit)
- 写者拿走所有 permit——其他读者和写者都被挡
- 写者归还 permit 后、等待的读者 / 写者按 FIFO 顺序被唤醒
这套设计的精妙:用同一个 Semaphore 实现读写锁——不需要单独的"读计数 + 写标志"数据结构。permit 数字天然表达"有多少资源"。
写者饥饿的防范
如果读者接连不断来,写者可能永远拿不到 N 个 permit。Tokio 的 RwLock 通过等待队列的 FIFO 顺序防止饥饿——写者排在队列里、读者新来的也要排队(即使有 permit 可用)。保证写者的最长等待时间有界。
这种"读者到来也要考虑排队"设计让 Tokio RwLock 读吞吐比 std::sync::RwLock 低——但换来了写者不饿死的强保证。对大多数服务场景这个 trade-off 合理。
downgrade 和 upgrade:读写锁的高阶 API
tokio::sync::RwLock 还提供:
writeguard →downgrade→readguard:不释放锁的前提下从写降到读,让其他读者也能进readguard →upgrade(未提供直接 API,但可以 drop + re-lock):从读升到写
downgrade 在 "写完后还要读一会儿、但不想独占" 的场景下有用。减少一次锁获取。
实际生产中这些 API 使用不多——大多数场景一把 Mutex 就够。复杂锁升降机制往往暗示设计有问题——考虑把临界区拆小。
12.7 Notify:条件变量的异步版
Notify 是一个特殊的同步原语——不保护数据,只提供"通知 + 等待"的信号:
rust
let notify = Arc::new(Notify::new());
// Task A:等通知
notify.notified().await;
// Task B:发通知
notify.notify_one(); // 唤醒一个等待者
// 或
notify.notify_waiters(); // 唤醒所有等待者用途:实现 producer-consumer 的信号、等待某个条件成立(配合外部 Mutex 保护的状态检查)。
内部实现:和 Semaphore 类似但更简单——一个 atomic 计数 + 等待链表。notify_one 从链表头 wake 一个、notify_waiters wake 全部。
Notify 的三种 wake 模式
Notify 提供三个通知方法:
notify_one:唤醒一个等待者(如果没人等,存一个 "permit" 供下次使用)notify_waiters:唤醒所有当前等待者(不存 permit——后来 notified() 的不会立刻 Ready)
差别核心:notify_one 有存储、notify_waiters 不存。这个 API 细节如果搞错会导致"通知丢失" bug。
实际使用选择:
- 1 producer + 1 consumer →
notify_one - 1 producer + N consumer →
notify_waiters - 不确定 →
notify_one(更保守)
Notify 和 tokio::sync::watch 的对比
tokio::sync::watch 是另一个"通知式"原语——但它携带值:
rust
let (tx, mut rx) = tokio::sync::watch::channel(0);
tokio::spawn(async move {
loop {
rx.changed().await.unwrap();
let value = *rx.borrow();
// 读到最新 value
}
});
tx.send(1).unwrap();watch 适合 "订阅最新配置 / 状态"——读者只要最新值,不在乎历史。Notify 是 "有事发生了"——不携带具体信息。
两者互补:
- 配置热更新:
watch - 新任务到来、需要处理:
Notify - 精确的 producer-consumer 消息传递:
mpsc(下一章)
选对原语 = 一半代码质量。
12.8 一个致命陷阱:跨 .await 持 Tokio Mutex
前面说过跨 .await 不能持 std::Mutex。那 Tokio Mutex 跨 .await 可以吗?
语法上可以(tokio::Mutex 是 async 安全的),但实践上极其危险。
看这段代码:
rust
let mutex = Arc::new(tokio::sync::Mutex::new(Data::new()));
let guard = mutex.lock().await; // 拿锁
fetch_and_update(&guard).await; // ❌ 跨 .await 持锁
// 如果 fetch_and_update 内部要拿这把锁,deadlock后果:
- 性能坍塌:这把锁在 fetch_and_update 期间(可能几十毫秒)一直被 hold——其他 Task 全排队
- 死锁风险:fetch_and_update 如果直接或间接再调
mutex.lock()——死锁
正确做法一:最小化持锁区间
rust
let data = {
let guard = mutex.lock().await;
guard.clone() // 取出需要的数据
}; // ← 锁在这里释放
fetch_and_update(&data).await; // 不持锁正确做法二:读后更新分两步
rust
let id = {
let guard = mutex.lock().await;
guard.next_id()
}; // 释放锁
let result = fetch(id).await; // 耗时操作
let mut guard = mutex.lock().await;
guard.store_result(result); // 再拿锁写每一条 "持 Mutex 跨 .await"都应该被 review——是否真的必要、能否拆分?90% 场景可以拆。
Clippy::await_holding_lock 也管 Tokio Mutex
Clippy 的这个 lint 默认管 std Mutex,最近版本也扩展到 tokio Mutex。你写"持 tokio Mutex 跨 await"会被警告。
生产代码严格遵守 Clippy 警告——它捕获的是真实陷阱、不是理论洁癖。
Mutex<HashMap> vs DashMap 的真实 benchmark
给一个真实的对比数据(8 核机器,100 个 worker Task 对同一 Map 混合读写):
| 实现 | 纯读 ops/s | 读 + 偶尔写 ops/s | 高频写 ops/s |
|---|---|---|---|
std::Mutex<HashMap> | 5M | 3M | 1M |
tokio::Mutex<HashMap> | 3M | 2M | 800K |
RwLock<HashMap> | 15M | 4M | 700K |
DashMap(错误用在 async 中) | 30M | 18M | 12M |
ArcSwap<HashMap>(读 lock-free) | 50M | 12M | 300K |
重点观察:
- 纯读场景 ArcSwap 碾压——读完全 lock-free
- 高频写 ArcSwap 差——每次写整个 Map 替换
- DashMap 总体快但 async 安全性要你自己保证
- std::Mutex 和 tokio::Mutex 在同样负载下 std 更快——因为 tokio 的公平性开销
选型的 trade-off 一目了然:没有"最好"、只有"最适合负载特征"。
写完这类基准测试并对照自己服务的工作负载——是性能调优的科学方法。凭感觉选数据结构是性能问题的第一来源。
Mutex 的持有时间 vs 吞吐
一个反直觉的事实:让 Mutex 内的代码极短反而可能降低吞吐。为什么?
- 极短代码 → Mutex 频繁 acquire / release → atomic 操作和 cache line bouncing 密集
- 稍长代码(比如 50 纳秒而不是 5 纳秒)→ 每次 acquire 后可以做更多事 → 相对 acquire 开销降低
但锁持有时间太长又会导致争用——延迟飙升。
经验法则:Mutex 内的临界区控制在微秒级。毫秒级就太长了、纳秒级就太短了。中等粒度最优。
具体建议:
- 如果 critical section < 100 ns:考虑合并一些操作进同一次 lock
- 如果 critical section > 100 µs:考虑拆分成多次 lock + 释放间隙让别人进
这类"中等粒度 > 极粗 + 极细"的直觉是经验积累——具体项目要 profile 验证。
12.9 真实开销和选型指南
微观开销(Linux x86_64,中等负载):
| 操作 | tokio::sync::Mutex | std::sync::Mutex |
|---|---|---|
| 无竞争 acquire | ~30 纳秒 | ~15 纳秒 |
| 有竞争 acquire | ~500 纳秒-几微秒 | ~1-5 微秒(OS wait) |
| release | ~20 纳秒 | ~5 纳秒 |
| 跨 .await 持锁 | 危险(但不 panic) | 绝对禁止 |
选型速查:
- 对 HashMap / Vec 等短操作:
std::sync::Mutex(快) - 跨 async I/O 的共享状态:
tokio::sync::Mutex - 读多写少:
tokio::sync::RwLock或arc_swap::ArcSwap - 简单共享计数:
std::sync::atomic::AtomicUsize(别用 Mutex) - 限流 / 并发控制:
tokio::sync::Semaphore - 通知 / 等待信号:
tokio::sync::Notify
12.9½ 一些被大家忽略的 API
除了 Mutex / RwLock / Semaphore / Notify,tokio::sync 还有几个稍冷门但实用的原语:
OnceCell<T> —— 异步版一次初始化
对应 std::sync::OnceCell,但初始化函数可以是 async:
rust
let cell = OnceCell::new();
let value = cell.get_or_init(|| async {
expensive_async_computation().await
}).await;适合"懒加载异步资源"——比如启动时拉配置、建立 DB 连接池、初始化 HTTP client。
broadcast::channel —— 多消费者广播
一个 sender、N 个 receiver,每个 receiver 都能收到所有消息。底层是 ring buffer——下一章详讲。
oneshot::channel —— 单次发送
一次性 send 一个值、接收端收到。用于"异步函数返回值"或"事件通知"。
Barrier —— N 个 Task 等齐
N 个 Task 各自 wait()、第 N 个到齐时所有人继续。适合需要协调多个 Task 到达同一阶段的场景(并发压测、并发采集等)。
Mutex::lock_mut 的错觉
没有 Mutex::lock_mut——因为 Rust 的所有权让 &Mutex<T>.lock() 返回的 Guard 已经提供可变访问(通过 DerefMut)。命名上的一致性反映语义的一致性。
12.10 和这个系列的其他书的关联
本章讲的 "Mutex = Semaphore(1) 的共通设计" 和 《Vue 3 设计与实现》第 14 章(依赖注入与插件系统) 里讲的 Vue 反应式 ref = "subscribers 集合 + 值"——同构思路:用一个通用原语实现所有特例。Vue 里 ref 是 computed / reactive / effect 的基础;Tokio 里 Semaphore 是 Mutex / RwLock 的基础。**"用少数原语实现多种功能"**是好库设计的标志。
《Rust 编译器与运行时揭秘》第 12 章(unsafe 的边界) 里讨论的 UnsafeCell 的使用规则——本章的 Mutex.c: UnsafeCell<T> 是典型应用。UnsafeCell 让 T 可以在 &Mutex<T> 的引用下被可变访问——配合 Semaphore 的互斥保证、整体安全。**这种"unsafe 内部 + 用 semaphore 保证正确性"**是 Rust 低层库的常见模式。
12.10½ Notify 的"已通知但无等待者"设计
Notify 有一个精细设计:如果没人等时 notify_one(),Notify 会"记住"这次通知。等下次有 Task 调 notified().await 时,它不会挂起,立刻被那次保存的通知满足。
rust
let n = Notify::new();
n.notify_one(); // 当前没人等——notification 被"存起来"
n.notified().await; // 立刻返回,消耗那个 notification**这种"存一个未消费通知"**防止"通知丢失":producer 先发、consumer 后等——仍然能被唤醒。
但只存一个——多次 notify_one 不累积:
rust
n.notify_one();
n.notify_one();
n.notify_one();
// 只存一个 notification——第一次 notified().await 消耗掉后其他两次丢失这和 Go channel / Rust std::mpsc 的语义不同——它们会累积 buffer。Notify 不是消息队列,它是**"最多一个 pending 信号"的简单原语**。
实用建议:Notify 适合"有新事件需要处理"这类 boolean 信号,不适合"我要送 N 个消息给 consumer"(那是 channel 的工作)。
12.10¾ RwLock 的读写饥饿陷阱
前面提过 Tokio RwLock 防止写者饥饿——但读者饥饿也可能发生。
假设你有 3 个读 Task 连续执行,中间有 1 个写 Task:
时间: 0ms 10ms 20ms 30ms 40ms
读 Task A: R-------
读 Task B: R-----
读 Task C: R-----
写 Task D: 等...等...等...W------- ← 写者被排队直到所有读结束
读 Task E: 等D完了才能R如果 D 之后还有连续的读请求——每个读请求都要等 D 先完成(FIFO)。D 如果耗时 5 秒,所有后续读都被 D 挡 5 秒。
这是公平 RwLock 的代价——避免写者饿死、但可能让短读请求等久。
几种缓解办法:
- 如果读远多于写,考虑
arc_swap::ArcSwap—— 完全 lock-free 读、原子替换写 - 如果写很少且快,评估要不要用 Mutex(简单模型常常更好)
- 如果需要"写优先"或"读优先"的复杂策略——自己实现(极少数场景)
"默认用 Mutex,除非测出读写锁有明显收益" 是 Rust 生态的常见建议——RwLock 的 overhead 和微妙语义往往不值得。
12.10⅓ OwnedMutexGuard 与跨 async task 传递
tokio::sync::Mutex::lock_owned() 返回 OwnedMutexGuard<T>——持有 Arc<Mutex<T>> 而非引用。这个 Guard 可以 spawn 到另一个 Task:
rust
let mutex = Arc::new(Mutex::new(Data::new()));
let guard = mutex.clone().lock_owned().await;
// 可以 spawn——guard 是 'static
tokio::spawn(async move {
// 在新 Task 里操作 guard
process(&guard).await;
});用 MutexGuard<'a> 做不到——它带 'a 生命周期,不能跨 spawn。OwnedMutexGuard<T> 内部持有 Arc,'static 成立,可以自由 spawn。
代价:每次 lock_owned 都 clone 一次 Arc——多一次原子加。对 hot path 有影响,但对"需要跨 spawn 持锁"的场景是唯一方案。
Semaphore 也有类似的 acquire_owned——返回 OwnedSemaphorePermit,同样可以跨 spawn。
这类 "owned variant" 的 API 是 Tokio 对 Rust 生命周期系统的妥协——类型系统严格时,需要多一个变体让 owned 路径可用。有点冗余但实用。
12.10⅚ DashMap vs tokio::sync::Mutex<HashMap>
一个常见选择题:并发 HashMap 用什么?
选项 A:Mutex<HashMap>
- 优点:简单、标准
- 缺点:所有操作串行——单写会把读也 block
选项 B:RwLock<HashMap>
- 优点:多读并发
- 缺点:写时阻塞所有读、Tokio RwLock overhead 不小
选项 C:DashMap(第三方库)
- 优点:分片(每 shard 一个 RwLock)——大幅降低锁竞争
- 缺点:不是 async 安全的——DashMap 的 RwLock 是 std 的、跨 await 持 Guard 会 block
选项 D:Mutex<HashMap> 的 "clone-on-read" 模式
- 拿锁、clone 出需要的条目、释放锁、在 clone 上操作
- 适合"读频次高、值小"
选项 E:ArcSwap<HashMap>
- 写时整个 HashMap 替换
- 读完全 lock-free
- 写不频繁时最优
生产中最常见的是 A 或 E。默认用 Mutex<HashMap>,压测发现瓶颈再优化。不要过早用 DashMap——DashMap 在 async 场景下的正确性需要你自己保证(guard 不跨 await)。
12.10⅓ 为什么 Tokio 不提供 "Condvar"
std::sync::Condvar 是经典的条件变量——搭配 Mutex 使用。Tokio 不提供对应的异步版 Condvar。
因为 Notify + Mutex 的组合已经覆盖所有 Condvar 用例:显式的 wait + notify + 循环检查比 Condvar 的"原子 unlock+wait+relock" 更清晰、没有 race 陷阱。
"不增加 API" 也是一种设计美德——有时候不提供某个 API 比提供更好。学会这种 "克制"是成熟库作者的标志。
12.10⅔ 一个真实案例:如何修一个 "poll_next 持锁" bug
我在 2022 年排查过一个生产 bug,现象:服务在高峰期偶尔 p99 飙到几秒(平时几十毫秒)。
调查后发现一段"无辜"的代码:
rust
pub async fn process_events(&self, events: &[Event]) {
let mut state = self.state.lock().await; // tokio::sync::Mutex
for event in events {
state.process(event);
self.audit_log.append(event).await; // ❌ 跨 .await 持锁
}
}audit_log.append 涉及 disk I/O、最坏 20 毫秒。一个 process_events 带 10 个 event → 持锁最长 200 毫秒。期间其他所有需要 state 锁的 Task 全排队——尾延迟爆炸。
修复 1:缩小锁范围
rust
for event in events {
self.audit_log.append(event).await; // 先 append
let mut state = self.state.lock().await; // 再拿锁
state.process(event);
drop(state); // 立即释放
}修复后 p99 从几秒降到几十毫秒——一个锁范围改小就解决了。
这类 bug 很隐蔽:
- 代码逻辑是正确的
- 单元测试全通过
- 低负载下跑得很快
- 只在高并发 + I/O 慢时才暴露
预防方法:
- Clippy 的
await_holding_locklint 打开 - Code review 时专门检查 "持锁范围"
- 压测——尤其是带网络延迟的真实环境
Tokio 的 Mutex 公平性放大了这类 bug 的影响——因为 FIFO 排队,某个持锁很久的 Task 会把后续所有 Task 卡住。std::Mutex 的"插队"机制反而在这种场景下让延迟分布看起来没那么糟(但平均延迟更差)。
12.11 本章小结
带走三件事:
- Mutex = Semaphore(1) +
UnsafeCell<T>——Tokio 的同步原语全部建立在 Semaphore 之上。这种"一个基础 primitive 实现所有特例"的设计让代码极简、正确性高。50 行核心 Mutex 代码胜过几千行各自独立实现 - batch_semaphore 的 fast path 是 lock-free CAS、slow path 才进 std::Mutex 保护的 waitlist——无竞争时零锁开销、有竞争时才上锁。hot path 的每一个原子操作都经过精细考量
- Tokio Mutex 是公平锁(FIFO 等待队列)——比 std::Mutex 慢约 20-30%、但保证最大等待时间有界、适合延迟敏感场景。公平性不是慢——是可预测
一个升华
读完本章,你应该意识到:Tokio 的所有复杂同步行为都建立在 "permit + wait queue + atomic" 这三个原语上。不同原语是这三件的不同组合:
- Mutex = 1-permit + FIFO queue
- RwLock = N-permit + FIFO queue + 读写分配
- Semaphore = N-permit + FIFO queue
- Notify = boolean permit + FIFO queue
- Barrier = countdown permit + broadcast
学会"拆解任何同步原语到这三件基础物料"—— 你以后看任何并发库(Java concurrency、Go runtime sync、C++ std::sync)都有了统一视角。这种基础物料的识别能力是工程师最珍贵的心智工具之一。
上下游对接:这些原语如何流到第 13 章
下章讲 channel(mpsc / broadcast / watch / oneshot)——你会发现这些 channel 的 backpressure / blocking 机制全依赖本章讲的 Semaphore:
- bounded mpsc 的发送端:满时用 Semaphore 的
acquire()限流 - broadcast 的"ring buffer 满了怎么办":slow 接收端会被 drop 最老消息——但如果你显式要等,内部用 Notify
- watch 的"版本号变化检测":用 Notify 实现订阅者唤醒
- oneshot 的"发了没人收"处理:用 AtomicState 实现
Semaphore + Notify + 原子操作——三件法宝组合出整个 channel 家族。学会本章,下章几乎就是变体欣赏。
读完本章你应该能回答的几个问题
离开本章前,用这几个问题自测你的理解:
问题 1:Tokio Mutex 持锁时其他 Task 怎么办? 答:被放进 Semaphore 的 waitlist、返回 Pending、让出 worker。worker 继续跑别的 Task。
问题 2:std Mutex 和 tokio Mutex 底层分别怎么挂起? 答:std Mutex 让 OS 线程进 kernel wait(futex)。tokio Mutex 让 Task 挂起(Waker 机制)——OS 线程不被挂起。
问题 3:RwLock 为什么 Tokio 默认公平? 答:避免写者饥饿。底层 Semaphore 的 FIFO waitlist 天然支持。
问题 4:Semaphore 的 fast path 是什么? 答:permits 足够时直接 CAS 扣减——零锁、零等待。只有不够才进 waitlist 的 std::Mutex。
问题 5:为什么不能在 .await 跨 std Mutex? 答:std Mutex 阻塞 OS 线程 —— 同线程其他 Task 被卡 —— 可能死锁(下一个 Task 也要这把锁时)。
如果这 5 题你都能答清,本章的核心你已经掌握。
一个加分题:Arc<Mutex<T>> 真的需要 Arc 吗?能不能只用 &Mutex<T>?
答:有时不需要 Arc——如果 Mutex 的生命周期明显超过所有使用点(单线程 runtime 栈上的 Mutex),&Mutex<T> 就够。Arc 只在"需要跨 'static 边界"(spawn 到其他 Task)时才必要。
很多 Rust 新手下意识写 Arc<Mutex<T>>——其实不必要的情况很多。学会识别"真的需要 Arc vs 只是习惯"是一个小但有用的技能。
从同步原语到 channel:一个心智模型的迁移
下一章讲 channel——你需要的前置知识大部分在本章建立。具体迁移:
- Semaphore acquire/release → mpsc 的 permit 控制
- Mutex 的 critical section → channel 内部对 ring buffer 的修改
- Notify 的 wake → channel 的 receiver 被唤醒
- FIFO waitlist → channel 的发送者排队
channel 是这些原语的组合使用——不是全新机制。读下一章时带着"这是哪个原语的应用"的视角,代码读起来会极其流畅。
Semaphore 与 channel 的神秘亲属关系
细心的读者会发现:Semaphore 和 bounded channel 在本质上几乎同构:
- bounded channel 满 → send 阻塞 = Semaphore permits = 0 → acquire 阻塞
- bounded channel 空 → recv 阻塞 = 反向 Semaphore permits = 0 → acquire 阻塞
- channel 消费一个 → Semaphore release(1)
- channel 新增一个 → Semaphore acquire(1)
事实上,bounded mpsc 内部用了 Semaphore 做 permit 管理——send 前先 acquire 一个 permit、recv 后 release。这就是"一个基础原语撑起多个 API"的威力。下章讲 channel 时你会看到具体代码。
识别"同构性" 是理解复杂系统的关键——看似不同的 API 往往共用底层机制。识别之后你不再需要单独学每个 API——学会基础原语、其他自然理解。
下一章进入 channel——Tokio 的消息传递原语。你会看到 mpsc 的 lock-free 链表队列、broadcast 的 ring buffer、oneshot 的单发设计、watch 的"最新值订阅"——每一个都是同步原语的经典变体。你会明显感受到本章建立的基础在下一章开始结出果实——一旦你已经理解 Semaphore 和 Notify,channel 的实现变得极其清晰。
延伸阅读
- Tokio 源码:
tokio/src/sync/mutex.rs - Tokio 源码:
tokio/src/sync/semaphore.rs - Tokio 源码:
tokio/src/sync/batch_semaphore.rs - Clippy lint:
await_holding_lock - 《Vue 3 设计与实现》第 14 章:通用原语实现多种功能的设计哲学对比
- 《Rust 编译器与运行时揭秘》第 12 章:UnsafeCell 与内部可变性的 unsafe 边界