본문 바로가기
언어 정리/python_비동기관련_lib

다른사람 예제 rclpy.executors 랑 rclpy.task 관련 예시 퍼옴

by 알 수 없는 사용자 2022. 8. 17.
"""

https://github.com/ros2/rclpy/blob/691e4fbfcb4bd4cc2a01182a7dced3105a78200b/rclpy/test/test_executor.py#L226
"""

# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import threading
import time
import unittest

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.executors import ShutdownException
from rclpy.executors import SingleThreadedExecutor
from rclpy.task import Future


class TestExecutor(unittest.TestCase):

    def setUp(self):
        self.context = rclpy.context.Context()
        rclpy.init(context=self.context)
        self.node = rclpy.create_node('TestExecutor', namespace='/rclpy', context=self.context)

    def tearDown(self):
        self.node.destroy_node()
        rclpy.shutdown(context=self.context)

    def func_execution(self, executor):
        got_callback = False

        def timer_callback():
            nonlocal got_callback
            got_callback = True

        tmr = self.node.create_timer(0.1, timer_callback)

        assert executor.add_node(self.node)
        executor.spin_once(timeout_sec=1.23)
        # TODO(sloretz) redesign test, sleeping to workaround race condition between test cleanup
        # and MultiThreadedExecutor thread pool
        time.sleep(0.1)

        self.node.destroy_timer(tmr)
        return got_callback

    def test_single_threaded_executor_executes(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        try:
            self.assertTrue(self.func_execution(executor))
        finally:
            executor.shutdown()

    def test_executor_immediate_shutdown(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        try:
            got_callback = False

            def timer_callback():
                nonlocal got_callback
                got_callback = True

            timer_period = 1
            tmr = self.node.create_timer(timer_period, timer_callback)

            self.assertTrue(executor.add_node(self.node))
            t = threading.Thread(target=executor.spin, daemon=True)
            start_time = time.monotonic()
            t.start()
            executor.shutdown()
            t.join()
            end_time = time.monotonic()

            self.node.destroy_timer(tmr)
            self.assertLess(end_time - start_time, timer_period / 2)
            self.assertFalse(got_callback)
        finally:
            executor.shutdown()

    def test_shutdown_executor_before_waiting_for_callbacks(self):
        self.assertIsNotNone(self.node.handle)
        for cls in [SingleThreadedExecutor, MultiThreadedExecutor]:
            executor = cls(context=self.context)
            executor.shutdown()
            with self.assertRaises(ShutdownException):
                executor.wait_for_ready_callbacks()

    def test_shutdown_exception_from_callback_generator(self):
        self.assertIsNotNone(self.node.handle)
        for cls in [SingleThreadedExecutor, MultiThreadedExecutor]:
            executor = cls(context=self.context)
            cb_generator = executor._wait_for_ready_callbacks()
            executor.shutdown()
            with self.assertRaises(ShutdownException):
                next(cb_generator)

    def test_remove_node(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)

        got_callback = False

        def timer_callback():
            nonlocal got_callback
            got_callback = True

        try:
            tmr = self.node.create_timer(0.1, timer_callback)
            try:
                executor.add_node(self.node)
                executor.remove_node(self.node)
                executor.spin_once(timeout_sec=0.2)
            finally:
                self.node.destroy_timer(tmr)
        finally:
            executor.shutdown()

        assert not got_callback

    def test_multi_threaded_executor_executes(self):
        self.assertIsNotNone(self.node.handle)
        executor = MultiThreadedExecutor(context=self.context)
        try:
            self.assertTrue(self.func_execution(executor))
        finally:
            executor.shutdown()

    def test_add_node_to_executor(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        executor.add_node(self.node)
        self.assertIn(self.node, executor.get_nodes())

    def test_executor_spin_non_blocking(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        executor.add_node(self.node)
        start = time.monotonic()
        executor.spin_once(timeout_sec=0)
        end = time.monotonic()
        self.assertLess(start - end, 0.001)

    def test_execute_coroutine_timer(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        executor.add_node(self.node)

        called1 = False
        called2 = False

        async def coroutine():
            nonlocal called1
            nonlocal called2
            called1 = True
            await asyncio.sleep(0)
            called2 = True

        tmr = self.node.create_timer(0.1, coroutine)
        try:
            executor.spin_once(timeout_sec=1.23)
            self.assertTrue(called1)
            self.assertFalse(called2)

            called1 = False
            executor.spin_once(timeout_sec=0)
            self.assertFalse(called1)
            self.assertTrue(called2)
        finally:
            self.node.destroy_timer(tmr)

    def test_execute_coroutine_guard_condition(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        executor.add_node(self.node)

        called1 = False
        called2 = False

        async def coroutine():
            nonlocal called1
            nonlocal called2
            called1 = True
            await asyncio.sleep(0)
            called2 = True

        gc = self.node.create_guard_condition(coroutine)
        try:
            gc.trigger()
            executor.spin_once(timeout_sec=0)
            self.assertTrue(called1)
            self.assertFalse(called2)

            called1 = False
            executor.spin_once(timeout_sec=1)
            self.assertFalse(called1)
            self.assertTrue(called2)
        finally:
            self.node.destroy_guard_condition(gc)

    def test_create_task_coroutine(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        executor.add_node(self.node)

        async def coroutine():
            return 'Sentinel Result'

        future = executor.create_task(coroutine)
        self.assertFalse(future.done())

        executor.spin_once(timeout_sec=0)
        self.assertTrue(future.done())
        self.assertEqual('Sentinel Result', future.result())

    def test_create_task_normal_function(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        executor.add_node(self.node)

        def func():
            return 'Sentinel Result'

        future = executor.create_task(func)
        self.assertFalse(future.done())

        executor.spin_once(timeout_sec=0)
        self.assertTrue(future.done())
        self.assertEqual('Sentinel Result', future.result())

    def test_create_task_dependent_coroutines(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        executor.add_node(self.node)

        async def coro1():
            return 'Sentinel Result 1'

        future1 = executor.create_task(coro1)

        async def coro2():
            nonlocal future1
            await future1
            return 'Sentinel Result 2'

        future2 = executor.create_task(coro2)

        # Coro2 is newest task, so it gets to await future1 in this spin
        executor.spin_once(timeout_sec=0)
        # Coro1 execs in this spin
        executor.spin_once(timeout_sec=0)
        self.assertTrue(future1.done())
        self.assertEqual('Sentinel Result 1', future1.result())
        self.assertFalse(future2.done())

        # Coro2 passes the await step here (timeout change forces new generator)
        executor.spin_once(timeout_sec=1)
        self.assertTrue(future2.done())
        self.assertEqual('Sentinel Result 2', future2.result())

    def test_create_task_during_spin(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        executor.add_node(self.node)

        future = None

        def spin_until_task_done(executor):
            nonlocal future
            while future is None or not future.done():
                try:
                    executor.spin_once()
                finally:
                    executor.shutdown()
                    break

        # Start spinning in a separate thread
        thr = threading.Thread(target=spin_until_task_done, args=(executor, ), daemon=True)
        thr.start()

        # Sleep in this thread to give the executor a chance to reach the loop in
        # '_wait_for_ready_callbacks()'
        time.sleep(1)

        def func():
            return 'Sentinel Result'

        # Create a task
        future = executor.create_task(func)

        thr.join(timeout=0.5)
        # If the join timed out, remove the node to cause the spin thread to stop
        if thr.is_alive():
            executor.remove_node(self.node)

        self.assertTrue(future.done())
        self.assertEqual('Sentinel Result', future.result())

    def test_global_executor_completes_async_task(self):
        self.assertIsNotNone(self.node.handle)

        class TriggerAwait:

            def __init__(self):
                self.do_yield = True

            def __await__(self):
                while self.do_yield:
                    yield
                return

        trigger = TriggerAwait()
        did_callback = False
        did_return = False

        async def timer_callback():
            nonlocal trigger, did_callback, did_return
            did_callback = True
            await trigger
            did_return = True

        timer = self.node.create_timer(0.1, timer_callback)

        executor = SingleThreadedExecutor(context=self.context)
        rclpy.spin_once(self.node, timeout_sec=0.5, executor=executor)
        self.assertTrue(did_callback)

        timer.cancel()
        trigger.do_yield = False
        rclpy.spin_once(self.node, timeout_sec=0, executor=executor)
        self.assertTrue(did_return)

    def test_executor_add_node(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        assert executor.add_node(self.node)
        assert id(executor) == id(self.node.executor)
        assert not executor.add_node(self.node)
        assert id(executor) == id(self.node.executor)

    def test_executor_spin_until_future_complete_timeout(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        executor.add_node(self.node)

        def timer_callback():
            pass
        timer = self.node.create_timer(0.003, timer_callback)

        # Timeout
        future = Future()
        self.assertFalse(future.done())
        start = time.monotonic()
        executor.spin_until_future_complete(future=future, timeout_sec=0.1)
        end = time.monotonic()
        # Nothing is ever setting the future, so this should have waited
        # at least 0.1 seconds.
        self.assertGreaterEqual(end - start, 0.1)
        self.assertFalse(future.done())

        timer.cancel()

    def test_executor_spin_until_future_complete_future_done(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        executor.add_node(self.node)

        def timer_callback():
            pass
        timer = self.node.create_timer(0.003, timer_callback)

        def set_future_result(future):
            future.set_result('finished')

        # Future complete timeout_sec > 0
        future = Future()
        self.assertFalse(future.done())
        t = threading.Thread(target=lambda: set_future_result(future))
        t.start()
        executor.spin_until_future_complete(future=future, timeout_sec=0.2)
        self.assertTrue(future.done())
        self.assertEqual(future.result(), 'finished')

        # Future complete timeout_sec = None
        future = Future()
        self.assertFalse(future.done())
        t = threading.Thread(target=lambda: set_future_result(future))
        t.start()
        executor.spin_until_future_complete(future=future, timeout_sec=None)
        self.assertTrue(future.done())
        self.assertEqual(future.result(), 'finished')

        # Future complete timeout < 0
        future = Future()
        self.assertFalse(future.done())
        t = threading.Thread(target=lambda: set_future_result(future))
        t.start()
        executor.spin_until_future_complete(future=future, timeout_sec=-1)
        self.assertTrue(future.done())
        self.assertEqual(future.result(), 'finished')

        timer.cancel()

    def test_executor_spin_until_future_complete_do_not_wait(self):
        self.assertIsNotNone(self.node.handle)
        executor = SingleThreadedExecutor(context=self.context)
        executor.add_node(self.node)

        def timer_callback():
            pass
        timer = self.node.create_timer(0.003, timer_callback)

        # Do not wait timeout_sec = 0
        future = Future()
        self.assertFalse(future.done())
        executor.spin_until_future_complete(future=future, timeout_sec=0)
        self.assertFalse(future.done())

        timer.cancel()

    def test_executor_add_node_wakes_executor(self):
        self.assertIsNotNone(self.node.handle)
        got_callback = False

        def timer_callback():
            nonlocal got_callback
            got_callback = True

        timer_period = 0.1
        tmr = self.node.create_timer(timer_period, timer_callback)

        executor = SingleThreadedExecutor(context=self.context)
        try:
            # spin in background
            t = threading.Thread(target=executor.spin_once, daemon=True)
            t.start()
            # sleep to make sure executor is blocked in rcl_wait
            time.sleep(0.5)

            self.assertTrue(executor.add_node(self.node))
            # Make sure timer has time to trigger
            time.sleep(timer_period)

            self.assertTrue(got_callback)
        finally:
            executor.shutdown()
            self.node.destroy_timer(tmr)


if __name__ == '__main__':
    unittest.main()

댓글