참고 사이트 :
https://docs.python.org/ko/3/library/asyncio-queue.html
큐 — Python 3.10.6 문서
큐 소스 코드: Lib/asyncio/queues.py asyncio 큐는 queue 모듈의 클래스와 유사하도록 설계되었습니다. asyncio 큐는 스레드 안전하지 않지만, async/await 코드에서 사용되도록 설계되었습니다. asyncio 큐의 메
docs.python.org
# loop 개념관련 : asyncio.get_event_loop() VS asyncio.run( )
Using queues results in asyncio exception "got Future <Future pending> attached to a different loop"
I'm trying to run this simple code with asyncio queues, but catch exceptions, and even nested exceptions. I would like to get some help with making queues in asyncio work correctly: import async...
stackoverflow.com
총 예제 4개
1
참고 예제 :
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
# queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
print(1)
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(5):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
print(2)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
print(3)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
print(4)
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
2
코드 예제 :
키워드 :
await self.queue.get()
self.queue.task_done()
self.queue.put_nowait(_)
self.queue.qsize()
await self.queue.join()
loop = asyncio.get_event_loop() VS asyncio.run(main=self.main(), debug=True)
self.queue = Queue() <- 를 생성해줘야 하는 위치 ( loop insde )
import asyncio
from asyncio.queues import Queue
class AsyncQueue:
def __init__(self):
# queue 를 여기서 생성시 에러남. loop inside 에서 생성해야줘 한다.
self.queue: Queue = None
# self.queue = Queue()
async def getter(self, name):
print('getter start')
while True:
# queue get 함수 queue에 data가 쌓일 때 까지 blocking됨.
msg = await self.queue.get()
print('getter got msg [{}]'.format(msg), flush=True)
# 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
await asyncio.sleep(0.1)
# 큐.task_done() : 큐data를 get작업 완료 -> put 해도된다는 의미
self.queue.task_done()
async def putter(self):
print(' putter start')
# 미리 queue 에 1~5를 쌓아 놓고 진행 )
for _ in range(5):
self.queue.put_nowait(_)
while True:
# queue에 쌓인 task 가 전부 done 될때까지 join(blocking) 해줌
# 그래서 출력값도 1~5 출력후 queue에 put 진행
queue_stacked_size = self.queue.qsize()
msg = 'Hi Queue : you queue_stacked_size : '+ str(queue_stacked_size)
self.queue.put_nowait(msg)
# 큐.join() : getter가 'task_done()' 함수를 진행 시켜줄 때 까지 queue에 put을 blocking 해준다.
await self.queue.join()
async def main(self):
"""
set_new_loop 함수로 프로그램 돌릴시, 'self.queue = Queue()'함수 선언부가 타게팅된 main(self)함수 여야 한다.
아니면 'await self.queue.get()'library 내부 함수에 의해서 new_loop가 선언되면서 각기다른 asyncio.queue를 쓰게됨
에러내용 : ~~~ ~~~~ got Future <Future pending> attached to a different loop
"""
print('main start')
self.queue = Queue()
task_1 = asyncio.create_task(self.getter('worker-'))
task_2 = asyncio.create_task(self.putter())
print('gather start') # gather 함수에 있는 Task 가 완료 되기전까진 blocking 됨
await asyncio.gather(
task_1,
task_2
)
print('main END') # 출력 안됨
def set_current_in_use_loop(self):
"""
현재 loop 를 가져와서, 그 loop에다가 타겟팅된 코루틴을 돌린다.
따라서 현재루프 + 'future' :parameter 에 타겟된 함수가 돌아가게 된다.
그래서 'self.queue = Queue()' 부분이 __init__ 에 있어도 loop 내부(inside)이기 때문에 공통된 큐로 인식함
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(future=self.main())
loop.close()
def set_new_loop(self):
"""
asyncio.run() 로 돌리게 되면, loop 를 새로 만들어서 동작시킨다.
loop = events.new_event_loop()
events.set_event_loop(loop)
loop.run_until_complete(main)
이 구조로 돌아감. 그래서 'main' :parameter 에 타겟된 함수만 new_event_loop에 포함시켜서 돌게됨.
따라서 'self.queue = Queue()' 선언 부분도 타겟된 함수(self.main()) 에 포함시켜줘야 공통된 큐로 인식함
"""
asyncio.run(main=self.main(), debug=True)
if __name__ == '__main__':
async_queue = AsyncQueue()
# 밑에 둘중 아무거나 하나로 키면 됨
# async_queue.set_current_in_use_loop()
async_queue.set_new_loop()
예제 스크린샷 :
예제 콘솔 결과창 :
3
코드 예제 :
import asyncio
import time
from asyncio.queues import Queue
class AsyncQueue:
def __init__(self):
self.queue: Queue = None
async def getter(self, queue_callback):
print(' getter start')
msg = await self.queue.get()
queue_callback(msg)
# 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
await asyncio.sleep(0.1)
self.queue.task_done()
print('queue_size [{}]'.format(self.queue.qsize()),' getter END once')
async def putter(self, _msg, queue_get_callback):
print('putter start')
self.queue = Queue()
self.queue.put_nowait(_msg)
task_1 = asyncio.create_task(self.getter(queue_get_callback))
await task_1
await self.queue.join()
print('\t\tputter END once')
if __name__ == '__main__':
async_queue = AsyncQueue()
i=0
def queue_get_callback(get_msg):
print('got msg from getter : [{}], index : [{}]'.format(get_msg, i))
test_msg = "{'key1':'val1','key2':'val2'}"
while True:
time.sleep(1)
i = i + 1
asyncio.run(async_queue.putter(test_msg, queue_get_callback))
print('while loop END once')
콘솔 결과창 :
무한루프
4
코드 예제 :
코드 3 번 예제 응용을 하자면,
asyncio.queue 를 통해서 ros2 data받고 그 data를 "큐.put_nowait()" 후, "msg = await 큐.get( )"로 실시간으로 가져와서 그 data를 다시 mqtt를 통해서 보내준다.
이런식으로 서로 다른 통신 규격을 맞출때나, Thread 간 또는 Thread 랑 asyncio, asyncio 간에 싱크를 queue로 맞추거나 할 수 있다.
import asyncio
import time
from asyncio.queues import Queue
class AsyncQueue:
def __init__(self):
self.ros2_to_mqtt: Queue = None
self.mqtt_to_ros2: Queue = None
async def ros2_data_put(self, _msg, queue_get_callback):
self.mqtt_to_ros2 = Queue()
self.mqtt_to_ros2.put_nowait(_msg)
task_1 = asyncio.create_task(self.mqtt_data_get(queue_get_callback))
await task_1
await self.mqtt_to_ros2.join()
# print('\t\tputter END once')
async def mqtt_data_get(self, queue_callback):
msg = await self.mqtt_to_ros2.get()
queue_callback(msg)
# 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
await asyncio.sleep(0.1)
self.mqtt_to_ros2.task_done()
# print('queue_size [{}]'.format(self.queue.qsize()), ' getter END once')
async def mqtt_data_put(self, _msg, queue_get_callback):
self.ros2_to_mqtt = Queue()
self.ros2_to_mqtt.put_nowait(_msg)
task_1 = asyncio.create_task(self.ros2_data_get(queue_get_callback))
await task_1
await self.ros2_to_mqtt.join()
# print('\t\tputter END once')
async def ros2_data_get(self, queue_callback):
msg = await self.ros2_to_mqtt.get()
queue_callback(msg)
# 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
await asyncio.sleep(0.1)
self.ros2_to_mqtt.task_done()
# print('queue_size [{}]'.format(self.queue.qsize()), ' getter END once')
if __name__ == '__main__':
async_queue = AsyncQueue()
i = 0
def ros2_get_callback(get_msg):
print('ros2_get_callback : [{}], index : [{}]'.format(get_msg, i))
def mqtt_get_callback(get_msg):
print('mqtt_get_callback : [{}], index : [{}]'.format(get_msg, i))
test_msg = "{'key1':'val1','key2':'val2'}"
while True:
time.sleep(1)
i = i + 1
asyncio.run(async_queue.ros2_data_put(test_msg, ros2_get_callback))
asyncio.run(async_queue.mqtt_data_put(test_msg, mqtt_get_callback))
콘솔 결과창 :
'언어 정리 > python_비동기관련_lib' 카테고리의 다른 글
코루틴 과 Eventloop 그리고 Future (1) | 2022.09.14 |
---|---|
agent,handler 모음 (ros2, gmqtt, asyncio.Queue) (1) | 2022.09.12 |
이벤트 루프_2 - 예제랑 같이 좀더 자세히 (0) | 2022.08.28 |
다른사람 예제 rclpy.executors 랑 rclpy.task 관련 예시 퍼옴 (0) | 2022.08.17 |
이벤트 루프 - 파이썬 레퍼런스 정리 (0) | 2022.08.17 |
댓글