본문 바로가기
언어 정리/python_비동기관련_lib

코루틴으로 ROS2 작업

by 알 수 없는 사용자 2022. 11. 28.

handler - agent - manager 구조

 

Subscription 노드와 , Publisher 노드로 나눠서 진행한 코드랑

 

둘다 같은 노드로 묶어서 진행한 코드 2종류로 짬

 

콜백이라든지, TOPIC, msg_type, qos 정의부분을 최대한 top level class 에서 다룰 수 있게금 구조를 짬

 


한 노드에 Subscription, Publisher 둘다 묶은 코드

 

설명 : 1초 지날때마다 /topic_1 -> 2 -> 3 -> 4 publish 해줍니다.

topic_1 ~ 4 를 subscribing 하고 있기 때문에 바로바로 콜백이 온합니다

 

서브스크립션 , 퍼블리셔를 정의하는 부분은 dataclasses로 정의해 사용합니다.

 

 

 

 

출력값 : 

 

import json
import time
import asyncio
import signal
import rclpy
from rclpy.subscription import Subscription
from rclpy.publisher import Publisher
from rclpy.qos import QoSProfile
from rclpy.node import Node
from dataclasses import dataclass
from typing import TypeVar, Union, Callable, Any, Dict, List
from std_msgs.msg import String
import uuid

MsgType = TypeVar('MsgType')

# ====================================================================
# ROS handler.py =====================================================
# ====================================================================

@dataclass
class SubInfoType:
    msg_type: Any
    topic_name: str
    callback: Callable
    qos_profile: QoSProfile

@dataclass
class PubInfoType:
    msg_type: Any
    topic_name: str
    qos_profile: QoSProfile

class RosHandler(Node):
    def __init__(self, node_name):
        super().__init__(node_name)

    def return_subscription(self, sub_info: SubInfoType) -> Subscription:
        return self.create_subscription(
            sub_info.msg_type,
            sub_info.topic_name,
            sub_info.callback,
            sub_info.qos_profile
        )

    def return_sub_dataclass_type(self, msg_type: MsgType, topic: str, callback: dict, qos_profile: Union[int, QoSProfile]) -> SubInfoType:
        sub_info_1 = SubInfoType(
            msg_type,
            topic,
            callback[topic],
            self._return_qos_profile(qos_profile)
        )
        return sub_info_1

    def _return_qos_profile(self, qos_profile):
        if isinstance(qos_profile, int):
            _qos = rclpy.qos.QoSProfile(
                reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
                history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
                depth=qos_profile
            )
        elif isinstance(qos_profile, QoSProfile):
            _qos = qos_profile
        else:
            raise TypeError('QOS 타입에러')
        return _qos

# -------------------------------------------------------------------------------------------------------------

    def return_publisher(self, pub_info: PubInfoType) -> Publisher:
        return self.create_publisher(
            pub_info.msg_type,
            pub_info.topic_name,
            pub_info.qos_profile
        )

    def return_pub_dataclass_type(self, msg_type, topic, qos_profile) -> PubInfoType:
        pub_info_1 = PubInfoType(
            msg_type,
            topic,
            self._return_qos_profile(qos_profile)
        )
        return pub_info_1

# -------------------------------------------------------------------------------------------------------------

    def get_node_obj(self) -> Node:
        return self

# ====================================================================
# ROS agent.py =======================================================
# ====================================================================

NODE_NAME = 'ROS_NODE_AGENT'


SUB_TOPIC_1 = '/topic_1'
SUB_TOPIC_2 = '/topic_2'
SUB_TOPIC_3 = '/topic_3'
SUB_TOPIC_4 = '/topic_4'

PUB_TOPIC_1 = '/topic_1'
PUB_TOPIC_2 = '/topic_2'
PUB_TOPIC_3 = '/topic_3'
PUB_TOPIC_4 = '/topic_4'


