본문 바로가기
언어 정리/python_비동기관련_lib

agent,handler 모음 (ros2, gmqtt, asyncio.Queue)

by 알 수 없는 사용자 2022. 9. 12.

원래는 manager가 ros2, gmqtt, queue  각각 3개 agent를 다뤄주고

ros2 와 gmqtt 는 client - handler - agent 구조

queue 는 client - agent 구조

인데 manager 는 아직 구현 안함. client 는 라이브러리라고 생각하면 된다.

지금 코드는 agent 까지만 구현되있어서 agent끼리 구현해볼 수 있게끔 해놈

 

실행은 agent.py 코드 키면 되고 코드 내부에 출력 결과 적혀있고

subscribe 랑 publish 해보려면 ros2 같은경우엔 명령어로 test

gmqtt 같은 경우엔 mqtt box 로 test 함. ( gmqtt 경우엔 docker 로 emqx 올리고 셋업까지 해서 동작 시킴 )

 

일단 코드만 올려 놓음


 

ros2 handler

# -*- coding: utf-8 -*-
import rclpy
from rclpy.qos import QoSProfile
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.node import Publisher
from rclpy.node import Subscription
from threading import Thread
import time

from dataclasses import dataclass
from typing import TypeVar, Union, Callable, Any

# MsgType = TypeVar('MsgType')
# dataclass 변수어노테이션으로 요긴하게 쓰는중
@dataclass
class SubNodeInfo:
    node_name: str
    msg_type: Any
    topic_name: str
    callback: Callable
    qos_profile: rclpy.qos.QoSProfile


@dataclass
class PubNodeInfo:
    node_name: str
    msg_type: Any
    topic_name: str
    qos_profile: rclpy.qos.QoSProfile


# class SubNodeMaker(SubNodeInfo):
class SubNodeMaker:
    def __init__(self):
        self.qos_profile = None

    def __call__(self, node_name: str, msg_type: Any, topic_name: str, callback: Callable,
                 qos_profile: Union[int, QoSProfile]) -> SubNodeInfo:

        if isinstance(qos_profile, int):
            self.qos_profile = rclpy.qos.QoSProfile(
                reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
                history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
                depth=qos_profile
            )
        elif isinstance(qos_profile, QoSProfile):
            self.qos_profile = qos_profile
        else:
            raise TypeError('PubNodeMaker 에서 config TYPE ERROR 발생')

        _subscription_type = SubNodeInfo(
            node_name=node_name,
            msg_type=msg_type,
            topic_name=topic_name,
            callback=callback,
            qos_profile=self.qos_profile
        )
        return _subscription_type


# class PubNodeMaker(PubNodeInfo):
class PubNodeMaker:
    def __init__(self):
        self.qos_profile = None

    def __call__(self, node_name: str, msg_type: Any, topic_name: str,
                 qos_profile: Union[int, QoSProfile]) -> PubNodeInfo:

        if isinstance(qos_profile, int):
            self.qos_profile = rclpy.qos.QoSProfile(
                reliability=rclpy.qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
                history=rclpy.qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
                depth=qos_profile
            )
        elif isinstance(qos_profile, QoSProfile):
            self.qos_profile = qos_profile
        else:
            raise TypeError('PubNodeMaker 에서 config TYPE ERROR 발생')

        self.publisher_type = PubNodeInfo(
            node_name=node_name,
            msg_type=msg_type,
            topic_name=topic_name,
            qos_profile=self.qos_profile
        )
        return self.publisher_type


