본문 바로가기
ros2_python/Ros2 msg,srv,action,parm

ros2_msg 토픽 example(executor사용==싱글쓰레드)

by 알 수 없는 사용자 2022. 6. 28.

ROS2 msg sub 부분 

code : 

# ===========================================================================================
# !/usr/bin/python
# -*- coding: utf-8 -*-
import rclpy
from rclpy.executors import Executor  # noqa: F401
from rclpy.qos import QoSProfile
import threading
import time
from std_msgs.msg import String

class SubNodeMakerAgent:
    """
    멀티 쓰레드 에 멀티 노드 구조
    멀티쓰레딩 subscribe 내용을 참조 사이트
    https://answers.ros.org/question/377848/spinni_sigint_gcng-multiple-nodes-across-multiple-threads/
    """

    def __init__(self, _module_info):
        """
        """
        self._executor = None
        self._module_info = _module_info
        self._qos_profile = QoSProfile(depth=10)
        self._node_obj = None

    def set_up_sub(self):
        print('setup stat')
        self._create_node()
        self._node_obj_create_subscription()
        print('setup end')

    def _create_node(self) -> None:
        """
        node object 생성
        """
        if rclpy.ok():
            pass
        else:
            rclpy.init()

        _node_name = self._module_info['node_info']['node_name']
        self._node_obj = rclpy.create_node(_node_name)
    def _node_obj_create_subscription(self) -> None:
        """
        executor에 등록된 node_obj에 type,topic_name,콜백함수,qos 정보 삽입
        """
        self._node_obj.subscription = self._node_obj.create_subscription(
            self._module_info['node_info']['msg_type'],
            self._module_info['node_info']['topic_name'],
            self._module_info['node_info']['callback_func'],
            self._qos_profile)

    def get_node_obj_id(self):
        return self._node_obj

#--------------------------#--------------------------#--------------------------#--------------------------
#--------------------------#--------------------------#--------------------------#--------------------------


if __name__ == '__main__':
# sub_1 용  #--------------------------#--------------------------#--------------------------
    def subscribe_module_1_info_callback(msg) -> None:
        print('hihi i got msg : [{}]'.format(msg))

    NODE_NAME_1_SUB = 'sub_thread_test_1'
    MSG_TYPE_1_SUB = String
    TOPIC_1_SUB = '/sub_thread_test_1/test'
    RATE_HZ_1_SUB = '2.0'
    CALLBACK_1_SUB = subscribe_module_1_info_callback

    _subscribe_module_1_info = {
        'module_name': NODE_NAME_1_SUB,
        'node_info': {
            'node_name': NODE_NAME_1_SUB,
            'msg_type': MSG_TYPE_1_SUB,
            'topic_name': TOPIC_1_SUB,
            'rate_sec': RATE_HZ_1_SUB,
            'callback_func': CALLBACK_1_SUB
        }
    }

    class aa():
        def __init__(self):
            _subscribe_module_1_obj = SubNodeMakerAgent(_subscribe_module_1_info)
            _subscribe_module_1_obj.set_up_sub()
            self._sub_node_1_obj = _subscribe_module_1_obj.get_node_obj_id()
            self._executor = rclpy.executors.SingleThreadedExecutor()

        def run(self):
            self._executor.add_node(self._sub_node_1_obj)
            _executor_thread = threading.Thread(target=self._executor.spin, daemon=True)
            _executor_thread.start()
            # _executor_thread.join()

    a = aa()
    a.run()

    while rclpy.ok():
        time.sleep(3)
        print('im waiting publisher')

# 다른 터미널에서 이 명령어 쳐서 받는지 확인하면 끝
# ros2 topic pub /sub_thread_test_1/test std_msgs/msg/String "data : hi SingleThread !"

결과 출력값

 

 

 

ROS2 msg pub 부분

code : 이코드는 개별실행 안됨. 위에 sub코드처럼 네이밍 해주는 부분이 있어야함, Thread 스타트 하는 부분도 따로 빼지 않고 Agent 클래스 내부에서 실행 하는 차이가 있음

