회사에서 진행했던 일들 정리(2)

Celery에서의 작업 호출에 이어서, Celery에서 워크플로우를 작성하는 방법을 공식문서의Canvas: Designing Work-flows와 개인 경험으로 정리해보았다. 사실 워크플로우라기보다는... 특정 작업/데이터를 를 다른 프로세스에 전달하거나 다른 함수의 인수로 전달하는 경우를 처리하는 방법이라고 보는게 좋을것 같다.

📝 왜 사용했지?

개인적으로는 폴더 작업/대량의 폴더 관련 작업에 주로 사용했다. 예를 들어, 나는 폴더를 업로드하면 그 내부의 하위 폴더 구조를 그대로 유지한 상태로 업로드를 하고 싶었다. 그래서 압축 후 업로드라든가 여러 방법을 고려했는데 결국에는 폴더 구조를 뷰에서 먼저 생성 -> 임시 폴더에 파일들을 일괄 업로드 -> 구조에 맞춰 파일들을 이동 이런식으로 구현을 했었다.
사실 파일은 일괄적으로 동일한 폴더에 저장하고, db에서 논리적 구조만 저장하는 것이 제일 좋은 방법이었겠지만 완전히 플랫폼으로 넘어갈 거라는 보장이 없으니 파일 서버에서 변경이 있어도 db에 오류가 가지 않으면서 파일 서버에서 바로 파일을 찾을 수 있게 해야 했다(이러한 고민은 결국 추후 서비스에 증분 스캔이니 주기적 고아 파일 제거 작업 등등의 기능 추가로 이어졌다...)

✅ Signatures/ The Primitives/ Stamping

공식 문서에서 소개된 순서대로 정의하고, 각각의 함수를 예시를 들어 설명하겠다.

  1. Signatures
    단일 작업 호출의 인자(args), 키워드 인자(kwargs), 실행 옵션을 하나로 묶은 객체. 이를 통해 작업 호출 자체를 함수 인자로 전달하거나, 직렬화하여 네트워크를 통해 전송할 수 있다.

  2. The Primitives
    작업을 조합하여 워크플로우를 만드는 기본 빌딩 블록. 모든 프리미티브는 시그니처(Signature) 객체로, 서로 중첩하거나 조합하여 매우 복잡한 구조를 설계할 수 있다.
    내가 가장 많이 사용한 작업이다.

  3. Stamping
    캔버스가 복잡해지거나 중첩된 그룹, 체인 등이 포함될 때, 특정 작업이 어떤 그룹에 속해 있는지 또는 어떤 계층에 있는지 식별하기 어려울 수 있다. Stamping API는 **방문자 패턴(Visitor pattern)**을 사용하여 캔버스의 각 요소를 순회하며 특정 메타데이터(스탬프)를 마킹하는 메커니즘을 제공한다.

방문자 패턴(Visitor pattern)이란? 객체의 구조를 순회하면서 객체의 각 요소에 대해 특정 작업을 수행할 수 있도록 하는 디자인 패턴.

그럼 이제 각각을 예시와 함께 살펴보자.

1️⃣ Signatures

그래서 시그니처를 언제 쓰느냐? 라는 생각을 할 수 있다. 그냥 apply_async 또는 delay를 사용해서 바로바로 호출을 하면 되는데 왜 굳이 이렇게 따로 객체를 만들어서 전달을 해야 할까?
시그니쳐는 실행을 지연시켜서 여러 작업을 조합(group/chain/chord)해야 하거나, 작업 정의를 객체로 전달해야 할 때 사용된다.
더 쉽게 설명해달라고 하니 제미나이가 단순히 delay()나 apply_async()를 사용하는 것은 **"지금 즉시 이 일을 해라"**라고 명령하는 것이라면, 시그니처를 만드는 것은 **"나중에 실행할 수 있도록 이 작업의 실행 방법(인자, 옵션 등)을 하나의 봉투(객체)에 담아두겠다"**는 의미라고 설명했다.
근데 그냥 여러 작업을 조합하여 복잡한 워크플로우를 구현하기 위해 사용한다고 이해하는게 더 빠를 것 같다. 나는 파일 업로드 후 파일 처리 작업을 병렬로 실행하거나, 파일 처리 작업이 완료된 후에 추가 작업을 순차적으로 실행하는 작업을 구현할 때 시그니처를 사용했다.
밑은 내가 작업하면서 구현한 것으로 디렉토리 업로드에서 파일 업로드 후 파일 처리 작업을 병렬로 실행하는 코드이다.

        if tasks:
            # 모든 파일 업로드 태스크 생성
            all_tasks = []
            for batch in tasks:
                for task_data in batch:
                    task = upload_file.si(**task_data)
                    # → 이전 작업 결과와 무관하게 독립 실행
                    # → 병렬 실행 시 서로 간섭 없음
                    all_tasks.append(task)      

            # 리소스 락 관리용
            completion_lock_info = {
                **current_lock_info,
                'complete_operation': True
            }

            # chord: 모든 업로드 완료 후 콜백 실행
            # → chord가 전달하는 모든 작업의 결과 리스트를 첫 번째 인자(results)로 받음
            chord(all_tasks)(
                process_upload_completion.s(
                    directory_ids=list(directory_ids),
                    current_lock_info=completion_lock_info
                )
            )

이 코드에서 시그니쳐는 upload_file 함수를 호출하는 시그니쳐를 생성하고, 이를 chord에 전달하여 병렬로 실행한다. 또한 process_upload_completion 함수도 시그니쳐로 전달하여 콜백으로 사용한다.