class Ros2Handler(object):
    """
    코드 의도 :
    멀티쓰레드로 등록된 subscription에 data를 쏴줄 때마다 callback 함수가 호출되는데,
    callback 함수가 호출 될 때 마다 해당된 publisher를 publish해주는 코드
    동작은 50초간 지속

    코드설명 :
    Ros2Handler.init_rclpy() 랑  Ros2Handler.run_executor()는 class method로 따로 켜줘야함.
    init_rclpy() 는 rclpy.init() 용 방어코드(안써도됨),  Ros2Handler.run_executor()는 여태 등록된 executor를 한번에 켜주기 위함
    나머지는 코드 등록 과정
    Ros2Handler 의 파라미터는 커스텀 dataclass로 알맞은 파라미터를 보내지 않으면 에러 발생

    """
    _execute_list = []
    _executor: MultiThreadedExecutor
    def __init__(self, node_info: Union[SubNodeInfo, PubNodeInfo]):  # , _queue: asyncio.Queue):
        """
        """
        self.sub_node_info: SubNodeInfo = None
        self.pub_node_info: PubNodeInfo = None
        self._sub_node_obj: Node = None
        self._pub_node_obj: Node = None
        self.set_executor()
        if isinstance(node_info, SubNodeInfo):
            self.sub_node_info = node_info
            self._set_up_subscription()
        elif isinstance(node_info, PubNodeInfo):
            self.pub_node_info = node_info
            self._set_up_publisher()
        else:
            raise TypeError("class Ros2Handler parameter type ERROR")

    @classmethod
    def init_rclpy(cls):
        """ 한번만 수행되어야 하므로 클래쓰메쏘드 사용 """
        if rclpy.ok():
            pass
        else:
            rclpy.init()

    @classmethod
    def set_executor(cls):
        cls._executor = MultiThreadedExecutor()

    @classmethod
    def remove_node_from_executor(cls):
        """필요시 사용 따로 쓰진 않는중"""
        pass
        # cls._executor.remove_node()

    @classmethod
    def run_executor(cls):
        for _ in range(0, len(cls._execute_list)):
            cls._executor.add_node(cls._execute_list[_])
        while rclpy.ok():
            time.sleep(0.1)  # I/O bounding operation time 발생
            _executor_thread = Thread(target=cls._executor.spin, daemon=True)
            print("ROS2 Executor's runnung nodes number : [{}] ".format(len(cls._execute_list)), flush=True)
            print("ROS2 Executor's runnung nodes names : [{}] ".format(cls._executor.get_nodes()), flush=True)
            _executor_thread.start()
            _executor_thread.join()
            print('run_executor END')

    def _set_up_subscription(self) -> None:
        self._create_node()
        self._node_obj_create_subscription()

    def _set_up_publisher(self) -> None:
        self._create_node()
        self._node_obj_create_publisher()

    def _create_node(self) -> None:
        """
        node object 생성
        """
        if rclpy.ok():
            pass
        else:
            rclpy.init()

        if isinstance(self.sub_node_info, SubNodeInfo):
            print('ROS2 creat_node for subscription', flush=True)
            self._sub_node_obj = rclpy.create_node(self.sub_node_info.node_name)
        elif isinstance(self.pub_node_info, PubNodeInfo):
            print('ROS2 creat_node for publisher', flush=True)
            self._pub_node_obj = rclpy.create_node(self.pub_node_info.node_name)
        else:
            raise TypeError("class Ros2Handler parameter type ERROR")

    def _node_obj_create_subscription(self) -> None:
        """subscribe 노드 생성시 사용 되는 함수"""
        print("create_subscription on ", flush=True)
        self._sub_node_obj.subscription: Subscription = self._sub_node_obj.create_subscription(
            self.sub_node_info.msg_type,
            self.sub_node_info.topic_name,
            self.sub_node_info.callback,
            self.sub_node_info.qos_profile
        )
        self._execute_list.append(self._sub_node_obj)

    def _node_obj_create_publisher(self) -> None:
        """publisher 노드 생성시 사용 되는 함수"""
        print("create_publisher on ", flush=True)
        self._pub_node_obj.publisher: Publisher = self._pub_node_obj.create_publisher(
            self.pub_node_info.msg_type,
            self.pub_node_info.topic_name,
            self.pub_node_info.qos_profile
        )
        self._execute_list.append(self._pub_node_obj)

    def run_publish(self, msg):
        print("ROS2 run_publish on, topic name's: ", self._pub_node_obj.publisher.topic_name, flush=True)
        print('The number of this message published is [{}] '.format(self._pub_node_obj.publisher.get_subscription_count()))
        self._pub_node_obj.publisher.publish(msg)
        # self._pub_node_obj.publisher.destroy()

 

