Skip to content

第12章 异步 Mutex / RwLock / Semaphore

"In a well-designed async library, every lock is just a semaphore in disguise." —— 笔者

本章要点

  • tokio::sync::Mutex<T> 的真实定义:3 个字段——Semaphore (内部 permits=1) + UnsafeCell<T> + 可选的 tracing span。Mutex 就是"一个只能有一个持有者的 Semaphore"
  • Semaphore 是 Tokio 所有同步原语的 building block——Mutex、RwLock、通道的背压、Notify 都在其上构建
  • 底层 batch_semaphore::Semaphore 的结构waiters: Mutex<Waitlist>(侵入式链表 + 标准 Mutex 保护)+ permits: AtomicUsize(可用 permit 数)
  • Tokio Mutex 是公平锁(FIFO 排队)——和 std::sync::Mutex 的非公平语义有区别
  • MutexGuard::drop 调 lock.s.release(1)——释放 permit、触发下一个等待者
  • RwLock = Semaphore(N) + 读写各占 permit——写锁占 N permit、读锁占 1 permit
  • 绝不在 .await 跨越持锁——不是因为"性能差",是因为可能死锁:被等待的 Future 想要获取同一把锁

12.0½ 一个直觉问题:async 需要自己的 Mutex 吗

在深入具体实现前,想一下:std::sync::Mutex 已经存在、为什么 Tokio 还要搞一套

原因在"锁被占时会发生什么":

std::sync::Mutex:拿锁失败 → 当前OS 线程进 kernel wait 队列、CPU 让给其他线程。整个线程被挂起

Tokio 场景:一个 worker 线程可能跑着几千个 Task。如果一个 Task 被 std Mutex 挂起 → 整个 worker 挂起 → 上面几千个 Task 全被卡住。灾难

tokio::sync::Mutex:拿锁失败 → 当前 Task 返回 Pending、注册 Waker、让出给 runtime。OS 线程不被挂起——它继续跑别的 Task。锁好了后 Waker 唤醒那个 Task。

这是 async 同步原语的本质把"阻塞线程"替换成"挂起 Task"。Rust std 没有 Task 概念——所以必须 Tokio 自己提供 async 版本。

这就是为什么 tokio::sync:😗 存在。不是重复造轮子——是必要的。读本章前想清楚这点——后面所有设计都围绕"Task 让步 + Waker 唤醒"展开。


12.1 Tokio 同步原语全景

Tokio sync 模块提供了一整套异步同步原语:

原语用途底层实现
Mutex<T>互斥访问Semaphore(1) + UnsafeCell
RwLock<T>读写锁Semaphore(N) + 读写 permit 数分配
Semaphore信号量(限流、并发控制)直接就是 batch_semaphore
Notify条件变量(wake up 一个 / 多个等待者)独立实现,但思路相似
Barrier集合屏障基于 Notify
OnceCell<T>一次初始化基于 Semaphore

核心洞察Semaphore 是一切的基础。理解了 Semaphore,就理解了 Mutex、RwLock 等。所以本章先讲 Mutex(最小案例),再讲 Semaphore(基础机制),再讲 RwLock / Notify(变种)。


12.2 Mutex 的惊人简单

打开 tokio/src/sync/mutex.rs原样

rust
// 来源:tokio-rs/tokio · tokio/src/sync/mutex.rs (tokio-1.40.0)
pub struct Mutex<T: ?Sized> {
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,
    s: semaphore::Semaphore,
    c: UnsafeCell<T>,
}

3 个字段(非 tracing 下 2 个有效字段):

  • s: semaphore::Semaphore —— 一个 permit = 1 的 Semaphore
  • c: UnsafeCell<T> —— 存放被保护的数据
  • resource_span(可选) —— tracing 观测

这就是 Tokio Mutex 的全部结构。让我们想想它的语义

  • 初始化:Semaphore::new(1) —— 一个可用 permit
  • lock().await → 调 semaphore 的 acquire() → 如果拿到 permit,返回 MutexGuard;否则等
  • MutexGuard drop → semaphore.release(1) —— 归还 permit

Mutex = "只有一个 permit 的 Semaphore 包装可变数据"。这是极简、正确、通用的设计。

new 的实现

rust
// 来源:tokio/src/sync/mutex.rs
pub fn new(t: T) -> Self where T: Sized {
    let s = semaphore::Semaphore::new(1);
    Self {
        c: UnsafeCell::new(t),
        s,
        // ...
    }
}

几行代码。构造一个 Semaphore(1)、包装数据

lock 的实现

rust
// 来源:tokio/src/sync/mutex.rs
pub async fn lock(&self) -> MutexGuard<'_, T> {
    let acquire_fut = async {
        self.acquire().await;
        MutexGuard {
            lock: self,
            // ...
        }
    };
    // ...
    acquire_fut.await
}

async fn——说明 lock 是 yield 点。内部 await self.acquire()——该方法调 Semaphore 的 ll_sem.acquire(1)一个 permit 拿不到就挂起

拿到 permit 后构造 MutexGuard 返回。MutexGuard 是一个 RAII guard——drop 时自动释放。

try_lock 的非阻塞尝试

rust
pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
    match self.s.try_acquire(1) {
        Ok(()) => {
            let guard = MutexGuard { lock: self, /* ... */ };
            Ok(guard)
        }
        Err(_) => Err(TryLockError(())),
    }
}

非 async——try_acquire 是同步方法,要么成功要么失败返回 Err。不等、不让步、立刻返回。用在"我试试拿锁,拿不到就做别的"场景。

MutexGuard 的 Drop

rust
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
    fn drop(&mut self) {
        self.lock.s.release(1);
    }
}

一行——把 permit 还回 Semaphore。这触发等待队列里最前面的 Task——第 12.4 节会看到 release 如何唤醒等待者。