class RosAgent():
    def __init__(self):
        self.print_f('Ros2Agent __init__ called')
        if rclpy.ok():  # 방어코드
            pass
        else:
            rclpy.init()
        self.callback_dict = {}
        self.publisher_dict = {}

        self.ros_handler = RosHandler(NODE_NAME)
        self._node_obj: Node = self.ros_handler.get_node_obj()
        self.print_f(self._node_obj.get_name())
        self.binding_subscription_with_node()
        self.binding_publisher_with_node()
        self.i = 0

    def print_f(self, _print):
        print('{}'.format(_print), flush=True)

    def binding_publisher_with_node(self):
        _topic=PUB_TOPIC_1
        tmp_dataclass = self.ros_handler.return_pub_dataclass_type(String, _topic, 10)
        self.publisher_dict[_topic]: Dict[str, Publisher] = self.ros_handler.return_publisher(tmp_dataclass)

        _topic = PUB_TOPIC_2
        tmp_dataclass = self.ros_handler.return_pub_dataclass_type(String, _topic, 10)
        self.publisher_dict[_topic]: Dict[str, Publisher] = self.ros_handler.return_publisher(tmp_dataclass)

        _topic = PUB_TOPIC_3
        tmp_dataclass = self.ros_handler.return_pub_dataclass_type(String, _topic, 10)
        self.publisher_dict[_topic]: Dict[str, Publisher] = self.ros_handler.return_publisher(tmp_dataclass)

        _topic = PUB_TOPIC_4
        tmp_dataclass = self.ros_handler.return_pub_dataclass_type(String, _topic, 10)
        self.publisher_dict[_topic]: Dict[str, Publisher] = self.ros_handler.return_publisher(tmp_dataclass)
        self.print_f('PUB_TOPIC : {}'.format(self.publisher_dict))


    def binding_subscription_with_node(self):
        self.callback_dict[SUB_TOPIC_1] = self.listener_callback_1
        tmp_dataclass = self.ros_handler.return_sub_dataclass_type(String, SUB_TOPIC_1, self.callback_dict, 10)
        self._node_obj._sub_1 = self.ros_handler.return_subscription(tmp_dataclass)

        self.callback_dict[SUB_TOPIC_2] = self.listener_callback_1
        tmp_dataclass = self.ros_handler.return_sub_dataclass_type(String, SUB_TOPIC_2, self.callback_dict, 10)
        self._node_obj._sub_2 = self.ros_handler.return_subscription(tmp_dataclass)

        self.callback_dict[SUB_TOPIC_3] = self.listener_callback_1
        tmp_dataclass = self.ros_handler.return_sub_dataclass_type(String, SUB_TOPIC_3, self.callback_dict, 10)
        self._node_obj._sub_3 = self.ros_handler.return_subscription(tmp_dataclass)

        self.callback_dict[SUB_TOPIC_4] = self.listener_callback_1
        tmp_dataclass = self.ros_handler.return_sub_dataclass_type(String, SUB_TOPIC_4, self.callback_dict, 10)
        self._node_obj._sub_4 = self.ros_handler.return_subscription(tmp_dataclass)
        self.print_f('SUB_TOPIC : {} , {} , {} , {}'.format(SUB_TOPIC_1,SUB_TOPIC_2,SUB_TOPIC_3,SUB_TOPIC_4))


    def listener_callback_1(self, msg):
        self._node_obj.get_logger().info('get_messaged : "%s"' % msg.data)
        #ros2 topic pub -1 /topic_1 std_msgs/msg/String data:\ \'HIHI\'\

    def listener_callback_2(self, msg):
        self._node_obj.get_logger().info('listner_22 I heard: "%s"' % msg.data)
        #ros2 topic pub -1 /topic_2 std_msgs/msg/String data:\ \'HIHI\'\

