본문 바로가기
언어 정리/python_비동기관련_lib

asyncio Queue 랑 loop 개념까지 정리

by 알 수 없는 사용자 2022. 9. 6.

참고 사이트 :

https://docs.python.org/ko/3/library/asyncio-queue.html

 

큐 — Python 3.10.6 문서

큐 소스 코드: Lib/asyncio/queues.py asyncio 큐는 queue 모듈의 클래스와 유사하도록 설계되었습니다. asyncio 큐는 스레드 안전하지 않지만, async/await 코드에서 사용되도록 설계되었습니다. asyncio 큐의 메

docs.python.org

 

 

# loop 개념관련 : asyncio.get_event_loop()  VS  asyncio.run( )

https://stackoverflow.com/questions/53724665/using-queues-results-in-asyncio-exception-got-future-future-pending-attached

 

Using queues results in asyncio exception "got Future <Future pending> attached to a different loop"

I'm trying to run this simple code with asyncio queues, but catch exceptions, and even nested exceptions. I would like to get some help with making queues in asyncio work correctly: import async...

stackoverflow.com

 

 

 


총 예제 4개

 

1

참고 예제 : 

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        # queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    print(1)
    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(5):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    print(2)
    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    print(3)
    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    print(4)
    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

 


2

코드 예제 : 

키워드 : 

await self.queue.get()

self.queue.task_done()

self.queue.put_nowait(_)

self.queue.qsize()

await self.queue.join()

loop = asyncio.get_event_loop()  VS  asyncio.run(main=self.main(), debug=True)

self.queue = Queue() <- 를 생성해줘야 하는 위치 ( loop insde ) 

 

import asyncio
from asyncio.queues import Queue

class AsyncQueue:
    def __init__(self):
        # queue 를 여기서 생성시 에러남. loop inside 에서 생성해야줘 한다.
        self.queue: Queue = None
        # self.queue = Queue()

    async def getter(self, name):
        print('getter start')
        while True:
            # queue get 함수 queue에 data가 쌓일 때 까지 blocking됨.
            msg = await self.queue.get()
            print('getter got msg [{}]'.format(msg), flush=True)

            # 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
            await asyncio.sleep(0.1)

            # 큐.task_done() : 큐data를 get작업 완료 -> put 해도된다는 의미
            self.queue.task_done()

    async def putter(self):
        print(' putter start')
        # 미리 queue 에 1~5를 쌓아 놓고 진행 )
        for _ in range(5):
            self.queue.put_nowait(_)

        while True:
            # queue에 쌓인 task 가 전부 done 될때까지 join(blocking) 해줌
            # 그래서 출력값도 1~5 출력후 queue에 put 진행
            queue_stacked_size = self.queue.qsize()
            msg = 'Hi Queue : you queue_stacked_size : '+ str(queue_stacked_size)
            self.queue.put_nowait(msg)
            # 큐.join() : getter가 'task_done()' 함수를 진행 시켜줄 때 까지 queue에 put을 blocking 해준다.
            await self.queue.join()

    async def main(self):
        """
        set_new_loop 함수로 프로그램 돌릴시, 'self.queue = Queue()'함수 선언부가 타게팅된 main(self)함수 여야 한다.
        아니면 'await self.queue.get()'library 내부 함수에 의해서 new_loop가 선언되면서 각기다른 asyncio.queue를 쓰게됨
               에러내용 : ~~~ ~~~~ got Future <Future pending> attached to a different loop
        """
        print('main start')
        self.queue = Queue()
        task_1 = asyncio.create_task(self.getter('worker-'))
        task_2 = asyncio.create_task(self.putter())

        print('gather start') # gather 함수에 있는 Task 가 완료 되기전까진 blocking 됨
        await asyncio.gather(
            task_1,
            task_2
        )
        print('main END')  # 출력 안됨

    def set_current_in_use_loop(self):
        """
        현재 loop 를 가져와서, 그 loop에다가 타겟팅된 코루틴을 돌린다.
        따라서 현재루프 + 'future' :parameter 에 타겟된 함수가 돌아가게 된다.
        그래서 'self.queue = Queue()' 부분이 __init__ 에 있어도 loop 내부(inside)이기 때문에 공통된 큐로 인식함
        """
        loop = asyncio.get_event_loop()
        loop.run_until_complete(future=self.main())
        loop.close()

    def set_new_loop(self):
        """
        asyncio.run() 로 돌리게 되면, loop 를 새로 만들어서 동작시킨다.
          loop = events.new_event_loop()
          events.set_event_loop(loop)
          loop.run_until_complete(main)
        이 구조로 돌아감. 그래서 'main' :parameter 에 타겟된 함수만 new_event_loop에 포함시켜서 돌게됨.
        따라서  'self.queue = Queue()' 선언 부분도 타겟된 함수(self.main()) 에 포함시켜줘야 공통된 큐로 인식함
        """
        asyncio.run(main=self.main(), debug=True)


