본문 바로가기
ros2_python/Ros2 msg,srv,action,parm

python MultiThread( publisher 2개, suscriber 2개 )

by 알 수 없는 사용자 2022. 9. 1.
코드 의도 :
멀티쓰레드로 등록된 subscription에 data를 쏴줄 때마다 callback 함수가 호출되는데,
callback 함수가 호출 될 때 마다 해당된 publisher를 publish해주는 코드
동작은 50초간 지속

 

 

ros2_agnet.py

import os
import sys
import time
import rclpy
from std_msgs.msg import String
import threading
from ros2_handler import SubNodeMaker, PubNodeMaker, Ros2Handler

path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
#----------------------------------------------------------------------


if __name__ == "__main__":
    rclpy.init()
    Ros2Handler.init_rclpy()

#========================================================publisher config
    pub_node_maker_1 = PubNodeMaker()
    # 메시지 정의-----------------------------------------------------------------
    _msg_1 = String()
    _msg_1.data = 'pub_data_from_pub_agent_1'
    # --------------------------------------------------------------------------
    pub_agent_1 = Ros2Handler(pub_node_maker_1(node_name="pub_node_1", msg_type=String, topic_name="/node_name_pub1/aa",
                                               qos_profile=10))
    # pub_agent_1.run_publish(_msg_1)         # 이 명령할 떄마다 한번 씩 publish 함


    pub_node_maker_2 = PubNodeMaker()
    # 메시지 정의-----------------------------------------------------------------
    _msg_2 = String()
    _msg_2.data = 'pub_data_from_pub_agent_2'
    # --------------------------------------------------------------------------
    pub_agent_2 = Ros2Handler(pub_node_maker_2("pub_node_2", String, "/node_name_pub2/aa", 10))
    # pub_agent_2.run_publish(_msg_2)         # 이 명령할 떄마다 한번 씩 publish 함


# ========================================================subscription config
    sub_node_maker_1 = SubNodeMaker()
    # 콜백 정의------------------------------------------------------------------
    def listener_callback_1(msg):
        print()
        print('sub_callback_1, The number of active thread', threading.active_count())
        print('I heard: ', msg.data)
        pub_agent_1.run_publish(_msg_1)
        print('executing Node id : ', Ros2Handler._execute_list)
    # --------------------------------------------------------------------------
    sub_agent_1 = Ros2Handler(sub_node_maker_1(node_name="sub_node_1", msg_type=String, topic_name="/node_name_sub1/aa",
                                               callback=listener_callback_1, qos_profile=10))
    sub_node_maker_2 = SubNodeMaker()


    # 콜백 정의------------------------------------------------------------------
    def listener_callback_1(msg):
        print()
        print('sub_callback_2, The number of active thread', threading.active_count())
        print('I heard: ', msg.data)
        pub_agent_2.run_publish(_msg_2)
        print('executing Node id : ', Ros2Handler._execute_list)
    # --------------------------------------------------------------------------
    sub_agent_2 = Ros2Handler(sub_node_maker_2("sub_node_2", String, "/node_name_sub2/aa", listener_callback_1, 10))


    Ros2Handler.run_executor()
    print('running_Thread_num : ', threading.active_count())
    time.sleep(50)
    print('running_Thread_num', threading.active_count())

"""
ros2 topic echo /node_name_pub1/aa 
ros2 topic echo /node_name_pub2/aa
ros2 topic pub --once /node_name_sub1/aa std_msgs/msg/String "data : topic_test_1!"
ros2 topic pub --once /node_name_sub2/aa std_msgs/msg/String "data : topic_test_2!"
"""

 

 

ros2_handler.py

# -*- coding: utf-8 -*-
import rclpy
from rclpy.qos import QoSProfile
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.node import Publisher
from rclpy.node import Subscription
from threading import Thread

from dataclasses import dataclass
from typing import Any
from typing import Union

#dataclass 변수어노테이션으로 요긴하게 쓰는중
@dataclass
class SubNodeInfo:
    node_name: str
    msg_type: Any
    topic_name: str
    callback: Any
    qos_profile: rclpy.qos.QoSProfile


@dataclass
class PubNodeInfo:
    node_name: str
    msg_type: Any
    topic_name: str
    qos_profile: rclpy.qos.QoSProfile


# class SubNodeMaker(SubNodeInfo):
class SubNodeMaker:
    def __init__(self):
        self.qos_profile = None

    def __call__(self, node_name: str, msg_type: Any, topic_name: str, callback: Any,
                 qos_profile: Union[int, QoSProfile]) -> SubNodeInfo:

        if isinstance(qos_profile, int):
            self.qos_profile = rclpy.qos.QoSProfile(
                reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
                history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
                depth=qos_profile
            )
        elif isinstance(qos_profile, QoSProfile):
            self.qos_profile = qos_profile
        else:
            raise TypeError('PubNodeMaker 에서 config TYPE ERROR 발생')

        _subscription_type = SubNodeInfo(
            node_name=node_name,
            msg_type=msg_type,
            topic_name=topic_name,
            callback=callback,
            qos_profile=self.qos_profile
        )
        return _subscription_type