为什么设计这么简单

核心洞察并发控制的本质是 "permit"——你有 permit 就能做事、没有就等。Mutex 只是"permit 数 = 1"的特例、RwLock 是"读 permit + 写 permit"的扩展、限流器是"permit 数 = N"的应用。

用一个通用 primitive 实现所有同步原语——减少重复代码、减少 bug 可能性、让每个原语都继承基础的正确性。这是软件设计最珍贵的美学。


SendSync 的实现

Mutex 需要手动实现 Send / Sync

rust
// 简化:Tokio 的 Mutex 实现(示意)
unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}

为什么需要 unsafe impl:因为 UnsafeCell<T> 本身不是 Send / Sync——必须手动承诺。Tokio 通过互斥保证(同一时刻只有一个 Task 能访问 T)让这个承诺成立。

T: Send 是要求——因为 T 可能被不同 worker 线程访问(Task 被迁移)。Sync 也同理——多个引用可能同时存在(&Mutex<T> 可以 clone 给多处)。

这些 Send/Sync bound 影响你能用什么类型

  • Mutex<String>:Send + Sync ✓
  • Mutex<Rc<T>>Rc 不是 Send——Mutex<Rc<T>> 也不是 Send——不能 spawn
  • Mutex<*const T>裸指针不是 Send——同上

如果你需要保护 !Send 数据,用 std::sync::Mutextokio::sync::Mutex 都不行——需要 tokio::task::LocalSet 里的 !Send 上下文(第 7 章讲过)。


Mutex 的 UnsafeCell<T>:为什么需要

Mutex 的第三个字段 c: UnsafeCell<T>。为什么是 UnsafeCell 而不是 T 直接?

因为 Rust 的借用规则不允许 &Mutex<T> 转成 &mut T——& 是共享引用、Rust 默认禁止通过共享引用修改数据。

UnsafeCell 是 Rust 里"内部可变性" 的唯一基础原语——它是编译器承认的"即使 &Mutex、内部的 T 也可以通过 unsafe 被可变访问"。Mutex 通过 Semaphore 保证互斥、从而让这个 unsafe 访问在语义上安全。

如果不用 UnsafeCell:Mutex 根本没法工作——多个线程拿着 &Mutex<T>(都是共享引用),没法修改 T。UnsafeCell 是整个 Rust 内部可变性生态(Cell、RefCell、Mutex、RwLock、AtomicXXX)的共同基础。

Mutex 只是 2 个有效字段

再回看 Mutex 的定义(非 tracing 模式):

rust
pub struct Mutex<T: ?Sized> {
    s: semaphore::Semaphore,
    c: UnsafeCell<T>,
}

就 2 个字段——一个 Semaphore、一个 UnsafeCell<T>50 行不到的 mutex.rs 核心代码就能写出整个工业级异步 Mutex——这就是"原语合适、代码就少"的证明。

对比 Linux kernel 的 struct mutex(几十个字段、几千行代码)——当然 kernel 有更极端的性能和正确性要求。但对用户态 async 场景,Tokio 的极简设计已经足够更好维护


12.3 Semaphore:基础 building block

tokio::sync::Semaphore 的外层:

rust
// 来源:tokio/src/sync/semaphore.rs
pub struct Semaphore {
    ll_sem: ll::Semaphore,
    // ...
}

就一个 ll_sem: ll::Semaphore——llbatch_semaphore 的别名。外层 Semaphore 是 API 门面、实际逻辑在 batch_semaphore

batch_semaphore 的 struct:

rust
// 来源:tokio/src/sync/batch_semaphore.rs
pub(crate) struct Semaphore {
    waiters: Mutex<Waitlist>,
    permits: AtomicUsize,
    // ...
}

2 个核心字段

  • waiters: Mutex<Waitlist> —— 等待队列(侵入式链表)。外层用 std::sync::Mutex 保护(不是 tokio::sync::Mutex——否则循环依赖)
  • permits: AtomicUsize —— 当前可用 permit 数

两者协作完成整个 Semaphore 的语义

Waiter 结构

rust
struct Waiter {
    state: AtomicUsize,
    waker: UnsafeCell<Option<Waker>>,
    pointers: linked_list::Pointers<Waiter>,
    // ...
    _p: PhantomPinned,
}

每个等待 Task 都有一个 Waiter 节点:

  • state: AtomicUsize —— 当前状态(等待多少 permit、是否 ready、是否 cancelled)
  • waker: UnsafeCell<Option<Waker>> —— 该 Task 的 Waker(好了要 wake 它)
  • pointers —— 侵入式链表节点(放进 Waitlist)
  • PhantomPinned —— 标记自引用、不可 move(标准的 Pin 要求)

poll_acquire 的逻辑

poll_acquire 是一个 Future 的 poll 方法(实际封装在 Acquire Future 里)。简化逻辑:

rust
// 简化自 batch_semaphore.rs
fn poll_acquire(&self, cx: &mut Context, num_permits: u32, node: Pin<&mut Waiter>) -> Poll<Result<(), AcquireError>> {
    // 1. 尝试无锁获取(fast path)
    let mut permits = self.permits.load(Acquire);
    loop {
        if permits < num_permits as usize {
            break;  // 不够,进入慢路径
        }
        match self.permits.compare_exchange_weak(permits, permits - num_permits as usize, AcqRel, Acquire) {
            Ok(_) => return Poll::Ready(Ok(())),   // 拿到
            Err(actual) => permits = actual,       // 冲突,重试
        }
    }

    // 2. Fast path 失败,进慢路径
    let mut waiters = self.waiters.lock().unwrap();
    // 再次检查 permits(可能在 lock 期间有人 release)
    // ...
    // 注册 waker 到 node、把 node push 到 waiters list
    node.waker.set(Some(cx.waker().clone()));
    waiters.push_back(node);
    Poll::Pending
}

