Skip to content

第16章 Postgres 驱动:协议、管道、Extended Query

"A protocol is a conversation—Extended Query is a carefully choreographed one where every message knows its place." —— 读通 Postgres wire protocol 的领悟

本章要点

  • sqlx-postgres(19841 行代码)实现了Postgres wire protocol 完整栈——从 TCP 建立到 SCRAM 认证、Startup 消息、Simple Query 协议、Extended Query 协议、COPY 协议、LISTEN/NOTIFY。
  • Postgres 协议有两大流派Simple Query(一条 Q 消息发 SQL + 文本结果) vs Extended Query(Parse / Bind / Execute / Sync 四消息 + 二进制结果)。sqlx 默认走 Extended Query——更快、类型精确、支持 prepared。
  • Extended Query 的五消息序列sqlx-postgres/src/connection/executor.rs:198-360):Parse(编译 SQL)→ Bind(绑定参数到 portal)→ Execute(跑 portal)→ Close(清 portal)→ Sync(边界 + 触发 ReadyForQuery)。
  • 连接建立的三阶段sqlx-postgres/src/connection/establish.rs):TCP/TLS 握手 → Startup + SCRAM 认证 → 接收 BackendKeyData / ParameterStatus / 第一个 ReadyForQuery。约 8-10 轮消息。
  • pending_ready_for_query_countconnection/mod.rs:70)—— 跟踪"已发多少 Sync 还没收到 ReadyForQuery"。这个计数让 sqlx 可以流水线发多条 query(虽然 0.8 主要是防止状态错位)。
  • prepared statement cacheStatementCache<(StatementId, Arc<PgStatementMetadata>)>)—— LRU 缓存,默认容量 100。每条 SQL 第一次执行时发 Parse、后续直接复用 statement ID 跳过 Parse。
  • PgStreamconnection/stream.rs)—— 缓冲 I/O 层:wraps TcpStream/UnixStream → 可选 TLS → BufReader/BufWriter → 提供 write_msg / recv / flush。协议级消息的序列化入口
  • sqlx-postgres 的 message/ 目录有 26 个文件——每种协议消息一个文件(Startup、Parse、Bind、Execute、DataRow、RowDescription、ReadyForQuery、SASL 等)。每文件实现自己的 encode/decode。

16.1 问题引入:Postgres 协议与 Rust 异步的融合

用户写一行 sqlx::query("SELECT id, name FROM users WHERE id = $1").bind(42).fetch_one(&pool).await?——背后实际和 Postgres 服务端交换了多少消息?答案:至少 5-7 条协议消息(走 Extended Query + 带参数)。

每条消息都有严格的格式:单字节类型标识 + 4 字节长度 + 载荷。消息按 TCP 流顺序发送——client 和 server 各自维护一个状态机、处理消息按既定顺序。

Rust 异步的挑战:TCP 是字节流、消息边界要 Rust 代码自己识别;每条消息的序列化 / 反序列化要走二进制格式;消息相互依赖顺序(例如必须 Parse 后才能 Bind);错误路径(ErrorResponse)可以在任何消息之间发生。

sqlx-postgres 的19841 行代码大部分就在做这些事——把 Postgres 协议完整地翻译成 Rust async 代码。本章不可能讲完每一处——但会把协议的主干关键实现技巧拆开、让你读懂任何 sqlx-postgres 的 handler 代码。

16.2 Postgres Wire Protocol 全貌

Postgres wire protocol(https://www.postgresql.org/docs/current/protocol.html)从 1996 年的 v2 到现在的 v3.0,已经稳定了 20 多年。核心结构

  • Frontend → Backend 消息(client 发给 server):Q、P、B、E、S、X、...
  • Backend → Frontend 消息(server 发给 client):R、K、Z、T、D、C、E、...

每个消息格式

+-----------+------------+----------+
| 类型字节  | 长度 (u32) | 载荷     |
| 1 byte    | 4 bytes    | N bytes  |
+-----------+------------+----------+
  • 类型字节:大写 ASCII 字母,标识消息类型。Q = simple Query、P = Parse、B = Bind、E = Execute、S = Sync、Z = ReadyForQuery 等。
  • 长度:含自身(4 字节)和载荷,不含类型字节。
  • 载荷:消息类型特定格式,包含字符串(null-terminated)、整数(大端)、可变数据。

16.2.1 两大协议流派

Postgres 协议提供两种 query 执行流派:

Simple Query(简单查询)

Client → Server: Q "SELECT * FROM users WHERE id = 1"
Server → Client: T (RowDescription)
Server → Client: D (DataRow) × N
Server → Client: C "SELECT 1" (CommandComplete)
Server → Client: Z (ReadyForQuery)

一条 Q 消息发 SQL 字符串——服务端按字符串解析、执行、返回结果。不支持参数绑定(SQL 里不能有 $1)、结果全用 Text format

Extended Query(扩展查询)

Client → Server: P (Parse, 编译 SQL)
Client → Server: B (Bind, 绑定参数)
Client → Server: E (Execute, 跑 portal)
Client → Server: C (Close Portal)
Client → Server: S (Sync, 边界)
Server → Client: 1 (ParseComplete)
Server → Client: 2 (BindComplete)
Server → Client: D (DataRow) × N
Server → Client: C (CommandComplete)
Server → Client: 3 (CloseComplete)
Server → Client: Z (ReadyForQuery)

五消息发送 + 六消息接收——复杂但好处多:

  • 支持参数——Bind 消息带参数值、SQL 里用 $1$2
  • prepared statement —— Parse 可以命名 statement、后续 Bind 直接用名字跳过 Parse。
  • Binary format —— 参数和结果默认用二进制编码、比文本紧凑快。
  • Portal —— 执行结果的游标、可以部分读取(Execute 带 limit)。

sqlx 的 Extended Query 路径是业务代码的默认路径——性能更好、功能更强。Simple Query 只在 raw_sql() 和某些协议内部使用。

16.3 连接建立:9 步握手

sqlx-postgres/src/connection/establish.rsPgConnection::establish 方法(约 200 行)实现完整握手。简化步骤:

Step 1:TCP 连接

rust
let mut stream = PgStream::connect(options).await?;

内部调 tokio::net::TcpStream::connect 或 Unix socket connect。

Step 2:SSL 升级(可选)

如果 ssl_mode = Require/VerifyCa/VerifyFull——先发 SSLRequest(8 字节特殊消息)、server 返回 S(支持)或 N(不支持)、然后 TLS 握手。

Step 3:Startup 消息

rust
stream.write(Startup {
    username: Some(&options.username),
    database: options.database.as_deref(),
    params: &[("DateStyle", "ISO, MDY"), ("client_encoding", "UTF8"), ...],
})?;

Startup 不带类型字节(历史遗留)——只有长度 + 载荷。载荷是 null-terminated key-value 对:user / database / 其他参数。

Step 4:循环接收 Authentication

rust
loop {
    let message = stream.recv().await?;
    match message.format {
        BackendMessageFormat::Authentication => match message.decode()? {
            Authentication::Ok => { /* 认证完成,跳出 */ }
            Authentication::CleartextPassword => { /* 发 PasswordMessage */ }
            Authentication::Md5Password { salt } => { /* MD5 hash + 发 PasswordMessage */ }
            Authentication::Sasl { mechanisms } => { /* SCRAM-SHA-256 挑战响应 */ }
            // ...
        }
        // ...
    }
}

认证机制有 4-5 种——cleartext(不安全,测试用)、MD5(老 Postgres)、SCRAM-SHA-256(新标准)、GSSAPI(企业单点登录)、LDAP(少见)。每种发送不同的 PasswordMessage 格式。

SCRAM-SHA-256 的挑战响应sqlx-postgres/src/connection/sasl.rs,约 200 行)是最复杂的——Client First、Server First、Client Final、Server Final 四条消息往返、每条都用 HMAC-SHA-256 计算 proof。sqlx 直接用 hmac + sha2 crate 实现。

