Django + Celery 進階:排程任務、重試與監控 | Django 教學

2026/06/23 2026/05/22
Django + Celery 進階:排程任務、重試與監控 | Django 教學

在上一篇入門教學中,我們學會了 Celery 的基礎架構與任務定義。本篇將深入探討生產環境中不可或缺的進階功能:透過 Celery Beatdjango-celery-beat 實現 排程任務(Scheduled Tasks)、設計健壯的 任務重試策略(Retry Strategy) 搭配 指數退避(Exponential Backoff)、使用 ChainGroupChord 組合複雜的 任務工作流(Task Workflow)、配置 任務優先級與多佇列(Multi-Queue)、透過 Flower 監控工具即時掌握任務狀態,以及 冪等性(Idempotency) 設計原則與生產環境最佳實踐。

Celery Beat 定時任務

Celery Beat 是 Celery 內建的排程器(Scheduler),負責在指定的時間自動觸發任務,功能類似 Linux 的 crontab,但與 Django 深度整合。

靜態排程:CELERY_BEAT_SCHEDULE

最簡單的方式是在 settings.py 中定義排程:

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # 每天凌晨 2:00 清理過期 Session
    'cleanup-expired-sessions': {
        'task': 'apps.users.tasks.cleanup_expired_sessions',
        'schedule': crontab(hour=2, minute=0),
    },
    # 每 5 分鐘同步外部資料
    'sync-external-data': {
        'task': 'apps.data.tasks.sync_data',
        'schedule': 300.0,  # 秒數
        'args': ('source_a',),  # 任務參數
    },
    # 每週一早上 8:00 發送週報
    'weekly-report': {
        'task': 'apps.reports.tasks.send_weekly_report',
        'schedule': crontab(hour=8, minute=0, day_of_week=1),
    },
    # 每月 1 號凌晨 3:00 生成月報
    'monthly-report': {
        'task': 'apps.reports.tasks.generate_monthly_report',
        'schedule': crontab(hour=3, minute=0, day_of_month=1),
    },
}

crontab() 支援與 Linux crontab 相同的時間表達式:

參數說明範例
minute分鐘(0-59)crontab(minute=0) 每小時整點
hour小時(0-23)crontab(hour=2, minute=0) 每天凌晨 2 點
day_of_week星期幾(0=週一, 6=週日)crontab(day_of_week=1) 每週一
day_of_month日期(1-31)crontab(day_of_month=1) 每月 1 號
month_of_year月份(1-12)crontab(month_of_year='1,7') 1 月和 7 月

動態排程:django-celery-beat

靜態排程的缺點是修改排程需要改程式碼並重啟服務。django-celery-beat 將排程資料存入資料庫,讓你透過 Django Admin 動態管理排程,不需要重新部署。

# 安裝
pip install django-celery-beat
# settings.py
INSTALLED_APPS = [
    # ... 其他 App
    'django_celery_beat',
]

# 指定 Beat 使用資料庫排程器
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 建立資料表
python manage.py migrate django_celery_beat

設定完成後,在 Django Admin 中你會看到 Periodic Tasks 區塊,可以直接在後台新增、修改、啟用或停用排程任務,完全不需要修改程式碼。

啟動 Celery Beat

Celery Beat 需要獨立啟動一個程序:

# 啟動 Beat 排程器
celery -A myproject beat --loglevel=info

# 同時啟動 Worker 和 Beat(開發環境適用)
celery -A myproject worker --beat --loglevel=info

重要提醒:Celery Beat 只能啟動一個實例。如果同時運行多個 Beat 程序,排程任務會被重複觸發。在多台機器部署時,務必確保只有一台機器啟動 Beat。


任務重試策略

在生產環境中,任務失敗是不可避免的:網路瞬斷、第三方 API 暫時不可用、資料庫連線逾時等。設計良好的重試策略是確保任務最終成功執行的關鍵。

自動重試:autoretry_for