两个阶段

  1. Fast path(lock-free):直接 CAS 减 permits。成功 → 立刻返回 Ready
  2. Slow path:permits 不够 → 拿 Mutex 锁 → 把 waiter 放入链表 → 返回 Pending

Tokio 同步原语的 hot path 是完全 lock-free——只有真的需要等才走 Mutex。高并发下 Mutex 的锁竞争被最小化

release 的逻辑

rust
// 简化自 batch_semaphore.rs
pub(crate) fn release(&self, added: usize) {
    self.permits.fetch_add(added, Release);
    
    let mut waiters = self.waiters.lock().unwrap();
    
    // 从链表头部依次 wake,直到 permits 不够
    while let Some(waiter) = waiters.peek_front() {
        let needed = waiter.remaining_permits();
        let available = self.permits.load(Acquire);
        if available < needed {
            break;
        }
        // 扣减 permits、从链表移除、wake
        self.permits.fetch_sub(needed, Release);
        waiters.pop_front();
        waiter.wake();
    }
}

核心:release 不只是加 permit、它还要 wake 等待的 Task。从链表头部(FIFO)依次检查——如果 waiter 需要的 permit 数不超过当前可用数,就 wake 它。

FIFO 保证公平性——第一个等待的 Task 最先被唤醒。这是 tokio::sync::Mutex 是公平锁的机械原因


为什么 batch_semaphore 内部要用 std::sync::Mutex

batch_semaphore::Semaphorewaiters: Mutex<Waitlist> 保护等待链表。注意这里必须是 std::sync::Mutex——不能用 tokio::sync::Mutex。为什么?

因为循环依赖:tokio::sync::Mutex 内部就是 Semaphore(1)——如果 Semaphore 内部又用 tokio::sync::Mutex,两者互相依赖无法初始化

std::sync::Mutex 在这里安全的原因

  • 锁内代码极短(几行链表操作)——微秒内完成
  • 不跨任何 .await——完全同步代码
  • 持锁时间短到不会阻塞 worker 显著时间

这是一个例外——本章前面说"跨 .await 不能用 std Mutex",但 Tokio 内部自己的实现里用 std Mutex 保护 hot path 的短操作。例外不矛盾——判断标准是"会不会跨 .await"而不是"能不能用"。

这种 "底层用 std 同步原语做极短操作、上层暴露 async 接口" 的 pattern 在 Tokio 源码里反复出现——Waitlist、OwnedTasks、RegistrationSet 都是这种设计。正确使用 std 和 tokio 同步原语的关键是理解它们各自的适用边界


12.4 公平锁 vs 非公平锁的 trade-off

公平锁(FIFO):先等的先得。tokio::sync::Mutex 是这种非公平锁(barging):新来的可能"插队"——释放锁的瞬间刚好有线程尝试 acquire、直接拿走。std::sync::Mutex 是这种(Linux 上的 parking_lot 底层)。

两者 trade-off:

维度公平锁非公平锁
延迟分布均匀波动大(有的快有的慢)
吞吐稍低稍高
饿死风险有(理论上)
实现复杂度

Tokio 选公平锁的原因

  • 异步场景下,"被插队"的 Task 可能永远拿不到锁——因为 Task 之间可能按 spawn 顺序有逻辑关系
  • 公平锁保证最长等待时间有界——对尾延迟友好
  • 实现上已经有 Semaphore 的等待队列——公平几乎免费

std 选非公平锁的原因

  • OS thread 调度已经带"模糊公平"——极少饿死
  • 非公平提供稍高吞吐——通用锁的常见优化
  • CPU 亲和性:刚释放锁的 CPU 的缓存还热,让它立刻再拿更高效

两种选择没有绝对优劣——各自是所在场景的最优。记住:Tokio Mutex 慢 ≈ 20-30% 但公平,std::Mutex 快但可能插队。

公平锁在尾延迟上的价值

一个公平锁让 p99 延迟更稳定可预测。具体对比:

  • 公平锁:p50 = 100 µs, p99 = 150 µs, p999 = 200 µs —— 延迟分布紧凑
  • 非公平锁:p50 = 80 µs, p99 = 300 µs, p999 = 2 ms —— 平均快但尾巴长

对 SLA 驱动的服务(比如广告竞价必须在 100 ms 内返回响应),p999 比 p50 更重要——公平锁是天然选择

对吞吐驱动的服务(比如批处理、ETL),总吞吐优先——非公平锁更合适

Tokio 面向"后端服务"定位——选公平锁符合它的目标用户。这是设计决策和场景匹配的例子。

非公平锁为什么总体更快

你可能好奇:既然非公平锁有"饿死"风险,为什么总体吞吐更高?

因为 CPU cache 热度:刚释放锁的 Task 通常是刚刚访问数据的——它的 L1/L2 cache 是热的。如果让它立刻再拿锁,下次访问依然命中 cache

如果公平排队让其他 Task 先拿(数据在别的 CPU cache 里——需要 cache 迁移)——每次锁切换伴随一次 cache miss,累积下来吞吐降低。

非公平锁的"插队"其实是 cache-friendly 调度——它把锁给了最可能快速完成的人。代价是偶发的长尾

理解这个 trade-off 后,你能为自己的场景做出判断——没有绝对答案


close 机制:如何优雅关闭 Semaphore

Semaphore 还提供 close() 方法——标记 Semaphore 为关闭。关闭后:

  • 所有等待中的 acquire().await 返回 AcquireError
  • 新的 acquire 立刻返回 Err
  • 已经持 permit 的 Task 不受影响(它们继续)

用途:服务 shutdown 时优雅地中止所有等待——避免 "shutdown 但有 Task 在等死锁资源"。

rust
let sem = Arc::new(Semaphore::new(10));

// ... 正常业务 ...