# ================================ SUBSCRIPTION 정의================================

    async def ros_task_main(self):
        asyncio.create_task(self.execute_node_spin())
        asyncio.create_task(self.publish_timer())

    async def execute_node_spin(self):
        async def spin_node(node_obj):
            rclpy.spin_once(node=node_obj,
                            timeout_sec=0.)  # timeout_sec 시간 만큼 실행권 가져오지 못함 0. 으로 설정 할시 Never blocking 개념
            # rclpy.spin(node=node_obj) # 이렇게 하면 spin 들어간 이후에 실행권을 가져오지 못함
            await asyncio.sleep(0.0)  # 코루틴 함수내에 await sleep 이 없으면 subscribe_callback이 실행이 안됨

        executor = rclpy.get_global_executor()
        print('subscription ready : ', executor.context.ok())
        while executor.context.ok():
            await asyncio.sleep(0.05)  #  while 문 속도제어 용 sleep
            await spin_node(self._node_obj)
            # await asyncio.create_task(self.spin_subscription(node_obj)) # 동일한 내용 이렇게도 쓸 수 있음.
            # asyncio.create_task(self.spin_subscription(node_obj))

    async def publish_timer(self):
        msg = String()
        dict_msg_type_1 = {
            1:2
        }
        dict_msg_type_2 = {
            3:4
        }
        dict_msg_type_3 = {
            5:6
        }
        dict_msg_type_4 = {
            7:8
        }

        while rclpy.ok():
            msg.data = json.dumps(dict_msg_type_1,ensure_ascii=True)
            self.publisher_dict[PUB_TOPIC_1].publish(msg)
            # self._node_obj._pub_1.publish(msg)
            # self.pub_1.publish(msg)
            await asyncio.sleep(1)
            msg.data = json.dumps(dict_msg_type_2, ensure_ascii=True)
            self.publisher_dict[PUB_TOPIC_2].publish(msg)
            # self._node_obj._pub_2.publish(msg)
            # self.pub_2.publish(msg)
            await asyncio.sleep(1)
            msg.data = json.dumps(dict_msg_type_3, ensure_ascii=True)
            self.publisher_dict[PUB_TOPIC_3].publish(msg)
            # self._node_obj._pub_3.publish(msg)
            # self.pub_3.publish(msg)
            await asyncio.sleep(1)
            msg.data = json.dumps(dict_msg_type_4, ensure_ascii=True)
            self.publisher_dict[PUB_TOPIC_4].publish(msg)
            # self._node_obj._pub_4.publish(msg)
            # self.pub_4.publish(msg)
            await asyncio.sleep(1)
# ============================== 코루틴으로 돌려주는 부분 ===============================

# ====================================================================
# ROS manager.py =======================================================
# ====================================================================
class RosManager():
    def __init__(self):
        self.ros_agent = RosAgent()

    def make_loop(self):
        while not rclpy.ok():
            time.sleep(0.01)
        self.main_loop = asyncio.get_event_loop()
        # self.main_loop.add_signal_handler(signal.SIGINT, self.ask_exit)
        # self.main_loop.add_signal_handler(signal.SIGTERM, self.ask_exit)
        self.main_loop.run_until_complete(self.make_task())
        # self.main_loop.run_forever()

    async def make_task(self):
        # ros2_task = asyncio.create_task(self.execute_spin())
        # mqtt_task = asyncio.create_task(self.mqtt_agent_main())
        await asyncio.gather(
            self.ros_agent.ros_task_main(),
            self.another_async()
        )
        print('TASK END ERROR ', flush=True)

    async def another_async(self):
        while True:
            await asyncio.sleep(5)
            # print('TASK Traffic ', flush=True)


if __name__ == '__main__':
    rclpy.init()
    a = RosManager()
    a.make_loop()

 

Subscription 노드와 , Publisher 노드로 나눠서 진행한 코드

node_subscription.py

import time

import rclpy
from rclpy.node import Node
from rclpy.subscription import Subscription
from rclpy.qos import QoSProfile
from rclpy.node import Node
from dataclasses import dataclass
from typing import TypeVar, Union, Callable, Any

from std_msgs.msg import String

import asyncio
import signal

from typing import Callable

# ====================================================================
# ROS handler.py =====================================================
# ====================================================================

@dataclass
class SubInfoType:
    msg_type: Any
    topic_name: str
    callback: Callable
    qos_profile: Union[rclpy.qos.QoSProfile, int]

class SubHandler(Node):
    def __init__(self, node_name):
        super().__init__(node_name)

    def return_subscription(self, sub_info: SubInfoType) -> Subscription:
        return self.create_subscription(
            sub_info.msg_type,
            sub_info.topic_name,
            sub_info.callback,
            sub_info.qos_profile
        )

    def return_sub_dataclass_type(self, msg_type, topic, callback, qos) -> SubInfoType:
        sub_info_1 = SubInfoType(
            msg_type,
            topic,
            callback[topic],
            qos
        )
        return sub_info_1

    def get_node_obj(self) -> Node:
        return self

# ====================================================================
# ROS agent.py =======================================================
# ====================================================================

NODE_NAME_1 = 'SUB_NODE'
TOPIC_1='/topic_1'
TOPIC_2='/topic_2'