ros2 agent

import os
import sys
import time
import rclpy
from std_msgs.msg import String
import threading
from ros2_handler import SubNodeMaker, PubNodeMaker, Ros2Handler
import enum
from abc import *
from typing import TypeVar, Union, Callable

# manager 로 실행시 필요 없음
import asyncio
import contextvars
import functools

path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
#----------------------------------------------------------------------
# MsgType = TypeVar('MsgType')

class Ros2Agent(object):
    """
    publisher       2개 : 시나리오, 이미지
    subscription    1개 : tm, 또는 모듈 ( 있을지 없을지 모르겠음 있으면 이미지 관련 토픽이지 않을까 )
    """
    def __init__(self):
        Ros2Handler.init_rclpy()
        self._flag_setup_ok = None
        self.publisher_handler: dict = {}
        self.subscription_handler: dict = {}
        self._setup()
        # Ros2Handler.run_executor()

    def _setup(self):
        PUB_TOPIC_1 = "/tm/aa"
        self.publisher_handler[PUB_TOPIC_1] = self._setup_pub(node_name="pub_node_1", msg_type=String,
                                                              topic_name=PUB_TOPIC_1, qos_profile=10)
        PUB_TOPIC_2 = "/image_module/aa"
        self.publisher_handler[PUB_TOPIC_2] = self._setup_pub(node_name="pub_node_2",
                                                              msg_type=String,
                                                              topic_name=PUB_TOPIC_2,
                                                              qos_profile=10)
        SUB_TOPIC_1 = "/server_manager/aa"
        self.subscription_handler[SUB_TOPIC_1] = self._setup_sub(node_name="sub_node_1", msg_type=String,
                                                                 topic_name=SUB_TOPIC_1,
                                                                 callback=self.listener_callback_1, qos_profile=10)
        SUB_TOPIC_2 = "/image_loom/aa/spare"
        self.subscription_handler[SUB_TOPIC_2] = self._setup_sub(node_name="sub_node_2", msg_type=String,
                                                                 topic_name=SUB_TOPIC_2,
                                                                 callback=self.listener_callback_2, qos_profile=10)
        self._flag_setup_ok = 1

    def _setup_pub(self, node_name: str, msg_type, topic_name: str, qos_profile):
        publisher_node_maker = PubNodeMaker()
        ros2_handler_obj = Ros2Handler(
            publisher_node_maker(node_name=node_name, msg_type=msg_type, topic_name=topic_name, qos_profile=qos_profile))
        print(ros2_handler_obj)
        return ros2_handler_obj

    def _setup_sub(self, node_name: str, msg_type, topic_name: str, callback: Callable,
                   qos_profile):
        sub_node_maker = SubNodeMaker()
        ros2_handler_obj = Ros2Handler(
            sub_node_maker(node_name=node_name, msg_type=msg_type, topic_name=topic_name, callback=callback,
                           qos_profile=qos_profile))
        return ros2_handler_obj

    def run_publish(self, topic_name: str, msg):
        """ 적합한 객체의 ros2_handler.py 의 run_publish를 호출 """
        if topic_name in self.publisher_handler.keys():
            self.publisher_handler[topic_name].run_publish(msg)

    @abstractmethod
    def listener_callback_1(self, msg):
        """ subscription 에 대한 콜백 ros2_agent 불러오는 쪽에서 재정의해서 사용할것 """
        print('listener_callback_1 : ',msg)
        pass

    @abstractmethod
    def listener_callback_2(self, msg):
        """ subscription 에 대한 콜백 ros2_agent 불러오는 쪽에서 재정의해서 사용할것 """
        print('listener_callback_2 : ', msg)
        pass

    def run_executor(self):
        Ros2Handler.run_executor()

    # ------------------------------------------- manager 쪽 호출권장방법 -----------------------------------------------#
    def test_make_loop(self):
        while not self._flag_setup_ok and rclpy.ok():
            time.sleep(0.01)
        _loop = asyncio.get_event_loop()
        _loop.run_until_complete(self.test_make_task(_loop))

    async def to_thread(self, func, /, *args, **kwargs):
        """ python 3.9 function 이므로 3.8에서 사용하기 위해서 함수 긁어와서 사용"""
        loop = asyncio.get_event_loop()
        # loop = asyncio.get_running_loop()
        ctx = contextvars.copy_context()
        func_call = functools.partial(ctx.run, func, *args, **kwargs)
        return await loop.run_in_executor(None, func_call)

    async def test_make_task_backup(self, _loop):
        future_object_test = _loop.run_in_executor(None, Ros2Handler.run_executor)
        task_1 = asyncio.create_task(self.test_run_code())
        # await future_object_test, task_1
        await asyncio.gather(
            future_object_test,
            task_1
        )

    async def test_make_task(self, _loop):
        future_object = self.to_thread(self.run_executor)
        task_1 = asyncio.create_task(self.test_run_code())
        await future_object, task_1
        # await asyncio.gather(
        #     future_object,
        #     task_1
        # )

    async def test_run_code(self):
        print('test_run_code')
        while True:
            await asyncio.sleep(1)
            test_str = "{'key':'val'}"
            msg = String()
            msg.data = test_str
            self.run_publish("/image_module/aa", msg)
            self.run_publish("/tm/aa", msg)