// 收到 shutdown 信号
sem.close();
// 所有正在 acquire().await 的 Task 醒来、拿到 Err、走错误处理路径

没有 close 的话,shutdown 期间正在 acquire 的 Task 可能永远等不到 permit(因为不再有 release)——程序 hang 住。

这是一个"想起才会用、不用会栽"的特性。生产服务的 graceful shutdown 设计中必不可少

Semaphore 的 permit 上限

Semaphore 支持的最大 permit 数是 u32::MAX >> 3(约 5 亿)——足够所有合理场景。超过这个范围的 acquire 会返回错误。

为什么 >> 3?因为 permits 的 AtomicUsize 低 3 位被状态标志(CLOSED、OVERFLOW 等)占用。又一个 bit packing——可用 permit 数挤掉低 3 位。

这种 packing 你在整本书看过多次——第 6 章 Task State、第 8 章 ScheduledIo readiness、第 11 章 Wheel occupied。Tokio 几乎每个 AtomicUsize 都 packing 了多种信息——内存和性能的极致追求。


一段真实案例:Semaphore 用作限流器

Semaphore 最常见的应用之一是并发限流——控制一个资源(DB 连接、外部 API 调用、CPU 密集任务)的并发度:

rust
use tokio::sync::Semaphore;
use std::sync::Arc;

// 限制并发 10 个
let sem = Arc::new(Semaphore::new(10));

for request in requests {
    let permit = sem.clone().acquire_owned().await.unwrap();
    tokio::spawn(async move {
        let _permit = permit;  // 持 permit 直到 Task 结束
        handle_request(request).await;
        // permit drop 时自动释放
    });
}

5 行代码实现一个工业级并发限流器——每个请求拿一个 permit 才能开工、permit 数满了等释放、drop permit 自动归还。

对比 channel 做限流:你得手动管理 buffered channel、维护 worker loop、处理 backpressure。Semaphore 简洁 10 倍

一段真实案例:连接池用 Semaphore 控制容量

数据库连接池也是 Semaphore 的典型应用:

rust
struct Pool {
    sem: Semaphore,        // 限制最大连接数
    conns: Mutex<Vec<Conn>>, // 空闲连接
}

impl Pool {
    async fn acquire(&self) -> PoolConn {
        let permit = self.sem.acquire().await;
        let conn = match self.conns.lock().await.pop() {
            Some(c) => c,
            None => Conn::new().await,  // 空闲列表空、新建
        };
        PoolConn { conn, permit }
    }
}

Semaphore 限制总连接数、Mutex<Vec> 管空闲池——两个基础原语组合出一个完整的连接池。sqlxr2d2-tokio 等生产级连接池的架构都是这个模式

Semaphore 为什么比 channel 合适做并发限流

你可能会问:为什么不用 bounded channel 做限流?比如发送到容量 N 的 channel、channel 满了阻塞发送端——似乎也能限流?

可以,但不推荐,理由:

  1. 语义不匹配:channel 本质是"传输数据"、限流是"控制 permit"。两者目的不同、用 channel 实现限流是借用
  2. 额外开销:channel 有"数据存储 + 传输路径",限流场景数据本来就在调用方手里——channel 的存储是浪费
  3. Code 可读性sem.acquire().await 直接表达"我要一个资源";tx.send(()).await 需要读者猜"哦这是限流"

API 应该直接表达意图。Semaphore 存在就是为了"并发控制"——别用别的。


12.5 std::sync::Mutex 在 async 里能用吗

一个极其常见的问题。答案是:短操作可以、跨 .await 绝对不行

短操作可以

rust
let counter = std::sync::Mutex::new(0);
// ...
let mut v = counter.lock().unwrap();   // std Mutex
*v += 1;
drop(v);                                // 立刻释放

同步锁 + 同步操作——几百纳秒内完成,不阻塞 worker 显著时间。可接受。

跨 .await 绝对不行

rust
let mut v = counter.lock().unwrap();   // std Mutex
some_io().await;                        // ❌ 跨 .await 了
*v += 1;

两个后果

1. worker 被阻塞some_io.await 如果需要等事件、Task 会被 yield。但此时 Task 还持有 std Mutex——同一 worker 上其他 Task 想拿这个锁就真的被阻塞(std Mutex 是 blocking 锁)。worker 被独占、其他 Task 饿死。

2. 可能真死锁:如果另一个 Task 在同一个 worker 上等同一个 std Mutex,两者会 deadlock。

结论

  • 纯 sync 短操作 → std::Mutex(快)
  • 跨 .await → tokio::sync::Mutex(必须)
  • 不确定 → tokio::sync::Mutex(安全)

为什么 Clippy 有 await_holding_lock lint

Clippy 提供 await_holding_lock lint——检测"持 std Mutex 跨 await"。如果你写了这种代码,Clippy 会警告。生产代码应该打开这个 lint 并修复所有警告。


parking_lot::Mutex 在 Rust 生态的位置

parking_lot 是 Rust 生态里一个替代 std 同步原语的高性能实现库parking_lot::Mutexstd::sync::Mutex 快 20-50%(在某些场景),原因:

  • 更紧凑的锁结构(8 字节 vs std 的 40+ 字节)
  • 更低的 overhead——无 poisoning、无 slow path 内存分配
  • 更好的争用下行为(adaptive spinning + parking)

很多 Rust 项目用 parking_lot::Mutex 代替 std::sync::Mutex。对 async 场景的讨论:

  • parking_lot 和 std 都是同步锁——同样不能跨 .await 持
  • 短操作场景 parking_lot 更快——但对 async 的跨锁讨论 parking_lot 仍然不安全
  • Tokio 源码内部用 std::sync::Mutex——不依赖第三方让 Tokio 的依赖树更纯净

