회사에서 진행했던 일들 정리(2)

저번에 쓴 Celery를 Django에 적용하기에 이어 Celery에서 작업 호출 방법을 공식문서의Calling Tasks와 개인 경험으로 정리해보았다. 원래는 전 편에서 간단히 소개한 만큼 바로 워크플로우 작성법을 정리하려고 했는데.... 호출 방법부터 정리하고 시작하는 게 더 좋을 것 같아서 호출부분을 세세하게 작성해보았다.
Celery-beat 사용(+Periodic Tasks)까지는 정리해볼 것 같다.
Celery 5.6 기반으로 설명할 생각이라 버전에 맞춰서 참고.

📝 왜 사용했지?

일단 Celery를 사용한 이유는 사용자 편의성을 위해서였다.
회사에서 기본 200kb에서 10mb 정도의 이미지 파일을 100개에서 1000개씩 처리해야 하는 경우가 많다보니 당연히 시간이 많이 걸렸고, 사용자 편의성을 생각해보면 무조건 비동기 처리를 해야 했어서....
원래는 Celery를 안 쓰고 해결 가능하지 않을까...? 라고 생각했지만 결론은 업로드/임시삭제/영구삭제/변경/복사/이동 등등 대부분의 작업을 Celery로 처리해야 했다.케이스 가정해보니까 100000개 이상의 이미지 파일이 왔다갔다 하는 경우도 많은데 어쩔 수 없겠더라고.
나중에 비동기 처리에 큐를 추가하고, 청크로 데이터 분할 후 배치 처리하는 것까지 추가해야 했는데 실제로 테스트 돌릴 때 10mb 이상 데이터를 100개 이상 한번에 업로드한다든가 대용량 파일 업로드와 대량 파일 업로드가 충돌날 경우 등 이런 상황에서 오류가 나서....

✅ apply_async VS delay

공식 문서에서는 호출 방법을 크게 3가지로 소개한다.

  • apply_async(args[, kwargs[, …]])

  • delay(*args, **kwargs)

  • calling (call)

근데 사실 마지막 건 거의 사용을 안한 것 같다. 기껏해야 테스트 코드 작성해야 할때?
그래서 가장 많이 사용한 건 apply_asyncdelay인데 둘이 기능 자체는 비슷비슷 하다 보니 처음에 둘의 차이점을 몰라 "...????" 이 상태로 있었던 것 같다.
간단히 둘의 차이점을 정리해보면 밑과 같다.

  • apply_async(args[, kwargs[, ...]]) : 비동기 실행 시 상세 제어가 필요할 때 사용. 실행 타이밍(countdown), 작업 만료 시간, 특정 큐(Queue) 지정, 우선순위(Priority) 등을 설정해 브로커에 전달한다. 작업 간 간섭을 방지하거나 리소스 분배가 필요한 복잡한 시스템에서 주로 활용한다.

  • delay(args, kwargs) : apply_async의 단축형(Shortcut). 별도의 옵션 없이 인자값만 넘겨 즉시 비동기로 실행한다. 단순하고 반복적인 작업을 빠르게 던질 때 유리하다.

단순 실행보다 작업의 우선순위 제어나 전용 큐를 통한 워커 분리가 중요한 환경에서는(폴더 구조대로 폴더 업로드 등) delay보다 apply_async를 사용하는 것이 확장성과 안정성 면에서 유리하다.

1️⃣ apply_async

사실 둘 다 사용법이 비슷한데, 이전에 작업한 코드를 수정해서 예시로 보면서 설명하겠다.
일단 호출 방법은 둘다 비슷하다. task를 view 혹은 다른 task에서 호출하여 사용하면 된다. apply_async부터 설명해보겠다. 예시 코드를 보자.

    from ..tasks import test

    @action(detail=True, methods=['post'])
    def test(self, request, pk=None):
        # 삭제된 상태이거나, 객체가 없을 경우 404 에러를 반환하고 아닐 경우 객체를 가져온다.
        file = get_object_or_404(File.objects.filter(is_deleted=True), pk=pk)

        # uploader가 없는 경우 현재 요청 사용자 ID 사용
        user_id = file.uploader.id if file.uploader else request.user.id

        #test task 호출
        test.apply_async(
            kwargs={
                'file_id': file.id, # 파일id
                'user_id': user_id, # 사용자id
            },
            queue='test'# test 큐에 전달
        )
        serializer = FileSerializer(file)
        return Response(serializer.data)

이렇게 하면 파일 업로드 후 바로 test task가 실행된다. 이때 apply_async만 살펴보면 밑과 같다.

    #test task 호출
    test.apply_async(
        #kwargs를 통해 인자 전달.
        kwargs={
            'file_id': file.id, # 파일id
            'user_id': user_id, # 사용자id
        },
        #queue를 통해 특정 큐에 전달.
        queue='test'
    )

