본문 바로가기
언어 정리/python_비동기관련_lib

asyncio.queue 를 쓰레드와 asyncio 사이에서 사용 하기

by 알 수 없는 사용자 2022. 7. 13.

참고

https://stackoverflow.com/questions/32889527/is-there-a-way-to-use-asyncio-queue-in-multiple-threads

 

Is there a way to use asyncio.Queue in multiple threads?

Let's assume I have the following code: import asyncio import threading queue = asyncio.Queue() def threaded(): import time while True: time.sleep(2) queue.put_nowait(tim...

stackoverflow.com

 

asyncio queue 관련함수 정리

https://runebook.dev/ko/docs/python/library/asyncio-queue

 

Python - Queues - 소스 코드: Lib/asyncio/queues.py asyncio 대기열은 클래스와 유사하도록 설계되었습니다.

 

runebook.dev

https://docs.python.org/ko/3.7/library/queue.html

 

queue — 동기화된 큐 클래스 — Python 3.7.13 문서

queue — 동기화된 큐 클래스 소스 코드: Lib/queue.py The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queu

docs.python.org


수정 해야 될 사항 : 

asyncio.queue 가 Thread_safe하지 않다고 해도 양방향이 아닌 단방향으로 했을 시에는 따로 문제가 생기지 않았습니다.
따라서 "queue._loop._write_to_self()" 를 따로 쓰지 않고
1.단방향으로 동작 2. 큐.empty() 체크 , 3. 블럭형 get , 논블럭형 put 사용
만으로도 동작하는데에는 따로 문제가 없었습니다.


조건 : 쓰레드 1개 , async 1개

잘못된 예 : 예측한 결과는 쓰레드로 asyncio.Queue()에 put 하면, 비동기함수인 async_1 에서 get을 바로바로 해줄줄 알았죵 하지만 안됩니다.

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

async def async_1():
    while True:
        time = await queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async_1())
threading.Thread(target=threaded).start()
loop.run_forever()

결과

 


조건 : 쓰레드 1개 , async 1개

잘된 예 : 쓰레드가 큐를 put할때마다 asyncio 쪽에서 get으로 가져옴

핵심 키워드 :

"

queue = asyncio.Queue()

queue._loop._write_to_self()

"

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        queue._loop._write_to_self()
        print(queue.qsize())

async def async_1():
    while True:
        time = await queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async_1())
threading.Thread(target=threaded).start()
loop.run_forever()

 


조건 : 쓰레드 1개 , async 2개

잘된 예 : 쓰레드와 async함수async_1이 put할때마다, async함수async_2 쪽에서 get으로 가져옴

핵심 키워드 :

쓰레드는 2초당 한번씩 put 해주고,

async함수async_1은 1초당 한번씩 put 해준다.

async함수async_2는 큐의 상태가 empty가 아닐시 전부 get해주고, 큐의 상태가 empty일 시엔 "await asyncio.sleep(1.0)"해준다.

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        queue._loop._write_to_self()
        print("queue size : ",queue.qsize())

async def async_1():
	while True:
		await asyncio.sleep(1.0)
		queue.put_nowait("async_2_put_data")
		queue._loop._write_to_self()
	
async def async_2():
    while True:
    	if queue.empty():
    		await asyncio.sleep(1.0)
    	else:
        	putted_data = await queue.get()
        	print(putted_data)
 	
async def gather_async():
	await asyncio.gather(
		async_1(),
		async_2()
	)

loop = asyncio.get_event_loop()

threading.Thread(target=threaded).start()
loop.run_until_complete(gather_async())

 

 

 

 

 

댓글