handler - agent - manager 구조
Subscription 노드와 , Publisher 노드로 나눠서 진행한 코드랑
둘다 같은 노드로 묶어서 진행한 코드 2종류로 짬
콜백이라든지, TOPIC, msg_type, qos 정의부분을 최대한 top level class 에서 다룰 수 있게금 구조를 짬
한 노드에 Subscription, Publisher 둘다 묶은 코드
설명 : 1초 지날때마다 /topic_1 -> 2 -> 3 -> 4 publish 해줍니다.
topic_1 ~ 4 를 subscribing 하고 있기 때문에 바로바로 콜백이 온합니다
서브스크립션 , 퍼블리셔를 정의하는 부분은 dataclasses로 정의해 사용합니다.
출력값 :
import json
import time
import asyncio
import signal
import rclpy
from rclpy.subscription import Subscription
from rclpy.publisher import Publisher
from rclpy.qos import QoSProfile
from rclpy.node import Node
from dataclasses import dataclass
from typing import TypeVar, Union, Callable, Any, Dict, List
from std_msgs.msg import String
import uuid
MsgType = TypeVar('MsgType')
# ====================================================================
# ROS handler.py =====================================================
# ====================================================================
@dataclass
class SubInfoType:
msg_type: Any
topic_name: str
callback: Callable
qos_profile: QoSProfile
@dataclass
class PubInfoType:
msg_type: Any
topic_name: str
qos_profile: QoSProfile
class RosHandler(Node):
def __init__(self, node_name):
super().__init__(node_name)
def return_subscription(self, sub_info: SubInfoType) -> Subscription:
return self.create_subscription(
sub_info.msg_type,
sub_info.topic_name,
sub_info.callback,
sub_info.qos_profile
)
def return_sub_dataclass_type(self, msg_type: MsgType, topic: str, callback: dict, qos_profile: Union[int, QoSProfile]) -> SubInfoType:
sub_info_1 = SubInfoType(
msg_type,
topic,
callback[topic],
self._return_qos_profile(qos_profile)
)
return sub_info_1
def _return_qos_profile(self, qos_profile):
if isinstance(qos_profile, int):
_qos = rclpy.qos.QoSProfile(
reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
depth=qos_profile
)
elif isinstance(qos_profile, QoSProfile):
_qos = qos_profile
else:
raise TypeError('QOS 타입에러')
return _qos
# -------------------------------------------------------------------------------------------------------------
def return_publisher(self, pub_info: PubInfoType) -> Publisher:
return self.create_publisher(
pub_info.msg_type,
pub_info.topic_name,
pub_info.qos_profile
)
def return_pub_dataclass_type(self, msg_type, topic, qos_profile) -> PubInfoType:
pub_info_1 = PubInfoType(
msg_type,
topic,
self._return_qos_profile(qos_profile)
)
return pub_info_1
# -------------------------------------------------------------------------------------------------------------
def get_node_obj(self) -> Node:
return self
# ====================================================================
# ROS agent.py =======================================================
# ====================================================================
NODE_NAME = 'ROS_NODE_AGENT'
SUB_TOPIC_1 = '/topic_1'
SUB_TOPIC_2 = '/topic_2'
SUB_TOPIC_3 = '/topic_3'
SUB_TOPIC_4 = '/topic_4'
PUB_TOPIC_1 = '/topic_1'
PUB_TOPIC_2 = '/topic_2'
PUB_TOPIC_3 = '/topic_3'
PUB_TOPIC_4 = '/topic_4'
class RosAgent():
def __init__(self):
self.print_f('Ros2Agent __init__ called')
if rclpy.ok(): # 방어코드
pass
else:
rclpy.init()
self.callback_dict = {}
self.publisher_dict = {}
self.ros_handler = RosHandler(NODE_NAME)
self._node_obj: Node = self.ros_handler.get_node_obj()
self.print_f(self._node_obj.get_name())
self.binding_subscription_with_node()
self.binding_publisher_with_node()
self.i = 0
def print_f(self, _print):
print('{}'.format(_print), flush=True)
def binding_publisher_with_node(self):
_topic=PUB_TOPIC_1
tmp_dataclass = self.ros_handler.return_pub_dataclass_type(String, _topic, 10)
self.publisher_dict[_topic]: Dict[str, Publisher] = self.ros_handler.return_publisher(tmp_dataclass)
_topic = PUB_TOPIC_2
tmp_dataclass = self.ros_handler.return_pub_dataclass_type(String, _topic, 10)
self.publisher_dict[_topic]: Dict[str, Publisher] = self.ros_handler.return_publisher(tmp_dataclass)
_topic = PUB_TOPIC_3
tmp_dataclass = self.ros_handler.return_pub_dataclass_type(String, _topic, 10)
self.publisher_dict[_topic]: Dict[str, Publisher] = self.ros_handler.return_publisher(tmp_dataclass)
_topic = PUB_TOPIC_4
tmp_dataclass = self.ros_handler.return_pub_dataclass_type(String, _topic, 10)
self.publisher_dict[_topic]: Dict[str, Publisher] = self.ros_handler.return_publisher(tmp_dataclass)
self.print_f('PUB_TOPIC : {}'.format(self.publisher_dict))
def binding_subscription_with_node(self):
self.callback_dict[SUB_TOPIC_1] = self.listener_callback_1
tmp_dataclass = self.ros_handler.return_sub_dataclass_type(String, SUB_TOPIC_1, self.callback_dict, 10)
self._node_obj._sub_1 = self.ros_handler.return_subscription(tmp_dataclass)
self.callback_dict[SUB_TOPIC_2] = self.listener_callback_1
tmp_dataclass = self.ros_handler.return_sub_dataclass_type(String, SUB_TOPIC_2, self.callback_dict, 10)
self._node_obj._sub_2 = self.ros_handler.return_subscription(tmp_dataclass)
self.callback_dict[SUB_TOPIC_3] = self.listener_callback_1
tmp_dataclass = self.ros_handler.return_sub_dataclass_type(String, SUB_TOPIC_3, self.callback_dict, 10)
self._node_obj._sub_3 = self.ros_handler.return_subscription(tmp_dataclass)
self.callback_dict[SUB_TOPIC_4] = self.listener_callback_1
tmp_dataclass = self.ros_handler.return_sub_dataclass_type(String, SUB_TOPIC_4, self.callback_dict, 10)
self._node_obj._sub_4 = self.ros_handler.return_subscription(tmp_dataclass)
self.print_f('SUB_TOPIC : {} , {} , {} , {}'.format(SUB_TOPIC_1,SUB_TOPIC_2,SUB_TOPIC_3,SUB_TOPIC_4))
def listener_callback_1(self, msg):
self._node_obj.get_logger().info('get_messaged : "%s"' % msg.data)
#ros2 topic pub -1 /topic_1 std_msgs/msg/String data:\ \'HIHI\'\
def listener_callback_2(self, msg):
self._node_obj.get_logger().info('listner_22 I heard: "%s"' % msg.data)
#ros2 topic pub -1 /topic_2 std_msgs/msg/String data:\ \'HIHI\'\
# ================================ SUBSCRIPTION 정의================================
async def ros_task_main(self):
asyncio.create_task(self.execute_node_spin())
asyncio.create_task(self.publish_timer())
async def execute_node_spin(self):
async def spin_node(node_obj):
rclpy.spin_once(node=node_obj,
timeout_sec=0.) # timeout_sec 시간 만큼 실행권 가져오지 못함 0. 으로 설정 할시 Never blocking 개념
# rclpy.spin(node=node_obj) # 이렇게 하면 spin 들어간 이후에 실행권을 가져오지 못함
await asyncio.sleep(0.0) # 코루틴 함수내에 await sleep 이 없으면 subscribe_callback이 실행이 안됨
executor = rclpy.get_global_executor()
print('subscription ready : ', executor.context.ok())
while executor.context.ok():
await asyncio.sleep(0.05) # while 문 속도제어 용 sleep
await spin_node(self._node_obj)
# await asyncio.create_task(self.spin_subscription(node_obj)) # 동일한 내용 이렇게도 쓸 수 있음.
# asyncio.create_task(self.spin_subscription(node_obj))
async def publish_timer(self):
msg = String()
dict_msg_type_1 = {
1:2
}
dict_msg_type_2 = {
3:4
}
dict_msg_type_3 = {
5:6
}
dict_msg_type_4 = {
7:8
}
while rclpy.ok():
msg.data = json.dumps(dict_msg_type_1,ensure_ascii=True)
self.publisher_dict[PUB_TOPIC_1].publish(msg)
# self._node_obj._pub_1.publish(msg)
# self.pub_1.publish(msg)
await asyncio.sleep(1)
msg.data = json.dumps(dict_msg_type_2, ensure_ascii=True)
self.publisher_dict[PUB_TOPIC_2].publish(msg)
# self._node_obj._pub_2.publish(msg)
# self.pub_2.publish(msg)
await asyncio.sleep(1)
msg.data = json.dumps(dict_msg_type_3, ensure_ascii=True)
self.publisher_dict[PUB_TOPIC_3].publish(msg)
# self._node_obj._pub_3.publish(msg)
# self.pub_3.publish(msg)
await asyncio.sleep(1)
msg.data = json.dumps(dict_msg_type_4, ensure_ascii=True)
self.publisher_dict[PUB_TOPIC_4].publish(msg)
# self._node_obj._pub_4.publish(msg)
# self.pub_4.publish(msg)
await asyncio.sleep(1)
# ============================== 코루틴으로 돌려주는 부분 ===============================
# ====================================================================
# ROS manager.py =======================================================
# ====================================================================
class RosManager():
def __init__(self):
self.ros_agent = RosAgent()
def make_loop(self):
while not rclpy.ok():
time.sleep(0.01)
self.main_loop = asyncio.get_event_loop()
# self.main_loop.add_signal_handler(signal.SIGINT, self.ask_exit)
# self.main_loop.add_signal_handler(signal.SIGTERM, self.ask_exit)
self.main_loop.run_until_complete(self.make_task())
# self.main_loop.run_forever()
async def make_task(self):
# ros2_task = asyncio.create_task(self.execute_spin())
# mqtt_task = asyncio.create_task(self.mqtt_agent_main())
await asyncio.gather(
self.ros_agent.ros_task_main(),
self.another_async()
)
print('TASK END ERROR ', flush=True)
async def another_async(self):
while True:
await asyncio.sleep(5)
# print('TASK Traffic ', flush=True)
if __name__ == '__main__':
rclpy.init()
a = RosManager()
a.make_loop()
Subscription 노드와 , Publisher 노드로 나눠서 진행한 코드
node_subscription.py
import time
import rclpy
from rclpy.node import Node
from rclpy.subscription import Subscription
from rclpy.qos import QoSProfile
from rclpy.node import Node
from dataclasses import dataclass
from typing import TypeVar, Union, Callable, Any
from std_msgs.msg import String
import asyncio
import signal
from typing import Callable
# ====================================================================
# ROS handler.py =====================================================
# ====================================================================
@dataclass
class SubInfoType:
msg_type: Any
topic_name: str
callback: Callable
qos_profile: Union[rclpy.qos.QoSProfile, int]
class SubHandler(Node):
def __init__(self, node_name):
super().__init__(node_name)
def return_subscription(self, sub_info: SubInfoType) -> Subscription:
return self.create_subscription(
sub_info.msg_type,
sub_info.topic_name,
sub_info.callback,
sub_info.qos_profile
)
def return_sub_dataclass_type(self, msg_type, topic, callback, qos) -> SubInfoType:
sub_info_1 = SubInfoType(
msg_type,
topic,
callback[topic],
qos
)
return sub_info_1
def get_node_obj(self) -> Node:
return self
# ====================================================================
# ROS agent.py =======================================================
# ====================================================================
NODE_NAME_1 = 'SUB_NODE'
TOPIC_1='/topic_1'
TOPIC_2='/topic_2'
class SubAgent():
def __init__(self):
if rclpy.ok():
pass
else:
rclpy.init()
self.callback_dict = {}
self.sub_handler = SubHandler(NODE_NAME_1)
self._node_obj: Node = self.sub_handler.get_node_obj()
self.binding_subscription_with_node()
def binding_subscription_with_node(self):
self.callback_dict[TOPIC_1] = self.listener_callback_1
sub_dataclass_1 = self.sub_handler.return_sub_dataclass_type(String, TOPIC_1, self.callback_dict, 10)
self._node_obj._sub_1 = self.sub_handler.return_subscription(sub_dataclass_1)
self.callback_dict[TOPIC_2] = self.listener_callback_2
sub_dataclass_2 = self.sub_handler.return_sub_dataclass_type(String, TOPIC_2, self.callback_dict, 10)
self._node_obj._sub_2 = self.sub_handler.return_subscription(sub_dataclass_2)
def listener_callback_1(self, msg):
self._node_obj.get_logger().info('listner_11 I heard: "%s"' % msg.data)
#ros2 topic pub -1 /topic_1 std_msgs/msg/String data:\ \'HIHI\'\
def listener_callback_2(self, msg):
self._node_obj.get_logger().info('listner_22 I heard: "%s"' % msg.data)
#ros2 topic pub -1 /topic_2 std_msgs/msg/String data:\ \'HIHI\'\
# ================================ SUBSCRIPTION 정의================================
async def ros_sub_main(self):
asyncio.create_task(self.execute_sub_spin())
async def execute_sub_spin(self):
async def spin_subscription(node_obj):
rclpy.spin_once(node=node_obj,
timeout_sec=0.) # timeout_sec 시간 만큼 실행권 가져오지 못함 0. 으로 설정 할시 Never blocking 개념
# rclpy.spin(node=node_obj) # 이렇게 하면 spin 들어간 이후에 실행권을 가져오지 못함
await asyncio.sleep(0.0) # 코루틴 함수내에 await sleep 이 없으면 subscribe_callback이 실행이 안됨
executor = rclpy.get_global_executor()
print('subscription ready : ', executor.context.ok())
while executor.context.ok():
await asyncio.sleep(0.05) # while 문 속도제어 용 sleep
await spin_subscription(self._node_obj)
# await asyncio.create_task(self.spin_subscription(node_obj)) # 동일한 내용 이렇게도 쓸 수 있음.
# asyncio.create_task(self.spin_subscription(node_obj))
# ============================== 코루틴으로 돌려주는 부분 ===============================
# ====================================================================
# ROS manager.py =======================================================
# ====================================================================
class RosManager():
def __init__(self):
self.ros_agent = SubAgent()
def make_loop(self):
while not rclpy.ok():
time.sleep(0.01)
self.main_loop = asyncio.get_event_loop()
# self.main_loop.add_signal_handler(signal.SIGINT, self.ask_exit)
# self.main_loop.add_signal_handler(signal.SIGTERM, self.ask_exit)
self.main_loop.run_until_complete(self.make_task())
# self.main_loop.run_forever()
async def make_task(self):
# ros2_task = asyncio.create_task(self.execute_spin())
# mqtt_task = asyncio.create_task(self.mqtt_agent_main())
await asyncio.gather(
self.ros_agent.ros_sub_main(),
self.another_async()
)
print('TASK END ERROR ', flush=True)
async def another_async(self):
while True:
await asyncio.sleep(1)
# print('TASK Traffic ', flush=True)
if __name__ == '__main__':
rclpy.init()
a = RosManager()
a.make_loop()
node_publisher.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
import rclpy
from rclpy.node import Node
from rclpy.publisher import Publisher
from rclpy.qos import QoSProfile
from rclpy.node import Node
from dataclasses import dataclass
from typing import TypeVar, Union, Callable, Any
from std_msgs.msg import String
import asyncio
import signal
from typing import Callable
# ====================================================================
# ROS handler.py =====================================================
# ====================================================================
@dataclass
class PubInfoType:
msg_type: Any
topic_name: str
qos_profile: Union[rclpy.qos.QoSProfile, int]
class PubHandler(Node):
def __init__(self, node_name):
super().__init__(node_name)
def return_publisher(self, pub_info: PubInfoType) -> Publisher:
return self.create_publisher(
pub_info.msg_type,
pub_info.topic_name,
pub_info.qos_profile
)
def return_pub_dataclass_type(self, msg_type, topic, qos) -> PubInfoType:
pub_info_1 = PubInfoType(
msg_type,
topic,
qos
)
return pub_info_1
def get_node_obj(self) -> Node:
return self
# ====================================================================
# ROS agent.py =======================================================
# ====================================================================
NODE_NAME_1 = 'PUB_NODE'
TOPIC_1 = '/topic_1'
TOPIC_2 = '/topic_2'
class PubAgent(Node):
def __init__(self):
if rclpy.ok():
pass
else:
rclpy.init()
self.pub_handler = PubHandler(NODE_NAME_1)
self._node_obj: Node = self.pub_handler.get_node_obj()
self.binding_publisher_with_node()
self.i = 0
def binding_publisher_with_node(self):
pub_dataclass_1 = self.pub_handler.return_pub_dataclass_type(String, TOPIC_1, 10)
self._node_obj._pub_1 = self.pub_handler.return_publisher(pub_dataclass_1)
pub_dataclass_2 = self.pub_handler.return_pub_dataclass_type(String, TOPIC_2, 10)
self._node_obj._pub_2 = self.pub_handler.return_publisher(pub_dataclass_2)
# super().__init__('minimal_publisher')
# self.publisher_ = self.create_publisher(String, 'topic', 10)
# timer_period = 0.5 # seconds
# ================================ SUBSCRIPTION 정의================================
async def ros_pub_main(self):
asyncio.create_task(self.execute_pub_spin())
asyncio.create_task(self.publish_timer())
async def execute_pub_spin(self):
async def spin_pubscription(node_obj):
rclpy.spin_once(node=node_obj,
timeout_sec=0.) # timeout_sec 시간 만큼 실행권 가져오지 못함 0. 으로 설정 할시 Never blocking 개념
# rclpy.spin(node=node_obj) # 이렇게 하면 spin 들어간 이후에 실행권을 가져오지 못함
await asyncio.sleep(0.0) # 코루틴 함수내에 await sleep 이 없으면 subscribe_callback이 실행이 안됨
executor = rclpy.get_global_executor()
print('pubscription ready : ', executor.context.ok())
while executor.context.ok():
await asyncio.sleep(0.05) # while 문 속도제어 용 sleep
await spin_pubscription(self._node_obj)
# await asyncio.create_task(self.spin_subscription(node_obj)) # 동일한 내용 이렇게도 쓸 수 있음.
# asyncio.create_task(self.spin_subscription(node_obj))
async def publish_timer(self):
str = String()
while rclpy.ok():
str.data = '{} : HI'.format(self.i)
self._node_obj._pub_1.publish(str)
self.i = self.i + 1
await asyncio.sleep(1)
str.data = '{} : HI'.format(self.i)
self._node_obj._pub_1.publish(str)
self.i = self.i + 1
await asyncio.sleep(1)
str.data = '{} : HI'.format(self.i)
self._node_obj._pub_2.publish(str)
self.i = self.i + 1
await asyncio.sleep(1)
# ============================== 코루틴으로 돌려주는 부분 ===============================
# ====================================================================
# ROS manager.py =======================================================
# ====================================================================
class RosManager():
def __init__(self):
self.ros_agent = PubAgent()
def make_loop(self):
while not rclpy.ok():
time.sleep(0.01)
self.main_loop = asyncio.get_event_loop()
# self.main_loop.add_signal_handler(signal.SIGINT, self.ask_exit)
# self.main_loop.add_signal_handler(signal.SIGTERM, self.ask_exit)
self.main_loop.run_until_complete(self.make_task())
# self.main_loop.run_forever()
async def make_task(self):
# ros2_task = asyncio.create_task(self.execute_spin())
# mqtt_task = asyncio.create_task(self.mqtt_agent_main())
await asyncio.gather(
self.ros_agent.ros_pub_main(),
self.another_async()
)
print('TASK END ERROR ', flush=True)
async def another_async(self):
while True:
await asyncio.sleep(1)
# print('TASK Traffic ', flush=True)
if __name__ == '__main__':
rclpy.init()
a = RosManager()
a.make_loop()
'언어 정리 > python_비동기관련_lib' 카테고리의 다른 글
asyncio : 동일한 Task 5번이상 누적 방어 코드 (0) | 2023.06.10 |
---|---|
비동기인풋, toolbar 터미널출력 << prompt-toolkit, curses (0) | 2022.12.22 |
asyncio로 ros2_spin 구동예제 ( handler-agent-manager ) (0) | 2022.09.17 |
코루틴 과 Eventloop 그리고 Future (1) | 2022.09.14 |
agent,handler 모음 (ros2, gmqtt, asyncio.Queue) (1) | 2022.09.12 |
댓글