ℹ️ .s() vs .si() 차이

일단 둘 다 시그니처를 생성하는 단축 메서드인데 기능상 차이점이 있어 정리한다.
- .s()(signature): 이전 작업 결과를 인자로 받음
- .si()(signature immutable): 이전 작업 결과를 인자로 받지 않음
밑의 예시로 살펴보자.

```python
    # chain 예시
    chain(task1.s(), task2.s(), task3.s())
    # task1 결과 → task2 첫 번째 인자로 전달
    # task2 결과 → task3 첫 번째 인자로 전달

    chain(task1.s(), task2.si(fixed_arg), task3.s())
    # task1 결과 → task2가 무시 (immutable이라서)
    # task2는 fixed_arg만 사용
```

2️⃣ The Primitives

위에서도 잠깐 설명했지만, 프리미티브는 여러 작업을 실행하도록 하는 주체다. 시그니처 객체를 기반으로 여러 작업을 조합하여 복잡한 워크플로우를 구현할 수 있다. group/ chain/ chord/ map/ starmap/ chunks 으로 구성된다. 각각을 예시와 함께 살펴보자

  • group: 병렬로 적용해야 하는 작업 목록을 인수로 받아 동시에 실행한다. successful(), failed(), revoke() 등 작업 처리 결과를 확인 및 취소할 수 있는 메서드들을 제공한다.
# 시나리오: 디렉토리 내 모든 파일 동기화
sync_tasks = []
for file in files:
    ,sync_tasks.append(sync_single_file.s(str(file.id)))

job = group(sync_tasks)
result = job.apply_async()

  • chain: 작업을 순차적으로 연결하여 이전 결과를 다음 작업으로 전달한다.
# 시나리오: 파일 업로드 → 메타데이터 추출 → 썸네일 생성 → 인덱싱
from celery import chain

workflow = chain(
    upload_file.s(file_data),           # 결과: file_id
    extract_metadata.s(),                # file_id 받아서 처리, 결과: metadata
    generate_thumbnail.s(),              # metadata 받아서 처리
    index_to_search.s()                  # 최종 인덱싱
)
workflow.apply_async()

  • chord: 모든 병렬 작업이 완료된 후 콜백 작업을 실행한다.
#시나리오: 디렉토리 업로드 완료 후 후처리
all_tasks = []
for task_data in batch:
    task = upload_file.si(**task_data)  # immutable: 독립 실행
    all_tasks.append(task)

chord(all_tasks)(
    process_upload_completion.s(        # 모든 업로드 완료 후 실행
        directory_ids=list(directory_ids),
        current_lock_info=completion_lock_info
    )
)

  • map: 하나의 작업을 여러 인자에 대해 반복 실행 (간단한 병렬 처리). group과 유사하지만, 작업 메시지는 하나만 전송 가능하고 작업은 순차적으로 진행된다는 부분에서 차이점이 있다.
# 시나리오: 여러 파일 ID에 대해 동일한 작업 실행
from celery import signature

# 각 파일에 대해 sync_single_file 실행
sync_single_file.map([
    (file_id_1,), 
    (file_id_2,), 
    (file_id_3,)
]).apply_async()

  • starmap: map과 유사하지만 여러 인자를 언패킹하여 전달한다.
# 시나리오: (directory_id, file_name) 쌍으로 파일 처리
from celery import signature

process_file.starmap([
    (dir_id_1, 'file1.jpg'),
    (dir_id_2, 'file2.png'),
    (dir_id_3, 'file3.pdf')
]).apply_async()

  • chunks: 큰 작업 목록을 청크로 나누어 배치 처리한다.
# 시나리오: 10,000개 파일을 100개씩 나누어 처리
file_ids = [str(f.id) for f in File.objects.all()[:10000]]

# 100개씩 청크로 나누어 처리
sync_single_file.chunks(
    [(file_id,) for file_id in file_ids], 
    100  # 청크 크기
).apply_async()

3️⃣ Stamping

이건 나도 활용해보지 않았다.
자료를 찾아보니 복잡하게 얽힌 분산 작업들 사이에서 특정 작업의 출처나 소속을 명확히 파악하기 위해 또는 모니터링 툴이나 로깅 시스템에서 Celery 작업들을 고유 식별자로 추적하기 위해 활용된다고 한다.
그런데 이게 왜 필요할까?
운영 및 유지보수를 위해서다. 예를 들어 chord로 100개의 파일 업로드 작업을 실행했는데 일부가 실패했다고 가정하자. 이때 "이 실패한 작업이 어떤 디렉토리 업로드 요청에서 시작된 건지" 추적하려면 stamp가 유용하다.

# Stamping 예시
from celery import signature

# 작업에 stamp 추가
sig = upload_file.s(file_data).set(
    stamp={
        'request_id': 'upload-batch-20240115-001',
        'user_id': str(user.id),
        'directory_id': str(directory.id)
    }
)

# chord 전체에 stamp 적용
workflow = chord(all_tasks)(callback.s())
workflow.stamp(request_id='batch-001')
workflow.apply_async()
상황 stamp 없이 stamp 사용 시
실패 작업 추적 task_id만으로 원인 파악 어려움 request_id로 연관 작업 일괄 조회
로그 분석 개별 로그 흩어져 있음 동일 stamp로 그룹핑 가능
작업 취소 개별 task_id 필요 stamp 기준 일괄 취소 가능