if __name__ == '__main__':
    async_queue = AsyncQueue()
    # 밑에 둘중 아무거나 하나로 키면 됨
    # async_queue.set_current_in_use_loop()
    async_queue.set_new_loop()

 

 

예제 스크린샷 :

 

 

예제 콘솔 결과창 :

 


3

코드 예제 : 

import asyncio
import time
from asyncio.queues import Queue

class AsyncQueue:
    def __init__(self):
        self.queue: Queue = None

    async def getter(self, queue_callback):
        print(' getter start')
        msg = await self.queue.get()
        queue_callback(msg)
        # 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
        await asyncio.sleep(0.1)
        self.queue.task_done()
        print('queue_size [{}]'.format(self.queue.qsize()),' getter END once')

    async def putter(self, _msg, queue_get_callback):
        print('putter start')
        self.queue = Queue()
        self.queue.put_nowait(_msg)
        task_1 = asyncio.create_task(self.getter(queue_get_callback))
        await task_1
        await self.queue.join()
        print('\t\tputter END once')

if __name__ == '__main__':
    async_queue = AsyncQueue()
    i=0
    def queue_get_callback(get_msg):
        print('got msg from getter : [{}], index : [{}]'.format(get_msg, i))
    test_msg = "{'key1':'val1','key2':'val2'}"
    while True:
        time.sleep(1)
        i = i + 1
        asyncio.run(async_queue.putter(test_msg, queue_get_callback))
        print('while loop END once')

 

콘솔 결과창 : 

무한루프

 


4

코드 예제 : 

코드 3 번 예제 응용을 하자면,

asyncio.queue 를 통해서 ros2 data받고 그 data를 "큐.put_nowait()" 후, "msg = await 큐.get( )"로 실시간으로 가져와서 그 data를 다시 mqtt를 통해서 보내준다.

이런식으로 서로 다른 통신 규격을 맞출때나, Thread 간 또는 Thread 랑 asyncio, asyncio 간에 싱크를 queue로 맞추거나 할 수 있다.

import asyncio
import time
from asyncio.queues import Queue

class AsyncQueue:
    def __init__(self):
        self.ros2_to_mqtt: Queue = None
        self.mqtt_to_ros2: Queue = None

    async def ros2_data_put(self, _msg, queue_get_callback):
        self.mqtt_to_ros2 = Queue()
        self.mqtt_to_ros2.put_nowait(_msg)
        task_1 = asyncio.create_task(self.mqtt_data_get(queue_get_callback))
        await task_1
        await self.mqtt_to_ros2.join()
        # print('\t\tputter END once')

    async def mqtt_data_get(self, queue_callback):
        msg = await self.mqtt_to_ros2.get()
        queue_callback(msg)
        # 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
        await asyncio.sleep(0.1)
        self.mqtt_to_ros2.task_done()
        # print('queue_size [{}]'.format(self.queue.qsize()), ' getter END once')

    async def mqtt_data_put(self, _msg, queue_get_callback):
        self.ros2_to_mqtt = Queue()
        self.ros2_to_mqtt.put_nowait(_msg)
        task_1 = asyncio.create_task(self.ros2_data_get(queue_get_callback))
        await task_1
        await self.ros2_to_mqtt.join()
        # print('\t\tputter END once')

    async def ros2_data_get(self, queue_callback):
        msg = await self.ros2_to_mqtt.get()
        queue_callback(msg)
        # 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
        await asyncio.sleep(0.1)
        self.ros2_to_mqtt.task_done()
        # print('queue_size [{}]'.format(self.queue.qsize()), ' getter END once')

if __name__ == '__main__':
    async_queue = AsyncQueue()
    i = 0
    def ros2_get_callback(get_msg):
        print('ros2_get_callback : [{}], index : [{}]'.format(get_msg, i))

    def mqtt_get_callback(get_msg):
        print('mqtt_get_callback : [{}], index : [{}]'.format(get_msg, i))

    test_msg = "{'key1':'val1','key2':'val2'}"
    while True:
        time.sleep(1)
        i = i + 1
        asyncio.run(async_queue.ros2_data_put(test_msg, ros2_get_callback))
        asyncio.run(async_queue.mqtt_data_put(test_msg, mqtt_get_callback))

 

콘솔 결과창 : 

 

 

댓글