Step 5:接收 ParameterStatus

服务端发一堆 ParameterStatus 消息——告诉 client 服务端的配置:server_versionTimeZoneclient_encodinginteger_datetimes 等。sqlx 存进 PgStream 的相关字段。

Step 6:接收 BackendKeyData

K + process_id (i32) + secret_key (i32)

用于 CancelRequest——之后如果想取消正在跑的 query、用 process_id + secret_key 建新 TCP 连接发 CancelRequest。sqlx 存到 PgConnectionInner::process_id / secret_key(第 12 章 §12.11)。

Step 7:接收 ReadyForQuery

Z + 状态字节 ('I' = idle, 'T' = in tx, 'E' = in failed tx)

Z 'I' 标志认证流程完全结束——连接可以开始执行 query。sqlx 在 PgConnectionInner::transaction_status = r.transaction_status 记录状态。

总共 9 步、约 8-15 条消息——本地 1-5ms、跨机房 50-200ms。这就是 Pool 要复用连接的原因——握手开销不可忽视。

16.3.1 参数协商的含义

Startup 消息里发的 ("DateStyle", "ISO, MDY") 等参数是告诉 server 用什么样的格式

  • DateStyle:"ISO, MDY" 让 server 按 ISO 格式输出日期(2026-04-24 而不是 04/24/2026)。
  • client_encodingUTF8——client 发送 / 接收字符串的编码。
  • TimeZoneUTC——时间戳的默认时区。
  • extra_float_digits:浮点数精度(默认 0——短十进制;3——全精度)。

这些参数是 per-session 的——建连接时协商好、后续 session 期间生效。SET DateStyle = ... SQL 也能动态改。sqlx 在 Startup 里一次性设好、避免后续每条 query 都受影响。

16.3.2 连接建立的完整序列图

把 9 步握手用 mermaid 画出来:

这张图显示本地 Postgres 建立连接约 10 条消息、跨机房会放大 RTT×10——明显到让人觉得"为什么这么慢"。所以 Pool 复用连接必不可少。

16.4 Extended Query 的五消息序列

建立连接后,真正的 query 执行走 Extended Query。以 SELECT id, name FROM users WHERE id = $1 + bind(42) 为例:

16.4.1 Parse 消息

P + len + statement_name (C-string) + query (C-string) + num_params (u16) + param_types (OID × num)

作用:让 server 编译 SQL 成 parse tree + 优化计划。

  • statement_name:空串(不命名、unnamed statement)或 "s_0"(命名、prepared)。
  • query:SQL 字符串。
  • num_params:参数个数。sqlx 通常传 0——让 server 自己推类型;也可以传具体 OID 数组强制类型。

sqlx 代码(executor.rs 相关处):

rust
conn.inner.stream.write_msg(Parse {
    param_types: &param_types,
    query: sql,
    statement: id,  // StatementId::UNNAMED 或命名
})?;

16.4.2 Bind 消息

B + len + portal_name (C-string) + statement_name (C-string) + param_format_count (u16)
   + param_formats (u16 × count) + num_params (u16) + params (length-prefixed bytes × num)
   + result_format_count (u16) + result_formats (u16 × count)

作用:给 statement 绑定参数值、创建 portal。

  • portal_name:空串(unnamed portal)。
  • statement_name:对应 Parse 里的名字。
  • param_formats[PgValueFormat::Binary]——所有参数用二进制。
  • params:每个参数 len(i32) + N bytes——-1 表示 NULL、正数表示字节数(参见第 6 章 §6.4.1)。
  • result_formats[PgValueFormat::Binary]——返回的行值用二进制。
rust
conn.inner.stream.write_msg(Bind {
    portal: PortalId::UNNAMED,
    statement,
    formats: &[PgValueFormat::Binary],
    num_params,
    params: &arguments.buffer,
    result_formats: &[PgValueFormat::Binary],
})?;

16.4.3 Execute 消息

E + len + portal_name (C-string) + max_rows (u32)

作用:执行 portal、返回最多 max_rows 行。max_rows = 0 表示"全部返回"。

rust
conn.inner.stream.write_msg(message::Execute {
    portal: PortalId::UNNAMED,
    limit: 0,  // 0 = all rows
})?;

注意 limit = 0——sqlx 0.8 有一个注释(executor.rs:252-253):

Non-zero limits cause query plan pessimization by disabling parallel workers

非 0 limit 会让 Postgres 关闭并行 worker——为了方便 suspend/resume、不做 parallel execute。所以 sqlx 默认 limit = 0 让 query plan 最优。

16.4.4 Close 消息

C + len + 'P' (portal) or 'S' (statement) + name (C-string)

作用:关 portal 或 statement、释放 server 资源。

rust
conn.inner.stream.write_msg(Close::Portal(PortalId::UNNAMED))?;

只关 unnamed portal——statement 留着让 cache 复用。

16.4.4-1 Extended Query 的完整消息序列图

一次完整 sqlx::query("SELECT ... WHERE id = $1").bind(42).fetch_one(&pool).await? 的协议序列:

sqlx 的写消息合并:Bind / Execute / Close / Sync 一次 flush 发 4 条消息——一次 TCP write syscall。服务端按序处理、依次返回响应。整个过程 1 个 RTT——非常紧凑。

如果 cache miss(第一次执行):

2 个 RTT——Parse 一次、Bind/Execute 一次。之后同条 SQL 的执行都只要 1 个 RTT(cache 命中)。这是 prepared statement 的性能收益——第一次慢、后续快。

16.4.5 Sync 消息

S + len (5)

作用协议分割点——告诉 server "之前发的消息处理完、回一个 ReadyForQuery"。

rust
self.write_sync();
// 内部 pending_ready_for_query_count += 1

关键:Sync 是事务 atomic unit 的边界——从上次 Sync 到这次 Sync 之间的所有消息作为一个 implicit transaction 执行(如果不在显式事务中)。ErrorResponse 发生时、service 会跳过到下一个 Sync 才重新处理。

16.5 run 方法的完整流程

executor.rs:198-360PgConnection::run 方法把五消息序列组织成 Rust async 代码:

rust
pub(crate) async fn run<'e, 'c: 'e, 'q: 'e>(
    &'c mut self,
    query: &'q str,
    arguments: Option<PgArguments>,
    persistent: bool,
    metadata_opt: Option<Arc<PgStatementMetadata>>,
) -> Result<impl Stream<...>, Error> {
    self.wait_until_ready().await?;  // 等之前的 ReadyForQuery

    if let Some(mut arguments) = arguments {
        // Extended Query 路径
        let (statement, metadata) = self.get_or_prepare(query, &arguments.types, persistent, metadata_opt).await?;
        arguments.apply_patches(self, &metadata.parameters).await?;  // §6.4.3
        self.wait_until_ready().await?;

        self.inner.stream.write_msg(Bind { ... })?;
        self.inner.stream.write_msg(Execute { ... })?;
        self.inner.stream.write_msg(Close::Portal(...))?;
        self.write_sync();
    } else {
        // Simple Query 路径(无参数)
        self.inner.stream.write_msg(Query(query))?;
        self.inner.pending_ready_for_query_count += 1;
    }

    self.inner.stream.flush().await?;

    // 返回 stream:用 try_stream! 循环接收消息、yield Row 或 QueryResult
    Ok(try_stream! {
        loop {
            let message = self.inner.stream.recv().await?;
            match message.format {
                BackendMessageFormat::BindComplete | ParseComplete | ... => { /* 忽略 */ }
                BackendMessageFormat::CommandComplete => { yield Left(query_result); }
                BackendMessageFormat::RowDescription => { /* 更新 metadata */ }
                BackendMessageFormat::DataRow => { yield Right(row); }
                BackendMessageFormat::ReadyForQuery => {
                    self.inner.pending_ready_for_query_count -= 1;
                    return;  // 流结束
                }
                BackendMessageFormat::ErrorResponse => { return Err(...); }
                _ => {}
            }
        }
    })
}

核心逻辑

  1. wait_until_ready:等之前的 query 完成(ReadyForQuery 收到)。
  2. Extended / Simple 分叉:有参数走 Extended、无参数走 Simple。
  3. get_or_prepare:查 statement cache、没有就 Parse、有就复用。
  4. apply_patches:处理 PgArguments 的 type holes(第 6 章 §6.4.3)。
  5. write_msg × 4 + write_sync:发完五消息。
  6. flush:把 write buffer 的字节送出 TCP。
  7. try_stream! 接收:循环 recv、按 message format 分发。

这段代码把 Postgres 协议状态机和 Rust async stream 融合——对上层 Executor trait 提供 BoxStream<Either<QueryResult, Row>>、对下层用协议消息和服务端对话。

16.6 prepared statement cache

executor.rs:120-197get_or_prepare

rust
async fn get_or_prepare(&mut self, sql: &str, parameters: &[PgTypeInfo],
                       persistent: bool, metadata: Option<Arc<PgStatementMetadata>>)
    -> Result<(StatementId, Arc<PgStatementMetadata>), Error>
{
    // 1. 查缓存
    if persistent {
        if let Some(cached) = self.inner.cache_statement.get_mut(sql) {
            return Ok(cached.clone());
        }
    }

    // 2. 没缓存——发 Parse
    let id = if persistent {
        let id = self.inner.next_statement_id;
        self.inner.next_statement_id = id.next();
        id
    } else {
        StatementId::UNNAMED
    };

    self.inner.stream.write_msg(Parse { ... })?;
    self.inner.stream.write_msg(message::Describe::Statement(id))?;
    self.write_sync();
    self.inner.stream.flush().await?;

    // 3. 接收 ParseComplete + ParameterDescription + RowDescription
    self.inner.stream.recv_expect::<ParseComplete>().await?;
    let parameters = recv_desc_params(self).await?;
    let (columns, column_names) = self.handle_row_description(...).await?;

    // 4. 存进缓存(如果 persistent)
    let metadata = Arc::new(PgStatementMetadata { columns, parameters, column_names: Arc::new(column_names) });
    if persistent {
        if let Some(old) = self.inner.cache_statement.insert(sql, (id, metadata.clone())) {
            // 旧的 statement 被踢——发 Close::Statement 释放 server 资源
            self.inner.stream.write_msg(Close::Statement(old.0))?;
            self.write_sync();
        }
    }

    Ok((id, metadata))
}

四步

  1. 查缓存——命中直接用 StatementId。
  2. 发 Parse + Describe——Parse 编译 SQL、Describe 让 server 返回参数和列信息。
  3. 接收描述信息——ParameterDescription(每参数 OID)+ RowDescription(每列信息)。
  4. 存缓存——替换旧的(如果 LRU 驱逐)、发 Close 释放 server statement。

命名规则:sqlx 用 s_0 / s_1 / s_2 递增命名(StatementId::next())。每连接独立——不同连接的 statement ID 空间不共享(也不需要)。

LRU 淘汰时释放 server 资源——避免 server 端累积无用 statement 占内存。这条细节是 sqlx 对"资源管理"的 dedication——不只管客户端、也管 server 端资源。

16.7 pending_ready_for_query_count:流水线跟踪

PgConnectionInner::pending_ready_for_query_countconnection/mod.rs:70)跟踪"已发多少 Sync 还没收到 ReadyForQuery"。

理论上 Postgres 支持 pipelining——可以一次发 N 条 Sync、server 顺序处理、client 按顺序接收 N 条 ReadyForQuery。但 sqlx 0.8 不完全利用这个——每条 query 结束才发下一条。pending_ready_for_query_count 主要是防错

rust
pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
    if !self.inner.stream.write_buffer_mut().is_empty() {
        self.inner.stream.flush().await?;
    }

    while self.inner.pending_ready_for_query_count > 0 {
        let message = self.inner.stream.recv().await?;
        if let BackendMessageFormat::ReadyForQuery = message.format {
            self.handle_ready_for_query(message)?;
        }
    }
    Ok(())
}

wait_until_ready 循环消耗 pending 的 ReadyForQuery——保证新 query 开始前 pipeline 清空。这是保守的做法——保证消息状态机干净、但牺牲了部分 pipelining 性能。

pipelining 的潜在收益

没 pipelining: send(Q1) → recv(R1) → send(Q2) → recv(R2) ——2 次 RTT
有 pipelining: send(Q1) → send(Q2) → recv(R1) → recv(R2) ——1 次 RTT

RTT 10ms 跨机房下,pipelining 能把两条独立 query 的时间从 20ms 降到 10ms——减半。sqlx 未来可能加 pipeline() 方法显式支持——但 0.8 还没有。

16.8 PgStream:缓冲 I/O 层

connection/stream.rsPgStream 是协议和 Tokio I/O 的桥梁。结构(简化):

rust
pub struct PgStream {
    pub(crate) inner: MaybeTlsStream<BufStream<Socket>>,
    pub(crate) notifications: Option<UnboundedSender<Notification>>,
    pub(crate) server_version_num: Option<u32>,
    // ...
}

三层嵌套

  1. Sockettokio::net::TcpStreamUnixStream
  2. BufStreamtokio::io::BufStream 提供 buffered read/write。
  3. MaybeTlsStream:根据 ssl_mode 决定是否走 TLS——Plain(BufStream<Socket>)Tls(...)

