Django + Celery 非同步任務佇列入門 | Django 教學

2026/06/22 2026/05/21
Django + Celery 非同步任務佇列入門 | Django 教學

Django 應用程式中,當使用者觸發了 Email 發送圖片處理報表生成 等耗時操作時,若在 View 中同步執行,使用者將被迫等待很久才能收到回應。Celery 是 Python 生態系中最成熟的 分散式任務佇列(Distributed Task Queue) 系統,它能將耗時工作交給背景 Worker 非同步執行,讓 Django 立即回應使用者。本文將從 Celery 的架構概念出發,帶你完成 Redis 安裝設定、celery.py 設定檔配置、使用 @shared_task 定義任務,到透過 delay()apply_async() 呼叫任務,最終啟動 Worker 並監控任務結果。

為什麼需要非同步任務?

Django 的 HTTP 請求處理是同步的:使用者發出請求後,伺服器必須在回應前完成所有邏輯。這在大多數情況下沒有問題,但以下場景會讓回應時間變得不可接受:

  • Email 發送:使用者註冊後發送歡迎信、訂單確認通知,SMTP 連線可能需要數秒
  • 圖片處理:壓縮、縮圖、加浮水印,耗時取決於圖片大小
  • 報表生成:大型 CSV / PDF 匯出,需要查詢大量資料並處理格式
  • 第三方 API 呼叫:支付 Webhook、社群媒體發文,外部服務回應時間不可控
  • 資料同步:從外部來源批次匯入或同步資料

如果這些操作在 View 中同步執行,使用者可能需要等待 10 秒、30 秒甚至更久。更糟的是,如果有大量使用者同時觸發耗時操作,所有 Web Server 的工作執行緒都會被佔滿,導致整個網站無法服務新的請求。

解決方案就是 非同步任務(Asynchronous Task):View 只負責「發送任務訊息」,然後立即回應使用者;實際的耗時工作由獨立的 Worker 程序在背景執行。


Celery 架構

Celery 是由 Ask Solem 於 2009 年發布的開源分散式任務佇列系統,採用 BSD-3-Clause 授權。它的架構由四個核心元件組成:

Django App(生產者)
    | .delay() / .apply_async()
    v
Broker(訊息代理)
    Redis 或 RabbitMQ
    | 分發任務
    v
Celery Worker(消費者)
    執行實際任務邏輯
    | 儲存結果(可選)
    v
Result Backend
    Redis / Django ORM / RPC
元件職責常見選擇
Django App產生任務、呼叫 .delay()
Broker(訊息代理)任務訊息的傳輸層,負責接收與分發任務Redis、RabbitMQ
Worker(工作者)從 Broker 取出任務並執行的獨立程序celery -A myproject worker
Result Backend(結果後端)儲存任務執行結果,供後續查詢Redis、Django ORM

Broker 是整個架構的核心,Django App 將任務訊息推送到 Broker,Worker 從 Broker 取出任務執行。本教學使用 Redis 作為 Broker,因為它同時也能擔任 Result Backend,部署最為簡便。


安裝

首先安裝 Celery 及相關套件:

# 安裝 Celery、Redis 客戶端與 Django Celery Results
pip install celery redis django-celery-results

各套件的用途:

套件用途
celeryCelery 主體
redisPython Redis 客戶端,讓 Celery 連線 Redis
django-celery-results將任務結果儲存到 Django ORM(可選)

確保你的環境中已經安裝並啟動了 Redis 服務:

# macOS(使用 Homebrew)
brew install redis
brew services start redis

# Ubuntu / Debian
sudo apt install redis-server
sudo systemctl start redis

# 驗證 Redis 是否正常運作
redis-cli ping
# 預期回應:PONG

settings.py Celery 設定

在 Django 的 settings.py 中加入 Celery 相關設定。Celery 會讀取所有以 CELERY_ 為前綴的設定項:

# settings.py

# Celery Broker 設定(使用 Redis)
CELERY_BROKER_URL = 'redis://localhost:6379/0'

# Result Backend 設定(使用 Redis)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# 或使用 Django ORM 儲存結果:
# CELERY_RESULT_BACKEND = 'django-db'
# 需在 INSTALLED_APPS 加入 'django_celery_results'

# 序列化設定(使用 JSON 格式)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# 時區設定
CELERY_TIMEZONE = 'Asia/Taipei'
CELERY_ENABLE_UTC = True

