handler - agent - manager 구조
노드 1개
퍼블리셔 2개
서브스크립션 2개
async 함수를 활용해서 spin_once 함수 돌림
퍼블리셔나 서브스크립션에 대한 config 값은 manager에서 정의해주고
Publisher 객체를 manager 단에 가져와서 manager에서 발행, subscription의 subscriber callback 도 manager에서 정의후 등록
handler <--- 객체 생성 관계 -----> agnet
agnet <----- 상속 관계 -----> manager
코드 :
node_with_multiple_sub_pub.py
import asyncio
from std_msgs.msg import String
import threading
import rclpy
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.node import Node
import time
# -*- coding: utf-8 -*-
import rclpy
from rclpy.qos import QoSProfile
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.node import Publisher
from rclpy.node import Subscription
from threading import Thread
import time
from dataclasses import dataclass
from typing import TypeVar, Union, Callable, Any
MsgType = TypeVar('MsgType')
# ------------------------------------------- Handler Class 쪽 -----------------------------------------------#
"""
ReturnSubType, ReturnPubType class 로 Ros2Handler의 파라미터 에 넣어줄 data를 생성하고
Ros2Handler에 를 이용해서 Subscription 과 Publisher 객체를 생성한다.
"""
@dataclass
class SubInfoType:
msg_type: Any
topic_name: str
callback: Callable
qos_profile: Union[rclpy.qos.QoSProfile, int]
@dataclass
class PubInfoType:
msg_type: Any
topic_name: str
qos_profile: Union[rclpy.qos.QoSProfile, int]
class ReturnSubType:
def __init__(self):
self.qos_profile = None
def __call__(self, msg_type: Any, topic_name: str, callback: Callable,
qos_profile: Union[int, QoSProfile]) -> SubInfoType:
if isinstance(qos_profile, int):
self.qos_profile = rclpy.qos.QoSProfile(
reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
depth=qos_profile
)
elif isinstance(qos_profile, QoSProfile):
self.qos_profile = qos_profile
else:
raise TypeError('PubNodeMaker 에서 config TYPE ERROR 발생')
_subscription_type = SubInfoType(
msg_type=msg_type,
topic_name=topic_name,
callback=callback,
qos_profile=self.qos_profile
)
return _subscription_type
class ReturnPubType:
def __init__(self):
self.qos_profile = None
def __call__(self, msg_type: Any, topic_name: str,
qos_profile: Union[int, QoSProfile]) -> PubInfoType:
if isinstance(qos_profile, int):
self.qos_profile = rclpy.qos.QoSProfile(
reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
depth=qos_profile
)
elif isinstance(qos_profile, QoSProfile):
self.qos_profile = qos_profile
else:
raise TypeError('PubNodeMaker 에서 config TYPE ERROR 발생')
self.publisher_type = PubInfoType(
msg_type=msg_type,
topic_name=topic_name,
qos_profile=self.qos_profile
)
return self.publisher_type
class Ros2Handler:
def __init__(self, node_info: Union[SubInfoType, PubInfoType], node_obj: Node):
if isinstance(node_obj, Node):
self.node_obj = node_obj
if isinstance(node_info, SubInfoType):
self._set_up_subscription(node_info)
elif isinstance(node_info, PubInfoType):
self._set_up_publisher(node_info)
else:
raise TypeError("Ros2Handler's 파라미터 타입 에러")
# self.node_obj: Node = node_obj
def _set_up_subscription(self, node_info):
self.node_obj.subscription: Subscription = self.node_obj.create_subscription(
node_info.msg_type,
node_info.topic_name,
node_info.callback,
node_info.qos_profile
)
def _set_up_publisher(self, node_info):
self.node_obj.publisher_: Publisher = self.node_obj.create_publisher(
node_info.msg_type,
node_info.topic_name,
node_info.qos_profile
)
def get_publisher(self) -> Publisher:
""" get_publisher 로 return 받고 '변수.publish(msg)' 로 퍼블리시 해주면 된다. """
return self.node_obj.publisher_
def get_node(self) -> Node:
return self.node_obj
# ------------------------------------------- Agent Class 쪽 -----------------------------------------------#
"""
Ros2Handler 클래쓰를 이용해서 Subscription 과 Publisher 객체를 생성한다.
Manager 단에서 pub sub 하려면
node_obj 에 묶여서 실행되는 Subscription와 달리,
Publisher 는 node_obj의 멤버변수에 선언되있으므로 해당 멤버변수의 return 이 필요하다
"""
class Ros2Agent:
def __init__(self):
if rclpy.ok(): # 방어코드
pass
else:
rclpy.init()
self.node_obj: Node = rclpy.create_node('ros2handler_node') # 공통으로 사용할 Node obj
self.msg: String = String()
self.callback_dictionary = {}
# self._setup_sub(String, "/server_manager/aa", self.callback_dictionary["/server_manager/aa"], 10)
# self._setup_sub(String, "/image_loom/aa/spare", self.callback_dictionary["/image_loom/aa/spare"], 10)
# self.publisher_for_tm = self._setup_pub(String, "/tm/aa", 10) # self.publisher_for_tm.publish(msg) <- 해당 토픽에 퍼블리시가능
# self.publisher_for_module = self._setup_pub(String, "/image_module/aa", 10)
def _setup_sub(self, msg_type: Any, topic_name: str, callback: Callable, qos_profile: Union[int, QoSProfile]):
sub_node_info = ReturnSubType()
sub_node_info = sub_node_info(
msg_type=msg_type, topic_name=topic_name,
callback=callback, qos_profile=qos_profile
)
ros_handler = Ros2Handler(sub_node_info, self.node_obj)
_get_node = ros_handler.get_node()
if ros_handler.get_node() == self.node_obj:
pass
else:
raise ValueError('하나의 노드에 pub,sub 을 binding 하지만, 싱글노드가 아닌 다른 노드가 생성 되었음.')
def setup_pub_and_return_publisher_obj(self, msg_type: Any, topic_name: str, qos_profile: Union[rclpy.qos.QoSProfile, int]) -> Publisher:
pub_node_info = ReturnPubType()
pub_node_info = pub_node_info(msg_type=msg_type, topic_name=topic_name, qos_profile=qos_profile)
ros_handler = Ros2Handler(pub_node_info, self.node_obj)
if ros_handler.get_node() == self.node_obj:
pass
else:
raise ValueError('하나의 노드에 pub,sub 을 binding 하지만, 싱글노드가 아닌 다른 노드가 생성 되었음.')
return ros_handler.get_publisher()
# ------------------------------------------- manager 쪽 호출권장방법 -----------------------------------------------#
"""
Ros2Agent를 상속 받고, _setup_sub 와 _setup_pub 함수를 이용해서 config값을 삽입해서 객체 생성
생성된 Subscription 과 Publisher 객체를 이용해서 발행과 구독을 진행한다.
Subscription, Publisher 은 동작 방식이 조금 다른게
Subscription 는 노드에 등록하고 spin 돌리면 구독 할때마다 callback이 켜지는 구조고
Publisher 는 노드에 등록하고 등록된 멤버변수(Publisher객체를 리턴받은 변수)를 통해서 발행한다.
"""
class manager(Ros2Agent):
def __init__(self):
Ros2Agent.__init__(self)
self.callback_dictionary["/server_manager/aa"] = self.server_manager_subscriber_callback
self.callback_dictionary["/image_loom/aa/spare"] = self.image_loom_subscriber_callback
self._setup_sub(String, "/server_manager/aa", self.callback_dictionary["/server_manager/aa"], 10)
self._setup_sub(String, "/image_loom/aa/spare", self.callback_dictionary["/image_loom/aa/spare"], 10)
self.publisher_for_tm = self.setup_pub_and_return_publisher_obj(String, "/tm/aa", 10) # self.publisher_for_tm.publish(msg) <- 해당 토픽에 퍼블리시가능
self.publisher_for_module = self.setup_pub_and_return_publisher_obj(String, "/image_module/aa", 10)
self.check_subscription_seq_i = 0
# ------------------------------- loop 실행 부분 ---------------------------------#
def test_make_loop(self):
_loop = asyncio.get_event_loop()
_loop.run_until_complete(self.test_make_task())
async def test_make_task(self):
task_1 = asyncio.create_task(self.run_sub())
task_2 = asyncio.create_task(self.test_run_code())
# await task_1, task_2
await asyncio.gather(
task_1,
task_2
)
print('test_make_task END')
async def run_sub(self):
executor = rclpy.get_global_executor()
print('subscription ready : ', executor.context.ok())
while executor.context.ok():
await asyncio.sleep(0.05) # 필요하진 않은데 while 문 속도제어 용 sleep
await self.sub_spin(self.node_obj)
# await asyncio.create_task(self.sub_spin(node_obj)) # 동일한 내용 이렇게도 쓸 수 있음.
# asyncio.create_task(self.sub_spin(node_obj))
async def sub_spin(self, node_obj):
rclpy.spin_once(node=node_obj, timeout_sec=0.) # timeout_sec 시간 만큼 실행권 가져오지 못함 0. 으로 설정 할시 Never blocking 개념
# rclpy.spin(node=node_obj) # 이렇게 하면 spin 들어간 이후에 실행권을 가져오지 못함
await asyncio.sleep(0.0) # 코루틴 함수내에 await sleep 이 없으면 subscribe_callback이 실행이 안됨
async def test_run_code(self):
while True:
self.msg.data = 'publisher_for_tm'
self.publisher_for_tm.publish(self.msg)
await asyncio.sleep(1)
self.msg.data = 'publisher_for_module'
self.publisher_for_module.publish(self.msg)
await asyncio.sleep(1)
#------------------------------- Ros2Agent callback 정의부분 ---------------------------------#
def server_manager_subscriber_callback(self, msg):
print('server_manager_callback msg : [{}], index : [{}]'.format(msg, self.check_subscription_seq_i))
self.check_subscription_seq_i = self.check_subscription_seq_i + 1
def image_loom_subscriber_callback(self, msg):
print('image_loom_callback msg : [{}], index : [{}]'.format(msg, self.check_subscription_seq_i))
self.check_subscription_seq_i = self.check_subscription_seq_i + 1
if __name__ == '__main__':
aa = manager()
aa.test_make_loop()
""" 테스트 방법
ros2 topic pub /server_manager/aa std_msgs/msg/String "data : { 'key_1' : 'val_1' }"
ros2 topic pub /image_loom/aa/spare std_msgs/msg/String "data : { 'key_2' : 'val_2' }"
ros2 topic pub --once /server_manager/aa std_msgs/msg/String "data : { 'key_1' : 'val_1' }"
ros2 topic pub --once /image_loom/aa/spare std_msgs/msg/String "data : { 'key_2' : 'val_2' }"
ros2 topic echo /tm/aa std_msgs/msg/String
ros2 topic echo /image_module/aa std_msgs/msg/String
"""
콘솔 결과창 :
'언어 정리 > python_비동기관련_lib' 카테고리의 다른 글
비동기인풋, toolbar 터미널출력 << prompt-toolkit, curses (0) | 2022.12.22 |
---|---|
코루틴으로 ROS2 작업 (0) | 2022.11.28 |
코루틴 과 Eventloop 그리고 Future (1) | 2022.09.14 |
agent,handler 모음 (ros2, gmqtt, asyncio.Queue) (1) | 2022.09.12 |
asyncio Queue 랑 loop 개념까지 정리 (2) | 2022.09.06 |
댓글