여기서는 kwargs를 통해 인자를 전달하고 queue를 통해 특정 큐에 전달한다. 여기서는 queue 옵션만 사용했는데, 이 외의 다른 옵션은 밑과 같다.

apply_async 옵션
1.시간 제어 옵션 (Scheduling): 작업을 즉시 실행하지 않고 특정 시점에 실행하도록 제어.

countdown: 현재 시점부터 몇 초 후에 실행할지 설정.
(ex)add.apply_async(args=[1, 2], countdown=60) # 1분 뒤 실행

eta (Estimated Time of Arrival): 특정 시각(datetime)에 실행되도록 예약.
(ex)add.apply_async(args=[1, 2], eta=now + timedelta(hours=1)) # 1시간 뒤 정각 실행

expires: 작업의 유효 기간을 설정. 설정된 시간이 지나도록 워커가 작업을 시작하지 못했다면 해당 작업은 취소(Revoke)됩니다.
(ex)add.apply_async(args=[1, 2], expires=now + timedelta(hours=1)) # 1시간 후에만 실행


2.라우팅 및 우선순위 (Routing & Priority): 작업이 전달될 경로와 중요도를 결정.

queue: 작업을 보낼 전용 큐의 이름을 지정.
(ex)upload_task.apply_async(args=[file], queue='heavy_tasks')

priority: 큐 내에서의 우선순위를 지정. (일반적으로 0~9 사이의 정수 사용)
(ex)upload_task.apply_async(args=[file], priority=5)
+ 숫자가 높을수록 먼저 처리
+ 사용하는 브로커(Redis, RabbitMQ)의 설정에 따라 동작 방식이 다를 수 있음

routing_key: AMQP(RabbitMQ 등) 환경에서 메시지를 더 세밀하게 라우팅할 때 사용
(ex)upload_task.apply_async(args=[file], routing_key='heavy_tasks')


3.재시도 및 오류 관리 (Retries & Reliability): 네트워크 불안정이나 외부 API 장애에 대비하는 옵션.

retry: 작업 실패 시 자동 재시도 여부를 설정
(ex)upload_task.apply_async(args=[file], retry=True)
+ True/False

retry_policy: 재시도 횟수, 지연 시간 등을 딕셔너리 형태로 상세 설정
(ex)upload_task.apply_async(args=[file], retry_policy={
        'max_retries': 3,         # 최대 3번 시도
        'interval_start': 0,      # 첫 재시도 대기 시간
        'interval_step': 0.2,     # 재시도마다 늘어날 시간
        'interval_max': 0.5,      # 최대 대기 시간
    })


4. 작업 연결 및 콜백 (Workflow/Chaining): 작업 완료 후의 후속 조치를 설정.

link (Success Callback): 작업이 성공했을 때 실행할 다음 작업을 지정
(ex)upload_task.apply_async(args=[file], link=process_file.s())
+ 아마 워크플로우 설명할때 가장 많이 나올 것 같다. 

link_error (Error Callback): 작업이 실패했을 때 실행할 오류 처리 작업을 지정
(ex)upload_task.apply_async(args=[file], link_error=send_alert.s())

2️⃣ delay 사용법

이번에는 delay를 사용해보겠다.
delay는 apply_async와 비슷하지만, 세부적인 옵션을 다룰 수 없다.
공식문서에서는 밑과 같이 나와 있다.

    task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

나는 밑과 같이 활용했다.

    def post(self, request):
        """일괄 작업 요청"""
        try:
            file_ids = request.data.get('file_ids', [])
            
            if not file_ids:
                return Response(
                    {"error": "file_ids가 필요합니다"},
                    status=status.HTTP_400_BAD_REQUEST
                )
            
            # 유효한 파일만 필터링 -- 이미지인 파일만 필터링
            valid_files = File.objects.filter(
                id__in=file_ids,
                file_type='img'
            ).values_list('id', flat=True)
            
            valid_file_ids = [str(fid) for fid in valid_files]
            
            if not valid_file_ids:
                return Response(
                    {"error": "유효한 파일이 없습니다"},
                    status=status.HTTP_400_BAD_REQUEST
                )
            
            # 비동기 작업 실행
            from app.tasks.maintenance.test_tasks import generate_test_batch
            task = generate_test_batch.delay(valid_file_ids)
            
            return Response({
                'task_id': task.id,
                'message': 'test가 시작되었습니다.',
                'file_count': len(valid_file_ids)
            }, status=status.HTTP_202_ACCEPTED)
            
        except Exception as e:
            logger.error(f"test 오류: {str(e)}")
            return Response(
                {"error": "서버 오류가 발생했습니다"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

일괄적으로 썸네일 이미지 생성/이름 변경 등의 작업을 처리할 때 사용한 방법이다.
apply_async와 달리 설정보다 실행이 우선인 작업을 처리할 때 사용했다.