autoretry_for 是最方便的宣告式重試方式,指定要重試的例外類別即可:

from celery import shared_task
import requests

@shared_task(
    autoretry_for=(requests.RequestException, ConnectionError),
    retry_backoff=True,       # 啟用指數退避
    retry_backoff_max=600,    # 最大退避時間 600 秒
    retry_jitter=True,        # 加入隨機抖動,避免多個任務同時重試
    max_retries=5,            # 最多重試 5 次
)
def call_external_api(url: str, payload: dict) -> dict:
    """呼叫外部 API,失敗時自動重試"""
    response = requests.post(url, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()

啟用 retry_backoff=True 後,重試的等待時間會按指數增長:

重試次數等待時間(約)
第 1 次1 秒
第 2 次2 秒
第 3 次4 秒
第 4 次8 秒
第 5 次16 秒

加上 retry_jitter=True 後,每次等待時間會加入隨機偏移,避免多個失敗任務在同一時刻同時重試(又稱為 Thundering Herd(雷群問題))。

手動重試:self.retry()

當你需要根據不同的例外類型做出不同的處理決策時,使用手動重試:

@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id: int):
    """處理付款,根據錯誤類型決定是否重試"""
    from apps.orders.models import Order

    try:
        order = Order.objects.get(pk=order_id)
        result = payment_gateway.charge(order.amount, order.payment_token)
        order.status = 'paid'
        order.save()
        return {'status': 'paid', 'order_id': order_id}

    except PaymentGatewayTimeoutError as exc:
        # 網路逾時:重試,使用指數退避
        raise self.retry(
            exc=exc,
            countdown=2 ** self.request.retries,  # 手動實作指數退避
        )

    except InsufficientFundsError:
        # 餘額不足:不重試,直接標記失敗
        order.status = 'payment_failed'
        order.save()
        return {'status': 'failed', 'reason': 'insufficient_funds'}

    except InvalidPaymentTokenError:
        # Token 無效:不重試,直接標記失敗
        order.status = 'payment_failed'
        order.save()
        return {'status': 'failed', 'reason': 'invalid_token'}

retry_kwargs 進階設定

@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    max_retries=5,
    retry_kwargs={'max_retries': 5},  # 也可以在這裡指定
    retry_backoff=True,
)
def resilient_task(self, data: dict):
    """具備完整重試策略的任務"""
    pass

任務工作流:Chain、Group、Chord

Celery 提供了 Canvas(畫布)API,讓你將多個任務組合成複雜的工作流。

Chain(任務鏈)

Chain 將多個任務串行執行,前一個任務的回傳值會自動傳遞給下一個任務作為第一個參數:

from celery import chain

# 定義任務
@shared_task
def fetch_data(source_url: str) -> dict:
    """步驟一:從外部來源取得資料"""
    response = requests.get(source_url)
    return response.json()

@shared_task
def process_data(raw_data: dict) -> dict:
    """步驟二:處理原始資料(接收上一步的回傳值)"""
    processed = transform(raw_data)
    return processed

@shared_task
def save_result(processed_data: dict) -> dict:
    """步驟三:儲存處理結果"""
    Report.objects.create(data=processed_data)
    return {'status': 'saved'}

# 串行執行:fetch → process → save
result = chain(
    fetch_data.s('https://api.example.com/data'),
    process_data.s(),  # 自動接收 fetch_data 的回傳值
    save_result.s(),   # 自動接收 process_data 的回傳值
).delay()

也可以使用管道運算子 | 簡寫:

result = (fetch_data.s('https://api.example.com/data') | process_data.s() | save_result.s()).delay()

Group(任務群組)

Group 將多個任務並行執行,各自獨立、互不依賴:

from celery import group

# 並行發送 Email 給多位使用者
result = group(
    send_email.s(user_id) for user_id in [1, 2, 3, 4, 5]
).delay()

# 等待所有任務完成,取得所有結果
all_results = result.get(timeout=60)  # [result1, result2, ...]