# class PubNodeMaker(PubNodeInfo):
class PubNodeMaker:
    def __init__(self):
        self.qos_profile = None

    def __call__(self, node_name: str, msg_type: Any, topic_name: str,
                 qos_profile: Union[int, QoSProfile]) -> PubNodeInfo:

        if isinstance(qos_profile, int):
            self.qos_profile = rclpy.qos.QoSProfile(
                reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
                history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
                depth=qos_profile
            )
        elif isinstance(qos_profile, QoSProfile):
            self.qos_profile = qos_profile
        else:
            raise TypeError('PubNodeMaker 에서 config TYPE ERROR 발생')

        self.publisher_type = PubNodeInfo(
            node_name=node_name,
            msg_type=msg_type,
            topic_name=topic_name,
            qos_profile=self.qos_profile
        )
        return self.publisher_type


class Ros2Handler(object):
    """
    코드 의도 :
    멀티쓰레드로 등록된 subscription에 data를 쏴줄 때마다 callback 함수가 호출되는데,
    callback 함수가 호출 될 때 마다 해당된 publisher를 publish해주는 코드
    동작은 50초간 지속

    코드설명 :
    Ros2Handler.init_rclpy() 랑  Ros2Handler.run_executor()는 class method로 따로 켜줘야함.
    init_rclpy() 는 rclpy.init() 용 방어코드(안써도됨),  Ros2Handler.run_executor()는 여태 등록된 executor를 한번에 켜주기 위함
    나머지는 코드 등록 과정
    Ros2Handler 의 파라미터는 커스텀 dataclass로 알맞은 파라미터를 보내지 않으면 에러 발생

    """
    _execute_list = []
    _executor: MultiThreadedExecutor
    def __init__(self, node_info: Union[SubNodeInfo, PubNodeInfo]):  # , _queue: asyncio.Queue):
        """
        """
        self.sub_node_info: SubNodeInfo = None
        self.pub_node_info: PubNodeInfo = None
        self._sub_node_obj: Node = None
        self._pub_node_obj: Node = None
        self.set_executor()
        if isinstance(node_info, SubNodeInfo):
            self.sub_node_info = node_info
            self._set_up_subscription()
        elif isinstance(node_info, PubNodeInfo):
            self.pub_node_info = node_info
            self._set_up_publisher()
        else:
            raise TypeError("class Ros2Handler parameter type ERROR")

    @classmethod
    def init_rclpy(cls):
        """ 한번만 수행되어야 하므로 클래쓰메쏘드 사용 """
        if rclpy.ok():
            pass
        else:
            rclpy.init()

    @classmethod
    def set_executor(cls):
        cls._executor = MultiThreadedExecutor()

    @classmethod
    def remove_node_from_executor(cls):
        """필요시 사용 따로 쓰진 않는중"""
        cls._executor.remove_node()

    @classmethod
    def run_executor(cls):
        for _ in range(0,len(cls._execute_list)):
            cls._executor.add_node(cls._execute_list[_])
        _executor_thread = Thread(target=cls._executor.spin, daemon=True)
        print("ROS2 Executor's runnung nodes number : [{}] ".format(len(cls._execute_list)), flush=True)
        print("ROS2 Executor's runnung nodes names : [{}] ".format(cls._executor.get_nodes()), flush=True)
        _executor_thread.start()

    def _set_up_subscription(self) -> None:
        self._create_node()
        self._node_obj_create_subscription()

    def _set_up_publisher(self) -> None:
        self._create_node()
        self._node_obj_create_publisher()

    def _create_node(self) -> None:
        """
        node object 생성
        """
        if rclpy.ok():
            pass
        else:
            rclpy.init()

        if isinstance(self.sub_node_info, SubNodeInfo):
            print('ROS2 create_node for subscription', flush=True)
            self._sub_node_obj = rclpy.create_node(self.sub_node_info.node_name)
        elif isinstance(self.pub_node_info, PubNodeInfo):
            print('ROS2 create_node for publisher', flush=True)
            self._pub_node_obj = rclpy.create_node(self.pub_node_info.node_name)
        else:
            raise TypeError("class Ros2Handler parameter type ERROR")

    def _node_obj_create_subscription(self) -> None:
        """subscribe 노드 생성시 사용 되는 함수"""
        print("create_subscription on ", flush=True)
        self._sub_node_obj.subscription: Subscription = self._sub_node_obj.create_subscription(
            self.sub_node_info.msg_type,
            self.sub_node_info.topic_name,
            self.sub_node_info.callback,
            self.sub_node_info.qos_profile
        )
        self._execute_list.append(self._sub_node_obj)

    def _node_obj_create_publisher(self) -> None:
        """publisher 노드 생성시 사용 되는 함수"""
        print("create_publisher on ", flush=True)
        self._pub_node_obj.publisher: Publisher = self._pub_node_obj.create_publisher(
            self.pub_node_info.msg_type,
            self.pub_node_info.topic_name,
            self.pub_node_info.qos_profile
        )
        # self._execute_list.append(self._pub_node_obj)

    def run_publish(self, msg):
        print("ROS2 run_publish on, topic name's: ", self._pub_node_obj.publisher.topic_name, flush=True)
        print('The number of this message published is [{}] '.format(self._pub_node_obj.publisher.get_subscription_count()))
        self._pub_node_obj.publisher.publish(msg)
        # self._pub_node_obj.publisher.destroy()

 

 

 

실행결과 :

댓글