# ===========================================================================================
# !/usr/bin/python
# -*- coding: utf-8 -*-
import rclpy
# from rclpy.node import Node #노드를 상속해서 쓰지 않음
from std_msgs.msg import String
from rclpy.executors import Executor  # noqa: F401
import threading
import logging
import time
from constant_1.enum_1 import *
from rclpy.qos import QoSProfile


class PubNodeMakerAgent(object):
    """
    싱글 쓰레드 에 싱글 노드 구조
    """
    def __init__(self, node_name, msg_type, topic_name, time_period):
        """
        :param node_name    : rclpy.executors에 등록할 node 명
        :param msg_type     : 보낼 msg의 타입
        :param topic_name   : 지정할 topic 이름
        :param time_period  : timer callback 함수 주기
        """
        self._logger = logging.getLogger("pub_maker")
        self._logger.setLevel(logging.DEBUG)
        self.a = logging.StreamHandler()
        self._logger.addHandler(self.a)
        self.node_name = node_name
        self.msg_type = msg_type
        self.topic_name = topic_name
        self.time_period = time_period
        self.executor = None
        self.node = None
        self._i = 0
        self.qos_profile = QoSProfile(depth=10)
        # rclpy.init()

    def __del__(self):
        self._logger.warning("__del__ on")
        self.node.destroy_node()
        rclpy.shutdown()

    def __call__(self, *args, **kwargs):
        pass

    def node_maker(self) -> None:
        """
        노드 생성
        노드 이름을 기준으로 멀티쓰레드Executor에 add해주는 방식이므로 중요함
        :return:
        """
        self.node = rclpy.create_node(self.node_name)

    def node_adder(self) -> None:
        """
        ???????????????????
        멀티쓰레드 Executor를 쓰고 있음 . 싱글쓰레드 Executor 사용 하기 가능하다.
        :return:
        """
        self.executor = rclpy.executors.SingleThreadedExecutor()
        self.executor.add_node(self.node)

    def node_pub_binder(self) -> None:
        """
        1. node랑 publisher의 config 파일을 binding 해줌
        2. node에 timer_callback 함수를 등록(register) 해줌

        여기서는 받을 만한 신호가 딱히 없으므로 특정시간마다 publish 신호가 오도록 구성함.
        ex) 센서data , 특정 플래그 신호가 on 시, publish
        :return:
        """
        self._logger.debug("qos profile type is : [{}]".format(type(self.qos_profile)))
        self.node.publisher = self.node.create_publisher(self.msg_type, self.topic_name, self.qos_profile)
        timer_period = self.time_period
        self.node.timer = self.node.create_timer(timer_period, self.timer_callback)

    def node_executor(self) -> None:
        """
        threading.Thread(target=self.executor.spin, daemon=True)
        "target=self.executor.spin_once()" 이부분에서 target 이 spin_once 일시엔, 한번만 publish하고 ros가 shutdown 된다.
        :return:
        """
        executor_thread = threading.Thread(target=self.executor.spin, daemon=True)
        # executor_thread = threading.Thread(target=self.executor.spin_once(), daemon=True)
        executor_thread.start()
        # # join 의미 쓰레드가 끝나기전에 main 쓰레드가 끝나지 않도록 기다려 달란의미 while True 해도됨
        # executor_thread.join()

    def timer_callback(self) -> None:
        """
        특정시간마다 등록된 node의 timer_callback 함수가 동작하고 timer_callback 함수에서 publish를 한다.
        :return:
        """
        msg = String()
        msg.data = 'Hello World: %d' % self._i
        self.node.publisher.publish(msg)
        self._logger.info(
            'modul->edge_server pub !! -> node_name : [{}] , msg.data : [{}] , topic_name : [{}]'.format(self.node_name,
                                                                                                         msg.data,
                                                                                                         self.topic_name))
        self._i += 1

    def run_pub(self) -> None:
        self.node_maker()
        self.node_adder()
        self.node_pub_binder()
        self.node_executor()


if __name__ == '__main__':
    rclpy.init()
    pub_1 = PubNodeMakerAgent(node_name=ROS2_MODULE_1_PUB_NODE,
                              msg_type=ROS2_MSG_TYPE_STR,
                              topic_name=ROS2_MODULE_1_SUB,
                              time_period=2)
    pub_1.run_pub()

    i = 0
    while rclpy.ok():
        i = i + 1
        time.sleep(1)

 

댓글