참고
https://stackoverflow.com/questions/32889527/is-there-a-way-to-use-asyncio-queue-in-multiple-threads
Is there a way to use asyncio.Queue in multiple threads?
Let's assume I have the following code: import asyncio import threading queue = asyncio.Queue() def threaded(): import time while True: time.sleep(2) queue.put_nowait(tim...
stackoverflow.com
asyncio queue 관련함수 정리
https://runebook.dev/ko/docs/python/library/asyncio-queue
Python - Queues - 소스 코드: Lib/asyncio/queues.py asyncio 대기열은 클래스와 유사하도록 설계되었습니다.
runebook.dev
https://docs.python.org/ko/3.7/library/queue.html
queue — 동기화된 큐 클래스 — Python 3.7.13 문서
queue — 동기화된 큐 클래스 소스 코드: Lib/queue.py The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queu
docs.python.org
수정 해야 될 사항 :
asyncio.queue 가 Thread_safe하지 않다고 해도 양방향이 아닌 단방향으로 했을 시에는 따로 문제가 생기지 않았습니다.
따라서 "queue._loop._write_to_self()" 를 따로 쓰지 않고
1.단방향으로 동작 2. 큐.empty() 체크 , 3. 블럭형 get , 논블럭형 put 사용
만으로도 동작하는데에는 따로 문제가 없었습니다.
조건 : 쓰레드 1개 , async 1개
잘못된 예 : 예측한 결과는 쓰레드로 asyncio.Queue()에 put 하면, 비동기함수인 async_1 에서 get을 바로바로 해줄줄 알았죵 하지만 안됩니다.
import asyncio
import threading
queue = asyncio.Queue()
def threaded():
import time
while True:
time.sleep(2)
queue.put_nowait(time.time())
print(queue.qsize())
async def async_1():
while True:
time = await queue.get()
print(time)
loop = asyncio.get_event_loop()
asyncio.Task(async_1())
threading.Thread(target=threaded).start()
loop.run_forever()
결과
조건 : 쓰레드 1개 , async 1개
잘된 예 : 쓰레드가 큐를 put할때마다 asyncio 쪽에서 get으로 가져옴
핵심 키워드 :
"
queue = asyncio.Queue()
queue._loop._write_to_self()
"
import asyncio
import threading
queue = asyncio.Queue()
def threaded():
import time
while True:
time.sleep(2)
queue.put_nowait(time.time())
queue._loop._write_to_self()
print(queue.qsize())
async def async_1():
while True:
time = await queue.get()
print(time)
loop = asyncio.get_event_loop()
asyncio.Task(async_1())
threading.Thread(target=threaded).start()
loop.run_forever()
조건 : 쓰레드 1개 , async 2개
잘된 예 : 쓰레드와 async함수async_1이 put할때마다, async함수async_2 쪽에서 get으로 가져옴
핵심 키워드 :
쓰레드는 2초당 한번씩 put 해주고,
async함수async_1은 1초당 한번씩 put 해준다.
async함수async_2는 큐의 상태가 empty가 아닐시 전부 get해주고, 큐의 상태가 empty일 시엔 "await asyncio.sleep(1.0)"해준다.
import asyncio
import threading
queue = asyncio.Queue()
def threaded():
import time
while True:
time.sleep(2)
queue.put_nowait(time.time())
queue._loop._write_to_self()
print("queue size : ",queue.qsize())
async def async_1():
while True:
await asyncio.sleep(1.0)
queue.put_nowait("async_2_put_data")
queue._loop._write_to_self()
async def async_2():
while True:
if queue.empty():
await asyncio.sleep(1.0)
else:
putted_data = await queue.get()
print(putted_data)
async def gather_async():
await asyncio.gather(
async_1(),
async_2()
)
loop = asyncio.get_event_loop()
threading.Thread(target=threaded).start()
loop.run_until_complete(gather_async())
'언어 정리 > python_비동기관련_lib' 카테고리의 다른 글
코루틴과 태스크, 퓨처 - 파이썬 레퍼런스 정리 (0) | 2022.08.17 |
---|---|
ThreadPool + 딕셔너리 컴프리헨션 (0) | 2022.07.31 |
AsyncIO_5_멀티 스크랩핑 실습(인프런) (0) | 2022.06.04 |
AsyncIO_4_{ 저수준함수 : 이벤트 루프 : asyncio.get_event_loop( ) } (0) | 2022.05.05 |
python 쓰레드 개념 (0) | 2022.05.03 |
댓글