核心方法

  • write<M: FrontendMessage>(&mut self, msg: M)——把消息编码到 write buffer。
  • flush(&mut self)——把 write buffer 发到 server。
  • recv(&mut self)——从 read buffer 读一条完整消息(解析类型字节 + 长度 + 载荷)。

缓冲的价值:多条 Extended Query 消息(Parse + Bind + Execute + Close + Sync)可以先全部 write、最后一次 flush——减少 TCP syscall。如果每条消息都单独 send,会有 4-5 次 write syscall;合并后一次 syscall 发 100-200 字节。

这层缓冲是性能关键——executor.rs:293self.inner.stream.flush().await? 只在所有消息写完后才调一次、压缩 syscall 数。

16.9 message/ 模块:26 个协议消息

sqlx-postgres/src/message/ 目录有 26 个文件——每个对应一种协议消息:

  • Frontend 消息:Startup、Password、SASL、Parse、Bind、Execute、Describe、Close、Sync、Query、Flush、Terminate、Copy(3 种)。
  • Backend 消息:Authentication、BackendKeyData、ParseComplete、BindComplete、ParameterDescription、RowDescription、DataRow、CommandComplete、EmptyQueryResponse、PortalSuspended、ErrorResponse、NoticeResponse、ParameterStatus、NotificationResponse、ReadyForQuery、CloseComplete、Copy(3 种)。

每个文件实现 encode(frontend)或 decode(backend)。模式类似

rust
// message/parse.rs
pub struct Parse<'a> {
    pub statement: StatementId,
    pub query: &'a str,
    pub param_types: &'a [Oid],
}

impl Encode for Parse<'_> {
    fn encode_with(&self, buf: &mut Vec<u8>, _context: Self::Context) {
        buf.put_message_with_len('P', |buf| {
            buf.put_statement_id(self.statement);
            buf.put_str_nul(self.query);
            buf.put_length_prefixed_slice(self.param_types, |buf, ty| {
                buf.put_i32_be(ty.0 as i32);
            });
        });
    }
}

put_message_with_len 是个 helper——写类型字节、留 4 字节长度占位、让 closure 写载荷、回填长度。封装了 -先留坑再回填- 模式(第 6 章 §6.4.1)。

rust
// message/data_row.rs (backend)
impl BackendMessage for DataRow {
    const FORMAT: BackendMessageFormat = BackendMessageFormat::DataRow;

    fn decode_body(buf: Bytes) -> Result<Self, Error> {
        // 解析列数 + 按列读长度 + 字节
    }
}

Backend 消息走 decode_body——buf 是 server 发来的载荷字节(不含类型和长度字段)。

这 26 个文件加起来约 2000-3000 行——协议编解码的全部实现。每个文件 50-200 行——清晰、分离、可测试。

16.10 ErrorResponse 的处理

协议中 任何时候 server 都可能发 ErrorResponse——查询语法错、认证失败、连接被 kill、数据库挂掉。sqlx 在 stream.recv() 里统一处理:

rust
// stream.rs::recv 简化
pub async fn recv(&mut self) -> Result<ReceivedMessage, Error> {
    loop {
        let message = self.recv_raw().await?;
        match message.format {
            BackendMessageFormat::ErrorResponse => {
                let error: PgDatabaseError = message.decode()?;
                return Err(Error::Database(Box::new(error)));
            }
            BackendMessageFormat::NoticeResponse => {
                // Notice 只是警告——记日志不返错
                log::warn!("postgres notice: {}", notice);
                continue;
            }
            BackendMessageFormat::NotificationResponse => {
                // LISTEN/NOTIFY 通知——路由到 PgListener
                if let Some(sender) = &self.notifications {
                    let _ = sender.send(notification);
                }
                continue;
            }
            _ => return Ok(message),  // 正常消息、返回给调用方
        }
    }
}

三种非正常消息

  • ErrorResponse——转 Error::Database、上抛。
  • NoticeResponse——server 级警告(WARNING: column 'x' is deprecated)、记日志不打断。
  • NotificationResponse——LISTEN/NOTIFY 通知、发送到 channel 给 PgListener。

这条 recv 里的消息过滤循环让上层代码不用处理 notice / notification——它们被透明消化。上层只收正常消息和错误——业务逻辑清爽。

16.11 Postgres 协议之外的特性

除了 Extended Query 协议栈,sqlx-postgres 还实现了几个协议扩展

  • PgListenerlistener.rs,约 500 行)——LISTEN / NOTIFY 订阅。
  • PgAdvisoryLockadvisory_lock.rs,约 200 行)——Postgres 的 app-level 锁。
  • COPY FROM / COPY TOcopy.rs,约 500 行)——批量数据导入导出的高效协议。
  • NOTIFY 路由——多路 async task 订阅同一 channel 不同 event。

这些都是 Postgres 独有的高级特性——sqlx 把它们纳入驱动、让 Rust 生态拥有 Postgres 全部能力,不只是普通 SELECT/INSERT。

16.11.1 PgListener 简介

rust
let mut listener = PgListener::connect(&pool).await?;
listener.listen("channel_name").await?;

loop {
    match listener.recv().await {
        Ok(notification) => {
            println!("Got: {}", notification.payload());
        }
        Err(e) => { /* 重连等 */ }
    }
}

内部结构:PgListener 持有一个专用的 PgConnection、发 LISTEN channel_name SQL、然后循环 recv——recv 直接走 PgStream::recv、路由 NotificationResponse 消息。

这条特性让 Rust 里实现Postgres 作为消息队列成为可能——很多轻量消息场景不需要额外上 Kafka,Postgres 的 NOTIFY 足够。

16.12 本章小结

本章把 sqlx-postgres 驱动的核心拆开:

  1. Postgres wire protocol 两流派(§16.2)—— Simple Query vs Extended Query。sqlx 默认 Extended——类型精确、支持 prepared、二进制更快。
  2. 连接建立九步(§16.3)—— TCP / SSL / Startup / SCRAM 认证 / ParameterStatus / BackendKeyData / ReadyForQuery。握手 8-15 条消息。
  3. Extended Query 五消息(§16.4)—— Parse / Bind / Execute / Close / Sync。每条的格式和作用。
  4. run 方法整合(§16.5)—— Rust async stream + 协议状态机。wait_until_ready / get_or_prepare / write_msg × 4 / flush / try_stream!。
  5. prepared statement cache(§16.6)—— LRU + 淘汰时 Close server-side statement。
  6. pending_ready_for_query_count(§16.7)—— 流水线跟踪、防状态错位。pipelining 潜力未完全利用。
  7. PgStream 缓冲 I/O(§16.8)—— 三层嵌套(Socket / BufStream / MaybeTls),write_msg 到 buffer、一次 flush 减少 syscall。
  8. 26 个 message 文件(§16.9)—— Frontend / Backend 消息各一个文件、清晰分离。put_message_with_len helper 封装"留坑回填"模式。
  9. ErrorResponse 透明处理(§16.10)—— recv 循环里统一过滤 notice / notification、业务只收正常消息。
  10. 协议扩展(§16.11)—— LISTEN/NOTIFY、advisory lock、COPY——sqlx 把 Postgres 全能力纳入 Rust 生态。