# 任務結果過期時間(秒)
CELERY_RESULT_EXPIRES = 60 * 60 * 24  # 24 小時後自動清除

# 任務超時設定
CELERY_TASK_SOFT_TIME_LIMIT = 300  # 5 分鐘,拋出 SoftTimeLimitExceeded(可捕捉)
CELERY_TASK_TIME_LIMIT = 600       # 10 分鐘,強制終止(無法捕捉)

如果選擇使用 Django ORM 作為 Result Backend,記得加入 django_celery_resultsINSTALLED_APPS 並執行 migrate:

# settings.py
INSTALLED_APPS = [
    # ... 其他 App
    'django_celery_results',
]
python manage.py migrate django_celery_results

celery.py 設定檔

在 Django 專案的主模組(與 settings.py 同一目錄)中建立 celery.py,這是 Celery App 的設定檔:

# myproject/celery.py
import os
from celery import Celery

# 設定 Django settings 模組,讓 Celery 能讀取 Django 設定
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# 建立 Celery App 實例
app = Celery('myproject')

# 從 settings.py 中讀取所有 CELERY_ 開頭的設定
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自動探索所有 INSTALLED_APPS 中的 tasks.py 檔案
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    """除錯用任務,印出 request 資訊"""
    print(f'Request: {self.request!r}')

關鍵設定說明:

  • os.environ.setdefault():確保 Celery Worker 啟動時能找到 Django 設定
  • config_from_object(..., namespace='CELERY'):讓 Celery 自動讀取 settings.py 中所有 CELERY_ 開頭的設定
  • autodiscover_tasks():自動掃描所有已安裝 App 中的 tasks.py 檔案,註冊其中定義的任務

__init__.py 設定

修改專案主模組的 __init__.py,確保 Celery App 在 Django 啟動時自動載入:

# myproject/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)

這一步非常重要。如果沒有在 __init__.py 中匯入 Celery App,@shared_task 裝飾器將無法正確註冊任務。

此時專案的目錄結構應該如下:

myproject/
    __init__.py      # 匯入 celery_app
    celery.py        # Celery App 設定
    settings.py      # Django 設定(含 CELERY_ 設定)
    urls.py
    wsgi.py
apps/
    users/
        tasks.py     # 定義使用者相關的非同步任務
    reports/
        tasks.py     # 定義報表相關的非同步任務

定義 Task(@shared_task)

Celery 使用 @shared_task 裝飾器定義任務。@shared_task 是推薦的寫法,因為它不需要直接匯入 Celery App 實例,適合在 Django App 中使用,避免循環匯入問題。

在你的 Django App 中建立 tasks.py

# apps/users/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.mail import send_mail

logger = get_task_logger(__name__)


@shared_task
def send_welcome_email(user_id: int) -> dict:
    """發送歡迎 Email(最簡單的任務定義)"""
    from apps.users.models import User  # 延遲匯入,避免循環 import

    user = User.objects.get(pk=user_id)
    send_mail(
        subject='歡迎加入!',
        message=f'親愛的 {user.username},歡迎加入我們的平台。',
        from_email='noreply@example.com',
        recipient_list=[user.email],
    )
    logger.info(f'歡迎信已發送給 user_id={user_id}')
    return {'status': 'success', 'user_id': user_id}

進階任務定義(bind 與重試)

加上 bind=True 參數後,任務函式的第一個參數會是 self(Task 實例),讓你可以存取任務的中繼資料並觸發重試:

# apps/users/tasks.py
@shared_task(
    bind=True,              # self 參數可存取 task 實例
    max_retries=3,          # 最多重試 3 次
    default_retry_delay=60, # 每次重試間隔 60 秒
)
def send_welcome_email_with_retry(self, user_id: int) -> dict:
    """發送歡迎 Email,失敗時自動重試"""
    from apps.users.models import User

    try:
        user = User.objects.get(pk=user_id)
        send_mail(
            subject='歡迎加入!',
            message=f'親愛的 {user.username},歡迎加入我們的平台。',
            from_email='noreply@example.com',
            recipient_list=[user.email],
        )
        logger.info(f'歡迎信已發送給 user_id={user_id}')
        return {'status': 'success', 'user_id': user_id}

    except User.DoesNotExist:
        logger.error(f'找不到 user_id={user_id}')
        raise  # 不重試,直接標記失敗

    except Exception as exc:
        logger.warning(f'發送失敗,準備重試:{exc}')
        raise self.retry(exc=exc)  # 觸發重試機制