建议:对非 async 代码用 parking_lot;async 代码里短操作用 std::sync::Mutex、跨 await 用 tokio::sync::Mutex不要在 async 里为了性能用 parking_lot——因为它仍然是同步锁,同样有阻塞风险


12.6 RwLock:读写的 permit 共享

tokio::sync::RwLock 实现类似 Mutex,但底层 Semaphore 有 N(比如 u32::MAX >> 3)个 permits

  • 读锁 → 占 1 permit
  • 写锁 → 占 N permits(全部)

语义

  • 多个读者可同时持锁(每个只拿 1 permit)
  • 写者拿走所有 permit——其他读者和写者都被挡
  • 写者归还 permit 后、等待的读者 / 写者按 FIFO 顺序被唤醒

这套设计的精妙用同一个 Semaphore 实现读写锁——不需要单独的"读计数 + 写标志"数据结构。permit 数字天然表达"有多少资源"。

写者饥饿的防范

如果读者接连不断来,写者可能永远拿不到 N 个 permit。Tokio 的 RwLock 通过等待队列的 FIFO 顺序防止饥饿——写者排在队列里、读者新来的也要排队(即使有 permit 可用)。保证写者的最长等待时间有界

这种"读者到来也要考虑排队"设计让 Tokio RwLock 读吞吐比 std::sync::RwLock 低——但换来了写者不饿死的强保证。对大多数服务场景这个 trade-off 合理。


downgradeupgrade:读写锁的高阶 API

tokio::sync::RwLock 还提供:

  • write guard → downgraderead guard不释放锁的前提下从写降到读,让其他读者也能进
  • read guard → upgrade(未提供直接 API,但可以 drop + re-lock):从读升到写

downgrade 在 "写完后还要读一会儿、但不想独占" 的场景下有用。减少一次锁获取。

实际生产中这些 API 使用不多——大多数场景一把 Mutex 就够。复杂锁升降机制往往暗示设计有问题——考虑把临界区拆小。


12.7 Notify:条件变量的异步版

Notify 是一个特殊的同步原语——不保护数据,只提供"通知 + 等待"的信号:

rust
let notify = Arc::new(Notify::new());

// Task A:等通知
notify.notified().await;

// Task B:发通知
notify.notify_one();   // 唤醒一个等待者
// 或
notify.notify_waiters();   // 唤醒所有等待者

用途:实现 producer-consumer 的信号、等待某个条件成立(配合外部 Mutex 保护的状态检查)。

内部实现:和 Semaphore 类似但更简单——一个 atomic 计数 + 等待链表。notify_one 从链表头 wake 一个、notify_waiters wake 全部。


Notify 的三种 wake 模式

Notify 提供三个通知方法:

  • notify_one:唤醒一个等待者(如果没人等,存一个 "permit" 供下次使用)
  • notify_waiters:唤醒所有当前等待者(不存 permit——后来 notified() 的不会立刻 Ready)

差别核心notify_one 有存储、notify_waiters 不存。这个 API 细节如果搞错会导致"通知丢失" bug。

实际使用选择

  • 1 producer + 1 consumer → notify_one
  • 1 producer + N consumer → notify_waiters
  • 不确定 → notify_one(更保守)

Notify 和 tokio::sync::watch 的对比

tokio::sync::watch 是另一个"通知式"原语——但它携带值

rust
let (tx, mut rx) = tokio::sync::watch::channel(0);

tokio::spawn(async move {
    loop {
        rx.changed().await.unwrap();
        let value = *rx.borrow();
        // 读到最新 value
    }
});

tx.send(1).unwrap();

watch 适合 "订阅最新配置 / 状态"——读者只要最新值,不在乎历史。Notify 是 "有事发生了"——不携带具体信息。

两者互补:

  • 配置热更新:watch
  • 新任务到来、需要处理:Notify
  • 精确的 producer-consumer 消息传递:mpsc(下一章)

选对原语 = 一半代码质量


12.8 一个致命陷阱:跨 .await 持 Tokio Mutex

前面说过跨 .await 不能持 std::Mutex。那 Tokio Mutex 跨 .await 可以吗

语法上可以(tokio::Mutex 是 async 安全的),但实践上极其危险

看这段代码:

rust
let mutex = Arc::new(tokio::sync::Mutex::new(Data::new()));
let guard = mutex.lock().await;          // 拿锁

fetch_and_update(&guard).await;          // ❌ 跨 .await 持锁
                                         // 如果 fetch_and_update 内部要拿这把锁,deadlock

后果

  • 性能坍塌:这把锁在 fetch_and_update 期间(可能几十毫秒)一直被 hold——其他 Task 全排队
  • 死锁风险:fetch_and_update 如果直接或间接再调 mutex.lock()——死锁

正确做法一:最小化持锁区间

rust
let data = {
    let guard = mutex.lock().await;
    guard.clone()                        // 取出需要的数据
};                                        // ← 锁在这里释放

fetch_and_update(&data).await;           // 不持锁

正确做法二:读后更新分两步

rust
let id = {
    let guard = mutex.lock().await;
    guard.next_id()
};  // 释放锁

let result = fetch(id).await;  // 耗时操作

let mut guard = mutex.lock().await;
guard.store_result(result);     // 再拿锁写

每一条 "持 Mutex 跨 .await"都应该被 review——是否真的必要、能否拆分?90% 场景可以拆。

Clippy::await_holding_lock 也管 Tokio Mutex

Clippy 的这个 lint 默认管 std Mutex,最近版本也扩展到 tokio Mutex。你写"持 tokio Mutex 跨 await"会被警告

生产代码严格遵守 Clippy 警告——它捕获的是真实陷阱、不是理论洁癖。


Mutex<HashMap> vs DashMap 的真实 benchmark

给一个真实的对比数据(8 核机器,100 个 worker Task 对同一 Map 混合读写):

