본문 바로가기
ros2_python/Ros2 msg,srv,action,parm

ROS2_msg_pub_sub_multithread_python

by 알 수 없는 사용자 2022. 7. 21.

코드

subscriber_node_maker

# ===========================================================================================
# !/usr/bin/python
# -*- coding: utf-8 -*-
#----------------------------------------------------------------------
import os
import sys
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
#----------------------------------------------------------------------
import rclpy
from rclpy.qos import QoSProfile
from util.util_logger import Logger


class Ros2SubNodeAgent:
    """
    멀티 쓰레드 에 멀티 노드 구조
    멀티쓰레딩 subscribe 내용을 참조 사이트
    https://answers.ros.org/question/377848/spinning-multiple-nodes-across-multiple-threads/
    """
    def __init__(self, _module_info):
        """
        """
        self._executor = None
        self._module_info = _module_info
        self._qos_profile = QoSProfile(depth=10)
        self._logger = Logger.get_instance("ros2")
        self._node_obj = None
        self._rate = None

    def set_up_node_with_binding_subscription(self):
        self.create_node()
        self.node_obj_create_subscription()
        self.node_obj_create_rate()

    def create_node(self) -> None:
        """
        node object 생성
        """
        if rclpy.ok():
            pass
        else:
            rclpy.init()

        _node_name = self._module_info['node_info']['node_name']
        # self._logger.debug('_node_name [{}]'.format(type(_node_name)))
        self._logger.debug('_node_name [{}]'.format(_node_name))
        self._node_obj = rclpy.create_node(_node_name)
        self._logger.debug('_node_objj [{}]'.format(self._node_obj))

    def node_obj_create_subscription(self) -> None:
        """
        executor에 등록된 node_obj에 type,topic_name,콜백함수,qos 정보 삽입
        """
        _callback_func = self._module_info['node_info']['callback_func']
        self._logger.debug("msg_type [{}] , "
                           "topic name [{}] , "
                           "topic name type [{}] , "
                           "callback_func [{}] , "
                           "qos_profile [{}]".format(
            self._module_info['node_info']['msg_type'],
            self._module_info['node_info']['topic_name'],
            type(self._module_info['node_info']['topic_name']),
            self._module_info['node_info']['callback_func'],
            self._qos_profile)
        )

        self._node_obj.subscription = self._node_obj.create_subscription(
            self._module_info['node_info']['msg_type'],
            self._module_info['node_info']['topic_name'],
            self._module_info['node_info']['callback_func'],
            self._qos_profile)

    def node_obj_create_rate(self):
        """
        각 node 당 rate time 설정
        """
        self._logger.debug('node_obj_create_rate : [{}]'.format(
            self._module_info['node_info']['rate_sec']))
        self._rate = self._node_obj.create_rate(self._module_info['node_info']['rate_sec'])
        # self._node_obj.create_rate(self._module_info['node_info']['rate_sec']).sleep()
        self._logger.debug('node_obj_create_rate222')

    def get_node_obj_id(self):
        return self._node_obj

 


 

publisher_node_maker

# ===========================================================================================
# !/usr/bin/python
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
import os
import sys

path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
# ----------------------------------------------------------------------
import rclpy
from rclpy.qos import QoSProfile
from util.util_logger import Logger


