코드
subscriber_node_maker
# ===========================================================================================
# !/usr/bin/python
# -*- coding: utf-8 -*-
#----------------------------------------------------------------------
import os
import sys
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
#----------------------------------------------------------------------
import rclpy
from rclpy.qos import QoSProfile
from util.util_logger import Logger
class Ros2SubNodeAgent:
"""
멀티 쓰레드 에 멀티 노드 구조
멀티쓰레딩 subscribe 내용을 참조 사이트
https://answers.ros.org/question/377848/spinning-multiple-nodes-across-multiple-threads/
"""
def __init__(self, _module_info):
"""
"""
self._executor = None
self._module_info = _module_info
self._qos_profile = QoSProfile(depth=10)
self._logger = Logger.get_instance("ros2")
self._node_obj = None
self._rate = None
def set_up_node_with_binding_subscription(self):
self.create_node()
self.node_obj_create_subscription()
self.node_obj_create_rate()
def create_node(self) -> None:
"""
node object 생성
"""
if rclpy.ok():
pass
else:
rclpy.init()
_node_name = self._module_info['node_info']['node_name']
# self._logger.debug('_node_name [{}]'.format(type(_node_name)))
self._logger.debug('_node_name [{}]'.format(_node_name))
self._node_obj = rclpy.create_node(_node_name)
self._logger.debug('_node_objj [{}]'.format(self._node_obj))
def node_obj_create_subscription(self) -> None:
"""
executor에 등록된 node_obj에 type,topic_name,콜백함수,qos 정보 삽입
"""
_callback_func = self._module_info['node_info']['callback_func']
self._logger.debug("msg_type [{}] , "
"topic name [{}] , "
"topic name type [{}] , "
"callback_func [{}] , "
"qos_profile [{}]".format(
self._module_info['node_info']['msg_type'],
self._module_info['node_info']['topic_name'],
type(self._module_info['node_info']['topic_name']),
self._module_info['node_info']['callback_func'],
self._qos_profile)
)
self._node_obj.subscription = self._node_obj.create_subscription(
self._module_info['node_info']['msg_type'],
self._module_info['node_info']['topic_name'],
self._module_info['node_info']['callback_func'],
self._qos_profile)
def node_obj_create_rate(self):
"""
각 node 당 rate time 설정
"""
self._logger.debug('node_obj_create_rate : [{}]'.format(
self._module_info['node_info']['rate_sec']))
self._rate = self._node_obj.create_rate(self._module_info['node_info']['rate_sec'])
# self._node_obj.create_rate(self._module_info['node_info']['rate_sec']).sleep()
self._logger.debug('node_obj_create_rate222')
def get_node_obj_id(self):
return self._node_obj
publisher_node_maker
# ===========================================================================================
# !/usr/bin/python
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
import os
import sys
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
# ----------------------------------------------------------------------
import rclpy
from rclpy.qos import QoSProfile
from util.util_logger import Logger
class Ros2PubNodeAgent:
"""
멀티 쓰레드 에 멀티 노드 구조
멀티쓰레딩 pubscribe 내용을 참조 사이트
https://answers.ros.org/question/377848/spinning-multiple-nodes-across-multiple-threads/
"""
def __init__(self, _module_info):
"""
"""
self._executor = None
self._module_info = _module_info
self._qos_profile = QoSProfile(depth=10)
self._logger = Logger.get_instance("ros2")
self._node_obj = None
self._rate = None
def set_up_node_with_binding_publisher(self):
self.create_node()
self.node_obj_create_publisher()
def create_node(self) -> None:
"""
node object 생성
"""
if rclpy.ok():
pass
else:
rclpy.init()
_node_name = self._module_info['node_info']['node_name']
self._logger.debug('_node_name [{}]'.format(_node_name))
self._node_obj = rclpy.create_node(_node_name)
self._logger.debug('_node_objj [{}]'.format(self._node_obj))
def node_obj_create_publisher(self) -> None:
"""
executor에 등록된 node_obj에 type,topic_name,콜백함수,qos 정보 삽입
"""
_callback_func = self._module_info['node_info']['callback_func']
self._logger.debug(
"msg_type [{}] , "
"topic name [{}] , "
"topic name type [{}] , "
"callback_func [{}] , "
"qos_profile [{}]".format(
self._module_info['node_info']['msg_type'],
self._module_info['node_info']['topic_name'],
type(self._module_info['node_info']['topic_name']),
self._module_info['node_info']['callback_func'],
self._qos_profile))
self._node_obj.publisher = self._node_obj.create_publisher(
self._module_info['node_info']['msg_type'],
self._module_info['node_info']['topic_name'],
self._qos_profile)
def get_node_obj_id(self):
return self._node_obj
def publish_func(self, __msg):
"""
퍼블리시 함수
:return:
"""
self._node_obj.publisher.publish(__msg)
MultiThread starter
import os
import sys
import time
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
#----------------------------------------------------------------------
import rclpy
from rclpy.executors import Executor # noqa: F401
from threading import Thread
from std_msgs.msg import String
from dataclasses import asdict
from constant.dataclass import SubNodeInfo, PubNodeInfo, NodeDetail
from util.util_logger import Logger
from ros2.ros2_sub_node_agent import Ros2SubNodeAgent
from ros2.ros2_pub_node_agent import Ros2PubNodeAgent
# _subscribe_module_1_info
NODE_NAME_1_SUB = 'sub_node_name_1'
MSG_TYPE_1_SUB = 'String'
TOPIC_1_SUB = 'topic_name/_1'
RATE_HZ_1_SUB = '2.0'
CALLBACK_1_SUB = 'self.subscribe_module_1_info_callback'
MODULE_1_SUB = 'sub_module_1'
NODE_NAME_2_SUB = 'sub_node_name_2_broadcast'
MSG_TYPE_2_SUB = 'String'
TOPIC_2_SUB = 'topic_name/_2/_broadcast'
RATE_HZ_2_SUB = '2.0'
CALLBACK_2_SUB = 'self.subscribe_module_2_info_callback'
MODULE_2_SUB = 'module_2'
NODE_NAME_1_PUB = 'pub_node_name_1'
MSG_TYPE_1_PUB = 'String'
TOPIC_1_PUB = 'pub_topic_name/_1'
RATE_HZ_1_PUB = '2.0'
CALLBACK_1_PUB = 'no_use'
MODULE_1_PUB = 'pub_module_1'
class Ros2ModuleManager():
"""
Ros2SubNodeAgent : client , node_obj 생성클래쓰
Ros2ModuleManager : manager 역할
"""
def __init__(self, **kwargs):
self._queue_dict = self._trans_to_queue(kwargs)
self._logger = Logger.get_instance('module_manager')
self._logger.debug('Ros2ModuleManager queue id : [{}]'.format(
id(self._queue_dict['module_to_robot'])))
self._subscribe_module_1_info = self._get_node_info(NODE_NAME_1_SUB)
self._subscribe_module_2_info = self._get_node_info(NODE_NAME_2_SUB)
self._publish_to_module_1_info = self._get_node_info(NODE_NAME_1_PUB)
self._publish_module_1_obj = None
self._executor = None
rclpy.init()
self._executor = rclpy.executors.MultiThreadedExecutor()
self._setup_for_thread()
def _trans_to_queue(self, _kwargs):
"""
async queue 랑 queue를 인자로 넘겨준걸 dictionary형태로 변환
"""
_queue_dict = {}
for _key, _val in _kwargs.items():
_queue_dict[_key] = _val
return _queue_dict
def _get_node_info(self, node_name):
if node_name == NODE_NAME_1_SUB:
_node_detail_1 = NodeDetail(
node_name=NODE_NAME_1_SUB,
msg_type=MSG_TYPE_1_SUB,
topic_name=TOPIC_1_SUB,
rate_sec=RATE_HZ_1_SUB,
callback_func=CALLBACK_1_SUB
)
_sub_node_info_1 = SubNodeInfo(
module_name=MODULE_1_SUB,
node_info=_node_detail_1
)
_subscribe_module_1_info = asdict(_sub_node_info_1)
if _subscribe_module_1_info['node_info']['callback_func'] == CALLBACK_1_SUB:
_subscribe_module_1_info['node_info']['msg_type'] = String
_subscribe_module_1_info['node_info']['rate_sec'] = eval(RATE_HZ_1_SUB)
_subscribe_module_1_info['node_info']['callback_func'] = eval(CALLBACK_1_SUB)
return _subscribe_module_1_info
elif node_name == NODE_NAME_2_SUB:
_node_detail_2 = NodeDetail(
node_name=NODE_NAME_2_SUB,
msg_type=MSG_TYPE_2_SUB,
topic_name=TOPIC_2_SUB,
rate_sec=RATE_HZ_2_SUB,
callback_func=CALLBACK_2_SUB
)
_sub_node_info_2 = PubNodeInfo(
module_name=MODULE_2_SUB,
node_info=_node_detail_2
)
_subscribe_module_2_info = asdict(_sub_node_info_2)
if _subscribe_module_2_info['node_info']['callback_func'] == CALLBACK_2_SUB:
_subscribe_module_2_info['node_info']['msg_type'] = String
_subscribe_module_2_info['node_info']['rate_sec'] = eval(RATE_HZ_2_SUB)
_subscribe_module_2_info['node_info']['callback_func'] = eval(CALLBACK_2_SUB)
return _subscribe_module_2_info
elif node_name == NODE_NAME_1_PUB:
_node_detail_1 = NodeDetail(
node_name=NODE_NAME_1_PUB,
msg_type=MSG_TYPE_1_PUB,
topic_name=TOPIC_1_PUB,
rate_sec=RATE_HZ_1_PUB,
callback_func=CALLBACK_1_PUB
)
_pub_node_info_1 = SubNodeInfo(
module_name=MODULE_1_PUB,
node_info=_node_detail_1
)
_publish_to_module_1_info = asdict(_pub_node_info_1)
if _publish_to_module_1_info['node_info']['callback_func'] == CALLBACK_1_PUB:
_publish_to_module_1_info['node_info']['msg_type'] = String
# _publish_to_module_1_info['node_info']['rate_sec'] = eval(RATE_HZ_1_PUB)
# _publish_to_module_1_info['node_info']['callback_func'] = eval(CALLBACK_1_PUB)
return _publish_to_module_1_info
raise ValueError("type the right value to 'self._get_node_info('여기')' <-here ")
def _setup_for_thread(self):
"""
인식모듈에서 발행하는 topic 구독자 생성
Ros2SubNodeAgent 에서 node_object를 생성 한 후,
get_node_obj 의 return값으로 오브젝트 id 를 받고
executor에 해당 오브젝트id를 추가함
"""
# assert self._module_validation == self._subscribe_module_1_info
self._logger.debug('self._subscribe_module_1_info : [{}]'.format(
self._subscribe_module_1_info))
_subscribe_module_1_obj = Ros2SubNodeAgent(self._subscribe_module_1_info)
_subscribe_module_1_obj.set_up_node_with_binding_subscription()
# _subscribe_module_1_obj.set_up_subscription()
_sub_node_1_obj = _subscribe_module_1_obj.get_node_obj_id()
self._logger.debug('_sub_node_1_obj : [{}]'.format(_sub_node_1_obj))
self._executor.add_node(_sub_node_1_obj)
# assert self._module_validation == self._subscribe_module_2_info
self._logger.debug('self._subscribe_module_2_info : [{}]'.format(
self._subscribe_module_2_info))
_subscribe_module_2_obj = Ros2SubNodeAgent(self._subscribe_module_2_info)
_subscribe_module_2_obj.set_up_node_with_binding_subscription()
_sub_node_2_obj = _subscribe_module_2_obj.get_node_obj_id()
self._logger.debug('_sub_node_1_obj : [{}]'.format(_sub_node_2_obj))
self._executor.add_node(_sub_node_2_obj)
# assert self._module_validation == self._publish_to_module_1_info
self._logger.debug('self._publish_to_module_1_info : [{}]'.format(
self._publish_to_module_1_info))
self._publish_module_1_obj = Ros2PubNodeAgent(self._publish_to_module_1_info)
self._publish_module_1_obj.set_up_node_with_binding_publisher()
_pub_node_1_obj = self._publish_module_1_obj.get_node_obj_id()
self._logger.debug('_pub_node_1_obj : [{}]'.format(_pub_node_1_obj))
self._executor.add_node(_pub_node_1_obj)
def start_multi_thread(self):
""" executor에 등록된 쓰레드들 start """
self._logger.debug('multi_thead start now')
_executor_thread = Thread(target=self._executor.spin, daemon=True)
pub_trigger_thread = Thread(target=self.publish_module_1_info_trigger)
_executor_thread.start()
pub_trigger_thread.start()
def _put_data_to_robot(self, msg):
self._queue_dict['module_to_robot'].put_nowait(msg)
def subscribe_module_1_info_callback(self, msg) -> None:
"""
큐 data 보내는거 dataclass 어떻게 짤지 생각중
:param msg: ros2 topic 메시지 , std_msg/String
"""
self._logger.warning("ros2_module_1_sub callback called : {} type : {}"
.format(msg, type(msg)))
self._put_data_to_robot(msg.data)
# self._queue_dict['module_to_robot']._loop._write_to_self()
def subscribe_module_2_info_callback(self, msg) -> None:
self._logger.warning("ros2_module_2_sub callback called : {} type : {}"
.format(msg, type(msg)))
self._put_data_to_robot(msg.data)
# self._queue_dict['module_to_robot']._loop._write_to_self()
def publish_module_1_info_trigger(self):
msg_pub = String()
while True:
if self._queue_dict['robot_to_module'].empty():
time.sleep(3)
else:
_msg = self._queue_dict['robot_to_module'].get()
# mqtt 로 오는 data는 bytes로 옴 따라서 str 변환 필요
msg_pub.data = str(_msg)
self._publish_module_1_obj.publish_func(msg_pub)
self._logger.warning('published now , msg : [{}]'.format(msg_pub))
# pub test command 'cmd'의 val 값은 mqtt랑 약속하는 부분이다
# ros2 topic pub --once topic_name/_2/_broadcast std_msgs/String "data: {'cmd': 'md_to_rb_2_broadcast' , 'data': 'md_to_rb_2_test'}"
# ros2 topic pub --once topic_name/_1 std_msgs/String "data: {'cmd': 'md_to_rb_1' , 'data': 'md_to_rb_1_test'}"
'ros2_python > Ros2 msg,srv,action,parm' 카테고리의 다른 글
Ros2 Node 클래스 설명 // 노드랑 sub or pub config 값 바인딩 할때 (0) | 2022.09.07 |
---|---|
python MultiThread( publisher 2개, suscriber 2개 ) (0) | 2022.09.01 |
ros2_msg 토픽 example(executor사용==멀티쓰레드) (0) | 2022.07.05 |
ros2_msg 토픽 example(executor사용==싱글쓰레드) (0) | 2022.06.28 |
ros2_python코드에서 param 데이터 사용법 (0) | 2022.04.27 |
댓글