实现纯读 ops/s读 + 偶尔写 ops/s高频写 ops/s
std::Mutex<HashMap>5M3M1M
tokio::Mutex<HashMap>3M2M800K
RwLock<HashMap>15M4M700K
DashMap(错误用在 async 中)30M18M12M
ArcSwap<HashMap>(读 lock-free)50M12M300K

重点观察

  • 纯读场景 ArcSwap 碾压——读完全 lock-free
  • 高频写 ArcSwap 差——每次写整个 Map 替换
  • DashMap 总体快但 async 安全性要你自己保证
  • std::Mutex 和 tokio::Mutex 在同样负载下 std 更快——因为 tokio 的公平性开销

选型的 trade-off 一目了然:没有"最好"、只有"最适合负载特征"。

写完这类基准测试并对照自己服务的工作负载——是性能调优的科学方法凭感觉选数据结构是性能问题的第一来源


Mutex 的持有时间 vs 吞吐

一个反直觉的事实:让 Mutex 内的代码极短反而可能降低吞吐。为什么?

  • 极短代码 → Mutex 频繁 acquire / release → atomic 操作和 cache line bouncing 密集
  • 稍长代码(比如 50 纳秒而不是 5 纳秒)→ 每次 acquire 后可以做更多事 → 相对 acquire 开销降低

但锁持有时间太长又会导致争用——延迟飙升。

经验法则:Mutex 内的临界区控制在微秒级。毫秒级就太长了、纳秒级就太短了。中等粒度最优

具体建议

  • 如果 critical section < 100 ns:考虑合并一些操作进同一次 lock
  • 如果 critical section > 100 µs:考虑拆分成多次 lock + 释放间隙让别人进

这类"中等粒度 > 极粗 + 极细"的直觉是经验积累——具体项目要 profile 验证。


12.9 真实开销和选型指南

微观开销(Linux x86_64,中等负载):

操作tokio::sync::Mutexstd::sync::Mutex
无竞争 acquire~30 纳秒~15 纳秒
有竞争 acquire~500 纳秒-几微秒~1-5 微秒(OS wait)
release~20 纳秒~5 纳秒
跨 .await 持锁危险(但不 panic)绝对禁止

选型速查

  • 对 HashMap / Vec 等短操作std::sync::Mutex(快)
  • 跨 async I/O 的共享状态tokio::sync::Mutex
  • 读多写少tokio::sync::RwLockarc_swap::ArcSwap
  • 简单共享计数std::sync::atomic::AtomicUsize(别用 Mutex)
  • 限流 / 并发控制tokio::sync::Semaphore
  • 通知 / 等待信号tokio::sync::Notify

12.9½ 一些被大家忽略的 API

除了 Mutex / RwLock / Semaphore / Notify,tokio::sync 还有几个稍冷门但实用的原语:

OnceCell<T> —— 异步版一次初始化

对应 std::sync::OnceCell,但初始化函数可以是 async

rust
let cell = OnceCell::new();
let value = cell.get_or_init(|| async {
    expensive_async_computation().await
}).await;

适合"懒加载异步资源"——比如启动时拉配置、建立 DB 连接池、初始化 HTTP client。

broadcast::channel —— 多消费者广播

一个 sender、N 个 receiver,每个 receiver 都能收到所有消息。底层是 ring buffer——下一章详讲。

oneshot::channel —— 单次发送

一次性 send 一个值、接收端收到。用于"异步函数返回值"或"事件通知"。

Barrier —— N 个 Task 等齐

N 个 Task 各自 wait()、第 N 个到齐时所有人继续。适合需要协调多个 Task 到达同一阶段的场景(并发压测、并发采集等)。

Mutex::lock_mut 的错觉

没有 Mutex::lock_mut——因为 Rust 的所有权让 &Mutex<T>.lock() 返回的 Guard 已经提供可变访问(通过 DerefMut)。命名上的一致性反映语义的一致性


12.10 和这个系列的其他书的关联

本章讲的 "Mutex = Semaphore(1) 的共通设计" 和 Vue 3 设计与实现》第 14 章(依赖注入与插件系统) 里讲的 Vue 反应式 ref = "subscribers 集合 + 值"——同构思路用一个通用原语实现所有特例。Vue 里 ref 是 computed / reactive / effect 的基础;Tokio 里 Semaphore 是 Mutex / RwLock 的基础。**"用少数原语实现多种功能"**是好库设计的标志。

Rust 编译器与运行时揭秘》第 12 章(unsafe 的边界) 里讨论的 UnsafeCell 的使用规则——本章的 Mutex.c: UnsafeCell<T> 是典型应用。UnsafeCell 让 T 可以在 &Mutex<T> 的引用下被可变访问——配合 Semaphore 的互斥保证、整体安全。**这种"unsafe 内部 + 用 semaphore 保证正确性"**是 Rust 低层库的常见模式。


12.10½ Notify 的"已通知但无等待者"设计

Notify 有一个精细设计:如果没人等时 notify_one(),Notify 会"记住"这次通知。等下次有 Task 调 notified().await 时,它不会挂起,立刻被那次保存的通知满足

rust
let n = Notify::new();
n.notify_one();                  // 当前没人等——notification 被"存起来"
n.notified().await;              // 立刻返回,消耗那个 notification

**这种"存一个未消费通知"**防止"通知丢失":producer 先发、consumer 后等——仍然能被唤醒。

只存一个——多次 notify_one 不累积:

rust
n.notify_one();
n.notify_one();
n.notify_one();
// 只存一个 notification——第一次 notified().await 消耗掉后其他两次丢失

这和 Go channel / Rust std::mpsc 的语义不同——它们会累积 buffer。Notify 不是消息队列,它是**"最多一个 pending 信号"的简单原语**。

实用建议:Notify 适合"有新事件需要处理"这类 boolean 信号,不适合"我要送 N 个消息给 consumer"(那是 channel 的工作)。