class Ros2PubNodeAgent:
    """
    멀티 쓰레드 에 멀티 노드 구조
    멀티쓰레딩 pubscribe 내용을 참조 사이트
    https://answers.ros.org/question/377848/spinning-multiple-nodes-across-multiple-threads/
    """

    def __init__(self, _module_info):
        """
        """
        self._executor = None
        self._module_info = _module_info
        self._qos_profile = QoSProfile(depth=10)
        self._logger = Logger.get_instance("ros2")
        self._node_obj = None
        self._rate = None

    def set_up_node_with_binding_publisher(self):
        self.create_node()
        self.node_obj_create_publisher()

    def create_node(self) -> None:
        """
        node object 생성
        """
        if rclpy.ok():
            pass
        else:
            rclpy.init()

        _node_name = self._module_info['node_info']['node_name']
        self._logger.debug('_node_name [{}]'.format(_node_name))
        self._node_obj = rclpy.create_node(_node_name)
        self._logger.debug('_node_objj [{}]'.format(self._node_obj))

    def node_obj_create_publisher(self) -> None:
        """
        executor에 등록된 node_obj에 type,topic_name,콜백함수,qos 정보 삽입
        """
        _callback_func = self._module_info['node_info']['callback_func']
        self._logger.debug(
            "msg_type [{}] , "
            "topic name [{}] , "
            "topic name type [{}] , "
            "callback_func [{}] , "
            "qos_profile [{}]".format(
                self._module_info['node_info']['msg_type'],
                self._module_info['node_info']['topic_name'],
                type(self._module_info['node_info']['topic_name']),
                self._module_info['node_info']['callback_func'],
                self._qos_profile))

        self._node_obj.publisher = self._node_obj.create_publisher(
            self._module_info['node_info']['msg_type'],
            self._module_info['node_info']['topic_name'],
            self._qos_profile)

    def get_node_obj_id(self):
        return self._node_obj

    def publish_func(self, __msg):
        """
        퍼블리시 함수
        :return:
        """
        self._node_obj.publisher.publish(__msg)

 


MultiThread starter

import os
import sys
import time

path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
#----------------------------------------------------------------------
import rclpy
from rclpy.executors import Executor  # noqa: F401
from threading import Thread
from std_msgs.msg import String
from dataclasses import asdict
from constant.dataclass import SubNodeInfo, PubNodeInfo, NodeDetail
from util.util_logger import Logger
from ros2.ros2_sub_node_agent import Ros2SubNodeAgent
from ros2.ros2_pub_node_agent import Ros2PubNodeAgent

# _subscribe_module_1_info
NODE_NAME_1_SUB = 'sub_node_name_1'
MSG_TYPE_1_SUB = 'String'
TOPIC_1_SUB = 'topic_name/_1'
RATE_HZ_1_SUB = '2.0'
CALLBACK_1_SUB = 'self.subscribe_module_1_info_callback'
MODULE_1_SUB = 'sub_module_1'

NODE_NAME_2_SUB = 'sub_node_name_2_broadcast'
MSG_TYPE_2_SUB = 'String'
TOPIC_2_SUB = 'topic_name/_2/_broadcast'
RATE_HZ_2_SUB = '2.0'
CALLBACK_2_SUB = 'self.subscribe_module_2_info_callback'
MODULE_2_SUB = 'module_2'

NODE_NAME_1_PUB = 'pub_node_name_1'
MSG_TYPE_1_PUB = 'String'
TOPIC_1_PUB = 'pub_topic_name/_1'
RATE_HZ_1_PUB = '2.0'
CALLBACK_1_PUB = 'no_use'
MODULE_1_PUB = 'pub_module_1'


