코드 의도 :
멀티쓰레드로 등록된 subscription에 data를 쏴줄 때마다 callback 함수가 호출되는데,
callback 함수가 호출 될 때 마다 해당된 publisher를 publish해주는 코드
동작은 50초간 지속
ros2_agnet.py
import os
import sys
import time
import rclpy
from std_msgs.msg import String
import threading
from ros2_handler import SubNodeMaker, PubNodeMaker, Ros2Handler
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
#----------------------------------------------------------------------
if __name__ == "__main__":
rclpy.init()
Ros2Handler.init_rclpy()
#========================================================publisher config
pub_node_maker_1 = PubNodeMaker()
# 메시지 정의-----------------------------------------------------------------
_msg_1 = String()
_msg_1.data = 'pub_data_from_pub_agent_1'
# --------------------------------------------------------------------------
pub_agent_1 = Ros2Handler(pub_node_maker_1(node_name="pub_node_1", msg_type=String, topic_name="/node_name_pub1/aa",
qos_profile=10))
# pub_agent_1.run_publish(_msg_1) # 이 명령할 떄마다 한번 씩 publish 함
pub_node_maker_2 = PubNodeMaker()
# 메시지 정의-----------------------------------------------------------------
_msg_2 = String()
_msg_2.data = 'pub_data_from_pub_agent_2'
# --------------------------------------------------------------------------
pub_agent_2 = Ros2Handler(pub_node_maker_2("pub_node_2", String, "/node_name_pub2/aa", 10))
# pub_agent_2.run_publish(_msg_2) # 이 명령할 떄마다 한번 씩 publish 함
# ========================================================subscription config
sub_node_maker_1 = SubNodeMaker()
# 콜백 정의------------------------------------------------------------------
def listener_callback_1(msg):
print()
print('sub_callback_1, The number of active thread', threading.active_count())
print('I heard: ', msg.data)
pub_agent_1.run_publish(_msg_1)
print('executing Node id : ', Ros2Handler._execute_list)
# --------------------------------------------------------------------------
sub_agent_1 = Ros2Handler(sub_node_maker_1(node_name="sub_node_1", msg_type=String, topic_name="/node_name_sub1/aa",
callback=listener_callback_1, qos_profile=10))
sub_node_maker_2 = SubNodeMaker()
# 콜백 정의------------------------------------------------------------------
def listener_callback_1(msg):
print()
print('sub_callback_2, The number of active thread', threading.active_count())
print('I heard: ', msg.data)
pub_agent_2.run_publish(_msg_2)
print('executing Node id : ', Ros2Handler._execute_list)
# --------------------------------------------------------------------------
sub_agent_2 = Ros2Handler(sub_node_maker_2("sub_node_2", String, "/node_name_sub2/aa", listener_callback_1, 10))
Ros2Handler.run_executor()
print('running_Thread_num : ', threading.active_count())
time.sleep(50)
print('running_Thread_num', threading.active_count())
"""
ros2 topic echo /node_name_pub1/aa
ros2 topic echo /node_name_pub2/aa
ros2 topic pub --once /node_name_sub1/aa std_msgs/msg/String "data : topic_test_1!"
ros2 topic pub --once /node_name_sub2/aa std_msgs/msg/String "data : topic_test_2!"
"""
ros2_handler.py
# -*- coding: utf-8 -*-
import rclpy
from rclpy.qos import QoSProfile
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.node import Publisher
from rclpy.node import Subscription
from threading import Thread
from dataclasses import dataclass
from typing import Any
from typing import Union
#dataclass 변수어노테이션으로 요긴하게 쓰는중
@dataclass
class SubNodeInfo:
node_name: str
msg_type: Any
topic_name: str
callback: Any
qos_profile: rclpy.qos.QoSProfile
@dataclass
class PubNodeInfo:
node_name: str
msg_type: Any
topic_name: str
qos_profile: rclpy.qos.QoSProfile
# class SubNodeMaker(SubNodeInfo):
class SubNodeMaker:
def __init__(self):
self.qos_profile = None
def __call__(self, node_name: str, msg_type: Any, topic_name: str, callback: Any,
qos_profile: Union[int, QoSProfile]) -> SubNodeInfo:
if isinstance(qos_profile, int):
self.qos_profile = rclpy.qos.QoSProfile(
reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
depth=qos_profile
)
elif isinstance(qos_profile, QoSProfile):
self.qos_profile = qos_profile
else:
raise TypeError('PubNodeMaker 에서 config TYPE ERROR 발생')
_subscription_type = SubNodeInfo(
node_name=node_name,
msg_type=msg_type,
topic_name=topic_name,
callback=callback,
qos_profile=self.qos_profile
)
return _subscription_type
# class PubNodeMaker(PubNodeInfo):
class PubNodeMaker:
def __init__(self):
self.qos_profile = None
def __call__(self, node_name: str, msg_type: Any, topic_name: str,
qos_profile: Union[int, QoSProfile]) -> PubNodeInfo:
if isinstance(qos_profile, int):
self.qos_profile = rclpy.qos.QoSProfile(
reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
depth=qos_profile
)
elif isinstance(qos_profile, QoSProfile):
self.qos_profile = qos_profile
else:
raise TypeError('PubNodeMaker 에서 config TYPE ERROR 발생')
self.publisher_type = PubNodeInfo(
node_name=node_name,
msg_type=msg_type,
topic_name=topic_name,
qos_profile=self.qos_profile
)
return self.publisher_type
class Ros2Handler(object):
"""
코드 의도 :
멀티쓰레드로 등록된 subscription에 data를 쏴줄 때마다 callback 함수가 호출되는데,
callback 함수가 호출 될 때 마다 해당된 publisher를 publish해주는 코드
동작은 50초간 지속
코드설명 :
Ros2Handler.init_rclpy() 랑 Ros2Handler.run_executor()는 class method로 따로 켜줘야함.
init_rclpy() 는 rclpy.init() 용 방어코드(안써도됨), Ros2Handler.run_executor()는 여태 등록된 executor를 한번에 켜주기 위함
나머지는 코드 등록 과정
Ros2Handler 의 파라미터는 커스텀 dataclass로 알맞은 파라미터를 보내지 않으면 에러 발생
"""
_execute_list = []
_executor: MultiThreadedExecutor
def __init__(self, node_info: Union[SubNodeInfo, PubNodeInfo]): # , _queue: asyncio.Queue):
"""
"""
self.sub_node_info: SubNodeInfo = None
self.pub_node_info: PubNodeInfo = None
self._sub_node_obj: Node = None
self._pub_node_obj: Node = None
self.set_executor()
if isinstance(node_info, SubNodeInfo):
self.sub_node_info = node_info
self._set_up_subscription()
elif isinstance(node_info, PubNodeInfo):
self.pub_node_info = node_info
self._set_up_publisher()
else:
raise TypeError("class Ros2Handler parameter type ERROR")
@classmethod
def init_rclpy(cls):
""" 한번만 수행되어야 하므로 클래쓰메쏘드 사용 """
if rclpy.ok():
pass
else:
rclpy.init()
@classmethod
def set_executor(cls):
cls._executor = MultiThreadedExecutor()
@classmethod
def remove_node_from_executor(cls):
"""필요시 사용 따로 쓰진 않는중"""
cls._executor.remove_node()
@classmethod
def run_executor(cls):
for _ in range(0,len(cls._execute_list)):
cls._executor.add_node(cls._execute_list[_])
_executor_thread = Thread(target=cls._executor.spin, daemon=True)
print("ROS2 Executor's runnung nodes number : [{}] ".format(len(cls._execute_list)), flush=True)
print("ROS2 Executor's runnung nodes names : [{}] ".format(cls._executor.get_nodes()), flush=True)
_executor_thread.start()
def _set_up_subscription(self) -> None:
self._create_node()
self._node_obj_create_subscription()
def _set_up_publisher(self) -> None:
self._create_node()
self._node_obj_create_publisher()
def _create_node(self) -> None:
"""
node object 생성
"""
if rclpy.ok():
pass
else:
rclpy.init()
if isinstance(self.sub_node_info, SubNodeInfo):
print('ROS2 create_node for subscription', flush=True)
self._sub_node_obj = rclpy.create_node(self.sub_node_info.node_name)
elif isinstance(self.pub_node_info, PubNodeInfo):
print('ROS2 create_node for publisher', flush=True)
self._pub_node_obj = rclpy.create_node(self.pub_node_info.node_name)
else:
raise TypeError("class Ros2Handler parameter type ERROR")
def _node_obj_create_subscription(self) -> None:
"""subscribe 노드 생성시 사용 되는 함수"""
print("create_subscription on ", flush=True)
self._sub_node_obj.subscription: Subscription = self._sub_node_obj.create_subscription(
self.sub_node_info.msg_type,
self.sub_node_info.topic_name,
self.sub_node_info.callback,
self.sub_node_info.qos_profile
)
self._execute_list.append(self._sub_node_obj)
def _node_obj_create_publisher(self) -> None:
"""publisher 노드 생성시 사용 되는 함수"""
print("create_publisher on ", flush=True)
self._pub_node_obj.publisher: Publisher = self._pub_node_obj.create_publisher(
self.pub_node_info.msg_type,
self.pub_node_info.topic_name,
self.pub_node_info.qos_profile
)
# self._execute_list.append(self._pub_node_obj)
def run_publish(self, msg):
print("ROS2 run_publish on, topic name's: ", self._pub_node_obj.publisher.topic_name, flush=True)
print('The number of this message published is [{}] '.format(self._pub_node_obj.publisher.get_subscription_count()))
self._pub_node_obj.publisher.publish(msg)
# self._pub_node_obj.publisher.destroy()
실행결과 :
'ros2_python > Ros2 msg,srv,action,parm' 카테고리의 다른 글
spin_until_future_complete 예제 (0) | 2022.09.15 |
---|---|
Ros2 Node 클래스 설명 // 노드랑 sub or pub config 값 바인딩 할때 (0) | 2022.09.07 |
ROS2_msg_pub_sub_multithread_python (0) | 2022.07.21 |
ros2_msg 토픽 example(executor사용==멀티쓰레드) (0) | 2022.07.05 |
ros2_msg 토픽 example(executor사용==싱글쓰레드) (0) | 2022.06.28 |
댓글