if __name__=="__main__":
    aa = Ros2Agent()
    aa.test_make_loop()
"""
ros2 topic echo /image_module/aa std_msgs/msg/String
ros2 topic echo /tm/aa std_msgs/msg/String
ros2 topic pub --once /server_manager/aa std_msgs/msg/String "data : topic_test_1!"
ros2 topic pub --once /image_loom/aa/spare std_msgs/msg/String "data : topic_test_2!"
"""

 

gmqtt handler

# !/usr/bin/python
# -*- coding: utf-8 -*-
#----------------------------------------------------------------------
import os
import sys
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
#----------------------------------------------------------------------

import asyncio
from gmqtt.client import Client as MQTTClient
from gmqtt.client import Message
from gmqtt.client import Subscription
from gmqtt.client import SubscriptionsHandler
import uvloop

from dataclasses import dataclass
from typing import Any, Union, Sequence

@dataclass
class MqttConfigInfo:
    server_url: str
    server_port: Union[int, str]
    auth_id: str
    auth_password: str
    client_id: str

class MakeMqttParamType:
    def __init__(self):
        pass

    def __call__(self, server_url: str, server_port: Union[int, str], auth_id: str, auth_password: str,
                 client_id: str) -> MqttConfigInfo:
        if isinstance(server_port, str):
            server_port = int(server_port)
        _maker_mqtt_info = MqttConfigInfo(
            server_url=server_url,
            server_port=server_port,
            auth_id=auth_id,
            auth_password=auth_password,
            client_id=client_id,
        )
        return _maker_mqtt_info


asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
STOP = asyncio.Event()


