본문 바로가기
ros2_python/Ros2 msg,srv,action,parm

Ros2 비동기적으로 spin( )

by 알 수 없는 사용자 2022. 9. 15.

subscription  2개

publisher      2개

 

비동기로 ros spin 돌리는 방법 2개

1. Thread

2. couroutine // asyncio

 

구조

Ros2Handler ->  ThreadAgent

Ros2Handler ->  AsyncAgent

 

 

추가 설명

Thread 같은경우엔 rclpy 에서 제공하는 Excutor Class 를 이용하면 되지만,

Asyncio 로 돌리고 싶어도 spin 함수에 들어가면 실행권을 가져올 수 가 없다.

spin_once 로 while 문돌려서 실행권을 계속해서 넘기면서 해야 가능함


import asyncio
from std_msgs.msg import String
import threading
import rclpy
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.node import Node
import time


class Ros2Handler:
    def __init__(self):
        self.node_obj = rclpy.create_node('node_name_here')
        self.node_obj.publisher_1 = None
        self.node_obj.publisher_2 = None
        self.node_obj.subscription_1 = None
        self.node_obj.subscription_2 = None
        self.i = 0
        self.msg = String()
        self.set_up_sub_pub()

    def set_up_sub_pub(self):
        self.node_obj.subscription_1 = self.node_obj.create_subscription(
            String,
            'topic1',
            self.listener_callback_1,
            10)

        self.node_obj.subscription_2 = self.node_obj.create_subscription(
            String,
            'topic2',
            self.listener_callback_2,
            10)
        self.node_obj.publisher_1 = self.node_obj.create_publisher(String, 'topic1', 10)
        self.node_obj.publisher_2 = self.node_obj.create_publisher(String, 'topic2', 10)

    def listener_callback_1(self, msg: String):
        print('listener_callback_1 msg : [{}]'.format(msg), flush=True)

    def listener_callback_2(self, msg: String):
        print('listener_callback_2 msg : [{}]'.format(msg), flush=True)

    def run_publish_1(self, msg: String):
        self.node_obj.publisher_1.publish(msg)

    def run_publish_2(self, msg: String):
        self.node_obj.publisher_2.publish(msg)

    def get_node(self) -> Node :
        return self.node_obj


# 방법 1. ---- Threading 스타트
class ThreadAgent:
    """ 코드설명 :
    publish 는 0.1 초 마다 pub 한다는 가정이고
    ROS2 spin 을 Thread 로 제어 하는 구조. (Executor Class 를 이용하는 구조다)
    노드가 한개라  SingleThreadedExecutor 로 구현해도 문제없다.
    """
    def __init__(self):
        self.handler = Ros2Handler()
        self.msg: String = String()

    def main_threading(self, args=None):
        if rclpy.ok():  # 방어코드
            pass
        else:
            rclpy.init(args=args)

        _executor = MultiThreadedExecutor()
        _executor.add_node(self.handler.get_node())

        thread_task = threading.Thread(target=_executor.spin)
        thread_task.start()
        self.msg = String()
        while True:
            time.sleep(0.1)
            self.msg.data = 'publish_1'
            self.handler.run_publish_1(self.msg)
            time.sleep(0.1)
            self.msg.data = 'run_pub_2'
            self.handler.run_publish_2(self.msg)

        thread_task.join()  # 이밑으론 동작 X

        # Destroy the node explicitly
        # (optional - otherwise it will be done automatically
        # when the garbage collector destroys the node object)
        self.handler.destroy_node()
        rclpy.shutdown()


# 방법 2. ---- coroutine 스타트
class AsyncAgent:
    """ 코드설명 :
    publish 는 0.1 초 마다 pub 한다는 가정이고
    ROS2 spin_once 을 코루틴으로 제어 하는 구조
    """
    def __init__(self):
        self.handler = Ros2Handler()
        self.msg: String = String()

    def main_async(self, args=None):
        if rclpy.ok():  # 방어코드
            pass
        else:
            rclpy.init(args=args)

        _loop = asyncio.get_event_loop()
        _loop.run_until_complete(self.run_task())

    async def run_task(self):
        await asyncio.gather(
            self.run_sub(),
            self.run_pub()
        )

    async def run_sub(self):
        node_obj = self.handler.get_node()
        while True:
            await asyncio.sleep(0.05)  # 필요하진 않은데 while 문 속도제어 용 sleep
            await self.sub_spin(node_obj)
            # await asyncio.create_task(self.sub_spin(node_obj)) # 동일한 내용 이렇게도 쓸 수 있음.

    async def sub_spin(self, node_obj):
        rclpy.spin_once(node=node_obj, timeout_sec=0.001) # timeout_sec 시간 만큼 실행권 가져오지 못함
        # rclpy.spin(node=node_obj) # 이렇게 하면 spin 들어간 이후에 실행권을 가져오지 못함
        await asyncio.sleep(0.1) # spin 속도를 제어 해주는 역할

    async def run_pub(self):
        while True:
            self.msg.data = 'publish_1 on '
            self.handler.run_publish_1(self.msg)
            await asyncio.sleep(0.1)  # 0.1 초마다 pub 하는 조건때문에 만듬
            self.msg.data = 'publish_2 on '
            self.handler.run_publish_2(self.msg)
            await asyncio.sleep(0.1)  # 0.1 초마다 pub 하는 조건때문에 만듬


if __name__ == '__main__':
    rclpy.init()
    a = ThreadAgent()  # 쓰레드
    a.main_threading()
    # b = AsyncAgent()  # 어싱크
    # b.main_async()

 

콘솔 결과 :

코루틴 돌렸을때

 

멀티쓰레드 돌렸을때

 

댓글