下一章我们看 MySQL 驱动——同样的 sqlx-core trait 家族、完全不同的协议实现。

16.13 Postgres 协议和 Tokio I/O 的融合点

sqlx-postgres 把 Postgres 协议翻译成 Rust async 代码的三个关键融合点值得单独讲:

融合点 1:AsyncReadExt::read_exact 读消息头

Postgres 每条消息都是"1 字节类型 + 4 字节长度 + N 字节载荷"。recv_raw 逻辑:

rust
async fn recv_raw(&mut self) -> Result<RawMessage, Error> {
    let mut header = [0u8; 5];
    self.inner.read_exact(&mut header).await?;  // 读 5 字节头
    let format = BackendMessageFormat::try_from_u8(header[0])?;
    let len = u32::from_be_bytes(header[1..5].try_into().unwrap()) as usize - 4;

    let mut body = BytesMut::with_capacity(len);
    body.resize(len, 0);
    self.inner.read_exact(&mut body).await?;  // 读 N 字节载荷

    Ok(RawMessage { format, body: body.freeze() })
}

两次 read_exact——Tokio 保证每次读够指定字节数(否则 yield 让出 async)。这条模式是协议消息解析的标准做法——固定头部再变长体。

融合点 2:Pin<&mut Future> 配合 try_stream!

run 方法返回 impl Stream ——内部用 async-stream::try_stream! 宏。每次 recv 是一个 await 点、r#yield!(value) 产生 stream 的下一项。

这让Postgres 的消息流变成 Rust 的 Stream——上层 Executor 能用 try_next / try_collect / for_each 等 combinators 消费。

融合点 3:BufStream 的智能刷新

PgStream::flush 只在 write buffer 非空时调 syscall——如果之前 write_msg 的字节还没发、flush 一次发完;如果已经全发完(上次 flush 后没 write),什么都不做。避免无意义的空 syscall。

这三个融合点把字节流 I/O消息协议Rust async stream三个抽象层串起来——每层各自简单、组合后能力强大。

16.14 Postgres 驱动的性能特征

sqlx-postgres 的性能特征几条重要观察:

1. prepared statement 比 simple query 稍快(约 5-10%)——binary format 紧凑 + 省 Parse。 2. cache hit 比 cache miss 快一倍——省 1 RTT(Parse + Describe)。 3. pipelining 未完全利用——pipelining 有 15-30% 潜力但 sqlx 0.8 没实现。 4. 大结果集 fetch 是 O(N) 线性——每条 DataRow 解码 + yield 的 async task yield 让步——吞吐受限于这条同步协议路径。 5. COPY 比 INSERT 快 10-100 倍——批量导入场景 COPY FROM STDIN 协议高效。

这些特征让 sqlx-postgres 在大多数场景性能优秀——普通 web 业务用它零烦恼。真要压极致性能(每秒万级 query、低延迟)才需要考虑更底层(tokio-postgres + 自己的 pipeline 管理)。

16.15 和 tokio-postgres 的对比

sqlx-postgres 和 tokio-postgres(rust-postgres 生态)都实现 Postgres 协议——但定位不同:

维度sqlx-postgrestokio-postgres
目标用户业务开发者基础设施 / 协议级工具开发者
API 风格高抽象(Pool, Query, FromRow)低抽象(client.query(...)
参数绑定.bind(x).bind(y) 链式 + Encode trait&[&dyn ToSql] 切片
行解码FromRow 派生 + try_getrow.get<T>(idx) panic on error
Pipelining未完全(wait_until_ready 保守)支持(pipeline API)
编译期校验query! 宏
LISTEN / NOTIFYPgListener 高级封装手动实现
COPY 支持
代码量19841 行~8000 行
编译时间较长(宏系统 + derive)较短

选择规则

  • 业务后端:用 sqlx-postgres——抽象 + 类型安全值得。
  • 工具 / 代理:用 tokio-postgres——更贴近协议、更好控制。
  • 极致性能:用 tokio-postgres + 自己的 pipeline 实现。

两者不是"谁更好"——是"针对不同用户"。sqlx-postgres 的 2 倍代码量换来了对业务开发者的更多抽象——这个交换对大多数 Rust 项目合算。

16.16 sqlx-postgres 的可扩展性

sqlx-postgres 有几个扩展点让用户或上层库能定制:

1. 自定义类型的 Encode/Decode/Type——第 5 章讲过、让用户加任意 Rust 类型到 Postgres 映射。 2. PgConnectOptions::options(...)——传自定义 server options(-c statement_timeout=5000)。 3. after_connect 回调(第 13 章 §13.11)——每条新连接初始化 session 参数。 4. PgListener::from_connection(...)——接管已有连接做 LISTEN——和自己的连接池集成。 5. PgQueryResultrows_affected——标准字段;对 INSERT RETURNING 场景 sqlx 把 fetch_one 返回 row、rows_affected 不可靠——这是协议限制。

生产 sqlx-postgres 用户扩展最常用的是 1 和 3——加新类型(UUID / chrono / 业务类型)+ session 初始化。其他扩展点用得少但有时关键。

16.17 本章的核心要义

回看整章、提炼三条核心要义:

1. Extended Query 协议是 Postgres 的现代标准——所有业务代码走 Extended、只有少数场景(migration / raw_sql)走 Simple。理解这条协议就理解 sqlx-postgres 99% 的行为。

2. prepared statement cache 是性能关键——第一次 query 慢(Parse)、后续快(直接 Bind/Execute)。persistent(true) 默认开启正是为此。

3. Rust async + Tokio I/O 是协议实现的好载体——字节流读写、缓冲、消息状态机、stream 返回——每层 Rust 生态都有合适工具。sqlx-postgres 的 20000 行代码看起来多、实际每一千行都在做具体可定位的事。

读完本章你对 "Postgres server 后面到底怎么运作" 有了完整的心智模型——下次看到"为什么这条 query 这么慢"的问题时能精确分解到协议级原因(Parse 开销 / 数据量 / RTT / pipelining / 等等)。这是从用户到工程师的关键跃迁。

16.18 延伸阅读

如果本章激发了你对 Postgres 协议的兴趣,推荐几个方向:

  • 官方协议文档postgresql.org/docs/current/protocol.html —— 300 页详细说明、是 ground truth。
  • tokio-postgres 源码——对比 sqlx 的另一种实现风格。
  • Postgres server 侧——backend/libpq/ 子目录是 server 侧协议实现(C 代码)——看到 server 端怎么响应对客户端更有感觉。
  • psql 二进制——开源的 Postgres CLI 工具、自己实现 client 协议——作为另一个参考。

这些资源加起来能让你对 Postgres 协议的理解远超 Rust 领域——直接进入"懂 Postgres 内部"的工程师行列。

16.19 sqlx-postgres 的代码结构导览

sqlx-postgres crate 的目录结构(约 108 个文件、19841 行):

sqlx-postgres/src/
├── lib.rs                    顶层 re-export
├── arguments.rs              PgArguments(第 6 章)
├── column.rs                 PgColumn
├── connection/
│   ├── mod.rs                PgConnection struct
│   ├── establish.rs          握手流程
│   ├── executor.rs           run 方法 + Executor impl
│   ├── stream.rs             PgStream
│   ├── sasl.rs               SCRAM-SHA-256
│   ├── tls.rs                TLS 升级
│   └── describe.rs           Describe 查询
├── copy.rs                   COPY FROM/TO
├── database.rs               impl Database for Postgres
├── error.rs                  PgDatabaseError
├── io/                       底层 I/O helper
├── listener.rs               PgListener
├── message/                  26 个协议消息文件
├── options/                  PgConnectOptions(约 1000 行)
├── query_result.rs           PgQueryResult
├── row.rs                    PgRow
├── statement.rs              PgStatement
├── transaction.rs            PgTransactionManager
├── type_checking.rs          编译期类型映射
├── type_info.rs              PgTypeInfo
├── types/                    内置类型的 Encode/Decode/Type
└── value.rs                  PgValue / PgValueRef

文件组织的几个观察

  1. 每个公共类型一个文件——PgConnection / PgStream / PgListener 等各自独立——方便 IDE 导航和 review。
  2. connection/ 子目录放协议握手相关——stream / sasl / tls / establish / executor——这些是 Postgres 驱动最复杂部分。
  3. message/ 独立——26 个消息文件清晰分离——符合协议级抽象的清晰度。
  4. **types/**独立子目录——每种类型(int / str / bool / chrono / uuid / json)一个文件——加新类型不影响其他代码。
  5. options/ 约 1000 行——PgConnectOptions 的 40+ 配置方法占了相当篇幅——但分成多个 sub-file 可读。

读 sqlx-postgres 源码时按这个结构导航——先看 connection/ 了解握手、再看 message/ 了解协议、最后 executor.rs 看整合。20000 行代码逐个文件读可能要 8-10 小时、但有明确的阅读路径。

16.20 协议变化与驱动维护

Postgres 协议从 1997 年的 v3.0 到现在几乎没有 breaking change——这是 sqlx-postgres 能稳定 6-7 年的基石。具体变化:

  • v3.0(1999):当前版本。sqlx 和几乎所有现代 client 用。
  • 后续增量改进
    • SCRAM-SHA-256 (Postgres 10,2017)——sqlx 默认支持。
    • Lastval 对 Async API(未来 Postgres 18?)——sqlx 未支持。

协议稳定意味着驱动维护成本低——sqlx 的协议代码几乎不用随 Postgres 新版本变、只是加新类型(pgvector / tsvector 等)。这和某些"协议持续演进"的技术(比如 gRPC)相反。

实际 sqlx-postgres 的 commit 历史里,协议相关改动类型支持的比例约 1:9——大部分工作是加 chrono / bigdecimal / uuid 等的适配、而不是改协议。这让 sqlx-postgres 的长期维护相对轻松——新手贡献者加一个类型适配(比如 PostGIS::Geometry 支持)容易上手、不涉及协议核心。

16.21 Postgres 驱动对其他 DB 的启示

第 17 章 MySQL 和第 18 章 SQLite 的驱动结构深度参考 sqlx-postgres——因为它是最早也最成熟的。观察 sqlx-postgres 你会看到其他驱动的骨架

  • connection/ 子目录的组织——MySQL / SQLite 也是 mod.rs + establish.rs + executor.rs + ...
  • message/(或等价)目录——MySQL 有 protocol/ 目录放消息、SQLite 有 statement/ 目录(SQLite 不是网络协议、statement 是等价抽象)。
  • types/ 目录——三家完全一致——每类型一文件。
  • options/ 目录——三家都有 ConnectOptions 的丰富配置。

这条一致性让 sqlx 贡献者学会一个驱动、能改所有驱动——这是 sqlx-core 的 trait 家族驱动层实现标准化的价值兑现。

读懂 sqlx-postgres 你对 sqlx-mysql / sqlx-sqlite 的理解就能迁移 70%——剩下 30% 是协议特殊性(MySQL 的 COM_STMT_EXECUTE / SQLite 的 worker 线程)。第 17 章 / 第 18 章会讲那 30%。

16.22 实战:Postgres 特有特性的 sqlx 使用

sqlx-postgres 支持 Postgres 独有的几个特性——这些在其他 DB 上没有、值得专门用 sqlx-postgres 享受:

1. JSONB 查询 + 索引

rust
#[derive(sqlx::FromRow)]
struct Event {
    id: i32,
    #[sqlx(json)]
    payload: serde_json::Value,
}

// 用 JSONB 操作符查询
let events: Vec<Event> = sqlx::query_as(
    "SELECT id, payload FROM events WHERE payload @> $1"
).bind(serde_json::json!({"user_id": 42})).fetch_all(&pool).await?;

Postgres 的 JSONB 操作符(@>?->->>)让 JSON 字段的索引查询变高效——sqlx 的 #[sqlx(json)] 完美配合。

2. ARRAY 类型 + UNNEST

rust
let ids: Vec<i32> = vec![1, 2, 3, 4];

// 批量查询
let users: Vec<User> = sqlx::query_as(
    "SELECT * FROM users WHERE id = ANY($1)"
).bind(&ids).fetch_all(&pool).await?;

// 批量插入用 UNNEST(第 10 章 §10.10)
sqlx::query("INSERT INTO users (name, age) SELECT * FROM UNNEST($1::text[], $2::int4[])")
    .bind(&names).bind(&ages).execute(&pool).await?;

3. RETURNING + 批量

rust
let new_ids: Vec<i32> = sqlx::query_scalar(
    "INSERT INTO logs (level, message) SELECT * FROM UNNEST($1::text[], $2::text[]) RETURNING id"
).bind(&levels).bind(&messages).fetch_all(&pool).await?;

一次 INSERT + UNNEST 批插入 N 行、用 RETURNING 拿回新 ID——比 "N 次单 INSERT" 快 10-100 倍、比 INSERT ... VALUES 批量快 3-5 倍。

4. 自定义类型 / Enum

rust
#[derive(sqlx::Type)]
#[sqlx(type_name = "user_role", rename_all = "lowercase")]
enum UserRole { Admin, User, Guest }

Postgres CREATE TYPE user_role AS ENUM ('admin', 'user', 'guest') + Rust enum——类型安全贯穿到 DB schema。

5. LISTEN/NOTIFY 做消息总线

rust
let mut listener = PgListener::connect(&pool).await?;
listener.listen_all(["channel_a", "channel_b"]).await?;

loop {
    let n = listener.recv().await?;
    match n.channel() {
        "channel_a" => { /* ... */ }
        "channel_b" => { /* ... */ }
        _ => {}
    }
}

Postgres 作为轻量消息队列——不需要 Kafka 的场景够用。

6. Advisory Lock

