본문 바로가기
언어 정리/python_비동기관련_lib

코루틴과 태스크, 퓨처 - 파이썬 레퍼런스 정리

by 알 수 없는 사용자 2022. 8. 17.

 

https://docs.python.org/ko/3/library/asyncio-task.html

 

코루틴과 태스크 — Python 3.10.6 문서

코루틴과 태스크 이 절에서는 코루틴과 태스크로 작업하기 위한 고급 asyncio API에 관해 설명합니다. Coroutines declared with the async/await syntax is the preferred way of writing asyncio applications. For example, the follo

docs.python.org

 

https://wonhyeok1994.tistory.com/79

 

asyncIO_1_( 코루틴개념+함수 : create_task, run, sleep )

참고 https://docs.python.org/ko/3/library/asyncio-task.html 키워드 create_task, run, sleep ㅋ couroutine 인데 오타로 corutine 이라고 적음 그런줄알고 보면됨 코루틴을 오해하고 있었음.. 내가 전에 해..

wonhyeok1994.tistory.com

 

https://pythontic.com/asyncio/task/add_done_callback

 

add_done_callback() method of asyncio.Task class in Python | Pythontic.com

--> --> --> HomeAsyncioTaskAdd_done_callback Overview: The add_done_callback() method adds a callback function to an asyncio.Task object which is called when the Task is done. The task object itself is sent back to the call back through the context paramet

pythontic.com

 


키워드

create_task, run, sleep


선행 개념 :

await @@@ 라고 하면

@@@은 awaitable 객체 라고 한다.

awaitable 객체는 세가지 유형이 있습니다.

1. coroutine

2. task

3. future

------------------

task와 future의 관계 : asyncio.create_task( ) 는 Task 객체를 반환하는데, 이 Task객체가 Future의 상속을 받는다. 그래서 Task를 future처럼 쓸 수 있는 이유다.

------------------

couroutine 의미

파이썬 couroutine( async가 붙는 함수 ) 는 awaitable 하므로 다른 코루틴에서 기다릴 수 있다.

코루틴 함수 : async def 함수

코루틴 객체 : 코루틴 함수를 호출하여 반환된 객체 ( async함수에서 return )

------------------

여기서 말하는 task 랑 future 는 _asyncio.Task 와 _asyncio.Future 를 의미함.

_asyncio 에서 다루는 객체로 Task 와 Future 가 있는데 둘다 await Task , await Future 로 실행 가능함.

사실 좀 헷갈리는게 asyncio futures 와 concurrent.futures의 future가 비슷한거 같지만 다름. 동시성과 병렬성 차이 라고 보면 됨. 동작이 비슷하지 컨셉은 완전 다름. 

따라서 concurrent.futures 를 

------------------

Task 개념외에 Future가 있는 이유와 특징

1. 다른 threading.Thread(일반쓰레드) 랑 _asyncio.Future(asyncio태스크) 이 safely synchronizes 하기 위해 필요함.

2. async/await 를 사용할 때 콜백함수를 이용하고 싶으면 _asyncio.Future 를 이용해야 한다. ( ex> 코루틴태스크.add_done_callback(콜백함수id) 이렇게 , 코루틴태스크 완료시 콜백함수 동작 )

3. _asyncio.Future 객체를 await 하면, 코루틴이 _asyncio.Future 객체 동작이 완료 할 때 까지 기다림.(blocking됨) ( TIP > Future 객체가 가리키는 함수 때문에 blocking되는게 싫으면 time.sleep( ) 함수로 blocking을 피할 수 있다. await asynio.sleep( ) 처럼 동작됨. 이유 : 퓨처 객체 함수에서 time.sleep( ) 는 IO-bound operation를 유발해서 인식 함 )

------------------

task 의미

Event Loop 에 등록시킬 수 있는 객체id

 

 


 

1 run // 이런식으로 하면 동기식이랑 다를게 없음

run 함수는 항상 새 이벤트 루프를 만들고, 끝에 이벤트 루프를 닫는다.

run은 async def 함수의 메인 진입 지점이고, 이상적으로 한번만 호출 해주는게 맞다.

#!/usr/bin/env python3
import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')


asyncio.run(main())

 

2 await // 이런식으로 하면 동기식이랑 다를게 없음 , 1초후 hello 또 2초후 world

