원래는 manager가 ros2, gmqtt, queue 각각 3개 agent를 다뤄주고
ros2 와 gmqtt 는 client - handler - agent 구조
queue 는 client - agent 구조
인데 manager 는 아직 구현 안함. client 는 라이브러리라고 생각하면 된다.
지금 코드는 agent 까지만 구현되있어서 agent끼리 구현해볼 수 있게끔 해놈
실행은 agent.py 코드 키면 되고 코드 내부에 출력 결과 적혀있고
subscribe 랑 publish 해보려면 ros2 같은경우엔 명령어로 test
gmqtt 같은 경우엔 mqtt box 로 test 함. ( gmqtt 경우엔 docker 로 emqx 올리고 셋업까지 해서 동작 시킴 )
일단 코드만 올려 놓음
ros2 handler
# -*- coding: utf-8 -*-
import rclpy
from rclpy.qos import QoSProfile
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.node import Publisher
from rclpy.node import Subscription
from threading import Thread
import time
from dataclasses import dataclass
from typing import TypeVar, Union, Callable, Any
# MsgType = TypeVar('MsgType')
# dataclass 변수어노테이션으로 요긴하게 쓰는중
@dataclass
class SubNodeInfo:
node_name: str
msg_type: Any
topic_name: str
callback: Callable
qos_profile: rclpy.qos.QoSProfile
@dataclass
class PubNodeInfo:
node_name: str
msg_type: Any
topic_name: str
qos_profile: rclpy.qos.QoSProfile
# class SubNodeMaker(SubNodeInfo):
class SubNodeMaker:
def __init__(self):
self.qos_profile = None
def __call__(self, node_name: str, msg_type: Any, topic_name: str, callback: Callable,
qos_profile: Union[int, QoSProfile]) -> SubNodeInfo:
if isinstance(qos_profile, int):
self.qos_profile = rclpy.qos.QoSProfile(
reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
depth=qos_profile
)
elif isinstance(qos_profile, QoSProfile):
self.qos_profile = qos_profile
else:
raise TypeError('PubNodeMaker 에서 config TYPE ERROR 발생')
_subscription_type = SubNodeInfo(
node_name=node_name,
msg_type=msg_type,
topic_name=topic_name,
callback=callback,
qos_profile=self.qos_profile
)
return _subscription_type
# class PubNodeMaker(PubNodeInfo):
class PubNodeMaker:
def __init__(self):
self.qos_profile = None
def __call__(self, node_name: str, msg_type: Any, topic_name: str,
qos_profile: Union[int, QoSProfile]) -> PubNodeInfo:
if isinstance(qos_profile, int):
self.qos_profile = rclpy.qos.QoSProfile(
reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
depth=qos_profile
)
elif isinstance(qos_profile, QoSProfile):
self.qos_profile = qos_profile
else:
raise TypeError('PubNodeMaker 에서 config TYPE ERROR 발생')
self.publisher_type = PubNodeInfo(
node_name=node_name,
msg_type=msg_type,
topic_name=topic_name,
qos_profile=self.qos_profile
)
return self.publisher_type
class Ros2Handler(object):
"""
코드 의도 :
멀티쓰레드로 등록된 subscription에 data를 쏴줄 때마다 callback 함수가 호출되는데,
callback 함수가 호출 될 때 마다 해당된 publisher를 publish해주는 코드
동작은 50초간 지속
코드설명 :
Ros2Handler.init_rclpy() 랑 Ros2Handler.run_executor()는 class method로 따로 켜줘야함.
init_rclpy() 는 rclpy.init() 용 방어코드(안써도됨), Ros2Handler.run_executor()는 여태 등록된 executor를 한번에 켜주기 위함
나머지는 코드 등록 과정
Ros2Handler 의 파라미터는 커스텀 dataclass로 알맞은 파라미터를 보내지 않으면 에러 발생
"""
_execute_list = []
_executor: MultiThreadedExecutor
def __init__(self, node_info: Union[SubNodeInfo, PubNodeInfo]): # , _queue: asyncio.Queue):
"""
"""
self.sub_node_info: SubNodeInfo = None
self.pub_node_info: PubNodeInfo = None
self._sub_node_obj: Node = None
self._pub_node_obj: Node = None
self.set_executor()
if isinstance(node_info, SubNodeInfo):
self.sub_node_info = node_info
self._set_up_subscription()
elif isinstance(node_info, PubNodeInfo):
self.pub_node_info = node_info
self._set_up_publisher()
else:
raise TypeError("class Ros2Handler parameter type ERROR")
@classmethod
def init_rclpy(cls):
""" 한번만 수행되어야 하므로 클래쓰메쏘드 사용 """
if rclpy.ok():
pass
else:
rclpy.init()
@classmethod
def set_executor(cls):
cls._executor = MultiThreadedExecutor()
@classmethod
def remove_node_from_executor(cls):
"""필요시 사용 따로 쓰진 않는중"""
pass
# cls._executor.remove_node()
@classmethod
def run_executor(cls):
for _ in range(0, len(cls._execute_list)):
cls._executor.add_node(cls._execute_list[_])
while rclpy.ok():
time.sleep(0.1) # I/O bounding operation time 발생
_executor_thread = Thread(target=cls._executor.spin, daemon=True)
print("ROS2 Executor's runnung nodes number : [{}] ".format(len(cls._execute_list)), flush=True)
print("ROS2 Executor's runnung nodes names : [{}] ".format(cls._executor.get_nodes()), flush=True)
_executor_thread.start()
_executor_thread.join()
print('run_executor END')
def _set_up_subscription(self) -> None:
self._create_node()
self._node_obj_create_subscription()
def _set_up_publisher(self) -> None:
self._create_node()
self._node_obj_create_publisher()
def _create_node(self) -> None:
"""
node object 생성
"""
if rclpy.ok():
pass
else:
rclpy.init()
if isinstance(self.sub_node_info, SubNodeInfo):
print('ROS2 creat_node for subscription', flush=True)
self._sub_node_obj = rclpy.create_node(self.sub_node_info.node_name)
elif isinstance(self.pub_node_info, PubNodeInfo):
print('ROS2 creat_node for publisher', flush=True)
self._pub_node_obj = rclpy.create_node(self.pub_node_info.node_name)
else:
raise TypeError("class Ros2Handler parameter type ERROR")
def _node_obj_create_subscription(self) -> None:
"""subscribe 노드 생성시 사용 되는 함수"""
print("create_subscription on ", flush=True)
self._sub_node_obj.subscription: Subscription = self._sub_node_obj.create_subscription(
self.sub_node_info.msg_type,
self.sub_node_info.topic_name,
self.sub_node_info.callback,
self.sub_node_info.qos_profile
)
self._execute_list.append(self._sub_node_obj)
def _node_obj_create_publisher(self) -> None:
"""publisher 노드 생성시 사용 되는 함수"""
print("create_publisher on ", flush=True)
self._pub_node_obj.publisher: Publisher = self._pub_node_obj.create_publisher(
self.pub_node_info.msg_type,
self.pub_node_info.topic_name,
self.pub_node_info.qos_profile
)
self._execute_list.append(self._pub_node_obj)
def run_publish(self, msg):
print("ROS2 run_publish on, topic name's: ", self._pub_node_obj.publisher.topic_name, flush=True)
print('The number of this message published is [{}] '.format(self._pub_node_obj.publisher.get_subscription_count()))
self._pub_node_obj.publisher.publish(msg)
# self._pub_node_obj.publisher.destroy()
ros2 agent
import os
import sys
import time
import rclpy
from std_msgs.msg import String
import threading
from ros2_handler import SubNodeMaker, PubNodeMaker, Ros2Handler
import enum
from abc import *
from typing import TypeVar, Union, Callable
# manager 로 실행시 필요 없음
import asyncio
import contextvars
import functools
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
#----------------------------------------------------------------------
# MsgType = TypeVar('MsgType')
class Ros2Agent(object):
"""
publisher 2개 : 시나리오, 이미지
subscription 1개 : tm, 또는 모듈 ( 있을지 없을지 모르겠음 있으면 이미지 관련 토픽이지 않을까 )
"""
def __init__(self):
Ros2Handler.init_rclpy()
self._flag_setup_ok = None
self.publisher_handler: dict = {}
self.subscription_handler: dict = {}
self._setup()
# Ros2Handler.run_executor()
def _setup(self):
PUB_TOPIC_1 = "/tm/aa"
self.publisher_handler[PUB_TOPIC_1] = self._setup_pub(node_name="pub_node_1", msg_type=String,
topic_name=PUB_TOPIC_1, qos_profile=10)
PUB_TOPIC_2 = "/image_module/aa"
self.publisher_handler[PUB_TOPIC_2] = self._setup_pub(node_name="pub_node_2",
msg_type=String,
topic_name=PUB_TOPIC_2,
qos_profile=10)
SUB_TOPIC_1 = "/server_manager/aa"
self.subscription_handler[SUB_TOPIC_1] = self._setup_sub(node_name="sub_node_1", msg_type=String,
topic_name=SUB_TOPIC_1,
callback=self.listener_callback_1, qos_profile=10)
SUB_TOPIC_2 = "/image_loom/aa/spare"
self.subscription_handler[SUB_TOPIC_2] = self._setup_sub(node_name="sub_node_2", msg_type=String,
topic_name=SUB_TOPIC_2,
callback=self.listener_callback_2, qos_profile=10)
self._flag_setup_ok = 1
def _setup_pub(self, node_name: str, msg_type, topic_name: str, qos_profile):
publisher_node_maker = PubNodeMaker()
ros2_handler_obj = Ros2Handler(
publisher_node_maker(node_name=node_name, msg_type=msg_type, topic_name=topic_name, qos_profile=qos_profile))
print(ros2_handler_obj)
return ros2_handler_obj
def _setup_sub(self, node_name: str, msg_type, topic_name: str, callback: Callable,
qos_profile):
sub_node_maker = SubNodeMaker()
ros2_handler_obj = Ros2Handler(
sub_node_maker(node_name=node_name, msg_type=msg_type, topic_name=topic_name, callback=callback,
qos_profile=qos_profile))
return ros2_handler_obj
def run_publish(self, topic_name: str, msg):
""" 적합한 객체의 ros2_handler.py 의 run_publish를 호출 """
if topic_name in self.publisher_handler.keys():
self.publisher_handler[topic_name].run_publish(msg)
@abstractmethod
def listener_callback_1(self, msg):
""" subscription 에 대한 콜백 ros2_agent 불러오는 쪽에서 재정의해서 사용할것 """
print('listener_callback_1 : ',msg)
pass
@abstractmethod
def listener_callback_2(self, msg):
""" subscription 에 대한 콜백 ros2_agent 불러오는 쪽에서 재정의해서 사용할것 """
print('listener_callback_2 : ', msg)
pass
def run_executor(self):
Ros2Handler.run_executor()
# ------------------------------------------- manager 쪽 호출권장방법 -----------------------------------------------#
def test_make_loop(self):
while not self._flag_setup_ok and rclpy.ok():
time.sleep(0.01)
_loop = asyncio.get_event_loop()
_loop.run_until_complete(self.test_make_task(_loop))
async def to_thread(self, func, /, *args, **kwargs):
""" python 3.9 function 이므로 3.8에서 사용하기 위해서 함수 긁어와서 사용"""
loop = asyncio.get_event_loop()
# loop = asyncio.get_running_loop()
ctx = contextvars.copy_context()
func_call = functools.partial(ctx.run, func, *args, **kwargs)
return await loop.run_in_executor(None, func_call)
async def test_make_task_backup(self, _loop):
future_object_test = _loop.run_in_executor(None, Ros2Handler.run_executor)
task_1 = asyncio.create_task(self.test_run_code())
# await future_object_test, task_1
await asyncio.gather(
future_object_test,
task_1
)
async def test_make_task(self, _loop):
future_object = self.to_thread(self.run_executor)
task_1 = asyncio.create_task(self.test_run_code())
await future_object, task_1
# await asyncio.gather(
# future_object,
# task_1
# )
async def test_run_code(self):
print('test_run_code')
while True:
await asyncio.sleep(1)
test_str = "{'key':'val'}"
msg = String()
msg.data = test_str
self.run_publish("/image_module/aa", msg)
self.run_publish("/tm/aa", msg)
if __name__=="__main__":
aa = Ros2Agent()
aa.test_make_loop()
"""
ros2 topic echo /image_module/aa std_msgs/msg/String
ros2 topic echo /tm/aa std_msgs/msg/String
ros2 topic pub --once /server_manager/aa std_msgs/msg/String "data : topic_test_1!"
ros2 topic pub --once /image_loom/aa/spare std_msgs/msg/String "data : topic_test_2!"
"""
gmqtt handler
# !/usr/bin/python
# -*- coding: utf-8 -*-
#----------------------------------------------------------------------
import os
import sys
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
#----------------------------------------------------------------------
import asyncio
from gmqtt.client import Client as MQTTClient
from gmqtt.client import Message
from gmqtt.client import Subscription
from gmqtt.client import SubscriptionsHandler
import uvloop
from dataclasses import dataclass
from typing import Any, Union, Sequence
@dataclass
class MqttConfigInfo:
server_url: str
server_port: Union[int, str]
auth_id: str
auth_password: str
client_id: str
class MakeMqttParamType:
def __init__(self):
pass
def __call__(self, server_url: str, server_port: Union[int, str], auth_id: str, auth_password: str,
client_id: str) -> MqttConfigInfo:
if isinstance(server_port, str):
server_port = int(server_port)
_maker_mqtt_info = MqttConfigInfo(
server_url=server_url,
server_port=server_port,
auth_id=auth_id,
auth_password=auth_password,
client_id=client_id,
)
return _maker_mqtt_info
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
STOP = asyncio.Event()
class MqttHandler(object):
"""
gmqtt lib를 이용해서 EMQX에 접속 및 data송수신을 위한 class
"""
def __init__(self, mqtt_info: MqttConfigInfo):
self._on_connect = None
self.mqtt_info = mqtt_info
self._client = MQTTClient(client_id=self.mqtt_info.client_id,
clean_session=True,
optimistic_acknowledgement=True,
will_message=None,
maximum_packet_size=2000000,
receive_maximum=24000)
def ask_exit(self):
STOP.set()
async def connect_emqx(self):
print('trying connect_EMQX NOW', flush=True)
self._client.set_auth_credentials(username=self.mqtt_info.auth_id,
password=self.mqtt_info.auth_password)
await self._client.connect(host=self.mqtt_info.server_url,
port=self.mqtt_info.server_port,
ssl=False, keepalive=10)
print('trying connect_EMQX COMPLETE ', flush=True)
# 서버접속 해제
async def disconnect_emqx(self):
await self._client.disconnect()
async def main(self):
"""
콜백함수id = 커스텀콜백함수id
getter setter 구조 , 코드 : if not callable(cb): 로 조건 체크해서 set해줌.
self._client.on_connect = self.on_connect 이거는
== self._client.on_connect(self.on_connect) 랑 같음 "@_on_connect.setter" 이니까
"""
self._client.on_connect = self._on_connect
self._client.on_message = self._on_message
self._client.on_disconnect = self._on_disconnect
self._client.on_subscribe = self._on_subscribe
self._client.on_unsubscribe = self._on_unsubscribe
await self.connect_emqx()
await STOP.wait()
print('mqtt main close now', flush=True)
await self.disconnect_emqx()
# def subscribe(self, subscription_or_topic: Union[str, Subscription, Sequence[Subscription]],
# qos=0, no_local=False, retain_as_published=False, retain_handling_options=0, **kwargs):
def run_subscribe(self, topic: str, qos: int = 0, no_local=False, retain_as_published=False, retain_handling_options=0):
""" topic 명으로 안넣고 subscription형식으로 진행함 """
self._client.subscribe(topic, qos)
# 이부분은 구현을 아직 못함 emqx 식별자쓰려고 한건데 좀더 헤딩이 필요함
# make_subscription_list = SubscriptionsHandler()
# subscriptions_list = make_subscription_list.update_subscriptions_with_subscription_or_topic(topic, qos,
# no_local,
# retain_as_published,
# retain_handling_options)
# print('11111111111111111',type(subscriptions_list))
# self._client.subscribe(subscriptions_list)
# def unsubscribe(self, topic: Union[str, Sequence[str]], **kwargs):
def run_unsubscribe(self, topic):
"""지금은 안쓰는중"""
self._client.unsubscribe(topic=topic)
# def publish(self, message_or_topic, payload=None, qos=0, retain=False, **kwargs):
def run_publish(self, topic: str, payload: Union[dict, str, None], qos: int = 0, retain=False,
properties: tuple = None) -> Union[int, None]:
"""
payload 에 지원하는 형시은 list, tuple, dict, int, float, str, None 이지만,
dict, json형식의 str, None 만 지원하게끔 제한함
payload의 내부 형식은 json 형식이여야 하고 len으로 데이터를 셀 수 있어야함
"""
if not self._client.is_connected:
print("EMQX not connected")
return -1
if len(payload) < 268435455:
if properties:
publish_msg_type = Message(topic=topic, payload=payload, qos=qos, retain=retain, properties=properties)
else:
publish_msg_type = Message(topic=topic, payload=payload, qos=qos, retain=retain)
self._client.publish(publish_msg_type)
else:
print('run_publish parameter Error, topic :', topic, flush=True)
# Callback function 등록
# def register_callback_func(self, _event: str, _handler: callable()):
def register_callback_func(self, _event: str, _handler: callable):
if isinstance(_event, str) or callable(_handler):
print('register the callback [{}]'.format(_event), flush=True)
else:
raise ValueError
if 'connect' == _event:
self._on_connect = _handler
elif 'message' == _event:
self._on_message = _handler
elif 'disconnect' == _event:
self._on_disconnect = _handler
elif 'subscribe' == _event:
self._on_subscribe = _handler
elif 'unsubscribe' == _event:
self._on_unsubscribe = _handler
def _on_connect(self, client, flags, rc, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_connect.__name__), flush=True)
def _on_disconnect(self, client, packet, exc=None):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_disconnect.__name__), flush=True)
def _on_subscribe(self, client, mid, qos, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_subscribe.__name__), flush=True)
def _on_unsubscribe(self, topic):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_unsubscribe.__name__), flush=True)
def _on_message(self, client, topic, payload, qos, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_message.__name__), flush=True)
gmqtt agent
import asyncio
import time
import os
import sys
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
# ----------------------------------------------------------------------
from mqtt_handler import MqttHandler, MakeMqttParamType
from typing import Union
SERVER_BROKER_URL = "192.168.0.94"
class MqttAgent(object):
"""
디폴트 파라미터 이용해서 config 정보 삽입
"""
def __init__(self, server_url=SERVER_BROKER_URL, server_port=1883, auth_id='auth_id',
auth_password='auth_password', _client_id='server_manager'):
self.connect_on_flag = None
self._client_id = _client_id
self._make_mqtt_param_type = MakeMqttParamType()
self._mqtt_handler = MqttHandler(
self._make_mqtt_param_type(server_url=server_url, server_port=server_port,
auth_id=auth_id, auth_password=auth_password,
client_id=self._client_id)
)
self._set_up()
def ask_exit(self):
self._mqtt_handler.ask_exit()
def _set_up(self):
"""
emqx 접속정보, sub정보, pub정보를 전달
agnet쪽에서 연결
:return:
"""
self._ready()
def _ready(self):
"""
agent쪽에 비동기 콜백함수를 manager쪽에서 관리
:return:
"""
self._mqtt_handler.register_callback_func('connect', self._on_connect)
self._mqtt_handler.register_callback_func('message', self._on_message)
self._mqtt_handler.register_callback_func('disconnect', self._on_disconnect)
self._mqtt_handler.register_callback_func('subscribe', self._on_subscribe)
self._mqtt_handler.register_callback_func('unsubscribe', self._on_unsubscribe)
def _on_connect(self, client, flags, rc, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_connect.__name__), flush=True)
self.is_connected = True
def _on_disconnect(self, client, packet, exc=None):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_disconnect.__name__), flush=True)
self.is_connected = False
def _on_subscribe(self, client, mid, qos, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_subscribe.__name__), flush=True)
def _on_unsubscribe(self, topic):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_unsubscribe.__name__), flush=True)
def _on_message(self, client, topic, payload, qos, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_message.__name__), flush=True)
print('for debugging! -> topic : [{}], payload : [{}], '
'qos : [{}], properties : [{}] ,'.format(topic, payload, qos, properties), flush=True)
def run_subscribe(self, topic: str, qos: int = 0, no_local=False,
retain_as_published=False, retain_handling_options=0, **kwargs):
""" subscribe 필수 조건 : topic 명 """
print('subscribing this topic : [{}]'.format(topic),flush=True)
subscription_identifier = kwargs.get('subscription_identifier')
self._mqtt_handler.run_subscribe(topic, qos, no_local, retain_as_published, retain_handling_options)
def run_publish(self, topic: str, payload: Union[dict, str, None], qos: int = 0, properties: tuple = None):
""" publish 필수조건 : topic 명"""
self._mqtt_handler.run_publish(topic=topic, payload=payload, qos=qos, retain=False, properties=properties)
@property
def prop_client_id(self): #getter
return self._client_id
# @prop_client_id.setter #setter
# def prop_client_id(self, value):
# self._client_id = value
@property
def is_connected(self): #getter
return self.connect_on_flag
@is_connected.setter #setter
def is_connected(self, value):
self.connect_on_flag = value
# ------------------------------------------- manager 쪽 호출권장방법 -----------------------------------------------#
def test_make_loop(self):
_loop = asyncio.get_event_loop()
_loop.run_until_complete(self.test_make_task())
async def test_make_task(self):
task_1 = asyncio.create_task(self._mqtt_handler.main())
task_2 = asyncio.create_task(self.test_run_code())
await task_1, task_2
print('test_make_task END')
# await asyncio.gather(
# task_1,
# self.run_queue_agent()
# )
async def test_run_code(self):
# mqtt subscribe는 이런식으로 하면되고
while not (self._client_id and self.is_connected):
await asyncio.sleep(0.01)
_set_topic_name = ('/{}/#'.format(self._client_id))
# print(' mqtt client id : [{}] , mqtt_connected : [{}]'.format(self._client_id, self.is_connected))
self.run_subscribe(topic=_set_topic_name, qos=0, no_local=False, retain_as_published=False,
retain_handling_options=0, subscription_identifier='1')
# mqtt publish는 이런식으로 하면된다.
while True:
await asyncio.sleep(3)
self.run_publish('/test_publish/topic_1', "{'test1': 'hi1'}")
b = {'test2': 'hi2'}
self.run_publish('/test_publish/topic_2', b)
if __name__ == "__main__":
aa = MqttAgent(server_url="172.18.252.9")
aa.test_make_loop()
"""
출력 결과 :
register the callback [connect]
register the callback [message]
register the callback [disconnect]
register the callback [subscribe]
register the callback [unsubscribe]
trying connect_EMQX NOW
mqtt_client callback_funtion : [_on_connect] on
trying connect_EMQX COMPLETE
subscribing this topic : [/server_manager/#]
mqtt_client callback_funtion : [_on_subscribe] on
########## 여기 까지 동작후 subscribe 대기 상태 mqtt box 로 publish 2 번 한 결과가 밑에 부분 ##########
########## /server_manager 를 subscribe 하는 중이다. ##########
mqtt_client callback_funtion : [_on_message] on
for debugging! -> topic : [/server_manager], payload : [b"{'hello':'world'}"], qos : [0], properties : [{'dup': 0, 'retain': 0}] ,
mqtt_client callback_funtion : [_on_message] on
for debugging! -> topic : [/server_manager], payload : [b"{'hello':'world'}"], qos : [0], properties : [{'dup': 0, 'retain': 0}] ,
########## + 콘솔에 출력은 안햇지만 mqtt box로 subscrbie 해보면 계속 publish 하는걸 알 수 있다. ##########
########## /test_publish/topic_2 랑 /test_publish/topic_1 를 subscribe 해보면 결과 나옴 ##########
"""
asyncio.Queue agent
import asyncio
import time
from asyncio.queues import Queue
class AsyncQueue:
"""
코드 설명 :
ros2 로 받아서 queue 를 통해서 mqtt에게 보내고
mqtt 로 받아서 queue 를 통해서 ros2에게 보내려고 만든 코드
queue 방향성은 단뱡향 ( Thread-safe 하지 않은 asyncio.Queue 를 쓰기 위해서 )이고 2개 선언함.
put_nowait 를 하고 나서 asyncio.get 을 하는 구조이다.
"""
def __init__(self):
# self.ros2_to_mqtt: Queue = None
# self.mqtt_to_ros2: Queue = None
self.ros2_to_mqtt = Queue()
self.mqtt_to_ros2 = Queue()
self.ros2_to_mqtt_index = 0
self.mqtt_to_ros2_index = 0
# -------------------------------------- ros2 -> queue put -> queue get -> mqtt ------------------------------------------#
async def ros2_data_put(self, _msg):
# self.mqtt_to_ros2 = Queue()
self.mqtt_to_ros2.put_nowait(_msg)
task_1 = asyncio.create_task(self.mqtt_data_get(self.mqtt_get_callback))
await task_1
await self.mqtt_to_ros2.join()
# print('\t\tputter END once')
async def mqtt_data_get(self, queue_callback):
msg = await self.mqtt_to_ros2.get()
# get 한 data 콜백으로 전달
queue_callback(msg)
# 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
await asyncio.sleep(0.1)
self.mqtt_to_ros2.task_done()
# print('queue_size [{}]'.format(self.queue.qsize()), ' getter END once')
#-------------------------------------- mqtt -> queue put -> queue get -> ros2 ------------------------------------------#
async def mqtt_data_put(self, _msg):
# self.ros2_to_mqtt = Queue()
self.ros2_to_mqtt.put_nowait(_msg)
task_1 = asyncio.create_task(self.ros2_data_get(self.ros2_get_callback))
await task_1
await self.ros2_to_mqtt.join()
# print('\t\tputter END once')
async def ros2_data_get(self, queue_callback):
msg = await self.ros2_to_mqtt.get()
# get 한 data 콜백으로 전달
queue_callback(msg)
# 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
await asyncio.sleep(0.1)
self.ros2_to_mqtt.task_done()
# print('queue_size [{}]'.format(self.queue.qsize()), ' getter END once')
# ---------------------------------------------- callback 선언부 --------------------------------------------------#
def ros2_get_callback(self, get_msg):
self.ros2_to_mqtt_index = self.ros2_to_mqtt_index + 1
print('ros2_get_callback : [{}], index : [{}]'.format(get_msg, self.ros2_to_mqtt_index))
def mqtt_get_callback(self, get_msg):
self.mqtt_to_ros2_index = self.mqtt_to_ros2_index + 1
print('mqtt_get_callback : [{}], index : [{}]'.format(get_msg, self.mqtt_to_ros2_index))
# ------------------------------------------- manager 쪽 호출권장방법 -----------------------------------------------#
def test_make_loop(self):
_loop = asyncio.get_event_loop()
_loop.run_until_complete(self.test_make_task())
async def test_make_task(self):
while True:
task_1 = asyncio.create_task(self.test_run_code())
await task_1
# await asyncio.gather(
# task_1
# )
async def test_run_code(self):
test_data = "{'key':'val','key2':'val2'}"
task_mqtt_data_put = asyncio.create_task(self.mqtt_data_put(test_data))
task_ros2_data_put = asyncio.create_task(self.ros2_data_put(test_data))
await asyncio.sleep(1)
await task_mqtt_data_put
await task_ros2_data_put
if __name__ == '__main__':
# async_queue = AsyncQueue()
# test_data = "{'key':'val','key2':'val2'}"
#
# while True:
# time.sleep(1)
# async_queue.test_make_loop()
test_data = "{'key':'val','key2':'val2'}"
aa = AsyncQueue()
aa.test_make_loop()
"""
출력 결과 :
ros2_get_callback : [{'key':'val','key2':'val2'}], index : [1]
mqtt_get_callback : [{'key':'val','key2':'val2'}], index : [1]
ros2_get_callback : [{'key':'val','key2':'val2'}], index : [2]
mqtt_get_callback : [{'key':'val','key2':'val2'}], index : [2]
ros2_get_callback : [{'key':'val','key2':'val2'}], index : [3]
mqtt_get_callback : [{'key':'val','key2':'val2'}], index : [3]
ros2_get_callback : [{'key':'val','key2':'val2'}], index : [4]
mqtt_get_callback : [{'key':'val','key2':'val2'}], index : [4]
ros2_get_callback : [{'key':'val','key2':'val2'}], index : [5]
mqtt_get_callback : [{'key':'val','key2':'val2'}], index : [5]
"""
'언어 정리 > python_비동기관련_lib' 카테고리의 다른 글
asyncio로 ros2_spin 구동예제 ( handler-agent-manager ) (0) | 2022.09.17 |
---|---|
코루틴 과 Eventloop 그리고 Future (1) | 2022.09.14 |
asyncio Queue 랑 loop 개념까지 정리 (2) | 2022.09.06 |
이벤트 루프_2 - 예제랑 같이 좀더 자세히 (0) | 2022.08.28 |
다른사람 예제 rclpy.executors 랑 rclpy.task 관련 예시 퍼옴 (0) | 2022.08.17 |
댓글