重要提醒:任務函式的參數必須是可 JSON 序列化的型別(int、str、list、dict 等)。不要傳入 Model 實例或 QuerySet,而是傳入 ID,在任務中重新查詢。這是因為任務可能在數秒甚至數分鐘後才執行,傳入的物件可能已經過時。


呼叫 Task:delay、apply_async、signature

定義好任務後,就可以在 Django 的 View、Signal 或其他地方呼叫它。Celery 提供三種呼叫方式:

delay()(簡潔語法)

delay() 是最常用的呼叫方式,語法簡潔,直接傳入任務函式的參數:

# views.py
from apps.users.tasks import send_welcome_email

def register_view(request):
    # ... 處理註冊邏輯 ...
    user = User.objects.create_user(username='test', email='test@example.com')

    # 非同步發送歡迎信(立即返回,不等待 Email 發送完成)
    send_welcome_email.delay(user.id)

    return JsonResponse({'message': '註冊成功,歡迎信將稍後寄出'})

apply_async()(完整控制)

apply_async() 提供所有可用的參數控制:

from datetime import datetime

# 10 秒後才執行
send_welcome_email.apply_async(
    args=[user.id],
    countdown=10,
)

# 指定執行時間
send_welcome_email.apply_async(
    args=[user.id],
    eta=datetime(2026, 6, 22, 9, 0),
)

# 指定佇列
send_welcome_email.apply_async(
    args=[user.id],
    queue='high_priority',
)

# 設定重試策略
send_welcome_email.apply_async(
    args=[user.id],
    retry=True,
    retry_policy={
        'max_retries': 5,
        'interval_start': 0,    # 第一次重試的等待秒數
        'interval_step': 0.2,   # 每次重試遞增的秒數
        'interval_max': 0.5,    # 最大等待秒數
    },
)

signature(s(),用於工作流組合)

Signature(簽名) 是任務的「包裝物件」,可以先建立但延遲執行,也可以用於組合複雜的工作流(下一篇將詳細介紹):

from celery import signature

# 建立任務簽名
task_sig = send_welcome_email.s(user.id)

# 延遲執行
task_sig.delay()

# 也可以直接用 apply_async
task_sig.apply_async(countdown=30)

三種呼叫方式的比較:

方式語法適用場景
delay()task.delay(arg1, arg2)一般用途,簡潔方便
apply_async()task.apply_async(args=[...], **kwargs)需要指定 countdown、eta、queue 等
s() / signature()task.s(arg1)工作流組合(chain、group、chord)

監控 Task 結果(AsyncResult)

當你需要追蹤任務的執行狀態或取得結果時,可以使用 AsyncResult 物件。delay()apply_async() 都會回傳一個 AsyncResult:

from apps.users.tasks import send_welcome_email

# 呼叫任務,取得 AsyncResult
result = send_welcome_email.delay(user.id)

# 取得任務 ID(唯一識別碼)
print(result.id)       # 例如:'a1b2c3d4-e5f6-7890-abcd-ef1234567890'

# 檢查任務狀態
print(result.status)   # 'PENDING' / 'STARTED' / 'SUCCESS' / 'FAILURE' / 'RETRY'

# 檢查任務是否完成
print(result.ready())  # True / False

# 檢查任務是否成功
print(result.successful())  # True / False

# 取得任務結果(會阻塞直到任務完成,有 timeout 參數)
print(result.get(timeout=10))  # {'status': 'success', 'user_id': 1}

# 如果任務失敗,取得例外資訊
if result.failed():
    print(result.traceback)

你也可以透過任務 ID 從任何地方查詢任務狀態:

from celery.result import AsyncResult

# 用任務 ID 重新建立 AsyncResult 物件
result = AsyncResult('a1b2c3d4-e5f6-7890-abcd-ef1234567890')
print(result.status)
print(result.result)

任務的生命週期與狀態流轉:

狀態說明
PENDING任務已送出但尚未被 Worker 接收
STARTEDWorker 已開始執行任務
SUCCESS任務執行成功
FAILURE任務執行失敗
RETRY任務失敗後正在重試
REVOKED任務被撤銷

注意:如果沒有設定 Result Backend,result.status 將永遠回傳 PENDING,因為 Celery 無處儲存任務狀態。如果你需要追蹤任務結果,務必設定 CELERY_RESULT_BACKEND