await say_after(1, 'hello') 하면 asyncio.sleep(1)초 들어가고 들어간 동안 예약된 task가 없는지 확인함. 여기선 예약된 task 가 없으므로 그냥 time.sleep(delay) 와 똑같이 동작함

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

 

3 run , await ,

create_task : task를 예약(future개념) . 

그러면 await시, await asyncio.sleep(delay)로 슬립되고 예약된 task를 먼저 실행시킨다.

#!/usr/bin/env python3

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")
    
asyncio.run(main())

 

4. async def 함수를 그냥 호출하면 Warning뜸 //

import asyncio
import time

async def nested():
    await asyncio.sleep(1)
    return 42

async def main():
    print(f"1 at {time.strftime('%X')}")
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()
    print('--------------------------------')
    print(f"2 at {time.strftime('%X')}")
    # Let's do it differently now and await it:
    print(await nested())  # will print "42".
    print(f"3 at {time.strftime('%X')}")

asyncio.run(main())

출력값 : 여기서 나오는 에러는 코루틴 함수를 짜면서 자주 볼 수 있는 에러

async def 함수를 그냥 호출시킬 때 뜸

 

5. async def 함수를 create_task로 예약 후, 예약된 Task-2 를 await 로 호출

import asyncio
import time

async def nested():
    await asyncio.sleep(1)
    return 42

async def main():
    print(f"1 at {time.strftime('%X')}")
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())
    print(task)
    print(f"2 at {time.strftime('%X')}")
    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task
    print(f"3 at {time.strftime('%X')}")

asyncio.run(main())

creat_task 로 리턴된 객체

 

6. Future ( 짜임새 구조 )

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

실행 중에 작업이 사라지는 것을 방지하려면 이 함수의 결과에 대한 참조를 저장하십시오. 이벤트 루프는 작업에 대한 약한 참조만 유지합니다. 다른 곳에서 참조되지 않은 작업은 완료되기 전에 언제든지 가비지 수집될 수 있습니다. 신뢰할 수 있는 "방화 및 삭제" 백그라운드 작업을 위해 다음 컬렉션으로 수집합니다.

 

키워드 : add_done_callback() , create_task() , set() , 백그라운드로 task 돌리기

set( ) 함수를 통해서 집합을 만들고, 집합에 task들을 전부 추가한 후에 ( await으로 task를 진행 ) set( ) 함수에 있는 task를 처분 해주는 구조

background_tasks = set()

for i in range(10):
    task = asyncio.create_task(some_coro(param=i))

    # Add task to the set. This creates a strong reference.
    background_tasks.add(task)
	
    # ( await으로 task를 진행 ) 
    # To prevent keeping references to finished tasks forever,
    # make each task remove its own reference from the set after
    # completion:
    task.add_done_callback(background_tasks.discard)

예시 :

simple_coroutine코루틴함수에 task_cb일반함수를 콜백함수를 씌운걸 Wrap 했다 라고 표현하는듯 

"""
refer :  https://pythontic.com/asyncio/task/add_done_callback
"""

import time
print(f"0 at {time.strftime('%X')}")

import asyncio
from datetime import datetime

# Callback - context object is the actual
# task object
def task_cb(context):
    print(2)
    print("Task completion received...")
    print("Name of the task:%s"%context.get_name())
    print("Wrapped coroutine object:%s"%context.get_coro())
    print("Task is done:%s"%context.done())
    print("Task has been cancelled:%s"%context.cancelled())
    print("Task result:%s"%context.result())
    print(type(context))
    print(context)
    print(f"3 at {time.strftime('%X')}")

# A simple Python coroutine
async def simple_coroutine(param):
    await asyncio.sleep(1)
    print(param)
    return 1

# Create an asyncio.Task object
async def main():
    t1 = asyncio.create_task(simple_coroutine(param='1'))
    t1.add_done_callback(task_cb)
    await t1
    print("Coroutine main() exiting")
    print("get coro object \t: ",t1.get_coro())
    print("get coro done result \t: ",t1.done())


# Execute the task
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.run(main())

 

7. 코루틴 취소 task.cancel( )

"""
refer :  https://pythontic.com/asyncio/task/cancel
"""

from datetime import date, datetime
import asyncio