class SubAgent():
    def __init__(self):
        if rclpy.ok():
            pass
        else:
            rclpy.init()
        self.callback_dict = {}

        self.sub_handler = SubHandler(NODE_NAME_1)
        self._node_obj: Node = self.sub_handler.get_node_obj()
        self.binding_subscription_with_node()

    def binding_subscription_with_node(self):
        self.callback_dict[TOPIC_1] = self.listener_callback_1
        sub_dataclass_1 = self.sub_handler.return_sub_dataclass_type(String, TOPIC_1, self.callback_dict, 10)
        self._node_obj._sub_1 = self.sub_handler.return_subscription(sub_dataclass_1)

        self.callback_dict[TOPIC_2] = self.listener_callback_2
        sub_dataclass_2 = self.sub_handler.return_sub_dataclass_type(String, TOPIC_2, self.callback_dict, 10)
        self._node_obj._sub_2 = self.sub_handler.return_subscription(sub_dataclass_2)

    def listener_callback_1(self, msg):
        self._node_obj.get_logger().info('listner_11 I heard: "%s"' % msg.data)
        #ros2 topic pub -1 /topic_1 std_msgs/msg/String data:\ \'HIHI\'\

    def listener_callback_2(self, msg):
        self._node_obj.get_logger().info('listner_22 I heard: "%s"' % msg.data)
        #ros2 topic pub -1 /topic_2 std_msgs/msg/String data:\ \'HIHI\'\

# ================================ SUBSCRIPTION 정의================================

    async def ros_sub_main(self):
        asyncio.create_task(self.execute_sub_spin())

    async def execute_sub_spin(self):
        async def spin_subscription(node_obj):
            rclpy.spin_once(node=node_obj,
                            timeout_sec=0.)  # timeout_sec 시간 만큼 실행권 가져오지 못함 0. 으로 설정 할시 Never blocking 개념
            # rclpy.spin(node=node_obj) # 이렇게 하면 spin 들어간 이후에 실행권을 가져오지 못함
            await asyncio.sleep(0.0)  # 코루틴 함수내에 await sleep 이 없으면 subscribe_callback이 실행이 안됨

        executor = rclpy.get_global_executor()
        print('subscription ready : ', executor.context.ok())
        while executor.context.ok():
            await asyncio.sleep(0.05)  #  while 문 속도제어 용 sleep
            await spin_subscription(self._node_obj)
            # await asyncio.create_task(self.spin_subscription(node_obj)) # 동일한 내용 이렇게도 쓸 수 있음.
            # asyncio.create_task(self.spin_subscription(node_obj))

# ============================== 코루틴으로 돌려주는 부분 ===============================

# ====================================================================
# ROS manager.py =======================================================
# ====================================================================
class RosManager():
    def __init__(self):
        self.ros_agent = SubAgent()

    def make_loop(self):
        while not rclpy.ok():
            time.sleep(0.01)
        self.main_loop = asyncio.get_event_loop()
        # self.main_loop.add_signal_handler(signal.SIGINT, self.ask_exit)
        # self.main_loop.add_signal_handler(signal.SIGTERM, self.ask_exit)
        self.main_loop.run_until_complete(self.make_task())
        # self.main_loop.run_forever()

    async def make_task(self):
        # ros2_task = asyncio.create_task(self.execute_spin())
        # mqtt_task = asyncio.create_task(self.mqtt_agent_main())
        await asyncio.gather(
            self.ros_agent.ros_sub_main(),
            self.another_async()
        )
        print('TASK END ERROR ', flush=True)

    async def another_async(self):
        while True:
            await asyncio.sleep(1)
            # print('TASK Traffic ', flush=True)


if __name__ == '__main__':
    rclpy.init()
    a = RosManager()
    a.make_loop()

 

node_publisher.py

import rclpy
from rclpy.node import Node

from std_msgs.msg import String

import time

import rclpy
from rclpy.node import Node
from rclpy.publisher import Publisher
from rclpy.qos import QoSProfile
from rclpy.node import Node
from dataclasses import dataclass
from typing import TypeVar, Union, Callable, Any

from std_msgs.msg import String

import asyncio
import signal

from typing import Callable

# ====================================================================
# ROS handler.py =====================================================
# ====================================================================

@dataclass
class PubInfoType:
    msg_type: Any
    topic_name: str
    qos_profile: Union[rclpy.qos.QoSProfile, int]

