회사에서 진행했던 일들 정리(2)
저번에 쓴 Celery 개념 정리에 이어서
공식문서의 First steps with Django를 참고하여 Django에 Celery를 적용하는 방법을 적는다.
❓ 어떻게 적용하면 될까?
1. Celery 및 메시지 브로커 설치
Celery와 선택한 메시지 브로커에 필요한 Python 패키지를 먼저 설치한다.
Celery는 작업을 전달받을 메시지 브로커가 반드시 있어야 하기 때문에 Redis나 RabbitMQ를 주로 사용한다.
나는 캐싱 목적으로 Redis를 이미 사용하고 있었어서 Redis를 사용했지만 그게 아니라면 RabbitMQ를 사용해도 문제는 없다.
Redis vs RabbitMQ
| 구분 | Redis | RabbitMQ |
|---|---|---|
| 근본적인 역할 | 인메모리 NoSQL DB, 캐시, Pub/Sub | 전문 메시지 브로커 (MQ) |
| 주요 프로토콜 | Redis Protocol (자체) | AMQP (표준), MQTT, STOMP 등 |
| 속도/처리량 | 매우 빠름 (주로 메모리 기반), 고성능 | Redis보다는 느리지만 안정적, 충분한 성능 제공 |
| 메시지 신뢰성 | 낮음 (기본적으로 전송 보장X, 메시지 손실 가능성) | 높음 (메시지 전송 보장, 디스크 영속성 지원) |
| 복잡한 라우팅 | 제한적 (간단한 Pub/Sub 또는 List(Queue) 방식) | 매우 유연함 (Exchange, Binding을 통한 복잡한 라우팅) |
| 메시지 우선순위 | 지원하지 않음 | 지원함 (높은 우선순위 메시지 먼저 처리 가능) |
| 적합한 사용 사례 | 속도가 최우선, 지속성이 덜 중요한 짧은 작업 (단순 캐싱, 세션 저장) | 신뢰성과 복잡한 라우팅이 중요한 작업 (결제, 이메일 전송, 대규모 트랜잭션) |
# Celery 및 Redis 백엔드용 라이브러리 설치
pip install "celery[redis]"
2. Celery 인스턴스 설정 (프로젝트 루트)
Django 프로젝트의 메인 디렉토리에 celery.py 파일을 생성하고 다음 내용을 추가한다.
이때 메인 디렉토리는 settings.py가 위치한 곳이다. 공식 문서 내용을 보면
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py
로 되어 있는데 이때 celery 파일의 경로는 proj/proj/celery.py 여야 한다고 되어 있다.
나는 폴더 구조가 밑과 같아서 config 폴더 내에 위치하도록 했다
- dashboard/
- back/
- app # view,task 파일 위치
- config/ #settings 등 설정 파일들 위치
- __init__.py
- settings.py
- celery.py
- urls.py
celery 파일 등록 후 밑과 같은 내용을 추가한다
import os
from celery import Celery
# 'celery' 프로그램이 Django의 설정을 로드하도록 기본 Django 설정 모듈을 설정.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject') # 프로젝트 이름으로 Celery 앱 인스턴스를 생성.
# Celery가 Django 설정 파일에서 설정을 로드.
# 'CELERY' 네임스페이스를 사용하면 모든 Celery 관련 설정 키에 'CELERY_' 접두사를 추가해야 함
app.config_from_object('django.conf:settings', namespace='CELERY')
# 등록된 모든 Django 앱 설정에서 task 모듈을 자동으로 검색.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
밑의 건 내가 작성했던 코드 예시인데 env 파일을 따로 두었기에 env 파일을 확인하여 환경변수가 설정되어 있는지 아닌지부터 체크하고, 그 다음 개발 환경인지 배포 환경인지 확인하는 부분을 추가했다.
또한 여러 작업 추가 시 우선순위를 위해 큐를 설정하는 부분, 작업 실패 시 재시도하는 부분을 추가했다.
from __future__ import absolute_import, unicode_literals
import os
import django
from celery import Celery
# DJANGO_SETTINGS_MODULE의 환경 변수가 설정되어 있지 않다면 기본값 사용
if not os.environ.get('DJANGO_SETTINGS_MODULE'):
if os.environ.get('DEBUG') == 'True':
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')
else:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.production')
app = Celery('config')
# Django 설정 파일의 CELERY 관련 항목을 사용
#대문자 네임스페이스는 모든 Celery 구성 옵션을 소문자가 아닌 대문자로 지정해야 하며, 로 시작해야 함
app.config_from_object('django.conf:settings', namespace='CELERY')
# 태스크 자동 탐색
app.autodiscover_tasks()
# 작업 큐 설정
app.conf.task_queues = {
'default': {
'exchange': 'default',
'exchange_type': 'direct',
'binding_key': 'default',
}
}
# 작업자(worker) 설정
app.conf.worker_prefetch_multiplier = 4
app.conf.task_acks_late = True
app.conf.worker_concurrency = 4 # 동시 실행 태스크 수를 4로 제한
# 풀 경로 기준 라우팅
# task_routes 설정을 사용하면 작업을 이름으로 라우팅하고 모든 것을 한곳에 중앙에서 관리할 수 있음
app.conf.task_routes = {
# config
"config.celery.debug_task": {"queue": "default"},
}
# 재시도 및 rate limit 설정
app.conf.task_default_retry_delay = 10 * 60 # 10분 후에 재시도
app.conf.task_annotations = {
'app.tasks.*': {'rate_limit': '10/m'}, # 분당 10개의 작업으로 제한
}
# 디버그 태스크
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
3. Celery 앱 로드 (프로젝트 init.py)
Celery 앱이 Django가 시작될 때 항상 로드되도록 메인 프로젝트 디렉토리의 init.py 파일에 다음을 추가한다.
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
4. Celery 설정 추가
Django 프로젝트의 settings.py 파일에 메시지 브로커 URL 및 필요한 Celery 관련 설정을 추가한다.
# Celery 기본 설정
CELERY_TIMEZONE = 'Asia/Seoul'
CELERY_IMPORTS = ('app.tasks',)
CELERY_TASK_TIME_LIMIT = 30 * 60
# Redis를 브로커로 사용할 경우
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# 작업 결과를 저장할 백엔드를 설정 (선택 사항)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
나는 CELERY_BROKER_URL/CELERY_RESULT_BACKEND를 직접 기입하지 않고 docker-compose.yml 파일에서 정의하는 식으로 진행했다.
5. Task 정의 및 사용 (tasks.py)
이건 전의 내용과 동일하다. 시간이 오래 걸리는 작업 등을 task로 만들면 된다.
일반적으로 Django 앱 내에 tasks.py 파일을 생성하고 @shared_task 데코레이터를 사용한다.
@app.task 와 @shared_task 비교
| 구분 | @app.task | @shared_task |
|---|---|---|
| 인스턴스 의존성 | Celery 앱 인스턴스(app)를 명시적으로 가져와야 함 | Celery 인스턴스 없이 정의 가능, 독립적으로 작동 |
| 사용 환경 | 일반적인 Python 프로젝트에서 사용 | Django 프로젝트나 재사용 가능한 앱에서 사용 적합 |
| Django 사용 시 추천 여부 | Django 환경에서는 비추천 | Django 통합 시 강력히 권장 |
밑과 같이 작성하면 된다
# myapp/tasks.py
from celery import shared_task
@shared_task
def send_email_task(recipient_email, message_body):
# 이메일을 보내는 등 시간이 오래 걸리는 작업을 수행
print(f"Sending email to {recipient_email} with message: {message_body}")
# 실제 이메일 전송 로직...
return True
task 파일 구현 후에는 프로젝트 루트 디렉토리로 이동하여 밑과 같은 명령을 실행한다. (celery_app는 실제 프로젝트 명으로 변경해야 한다.)
celery -A celery_app worker --loglevel=info
실행시킨 후에는 일반적으로 delay() 또는 apply_async() 메서드를 사용하여 정의된 task를 Celery Worker가 처리하도록 큐에 넣어주면 된다.
@delay()와 apply_async() 비교
| 구분 | @delay() | apply_async() |
|---|---|---|
| 사용 목적 | 빠르고 단순한 비동기 호출용 | 고급 설정 및 세밀한 제어가 필요한 경우에 사용 |
| 인수 전달 방식 | 일반 함수 호출처럼 인수를 직접 전달 (task.delay(a, b)) | args=(), kwargs={} 형태로 명시적으로 전달 |
| Celery 옵션 사용 여부 | ❌ 사용 불가 | ✅ 사용 가능 (예: queue, countdown, eta) |
# Django 쉘이나 View 등에서
from myapp.tasks import send_email_task
# 비동기적으로 작업 실행
send_email_task.delay('user@example.com', 'Welcome to our service!')