# Define a coroutine
async def alarm():
    try:
        print("Task entered:%s"%datetime.now())
        for i in range (1, 10):
            print("Beep...%d"%i)
            await asyncio.sleep(1)
        print(date.today())
    except asyncio.CancelledError as e:
        print("Stopping the alarm")
    finally:
        print("Alarm stopped")
        print("Task cancelled:%s"%datetime.now())


# A coroutine for creating and cancelling tasks
async def PerformTasks(cancelAfter):
    # Create and schedule a task for execution
    t1 = asyncio.create_task(alarm())
    await asyncio.sleep(cancelAfter)

    # Make a cancellation request
    t1.cancel()
    
    # Wait for the cancellation of t1 to be complete
    await t1
    print("LOOP END")

# Create a new event loop and set to asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Run the coroutine containing tasks
cancelAfter = 5
asyncio.run(PerformTasks(cancelAfter))

 

8. gather VS await

import asyncio

async def coro_func( number):
    await asyncio.sleep(1)
    print(number)
    return number

async def main_gather():
    # Schedule three calls *concurrently*:
    task_another_way = asyncio.create_task(coro_func(2))
    result = await asyncio.gather(
        coro_func(number=1),
        task_another_way,
        coro_func(3),
        coro_func(4),
        coro_func(5)
    )
    print(result)
    print(type(result))

async def main_create_task():
    t1 = asyncio.create_task(coro_func(number=1))
    t2 = asyncio.create_task(coro_func(number=2))
    t3 = asyncio.create_task(coro_func(number=3))
    t4 = asyncio.create_task(coro_func(4))
    t5 = asyncio.create_task(coro_func(5))
    print(t1)
    print(type(t1))
    L = []
    for i in range(1,6):
        _loop = 't' + str(i)
        tmp = await eval(_loop)
        L.append(tmp)

    print(L)

asyncio.run(main_gather())
print('----------------gather VS create_task----------------')
asyncio.run(main_create_task())

 

10. 

res = await shield(something())
await asyncio.wait_for(eternity(), timeout=1.0)
done, pending = await asyncio.wait(aws, timeout=None, return_when=ALL_COMPLETED)
# ALL_COMPLETED : 모든 퓨처가 끝나거나 취소되면 함수가 반환됩니다.
# wait()는 코루틴을 태스크로 자동 예약하고, 나중에 묵시적으로 생성된 Task 객체를 (done, pending) 집합으로 반환함

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # Everything will work as expected now.

aws 이터러블에 있는 어웨이터블 객체를 동시에 실행하고, return_when에 의해 지정된 조건을 만족할 때까지 블록합니다.

 

11. 

_asyncio.Task    객체를 만드는 함수 랑       < loop.create_task( 코루틴 ) >

_asyncio.Future 객체를 만드는 함수            < loop.run_in_executor( None, 일반함수 ) >

< asyncio.ensure_future( 코루틴 or future ) >  일반함수면 awaitable 한 future객체로, 코루틴이면 Task로, future면 future객체로 알아서 만들어주는 함수

run_in_executor <- 이거 이용해서 _asyncio.Future 함수를 만들고 다른 쓰레드(일반 future객체)와 같이 사용함

import asyncio
import time

async def coro_1():
    await asyncio.sleep(2)
    print('코루틴함수임')

def thread_or_callback_function():
    time.sleep(1)
    print("쓰레드를 실행시키는 함수 이거나"
          "콜백함수라 가정함")

loop = asyncio.get_event_loop()

# asyncio.create_task() 함수를 쓰면 실질적으로 코루틴을 task로 변환해주는 함수.
task1 = loop.create_task(coro_1())
assert isinstance(task1, asyncio.Task)
print('1', task1)   	# 1 <Task pending name='Task-1' coro=<coro_1() running at tmp.py:5>>
print(type(task1)) 		# <class '_asyncio.Task'>

task2 = asyncio.ensure_future(coro_1())
assert isinstance(task2, asyncio.Task)
print('2', task2)       # 2 <Task pending name='Task-2' coro=<coro_1() running at tmp.py:5>>
print(type(task2))      # <class '_asyncio.Task'>

task3 = asyncio.ensure_future(task1)
assert task3 is task1
print('3', task3)       # 3 <Task pending name='Task-1' coro=<coro_1() running at tmp.py:5>>
print(type(task3))      # <class '_asyncio.Task'>