rust
let lock = sqlx::query("SELECT pg_advisory_lock($1)").bind(1234).execute(&pool).await?;
// ... 独占临界区 ...
sqlx::query("SELECT pg_advisory_unlock($1)").bind(1234).execute(&pool).await?;

App 级分布式锁——跨 Rust 进程的互斥。sqlx 的 PgAdvisoryLock 封装了这个。

这六条 Postgres 独特性是**"选 Postgres 而不是 MySQL"**的核心理由——也是 sqlx-postgres 花大力气支持的特性。用 sqlx-postgres 就能享受 Rust 类型安全 + Postgres 功能强——两全其美。

16.23 源码精读:wait_until_ready 的精妙

值得单独精读的一段代码是 PgConnection::wait_until_readyconnection/mod.rs 约 82-100 行):

rust
pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
    if !self.inner.stream.write_buffer_mut().is_empty() {
        self.inner.stream.flush().await?;
    }

    while self.inner.pending_ready_for_query_count > 0 {
        let message = self.inner.stream.recv().await?;
        if let BackendMessageFormat::ReadyForQuery = message.format {
            self.handle_ready_for_query(message)?;
        }
    }
    Ok(())
}

三条关键决策

1. 先 flush 再 recv——保证之前 write 的消息送出去才能收到响应。否则可能死锁(server 等消息、client 等响应)。

2. 只消耗 ReadyForQuery——循环里只处理 Z 消息,其他消息被 recv 内部过滤(NoticeResponse / NotificationResponse 等)或返回但被 while 循环忽略。

3. handle_ready_for_query 减 count + 更新 transaction_status

rust
fn handle_ready_for_query(&mut self, message: ReceivedMessage) -> Result<(), Error> {
    self.inner.pending_ready_for_query_count -= 1;
    self.inner.transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
    Ok(())
}

这条方法是连接状态机的核心——每个 Z 消息触发一次、同步 client 和 server 的事务状态视图。

wait_until_ready 是每次 query 的入口——保证"开始新 query 前清理残留状态"。这条保守做法让 sqlx 的连接状态永远一致——不会出现"server 还在处理上条 query、client 就发下一条"的错误。

如果 sqlx 实现 pipelining、这条方法要重写——允许有 pending ReadyForQuery 的情况下发新 query。0.8 没做——保守、但正确。

16.24 一条协议级排查案例

真实生产问题:某 handler 偶发"unexpected message type"错误

排查路径

Step 1:查错误上下文——发生在 stream.recv() 返回时。

Step 2:抓网络包(tcpdump + wireshark)——发现 client 发了 Bind + Execute 但 server 响应 ErrorResponse(SQL 有语法错)。

Step 3:检查代码——query("SELECT ... WHERE id = ?")——用了 ? 占位符(MySQL 语法)而不是 $1(Postgres)。SQL 语法错、server 回 ErrorResponse、后续 Bind 发现 statement 无效。

Step 4:修复——改用 $1

这条 bug 的关键本质是协议层的——如果 sqlx 有更智能的语法校验(把 ? 翻译成 $N),这条问题捕获会更早。但 sqlx 不做这层翻译——SQL 就是字面量传给 server。

类似的协议层排查在生产中不常见(大多数 bug 在业务层)——但遇到时你要能用 tcpdump + 协议知识追踪——本章提供的协议理解就是工具。

16.25 Postgres 协议的未来

Postgres 协议一直在缓慢演进——几个 ongoing 讨论:

  • Async notification 加强——改善 LISTEN 的时效性。
  • Pipelining 标准化——目前 psql 实验性支持、未来可能普及。
  • Binary protocol 增强——更紧凑的编码(尤其是数组和复合类型)。
  • v4 协议讨论——长期计划、可能 2030+。

sqlx-postgres 的维护者会跟进新协议——但节奏不快(Postgres 协议稳定、新特性渐进)。读者如果想持续关注、推荐 watch launchbadge/sqlx GitHub 的 CHANGELOG 和 discussion。

但对大多数 Rust 后端用户——当前的 sqlx-postgres 足够稳定 5-10 年。你今天投入理解它的时间不会过时。这是选择 sqlx 作为生产栈的一条理由——技术栈稳定

16.26 Postgres 驱动代码品质的七条观察

读完整个 sqlx-postgres 的源码,有七条代码品质观察可以总结:

1. 注释密度适中。协议相关代码(message / 握手)注释较多、引用协议文档链接;普通业务代码较少注释。这让"要理解的地方有解释、不需要的地方不冗余"。

2. 错误类型层次清晰sqlx::Error::Database(Box<PgDatabaseError>)——顶层 Error enum + DB-specific detail。用户能按需匹配具体错误(比如 e.is_unique_violation())也能当 generic 错误处理。

3. 内部 API 用 pub(crate)——不对外公开、但 driver 内各文件可用。严格区分内部细节和公开接口。

4. unsafe 极少。整个 sqlx-postgres 几乎零 unsafe——所有复杂度都在 safe Rust 里完成。复杂的 async 状态机靠类型系统 + RAII 保证、不依赖 unsafe hack。

5. 合理使用 #[cfg(feature = "...")]——chrono / uuid / time 等类型的支持通过 feature gate 可选加入。用户不用的 feature 不拉依赖。

6. 错误消息对生产友好err_protocol!("PgConnection::run(): too many arguments for query: {}", args.len())——错误里带具体数字、方便排查。不是"something went wrong"这种无用错误。

7. 单元测试覆盖关键 helper。SCRAM 计算、message encode / decode、类型映射——都有 unit test 验证。集成测试用真 Postgres 数据库。

这七条品质观察让 sqlx-postgres 维护 7 年保持稳定——代码质量是长期维护的基石。你自己写 Rust 库时这七条都值得遵守。

16.27 Postgres 驱动和业务代码的接口

本章末尾的一个哲学观察——sqlx-postgres 和业务代码的接口极简

  • 业务代码只看到 PgPool / PgConnection / PgRow / PgTypeInfo——几个顶层类型。
  • 内部的 19841 行代码全部隐藏——业务开发者不需要知道 Parse / Bind / Execute 消息、不需要知道 pending_ready_for_query_count、不需要知道 SCRAM 挑战响应。

这种**"复杂实现 + 简洁接口"**就是好的抽象的标志——用户享受功能、不被细节淹没。sqlx-postgres 做到了这一点——即使内部结构复杂、对外使用非常轻松。

从 API 设计角度看、这条原则值得你在自己项目里应用——接口的简洁程度和实现的复杂度成反比才是好的抽象。简陋实现给简陋接口、复杂实现也给复杂接口——都不如 sqlx-postgres 这种"内部繁复、外部清爽"。

第 17 章 MySQL 驱动的篇幅会短很多——因为你现在已经有了 sqlx-postgres 的参照系、重点放在差异上足以讲清楚 MySQL。这是阶梯式深入的学习方法——先读透一个复杂的、后面类似的就快了。

16.28 BackendMessageFormat:21 个变体撑起 Postgres 的所有响应

Postgres 服务器可以发 21 种不同的后端消息—— sqlx 用一个 enum 承载全部(sqlx-postgres/src/message/mod.rs:90-114):