class MqttHandler(object):
    """
    gmqtt lib를 이용해서 EMQX에 접속 및 data송수신을 위한 class
    """
    def __init__(self, mqtt_info: MqttConfigInfo):
        self._on_connect = None
        self.mqtt_info = mqtt_info
        self._client = MQTTClient(client_id=self.mqtt_info.client_id,
                                  clean_session=True,
                                  optimistic_acknowledgement=True,
                                  will_message=None,
                                  maximum_packet_size=2000000,
                                  receive_maximum=24000)

    def ask_exit(self):
        STOP.set()

    async def connect_emqx(self):
        print('trying connect_EMQX NOW', flush=True)
        self._client.set_auth_credentials(username=self.mqtt_info.auth_id,
                                          password=self.mqtt_info.auth_password)
        await self._client.connect(host=self.mqtt_info.server_url,
                                   port=self.mqtt_info.server_port,
                                   ssl=False, keepalive=10)
        print('trying connect_EMQX COMPLETE ', flush=True)

    # 서버접속 해제
    async def disconnect_emqx(self):
        await self._client.disconnect()

    async def main(self):
        """
        콜백함수id = 커스텀콜백함수id
        getter setter 구조 , 코드 : if not callable(cb): 로 조건 체크해서 set해줌.
        self._client.on_connect = self.on_connect 이거는
        == self._client.on_connect(self.on_connect) 랑 같음 "@_on_connect.setter" 이니까
        """
        self._client.on_connect = self._on_connect
        self._client.on_message = self._on_message
        self._client.on_disconnect = self._on_disconnect
        self._client.on_subscribe = self._on_subscribe
        self._client.on_unsubscribe = self._on_unsubscribe

        await self.connect_emqx()

        await STOP.wait()
        print('mqtt main close now', flush=True)
        await self.disconnect_emqx()

#    def subscribe(self, subscription_or_topic: Union[str, Subscription, Sequence[Subscription]],
#               qos=0, no_local=False, retain_as_published=False, retain_handling_options=0, **kwargs):
    def run_subscribe(self, topic: str, qos: int = 0, no_local=False, retain_as_published=False, retain_handling_options=0):
        """ topic 명으로 안넣고 subscription형식으로 진행함 """
        self._client.subscribe(topic, qos)
        # 이부분은 구현을 아직 못함 emqx 식별자쓰려고 한건데 좀더 헤딩이 필요함
        # make_subscription_list = SubscriptionsHandler()
        # subscriptions_list = make_subscription_list.update_subscriptions_with_subscription_or_topic(topic, qos,
        #                                                                                             no_local,
        #                                                                                             retain_as_published,
        #                                                                                             retain_handling_options)
        # print('11111111111111111',type(subscriptions_list))
        # self._client.subscribe(subscriptions_list)

# def unsubscribe(self, topic: Union[str, Sequence[str]], **kwargs):
    def run_unsubscribe(self, topic):
        """지금은 안쓰는중"""
        self._client.unsubscribe(topic=topic)

# def publish(self, message_or_topic, payload=None, qos=0, retain=False, **kwargs):
    def run_publish(self, topic: str, payload: Union[dict, str, None], qos: int = 0, retain=False,
                    properties: tuple = None) -> Union[int, None]:
        """
        payload 에 지원하는 형시은 list, tuple, dict, int, float, str, None 이지만,
        dict, json형식의 str, None 만 지원하게끔 제한함
        payload의 내부 형식은 json 형식이여야 하고 len으로 데이터를 셀 수 있어야함
        """
        if not self._client.is_connected:
            print("EMQX not connected")
            return -1

        if len(payload) < 268435455:
            if properties:
                publish_msg_type = Message(topic=topic, payload=payload, qos=qos, retain=retain, properties=properties)
            else:
                publish_msg_type = Message(topic=topic, payload=payload, qos=qos, retain=retain)
            self._client.publish(publish_msg_type)
        else:
            print('run_publish parameter Error, topic :', topic, flush=True)

    # Callback function 등록
    # def register_callback_func(self, _event: str, _handler: callable()):
    def register_callback_func(self, _event: str, _handler: callable):
        if isinstance(_event, str) or callable(_handler):
            print('register the callback [{}]'.format(_event), flush=True)
        else:
            raise ValueError
        if 'connect' == _event:
            self._on_connect = _handler
        elif 'message' == _event:
            self._on_message = _handler
        elif 'disconnect' == _event:
            self._on_disconnect = _handler
        elif 'subscribe' == _event:
            self._on_subscribe = _handler
        elif 'unsubscribe' == _event:
            self._on_unsubscribe = _handler


    def _on_connect(self, client, flags, rc, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_connect.__name__), flush=True)

    def _on_disconnect(self, client, packet, exc=None):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_disconnect.__name__), flush=True)

    def _on_subscribe(self, client, mid, qos, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_subscribe.__name__), flush=True)

    def _on_unsubscribe(self, topic):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_unsubscribe.__name__), flush=True)

    def _on_message(self, client, topic, payload, qos, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_message.__name__), flush=True)

 

