부트캠프에서 진행했던 프로젝트에 Celery를 활용하여 비동기 병렬처리를 한 경험을 포스팅하려고 한다.
문제 인식
프로젝트에서 AI가 3개의 설계문서를 생성해주는 기능 중
동기적으로 작동하여 10회 호출 기준 3개의 정보를 생성하는 데 약 62초 정도 소요되었다.
동시 다발적인 요청이 들어오는 상황에서 응답 지연 및 시스템 부하가 발생할 우려가 있었고, 이를 개선하기 위해 비동기 처리 방식으로 전환할 필요성을 느꼈다.
문제점 정리
- 높은 응답 지연 시간: 요청마다 순차적으로 처리되어 전체 소요 시간이 늘어남
- 자원 낭비: 동기 처리로 인해 시스템 자원을 비효율적으로 사용
- 사용자 경험 저하: 기능 실행 시간이 길어짐에 따라 사용자 만족도가 하락
Celery를 사용해서 비동기 병렬처리로 개선하자.
Celery란 무엇일까?
Celery는 Python 기반의 비동기 작업 처리 프레임워크이다. 쉽게 말해, 무거운 작업을 백그라운드에서 따로 처리하도록 도와주는 도구이다.
분산 작업 큐(Distributed Task Queue) 시스템이며
웹 서버에서 시간이 오래 걸리는 작업을 셀러리 워커가 백그라운드에서 비동기 또는 병렬로 실행할 수 있게 해준다.
Celery는 단독으로 동작하지 않고 작업을 중계할 브로커가 필요하다.
브로커가 필요한 이유
여기서 브로커란
Celery에서 작업 요청(Task Message)을 워커에게 전달하는 중개자 역할을 한다.
작업을 시킨 사람(웹 서버)과 작업을 실행할 사람(Celery 워커) 사이에서 전달을 맡는 메시지 큐 시스템이다.
만약 브로커가 없다면?
- 워커가 없으면 요청 실패
- 요청 도중 서버 다운되면 작업 유실
- 동시에 많은 요청 처리 불가
- 비동기 구조 자체가 무너짐
메시지 브로커에는 다양한 종류가 있는데 안정성과 확장성이 높은 RabbitMQ를 사용하였다.
이제 흐름을 한번 보도록 하자