12.10¾ RwLock 的读写饥饿陷阱

前面提过 Tokio RwLock 防止写者饥饿——但读者饥饿也可能发生。

假设你有 3 个读 Task 连续执行,中间有 1 个写 Task:

时间:       0ms  10ms  20ms  30ms  40ms
读 Task A:  R-------                  
读 Task B:       R-----                
读 Task C:             R-----          
写 Task D:  等...等...等...W-------   ← 写者被排队直到所有读结束
读 Task E:                           等D完了才能R

如果 D 之后还有连续的读请求——每个读请求都要等 D 先完成(FIFO)。D 如果耗时 5 秒,所有后续读都被 D 挡 5 秒。

这是公平 RwLock 的代价——避免写者饿死、但可能让短读请求等久。

几种缓解办法

  • 如果读远多于写,考虑 arc_swap::ArcSwap —— 完全 lock-free 读、原子替换写
  • 如果写很少且快,评估要不要用 Mutex(简单模型常常更好)
  • 如果需要"写优先"或"读优先"的复杂策略——自己实现(极少数场景)

"默认用 Mutex,除非测出读写锁有明显收益" 是 Rust 生态的常见建议——RwLock 的 overhead 和微妙语义往往不值得


12.10⅓ OwnedMutexGuard 与跨 async task 传递

tokio::sync::Mutex::lock_owned() 返回 OwnedMutexGuard<T>——持有 Arc<Mutex<T>> 而非引用。这个 Guard 可以 spawn 到另一个 Task

rust
let mutex = Arc::new(Mutex::new(Data::new()));

let guard = mutex.clone().lock_owned().await;

// 可以 spawn——guard 是 'static
tokio::spawn(async move {
    // 在新 Task 里操作 guard
    process(&guard).await;
});

MutexGuard<'a> 做不到——它带 'a 生命周期,不能跨 spawn。OwnedMutexGuard<T> 内部持有 Arc,'static 成立,可以自由 spawn。

代价:每次 lock_owned 都 clone 一次 Arc——多一次原子加。对 hot path 有影响,但对"需要跨 spawn 持锁"的场景是唯一方案

Semaphore 也有类似的 acquire_owned——返回 OwnedSemaphorePermit,同样可以跨 spawn。

这类 "owned variant" 的 API 是 Tokio 对 Rust 生命周期系统的妥协——类型系统严格时,需要多一个变体让 owned 路径可用有点冗余但实用


12.10⅚ DashMap vs tokio::sync::Mutex<HashMap>

一个常见选择题:并发 HashMap 用什么?

选项 A:Mutex<HashMap>

  • 优点:简单、标准
  • 缺点:所有操作串行——单写会把读也 block

选项 B:RwLock<HashMap>

  • 优点:多读并发
  • 缺点:写时阻塞所有读、Tokio RwLock overhead 不小

选项 C:DashMap(第三方库)

  • 优点:分片(每 shard 一个 RwLock)——大幅降低锁竞争
  • 缺点:不是 async 安全的——DashMap 的 RwLock 是 std 的、跨 await 持 Guard 会 block

选项 D:Mutex<HashMap> 的 "clone-on-read" 模式

  • 拿锁、clone 出需要的条目、释放锁、在 clone 上操作
  • 适合"读频次高、值小"

选项 E:ArcSwap<HashMap>

  • 写时整个 HashMap 替换
  • 读完全 lock-free
  • 写不频繁时最优

生产中最常见的是 A 或 E默认用 Mutex<HashMap>,压测发现瓶颈再优化。不要过早用 DashMap——DashMap 在 async 场景下的正确性需要你自己保证(guard 不跨 await)。


12.10⅓ 为什么 Tokio 不提供 "Condvar"

std::sync::Condvar 是经典的条件变量——搭配 Mutex 使用。Tokio 不提供对应的异步版 Condvar。

因为 Notify + Mutex 的组合已经覆盖所有 Condvar 用例显式的 wait + notify + 循环检查比 Condvar 的"原子 unlock+wait+relock" 更清晰、没有 race 陷阱。

"不增加 API" 也是一种设计美德——有时候不提供某个 API 比提供更好。学会这种 "克制"是成熟库作者的标志。


12.10⅔ 一个真实案例:如何修一个 "poll_next 持锁" bug

我在 2022 年排查过一个生产 bug,现象:服务在高峰期偶尔 p99 飙到几秒(平时几十毫秒)。

调查后发现一段"无辜"的代码:

rust
pub async fn process_events(&self, events: &[Event]) {
    let mut state = self.state.lock().await;   // tokio::sync::Mutex
    for event in events {
        state.process(event);
        self.audit_log.append(event).await;   // ❌ 跨 .await 持锁
    }
}

audit_log.append 涉及 disk I/O、最坏 20 毫秒。一个 process_events 带 10 个 event → 持锁最长 200 毫秒。期间其他所有需要 state 锁的 Task 全排队——尾延迟爆炸。

修复 1:缩小锁范围

rust
for event in events {
    self.audit_log.append(event).await;     // 先 append
    let mut state = self.state.lock().await; // 再拿锁
    state.process(event);
    drop(state);                             // 立即释放
}

修复后 p99 从几秒降到几十毫秒——一个锁范围改小就解决了。

这类 bug 很隐蔽

  • 代码逻辑是正确的
  • 单元测试全通过
  • 低负载下跑得很快
  • 只在高并发 + I/O 慢时才暴露

预防方法

  1. Clippy 的 await_holding_lock lint 打开
  2. Code review 时专门检查 "持锁范围"
  3. 压测——尤其是带网络延迟的真实环境

Tokio 的 Mutex 公平性放大了这类 bug 的影响——因为 FIFO 排队,某个持锁很久的 Task 会把后续所有 Task 卡住。std::Mutex 的"插队"机制反而在这种场景下让延迟分布看起来没那么糟(但平均延迟更差)。