gmqtt agent

import asyncio
import time
import os
import sys

path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
# ----------------------------------------------------------------------
from mqtt_handler import MqttHandler, MakeMqttParamType
from typing import Union

SERVER_BROKER_URL = "192.168.0.94"

class MqttAgent(object):
    """
    디폴트 파라미터 이용해서 config 정보 삽입
    """
    def __init__(self, server_url=SERVER_BROKER_URL, server_port=1883, auth_id='auth_id',
                 auth_password='auth_password', _client_id='server_manager'):
        self.connect_on_flag = None
        self._client_id = _client_id
        self._make_mqtt_param_type = MakeMqttParamType()
        self._mqtt_handler = MqttHandler(
            self._make_mqtt_param_type(server_url=server_url, server_port=server_port,
                                       auth_id=auth_id, auth_password=auth_password,
                                       client_id=self._client_id)
        )
        self._set_up()

    def ask_exit(self):
        self._mqtt_handler.ask_exit()

    def _set_up(self):
        """
        emqx 접속정보, sub정보, pub정보를 전달
        agnet쪽에서 연결
        :return:
        """
        self._ready()

    def _ready(self):
        """
        agent쪽에 비동기 콜백함수를 manager쪽에서 관리
        :return:
        """
        self._mqtt_handler.register_callback_func('connect', self._on_connect)
        self._mqtt_handler.register_callback_func('message', self._on_message)
        self._mqtt_handler.register_callback_func('disconnect', self._on_disconnect)
        self._mqtt_handler.register_callback_func('subscribe', self._on_subscribe)
        self._mqtt_handler.register_callback_func('unsubscribe', self._on_unsubscribe)

    def _on_connect(self, client, flags, rc, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_connect.__name__), flush=True)
        self.is_connected = True

    def _on_disconnect(self, client, packet, exc=None):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_disconnect.__name__), flush=True)
        self.is_connected = False

    def _on_subscribe(self, client, mid, qos, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_subscribe.__name__), flush=True)

    def _on_unsubscribe(self, topic):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_unsubscribe.__name__), flush=True)

    def _on_message(self, client, topic, payload, qos, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_message.__name__), flush=True)
        print('for debugging! -> topic : [{}], payload : [{}], '
              'qos : [{}], properties : [{}] ,'.format(topic, payload, qos, properties), flush=True)

    def run_subscribe(self, topic: str, qos: int = 0, no_local=False,
                      retain_as_published=False, retain_handling_options=0, **kwargs):
        """ subscribe 필수 조건 : topic 명 """
        print('subscribing this topic : [{}]'.format(topic),flush=True)
        subscription_identifier = kwargs.get('subscription_identifier')
        self._mqtt_handler.run_subscribe(topic, qos, no_local, retain_as_published, retain_handling_options)

    def run_publish(self, topic: str, payload: Union[dict, str, None], qos: int = 0, properties: tuple = None):
        """ publish 필수조건 : topic 명"""
        self._mqtt_handler.run_publish(topic=topic, payload=payload, qos=qos, retain=False, properties=properties)

    @property
    def prop_client_id(self):                #getter
        return self._client_id

    # @prop_client_id.setter                #setter
    # def prop_client_id(self, value):
    #     self._client_id = value

    @property
    def is_connected(self):             #getter
        return self.connect_on_flag

    @is_connected.setter                #setter
    def is_connected(self, value):
        self.connect_on_flag = value

