Django + Celery 進階:排程任務、重試與監控 | Django 教學
在上一篇入門教學中,我們學會了 Celery 的基礎架構與任務定義。本篇將深入探討生產環境中不可或缺的進階功能:透過 Celery Beat 與 django-celery-beat 實現 排程任務(Scheduled Tasks)、設計健壯的 任務重試策略(Retry Strategy) 搭配 指數退避(Exponential Backoff)、使用 Chain、Group、Chord 組合複雜的 任務工作流(Task Workflow)、配置 任務優先級與多佇列(Multi-Queue)、透過 Flower 監控工具即時掌握任務狀態,以及 冪等性(Idempotency) 設計原則與生產環境最佳實踐。
Celery Beat 定時任務
Celery Beat 是 Celery 內建的排程器(Scheduler),負責在指定的時間自動觸發任務,功能類似 Linux 的 crontab,但與 Django 深度整合。
靜態排程:CELERY_BEAT_SCHEDULE
最簡單的方式是在 settings.py 中定義排程:
# settings.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
# 每天凌晨 2:00 清理過期 Session
'cleanup-expired-sessions': {
'task': 'apps.users.tasks.cleanup_expired_sessions',
'schedule': crontab(hour=2, minute=0),
},
# 每 5 分鐘同步外部資料
'sync-external-data': {
'task': 'apps.data.tasks.sync_data',
'schedule': 300.0, # 秒數
'args': ('source_a',), # 任務參數
},
# 每週一早上 8:00 發送週報
'weekly-report': {
'task': 'apps.reports.tasks.send_weekly_report',
'schedule': crontab(hour=8, minute=0, day_of_week=1),
},
# 每月 1 號凌晨 3:00 生成月報
'monthly-report': {
'task': 'apps.reports.tasks.generate_monthly_report',
'schedule': crontab(hour=3, minute=0, day_of_month=1),
},
}
crontab() 支援與 Linux crontab 相同的時間表達式:
| 參數 | 說明 | 範例 |
|---|---|---|
minute | 分鐘(0-59) | crontab(minute=0) 每小時整點 |
hour | 小時(0-23) | crontab(hour=2, minute=0) 每天凌晨 2 點 |
day_of_week | 星期幾(0=週一, 6=週日) | crontab(day_of_week=1) 每週一 |
day_of_month | 日期(1-31) | crontab(day_of_month=1) 每月 1 號 |
month_of_year | 月份(1-12) | crontab(month_of_year='1,7') 1 月和 7 月 |
動態排程:django-celery-beat
靜態排程的缺點是修改排程需要改程式碼並重啟服務。django-celery-beat 將排程資料存入資料庫,讓你透過 Django Admin 動態管理排程,不需要重新部署。
# 安裝
pip install django-celery-beat
# settings.py
INSTALLED_APPS = [
# ... 其他 App
'django_celery_beat',
]
# 指定 Beat 使用資料庫排程器
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 建立資料表
python manage.py migrate django_celery_beat
設定完成後,在 Django Admin 中你會看到 Periodic Tasks 區塊,可以直接在後台新增、修改、啟用或停用排程任務,完全不需要修改程式碼。
啟動 Celery Beat
Celery Beat 需要獨立啟動一個程序:
# 啟動 Beat 排程器
celery -A myproject beat --loglevel=info
# 同時啟動 Worker 和 Beat(開發環境適用)
celery -A myproject worker --beat --loglevel=info
重要提醒:Celery Beat 只能啟動一個實例。如果同時運行多個 Beat 程序,排程任務會被重複觸發。在多台機器部署時,務必確保只有一台機器啟動 Beat。
任務重試策略
在生產環境中,任務失敗是不可避免的:網路瞬斷、第三方 API 暫時不可用、資料庫連線逾時等。設計良好的重試策略是確保任務最終成功執行的關鍵。
自動重試:autoretry_for
autoretry_for 是最方便的宣告式重試方式,指定要重試的例外類別即可:
from celery import shared_task
import requests
@shared_task(
autoretry_for=(requests.RequestException, ConnectionError),
retry_backoff=True, # 啟用指數退避
retry_backoff_max=600, # 最大退避時間 600 秒
retry_jitter=True, # 加入隨機抖動,避免多個任務同時重試
max_retries=5, # 最多重試 5 次
)
def call_external_api(url: str, payload: dict) -> dict:
"""呼叫外部 API,失敗時自動重試"""
response = requests.post(url, json=payload, timeout=30)
response.raise_for_status()
return response.json()
啟用 retry_backoff=True 後,重試的等待時間會按指數增長:
| 重試次數 | 等待時間(約) |
|---|---|
| 第 1 次 | 1 秒 |
| 第 2 次 | 2 秒 |
| 第 3 次 | 4 秒 |
| 第 4 次 | 8 秒 |
| 第 5 次 | 16 秒 |
加上 retry_jitter=True 後,每次等待時間會加入隨機偏移,避免多個失敗任務在同一時刻同時重試(又稱為 Thundering Herd(雷群問題))。
手動重試:self.retry()
當你需要根據不同的例外類型做出不同的處理決策時,使用手動重試:
@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id: int):
"""處理付款,根據錯誤類型決定是否重試"""
from apps.orders.models import Order
try:
order = Order.objects.get(pk=order_id)
result = payment_gateway.charge(order.amount, order.payment_token)
order.status = 'paid'
order.save()
return {'status': 'paid', 'order_id': order_id}
except PaymentGatewayTimeoutError as exc:
# 網路逾時:重試,使用指數退避
raise self.retry(
exc=exc,
countdown=2 ** self.request.retries, # 手動實作指數退避
)
except InsufficientFundsError:
# 餘額不足:不重試,直接標記失敗
order.status = 'payment_failed'
order.save()
return {'status': 'failed', 'reason': 'insufficient_funds'}
except InvalidPaymentTokenError:
# Token 無效:不重試,直接標記失敗
order.status = 'payment_failed'
order.save()
return {'status': 'failed', 'reason': 'invalid_token'}
retry_kwargs 進階設定
@shared_task(
bind=True,
autoretry_for=(Exception,),
max_retries=5,
retry_kwargs={'max_retries': 5}, # 也可以在這裡指定
retry_backoff=True,
)
def resilient_task(self, data: dict):
"""具備完整重試策略的任務"""
pass
任務工作流:Chain、Group、Chord
Celery 提供了 Canvas(畫布)API,讓你將多個任務組合成複雜的工作流。
Chain(任務鏈)
Chain 將多個任務串行執行,前一個任務的回傳值會自動傳遞給下一個任務作為第一個參數:
from celery import chain
# 定義任務
@shared_task
def fetch_data(source_url: str) -> dict:
"""步驟一:從外部來源取得資料"""
response = requests.get(source_url)
return response.json()
@shared_task
def process_data(raw_data: dict) -> dict:
"""步驟二:處理原始資料(接收上一步的回傳值)"""
processed = transform(raw_data)
return processed
@shared_task
def save_result(processed_data: dict) -> dict:
"""步驟三:儲存處理結果"""
Report.objects.create(data=processed_data)
return {'status': 'saved'}
# 串行執行:fetch → process → save
result = chain(
fetch_data.s('https://api.example.com/data'),
process_data.s(), # 自動接收 fetch_data 的回傳值
save_result.s(), # 自動接收 process_data 的回傳值
).delay()
也可以使用管道運算子 | 簡寫:
result = (fetch_data.s('https://api.example.com/data') | process_data.s() | save_result.s()).delay()
Group(任務群組)
Group 將多個任務並行執行,各自獨立、互不依賴:
from celery import group
# 並行發送 Email 給多位使用者
result = group(
send_email.s(user_id) for user_id in [1, 2, 3, 4, 5]
).delay()
# 等待所有任務完成,取得所有結果
all_results = result.get(timeout=60) # [result1, result2, ...]
Chord(和弦)
Chord 是 Group 的延伸:先並行執行一組任務,全部完成後,將所有結果傳給一個 callback(回呼函式):
from celery import chord
@shared_task
def process_item(item_id: int) -> dict:
"""處理單個項目"""
item = Item.objects.get(pk=item_id)
return {'id': item_id, 'result': item.compute()}
@shared_task
def aggregate_results(results: list) -> dict:
"""彙整所有結果"""
total = sum(r['result'] for r in results)
Summary.objects.create(total=total, count=len(results))
return {'total': total, 'count': len(results)}
# 並行處理所有項目,全部完成後彙整結果
result = chord(
group(process_item.s(item_id) for item_id in item_ids),
aggregate_results.s() # callback:接收所有結果的 list
).delay()
三種工作流的比較:
| 工作流 | 執行方式 | 適用場景 |
|---|---|---|
| Chain | 串行,前一個的結果傳給下一個 | ETL 流程、多步驟處理 |
| Group | 並行,各自獨立 | 批次發送通知、並行處理 |
| Chord | 並行 + callback | 分散計算後彙整結果 |
任務優先級與多佇列
在生產環境中,不同類型的任務有不同的優先級。Email 發送可能需要即時處理,但報表生成可以排在後面慢慢來。透過 多佇列(Multi-Queue) 機制,可以將不同優先級的任務分配到不同的佇列,由不同的 Worker 處理。
定義任務路由
# settings.py
CELERY_TASK_ROUTES = {
'apps.emails.tasks.*': {'queue': 'high_priority'},
'apps.reports.tasks.*': {'queue': 'low_priority'},
'apps.data.tasks.*': {'queue': 'default'},
}
# Redis 支援優先佇列(0-9,數字越小優先級越高)
CELERY_TASK_QUEUE_MAX_PRIORITY = 10
CELERY_TASK_DEFAULT_PRIORITY = 5
啟動專屬 Worker
# 高優先級佇列:分配較多 Worker
celery -A myproject worker -Q high_priority --concurrency=8 --loglevel=info
# 預設佇列
celery -A myproject worker -Q default --concurrency=4 --loglevel=info
# 低優先級佇列:分配較少 Worker
celery -A myproject worker -Q low_priority --concurrency=2 --loglevel=info
你也可以在呼叫任務時動態指定佇列:
# 將這個特定任務送到高優先級佇列
send_urgent_email.apply_async(
args=[user_id],
queue='high_priority',
)
Flower 監控工具
Flower 是 Celery 官方推薦的 Web 監控工具,提供即時的任務與 Worker 狀態監控介面。
安裝與啟動
# 安裝
pip install flower
# 啟動 Flower Web UI(預設 http://localhost:5555)
celery -A myproject flower --port=5555
# 設定帳號密碼保護(生產環境必須設定)
celery -A myproject flower \
--port=5555 \
--basic_auth=admin:your_secure_password
Flower 提供的功能
Flower 的 Web 介面提供以下監控能力:
- Workers 狀態:查看每個 Worker 的上線狀態、CPU 使用率、記憶體用量、已處理任務數
- 任務歷史:瀏覽所有已執行、執行中、失敗的任務列表,包含詳細的參數與錯誤訊息
- 即時監控:即時更新的任務執行圖表,包含成功率、失敗率、執行時間分布
- 佇列狀態:查看各佇列中等待執行的任務數量
- 任務操作:直接在介面上撤銷或重新執行任務
生產環境建議
在生產環境中,建議將 Flower 作為 systemd 服務運行,並透過 Nginx 反向代理加上 HTTPS:
# /etc/systemd/system/celery-flower.service
[Unit]
Description=Celery Flower
After=network.target
[Service]
Type=simple
User=deploy
WorkingDirectory=/path/to/project
ExecStart=/path/to/venv/bin/celery -A myproject flower \
--port=5555 \
--basic_auth=admin:password
Restart=always
[Install]
WantedBy=multi-user.target
冪等性設計
冪等性(Idempotency) 是指同一個操作執行一次和執行多次的結果完全相同。在 Celery 中,這個概念尤其重要,因為 Celery 採用 at-least-once(至少一次)保證機制,同一個任務可能因為網路問題、Worker 崩潰或 Broker 重新投遞等原因而執行多次。
冪等任務的設計模式
from django.db import transaction
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task(bind=True)
def process_order(self, order_id: int):
"""冪等任務設計:使用資料庫狀態防止重複處理"""
from apps.orders.models import Order
with transaction.atomic():
# select_for_update 取得資料庫行鎖,防止並發競爭
order = Order.objects.select_for_update().get(pk=order_id)
# 冪等性檢查:若已處理則直接返回
if order.status != 'pending':
logger.info(f'Order {order_id} 已處理(狀態:{order.status}),跳過')
return {'skipped': True, 'order_id': order_id}
# 執行業務邏輯
order.process()
order.status = 'processed'
order.save()
return {'processed': True, 'order_id': order_id}
冪等性設計原則
- 傳入唯一識別碼:傳入訂單 ID、使用者 ID 等唯一識別碼,而非操作指令(如「扣款 100 元」)
- 狀態檢查:在執行前檢查目標物件的狀態,避免重複操作
- 資料庫鎖:使用
select_for_update()防止並發競爭 - 原子操作:使用
transaction.atomic()確保操作的原子性 - 冪等的外部呼叫:呼叫第三方 API 時,使用其提供的冪等 key(如 Stripe 的 Idempotency Key)
@shared_task(bind=True)
def charge_payment(self, order_id: int):
"""使用冪等 key 呼叫付款 API"""
from apps.orders.models import Order
order = Order.objects.get(pk=order_id)
# 使用 order_id 作為冪等 key
# 即使此任務執行多次,付款 API 也只會收一次款
result = stripe.PaymentIntent.create(
amount=order.amount,
currency='twd',
idempotency_key=f'order_{order_id}', # 冪等 key
)
order.payment_intent_id = result.id
order.status = 'paid'
order.save()
return {'status': 'paid', 'order_id': order_id}
任務超時與記憶體保護
在生產環境中,必須為任務設定超時限制,避免單一任務卡住整個 Worker:
from billiard.exceptions import SoftTimeLimitExceeded
@shared_task(
soft_time_limit=300, # 300 秒後拋出 SoftTimeLimitExceeded(可捕捉)
time_limit=600, # 600 秒後強制 SIGKILL(無法捕捉)
)
def generate_large_report(report_id: int):
"""生成大型報表,設定超時保護"""
from apps.reports.models import Report
try:
report = Report.objects.get(pk=report_id)
# 耗時的報表生成邏輯
report.generate()
report.status = 'completed'
report.save()
except SoftTimeLimitExceeded:
# 優雅地處理超時:標記報表為失敗
Report.objects.filter(pk=report_id).update(status='timeout')
raise # 重新拋出,讓 Celery 記錄失敗
記憶體保護:透過 --max-tasks-per-child 參數,讓 Worker 在執行一定數量的任務後自動重啟,防止記憶體洩漏:
celery -A myproject worker \
--concurrency=4 \
--max-tasks-per-child=1000 \ # 每個子程序執行 1000 個任務後重啟
--prefetch-multiplier=1 \ # 每次只預取 1 個任務
-Ofair \ # 公平排程
--loglevel=info
生產環境最佳實踐
Worker 調優
# CPU 密集型任務:concurrency = CPU 核心數
celery -A myproject worker --concurrency=4 --loglevel=info
# I/O 密集型任務:使用 gevent,可提高併發數
celery -A myproject worker --concurrency=100 -P gevent --loglevel=info
| 場景 | Pool 模式 | concurrency 建議 |
|---|---|---|
| CPU 密集型(圖片處理) | prefork(預設) | CPU 核心數 |
| I/O 密集型(API 呼叫、Email) | gevent / eventlet | 50-200 |
| 開發除錯 | solo | 1 |
systemd 服務設定
在生產環境中,Worker 和 Beat 應作為系統服務運行:
# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker
After=network.target redis.service
[Service]
Type=forking
User=deploy
WorkingDirectory=/path/to/project
ExecStart=/path/to/venv/bin/celery multi start worker \
-A myproject \
--concurrency=4 \
--max-tasks-per-child=1000 \
--loglevel=info \
--logfile=/var/log/celery/worker.log \
--pidfile=/var/run/celery/worker.pid
ExecStop=/path/to/venv/bin/celery multi stopwait worker \
--pidfile=/var/run/celery/worker.pid
Restart=always
[Install]
WantedBy=multi-user.target
# 啟用並啟動服務
sudo systemctl enable celery-worker
sudo systemctl start celery-worker
測試 Celery Tasks
# 方式一:CELERY_TASK_ALWAYS_EAGER(同步執行,不需啟動 Worker)
# settings_test.py
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True # 讓例外正常拋出
# 方式二:直接呼叫任務函式(不通過 Celery 基礎設施)
# tests/test_tasks.py
from unittest.mock import patch
import pytest
@pytest.mark.django_db
def test_send_welcome_email(user):
"""直接呼叫任務函式進行測試"""
with patch('django.core.mail.send_mail') as mock_mail:
send_welcome_email(user.id)
mock_mail.assert_called_once()
# 方式三:測試任務是否被正確觸發
def test_registration_triggers_email(client, db):
with patch('apps.users.tasks.send_welcome_email.delay') as mock_task:
client.post('/api/register/', {'username': 'test', 'password': 'secure123'})
mock_task.assert_called_once()
日誌與錯誤追蹤
# settings.py — 設定 Celery 日誌
CELERY_WORKER_HIJACK_ROOT_LOGGER = False # 不覆蓋 Django 的 logging 設定
LOGGING = {
'version': 1,
'handlers': {
'celery': {
'class': 'logging.FileHandler',
'filename': '/var/log/celery/tasks.log',
'formatter': 'verbose',
},
},
'loggers': {
'celery': {
'handlers': ['celery'],
'level': 'INFO',
},
},
}
搭配 Sentry 等錯誤追蹤服務,可以即時收到任務失敗的通知:
# settings.py
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration
sentry_sdk.init(
dsn='your-sentry-dsn',
integrations=[CeleryIntegration()],
)
總結
本文深入探討了 Django + Celery 的進階功能,涵蓋生產環境中必須掌握的核心主題:
- Celery Beat 排程任務:
CELERY_BEAT_SCHEDULE定義靜態排程,django-celery-beat支援透過 Django Admin 動態管理排程 - 任務重試策略:
autoretry_for搭配retry_backoff實現自動指數退避重試,self.retry()提供手動重試的精細控制 - 任務工作流:
chain串行執行、group並行執行、chord並行後彙整,三種 Canvas 原語組合複雜的任務邏輯 - 多佇列與優先級:
CELERY_TASK_ROUTES定義任務路由,不同佇列分配不同數量的 Worker - Flower 監控:即時監控 Worker 狀態、任務歷史、佇列長度,生產環境必備
- 冪等性設計:使用狀態檢查、資料庫鎖、原子操作確保任務重複執行不會產生副作用
- 生產環境最佳實踐:Worker 調優、systemd 服務管理、超時與記憶體保護、測試策略、日誌與錯誤追蹤
掌握這些進階主題後,你已經具備建構生產級非同步任務系統的能力。在實際應用中,建議從簡單的任務定義開始,逐步加入重試機制、監控工具和多佇列架構,避免一開始就過度設計。記住:可靠的任務系統比快速的任務系統更重要。