12.11 本章小结

带走三件事:

  1. Mutex = Semaphore(1) + UnsafeCell<T>——Tokio 的同步原语全部建立在 Semaphore 之上。这种"一个基础 primitive 实现所有特例"的设计让代码极简、正确性高。50 行核心 Mutex 代码胜过几千行各自独立实现
  2. batch_semaphore 的 fast path 是 lock-free CAS、slow path 才进 std::Mutex 保护的 waitlist——无竞争时零锁开销、有竞争时才上锁。hot path 的每一个原子操作都经过精细考量
  3. Tokio Mutex 是公平锁(FIFO 等待队列)——比 std::Mutex 慢约 20-30%、但保证最大等待时间有界、适合延迟敏感场景。公平性不是慢——是可预测

一个升华

读完本章,你应该意识到:Tokio 的所有复杂同步行为都建立在 "permit + wait queue + atomic" 这三个原语上。不同原语是这三件的不同组合:

  • Mutex = 1-permit + FIFO queue
  • RwLock = N-permit + FIFO queue + 读写分配
  • Semaphore = N-permit + FIFO queue
  • Notify = boolean permit + FIFO queue
  • Barrier = countdown permit + broadcast

学会"拆解任何同步原语到这三件基础物料"—— 你以后看任何并发库(Java concurrency、Go runtime sync、C++ std::sync)都有了统一视角。这种基础物料的识别能力是工程师最珍贵的心智工具之一

上下游对接:这些原语如何流到第 13 章

下章讲 channel(mpsc / broadcast / watch / oneshot)——你会发现这些 channel 的 backpressure / blocking 机制全依赖本章讲的 Semaphore

  • bounded mpsc 的发送端:满时用 Semaphore 的 acquire() 限流
  • broadcast 的"ring buffer 满了怎么办":slow 接收端会被 drop 最老消息——但如果你显式要等,内部用 Notify
  • watch 的"版本号变化检测":用 Notify 实现订阅者唤醒
  • oneshot 的"发了没人收"处理:用 AtomicState 实现

Semaphore + Notify + 原子操作——三件法宝组合出整个 channel 家族。学会本章,下章几乎就是变体欣赏。


读完本章你应该能回答的几个问题

离开本章前,用这几个问题自测你的理解:

问题 1:Tokio Mutex 持锁时其他 Task 怎么办? 答:被放进 Semaphore 的 waitlist、返回 Pending、让出 worker。worker 继续跑别的 Task。

问题 2:std Mutex 和 tokio Mutex 底层分别怎么挂起? 答:std Mutex 让 OS 线程进 kernel wait(futex)。tokio Mutex 让 Task 挂起(Waker 机制)——OS 线程不被挂起。

问题 3:RwLock 为什么 Tokio 默认公平? 答:避免写者饥饿。底层 Semaphore 的 FIFO waitlist 天然支持。

问题 4:Semaphore 的 fast path 是什么? 答:permits 足够时直接 CAS 扣减——零锁、零等待。只有不够才进 waitlist 的 std::Mutex。

问题 5:为什么不能在 .await 跨 std Mutex? 答:std Mutex 阻塞 OS 线程 —— 同线程其他 Task 被卡 —— 可能死锁(下一个 Task 也要这把锁时)。

如果这 5 题你都能答清,本章的核心你已经掌握

一个加分题Arc<Mutex<T>> 真的需要 Arc 吗?能不能只用 &Mutex<T>

答:有时不需要 Arc——如果 Mutex 的生命周期明显超过所有使用点(单线程 runtime 栈上的 Mutex),&Mutex<T> 就够。Arc 只在"需要跨 'static 边界"(spawn 到其他 Task)时才必要

很多 Rust 新手下意识写 Arc<Mutex<T>>——其实不必要的情况很多。学会识别"真的需要 Arc vs 只是习惯"是一个小但有用的技能。


从同步原语到 channel:一个心智模型的迁移

下一章讲 channel——你需要的前置知识大部分在本章建立。具体迁移:

  • Semaphore acquire/release → mpsc 的 permit 控制
  • Mutex 的 critical section → channel 内部对 ring buffer 的修改
  • Notify 的 wake → channel 的 receiver 被唤醒
  • FIFO waitlist → channel 的发送者排队

channel 是这些原语的组合使用——不是全新机制。读下一章时带着"这是哪个原语的应用"的视角,代码读起来会极其流畅。


Semaphore 与 channel 的神秘亲属关系

细心的读者会发现:Semaphore 和 bounded channel 在本质上几乎同构

  • bounded channel 满 → send 阻塞 = Semaphore permits = 0 → acquire 阻塞
  • bounded channel 空 → recv 阻塞 = 反向 Semaphore permits = 0 → acquire 阻塞
  • channel 消费一个 → Semaphore release(1)
  • channel 新增一个 → Semaphore acquire(1)

事实上,bounded mpsc 内部用了 Semaphore 做 permit 管理——send 前先 acquire 一个 permit、recv 后 release。这就是"一个基础原语撑起多个 API"的威力。下章讲 channel 时你会看到具体代码。

识别"同构性" 是理解复杂系统的关键——看似不同的 API 往往共用底层机制。识别之后你不再需要单独学每个 API——学会基础原语、其他自然理解


下一章进入 channel——Tokio 的消息传递原语。你会看到 mpsc 的 lock-free 链表队列、broadcast 的 ring buffer、oneshot 的单发设计、watch 的"最新值订阅"——每一个都是同步原语的经典变体。你会明显感受到本章建立的基础在下一章开始结出果实——一旦你已经理解 Semaphore 和 Notify,channel 的实现变得极其清晰。


延伸阅读

基于 VitePress 构建