rust
#[repr(u8)]
pub enum BackendMessageFormat {
    Authentication, BackendKeyData, BindComplete, CloseComplete,
    CommandComplete, CopyData, CopyDone, CopyInResponse, CopyOutResponse,
    DataRow, EmptyQueryResponse, ErrorResponse, NoData, NoticeResponse,
    NotificationResponse, ParameterDescription, ParameterStatus,
    ParseComplete, PortalSuspended, ReadyForQuery, RowDescription,
}

tag 字节与 enum 的映射:145-165)—— tag 字节是 ASCII 字符:'R' → Authentication、'K' → BackendKeyData、'1' → ParseComplete、'2' → BindComplete、'D' → DataRow、'Z' → ReadyForQuery、'E' → ErrorResponse……

Postgres 协议选 ASCII 可见字符做 tag 的好处——wireshark 抓包直接读得懂——D 看就知道是 DataRow——不需要查表——诊断友好

类型安全的 decode:122-143):

rust
pub fn decode<T: BackendMessage>(self) -> Result<T, Error> {
    if T::FORMAT != self.format {
        return Err(err_protocol!(
            "Postgres protocol error: expected {:?}, got {:?}",
            T::FORMAT, self.format));
    }
    T::decode_body(self.contents).map_err(...)
}

T::FORMAT 是每个消息类型的关联常量—— 编译期确定"我这个 struct 对应哪个 tag"—— decode 时 runtime 核对 tag 和类型是否匹配—— 不匹配直接 protocol error 而不是 panic—— 错误路径优雅

这个 FORMAT 常量 + decode_body 方法 的 pattern 把 Rust 类型系统和 wire 协议串起来——每个 Rust struct 声明"我是 ParseComplete 消息"、FORMAT 常量固定、decode 逻辑独立—— 21 个消息 21 个 struct + 21 个 FORMAT 常量—— 数据和行为对齐到单一源

16.29 startup 握手的 SSL 协商

Postgres 连接建立时可选 SSL/TLS 加密——sqlx 的 establish.rs 有一个特殊的pre-startup 阶段

  1. 客户端发 SslRequestsqlx-postgres/src/message/ssl_request.rs)—— 一个 8 字节特殊包(长度 + magic number 80877103)。
  2. 服务器回单字节—— 'S'(支持 SSL)或 'N'(不支持)。
  3. 客户端按回答决定是否 upgrade 到 TLS。
  4. 之后才进入正常的 StartupMessage 发送。

为什么 SSL 协商走这种怪异单字节协议

历史遗留—— Postgres 协议比 TLS 还早—— 没有预留 TLS negotiation 字段—— 只能hijack 一个 magic 数字—— 在 startup 前插入一个"你支持 TLS 吗"的 query—— 用和后续 Startup 不同的 magic 区分。

sqlx-postgres 在代码里小心处理这个 pre-startup 阶段—— 如果服务器回 'S' 就用 rustls/native-tls 包装 socket、再发 StartupMessage—— 整个过程对上层(query API)完全透明

这个设计 sqlx-mysql 没有—— MySQL 的 SSL 协商做在 Handshake V10 的 capabilities 位图里—— 同一条连接、同一个 packet 协商—— 更干净。两种设计反映了两家 DB 的协议演进历史

16.30 Parse 消息的源码结构:body_size_hint 的意义

Parse 消息的 Rust 实现(sqlx-postgres/src/message/parse.rs:8-21):

rust
pub struct Parse<'a> {
    pub statement: StatementId,
    pub query: &'a str,
    pub param_types: &'a [Oid],
}

三个字段——statement name、SQL string、参数类型 OID 数组——对应 Postgres 协议 Parse 消息的三段 body。

FrontendMessage trait 实现:22-62)—— 除了 const FORMAT: FrontendMessageFormat 外、有一个关键方法 body_size_hint(&self) -> Saturating<usize>

rust
fn body_size_hint(&self) -> Saturating<usize> {
    let mut size = Saturating(0);
    size += self.statement.name_len();
    size += self.query.len();
    size += 1; // NUL terminator
    size += 2; // param_types_len
    size += self.param_types.len().saturating_mul(4);
    size
}

为什么这么重要——Postgres 协议每个 message 以 tag(1) + length(4) + body 开头—— length 必须是body 字节数——sqlx 用 body_size_hint 预先计算 body 大小预分配 buf、再一次 write—— 避免"先 write body 再 rewind 回头填 length"的麻烦操作。

Saturating<usize> 是标准库的饱和整数—— 避免溢出 panic—— 无参数的 SQL 5KB 的 query 也不会冒泡 bug—— 防御性编程

参数类型 OID 的 u16 限制:46-51):

rust
let param_types_len = u16::try_from(self.param_types.len()).map_err(|_| {
    err_protocol!("param_types.len() too large for binary protocol: ...")
})?;

Postgres 协议限制单 query 最多 65535 个参数—— sqlx 在 encode 前校验、超限直接 protocol error——这条显式限制 是生产代码的老练之处:不要等服务器回 ErrorResponse(错误信息可能含糊)、自己先查清楚并给出明确错误。

这个 body_size_hint + encode_body 两段式 encode 是 sqlx-postgres 所有 FrontendMessage 的统一模式——Parse / Bind / Execute / Sync / Close / Describe 都遵循—— 21 类消息 21 份类似实现—— 代码一致性让维护者从单个消息的代码 pattern 推断其他消息。

16.31 Execute 消息的 limit 字段:默认为 0 的深意

源码sqlx-postgres/src/message/execute.rs:8-15):

rust
pub struct Execute {
    pub portal: PortalId,
    pub limit: u32,
}

两字段—— portal name + 整数 limit。但 limit 字段的文档(:11-13值得注意

Maximum number of rows to return, if portal contains a query that returns rows (ignored otherwise). Zero denotes "no limit".

Postgres 协议允许 Execute 带行数限制—— 客户端可以执行一个 portal、只拿前 N 行—— N 行后portal 暂停、下一次 Execute(same portal) 继续—— 类似数据库 server-side cursor。

但 sqlx 几乎永远用 limit = 0sqlx-postgres/src/connection/executor.rs:252-256 附近有注释)—— 为什么?

Non-zero limits cause query plan pessimization by disabling parallel workers

关键决策—— Postgres planner 看到 Execute 带非零 limit 会禁用并行执行(以为客户端只要少量行、并行反而开销大)—— 结果整个 query 变慢—— 得不偿失。

sqlx 的默认策略—— 发 limit = 0(无限制)、让 server 并行 plan、自己在客户端逐行消费—— 并行优化收益 > server-side cursor 收益

这种**"协议提供的功能不一定用"** 的判断是 sqlx 作为生产级库的成熟之处—— 协议 feature 是机会、但怎么用要看 实测权衡limit = 0 这个默认值背后是一次性能测试 + 一次文档注释—— 生产代码里不起眼、但决定你的 query p99。

下一章 MySQL 驱动:和 Postgres 协议结构类似但细节全部不同。

基于 VitePress 构建