파이썬에시 비동기로 여러 작업을 동시에 수행할 때 다음의 요구사항이 필요할 때가 있다.
1. 병렬적으로 작업들을 처리한다.
2. 작업한 내역들을 특정한 순서를 가진다.
3. 작업 결과를 순서에 맞게 한 군데 모아서 한번에 반환한다.
위 내용들을 충족하는 내용이 파이썬의 gather다.
여기서 말하는 병렬은 concurrecny고, CPU를 동시에 사용하는 multiprocessing은 아님을 주의해야 한다.
Async Gather
파이썬에서 gather는 다음의 역할을 한다.
1. k개의 태스크를 for loop로 동시에 처리한다.
2. 이때, gather를 사용하면 for loop에서 지정한 순서대로 리스트레 저장한다.
3. 예를 들어서, Task A와 B가 있을 때 A, B 순서로 작업을 실행한다.
4. 결과 리스트에는 순서대로 A의 결과, B의 결과를 [ Result A, Result B ]의 형태로 저장한다.
5. 작업 B가 먼저 끝나더라도, 결과 리스트의 순서는 바뀌면 안된다. 예를 들어서 [ Result B, Result A ] 형태로 저장되지 않는다.
6. 모든 작업을 기다린 다음에 최종 결과인 [ Result A, Result B ] 리스트를 반환한다.
import asyncio
import time
import random
# 비동기 함수(코루틴) 정의
async def process_data(delay, data_name):
print(f"[{data_name}] 작업 시작. {delay}초 대기...")
# I/O Bound 작업을 시뮬레이션 (비동기 대기)
await asyncio.sleep(delay)
result = f"결과: {data_name} 작업 완료 (대기 시간: {delay:.2f}s)"
print(f"[{data_name}] 작업 완료!")
return result
async def main_gather():
start_time = time.monotonic()
# 1. 실행할 코루틴 목록 생성
coro_list = [
process_data(random.uniform(1.0, 3.0), "Task A"),
process_data(random.uniform(1.0, 3.0), "Task B"),
process_data(random.uniform(1.0, 3.0), "Task C"),
process_data(random.uniform(1.0, 3.0), "Task D")
]
# 2. asyncio.gather로 코루틴들을 동시에 실행하고 모든 완료를 기다림
# gather는 모든 작업이 끝날 때까지 기다린 후 결과를 리스트로 반환합니다.
results = await asyncio.gather(*coro_list)
end_time = time.monotonic()
print("\n--- Gather 결과 ---")
for res in results:
print(res)
print(f"\n총 실행 시간: {end_time - start_time:.2f}초")
return results
asnyc_results = await main_gather()
[Task A] 작업 시작. 1.861686459260191초 대기...
[Task B] 작업 시작. 1.2370914600784426초 대기...
[Task C] 작업 시작. 1.7372483715856855초 대기...
[Task D] 작업 시작. 1.9246038128555707초 대기...
[Task B] 작업 완료!
[Task C] 작업 완료!
[Task A] 작업 완료!
[Task D] 작업 완료!
--- Gather 결과 ---
결과: Task A 작업 완료 (대기 시간: 1.86s)
결과: Task B 작업 완료 (대기 시간: 1.24s)
결과: Task C 작업 완료 (대기 시간: 1.74s)
결과: Task D 작업 완료 (대기 시간: 1.92s)
총 실행 시간: 1.93초
위 실행 결과를 보면, Task A는 B 보다 먼저 실행되었고, 결과 리스트의 인덱스 0에 저장되어야 한다.
Task B는 2번째로 실행되었으며, 결과 리스트의 인덱스 1에 저장되어야 한다.
태스크 B가 A 보다 먼저 작업이 끝났지만, 결과 리스트의 1번째에 저장되었음을 확인할 수 있다.
일을 시킨 순서대로 저장되어야 하는 의도를 잘 지켰다.
비동기 Queue에 저장하는 코루틴
이번에는 비동기로 실행하면서도, 순서에 맞게 완성이 되었다면 바로 출력하는 함수를 만들었다.
예를 들어서, Taks 1과 2가 있는데 둘 다 비동기로 돌린다.
Task들의 결과 출력 순서는 반드시 1, 2 순서다.
다음의 2가지 경우에 따라서 출력 함수는 아래처럼 동작한다.
1. Task 1과 2를 동시에 시작한다.
2. Task 종료 순서에 따른 출력 동작
2-1. Task 1이 먼저 끝나는 경우
2-1-1. Task 1이 먼저 끝난다면 순서가 동일하므로 바로 출력한다.
2-1-2. Task 2가 끝나면 바로 출력한다.
2-2. Task 2가 먼저 끝나는 경우
2-2-1. Taks 2가 먼저 끝난다면, 큐에 저장하고 기다린다.
2-2-2. Task 1이 끝나면 이를 출력하고, 저장된 Task 2의 결과를 출력한다.
큐의 종류
파이썬으로 알고리즘 코딩 테스트를 공부해본 사람들은 queue.Queue를 사용했던 기억이 있을것이다.
파이썬의 공식 문서 (링크)를 보면 queue.Queue는 멀티스레드 상황에서의 안정성을 Lock으로 보장하는 동기화된 큐다.
반면에, asyncio.Queue는 공식 문서 (링크)를 보면 스레드 상에서 안전하지는 않지만 비동기적으로 처리할 때 쓰는 큐임을 알 수 있다.
Case 1
import asyncio
import time
import random
# 비동기 함수: 작업을 수행하고 결과를 Queue에 (순서 정보와 함께) 저장
async def data_producer(queue, index, data_name, delay):
print(f"[{data_name}] Producer 시작. (순서: {index}) {delay:.2f}초 대기...")
# 비동기 작업 수행 (I/O 대기)
await asyncio.sleep(delay)
# 결과 데이터 생성
result = f"결과: {data_name} 완료 (대기 시간: {delay:.2f}s)"
# Queue에 순서 인덱스와 결과를 튜플로 저장
await queue.put((index, result))
print(f"[{data_name}] Producer 완료 및 Queue 저장.")
# Producer 함수 자체는 완료 인덱스를 리턴 (main 함수 종료 감지용)
return index
# 비동기 함수: Queue에서 결과를 가져와 순서대로 출력하는 Consumer
async def ordered_result_consumer(queue, num_tasks):
processed_count = 0
next_index_to_process = 0
# 임시 저장소: 완료되었지만 아직 순서가 되지 않은 결과를 보관
# Key: 순서 인덱스, Value: 결과 문자열
pending_results = {}
print("Consumer 시작. [다음 처리 순서: 0]")
while processed_count < num_tasks:
# 1. Queue에서 결과가 들어올 때까지 비동기로 대기 (가장 먼저 끝난 작업의 결과)
index, result = await queue.get()
print(f" [Consumer] Queue에서 결과 도착 (인덱스: {index}).")
# 2. 도착한 결과의 순서가 현재 처리해야 할 순서와 일치하는지 확인
if index == next_index_to_process:
# 순서가 맞으면 바로 처리 (출력)
print(f" ✅ [순서 일치 - {next_index_to_process}] 최종 출력: {result}")
processed_count += 1
next_index_to_process += 1
# 순서가 맞았으므로, 임시 저장소에 보관된 다음 순서의 결과가 있는지 연속적으로 확인
while next_index_to_process in pending_results:
pending_result = pending_results.pop(next_index_to_process)
print(f" ✅ [대기 해제 - {next_index_to_process}] 최종 출력: {pending_result}")
processed_count += 1
next_index_to_process += 1
else:
# 순서가 맞지 않으면 임시 저장소에 보관
print(f" ➡️ [순서 불일치] 인덱스 {index} 결과를 대기 목록에 저장.")
pending_results[index] = result
# Queue에 작업이 완료되었음을 알림
queue.task_done()
print(f"\nConsumer 종료. 최종 처리된 작업 수: {processed_count}")
async def main_ordered_queue():
start_time = time.monotonic()
# 결과 저장을 위한 비동기 Queue 생성
result_queue = asyncio.Queue()
# Producer 작업을 정의 (순서: 0, 1, 2, 3)
data_names = ["Task W", "Task X", "Task Y", "Task Z"]
delays = [random.uniform(2.0, 4.0), random.uniform(1.0, 3.0), random.uniform(0.5, 1.0), random.uniform(1.5, 3.5)]
NUM_TASKS = len(data_names)
# 1. 실행할 Producer 코루틴 목록 생성 (index를 0부터 부여)
producer_tasks = [
data_producer(result_queue, i, name, delays[i])
for i, name in enumerate(data_names)
]
# 2. Consumer 태스크 생성 (백그라운드에서 실행)
consumer_task = asyncio.create_task(ordered_result_consumer(result_queue, NUM_TASKS))
# 3. gather로 Producer들이 모두 완료될 때까지 기다림
await asyncio.gather(*producer_tasks)
# 4. Queue에 남아있는 모든 항목이 처리될 때까지 기다림
await result_queue.join()
# 5. Consumer 태스크 종료
consumer_task.cancel()
end_time = time.monotonic()
print(f"\n총 실행 시간: {end_time - start_time:.2f}초")
Consumer 시작. [다음 처리 순서: 0]
[Task W] Producer 시작. (순서: 0) 3.62초 대기...
[Task X] Producer 시작. (순서: 1) 1.08초 대기...
[Task Y] Producer 시작. (순서: 2) 0.90초 대기...
[Task Z] Producer 시작. (순서: 3) 3.16초 대기...
[Task Y] Producer 완료 및 Queue 저장.
[Consumer] Queue에서 결과 도착 (인덱스: 2).
➡️ [순서 불일치] 인덱스 2 결과를 대기 목록에 저장.
[Task X] Producer 완료 및 Queue 저장.
[Consumer] Queue에서 결과 도착 (인덱스: 1).
➡️ [순서 불일치] 인덱스 1 결과를 대기 목록에 저장.
[Task Z] Producer 완료 및 Queue 저장.
[Consumer] Queue에서 결과 도착 (인덱스: 3).
➡️ [순서 불일치] 인덱스 3 결과를 대기 목록에 저장.
[Task W] Producer 완료 및 Queue 저장.
[Consumer] Queue에서 결과 도착 (인덱스: 0).
✅ [순서 일치 - 0] 최종 출력: 결과: Task W 완료 (대기 시간: 3.62s)
✅ [대기 해제 - 1] 최종 출력: 결과: Task X 완료 (대기 시간: 1.08s)
✅ [대기 해제 - 2] 최종 출력: 결과: Task Y 완료 (대기 시간: 0.90s)
✅ [대기 해제 - 3] 최종 출력: 결과: Task Z 완료 (대기 시간: 3.16s)
Consumer 종료. 최종 처리된 작업 수: 4
총 실행 시간: 3.62초
Task W, X, Y, Z의 순서를 지키면서도 완성하는 순서에 따라서 즉시 출력해야 한다.
이때, W가 가장 오랜 시간이 걸렸으므로, X, Y, Z는 대기상태에 있다가 출력되었다.
Case 2
이번에는 Task W, X, Y, Z의 대기 시간을 변경해보았다.
W가 가장 먼저 끝나서 먼저 나오고, 그 다음 Y가 먼저 끝나고 X를 대기하는 방식으로 변경했다.
# Producer 작업을 정의 (순서: 0, 1, 2, 3)
data_names = ["Task W", "Task X", "Task Y", "Task Z"]
delays = [random.uniform(0.5, 1.0), random.uniform(1.0, 3.0), random.uniform(0.5, 1.0), random.uniform(1.5, 3.5)]
Consumer 시작. [다음 처리 순서: 0]
[Task W] Producer 시작. (순서: 0) 0.71초 대기...
[Task X] Producer 시작. (순서: 1) 2.69초 대기...
[Task Y] Producer 시작. (순서: 2) 0.79초 대기...
[Task Z] Producer 시작. (순서: 3) 2.51초 대기...
[Task W] Producer 완료 및 Queue 저장.
[Consumer] Queue에서 결과 도착 (인덱스: 0).
✅ [순서 일치 - 0] 최종 출력: 결과: Task W 완료 (대기 시간: 0.71s)
[Task Y] Producer 완료 및 Queue 저장.
[Consumer] Queue에서 결과 도착 (인덱스: 2).
➡️ [순서 불일치] 인덱스 2 결과를 대기 목록에 저장.
[Task Z] Producer 완료 및 Queue 저장.
[Consumer] Queue에서 결과 도착 (인덱스: 3).
➡️ [순서 불일치] 인덱스 3 결과를 대기 목록에 저장.
[Task X] Producer 완료 및 Queue 저장.
[Consumer] Queue에서 결과 도착 (인덱스: 1).
✅ [순서 일치 - 1] 최종 출력: 결과: Task X 완료 (대기 시간: 2.69s)
✅ [대기 해제 - 2] 최종 출력: 결과: Task Y 완료 (대기 시간: 0.79s)
✅ [대기 해제 - 3] 최종 출력: 결과: Task Z 완료 (대기 시간: 2.51s)
Consumer 종료. 최종 처리된 작업 수: 4
총 실행 시간: 2.69초
의도한대로 W는 제일 먼저 끝나서 바로 출력되고, Y는 X를 기다리고, Z도 X를 기다리는 방식으로 결과가 잘 나왔음을 알 수 있다.
'개발 > Python' 카테고리의 다른 글
| python에서 json.load와 json.loads의 차이 (0) | 2025.11.04 |
|---|---|
| Python dealing with time (0) | 2025.04.21 |
| Python의 sorted 함수 (0) | 2025.02.19 |
| 파이썬의 여러 가상환경 비교 (0) | 2024.08.03 |
| Anaconda와 가상 환경 관리 (0) | 2024.08.01 |