Chord(和弦)

Chord 是 Group 的延伸:先並行執行一組任務,全部完成後,將所有結果傳給一個 callback(回呼函式):

from celery import chord

@shared_task
def process_item(item_id: int) -> dict:
    """處理單個項目"""
    item = Item.objects.get(pk=item_id)
    return {'id': item_id, 'result': item.compute()}

@shared_task
def aggregate_results(results: list) -> dict:
    """彙整所有結果"""
    total = sum(r['result'] for r in results)
    Summary.objects.create(total=total, count=len(results))
    return {'total': total, 'count': len(results)}

# 並行處理所有項目,全部完成後彙整結果
result = chord(
    group(process_item.s(item_id) for item_id in item_ids),
    aggregate_results.s()  # callback:接收所有結果的 list
).delay()

三種工作流的比較:

工作流執行方式適用場景
Chain串行,前一個的結果傳給下一個ETL 流程、多步驟處理
Group並行,各自獨立批次發送通知、並行處理
Chord並行 + callback分散計算後彙整結果

任務優先級與多佇列

在生產環境中,不同類型的任務有不同的優先級。Email 發送可能需要即時處理,但報表生成可以排在後面慢慢來。透過 多佇列(Multi-Queue) 機制,可以將不同優先級的任務分配到不同的佇列,由不同的 Worker 處理。

定義任務路由

# settings.py
CELERY_TASK_ROUTES = {
    'apps.emails.tasks.*': {'queue': 'high_priority'},
    'apps.reports.tasks.*': {'queue': 'low_priority'},
    'apps.data.tasks.*': {'queue': 'default'},
}

# Redis 支援優先佇列(0-9,數字越小優先級越高)
CELERY_TASK_QUEUE_MAX_PRIORITY = 10
CELERY_TASK_DEFAULT_PRIORITY = 5

啟動專屬 Worker

# 高優先級佇列:分配較多 Worker
celery -A myproject worker -Q high_priority --concurrency=8 --loglevel=info

# 預設佇列
celery -A myproject worker -Q default --concurrency=4 --loglevel=info

# 低優先級佇列:分配較少 Worker
celery -A myproject worker -Q low_priority --concurrency=2 --loglevel=info

你也可以在呼叫任務時動態指定佇列:

# 將這個特定任務送到高優先級佇列
send_urgent_email.apply_async(
    args=[user_id],
    queue='high_priority',
)

Flower 監控工具

Flower 是 Celery 官方推薦的 Web 監控工具,提供即時的任務與 Worker 狀態監控介面。

安裝與啟動

# 安裝
pip install flower

# 啟動 Flower Web UI(預設 http://localhost:5555)
celery -A myproject flower --port=5555

# 設定帳號密碼保護(生產環境必須設定)
celery -A myproject flower \
    --port=5555 \
    --basic_auth=admin:your_secure_password

Flower 提供的功能

Flower 的 Web 介面提供以下監控能力:

  • Workers 狀態:查看每個 Worker 的上線狀態、CPU 使用率、記憶體用量、已處理任務數
  • 任務歷史:瀏覽所有已執行、執行中、失敗的任務列表,包含詳細的參數與錯誤訊息
  • 即時監控:即時更新的任務執行圖表,包含成功率、失敗率、執行時間分布
  • 佇列狀態:查看各佇列中等待執行的任務數量
  • 任務操作:直接在介面上撤銷或重新執行任務

生產環境建議

在生產環境中,建議將 Flower 作為 systemd 服務運行,並透過 Nginx 反向代理加上 HTTPS:

# /etc/systemd/system/celery-flower.service
[Unit]
Description=Celery Flower
After=network.target

[Service]
Type=simple
User=deploy
WorkingDirectory=/path/to/project
ExecStart=/path/to/venv/bin/celery -A myproject flower \
    --port=5555 \
    --basic_auth=admin:password
Restart=always

[Install]
WantedBy=multi-user.target

冪等性設計