1. 사용자가 웹사이트나 앱에서 특정 기능을 실행한다.
2. Django는 이 요청을 즉시 처리하지 않고 비동기로 처리한다.
.delay()를 호출하여 Celery Task 메시지를 생성한다.
.delay()는 실제로 내부적으로 RabbitMQ에 메시지를 전송한다.
generate_project_structure.delay(...)
push_to_github.delay(...)
3. RabbitMQ는 중간 브로커로, 메시지를 큐에 안전하게 저장한다.
Celery 워커는 RabbitMQ 큐에 상시 연결되어 있음
큐에 새로운 작업이 생기면, 워커가 자동으로 꺼내서 처리
워커 수가 여러 개라면 병렬로 여러 작업을 동시에 처리 가능
4. 워커가 작업을 실행한다. 여러 개의 워커들은 작업을 병렬로 처리한다.
task_chord = chord(
[
create_diagram.s(document.result),
create_erd.s(document.result),
create_api.s(document.result),
]
)(collect_results.s())
5. 워커는 작업 결과를 DB에 저장하거나, 다른 기능을 통해 사용자에게 결과 알림 가능
이제 구현해보자
1. 메시지 브로커(RabbitMQ) 설정
- RabbitMQ를 설치 및 설정 후, Celery와 연동하여 비동기 작업 큐를 구성했다.
- 메시지 큐를 통해 작업 요청을 비동기적으로 분배하고, 처리 결과를 받아올 수 있도록 설계했다.
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("config")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f"Request: {self.request}")
2. Celery 작업 정의 및 구성
- Celery Worker를 구현하여 비동기로 처리해야 할 작업들을 정의했다.
- 기존 동기 처리 로직을 Celery 태스크로 전환하여 메시지 큐에서 작업을 받아 처리하도록 변경했다.
아래는 기존의 동기 코드이다.
@api_view(["POST"])
@csrf_exempt
def create_document(request):
serializer = CreateDocumentSerializer(data=request.data)
if serializer.is_valid():
try:
title = serializer.validated_data.get("title")
content = serializer.validated_data.get("content")
requirements = serializer.validated_data.get("requirements", "No requirements provided")
prompt = f"""
Title: {title}
Content: {content}
requirements: {requirements}
위 내용을 가지고 쳬계적인 문서화를 만들어주세요.
방식은
Title: {title}
Content: {content}
requirements: {requirements} 형식으로 체계적으로 다시 문서화 해주세요.
"""
messages = [{"role": "user", "content": prompt}]
response = client.chat.completions.create(
model = "deepseek-chat",
messages = messages)
result = response.choices[0].message.content
document = Document.objects.create(
title = title,
content = content,
requirements = requirements,
result = result)
return JsonResponse({
"status": "success",
"data": {
"id": document.id,
"response": result
}},
status = status.HTTP_200_OK)
except Exception as e:
return JsonResponse({
"status": "error",
"message": str(e)},
status = status.HTTP_500_INTERNAL_SERVER_ERROR)
else:
return JsonResponse({
"status": "error",
"errors": serializer.errors},
status = status.HTTP_400_BAD_REQUEST)
다음은 Celery Worker를 사용한 비동기 방식 코드이다.
def call_openai_api(prompt):
api_url = "https://api.openai.com/v1/chat/completions"
api_key = settings.OPENAI_API_KEY
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
payload = {
"model": "gpt-4o", # 정확한 모델 이름으로 변경
"messages": [
{"role": "system", "content": "당신은 전문적인 기술 문서를 작성하는 전문가입니다."},
{"role": "user", "content": prompt}
],
"stream": False
}
response = requests.post(api_url, json=payload, headers=headers)
if response.status_code == 200:
result = response.json()
return result["choices"][0]["message"]["content"]
else:
error_msg = response.json().get("error", "Unknown error occurred.")
raise Exception(f"DeepSeek API 호출 실패: {error_msg}")
@shared_task
def create_diagram(data):
"""
주어진 데이터를 기반으로 Mermaid 형식의 시퀀스 다이어그램을 생성합니다.
"""
channel = "task_updates"
redis_client.publish(channel, "create_diagram 작업 시작")
prompt = f"""
{data} ...
"""
try:
# call_openai_api를 사용하여 다이어그램 코드 생성
diagram_code = call_openai_api(prompt)
redis_client.publish(channel, "create_diagram 작업 완료")
return diagram_code
except Exception as e:
redis_client.publish(channel, f"create_diagram 작업 실패: {e}")
raise Exception(f"Error generating sequence diagram: {e}")
@shared_task
def create_erd(data):
channel = "task_updates"
redis_client.publish(channel, "create_erd 작업 시작")
prompt = f"""
{data} ...
"""
try:
# call_openai_api 함수 호출하여 Mermaid 코드 생성
diagram_code = call_openai_api(prompt)
# 작업 완료 알림
redis_client.publish(channel, "create_erd 작업 완료")
return diagram_code
except Exception as e:
# 작업 실패 알림
redis_client.publish(channel, f"create_erd 작업 실패: {e}")
raise
@shared_task
def create_api(data):
channel = "task_updates"
redis_client.publish(channel, "create_api 작업 시작")
prompt = f"""
{data} ...
"""
try:
# call_openai_api 함수 호출하여 Swagger JSON 코드 생성
swagger_json_code = call_openai_api(prompt)
# 작업 완료 알림
redis_client.publish(channel, "create_api 작업 완료")
return swagger_json_code
except Exception as e:
# 작업 실패 알림
redis_client.publish(channel, f"create_api 작업 실패: {e}")
raise
@shared_task
def collect_results(results):
return {
"diagram": results[0],
"erd": results[1],
"api": results[2],
}
비동기 처리 흐름 확인

이미지와 같이, 기존 동기 처리 시의 흐름과 성능 지표를 먼저 기록한 후, (10회 기준 약 62초)

비동기 처리 후 개선된 성능과 시스템 상태를 확인했다. (10회 기준 약 34초)
성능 개선 결과
비동기 처리 적용 후, 10회 호출 기준 동기처리 평균 실행 시간은 34초로, 동기 방식의 약 45% 단축되었다.
이를 통해 동시 요청 처리 능력과 시스템 자원 활용 효율성이 크게 향상된 것을 확인할 수 있었다.
마무리 및 향후 계획
이번 비동기 처리 리팩토링을 통해 시스템 확장성 향상, 빠른 응답 시간 보장, 그리고 사용자 경험 개선이라는 목표를 달성할 수 있었다.
앞으로도 지속적인 모니터링과 성능 최적화를 통해, 보다 많은 요청과 복잡한 작업 처리에 유연하게 대응할 계획이다.
'백엔드' 카테고리의 다른 글
| [모의 투자 사이트 개발기] 실시간 주식 시세 중계 시스템 구축하기 (0) | 2025.11.14 |
|---|---|
| [모의 투자 사이트 개발기] KIS 오픈API사용법, 토큰관리, API 호출 (1) | 2025.11.08 |
| 프로젝트에 Rag 도입하기 (0) | 2025.09.29 |
| [동시성 제어] 좌석 예매 동시성 문제 해결하기 (3) | 2025.08.27 |
| [RabbitMQ + Celery] 메세지큐와 워커를 활용한 비동기처리 (8) | 2025.08.07 |