DML 進階操作:Upsert、MERGE、RETURNING 與 COPY 批量匯入 | PostgreSQL
DML(Data Manipulation Language) 是與資料庫讀寫互動的核心語言。PostgreSQL 在標準 SQL 的基礎上提供了豐富的進階功能——INSERT ON CONFLICT(Upsert) 解決衝突處理、MERGE 實現多條件分支、RETURNING 避免額外查詢、可寫 CTE 串接複雜資料流、COPY 提供最快的批量匯入,以及 Lateral Join 解決 Top-N per group 等經典問題。
DML 與 MVCC 的互動
PostgreSQL 的 DML 操作與 MVCC 緊密結合。每個操作都會影響 Tuple 的版本資訊:
| 操作 | xmin | xmax | 說明 |
|---|---|---|---|
| INSERT | 設為當前 XID | 0 | 新建 Tuple |
| UPDATE | 新 Tuple 設為當前 XID | 舊 Tuple 設為當前 XID | 本質是 DELETE + INSERT |
| DELETE | 不變 | 設為當前 XID | 標記刪除,不立即移除 |
-- 觀察 Tuple 版本資訊
SELECT xmin, xmax, ctid, id, name
FROM users
WHERE id = 1;
UPDATE 不是原地修改,而是「插入新版本 + 標記舊版本」。這是 PostgreSQL 需要 VACUUM 清理 Dead Tuple 的根本原因。
HOT Update:若更新的欄位不在任何索引中,且新舊 Tuple 在同一個 Page 上,PostgreSQL 會使用 HOT(Heap-Only Tuple)Update,不新增索引條目,減少索引膨脹。
INSERT ON CONFLICT(Upsert)
ON CONFLICT 是 PG9.5 引入的 Upsert 語義,在 INSERT 遇到唯一約束衝突時,選擇忽略或更新:
DO NOTHING — 衝突時靜默忽略
-- 若 email 已存在,忽略本次插入
INSERT INTO users (email, name)
VALUES ('alice@example.com', 'Alice')
ON CONFLICT (email)
DO NOTHING;
DO UPDATE — 衝突時更新
-- 若 email 已存在,更新 name 與 updated_at
INSERT INTO users (email, name, updated_at)
VALUES ('alice@example.com', 'Alice Updated', NOW())
ON CONFLICT (email)
DO UPDATE SET
name = EXCLUDED.name,
updated_at = EXCLUDED.updated_at
WHERE users.updated_at < EXCLUDED.updated_at; -- 可加條件過濾
EXCLUDED 是一個偽表(pseudo-table),代表「本次試圖插入但被衝突擋下的那筆資料」。
複合唯一索引的 Upsert
-- 建立複合唯一索引
CREATE UNIQUE INDEX ON product_prices (product_id, currency_code);
-- Upsert 需在 ON CONFLICT 中列出所有索引欄位
INSERT INTO product_prices (product_id, currency_code, price)
VALUES (42, 'USD', 99.99)
ON CONFLICT (product_id, currency_code)
DO UPDATE SET price = EXCLUDED.price;
部分唯一索引的 Upsert
-- 建立部分唯一索引
CREATE UNIQUE INDEX users_email_active_idx
ON users (email) WHERE active = true;
-- ON CONFLICT 需指定相同的 WHERE 條件
INSERT INTO users (email, name, active)
VALUES ('dave@example.com', 'Dave', true)
ON CONFLICT (email) WHERE active = true
DO UPDATE SET name = EXCLUDED.name;
MERGE(PG15+)
MERGE 是 SQL:2003 標準指令,根據來源與目標的匹配結果,在單一語句中執行不同的 DML 分支:
MERGE 執行流程:
Source 每一行
│
├─ 在 Target 找到匹配 → WHEN MATCHED
│ ├─ 條件 A 成立 → UPDATE
│ ├─ 條件 B 成立 → DELETE
│ └─ 無條件 → DO NOTHING
│
└─ 在 Target 找不到 → WHEN NOT MATCHED
└─ INSERT
實戰:同步 staging 到 production
MERGE INTO products AS p
USING product_staging AS s
ON (p.sku = s.sku)
-- 已存在且有變動 → 更新
WHEN MATCHED AND (p.price <> s.price OR p.stock <> s.stock) THEN
UPDATE SET
price = s.price,
stock = s.stock,
updated_at = NOW()
-- 已存在且無變動 → 不做事
WHEN MATCHED THEN
DO NOTHING
-- 不存在 → 新增
WHEN NOT MATCHED THEN
INSERT (sku, price, stock, created_at)
VALUES (s.sku, s.price, s.stock, NOW());
PG17:MERGE RETURNING
MERGE INTO products AS p
USING product_staging AS s ON (p.sku = s.sku)
WHEN MATCHED THEN
UPDATE SET price = s.price
WHEN NOT MATCHED THEN
INSERT (sku, price) VALUES (s.sku, s.price)
RETURNING merge_action(), p.sku, p.price;
-- merge_action() 回傳 'INSERT' 或 'UPDATE'
Upsert vs MERGE 比較
| 特性 | INSERT ON CONFLICT | MERGE(PG15+) |
|---|---|---|
| 標準 SQL | 非標準(PG 擴充) | SQL:2003 標準 |
| 條件分支 | 單一衝突處理 | 多分支(MATCHED/NOT MATCHED/DELETE) |
| DELETE 支援 | 不支援 | 支援 |
| RETURNING | 支援 | PG17+ 支援 |
| 效能 | 通常更快 | 更靈活,適合複雜同步 |
| 並行安全 | 內建 conflict detection | 需注意 race condition |
RETURNING 子句
INSERT、UPDATE、DELETE 都支援 RETURNING,直接回傳受影響的 Tuple,避免額外的 SELECT:
-- INSERT RETURNING:取得自動生成的 id
INSERT INTO users (email, name)
VALUES ('charlie@example.com', 'Charlie')
RETURNING id, created_at;
-- UPDATE RETURNING:取得更新後的值
UPDATE products
SET price = price * 1.1
WHERE category = 'electronics'
RETURNING id, name, price AS new_price;
-- DELETE RETURNING:取得被刪除的資料
DELETE FROM sessions
WHERE expires_at < NOW()
RETURNING user_id, token, expires_at;
RETURNING 的內部行為:
- INSERT RETURNING:回傳新寫入的 Tuple
- UPDATE RETURNING:回傳更新後的新版本
- DELETE RETURNING:回傳刪除前的舊版本(Tuple 仍在 heap 中,只是標記 xmax)
可寫 CTE(Writeable CTE)
可寫 CTE 允許在 WITH 區塊中包含 DML,串接複雜的資料流於單一 SQL 語句:
-- 將過期訊息歸檔 + 更新使用者計數,一條 SQL 完成
WITH
-- 步驟 1:刪除過期訊息
expired AS (
DELETE FROM messages
WHERE created_at < NOW() - INTERVAL '90 days'
RETURNING id, user_id, content, created_at
),
-- 步驟 2:歸檔
archived AS (
INSERT INTO message_archive (id, user_id, content, created_at, archived_at)
SELECT id, user_id, content, created_at, NOW()
FROM expired
RETURNING id
),
-- 步驟 3:更新計數
updated_counts AS (
UPDATE user_stats us
SET message_count = message_count - sub.cnt
FROM (
SELECT user_id, COUNT(*) AS cnt
FROM expired
GROUP BY user_id
) sub
WHERE us.user_id = sub.user_id
RETURNING us.user_id, us.message_count
)
-- 回傳統計
SELECT
(SELECT COUNT(*) FROM archived) AS archived_messages,
(SELECT COUNT(*) FROM updated_counts) AS updated_users;
重要語義:所有 CTE 分支看到的是語句執行前的資料快照,彼此之間的修改不互相影響。
條件式更新與紀錄
-- 扣款並記錄交易流水
WITH deducted AS (
UPDATE accounts
SET balance = balance - 500
WHERE user_id = 1 AND balance >= 500
RETURNING user_id, balance AS new_balance
)
INSERT INTO transaction_log (user_id, amount, type, balance_after, created_at)
SELECT user_id, 500, 'debit', new_balance, NOW()
FROM deducted;
-- 若 deducted 為空(餘額不足),INSERT 也不執行
COPY:最快的批量匯入匯出
COPY 是 PostgreSQL 的伺服器端批量傳輸指令,繞過大部分 SQL 解析成本:
-- 從 CSV 匯入(伺服器端路徑)
COPY users (id, email, name, created_at)
FROM '/var/data/users.csv'
WITH (FORMAT csv, HEADER true, DELIMITER ',', NULL '');
-- 匯出至 CSV
COPY (SELECT * FROM users WHERE active = true)
TO '/var/data/active_users.csv'
WITH (FORMAT csv, HEADER true);
\copy 是 psql 的客戶端指令,路徑為本地路徑,不需要伺服器端檔案存取權限:
# psql 中執行
\copy users (id, email, name) FROM '/Users/me/users.csv' CSV HEADER
\copy (SELECT * FROM reports) TO '/Users/me/report.csv' CSV HEADER
批量匯入效能比較
| 方法 | 相對速度 | 適用場景 |
|---|---|---|
| COPY FROM file | 最快 | 伺服器端大量資料 |
\copy(psql) | 快 | 客戶端本地檔案 |
| unnest 批量 | 中快 | 程式端陣列傳入 |
| INSERT…VALUES 多行 | 中 | 程式產生的中等量資料 |
| 逐行 INSERT | 慢 | 不推薦用於批量 |
-- 多行 VALUES 批量插入(建議每批 500-1000 行)
INSERT INTO events (user_id, event_type, payload)
VALUES
(1, 'click', '{"btn": "submit"}'),
(2, 'view', '{"page": "/home"}'),
(3, 'click', '{"btn": "cancel"}');
-- unnest 批量插入(適合程式傳入陣列)
INSERT INTO events (user_id, event_type)
SELECT * FROM unnest(
ARRAY[1, 2, 3]::int[],
ARRAY['click', 'view', 'click']::text[]
) AS t(user_id, event_type);
Lateral Join
LATERAL 允許右側子查詢引用左側表的欄位,本質是「對左側每一行,執行一次相關子查詢」:
Top-N per group(最經典應用)
-- 每個分類取最貴的 3 個商品
SELECT c.name AS category, p.name AS product, p.price
FROM categories c
JOIN LATERAL (
SELECT name, price
FROM products
WHERE category_id = c.id
ORDER BY price DESC
LIMIT 3
) p ON true
ORDER BY c.name, p.price DESC;
搭配 (category_id, price DESC) 複合索引,Lateral Join 可以非常高效地利用索引掃描。
展開 JSONB 陣列
-- 展開 jsonb 陣列的每個元素
SELECT o.id AS order_id,
item->>'name' AS item_name,
(item->>'qty')::int AS qty
FROM orders o,
LATERAL jsonb_array_elements(o.items) AS item;
時間序列對照
-- 每個使用者:最近一次登入 + 登入後的第一個事件
SELECT u.id, u.email, last_login.logged_at, next_event.event_type
FROM users u
JOIN LATERAL (
SELECT logged_at
FROM login_history
WHERE user_id = u.id
ORDER BY logged_at DESC
LIMIT 1
) last_login ON true
LEFT JOIN LATERAL (
SELECT event_type, created_at
FROM user_events
WHERE user_id = u.id
AND created_at > last_login.logged_at
ORDER BY created_at ASC
LIMIT 1
) next_event ON true;
TRUNCATE vs DELETE
| 特性 | TRUNCATE | DELETE(無 WHERE) |
|---|---|---|
| SQL 分類 | DDL | DML |
| Dead Tuple | 不產生 | 產生,需 VACUUM |
| WHERE 條件 | 不支援 | 支援 |
| RETURNING | 不支援 | 支援 |
| Trigger | 不觸發 DELETE Trigger | 觸發 |
| 鎖定層級 | ACCESS EXCLUSIVE | ROW EXCLUSIVE |
| 速度 | O(1) 極快 | O(n) 與資料量成正比 |
| 序列重置 | RESTART IDENTITY 可重置 | 不影響 |
-- TRUNCATE 並重置自增 ID
TRUNCATE TABLE staging_data RESTART IDENTITY;
-- 危險!CASCADE 會自動 TRUNCATE 外鍵連結的子表
TRUNCATE TABLE orders CASCADE;
-- 較安全:明確列出所有目標表
TRUNCATE TABLE order_items, orders;
Bulk Insert 效能調校
大量資料匯入時的最佳化步驟:
-- 1. 停用 autovacuum(避免匯入中途觸發)
ALTER TABLE big_table SET (autovacuum_enabled = false);
-- 2. 刪除非必要索引(匯入後重建更快)
DROP INDEX IF EXISTS big_table_search_idx;
-- 3. 暫時停用觸發器
ALTER TABLE big_table DISABLE TRIGGER ALL;
-- 4. 設定 UNLOGGED(可接受 crash risk 時)
ALTER TABLE big_table SET UNLOGGED;
-- 5. 執行 COPY
COPY big_table FROM '/data/big_data.csv' CSV HEADER;
-- 6. 恢復設定
ALTER TABLE big_table SET LOGGED;
ALTER TABLE big_table ENABLE TRIGGER ALL;
CREATE INDEX big_table_search_idx ON big_table (search_col);
ALTER TABLE big_table SET (autovacuum_enabled = true);
-- 7. 更新統計
ANALYZE big_table;
相關參數調校:
-- 關閉同步提交提升吞吐量(非關鍵寫入可接受)
SET synchronous_commit = off;
-- 批量操作...
RESET synchronous_commit;
-- 提升 maintenance_work_mem 加速索引重建
SET maintenance_work_mem = '1GB';
CREATE INDEX big_table_col_idx ON big_table (col);
RESET maintenance_work_mem;
版本演進
| 版本 | 新增功能 | 說明 |
|---|---|---|
| PG9.5 | INSERT ON CONFLICT | 首次支援原生 Upsert(DO NOTHING / DO UPDATE) |
| PG12 | 可寫 CTE 語義調整 | WITH 中的 DML 更可預期 |
| PG14 | Bulk Insert 效能改善 | COPY 的 WAL 最佳化 |
| PG15 | MERGE | SQL:2003 標準,多分支條件處理 |
| PG16 | COPY progress reporting | pg_stat_progress_copy 改善 |
| PG17 | MERGE RETURNING | merge_action() + COPY ON_ERROR 選項 |
常見陷阱
ON CONFLICT 需要唯一約束
ON CONFLICT 的 conflict_target 必須對應唯一約束或唯一索引。如果引用不存在的約束,會直接報錯。部分索引(Partial Index)的 Upsert 需在 ON CONFLICT 指定相同的 WHERE 條件。
COPY 錯誤行處理
COPY 遇到格式錯誤或約束違反時,整批操作全部回滾。解決方案是先 COPY 到臨時表,再用 INSERT...ON CONFLICT DO NOTHING 過濾:
CREATE TEMP TABLE staging_import (LIKE target_table);
COPY staging_import FROM '/data/input.csv' CSV HEADER;
INSERT INTO target_table
SELECT * FROM staging_import
ON CONFLICT DO NOTHING;
TRUNCATE CASCADE 的隱式風險
TRUNCATE ... CASCADE 會自動清除所有外鍵連結的子表,且不需要明確列出。在生產環境中,可能意外清除多個關聯表。建議明確列出所有目標表。
總結
DML 進階操作 是 PostgreSQL 資料處理的核心能力:
- INSERT ON CONFLICT 提供原生 Upsert,解決 SELECT-then-INSERT 的 race condition
- MERGE(PG15+)實現多條件分支的資料同步,語義比 Upsert 更豐富
- RETURNING 避免額外 SELECT,搭配可寫 CTE 串接複雜資料流
- COPY 是最快的批量匯入方式,搭配 UNLOGGED + 索引重建可達最佳效能
- Lateral Join 解決 Top-N per group 等需要「逐行相關子查詢」的場景
- TRUNCATE 清空速度極快但要注意 CASCADE 風險
下一篇,我們將深入探討 索引類型與設計——PostgreSQL 如何透過 B-Tree、Hash、GiST、GIN、BRIN 等多種索引類型加速查詢。