# ------------------------------------------- manager 쪽 호출권장방법 -----------------------------------------------#
    def test_make_loop(self):
        _loop = asyncio.get_event_loop()
        _loop.run_until_complete(self.test_make_task())

    async def test_make_task(self):
        task_1 = asyncio.create_task(self._mqtt_handler.main())
        task_2 = asyncio.create_task(self.test_run_code())
        await task_1, task_2
        print('test_make_task END')
        # await asyncio.gather(
        #     task_1,
        #     self.run_queue_agent()
        # )

    async def test_run_code(self):
        # mqtt subscribe는 이런식으로 하면되고
        while not (self._client_id and self.is_connected):
            await asyncio.sleep(0.01)
        _set_topic_name = ('/{}/#'.format(self._client_id))
        # print(' mqtt client id : [{}] , mqtt_connected : [{}]'.format(self._client_id, self.is_connected))
        self.run_subscribe(topic=_set_topic_name, qos=0, no_local=False, retain_as_published=False,
                           retain_handling_options=0, subscription_identifier='1')
        # mqtt publish는 이런식으로 하면된다.
        while True:
            await asyncio.sleep(3)
            self.run_publish('/test_publish/topic_1', "{'test1': 'hi1'}")
            b = {'test2': 'hi2'}
            self.run_publish('/test_publish/topic_2', b)

if __name__ == "__main__":
    aa = MqttAgent(server_url="172.18.252.9")
    aa.test_make_loop()

"""
출력 결과 :
register the callback [connect]
register the callback [message]
register the callback [disconnect]
register the callback [subscribe]
register the callback [unsubscribe]
trying connect_EMQX NOW
mqtt_client callback_funtion : [_on_connect] on
trying connect_EMQX COMPLETE
subscribing this topic : [/server_manager/#]
mqtt_client callback_funtion : [_on_subscribe] on
########## 여기 까지 동작후 subscribe 대기 상태 mqtt box 로 publish 2 번 한 결과가 밑에 부분     ##########
########## /server_manager 를 subscribe 하는 중이다.                                       ##########
mqtt_client callback_funtion : [_on_message] on
for debugging! -> topic : [/server_manager], payload : [b"{'hello':'world'}"], qos : [0], properties : [{'dup': 0, 'retain': 0}] ,
mqtt_client callback_funtion : [_on_message] on
for debugging! -> topic : [/server_manager], payload : [b"{'hello':'world'}"], qos : [0], properties : [{'dup': 0, 'retain': 0}] ,
########## + 콘솔에 출력은 안햇지만 mqtt box로 subscrbie 해보면 계속 publish 하는걸 알 수 있다.    ##########
########## /test_publish/topic_2 랑 /test_publish/topic_1 를 subscribe 해보면 결과 나옴      ##########

"""

 

asyncio.Queue agent

import asyncio
import time
from asyncio.queues import Queue

class AsyncQueue:
    """
    코드 설명 :
    ros2 로 받아서 queue 를 통해서 mqtt에게 보내고
    mqtt 로 받아서 queue 를 통해서 ros2에게 보내려고 만든 코드
    queue 방향성은 단뱡향 ( Thread-safe 하지 않은 asyncio.Queue 를 쓰기 위해서 )이고 2개 선언함.
    put_nowait 를 하고 나서 asyncio.get 을 하는 구조이다.
    """
    def __init__(self):
        # self.ros2_to_mqtt: Queue = None
        # self.mqtt_to_ros2: Queue = None
        self.ros2_to_mqtt = Queue()
        self.mqtt_to_ros2 = Queue()
        self.ros2_to_mqtt_index = 0
        self.mqtt_to_ros2_index = 0