冪等性(Idempotency) 是指同一個操作執行一次和執行多次的結果完全相同。在 Celery 中,這個概念尤其重要,因為 Celery 採用 at-least-once(至少一次)保證機制,同一個任務可能因為網路問題、Worker 崩潰或 Broker 重新投遞等原因而執行多次。

冪等任務的設計模式

from django.db import transaction
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task(bind=True)
def process_order(self, order_id: int):
    """冪等任務設計:使用資料庫狀態防止重複處理"""
    from apps.orders.models import Order

    with transaction.atomic():
        # select_for_update 取得資料庫行鎖,防止並發競爭
        order = Order.objects.select_for_update().get(pk=order_id)

        # 冪等性檢查:若已處理則直接返回
        if order.status != 'pending':
            logger.info(f'Order {order_id} 已處理(狀態:{order.status}),跳過')
            return {'skipped': True, 'order_id': order_id}

        # 執行業務邏輯
        order.process()
        order.status = 'processed'
        order.save()

    return {'processed': True, 'order_id': order_id}

冪等性設計原則

  1. 傳入唯一識別碼:傳入訂單 ID、使用者 ID 等唯一識別碼,而非操作指令(如「扣款 100 元」)
  2. 狀態檢查:在執行前檢查目標物件的狀態,避免重複操作
  3. 資料庫鎖:使用 select_for_update() 防止並發競爭
  4. 原子操作:使用 transaction.atomic() 確保操作的原子性
  5. 冪等的外部呼叫:呼叫第三方 API 時,使用其提供的冪等 key(如 Stripe 的 Idempotency Key)
@shared_task(bind=True)
def charge_payment(self, order_id: int):
    """使用冪等 key 呼叫付款 API"""
    from apps.orders.models import Order

    order = Order.objects.get(pk=order_id)

    # 使用 order_id 作為冪等 key
    # 即使此任務執行多次,付款 API 也只會收一次款
    result = stripe.PaymentIntent.create(
        amount=order.amount,
        currency='twd',
        idempotency_key=f'order_{order_id}',  # 冪等 key
    )

    order.payment_intent_id = result.id
    order.status = 'paid'
    order.save()
    return {'status': 'paid', 'order_id': order_id}

任務超時與記憶體保護

在生產環境中,必須為任務設定超時限制,避免單一任務卡住整個 Worker:

from billiard.exceptions import SoftTimeLimitExceeded

@shared_task(
    soft_time_limit=300,  # 300 秒後拋出 SoftTimeLimitExceeded(可捕捉)
    time_limit=600,       # 600 秒後強制 SIGKILL(無法捕捉)
)
def generate_large_report(report_id: int):
    """生成大型報表,設定超時保護"""
    from apps.reports.models import Report

    try:
        report = Report.objects.get(pk=report_id)
        # 耗時的報表生成邏輯
        report.generate()
        report.status = 'completed'
        report.save()
    except SoftTimeLimitExceeded:
        # 優雅地處理超時:標記報表為失敗
        Report.objects.filter(pk=report_id).update(status='timeout')
        raise  # 重新拋出,讓 Celery 記錄失敗

記憶體保護:透過 --max-tasks-per-child 參數,讓 Worker 在執行一定數量的任務後自動重啟,防止記憶體洩漏:

celery -A myproject worker \
    --concurrency=4 \
    --max-tasks-per-child=1000 \  # 每個子程序執行 1000 個任務後重啟
    --prefetch-multiplier=1 \     # 每次只預取 1 個任務
    -Ofair \                      # 公平排程
    --loglevel=info

生產環境最佳實踐

Worker 調優

# CPU 密集型任務:concurrency = CPU 核心數
celery -A myproject worker --concurrency=4 --loglevel=info

# I/O 密集型任務:使用 gevent,可提高併發數
celery -A myproject worker --concurrency=100 -P gevent --loglevel=info
場景Pool 模式concurrency 建議
CPU 密集型(圖片處理)prefork(預設)CPU 核心數
I/O 密集型(API 呼叫、Email)gevent / eventlet50-200
開發除錯solo1

