본문 바로가기
언어 정리/python_비동기관련_lib

asyncio : 동일한 Task 5번이상 누적 방어 코드

by 알 수 없는 사용자 2023. 6. 10.

코드 설명

task_1은 1초마다 생성하고 task_2는 2초마다 생성
5번이상 누적된 task 만 FIFO 순으로 종료
출력값은 1. 누적된 task의 수,  2. 실행되고 있는 task 함수명, 3.종료된 task 함수명
코드는 0.5초 간격 루프

 


하단의 코드 실행결과

누적된 TASK 개수: 1
실행되고 있는 함수 명: ['task_1_a']
누적된 TASK 개수: 2
실행되고 있는 함수 명: ['task_1_a', 'task_1_a']
누적된 TASK 개수: 3
실행되고 있는 함수 명: ['task_2_b', 'task_1_a', 'task_1_a']
누적된 TASK 개수: 4
실행되고 있는 함수 명: ['task_2_b', 'task_1_a', 'task_1_a', 'task_1_a']
누적된 TASK 개수: 5
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b']
누적된 TASK 개수: 6
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
누적된 TASK 개수: 7
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 8
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적된 TASK 개수: 8
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 9
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 9
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적된 TASK 개수: 9
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b']
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 10
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 10
실행되고 있는 함수 명: ['task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적된 TASK 개수: 10
실행되고 있는 함수 명: ['task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_2_b
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b']
종료된 TASK : task_2_b
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_2_b
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a']
종료된 TASK : task_2_b
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_2_b
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_2_b
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_2_b
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_2_b
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_1_a
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a']
종료된 TASK : task_1_a
누적이 5번 이상인 TASK : task_2_b
누적된 TASK 개수: 11
실행되고 있는 함수 명: ['task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b', 'task_1_a', 'task_1_a', 'task_2_b', 'task_2_b', 'task_1_a', 'task_2_b']

main   ---> TaskManager   ---> TaskCreationManager
    TaskTerminationManager

Task 생성클래스와 Task 누적방지클래스

두개를 관리하는 Task매니저 클래스로 나눈 코드

 

main.py

import asyncio

from task_manager import TaskManager

async def main():
    manager = TaskManager()
    asyncio.create_task(manager.main_loop())
    await asyncio.sleep(15)  # 20초간 루프
    # await manager.task_termination_manager.terminate_accumulated_tasks()


asyncio.run(main())

 

task_manager.py

import asyncio

from task_creator import TaskCreationManager
from task_terminator import TaskTerminationManager

class TaskManager:
    def __init__(self):
        self.task_creation_manager = TaskCreationManager()
        self.task_termination_manager = TaskTerminationManager(self.task_creation_manager)

    async def main_loop(self):
        while True:
            await asyncio.gather(
                self.task_creation_manager.create_task(),
                self.task_termination_manager.remove_completed_tasks_from_queue(),
                self.task_termination_manager.terminate_accumulated_tasks(),
                asyncio.sleep(0),  # 루프 순서 변경
                                    # gather 안에 있는 각각의 async 함수에 "await asyncio.sleep(0)"을
                                    # 배치한것 과 동일한 효과가 있다.
            )
            print(f"누적된 TASK 개수: {len(self.task_creation_manager.active_tasks)}")
            print(f"실행되고 있는 함수 명: {[task.get_coro().__name__ for task in self.task_creation_manager.active_tasks]}")
            await asyncio.sleep(0.5)  # 메인함수 루프 속도

 

task_creator.py

import asyncio
from collections import defaultdict

class TaskCreationManager:
    """
    active_tasks : SET타입    : create한 Task객체를 담는 집합
    task_queues : 딕셔너리타입  : { async_함수주소 : asyncio.Queue() } <- async_큐 안에 async_함수의 Task 객체
        task_queues 구조 이유 : task_queues[async_함수주소] Queue size 가 5일 경우 캔슬
        따라서 async_함수 당, async_큐를 하나씩 할당.
    """
    def __init__(self):
        self.task_count = 0
        self.task_queues = defaultdict(asyncio.Queue)
        self.active_tasks = set()

    async def task_1_a(self):
        while True:
            await asyncio.sleep(1)

    async def task_2_b(self):
        while True:
            await asyncio.sleep(2)

    async def create_task(self):
        self.task_count += 1
        task_name = f"Task-{self.task_count}"
        task_func = self.task_1_a if self.task_count % 3 != 0 else self.task_2_b
        task = asyncio.create_task(task_func())

        # { async_함수 : async_큐 } <- async_큐 안에 async_함수의 Task 객체를 PUT
        self.task_queues[task_func].put_nowait(task)
        """ 그냥 dictionary 인 경우
        self.task_queues[task_func]=asyncio.Queue()
        self.task_queues[task_func].put_nowait(task)
        """
        self.active_tasks.add(task)
        await asyncio.sleep(0)  # 루프 순서 변경
        return task_name

 

task_terminator.py

 

ACCUMULATED_TASK_LIMIT = 5

class TaskTerminationManager:
    """
    active_tasks : SET타입    : create한 Task객체를 담는 집합
    task_queues : 딕셔너리타입  : { async_함수주소 : asyncio.Queue() } <- async_큐 안에 async_함수의 Task 객체
        task_queues 구조 이유 : task_queues[async_함수주소] Queue size 가 5일 경우 캔슬
        따라서 async_함수 당, async_큐를 하나씩 할당.
    """
    def __init__(self, task_creation_manager):
        self.task_creation_manager = task_creation_manager

    async def remove_completed_tasks_from_queue(self):
        """
        종료된 Task를 active_tasks(집합)에서 삭제
        종료된 Task :  1. 실제로 루틴이 종료
                      2. Task cancel 했을 시 종료
        """
        completed_tasks: list = [task for task in self.task_creation_manager.active_tasks if task.done()]
        for task in completed_tasks:
            task_func_name = await self.get_task_func_name(task)
            await self.task_completed(task_func_name)
            self.task_creation_manager.active_tasks.remove(task)
            # task_done 을 get 함수가 들어간 함수에서 자동으로 처리해줌. 따라서 task_done 에 에러가 발생함.
            # self.task_creation_manager.task_queues[task_func].task_done()

    async def terminate_accumulated_tasks(self):
        """
        task_queues[async_함수주소] = 어싱크큐
        같은 주소의 async_함수의 어싱크큐가 5개 되면 old task 캔슬
        """
        for task_func, task_queue in self.task_creation_manager.task_queues.items():
            if task_queue.qsize() > ACCUMULATED_TASK_LIMIT:
                tasks_to_cancel = [await task_queue.get() for _ in range(task_queue.qsize() - ACCUMULATED_TASK_LIMIT)]
                task_queue.task_done()
                for task in tasks_to_cancel:
                    task.cancel()
                    print(f"누적이 5번 이상인 TASK : {await self.get_task_func_name(task)}")

    async def task_completed(self, task_func_name):
        print(f"종료된 TASK : {task_func_name}")

    async def get_task_func_name(self, param_task):
        task_func = param_task.get_coro()
        task_func_name = task_func.__name__
        return task_func_name

한클래스로 담으면

import asyncio
from collections import defaultdict

class TaskManager:
    def __init__(self):
        self.task_count = 0
        self.task_queues = defaultdict(asyncio.Queue)
        self.active_tasks = set()

    async def task_1_a(self):
        while True:
            await asyncio.sleep(1)

    async def task_2_b(self):
        while True:
            await asyncio.sleep(1)

    async def create_task(self):
        self.task_count += 1
        task_name = f"Task-{self.task_count}"
        task_func = self.task_1_a if self.task_count % 3 != 0 else self.task_2_b
        task = asyncio.create_task(task_func())
        self.task_queues[task_func].put_nowait(task)
        self.active_tasks.add(task)
        await asyncio.sleep(0)  # 루프 순서 변경 
        return task_name

    async def remove_completed_tasks(self):
        completed_tasks = [task for task in self.active_tasks if task.done()]
        for task in completed_tasks:
            task_func = task.get_coro()
            task_func_name = task_func.__name__
            await self.task_completed(task_func_name)
            self.active_tasks.remove(task)
            try:
                self.task_queues[task_func].task_done()
            except ValueError:
                print(f"누적된 TASK 가 5개 이상입니다. {task_func_name}")

    async def task_completed(self, task_func_name):
        print(f"종료된 TASK : {task_func_name}")

    async def terminate_accumulated_tasks(self):
        for task_func, task_queue in self.task_queues.items():
            if task_queue.qsize() > 5:
                tasks_to_cancel = [await task_queue.get() for _ in range(task_queue.qsize() - 5)]
                for task in tasks_to_cancel:
                    task.cancel()

    async def main_loop(self):
        while True:
            await asyncio.gather(
                self.create_task(),
                self.remove_completed_tasks(),
                self.terminate_accumulated_tasks(),
                asyncio.sleep(0),  # 루프 순서 변경 
            )
            print(f"누적된 TASK 개수 : {len(self.active_tasks)}")
            print(f"실행되고 있는 함수 명: {[task.get_coro().__name__ for task in self.active_tasks]}")
            await asyncio.sleep(0.5)  # 메인함수 루프 속도

async def main():
    manager = TaskManager()
    asyncio.create_task(manager.main_loop())
    await asyncio.sleep(20)  # 20초간 돔
    await manager.terminate_accumulated_tasks()

asyncio.run(main())

 

댓글