# asyncio.future 함수를 리턴해줌
future_obj = loop.run_in_executor(None, thread_or_callback_function)
print('4', future_obj)      # 4 <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/lib/python3.8/asyncio/futures.py:360]>
print(type(future_obj))     # <class '_asyncio.Future'>

loop.run_until_complete(coro_1())

loop.close()

# await로 _asyncio.Future 와 _asyncio.Task 둘다 실행 가능

 

 

12. 코루틴에서 쓰레드 제어를 하는 방식 

asyncio.to_thread( )같은 경우에는 python 3.9버전 이상 부터 쓸 수 있는 함수긴 한데 사실 다른 버전에서도 사용할 수 있는 방법은 있다. python3.9에 있는 to_thread( ) 함수를 복붙해서 상황에 맞게 쓰면 됨

예제 : 

import asyncio
import time
import contextvars
import functools

class thread_async():
    def __init__(self):
        pass

    def blocking_io(self):
        while True :
            print(f"start blocking_io at {time.strftime('%X')}")
            # Note that time.sleep() can be replaced with any blocking
            # IO-bound operation, such as file operations.
            time.sleep(1)
            print(f"blocking_io complete at {time.strftime('%X')}")

    async def test_func_1(self):
        while True:
            await asyncio.sleep(1)
            print('test_func_1 on ')

    async def test_func_2(self):
        while True:
            await asyncio.sleep(1)
            print('test_func_2 on ')

    async def to_thread(self, func, /, *args, **kwargs):
        # loop = asyncio.get_event_loop()
        loop = asyncio.get_running_loop()
        ctx = contextvars.copy_context()
        func_call = functools.partial(ctx.run, func, *args, **kwargs)
        return await loop.run_in_executor(None, func_call)

    async def main(self):
        print(f"started main at {time.strftime('%X')}")
        _task_1 = asyncio.create_task(self.test_func_1())
        _task_2 = asyncio.create_task(self.test_func_2())
        await asyncio.gather(
            self.to_thread(self.blocking_io),
            _task_1,
            _task_2
        )
        print(f"finished main at {time.strftime('%X')}")

    def run(self):
        _loop = asyncio.new_event_loop()
        asyncio.set_event_loop(_loop)
        _loop.run_until_complete(self.main())



aa = thread_async()
aa.run()

 

 

13. 쓰레드에서 코루틴 제어를 하는 방식 ( 실행되는 쓰레드 안에서 "run_coroutine_threadsafe(코루틴함수,루프)" 를 호출 )

키워드 : asyncio.run_coroutine_threadsafe(coro, loop)

import asyncio
import threading
import time

class coro_thread():
    def __init__(self,_loop):
        self._loop = _loop

    def another_thread(self):
        """ 쓰레드 함수 """
        print('1 another_thread start')
        coro = self.coro_func()
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        # future.add_done_callback(self.start_coro())
        # print(f"{threading.current_thread().name}: {future.result()}")
        time.sleep(5)
        # print(f"{threading.current_thread().name} is Finished")
        print('1 another_thread end')

    async def coro_task(self):
        """ task 예약하는 코루틴함수 """
        print('2 creat_task start')
        _task_1 = asyncio.create_task(self.coro_test())
        await _task_1
        print('2 creat_task end')

    async def coro_func(self):
        """ 쓰레드에서 켜지는 코루틴 함수 ( asyncio.run_coroutine_threadsafe ) """
        print('3 coro_func start')
        await asyncio.sleep(5)
        # return await asyncio.sleep(3, 5)
        print('3 coro_func end')

    async def coro_test(self):
        """ 예약된 task 인 코루틴함수 ( asyncio.create_task() )   """
        print('4 coro_test start')
        await asyncio.sleep(5)
        print('4 coro_test end')

    def start_coro(self):
        """ 코루틴 run 함수"""
        self._loop.run_until_complete(obj_1.coro_task())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    print("START MAIN")
    obj_1 = coro_thread(loop)
    x = threading.Thread(target=obj_1.another_thread, args=(), name="Thread_name_param")
    x.start()
    obj_1.start_coro()
    # 메인함수 안꺼지도록 타임을 제일 길게 셋업
    time.sleep(10)
    print("FINISH MAIN")

 

 

 

댓글