# -------------------------------------- ros2 -> queue put -> queue get -> mqtt ------------------------------------------#
    async def ros2_data_put(self, _msg):
        # self.mqtt_to_ros2 = Queue()
        self.mqtt_to_ros2.put_nowait(_msg)
        task_1 = asyncio.create_task(self.mqtt_data_get(self.mqtt_get_callback))
        await task_1
        await self.mqtt_to_ros2.join()
        # print('\t\tputter END once')

    async def mqtt_data_get(self, queue_callback):
        msg = await self.mqtt_to_ros2.get()
        # get 한 data 콜백으로 전달
        queue_callback(msg)
        # 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
        await asyncio.sleep(0.1)
        self.mqtt_to_ros2.task_done()
        # print('queue_size [{}]'.format(self.queue.qsize()), ' getter END once')

#-------------------------------------- mqtt -> queue put -> queue get  -> ros2 ------------------------------------------#
    async def mqtt_data_put(self, _msg):
        # self.ros2_to_mqtt = Queue()
        self.ros2_to_mqtt.put_nowait(_msg)
        task_1 = asyncio.create_task(self.ros2_data_get(self.ros2_get_callback))
        await task_1
        await self.ros2_to_mqtt.join()
        # print('\t\tputter END once')

    async def ros2_data_get(self, queue_callback):
        msg = await self.ros2_to_mqtt.get()
        # get 한 data 콜백으로 전달
        queue_callback(msg)
        # 큐 속도 제어 : 10Hz 로 queue에 있는 data를 get 함
        await asyncio.sleep(0.1)
        self.ros2_to_mqtt.task_done()
        # print('queue_size [{}]'.format(self.queue.qsize()), ' getter END once')

# ---------------------------------------------- callback 선언부 --------------------------------------------------#
    def ros2_get_callback(self, get_msg):
        self.ros2_to_mqtt_index = self.ros2_to_mqtt_index + 1
        print('ros2_get_callback : [{}], index : [{}]'.format(get_msg, self.ros2_to_mqtt_index))

    def mqtt_get_callback(self, get_msg):
        self.mqtt_to_ros2_index = self.mqtt_to_ros2_index + 1
        print('mqtt_get_callback : [{}], index : [{}]'.format(get_msg, self.mqtt_to_ros2_index))

# ------------------------------------------- manager 쪽 호출권장방법 -----------------------------------------------#
    def test_make_loop(self):
        _loop = asyncio.get_event_loop()
        _loop.run_until_complete(self.test_make_task())

    async def test_make_task(self):
        while True:
            task_1 = asyncio.create_task(self.test_run_code())
            await task_1
        # await asyncio.gather(
        #     task_1
        # )

    async def test_run_code(self):
        test_data = "{'key':'val','key2':'val2'}"
        task_mqtt_data_put = asyncio.create_task(self.mqtt_data_put(test_data))
        task_ros2_data_put = asyncio.create_task(self.ros2_data_put(test_data))
        await asyncio.sleep(1)
        await task_mqtt_data_put
        await task_ros2_data_put



if __name__ == '__main__':
    # async_queue = AsyncQueue()
    # test_data = "{'key':'val','key2':'val2'}"
    #
    # while True:
    #     time.sleep(1)
    #     async_queue.test_make_loop()

    test_data = "{'key':'val','key2':'val2'}"
    aa = AsyncQueue()
    aa.test_make_loop()

"""
출력 결과 :
ros2_get_callback : [{'key':'val','key2':'val2'}], index : [1]
mqtt_get_callback : [{'key':'val','key2':'val2'}], index : [1]
ros2_get_callback : [{'key':'val','key2':'val2'}], index : [2]
mqtt_get_callback : [{'key':'val','key2':'val2'}], index : [2]
ros2_get_callback : [{'key':'val','key2':'val2'}], index : [3]
mqtt_get_callback : [{'key':'val','key2':'val2'}], index : [3]
ros2_get_callback : [{'key':'val','key2':'val2'}], index : [4]
mqtt_get_callback : [{'key':'val','key2':'val2'}], index : [4]
ros2_get_callback : [{'key':'val','key2':'val2'}], index : [5]
mqtt_get_callback : [{'key':'val','key2':'val2'}], index : [5]
"""

 

댓글