在 View 中整合實際範例

以下是一個完整的範例,展示如何在 Django View 中使用 Celery 處理報表生成:

# apps/reports/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task(bind=True)
def generate_report(self, report_id: int) -> dict:
    """生成報表(耗時操作)"""
    from apps.reports.models import Report

    report = Report.objects.get(pk=report_id)
    report.status = 'processing'
    report.task_id = self.request.id  # 儲存任務 ID,方便前端查詢進度
    report.save()

    try:
        # 模擬耗時的報表生成邏輯
        data = report.collect_data()
        file_path = report.render_to_pdf(data)

        report.status = 'completed'
        report.file_path = file_path
        report.save()

        logger.info(f'報表 {report_id} 生成完成:{file_path}')
        return {'status': 'completed', 'file_path': file_path}

    except Exception as exc:
        report.status = 'failed'
        report.save()
        logger.error(f'報表 {report_id} 生成失敗:{exc}')
        raise
# apps/reports/views.py
from django.http import JsonResponse
from apps.reports.models import Report
from apps.reports.tasks import generate_report
from celery.result import AsyncResult


def create_report_view(request):
    """建立報表並觸發非同步生成"""
    report = Report.objects.create(
        user=request.user,
        report_type=request.POST['type'],
        status='pending',
    )

    # 觸發非同步任務
    result = generate_report.delay(report.id)

    return JsonResponse({
        'message': '報表生成中,請稍候',
        'report_id': report.id,
        'task_id': result.id,
    })


def check_report_status_view(request, task_id):
    """查詢報表生成進度"""
    result = AsyncResult(task_id)

    return JsonResponse({
        'task_id': task_id,
        'status': result.status,
        'result': result.result if result.ready() else None,
    })

啟動 Worker

所有設定完成後,需要啟動 Celery Worker 來執行任務。開啟一個新的終端機視窗:

# 啟動 Worker(基本指令)
celery -A myproject worker --loglevel=info

啟動成功後會看到類似以下的輸出:

 -------------- celery@hostname v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-5.15.0
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         myproject:0x...
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/1
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF
--- ***** -----
 -------------- [queues]
                .> celery          exchange=celery(direct) key=celery

[tasks]
  . apps.users.tasks.send_welcome_email
  . apps.reports.tasks.generate_report

常用的啟動參數:

# 指定併發數量(預設為 CPU 核心數)
celery -A myproject worker --concurrency=4 --loglevel=info

# 指定監聽的佇列
celery -A myproject worker -Q default,high_priority --loglevel=info

# 開發環境:使用 solo 模式(單執行緒,方便除錯)
celery -A myproject worker --pool=solo --loglevel=debug

開發提示:在開發階段,建議在 settings.py 中加入 CELERY_TASK_ALWAYS_EAGER = True,這樣所有任務會同步執行(不需要啟動 Worker),方便除錯。正式環境務必移除此設定。


總結

本文從零開始帶你完成了 Django + Celery 的入門設定與基本使用:

  1. 為什麼需要非同步任務:Email 發送、圖片處理、報表生成等耗時操作不應該阻塞 HTTP 請求
  2. Celery 架構:Django App(生產者)透過 Broker(Redis)發送任務訊息,Worker(消費者)執行任務,Result Backend 儲存結果
  3. 安裝與設定celeryredisdjango-celery-results 三個套件,搭配 settings.py 中的 CELERY_ 設定
  4. celery.py 與 __init__.py:建立 Celery App 實例、設定自動探索任務、確保 Django 啟動時載入
  5. @shared_task 定義任務:使用裝飾器宣告任務,參數必須可 JSON 序列化,透過延遲匯入避免循環依賴
  6. 呼叫任務delay() 簡潔方便、apply_async() 完整控制、s() 用於工作流組合
  7. AsyncResult 監控結果:追蹤任務狀態(PENDING → STARTED → SUCCESS/FAILURE),透過 result.get() 取得結果
  8. 啟動 Workercelery -A myproject worker --loglevel=info,開發環境可使用 CELERY_TASK_ALWAYS_EAGER 同步執行

掌握了 Celery 的基礎後,下一篇我們將深入探討 Celery Beat 排程任務任務重試策略Chain / Group / Chord 工作流Flower 監控工具 等進階主題,帶你打造生產級的非同步任務系統。

BenZ Software Developer

熱愛技術的軟體開發者,在這裡分享程式開發經驗與學習筆記。