Appearance
第15章 Transaction:Drop 保护、savepoint、嵌套
"A transaction is a promise—and Rust's Drop cannot keep promises that require await." —— 每个写过 sqlx Transaction 代码的人遇到的第一条领悟
本章要点
Transaction<'c, DB>(sqlx-core/src/transaction.rs:86-94)是 sqlx 的事务 RAII guard——两个字段:connection: MaybePoolConnection<'c, DB>+open: bool。简洁到让人意外。TransactionManagertrait(transaction.rs:15-53)定义了驱动层的事务操作:begin/commit/rollback(async)+start_rollback(sync)+get_transaction_depth。由每个驱动具体实现(PgTransactionManager等)。- 三条 ANSI SQL helper 函数(
transaction.rs:277-305):begin_ansi_transaction_sql(depth)、commit_ansi_transaction_sql(depth)、rollback_ansi_transaction_sql(depth)——depth 0 用BEGIN/COMMIT/ROLLBACK、depth > 0 用SAVEPOINT _sqlx_savepoint_N/RELEASE/ROLLBACK TO。 Drop的"尽力 rollback"(transaction.rs:260-275)—— Drop 调start_rollback同步方法、只排队一条 ROLLBACK SQL 到 write buffer、不等发送完成。真正发送要等下次这条连接 I/O。- 嵌套事务通过 SAVEPOINT——第一层 BEGIN;第二层开始用
SAVEPOINT _sqlx_savepoint_1;commit 嵌套用RELEASE SAVEPOINT;rollback 用ROLLBACK TO SAVEPOINT。 Transaction实现 DerefMut 到DB::Connection(transaction.rs:220-232)——让&mut *tx变成&mut Connection、满足 Executor bound(第 4 章 §4.6 讨论过)。Transaction本身不实现 Executor——只有通过 DerefMut 的&mut *tx才是。原因是 lazy normalization 限制(第 4 章 §4.6)。- commit/rollback/drop 的工程优先级:显式 commit 最优 / 显式 rollback 次之 / drop 兜底但不保证——生产代码永远显式 commit 或 rollback,不依赖 drop。
15.1 问题引入:Rust async 里的事务挑战
事务是 SQL 世界的原子性保证——一组操作要么全做要么全不做。经典用法:
rust
let mut tx = pool.begin().await?;
sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
.bind(100).bind(from_id).execute(&mut *tx).await?;
sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
.bind(100).bind(to_id).execute(&mut *tx).await?;
tx.commit().await?;三行 SQL 在同一事务里——中间任何一步失败(第二条 UPDATE 抛错、? 返回 Err),整个事务要回滚,钱不会凭空消失。
问题是:? 返回 Err 后 tx 变量离开作用域被 drop——Rust 期望 drop 里发 ROLLBACK 给数据库。但 Rust 的 Drop::drop 是同步函数——不能 .await。而"发 ROLLBACK 消息给数据库"是异步 I/O(写 socket + 等响应)——drop 里做不完。
这个矛盾是 sqlx Transaction 设计的根本挑战。sqlx 的对策是两层 API:
- 显式 commit/rollback 是 async——用户
.await它们保证消息发出、响应接收。 - Drop 调同步
start_rollback——只把 ROLLBACK 查询排进 write buffer、不发送。真正发送要等下次这条连接被使用时。
这是一个"prefer explicit, fall back to best-effort"的设计——用户如果记得 commit/rollback 就一切正常;如果忘了、Drop 也尽可能把事务关掉(但不保证立即)。
本章拆开这条设计的每一处细节——TransactionManager trait、三条 ANSI SQL helper、Transaction struct、Drop 实现、嵌套 savepoint——看 sqlx 如何在 Rust async 限制下把事务 API 做得"尽可能安全"。
15.2 TransactionManager trait:驱动层的抽象
sqlx-core/src/transaction.rs:15-53 的 trait:
rust
#[doc(hidden)]
pub trait TransactionManager {
type Database: Database;
fn begin<'conn>(
conn: &'conn mut <Self::Database as Database>::Connection,
statement: Option<Cow<'static, str>>,
) -> BoxFuture<'conn, Result<(), Error>>;
fn commit(conn: &mut <Self::Database as Database>::Connection) -> BoxFuture<'_, Result<(), Error>>;
fn rollback(conn: &mut <Self::Database as Database>::Connection) -> BoxFuture<'_, Result<(), Error>>;
fn start_rollback(conn: &mut <Self::Database as Database>::Connection);
fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize;
}五个方法:
begin(conn, statement)async——开事务或进 savepoint。commit(conn)async——提交事务或 release savepoint。rollback(conn)async——回滚事务或 rollback to savepoint。start_rollback(conn)sync——只排队 ROLLBACK 查询,不等执行。get_transaction_depth(conn)——查当前 nesting 层级(0 = 无事务、1 = 事务中、>1 = savepoint 嵌套)。
#[doc(hidden)] 标注——普通用户不调这些方法,通过 Transaction 类型间接用。trait 是驱动层内部抽象。
没有方法接收 self——所有方法都是 type-associated,通过 &mut Connection 操作。TransactionManager 本身是零大小 marker 类型(PgTransactionManager 是 pub struct PgTransactionManager; 空 struct)——不持状态,只提供方法集。
15.2.1 begin 的 statement 参数
begin(conn, statement: Option<Cow<'static, str>>) 的 statement 允许用户自定义 BEGIN 语句:
- None:用默认 ANSI
BEGIN。 - Some("BEGIN ISOLATION LEVEL SERIALIZABLE"):自定义(只对 depth = 0 有效)。
第 12 章 §12.5 讨论过的 Connection::begin_with 就走这条——传 Some(statement) 给 TransactionManager::begin。嵌套事务(depth > 0)不能自定义 statement——sqlx 强制用 SAVEPOINT,传 Some 时返回 Error::InvalidSavePointStatement。
15.3 Transaction struct 的简洁性
transaction.rs:86-94:
rust
pub struct Transaction<'c, DB>
where DB: Database,
{
connection: MaybePoolConnection<'c, DB>,
open: bool,
}只有两个字段:
connection: MaybePoolConnection<'c, DB>——要么是&mut Connection(从Connection::begin来)、要么是 ownedPoolConnection(从Pool::begin来)。统一抽象让 Transaction 对两种来源都工作。open: bool——标记事务是否还活着。true表示事务中、false表示已 commit / rollback。
MaybePoolConnection<'c, DB>(sqlx-core/src/pool/maybe.rs)简化:
rust
pub enum MaybePoolConnection<'c, DB: Database> {
Connection(&'c mut DB::Connection), // 来自 Connection::begin
PoolConnection(PoolConnection<DB>), // 来自 Pool::begin
}两种来源通过枚举统一——Transaction 不关心具体来源、只管事务语义。这让 Transaction<'c, DB> 的 'c 生命周期根据来源不同而不同:
- Connection-originated:
'c= 被借用的 Connection 的生命周期。 - Pool-originated:
'c = 'static——PoolConnection owned 不借用外部。
15.3.1 open: bool 的作用
open 字段跟踪事务状态。只在两处被修改:
Self::commit成功后:self.open = false(transaction.rs:119)。Self::rollback成功后:self.open = false(transaction.rs:125)。
Drop 里检查 if self.open——只有还在开着的事务才调 start_rollback:
rust
impl<'c, DB> Drop for Transaction<'c, DB> {
fn drop(&mut self) {
if self.open {
DB::TransactionManager::start_rollback(&mut self.connection);
}
}
}这条简单的标志让 "已 commit/rollback 的事务 drop 不做任何事" 成立——避免重复回滚。
15.4 begin 的流程:ANSI BEGIN 或 SAVEPOINT
Transaction::begin(transaction.rs:97-112):
rust
pub fn begin(
conn: impl Into<MaybePoolConnection<'c, DB>>,
statement: Option<Cow<'static, str>>,
) -> BoxFuture<'c, Result<Self, Error>> {
let mut conn = conn.into();
Box::pin(async move {
DB::TransactionManager::begin(&mut conn, statement).await?;
Ok(Self { connection: conn, open: true })
})
}就是调 TransactionManager::begin + 构造 Self。具体的 BEGIN 语句在 PgTransactionManager::begin(sqlx-postgres/src/transaction.rs:17-46)里决定:
rust
fn begin<'conn>(conn: &'conn mut PgConnection, statement: Option<Cow<'static, str>>) -> BoxFuture<...> {
Box::pin(async move {
let depth = conn.inner.transaction_depth;
let statement = match statement {
Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
Some(statement) => statement,
None => begin_ansi_transaction_sql(depth),
};
let rollback = Rollback::new(conn);
rollback.conn.queue_simple_query(&statement)?;
rollback.conn.wait_until_ready().await?;
if !rollback.conn.in_transaction() {
return Err(Error::BeginFailed);
}
rollback.conn.inner.transaction_depth += 1;
rollback.defuse();
Ok(())
})
}核心流程:
- 读当前 depth——决定是 BEGIN 还是 SAVEPOINT。
- 按 depth 选 SQL:
- depth = 0 + 无自定义 statement:
BEGIN。 - depth = 0 + 有自定义:用户 statement(
BEGIN ISOLATION LEVEL SERIALIZABLE等)。 - depth > 0 + 自定义:拒绝(
InvalidSavePointStatement)。 - depth > 0:
SAVEPOINT _sqlx_savepoint_N。
- depth = 0 + 无自定义 statement:
- 新建 Rollback guard——一个内部 RAII,如果 begin 过程中异常 drop、自动 start_rollback。
- 发 simple query——把 BEGIN/SAVEPOINT 发给服务端。
- 等 ReadyForQuery——确认服务端进入事务状态。
- transaction_depth += 1。
- defuse Rollback guard——成功了不需要 guard 的兜底。
15.4.1 begin_ansi_transaction_sql helper
transaction.rs:277-282:
rust
pub fn begin_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
if depth == 0 {
Cow::Borrowed("BEGIN")
} else {
Cow::Owned(format!("SAVEPOINT _sqlx_savepoint_{depth}"))
}
}depth = 0:BEGIN(顶层事务)。 depth = 1:SAVEPOINT _sqlx_savepoint_1(第一个嵌套 savepoint)。 depth = 2:SAVEPOINT _sqlx_savepoint_2。
savepoint 命名用连续数字——第 N 层 savepoint 叫 _sqlx_savepoint_N。这样 commit / rollback 对应的 RELEASE / ROLLBACK TO 也能按 depth 反算名字。
返回 Cow<'static, str> 的巧妙:depth = 0 时是 "BEGIN" 字符串字面量(借用)、depth > 0 时是动态格式化 String(owned)。Cow 让两种情况共用签名——常见字面量零分配、少见情况按需分配。
15.4.2 Rollback guard
PgTransactionManager::begin 里的 Rollback 是个内部 RAII:
rust
struct Rollback<'c> {
conn: &'c mut PgConnection,
defuse: bool,
}
impl Drop for Rollback<'_> {
fn drop(&mut self) {
if !self.defuse {
PgTransactionManager::start_rollback(self.conn)
}
}
}作用:如果 begin 过程中任何 .await? 失败(panic / 网络错误)、Rollback 的 drop 会自动 start_rollback——避免服务端已经开始了事务但 Rust 这边错误返回的状态不一致。
rollback.defuse() 只在 begin 完全成功后调——cancel 掉自动 rollback、因为 Transaction 本身接管了事务管理责任。
这是一个经典 RAII 模式——"部分成功的错误路径"用 guard 兜底。第 14 章的 DecrementSizeGuard 也是同一种设计。
15.5 commit / rollback:async 完整路径
transaction.rs:114-127:
rust
pub async fn commit(mut self) -> Result<(), Error> {
DB::TransactionManager::commit(&mut self.connection).await?;
self.open = false;
Ok(())
}
pub async fn rollback(mut self) -> Result<(), Error> {
DB::TransactionManager::rollback(&mut self.connection).await?;
self.open = false;
Ok(())
}两个方法的结构一样——调 TransactionManager 的 commit/rollback + 设置 open = false。
关键是吃 self——commit/rollback 消费整个 Transaction 对象。.await? 之后 self 要么被消费(Ok)要么错误冒泡(Err)——两条路径之后 Transaction 都不可再用。
注意 如果 commit 的 .await? 失败——open 保持 true、Drop 会触发 start_rollback。commit 失败不意味着"事务还活着"——它意味着"客户端不确定事务状态"(网络可能在 COMMIT 消息发出后断了)——rollback 是保守兜底。
15.5.1 PgTransactionManager::commit 的实现
sqlx-postgres/src/transaction.rs:48-57:
rust
fn commit(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
if conn.inner.transaction_depth > 0 {
conn.execute(&*commit_ansi_transaction_sql(conn.inner.transaction_depth))
.await?;
conn.inner.transaction_depth -= 1;
}
Ok(())
})
}三步:
- 检查 depth:只在 depth > 0 时执行(否则无事务可 commit)。
- 发 commit SQL:
COMMIT或RELEASE SAVEPOINT _sqlx_savepoint_N。 - depth -= 1。
commit_ansi_transaction_sql(transaction.rs:285-291):
rust
pub fn commit_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
if depth == 1 {
Cow::Borrowed("COMMIT")
} else {
Cow::Owned(format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", depth - 1))
}
}- depth = 1:
COMMIT(关闭顶层事务)。 - depth > 1:
RELEASE SAVEPOINT _sqlx_savepoint_{depth-1}(关闭嵌套的 savepoint)。
注意 depth - 1——命名用 savepoint 建立时的 depth。举例:
- depth = 0 → begin
BEGIN(depth 变 1)。 - depth = 1 → begin
SAVEPOINT _sqlx_savepoint_1(depth 变 2)。 - commit at depth = 2 →
RELEASE SAVEPOINT _sqlx_savepoint_1(depth 变 1)。
这条命名和 release 的反向对称是 savepoint 语义的一部分——让 commit/rollback 能按 depth 反推到正确的 savepoint 名字。
15.6 start_rollback:同步路径
PgTransactionManager::start_rollback(sqlx-postgres/src/transaction.rs:71-80):
rust
fn start_rollback(conn: &mut PgConnection) {
if conn.inner.transaction_depth > 0 {
conn.queue_simple_query(&rollback_ansi_transaction_sql(conn.inner.transaction_depth))
.expect("BUG: Rollback query somehow too large for protocol");
conn.inner.transaction_depth -= 1;
}
}同步函数——不 .await。做的事:
- 检查 depth。
queue_simple_query——把 ROLLBACK SQL 写进连接的 write buffer(不 flush、不发送)。- depth -= 1——逻辑上事务已经"准备" rollback。
expect("BUG: ...")——queue_simple_query 只在"SQL 长度超过 u32 最大值(4GB)"时失败——不可能发生、panic 是合理的。
15.6.1 为什么"只排队不发送"?
queue_simple_query 往 write buffer 写字节——不调 flush。什么时候真正发送?
- 下次用这条连接执行 query 时——新 query 写进 buffer 后、flush 会把之前的 ROLLBACK 一起发。
- 连接归还 Pool、Pool 在下次 acquire 时 flush(如果有 test_before_acquire)。
- 连接被 drop——drop 实现 best effort 地 flush(不保证)。
延迟发送意味着:ROLLBACK 消息可能几百毫秒后才真到服务端。在这段间隔里,服务端视角下事务还开着——持有锁、持有行版本、占 statement 资源。
实际影响:
- 短期(几秒):没明显影响,服务端最终会收到 ROLLBACK。
- 极端情况(连接卡住、进程挂):ROLLBACK 永远不发、服务端等到 session timeout(默认几分钟)才清理。
这条"尽力"语义是 Rust async Drop 限制的直接结果——没办法更好。sqlx 文档里反复强调"永远显式 commit/rollback"——就是因为 Drop 路径不够可靠。
15.7 Drop 的"尽力 rollback"
transaction.rs:260-275 的 Drop 实现:
rust
impl<'c, DB> Drop for Transaction<'c, DB>
where DB: Database,
{
fn drop(&mut self) {
if self.open {
// starts a rollback operation
// what this does depends on the database but generally this means we queue a rollback
// operation that will happen on the next asynchronous invocation of the underlying
// connection (including if the connection is returned to a pool)
DB::TransactionManager::start_rollback(&mut self.connection);
}
}
}注释精炼地说清楚了语义——"queue a rollback operation that will happen on the next asynchronous invocation"。用户看到这段注释就明白"drop 不立即 rollback"。
这条设计的工程权衡:
如果 Drop 试图同步发 ROLLBACK(就像 tokio-postgres 的做法之一):
- 需要 block on the runtime——可能死锁(Tokio drop 时 runtime 可能已经 shutdown)。
- 或者 spawn task——task 可能跑不完 Drop 就返回、Transaction 对象先被析构。
如果 Drop 什么都不做:
- ROLLBACK 永远不发——服务端要等 session timeout——更糟。
start_rollback 是中间路径:排队不发送——正确性要靠下次 connection I/O 触发——大部分场景有效、极端情况依赖 server-side timeout。
sqlx 的选择是 "最佳 effort + 明确文档"——不假装能做到同步 rollback、也不放弃尝试。
15.8 嵌套事务:savepoint 的工作机制
sqlx 支持任意深度的嵌套事务——通过 SAVEPOINT 实现:
rust
let mut tx = pool.begin().await?; // depth 1: BEGIN
sqlx::query("INSERT ...").execute(&mut *tx).await?;
let mut inner_tx = (&mut *tx).begin().await?; // depth 2: SAVEPOINT _sqlx_savepoint_1
sqlx::query("INSERT ...").execute(&mut *inner_tx).await?;
// 嵌套 rollback 只回滚到 savepoint
inner_tx.rollback().await?; // ROLLBACK TO SAVEPOINT _sqlx_savepoint_1
// 外层继续
sqlx::query("INSERT ...").execute(&mut *tx).await?;
tx.commit().await?; // COMMIT语义:内层 rollback 不影响外层——外层继续跑到 commit。内层如果 commit 了、外层再 rollback,内层的改动也会被外层 rollback(因为 COMMIT 嵌套 savepoint 实际是 RELEASE——在外层 COMMIT 前还没真正提交到 DB)。
这条语义对应 Postgres / MySQL 的 SAVEPOINT 标准——sqlx 直接用协议的 SAVEPOINT 命令、不做客户端模拟。
15.8.1 嵌套的 SQL 对应
完整映射表:
| depth 变化 | begin SQL | commit SQL | rollback SQL |
|---|---|---|---|
| 0 → 1 | BEGIN | COMMIT | ROLLBACK |
| 1 → 2 | SAVEPOINT _sqlx_savepoint_1 | RELEASE SAVEPOINT _sqlx_savepoint_1 | ROLLBACK TO SAVEPOINT _sqlx_savepoint_1 |
| 2 → 3 | SAVEPOINT _sqlx_savepoint_2 | RELEASE SAVEPOINT _sqlx_savepoint_2 | ROLLBACK TO SAVEPOINT _sqlx_savepoint_2 |
| 3 → 4 | SAVEPOINT _sqlx_savepoint_3 | ... | ... |
depth 在 begin 时递增、commit/rollback 时递减——跟踪由 TransactionManager::get_transaction_depth 维护。
15.9 Transaction 的 DerefMut
transaction.rs:212-232:
rust
impl<'c, DB> Deref for Transaction<'c, DB>
where DB: Database,
{
type Target = DB::Connection;
fn deref(&self) -> &Self::Target { &self.connection }
}
impl<'c, DB> DerefMut for Transaction<'c, DB>
where DB: Database,
{
fn deref_mut(&mut self) -> &mut Self::Target { &mut self.connection }
}Transaction 可以被 deref 成 &mut DB::Connection——这是 sqlx 让 &mut *tx 成为 Executor 的基础。
第 4 章 §4.6 详细讨论过**&mut Transaction 不直接实现 Executor**——因为 lazy normalization 限制。用户写 &mut *tx 显式 deref——绕过这条限制。
这条设计让事务内的 query 写法统一:
rust
sqlx::query(...).execute(&mut *tx).await?; // &mut Connection via DerefMut
sqlx::query(...).execute(&mut conn).await?; // 直接 &mut Connection
sqlx::query(...).execute(&pool).await?; // &Pool三种 Executor 混用——Transaction 通过 DerefMut 融入统一体系。
15.10 Connection::transaction 便捷方法
第 12 章 §12.5 讨论过 Connection::transaction——一个便捷闭包包装:
rust
fn transaction<F, R, E>(&mut self, callback: F) -> BoxFuture<'_, Result<R, E>>
where F: FnOnce(&mut Transaction<'_, Self::Database>) -> BoxFuture<'_, Result<R, E>>,
R: Send, E: From<Error> + Send,
{
Box::pin(async move {
let mut transaction = self.begin().await?;
let ret = callback(&mut transaction).await;
match ret {
Ok(ret) => { transaction.commit().await?; Ok(ret) }
Err(err) => { transaction.rollback().await?; Err(err) }
}
})
}显式 commit 或 rollback——不依赖 Drop。这让 transaction 包装器比手写 begin/commit/rollback 更安全——错误路径显式 rollback、不用担心 Drop 的"尽力"语义。
生产代码推荐优先用 transaction 闭包:
rust
conn.transaction(|tx| Box::pin(async move {
sqlx::query("UPDATE ...").execute(&mut **tx).await?;
sqlx::query("INSERT ...").execute(&mut **tx).await?;
Ok::<_, sqlx::Error>(())
})).await?;相比手写 begin/commit,少两行代码、错误路径自动显式 rollback。唯一限制是闭包内无法跨 await 借用外部变量太复杂——这时候 fallback 到手写 begin。
15.11 跨 DB 的事务差异
三家 DB 的事务行为共性 + 差异:
共性:
- 都用
BEGIN / COMMIT / ROLLBACK顶层事务。 - 都用
SAVEPOINT / RELEASE / ROLLBACK TO SAVEPOINT嵌套。 - depth 跟踪机制类似。
Postgres 的特殊:
- 自动 rollback on error:Postgres 事务内一条 query 失败后、整个事务进入 "aborted" 状态、后续 query 全
current transaction is aborted——只能 rollback。sqlx 不处理这个、让错误自然冒泡。 - ISOLATION LEVEL:
BEGIN ISOLATION LEVEL SERIALIZABLE等通过 begin_with 传入。 - READ ONLY / DEFERRABLE:同样通过 begin_with。
MySQL 的特殊:
- DDL 不事务性:MySQL 的 DDL(CREATE TABLE 等)会自动提交当前事务——不像 Postgres 里 DDL 也事务性。sqlx 不能防止这个(协议层无法拒绝)。
- Lock 行为:MySQL 的行锁在事务中持有直到 COMMIT / ROLLBACK、和 Postgres 略不同。
SQLite 的特殊:
- 三种 BEGIN:
BEGIN DEFERRED(SQLite 默认)、BEGIN IMMEDIATE、BEGIN EXCLUSIVE——获取写锁的时机不同。sqlx 默认发裸BEGIN(等同 DEFERRED、见sqlx-core/src/transaction.rs:278)——第一条 SELECT 拿 shared lock、第一条 UPDATE/INSERT/DELETE 才升级为 reserved lock。如果业务在事务里先读后写且多个连接并发、可能遇到"lock upgrade 死锁"——这时应该用conn.begin_with("BEGIN IMMEDIATE")显式指定。 - 单写者:SQLite 一个数据库文件只能有一个 writing transaction——其他 writer 要等。Pool 的 max_connections 不能解决这个并发限制。
这些差异在业务代码几乎看不见——sqlx 的 Transaction API 屏蔽了大部分方言。但生产事故时需要知道这些——比如"为什么 MySQL 里 DDL 跑完我的事务没了"就是差异 1 导致。
15.12 实战模式
几个常见的事务实战模式:
15.12.1 金钱转账
rust
pub async fn transfer(pool: &PgPool, from: i32, to: i32, amount: i64) -> Result<(), Error> {
let mut tx = pool.begin().await?;
// 扣款(带余额校验)
let affected = sqlx::query(
"UPDATE accounts SET balance = balance - $1 WHERE id = $2 AND balance >= $1"
).bind(amount).bind(from).execute(&mut *tx).await?.rows_affected();
if affected != 1 {
return Err(Error::InsufficientFunds);
// drop tx → start_rollback 排队
}
// 入账
sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
.bind(amount).bind(to).execute(&mut *tx).await?;
// 审计记录
sqlx::query("INSERT INTO audit_log (from_id, to_id, amount) VALUES ($1, $2, $3)")
.bind(from).bind(to).bind(amount).execute(&mut *tx).await?;
tx.commit().await?; // 显式 commit
Ok(())
}三条 SQL 原子性——任何一条失败(扣款校验不过、网络断),整个回滚。
15.12.2 带 SAVEPOINT 的错误恢复
rust
pub async fn bulk_import(pool: &PgPool, rows: Vec<Row>) -> Result<u64, Error> {
let mut tx = pool.begin().await?;
let mut inserted = 0u64;
for row in rows {
// 每行一个 savepoint
let mut sp = (&mut *tx).begin().await?;
let result = sqlx::query("INSERT INTO target ...").bind(...).execute(&mut *sp).await;
match result {
Ok(_) => {
sp.commit().await?; // RELEASE SAVEPOINT
inserted += 1;
}
Err(_) => {
sp.rollback().await?; // ROLLBACK TO SAVEPOINT
// 继续下一行
}
}
}
tx.commit().await?;
Ok(inserted)
}逐行 savepoint让 "一行失败不影响其他行" 成为可能——批量导入的常见模式。
15.12.3 重试(乐观并发)
rust
pub async fn update_with_retry(pool: &PgPool, id: i32, f: impl Fn(&Entity) -> Entity) -> Result<(), Error> {
for attempt in 0..3 {
let mut tx = pool.begin().await?;
let entity: Entity = sqlx::query_as("SELECT ... WHERE id = $1 FOR UPDATE")
.bind(id).fetch_one(&mut *tx).await?;
let updated = f(&entity);
let res = sqlx::query("UPDATE ... WHERE id = $1 AND version = $2")
.bind(id).bind(entity.version)
.execute(&mut *tx).await?;
if res.rows_affected() == 1 {
tx.commit().await?;
return Ok(());
} else {
tx.rollback().await?;
// 版本冲突,重试
}
}
Err(Error::ConflictRetryExceeded)
}乐观并发 + 版本冲突重试——显式 rollback + loop retry。用 Transaction 保证每次尝试隔离。
15.12.1 事务状态机可视化
把 Transaction 对象的完整生命周期画出来:
四条主要路径:
- Running → Committed:正常 commit 关闭。
- Running → RolledBack:显式 rollback 关闭。
- Running → DropRollback → Closed:没显式关就 drop——start_rollback 排队、等下次 I/O 发送。
- Closed 不是完整状态——严格说是"已排队 rollback 但 server 可能还没收到"的临界态。
三条正常路径(1/2/3)都最终到 [*]——但第三条的"到 [*]"延迟不定、server 侧最终一致。生产代码应当只走 1 或 2 路径。
15.12.2 Transaction 的三种典型错误场景
场景 1:commit 失败
rust
let mut tx = pool.begin().await?;
sqlx::query(...).execute(&mut *tx).await?;
tx.commit().await?; // ← 这里可能失败commit 失败的原因:
- 网络断(ACK 没回来)——客户端不知道事务是否提交成功。
- 服务端 COMMIT 被 revoke(极罕见)——事务没提交。
- 并发冲突(SERIALIZABLE 隔离级别下)——事务没提交。
错误处理:tx.commit().await? 返回 Err——tx 已被 commit 消费、但 open 还是 true(commit 没成功设 false)——Drop 会触发 start_rollback——如果事务实际已经在 server 端 commit,这次 rollback 是 no-op(server 拒绝)。
业务含义:commit 返回 Err 时你不能假设事务已生效——可能成功也可能失败。幂等设计是唯一安全做法——重试时先 SELECT 验证状态。
场景 2:begin 失败
rust
let mut tx = pool.begin().await?; // ← 这里失败begin 失败的原因:
- pool 满(PoolTimedOut)。
- 连接建立失败。
- BEGIN SQL 被 server 拒绝(罕见)。
错误处理:tx 根本没构造出——无事务状态泄漏。
场景 3:query 失败
rust
let mut tx = pool.begin().await?;
sqlx::query(...).execute(&mut *tx).await?; // ← 这里失败
sqlx::query(...).execute(&mut *tx).await?; // 不到这
tx.commit().await?; // 不到这? 冒泡错误——tx 离开作用域 drop——start_rollback 排队 ROLLBACK。
Postgres 特殊:第一条 query 失败后事务进入 "aborted" 状态、后续 query 全拒绝 current transaction is aborted。rollback 是唯一出路——drop 的 start_rollback 正好是期望行为。
三种错误场景的共性:始终显式 ? 传播错误、让 Rust 的 Drop 机制处理未完成的事务。不要自己 try/catch + 手动 rollback——Rust 的 RAII 已经帮你做了。
15.13 本章小结
本章拆开 sqlx Transaction 的每一处设计:
- Rust async Drop 的根本限制(§15.1)—— Drop 同步、事务回滚需要 async I/O、两者冲突。sqlx 的对策是显式 commit/rollback + start_rollback 同步排队。
- TransactionManager trait(§15.2)—— 五个方法 begin/commit/rollback(async)+ start_rollback(sync)+ get_transaction_depth。驱动层抽象。
- Transaction 两字段 struct(§15.3)—— connection + open。MaybePoolConnection 统一 Connection-originated 和 Pool-originated。
- begin 的 depth-based SQL 选择(§15.4)—— depth = 0 → BEGIN;depth > 0 → SAVEPOINT _sqlx_savepoint_N。Rollback guard 兜底 begin 过程的错误。
- commit / rollback async(§15.5)—— 执行 SQL + 更新 depth + 设置 open = false。失败时 open 保持 true 让 Drop 兜底。
- start_rollback 同步排队(§15.6)—— queue_simple_query 写 buffer 不发送。真正发送要等下次 I/O。
- Drop 的"尽力 rollback"(§15.7)—— 中间路径:不做太 eager(避免 deadlock)、也不完全放弃(保证最终发送)。
- SAVEPOINT 嵌套(§15.8)—— 连续数字命名、commit/rollback 按 depth 反推 savepoint 名。
- DerefMut 到 Connection(§15.9)——
&mut *tx变成&mut Connection、满足 Executor bound。 - Connection::transaction 闭包(§15.10)—— 显式 commit/rollback 包装、错误路径显式 rollback、比手写 begin 更安全。
- 跨 DB 差异(§15.11)—— Postgres 的 aborted 状态、MySQL 的 DDL 隐式提交、SQLite 的单写者。
- 三个实战模式(§15.12)—— 转账、带 savepoint 的批量导入、乐观并发重试。
第四部分"连接与事务"到此结束。下一章进入第五部分"驱动实现"——Postgres 驱动的协议栈、从 Parse/Bind/Execute 的 Extended Query 到 pipelining 的完整实现。
15.14 Transaction 的三种"不要做"
生产里关于 Transaction 使用的三条反模式,值得单独列出:
不要做 1:在事务里调 tokio::spawn
rust
let mut tx = pool.begin().await?;
tokio::spawn(async move {
sqlx::query("...").execute(&mut *tx).await; // 编译错:tx 不 Send?
});编译层面可能因为 Connection-originated Transaction 持有 &mut conn——不能跨 spawn。即便是 Pool-originated('static)也不推荐 spawn 里用——事务 commit/rollback 必须和 begin 在同一任务、否则语义混乱。
修复:所有事务操作在一个 task 内完成。
不要做 2:让事务持续数秒或更久
rust
let mut tx = pool.begin().await?;
long_external_api_call().await?; // 几秒 HTTP 请求
sqlx::query(...).execute(&mut *tx).await?;
tx.commit().await?;事务持有时间长 = 持锁时间长 = 其他事务被阻塞更久 = scalability 瓶颈。Postgres 的 row lock 在事务中一直持有。
修复:事务只包围必要的原子操作;外部 API 调用在事务外做。
不要做 3:假设 Drop 会立即 rollback
rust
if condition {
// 想"丢弃这个事务,不 rollback"——drop 让它 rollback 就够了
} else {
tx.commit().await?;
}错——drop 会 start_rollback、排队 ROLLBACK SQL——不是"什么都不做"。如果你真的想"不操作、让 server timeout 清理"——也做不到(start_rollback 已经执行)。
修复:想明确 rollback 就调 tx.rollback().await?;不想 rollback 就 commit 或保持路径一致。Drop 是兜底、不是你该依赖的"行为"。
15.15 Transaction API 的设计哲学
读完本章、回看整个 Transaction 设计——能看出 sqlx 团队的几条核心取舍:
1. 类型系统优于运行时检查。Transaction 吃 self commit/rollback——一次用完就消费掉、不能重复 commit。open 字段只在内部维护——用户看不到。用 Rust 类型一次性表达"事务是线性资源"。
2. 让异常路径尽可能正确。Rollback guard(begin 过程)、start_rollback(Drop)、open 字段(防重复)——全部都是"异常时也能兜底"的设计。代码里看不到一个"try/catch"——完全靠 RAII 和 flags。
3. 诚实暴露 Rust 限制。sqlx 没假装能同步 rollback、也没试图"偷偷跨线程 spawn rollback task"——直接文档写"drop 的 rollback 是尽力的"。这种诚实比"假装完美"换来更多用户信任。
4. 跨 DB 差异压到最低。同一套 Transaction 对外 API——底下 Postgres / MySQL / SQLite 的具体 SQL 和行为不同——但用户代码几乎不需要改。差异只在业务要用 DB 方言特性(如 ISOLATION LEVEL)时才显露。
5. RAII 自动化 + 显式调用鼓励。Drop 保底 + transaction() 闭包促显式。用户按场景选——简单场景让 RAII 管、需要精细控制时手写 begin + explicit commit。
这五条哲学共同构成 sqlx Transaction 的精神——"类型驱动安全 + RAII 保底 + 诚实文档 + 一套 API 跨 DB + 灵活性保留"。这是一份好的Rust 库设计模板——你设计自己的资源管理 API 时都可以套用。
15.16 第四部分回顾
本书第四部分"连接与事务"到此结束——四章 250+ 页内容讲完了 sqlx 最核心的资源管理抽象:
- 第 12 章 Connection——单条连接的生命周期、协议、礼仪。
- 第 13 章 Pool 外部 API——用户如何配置和使用 Pool。
- 第 14 章 Pool 内部实现——idle queue、semaphore、maintenance task。
- 第 15 章 Transaction(本章)——事务的 RAII guard、SAVEPOINT、尽力 rollback。
四章合起来构成了 sqlx 的"资源管理栈"——从最底层的 Connection 到最上层的 Transaction、每一层都有清晰职责、层层叠加。
读懂这部分的工程师有能力运维任何 sqlx 生产部署——Pool 配置、连接泄漏排查、事务边界设计、故障诊断都在掌握范围内。这也是本书最"实用派"的部分——第 1-11 章讲"sqlx 如何工作"、第 12-15 章讲"sqlx 在生产怎么用"。
下一部分进入驱动实现——Postgres / MySQL / SQLite / Any 四章。这部分偏"协议级深入"——读者可以按兴趣选。日常业务用 sqlx 不需要读;但想贡献 sqlx 代码、写自定义驱动、深度优化性能——这部分必读。
15.17 Transaction 常见问题 FAQ
读完本章,读者可能常有的几个问题:
Q1:嵌套事务(SAVEPOINT)在生产里真的用得着吗?
用得着,但场景较窄。典型用例:
- 批量导入——每行一个 savepoint 隔离失败(§15.12.2)。
- 子流程——某个独立操作可能失败但不影响主事务。
- ORM 框架内部——sea-orm 等 ORM 用嵌套 savepoint 实现 per-method 的事务边界。
大多数业务代码不用嵌套——一个顶层事务包围所有操作就够。
Q2:事务里可以 await 其他非数据库操作吗?
可以但不建议:
rust
let mut tx = pool.begin().await?;
sqlx::query(...).execute(&mut *tx).await?;
some_http_call().await?; // 危险
sqlx::query(...).execute(&mut *tx).await?;
tx.commit().await?;http_call 可能慢几秒——在这段时间里事务持有锁、占 Pool 连接、占 server 资源。高并发场景会让 Pool 饱和、其他事务等死。
规范:事务里只做数据库操作;外部调用在事务外。
Q3:query_as::<User>(...).fetch_all(&mut *tx) 和 .fetch_all(&pool) 行为有什么不同?
&mut *tx:走事务连接、看到事务内未提交的改动、读一致性受隔离级别控制。&pool:借一个独立连接、看不到你自己事务里未提交的改动、和事务隔离。
在事务里想查"自己刚改的数据"——必须用 &mut *tx。
Q4:为什么 Transaction 不直接用 Drop::drop 等待 rollback 完成?
Rust Drop::drop 是同步、不能 .await。解决方案只有两个:
- spawn task——但 task 可能跑不完 Drop 就返回(主 runtime 可能 shutdown)。
- block_on——可能死锁(当前 runtime 就 drop 时的那个)。
两者都不完美。sqlx 选了"start_rollback 同步排队 + 等下次 I/O 触发发送"——现实可行的最佳策略。
Q5:我的代码里 commit 返回错误是不是应该重试?
看情况。
- 网络错:可以重试、但先查 DB 状态确认事务到底提交了没——commit 错误不代表事务失败、可能 ACK 丢了。
- SERIALIZABLE 冲突:重试是正常流程(§15.12.3 的乐观并发)。
- 逻辑错:不要重试、修代码。
规则:重试必须配合幂等设计——单次操作的重复执行不改变结果。
这五个 FAQ 覆盖生产里最常碰到的 Transaction 疑问——收藏本章作为事务使用的参考。
15.18 Transaction 源码阅读推荐
如果你想精读 sqlx Transaction 的源码,推荐顺序:
sqlx-core/src/transaction.rs——core trait + Transaction struct + 3 条 ANSI SQL helper(300 行)。本章主要源码。sqlx-postgres/src/transaction.rs——PgTransactionManager 具体实现(110 行)。sqlx-mysql/src/transaction.rs——MySqlTransactionManager(类似 PgTransactionManager)。sqlx-sqlite/src/transaction.rs——SQLite 版本、通过 worker command 发 BEGIN(同样默认 DEFERRED 语义)。sqlx-core/src/pool/maybe.rs——MaybePoolConnection 枚举(20 行)——Transaction 用来统一 Connection/PoolConnection 来源。
总共 500-700 行——一个熟悉 Rust async 的工程师 1-2 小时能通读。读完后你对"Rust 里实现 RAII 风格事务"这件事有完整理解——在你自己的项目里需要实现类似"作用域式资源释放"时就知道怎么做。
特别留意:
transaction.rs:260-275的 Drop 实现和注释——解释了"尽力 rollback"的哲学。PgTransactionManager::begin里的 Rollback guard——部分成功的错误路径兜底。- 三条 ANSI SQL helper 的
Cow<'static, str>返回类型——零拷贝常见情况 + 按需分配动态情况。
这些细节不读代码看不出来、读一遍让你对 Rust 工艺水平有更深的欣赏。
15.19 Transaction 给 Rust 设计者的通用启示
跳出 sqlx 看 Transaction 的设计,有四条可迁移的启示:
1. 线性资源用 self 消费确保 "用一次"。commit / rollback 吃 self——事务对象被消费后类型系统不允许再用。这在 Rust 里比手动标志 + 运行时检查优雅太多。任何"只能做一次的操作"都值得用 self 表达。
2. Drop 用于保底、不用于正常路径。Transaction::Drop 是"万一用户忘了调 commit/rollback"的兜底——不是正常关闭流程。正常流程显式 commit/rollback。这条"Drop 仅兜底"的哲学让 API 边界清晰——用户知道什么该自己做、什么让 drop 管。
3. 跨 trait 的能力借用用 DerefMut。Transaction DerefMut 到 Connection——让 &mut *tx 变成 &mut Connection——不需要手写 impl Executor for Transaction。这让 "这个类型虽然不是 Connection 但在某些语境下可以当 Connection 用" 的需求零成本满足。
4. 诚实承认语言限制、写清楚文档。"Drop can't be async" 的限制不试图隐藏——注释、文档、tooling 都明确告诉用户。这比"假装没问题"换来更少的 bug 和生产事故——用户知道限制就不会踩坑。
这四条原则放在任何涉及资源管理 + 异步 + 错误处理的 Rust 项目里都适用——sqlx Transaction 是它们的一份精确展示。
15.20 从 Transaction 到驱动实现:中间观察
第 15 章是第四部分的收尾、也是第五部分(驱动实现)的桥梁。
本章讨论的 TransactionManager trait 和 ANSI SQL helper——是 driver-independent 的接口抽象;真正的 driver-specific 实现在每个驱动 crate 里(PgTransactionManager / MySqlTransactionManager / SqliteTransactionManager)。
从这一步开始往下,我们要讨论的是 "具体 DB 协议"——怎么发 Parse + Bind + Execute 消息(Postgres)、怎么处理 COM_STMT_EXECUTE 的 null_bitmap(MySQL)、怎么把同步 C API 包装成 async(SQLite)。
这条"从抽象到具体"的路径让 sqlx 的学习曲线分阶段——初学者读第 1-11 章就能用好 sqlx;读到第 12-15 章能在生产环境稳定部署;读到第 16-19 章(驱动实现)能贡献代码或写自定义驱动。你不需要一次读完所有章节——按自己阶段取用。
15.21 第四部分的一个核心教训
第 12-15 章贯穿一条教训:Rust async 在"资源管理"层面和同步 Rust 有本质差异——Drop 不能 async、生命周期更长、spawn task 有 capture 限制。sqlx 的解法可以总结成三件事:
- 显式 commit/rollback/close 是 async——保证协议级正确性。
- Drop 走 start_rollback / close_hard 同步排队——最佳 effort 兜底。
- 文档和 API 设计持续提醒用户"显式"——避免用户依赖不完善的自动路径。
这三件事在 Pool(第 12-14 章)和 Transaction(本章)里都有体现——是同一套设计哲学的不同展开。读懂这条哲学、你在自己 Rust 项目里处理类似问题时有明确的参照。
第 16 章开始,我们走出资源管理的讨论、进入协议栈的深水区——Postgres 驱动的 Extended Query 协议是 Rust 里最硬核的 async 状态机之一。
15.22 Transaction 代码品质观察
最后一点对 sqlx transaction 源码的品质观察:
1. 整个 transaction.rs 只有 302 行——包括文档注释和一大段被注释掉的 Executor impl(§第 4 章 §4.6)。去掉注释后业务代码只有 100-150 行。如此少的代码承担了事务这么核心的功能——体现了抽象的力量。
2. 核心方法(commit/rollback)只有 5-6 行——调驱动 + 设 open = false。所有复杂度都推到 TransactionManager trait 的实现方——core crate 保持薄。
3. 三个 SQL helper 是 pub fn——begin_ansi_transaction_sql 等可以被驱动以外的代码调。这让未来可能的第三方驱动(ClickHouse 等)可以复用 ANSI SQL 生成,不用自己重写。
4. 注释里承认限制——start_rollback 的 "depends on the database" / Drop 注释的 "next asynchronous invocation" ——不美化、不含糊。这种文档品质让用户建立准确预期、不踩坑。
5. #[doc(hidden)] 用得克制——只 TransactionManager trait 和 Transaction::begin 标 hidden。其他公共 API 都给用户看。这让 sqlx 的文档站既不暴露内部细节、又不藏掉有用信息。
这五点观察是代码审美层面的——读 sqlx 源码你能感受到作者对"简洁"的追求。每次有"这段能不能更短"的疑问,sqlx 通常已经想过了——减无可减的状态下才是现在的形态。
15.23 本章的最终消化
读完第 15 章,你应该能答下面这些问题:
- 为什么 Transaction::commit 吃 self?——线性资源、一次性消费、类型系统保证不重复 commit。
- Drop 里做什么?——start_rollback 排队 ROLLBACK SQL 到 buffer。
- "尽力 rollback" 意味着什么?——Drop 不保证消息立即发送、只保证最终发送(下次连接 I/O)。
- SAVEPOINT 嵌套的 SQL 是什么?——
SAVEPOINT _sqlx_savepoint_N、RELEASE SAVEPOINT ...、ROLLBACK TO SAVEPOINT ...。 - 为什么
&mut Transaction不直接是 Executor?——lazy normalization 限制(§第 4 章 §4.6)——通过 DerefMut 让&mut *tx成为&mut Connection。 - 生产代码应该依赖 Drop 自动 rollback 吗?——不应该、永远显式 commit/rollback。
- commit 返回 Err 应该假设事务没提交吗?——不能假设、可能提交了只是 ACK 丢失——需要幂等设计。
能答出这七题说明你对 Transaction 理解到位了——第四部分的学习成果到此兑现。
15.24 事务性能数据
用粗略数据给事务相关操作一个量级估计:
| 操作 | 本地 Postgres | 跨机房 Postgres (10ms RTT) |
|---|---|---|
| pool.begin().await | 1-3ms | 15-25ms |
| &mut *tx 一次 query.execute() | 1-3ms | 10-15ms |
| tx.commit().await | 1-3ms | 10-15ms |
| Drop 的 start_rollback | < 1μs (同步排队) | 同左 |
| 嵌套 savepoint begin | 1-3ms | 10-15ms |
观察:事务的每个操作都是一次 DB 往返——跨机房累加很快。一个"begin + 3 条 query + commit"事务在跨机房可能50-70ms——相当明显。
优化方向:
- 减少事务内 query 数量——把多次查合并成一条 SQL(JOIN / CTE /
RETURNING)。 - batch 操作——一条 INSERT 带 1000 行胜过 1000 次 INSERT。
- 保持事务短——事务外的只读查询分离出去。
- 用
sqlx::raw_sql批量执行 migration 脚本——一条 simple query 一次性发。
这些优化都是"减少 round-trip"——对事务性能影响最大的因素。
15.24a 三条 ANSI SQL 生成函数
sqlx 的 SAVEPOINT 嵌套靠三个小函数生成 SQL(sqlx-core/src/transaction.rs:277-302):
rust
pub fn begin_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
if depth == 0 {
Cow::Borrowed("BEGIN")
} else {
Cow::Owned(format!("SAVEPOINT _sqlx_savepoint_{depth}"))
}
}
pub fn commit_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
if depth == 1 {
Cow::Borrowed("COMMIT")
} else {
Cow::Owned(format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", depth - 1))
}
}
pub fn rollback_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
if depth == 1 {
Cow::Borrowed("ROLLBACK")
} else {
Cow::Owned(format!(
"ROLLBACK TO SAVEPOINT _sqlx_savepoint_{}",
depth - 1
))
}
}三个函数一眼看完嵌套语义:
- depth=0 →
BEGIN(真事务启动)。 - depth≥1 →
SAVEPOINT _sqlx_savepoint_<depth>(嵌套点)。 - commit depth=1 →
COMMIT;commit depth>1 →RELEASE SAVEPOINT ...。 - rollback depth=1 →
ROLLBACK;rollback depth>1 →ROLLBACK TO SAVEPOINT ...。
SAVEPOINT 命名前缀 _sqlx_savepoint_—— 和用户 SAVEPOINT 永远不冲突(用户一般不用下划线开头)—— 和 _sqlx_migrations 表的命名策略一致——下划线前缀 = sqlx 内部空间。
depth 计数由 TransactionManager 维护——每家驱动实现 begin/commit/rollback 时维护自己的 depth:Postgres 用 PgConnection::transaction_depth、MySQL 用 MySqlConnection::transaction_depth、SQLite 用 WorkerSharedState::transaction_depth。同一套 ANSI SQL 生成函数 + 三种 depth 存储—— 共享代码最大化。
Cow<'static, str> 的选择—— depth=0/1 的常见路径返回 Borrowed("BEGIN") / Borrowed("COMMIT")—— 零分配;嵌套路径才分配 owned String—— 常见路径 fast、边界路径慢一点但仍正确。
这种"常见路径零开销、少见路径接受开销"是 sqlx 全书反复出现的设计风格。
15.24b Transaction::drop 的 start_rollback 含义
源码(sqlx-core/src/transaction.rs:260-275):
rust
impl<'c, DB> Drop for Transaction<'c, DB>
where DB: Database,
{
fn drop(&mut self) {
if self.open {
// what this does depends on the database but generally this means we queue a rollback
// operation that will happen on the next asynchronous invocation of the underlying
// connection (including if the connection is returned to a pool)
DB::TransactionManager::start_rollback(&mut self.connection);
}
}
}关键词 start_rollback(注意是 start)——不是 "执行 rollback"—— 是 "排队一条 rollback"。因为 Drop 不能 async——不能直接发 ROLLBACK 并等响应——只能标记这个连接"下次用的时候先跑 ROLLBACK"。
每家驱动实现 start_rollback 不同:
- Postgres—— 设
PgConnection::pending_rollback = true—— 下次run前补一个 ROLLBACK。 - MySQL—— 类似机制、设 flag。
- SQLite—— worker 线程收到下一个 command 前先 rollback。
self.open: bool—— 标志位—— commit/rollback 显式调用时置 false—— Drop 看到 false 就什么都不做。
结果—— 用户忘记显式 commit/rollback、业务代码早退、Drop 自动触发"下次 rollback"—— 不丢数据、不弄脏连接—— Rust 的 RAII 把 "C++ 程序员容易忘的事" 变成 "编译器强制的事"。
这段注释是全 sqlx 最重要的注释之一——明确告诉读者 Drop 的语义和限制—— "rollback 会在下次用这条连接时发生"—— 用户不需要 assume Drop 立刻 rollback。
补一句对比—— Java JDBC 的 Connection.close() 默认 commit 未提交事务——很多历史 bug 源于此(崩溃时意外 commit 了半截改动)。sqlx 的 "Drop 触发下次 rollback" 反其道而行—— 默认 rollback 比默认 commit 安全得多——这是 Rust async 库学到 Java 教训后做出的正确默认。
15.25 实战示例:带 retry 的幂等转账
把本章所有内容用一个最完整的例子收尾——一个生产级的带重试的转账函数:
rust
use sqlx::{PgPool, Error};
use std::time::Duration;
pub async fn transfer_with_retry(
pool: &PgPool,
from: i32,
to: i32,
amount: i64,
idempotency_key: &str,
) -> Result<(), TransferError> {
const MAX_ATTEMPTS: u32 = 3;
for attempt in 0..MAX_ATTEMPTS {
match transfer_once(pool, from, to, amount, idempotency_key).await {
Ok(()) => return Ok(()),
Err(TransferError::Conflict) if attempt < MAX_ATTEMPTS - 1 => {
// 乐观并发冲突,重试
let backoff = Duration::from_millis(10 * 2u64.pow(attempt));
tokio::time::sleep(backoff).await;
}
Err(TransferError::MaybeCommitted) if attempt < MAX_ATTEMPTS - 1 => {
// commit 失败但可能已生效——幂等 key 让重试安全
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(e) => return Err(e),
}
}
Err(TransferError::RetriesExceeded)
}
async fn transfer_once(
pool: &PgPool,
from: i32, to: i32, amount: i64,
idempotency_key: &str,
) -> Result<(), TransferError> {
let mut tx = pool.begin().await?;
// 幂等 key 防重复
let existing: Option<i64> = sqlx::query_scalar(
"SELECT amount FROM transfers WHERE idempotency_key = $1"
).bind(idempotency_key).fetch_optional(&mut *tx).await?;
if existing.is_some() {
return Ok(()); // 已经转过了
}
// 扣款 + 乐观锁(version 列)
let affected = sqlx::query(
"UPDATE accounts SET balance = balance - $1, version = version + 1
WHERE id = $2 AND balance >= $1"
).bind(amount).bind(from).execute(&mut *tx).await?.rows_affected();
if affected != 1 { return Err(TransferError::InsufficientFunds); }
// 入账
sqlx::query("UPDATE accounts SET balance = balance + $1, version = version + 1 WHERE id = $2")
.bind(amount).bind(to).execute(&mut *tx).await?;
// 记录
sqlx::query("INSERT INTO transfers (idempotency_key, from_id, to_id, amount) VALUES ($1, $2, $3, $4)")
.bind(idempotency_key).bind(from).bind(to).bind(amount)
.execute(&mut *tx).await?;
// commit
match tx.commit().await {
Ok(()) => Ok(()),
Err(e) if e.is_connection_error() => Err(TransferError::MaybeCommitted),
Err(e) => Err(TransferError::Other(e)),
}
}这个例子整合了本章所有要点:
- 显式 begin + commit——不依赖 Drop。
- 幂等 key——commit 错误后重试不会重复扣款。
- 乐观锁(version 列)——并发冲突返回 affected != 1。
- 重试逻辑——指数退避 + 有限次数。
- commit 失败分类——连接错误 vs 逻辑错误——区分重试与否。
这段 60 行代码整合了本章所有要点——显式 begin+commit、幂等 key、乐观锁、指数退避重试、commit 失败按连接错/逻辑错分类——是生产级事务代码的最小完整样本。
第 16 章开始进入驱动层——Postgres Extended Query 协议的 Rust async 实现。