class Ros2ModuleManager():
    """
    Ros2SubNodeAgent : client , node_obj 생성클래쓰
    Ros2ModuleManager : manager 역할
    """
    def __init__(self, **kwargs):
        self._queue_dict = self._trans_to_queue(kwargs)
        self._logger = Logger.get_instance('module_manager')
        self._logger.debug('Ros2ModuleManager queue id : [{}]'.format(
            id(self._queue_dict['module_to_robot'])))
        self._subscribe_module_1_info = self._get_node_info(NODE_NAME_1_SUB)
        self._subscribe_module_2_info = self._get_node_info(NODE_NAME_2_SUB)
        self._publish_to_module_1_info = self._get_node_info(NODE_NAME_1_PUB)
        self._publish_module_1_obj = None
        self._executor = None
        rclpy.init()
        self._executor = rclpy.executors.MultiThreadedExecutor()
        self._setup_for_thread()

    def _trans_to_queue(self, _kwargs):
        """
        async queue 랑 queue를 인자로 넘겨준걸 dictionary형태로 변환
        """
        _queue_dict = {}
        for _key, _val in _kwargs.items():
            _queue_dict[_key] = _val
        return _queue_dict

    def _get_node_info(self, node_name):
        if node_name == NODE_NAME_1_SUB:
            _node_detail_1 = NodeDetail(
                node_name=NODE_NAME_1_SUB,
                msg_type=MSG_TYPE_1_SUB,
                topic_name=TOPIC_1_SUB,
                rate_sec=RATE_HZ_1_SUB,
                callback_func=CALLBACK_1_SUB
            )
            _sub_node_info_1 = SubNodeInfo(
                module_name=MODULE_1_SUB,
                node_info=_node_detail_1
            )
            _subscribe_module_1_info = asdict(_sub_node_info_1)
            if _subscribe_module_1_info['node_info']['callback_func'] == CALLBACK_1_SUB:
                _subscribe_module_1_info['node_info']['msg_type'] = String
                _subscribe_module_1_info['node_info']['rate_sec'] = eval(RATE_HZ_1_SUB)
                _subscribe_module_1_info['node_info']['callback_func'] = eval(CALLBACK_1_SUB)
                return _subscribe_module_1_info
        elif node_name == NODE_NAME_2_SUB:
            _node_detail_2 = NodeDetail(
                node_name=NODE_NAME_2_SUB,
                msg_type=MSG_TYPE_2_SUB,
                topic_name=TOPIC_2_SUB,
                rate_sec=RATE_HZ_2_SUB,
                callback_func=CALLBACK_2_SUB
            )
            _sub_node_info_2 = PubNodeInfo(
                module_name=MODULE_2_SUB,
                node_info=_node_detail_2
            )
            _subscribe_module_2_info = asdict(_sub_node_info_2)
            if _subscribe_module_2_info['node_info']['callback_func'] == CALLBACK_2_SUB:
                _subscribe_module_2_info['node_info']['msg_type'] = String
                _subscribe_module_2_info['node_info']['rate_sec'] = eval(RATE_HZ_2_SUB)
                _subscribe_module_2_info['node_info']['callback_func'] = eval(CALLBACK_2_SUB)
                return _subscribe_module_2_info
        elif node_name == NODE_NAME_1_PUB:
            _node_detail_1 = NodeDetail(
                node_name=NODE_NAME_1_PUB,
                msg_type=MSG_TYPE_1_PUB,
                topic_name=TOPIC_1_PUB,
                rate_sec=RATE_HZ_1_PUB,
                callback_func=CALLBACK_1_PUB
            )
            _pub_node_info_1 = SubNodeInfo(
                module_name=MODULE_1_PUB,
                node_info=_node_detail_1
            )
            _publish_to_module_1_info = asdict(_pub_node_info_1)
            if _publish_to_module_1_info['node_info']['callback_func'] == CALLBACK_1_PUB:
                _publish_to_module_1_info['node_info']['msg_type'] = String
                # _publish_to_module_1_info['node_info']['rate_sec'] = eval(RATE_HZ_1_PUB)
                # _publish_to_module_1_info['node_info']['callback_func'] = eval(CALLBACK_1_PUB)
                return _publish_to_module_1_info
        raise ValueError("type the right value to 'self._get_node_info('여기')' <-here ")

    def _setup_for_thread(self):
        """
        인식모듈에서 발행하는 topic 구독자 생성
        Ros2SubNodeAgent 에서 node_object를 생성 한 후,
        get_node_obj 의 return값으로 오브젝트 id 를 받고
        executor에 해당 오브젝트id를 추가함
        """
        # assert self._module_validation == self._subscribe_module_1_info
        self._logger.debug('self._subscribe_module_1_info : [{}]'.format(
            self._subscribe_module_1_info))
        _subscribe_module_1_obj = Ros2SubNodeAgent(self._subscribe_module_1_info)
        _subscribe_module_1_obj.set_up_node_with_binding_subscription()
        # _subscribe_module_1_obj.set_up_subscription()
        _sub_node_1_obj = _subscribe_module_1_obj.get_node_obj_id()
        self._logger.debug('_sub_node_1_obj : [{}]'.format(_sub_node_1_obj))
        self._executor.add_node(_sub_node_1_obj)

        # assert self._module_validation == self._subscribe_module_2_info
        self._logger.debug('self._subscribe_module_2_info : [{}]'.format(
            self._subscribe_module_2_info))
        _subscribe_module_2_obj = Ros2SubNodeAgent(self._subscribe_module_2_info)
        _subscribe_module_2_obj.set_up_node_with_binding_subscription()
        _sub_node_2_obj = _subscribe_module_2_obj.get_node_obj_id()
        self._logger.debug('_sub_node_1_obj : [{}]'.format(_sub_node_2_obj))
        self._executor.add_node(_sub_node_2_obj)

        # assert self._module_validation == self._publish_to_module_1_info
        self._logger.debug('self._publish_to_module_1_info : [{}]'.format(
            self._publish_to_module_1_info))
        self._publish_module_1_obj = Ros2PubNodeAgent(self._publish_to_module_1_info)
        self._publish_module_1_obj.set_up_node_with_binding_publisher()
        _pub_node_1_obj = self._publish_module_1_obj.get_node_obj_id()
        self._logger.debug('_pub_node_1_obj : [{}]'.format(_pub_node_1_obj))
        self._executor.add_node(_pub_node_1_obj)

    def start_multi_thread(self):
        """ executor에 등록된 쓰레드들 start """
        self._logger.debug('multi_thead start now')
        _executor_thread = Thread(target=self._executor.spin, daemon=True)
        pub_trigger_thread = Thread(target=self.publish_module_1_info_trigger)
        _executor_thread.start()
        pub_trigger_thread.start()

    def _put_data_to_robot(self, msg):
        self._queue_dict['module_to_robot'].put_nowait(msg)

    def subscribe_module_1_info_callback(self, msg) -> None:
        """
        큐 data 보내는거 dataclass 어떻게 짤지 생각중
        :param msg: ros2 topic 메시지 , std_msg/String
        """
        self._logger.warning("ros2_module_1_sub callback called : {}  type : {}"
                             .format(msg, type(msg)))
        self._put_data_to_robot(msg.data)
        # self._queue_dict['module_to_robot']._loop._write_to_self()

    def subscribe_module_2_info_callback(self, msg) -> None:
        self._logger.warning("ros2_module_2_sub callback called : {}  type : {}"
                             .format(msg, type(msg)))
        self._put_data_to_robot(msg.data)
        # self._queue_dict['module_to_robot']._loop._write_to_self()

    def publish_module_1_info_trigger(self):
        msg_pub = String()
        while True:
            if self._queue_dict['robot_to_module'].empty():
                time.sleep(3)
            else:
                _msg = self._queue_dict['robot_to_module'].get()
                # mqtt 로 오는 data는 bytes로 옴 따라서 str 변환 필요
                msg_pub.data = str(_msg)
                self._publish_module_1_obj.publish_func(msg_pub)
                self._logger.warning('published now , msg : [{}]'.format(msg_pub))


# pub test command   'cmd'의 val 값은 mqtt랑 약속하는 부분이다
# ros2 topic pub --once topic_name/_2/_broadcast std_msgs/String "data: {'cmd': 'md_to_rb_2_broadcast' , 'data': 'md_to_rb_2_test'}"
# ros2 topic pub --once topic_name/_1 std_msgs/String "data: {'cmd': 'md_to_rb_1' , 'data': 'md_to_rb_1_test'}"

 

댓글