Django + Celery 非同步任務佇列入門 | Django 教學
在 Django 應用程式中,當使用者觸發了 Email 發送、圖片處理、報表生成 等耗時操作時,若在 View 中同步執行,使用者將被迫等待很久才能收到回應。Celery 是 Python 生態系中最成熟的 分散式任務佇列(Distributed Task Queue) 系統,它能將耗時工作交給背景 Worker 非同步執行,讓 Django 立即回應使用者。本文將從 Celery 的架構概念出發,帶你完成 Redis 安裝設定、
celery.py設定檔配置、使用@shared_task定義任務,到透過delay()與apply_async()呼叫任務,最終啟動 Worker 並監控任務結果。
為什麼需要非同步任務?
Django 的 HTTP 請求處理是同步的:使用者發出請求後,伺服器必須在回應前完成所有邏輯。這在大多數情況下沒有問題,但以下場景會讓回應時間變得不可接受:
- Email 發送:使用者註冊後發送歡迎信、訂單確認通知,SMTP 連線可能需要數秒
- 圖片處理:壓縮、縮圖、加浮水印,耗時取決於圖片大小
- 報表生成:大型 CSV / PDF 匯出,需要查詢大量資料並處理格式
- 第三方 API 呼叫:支付 Webhook、社群媒體發文,外部服務回應時間不可控
- 資料同步:從外部來源批次匯入或同步資料
如果這些操作在 View 中同步執行,使用者可能需要等待 10 秒、30 秒甚至更久。更糟的是,如果有大量使用者同時觸發耗時操作,所有 Web Server 的工作執行緒都會被佔滿,導致整個網站無法服務新的請求。
解決方案就是 非同步任務(Asynchronous Task):View 只負責「發送任務訊息」,然後立即回應使用者;實際的耗時工作由獨立的 Worker 程序在背景執行。
Celery 架構
Celery 是由 Ask Solem 於 2009 年發布的開源分散式任務佇列系統,採用 BSD-3-Clause 授權。它的架構由四個核心元件組成:
Django App(生產者)
| .delay() / .apply_async()
v
Broker(訊息代理)
Redis 或 RabbitMQ
| 分發任務
v
Celery Worker(消費者)
執行實際任務邏輯
| 儲存結果(可選)
v
Result Backend
Redis / Django ORM / RPC
| 元件 | 職責 | 常見選擇 |
|---|---|---|
| Django App | 產生任務、呼叫 .delay() | – |
| Broker(訊息代理) | 任務訊息的傳輸層,負責接收與分發任務 | Redis、RabbitMQ |
| Worker(工作者) | 從 Broker 取出任務並執行的獨立程序 | celery -A myproject worker |
| Result Backend(結果後端) | 儲存任務執行結果,供後續查詢 | Redis、Django ORM |
Broker 是整個架構的核心,Django App 將任務訊息推送到 Broker,Worker 從 Broker 取出任務執行。本教學使用 Redis 作為 Broker,因為它同時也能擔任 Result Backend,部署最為簡便。
安裝
首先安裝 Celery 及相關套件:
# 安裝 Celery、Redis 客戶端與 Django Celery Results
pip install celery redis django-celery-results
各套件的用途:
| 套件 | 用途 |
|---|---|
celery | Celery 主體 |
redis | Python Redis 客戶端,讓 Celery 連線 Redis |
django-celery-results | 將任務結果儲存到 Django ORM(可選) |
確保你的環境中已經安裝並啟動了 Redis 服務:
# macOS(使用 Homebrew)
brew install redis
brew services start redis
# Ubuntu / Debian
sudo apt install redis-server
sudo systemctl start redis
# 驗證 Redis 是否正常運作
redis-cli ping
# 預期回應:PONG
settings.py Celery 設定
在 Django 的 settings.py 中加入 Celery 相關設定。Celery 會讀取所有以 CELERY_ 為前綴的設定項:
# settings.py
# Celery Broker 設定(使用 Redis)
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# Result Backend 設定(使用 Redis)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# 或使用 Django ORM 儲存結果:
# CELERY_RESULT_BACKEND = 'django-db'
# 需在 INSTALLED_APPS 加入 'django_celery_results'
# 序列化設定(使用 JSON 格式)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# 時區設定
CELERY_TIMEZONE = 'Asia/Taipei'
CELERY_ENABLE_UTC = True
# 任務結果過期時間(秒)
CELERY_RESULT_EXPIRES = 60 * 60 * 24 # 24 小時後自動清除
# 任務超時設定
CELERY_TASK_SOFT_TIME_LIMIT = 300 # 5 分鐘,拋出 SoftTimeLimitExceeded(可捕捉)
CELERY_TASK_TIME_LIMIT = 600 # 10 分鐘,強制終止(無法捕捉)
如果選擇使用 Django ORM 作為 Result Backend,記得加入 django_celery_results 到 INSTALLED_APPS 並執行 migrate:
# settings.py
INSTALLED_APPS = [
# ... 其他 App
'django_celery_results',
]
python manage.py migrate django_celery_results
celery.py 設定檔
在 Django 專案的主模組(與 settings.py 同一目錄)中建立 celery.py,這是 Celery App 的設定檔:
# myproject/celery.py
import os
from celery import Celery
# 設定 Django settings 模組,讓 Celery 能讀取 Django 設定
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# 建立 Celery App 實例
app = Celery('myproject')
# 從 settings.py 中讀取所有 CELERY_ 開頭的設定
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自動探索所有 INSTALLED_APPS 中的 tasks.py 檔案
app.autodiscover_tasks()
@app.task(bind=True, ignore_result=True)
def debug_task(self):
"""除錯用任務,印出 request 資訊"""
print(f'Request: {self.request!r}')
關鍵設定說明:
os.environ.setdefault():確保 Celery Worker 啟動時能找到 Django 設定config_from_object(..., namespace='CELERY'):讓 Celery 自動讀取settings.py中所有CELERY_開頭的設定autodiscover_tasks():自動掃描所有已安裝 App 中的tasks.py檔案,註冊其中定義的任務
__init__.py 設定
修改專案主模組的 __init__.py,確保 Celery App 在 Django 啟動時自動載入:
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
這一步非常重要。如果沒有在 __init__.py 中匯入 Celery App,@shared_task 裝飾器將無法正確註冊任務。
此時專案的目錄結構應該如下:
myproject/
__init__.py # 匯入 celery_app
celery.py # Celery App 設定
settings.py # Django 設定(含 CELERY_ 設定)
urls.py
wsgi.py
apps/
users/
tasks.py # 定義使用者相關的非同步任務
reports/
tasks.py # 定義報表相關的非同步任務
定義 Task(@shared_task)
Celery 使用 @shared_task 裝飾器定義任務。@shared_task 是推薦的寫法,因為它不需要直接匯入 Celery App 實例,適合在 Django App 中使用,避免循環匯入問題。
在你的 Django App 中建立 tasks.py:
# apps/users/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.mail import send_mail
logger = get_task_logger(__name__)
@shared_task
def send_welcome_email(user_id: int) -> dict:
"""發送歡迎 Email(最簡單的任務定義)"""
from apps.users.models import User # 延遲匯入,避免循環 import
user = User.objects.get(pk=user_id)
send_mail(
subject='歡迎加入!',
message=f'親愛的 {user.username},歡迎加入我們的平台。',
from_email='noreply@example.com',
recipient_list=[user.email],
)
logger.info(f'歡迎信已發送給 user_id={user_id}')
return {'status': 'success', 'user_id': user_id}
進階任務定義(bind 與重試)
加上 bind=True 參數後,任務函式的第一個參數會是 self(Task 實例),讓你可以存取任務的中繼資料並觸發重試:
# apps/users/tasks.py
@shared_task(
bind=True, # self 參數可存取 task 實例
max_retries=3, # 最多重試 3 次
default_retry_delay=60, # 每次重試間隔 60 秒
)
def send_welcome_email_with_retry(self, user_id: int) -> dict:
"""發送歡迎 Email,失敗時自動重試"""
from apps.users.models import User
try:
user = User.objects.get(pk=user_id)
send_mail(
subject='歡迎加入!',
message=f'親愛的 {user.username},歡迎加入我們的平台。',
from_email='noreply@example.com',
recipient_list=[user.email],
)
logger.info(f'歡迎信已發送給 user_id={user_id}')
return {'status': 'success', 'user_id': user_id}
except User.DoesNotExist:
logger.error(f'找不到 user_id={user_id}')
raise # 不重試,直接標記失敗
except Exception as exc:
logger.warning(f'發送失敗,準備重試:{exc}')
raise self.retry(exc=exc) # 觸發重試機制
重要提醒:任務函式的參數必須是可 JSON 序列化的型別(int、str、list、dict 等)。不要傳入 Model 實例或 QuerySet,而是傳入 ID,在任務中重新查詢。這是因為任務可能在數秒甚至數分鐘後才執行,傳入的物件可能已經過時。
呼叫 Task:delay、apply_async、signature
定義好任務後,就可以在 Django 的 View、Signal 或其他地方呼叫它。Celery 提供三種呼叫方式:
delay()(簡潔語法)
delay() 是最常用的呼叫方式,語法簡潔,直接傳入任務函式的參數:
# views.py
from apps.users.tasks import send_welcome_email
def register_view(request):
# ... 處理註冊邏輯 ...
user = User.objects.create_user(username='test', email='test@example.com')
# 非同步發送歡迎信(立即返回,不等待 Email 發送完成)
send_welcome_email.delay(user.id)
return JsonResponse({'message': '註冊成功,歡迎信將稍後寄出'})
apply_async()(完整控制)
apply_async() 提供所有可用的參數控制:
from datetime import datetime
# 10 秒後才執行
send_welcome_email.apply_async(
args=[user.id],
countdown=10,
)
# 指定執行時間
send_welcome_email.apply_async(
args=[user.id],
eta=datetime(2026, 6, 22, 9, 0),
)
# 指定佇列
send_welcome_email.apply_async(
args=[user.id],
queue='high_priority',
)
# 設定重試策略
send_welcome_email.apply_async(
args=[user.id],
retry=True,
retry_policy={
'max_retries': 5,
'interval_start': 0, # 第一次重試的等待秒數
'interval_step': 0.2, # 每次重試遞增的秒數
'interval_max': 0.5, # 最大等待秒數
},
)
signature(s(),用於工作流組合)
Signature(簽名) 是任務的「包裝物件」,可以先建立但延遲執行,也可以用於組合複雜的工作流(下一篇將詳細介紹):
from celery import signature
# 建立任務簽名
task_sig = send_welcome_email.s(user.id)
# 延遲執行
task_sig.delay()
# 也可以直接用 apply_async
task_sig.apply_async(countdown=30)
三種呼叫方式的比較:
| 方式 | 語法 | 適用場景 |
|---|---|---|
delay() | task.delay(arg1, arg2) | 一般用途,簡潔方便 |
apply_async() | task.apply_async(args=[...], **kwargs) | 需要指定 countdown、eta、queue 等 |
s() / signature() | task.s(arg1) | 工作流組合(chain、group、chord) |
監控 Task 結果(AsyncResult)
當你需要追蹤任務的執行狀態或取得結果時,可以使用 AsyncResult 物件。delay() 和 apply_async() 都會回傳一個 AsyncResult:
from apps.users.tasks import send_welcome_email
# 呼叫任務,取得 AsyncResult
result = send_welcome_email.delay(user.id)
# 取得任務 ID(唯一識別碼)
print(result.id) # 例如:'a1b2c3d4-e5f6-7890-abcd-ef1234567890'
# 檢查任務狀態
print(result.status) # 'PENDING' / 'STARTED' / 'SUCCESS' / 'FAILURE' / 'RETRY'
# 檢查任務是否完成
print(result.ready()) # True / False
# 檢查任務是否成功
print(result.successful()) # True / False
# 取得任務結果(會阻塞直到任務完成,有 timeout 參數)
print(result.get(timeout=10)) # {'status': 'success', 'user_id': 1}
# 如果任務失敗,取得例外資訊
if result.failed():
print(result.traceback)
你也可以透過任務 ID 從任何地方查詢任務狀態:
from celery.result import AsyncResult
# 用任務 ID 重新建立 AsyncResult 物件
result = AsyncResult('a1b2c3d4-e5f6-7890-abcd-ef1234567890')
print(result.status)
print(result.result)
任務的生命週期與狀態流轉:
| 狀態 | 說明 |
|---|---|
PENDING | 任務已送出但尚未被 Worker 接收 |
STARTED | Worker 已開始執行任務 |
SUCCESS | 任務執行成功 |
FAILURE | 任務執行失敗 |
RETRY | 任務失敗後正在重試 |
REVOKED | 任務被撤銷 |
注意:如果沒有設定 Result Backend,
result.status將永遠回傳PENDING,因為 Celery 無處儲存任務狀態。如果你需要追蹤任務結果,務必設定CELERY_RESULT_BACKEND。
在 View 中整合實際範例
以下是一個完整的範例,展示如何在 Django View 中使用 Celery 處理報表生成:
# apps/reports/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task(bind=True)
def generate_report(self, report_id: int) -> dict:
"""生成報表(耗時操作)"""
from apps.reports.models import Report
report = Report.objects.get(pk=report_id)
report.status = 'processing'
report.task_id = self.request.id # 儲存任務 ID,方便前端查詢進度
report.save()
try:
# 模擬耗時的報表生成邏輯
data = report.collect_data()
file_path = report.render_to_pdf(data)
report.status = 'completed'
report.file_path = file_path
report.save()
logger.info(f'報表 {report_id} 生成完成:{file_path}')
return {'status': 'completed', 'file_path': file_path}
except Exception as exc:
report.status = 'failed'
report.save()
logger.error(f'報表 {report_id} 生成失敗:{exc}')
raise
# apps/reports/views.py
from django.http import JsonResponse
from apps.reports.models import Report
from apps.reports.tasks import generate_report
from celery.result import AsyncResult
def create_report_view(request):
"""建立報表並觸發非同步生成"""
report = Report.objects.create(
user=request.user,
report_type=request.POST['type'],
status='pending',
)
# 觸發非同步任務
result = generate_report.delay(report.id)
return JsonResponse({
'message': '報表生成中,請稍候',
'report_id': report.id,
'task_id': result.id,
})
def check_report_status_view(request, task_id):
"""查詢報表生成進度"""
result = AsyncResult(task_id)
return JsonResponse({
'task_id': task_id,
'status': result.status,
'result': result.result if result.ready() else None,
})
啟動 Worker
所有設定完成後,需要啟動 Celery Worker 來執行任務。開啟一個新的終端機視窗:
# 啟動 Worker(基本指令)
celery -A myproject worker --loglevel=info
啟動成功後會看到類似以下的輸出:
-------------- celery@hostname v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-5.15.0
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: myproject:0x...
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/1
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. apps.users.tasks.send_welcome_email
. apps.reports.tasks.generate_report
常用的啟動參數:
# 指定併發數量(預設為 CPU 核心數)
celery -A myproject worker --concurrency=4 --loglevel=info
# 指定監聽的佇列
celery -A myproject worker -Q default,high_priority --loglevel=info
# 開發環境:使用 solo 模式(單執行緒,方便除錯)
celery -A myproject worker --pool=solo --loglevel=debug
開發提示:在開發階段,建議在
settings.py中加入CELERY_TASK_ALWAYS_EAGER = True,這樣所有任務會同步執行(不需要啟動 Worker),方便除錯。正式環境務必移除此設定。
總結
本文從零開始帶你完成了 Django + Celery 的入門設定與基本使用:
- 為什麼需要非同步任務:Email 發送、圖片處理、報表生成等耗時操作不應該阻塞 HTTP 請求
- Celery 架構:Django App(生產者)透過 Broker(Redis)發送任務訊息,Worker(消費者)執行任務,Result Backend 儲存結果
- 安裝與設定:
celery、redis、django-celery-results三個套件,搭配settings.py中的CELERY_設定 - celery.py 與 __init__.py:建立 Celery App 實例、設定自動探索任務、確保 Django 啟動時載入
- @shared_task 定義任務:使用裝飾器宣告任務,參數必須可 JSON 序列化,透過延遲匯入避免循環依賴
- 呼叫任務:
delay()簡潔方便、apply_async()完整控制、s()用於工作流組合 - AsyncResult 監控結果:追蹤任務狀態(PENDING → STARTED → SUCCESS/FAILURE),透過
result.get()取得結果 - 啟動 Worker:
celery -A myproject worker --loglevel=info,開發環境可使用CELERY_TASK_ALWAYS_EAGER同步執行
掌握了 Celery 的基礎後,下一篇我們將深入探討 Celery Beat 排程任務、任務重試策略、Chain / Group / Chord 工作流、Flower 監控工具 等進階主題,帶你打造生產級的非同步任務系統。