코드 설명
task_1은 1초마다 생성하고 task_2는 2초마다 생성
5번이상 누적된 task 만 FIFO 순으로 종료
출력값은 1. 누적된 task의 수, 2. 실행되고 있는 task 함수명, 3.종료된 task 함수명
코드는 0.5초 간격 루프
하단의 코드 실행결과
누적된 TASK 개수: 1
실행되고 있는 함수 명: ['task_1_a']
누적된 TASK 개수: 2
실행되고 있는 함수 명: ['task_1_a', 'task_1_a']
누적된 TASK 개수: 3
실행되고 있는 함수 명: ['task_2_b', 'task_1_a', 'task_1_a']
누적된 TASK 개수: 4
실행되고 있는 함수 명: ['task_2_b', 'task_1_a', 'task_1_a', 'task_1_a']
누적된 TASK 개수: 5
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b']
누적된 TASK 개수: 6
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
누적된 TASK 개수: 7
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 8
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적된 TASK 개수: 8
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 9
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 9
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적된 TASK 개수: 9
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b']
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 10
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 10
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적된 TASK 개수: 10
실행되고 있는 함수 명: ['task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_2_b
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b']
종료된 TASK : task_2_b
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_2_b
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a']
종료된 TASK : task_2_b
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_2_b
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_2_b
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_2_b
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_2_b
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_2_b
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b']
main ---> | TaskManager ---> | TaskCreationManager |
TaskTerminationManager |
Task 생성클래스와 Task 누적방지클래스
두개를 관리하는 Task매니저 클래스로 나눈 코드
main.py
import asyncio
from task_manager import TaskManager
async def main():
manager = TaskManager()
asyncio.create_task(manager.main_loop())
await asyncio.sleep(15) # 20초간 루프
# await manager.task_termination_manager.terminate_accumulated_tasks()
asyncio.run(main())
task_manager.py
import asyncio
from task_creator import TaskCreationManager
from task_terminator import TaskTerminationManager
class TaskManager:
def __init__(self):
self.task_creation_manager = TaskCreationManager()
self.task_termination_manager = TaskTerminationManager(self.task_creation_manager)
async def main_loop(self):
while True:
await asyncio.gather(
self.task_creation_manager.create_task(),
self.task_termination_manager.remove_completed_tasks_from_queue(),
self.task_termination_manager.terminate_accumulated_tasks(),
asyncio.sleep(0), # 루프 순서 변경
# gather 안에 있는 각각의 async 함수에 "await asyncio.sleep(0)"을
# 배치한것 과 동일한 효과가 있다.
)
print(f"누적된 TASK 개수: {len(self.task_creation_manager.active_tasks)}")
print(f"실행되고 있는 함수 명: {[task.get_coro().__name__ for task in self.task_creation_manager.active_tasks]}")
await asyncio.sleep(0.5) # 메인함수 루프 속도
task_creator.py
import asyncio
from collections import defaultdict
class TaskCreationManager:
"""
active_tasks : SET타입 : create한 Task객체를 담는 집합
task_queues : 딕셔너리타입 : { async_함수주소 : asyncio.Queue() } <- async_큐 안에 async_함수의 Task 객체
task_queues 구조 이유 : task_queues[async_함수주소] Queue size 가 5일 경우 캔슬
따라서 async_함수 당, async_큐를 하나씩 할당.
"""
def __init__(self):
self.task_count = 0
self.task_queues = defaultdict(asyncio.Queue)
self.active_tasks = set()
async def task_1_a(self):
while True:
await asyncio.sleep(1)
async def task_2_b(self):
while True:
await asyncio.sleep(2)
async def create_task(self):
self.task_count += 1
task_name = f"Task-{self.task_count}"
task_func = self.task_1_a if self.task_count % 3 != 0 else self.task_2_b
task = asyncio.create_task(task_func())
# { async_함수 : async_큐 } <- async_큐 안에 async_함수의 Task 객체를 PUT
self.task_queues[task_func].put_nowait(task)
""" 그냥 dictionary 인 경우
self.task_queues[task_func]=asyncio.Queue()
self.task_queues[task_func].put_nowait(task)
"""
self.active_tasks.add(task)
await asyncio.sleep(0) # 루프 순서 변경
return task_name
task_terminator.py
ACCUMULATED_TASK_LIMIT = 5
class TaskTerminationManager:
"""
active_tasks : SET타입 : create한 Task객체를 담는 집합
task_queues : 딕셔너리타입 : { async_함수주소 : asyncio.Queue() } <- async_큐 안에 async_함수의 Task 객체
task_queues 구조 이유 : task_queues[async_함수주소] Queue size 가 5일 경우 캔슬
따라서 async_함수 당, async_큐를 하나씩 할당.
"""
def __init__(self, task_creation_manager):
self.task_creation_manager = task_creation_manager
async def remove_completed_tasks_from_queue(self):
"""
종료된 Task를 active_tasks(집합)에서 삭제
종료된 Task : 1. 실제로 루틴이 종료
2. Task cancel 했을 시 종료
"""
completed_tasks: list = [task for task in self.task_creation_manager.active_tasks if task.done()]
for task in completed_tasks:
task_func_name = await self.get_task_func_name(task)
await self.task_completed(task_func_name)
self.task_creation_manager.active_tasks.remove(task)
# task_done 을 get 함수가 들어간 함수에서 자동으로 처리해줌. 따라서 task_done 에 에러가 발생함.
# self.task_creation_manager.task_queues[task_func].task_done()
async def terminate_accumulated_tasks(self):
"""
task_queues[async_함수주소] = 어싱크큐
같은 주소의 async_함수의 어싱크큐가 5개 되면 old task 캔슬
"""
for task_func, task_queue in self.task_creation_manager.task_queues.items():
if task_queue.qsize() > ACCUMULATED_TASK_LIMIT:
tasks_to_cancel = [await task_queue.get() for _ in range(task_queue.qsize() - ACCUMULATED_TASK_LIMIT)]
task_queue.task_done()
for task in tasks_to_cancel:
task.cancel()
print(f"누적이 5번 이상인 TASK : {await self.get_task_func_name(task)}")
async def task_completed(self, task_func_name):
print(f"종료된 TASK : {task_func_name}")
async def get_task_func_name(self, param_task):
task_func = param_task.get_coro()
task_func_name = task_func.__name__
return task_func_name
한클래스로 담으면
import asyncio
from collections import defaultdict
class TaskManager:
def __init__(self):
self.task_count = 0
self.task_queues = defaultdict(asyncio.Queue)
self.active_tasks = set()
async def task_1_a(self):
while True:
await asyncio.sleep(1)
async def task_2_b(self):
while True:
await asyncio.sleep(1)
async def create_task(self):
self.task_count += 1
task_name = f"Task-{self.task_count}"
task_func = self.task_1_a if self.task_count % 3 != 0 else self.task_2_b
task = asyncio.create_task(task_func())
self.task_queues[task_func].put_nowait(task)
self.active_tasks.add(task)
await asyncio.sleep(0) # 루프 순서 변경
return task_name
async def remove_completed_tasks(self):
completed_tasks = [task for task in self.active_tasks if task.done()]
for task in completed_tasks:
task_func = task.get_coro()
task_func_name = task_func.__name__
await self.task_completed(task_func_name)
self.active_tasks.remove(task)
try:
self.task_queues[task_func].task_done()
except ValueError:
print(f"누적된 TASK 가 5개 이상입니다. {task_func_name}")
async def task_completed(self, task_func_name):
print(f"종료된 TASK : {task_func_name}")
async def terminate_accumulated_tasks(self):
for task_func, task_queue in self.task_queues.items():
if task_queue.qsize() > 5:
tasks_to_cancel = [await task_queue.get() for _ in range(task_queue.qsize() - 5)]
for task in tasks_to_cancel:
task.cancel()
async def main_loop(self):
while True:
await asyncio.gather(
self.create_task(),
self.remove_completed_tasks(),
self.terminate_accumulated_tasks(),
asyncio.sleep(0), # 루프 순서 변경
)
print(f"누적된 TASK 개수 : {len(self.active_tasks)}")
print(f"실행되고 있는 함수 명: {[task.get_coro().__name__ for task in self.active_tasks]}")
await asyncio.sleep(0.5) # 메인함수 루프 속도
async def main():
manager = TaskManager()
asyncio.create_task(manager.main_loop())
await asyncio.sleep(20) # 20초간 돔
await manager.terminate_accumulated_tasks()
asyncio.run(main())
'언어 정리 > python_비동기관련_lib' 카테고리의 다른 글
target 시간에 비동기 콜백 호출 (0) | 2023.12.11 |
---|---|
비동기인풋, toolbar 터미널출력 << prompt-toolkit, curses (0) | 2022.12.22 |
코루틴으로 ROS2 작업 (0) | 2022.11.28 |
asyncio로 ros2_spin 구동예제 ( handler-agent-manager ) (0) | 2022.09.17 |
코루틴 과 Eventloop 그리고 Future (1) | 2022.09.14 |
댓글