systemd 服務設定

在生產環境中,Worker 和 Beat 應作為系統服務運行:

# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker
After=network.target redis.service

[Service]
Type=forking
User=deploy
WorkingDirectory=/path/to/project
ExecStart=/path/to/venv/bin/celery multi start worker \
    -A myproject \
    --concurrency=4 \
    --max-tasks-per-child=1000 \
    --loglevel=info \
    --logfile=/var/log/celery/worker.log \
    --pidfile=/var/run/celery/worker.pid
ExecStop=/path/to/venv/bin/celery multi stopwait worker \
    --pidfile=/var/run/celery/worker.pid
Restart=always

[Install]
WantedBy=multi-user.target
# 啟用並啟動服務
sudo systemctl enable celery-worker
sudo systemctl start celery-worker

測試 Celery Tasks

# 方式一:CELERY_TASK_ALWAYS_EAGER(同步執行,不需啟動 Worker)
# settings_test.py
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True  # 讓例外正常拋出

# 方式二:直接呼叫任務函式(不通過 Celery 基礎設施)
# tests/test_tasks.py
from unittest.mock import patch
import pytest

@pytest.mark.django_db
def test_send_welcome_email(user):
    """直接呼叫任務函式進行測試"""
    with patch('django.core.mail.send_mail') as mock_mail:
        send_welcome_email(user.id)
        mock_mail.assert_called_once()

# 方式三:測試任務是否被正確觸發
def test_registration_triggers_email(client, db):
    with patch('apps.users.tasks.send_welcome_email.delay') as mock_task:
        client.post('/api/register/', {'username': 'test', 'password': 'secure123'})
        mock_task.assert_called_once()

日誌與錯誤追蹤

# settings.py — 設定 Celery 日誌
CELERY_WORKER_HIJACK_ROOT_LOGGER = False  # 不覆蓋 Django 的 logging 設定

LOGGING = {
    'version': 1,
    'handlers': {
        'celery': {
            'class': 'logging.FileHandler',
            'filename': '/var/log/celery/tasks.log',
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'celery': {
            'handlers': ['celery'],
            'level': 'INFO',
        },
    },
}

搭配 Sentry 等錯誤追蹤服務,可以即時收到任務失敗的通知:

# settings.py
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn='your-sentry-dsn',
    integrations=[CeleryIntegration()],
)

總結

本文深入探討了 Django + Celery 的進階功能,涵蓋生產環境中必須掌握的核心主題:

  1. Celery Beat 排程任務CELERY_BEAT_SCHEDULE 定義靜態排程,django-celery-beat 支援透過 Django Admin 動態管理排程
  2. 任務重試策略autoretry_for 搭配 retry_backoff 實現自動指數退避重試,self.retry() 提供手動重試的精細控制
  3. 任務工作流chain 串行執行、group 並行執行、chord 並行後彙整,三種 Canvas 原語組合複雜的任務邏輯
  4. 多佇列與優先級CELERY_TASK_ROUTES 定義任務路由,不同佇列分配不同數量的 Worker
  5. Flower 監控:即時監控 Worker 狀態、任務歷史、佇列長度,生產環境必備
  6. 冪等性設計:使用狀態檢查、資料庫鎖、原子操作確保任務重複執行不會產生副作用
  7. 生產環境最佳實踐:Worker 調優、systemd 服務管理、超時與記憶體保護、測試策略、日誌與錯誤追蹤

掌握這些進階主題後,你已經具備建構生產級非同步任務系統的能力。在實際應用中,建議從簡單的任務定義開始,逐步加入重試機制、監控工具和多佇列架構,避免一開始就過度設計。記住:可靠的任務系統比快速的任務系統更重要。

BenZ Software Developer

熱愛技術的軟體開發者,在這裡分享程式開發經驗與學習筆記。