class PubHandler(Node):
    def __init__(self, node_name):
        super().__init__(node_name)

    def return_publisher(self, pub_info: PubInfoType) -> Publisher:
        return self.create_publisher(
            pub_info.msg_type,
            pub_info.topic_name,
            pub_info.qos_profile
        )

    def return_pub_dataclass_type(self, msg_type, topic, qos) -> PubInfoType:
        pub_info_1 = PubInfoType(
            msg_type,
            topic,
            qos
        )
        return pub_info_1

    def get_node_obj(self) -> Node:
        return self

# ====================================================================
# ROS agent.py =======================================================
# ====================================================================

NODE_NAME_1 = 'PUB_NODE'
TOPIC_1 = '/topic_1'
TOPIC_2 = '/topic_2'


class PubAgent(Node):
    def __init__(self):
        if rclpy.ok():
            pass
        else:
            rclpy.init()
        self.pub_handler = PubHandler(NODE_NAME_1)
        self._node_obj: Node = self.pub_handler.get_node_obj()
        self.binding_publisher_with_node()
        self.i = 0

    def binding_publisher_with_node(self):
        pub_dataclass_1 = self.pub_handler.return_pub_dataclass_type(String, TOPIC_1, 10)
        self._node_obj._pub_1 = self.pub_handler.return_publisher(pub_dataclass_1)

        pub_dataclass_2 = self.pub_handler.return_pub_dataclass_type(String, TOPIC_2, 10)
        self._node_obj._pub_2 = self.pub_handler.return_publisher(pub_dataclass_2)

        # super().__init__('minimal_publisher')
        # self.publisher_ = self.create_publisher(String, 'topic', 10)
        # timer_period = 0.5  # seconds

# ================================ SUBSCRIPTION 정의================================

    async def ros_pub_main(self):
        asyncio.create_task(self.execute_pub_spin())
        asyncio.create_task(self.publish_timer())

    async def execute_pub_spin(self):
        async def spin_pubscription(node_obj):
            rclpy.spin_once(node=node_obj,
                            timeout_sec=0.)  # timeout_sec 시간 만큼 실행권 가져오지 못함 0. 으로 설정 할시 Never blocking 개념
            # rclpy.spin(node=node_obj) # 이렇게 하면 spin 들어간 이후에 실행권을 가져오지 못함
            await asyncio.sleep(0.0)  # 코루틴 함수내에 await sleep 이 없으면 subscribe_callback이 실행이 안됨

        executor = rclpy.get_global_executor()
        print('pubscription ready : ', executor.context.ok())
        while executor.context.ok():
            await asyncio.sleep(0.05)  #  while 문 속도제어 용 sleep
            await spin_pubscription(self._node_obj)
            # await asyncio.create_task(self.spin_subscription(node_obj)) # 동일한 내용 이렇게도 쓸 수 있음.
            # asyncio.create_task(self.spin_subscription(node_obj))

    async def publish_timer(self):
        str = String()
        while rclpy.ok():
            str.data = '{} : HI'.format(self.i)
            self._node_obj._pub_1.publish(str)
            self.i = self.i + 1
            await asyncio.sleep(1)
            str.data = '{} : HI'.format(self.i)
            self._node_obj._pub_1.publish(str)
            self.i = self.i + 1
            await asyncio.sleep(1)
            str.data = '{} : HI'.format(self.i)
            self._node_obj._pub_2.publish(str)
            self.i = self.i + 1
            await asyncio.sleep(1)

# ============================== 코루틴으로 돌려주는 부분 ===============================

# ====================================================================
# ROS manager.py =======================================================
# ====================================================================
class RosManager():
    def __init__(self):
        self.ros_agent = PubAgent()

    def make_loop(self):
        while not rclpy.ok():
            time.sleep(0.01)
        self.main_loop = asyncio.get_event_loop()
        # self.main_loop.add_signal_handler(signal.SIGINT, self.ask_exit)
        # self.main_loop.add_signal_handler(signal.SIGTERM, self.ask_exit)
        self.main_loop.run_until_complete(self.make_task())
        # self.main_loop.run_forever()

    async def make_task(self):
        # ros2_task = asyncio.create_task(self.execute_spin())
        # mqtt_task = asyncio.create_task(self.mqtt_agent_main())
        await asyncio.gather(
            self.ros_agent.ros_pub_main(),
            self.another_async()
        )
        print('TASK END ERROR ', flush=True)

    async def another_async(self):
        while True:
            await asyncio.sleep(1)
            # print('TASK Traffic ', flush=True)


if __name__ == '__main__':
    rclpy.init()
    a = RosManager()
    a.make_loop()

 


 

댓글