본문 바로가기
네트워크 정리/통신관련(MQTT,ssh,rosbridge)

gmqtt 예제 ( client -> handler -> agent 구조 )

by 알 수 없는 사용자 2022. 9. 4.

전제로

WSL : 버전 2 (Docker와 함께 사용하려면 WSL 2가 필요함)

리눅스 : 우분투 20.04 LTS

python : 3.8 version

lib : uvloop, gmqtt 설치 필요

Docker에 EMQX를 설치하고 셋업까지 마친 뒤, 컨테이너가 실행(start) 중인 상태

TEST 확인용 프로그램 : MQTT BOX

 


코드 설명 :

3초마다 2개의 topic(/test_publish/topic_1, /test_publish/topic_2)으로 publish -> MQTT Box에서 subscribe해서 확인

'/{client_id}/#' topic을 subscribe -> MQTT Box에서 해당 topic으로 publish해서 수신한 message를 출력해 보는 TEST 코드

mqtt_agent.py 파일 하단의 `if __name__ == "__main__":` 블록으로 실행

 

mqtt box 결과창

 

mqtt_client.py 코드

# !/usr/bin/python
# -*- coding: utf-8 -*-
#----------------------------------------------------------------------
import os
import sys
# Append the parent of the launching script's directory to sys.path.
# os.path.dirname is used instead of slicing at the last '/' so the result is
# correct whatever path separator the platform uses (a missing '/' made the
# old slice silently drop the final character of the path).
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(os.path.dirname(path))
#----------------------------------------------------------------------

import asyncio
from gmqtt.client import Client as MQTTClient
from gmqtt.client import Message
from gmqtt.client import Subscription
from gmqtt.client import SubscriptionsHandler
import uvloop

from dataclasses import dataclass
from typing import Any, Union, Sequence

@dataclass
class MqttConfigInfo:
    """Connection settings consumed by ``MqttClient`` when it talks to the broker."""
    # Passed as ``host`` when the client connects.
    server_url: str
    # Passed as ``port`` when the client connects; MakeMqttParamType converts
    # a str value to int before building this object.
    server_port: Union[int, str]
    # Username handed to the client's auth credentials.
    auth_id: str
    # Password handed to the client's auth credentials.
    auth_password: str
    # Client id used to construct the underlying gmqtt client.
    client_id: str

class MakeMqttParamType:
    """Callable factory that packs broker connection settings into a ``MqttConfigInfo``."""

    def __init__(self):
        """Nothing to initialise; the factory is stateless."""

    def __call__(self, server_url: str, server_port: Union[int, str], auth_id: str, auth_password: str,
                 client_id: str) -> MqttConfigInfo:
        """
        Build the config object, converting a string port to ``int`` first.

        :param server_url: broker host
        :param server_port: broker port, as an int or a numeric string
        :param auth_id: user name for broker authentication
        :param auth_password: password for broker authentication
        :param client_id: MQTT client id
        :return: a populated ``MqttConfigInfo``
        """
        port = int(server_port) if isinstance(server_port, str) else server_port
        return MqttConfigInfo(server_url, port, auth_id, auth_password, client_id)



# Use uvloop's event loop implementation for every loop asyncio creates from
# here on. This runs at import time.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Module-wide shutdown signal: MqttClient.ask_exit() sets it and
# MqttClient.main() waits on it before disconnecting.
# NOTE(review): on Python 3.8 (the version this post targets) an asyncio.Event
# is presumably tied to the current event loop when it is constructed, so this
# line should stay after the policy line above -- confirm before reordering.
STOP = asyncio.Event()


class MqttClient(object):
    """
    Wrapper around a gmqtt client used to connect to EMQX and to send and
    receive data.

    Each gmqtt event has a logging-only default handler (the ``_on_*`` methods
    at the bottom of the class). ``register_callback_func`` replaces a handler
    on this instance; ``main`` then hands the current handlers to gmqtt.
    """

    # Event names accepted by register_callback_func, mapped to the instance
    # attribute that main() later assigns to the gmqtt client.
    _CALLBACK_ATTRS = {
        'connect': '_on_connect',
        'message': '_on_message',
        'disconnect': '_on_disconnect',
        'subscribe': '_on_subscribe',
        'unsubscribe': '_on_unsubscribe',
    }

    # run_publish refuses payloads whose len() reaches this value.
    _MAX_PAYLOAD_LEN = 268435455

    def __init__(self, mqtt_info: "MqttConfigInfo"):
        """
        :param mqtt_info: broker address, credentials and client id
        """
        # Do not preset self._on_connect to None here: that would shadow the
        # default _on_connect method and main() would then hand a non-callable
        # to gmqtt whenever no 'connect' handler had been registered.
        self.mqtt_info = mqtt_info
        self._client = MQTTClient(client_id=self.mqtt_info.client_id,
                                  clean_session=True,
                                  optimistic_acknowledgement=True,
                                  will_message=None,
                                  maximum_packet_size=2000000,
                                  receive_maximum=24000)

    def ask_exit(self):
        """Set the module-level STOP event so that ``main`` proceeds to disconnect."""
        STOP.set()

    async def connect_emqx(self):
        """Set the auth credentials and connect to the configured host/port (no SSL, keepalive 10)."""
        print('trying connect_EMQX NOW', flush=True)
        self._client.set_auth_credentials(username=self.mqtt_info.auth_id,
                                          password=self.mqtt_info.auth_password)
        await self._client.connect(host=self.mqtt_info.server_url,
                                   port=self.mqtt_info.server_port,
                                   ssl=False, keepalive=10)
        print('trying connect_EMQX END ', flush=True)

    # Disconnect from the server
    async def disconnect_emqx(self):
        """Disconnect the underlying gmqtt client."""
        await self._client.disconnect()

    async def main(self):
        """
        Install the callbacks on the gmqtt client, connect, then wait for STOP
        and disconnect.

        gmqtt exposes its callbacks through setter properties, so each
        assignment below goes through the client's setter (which, per the
        original author's note, checks ``callable(cb)``) rather than storing
        a plain attribute.
        """
        self._client.on_connect = self._on_connect
        self._client.on_message = self._on_message
        self._client.on_disconnect = self._on_disconnect
        self._client.on_subscribe = self._on_subscribe
        self._client.on_unsubscribe = self._on_unsubscribe

        await self.connect_emqx()

        await STOP.wait()
        print('mqtt main close now', flush=True)
        await self.disconnect_emqx()

#    def subscribe(self, subscription_or_topic: Union[str, Subscription, Sequence[Subscription]],
#               qos=0, no_local=False, retain_as_published=False, retain_handling_options=0, **kwargs):
    def run_subscribe(self, topic: str, qos: int = 0, no_local=False, retain_as_published=False, retain_handling_options=0):
        """
        Subscribe to ``topic`` by name.

        The subscription options are forwarded to the gmqtt client (its
        signature is quoted in the comment above this method); previously they
        were accepted here but silently dropped.
        """
        print("mqtt_client subscribe this [{}] topic".format(topic),flush=True)
        self._client.subscribe(topic, qos,
                               no_local=no_local,
                               retain_as_published=retain_as_published,
                               retain_handling_options=retain_handling_options)
        # TODO: not implemented yet. The idea was to subscribe with Subscription
        # objects in order to use the EMQX subscription identifier; it needs
        # more investigation.
        # make_subscription_list = SubscriptionsHandler()
        # subscriptions_list = make_subscription_list.update_subscriptions_with_subscription_or_topic(
        #     topic, qos, no_local, retain_as_published, retain_handling_options)
        # self._client.subscribe(subscriptions_list)

# def unsubscribe(self, topic: Union[str, Sequence[str]], **kwargs):
    def run_unsubscribe(self, topic):
        """Unsubscribe from ``topic``. Currently unused."""
        self._client.unsubscribe(topic=topic)

# def publish(self, message_or_topic, payload=None, qos=0, retain=False, **kwargs):
    def run_publish(self, topic: str, payload: Union[dict, str, None], qos: int = 0, retain=False, properties: tuple = None):
        """
        Publish ``payload`` on ``topic`` if the client is connected.

        The underlying library supports list, tuple, dict, int, float, str and
        None payloads, but this wrapper is meant to be used only with a dict,
        a JSON-formatted str, or None.

        :return: -1 when the client is not connected, otherwise None
        """
        if not self._client.is_connected:
            print("EMQX not connected")
            return -1

        # None is a documented payload but has no len(); count it as empty.
        payload_len = 0 if payload is None else len(payload)
        if payload_len < self._MAX_PAYLOAD_LEN:
            publish_msg_type = Message(topic=topic, payload=payload, qos=qos, retain=retain, properties=properties)
            self._client.publish(publish_msg_type)
            print('published now ')
        else:
            print('run_publish parameter Error, topic :', topic, flush=True)

    # Callback function registration
    # def register_callback_func(self, _event: str, _handler: callable()):
    def register_callback_func(self, _event: str, _handler: callable):
        """
        Replace the handler used for one gmqtt event on this instance.

        :param _event: one of 'connect', 'message', 'disconnect', 'subscribe',
                       'unsubscribe'
        :param _handler: callable invoked for that event
        :raises ValueError: if ``_event`` is not a str, ``_handler`` is not
                            callable, or ``_event`` is not a known event name
        """
        # Both conditions must hold; the earlier ``or`` let a non-callable
        # handler through as long as the event name was a string.
        if not (isinstance(_event, str) and callable(_handler)):
            raise ValueError('callback registration needs a str event name and a callable handler')
        attr_name = self._CALLBACK_ATTRS.get(_event)
        if attr_name is None:
            raise ValueError('unknown callback event: {!r}'.format(_event))
        print('register the callback [{}]'.format(_event), flush=True)
        setattr(self, attr_name, _handler)

    # Default handlers: each only logs the name of the handler currently
    # installed for its event.

    def _on_connect(self, client, flags, rc, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_connect.__name__), flush=True)

    def _on_disconnect(self, client, packet, exc=None):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_disconnect.__name__), flush=True)

    def _on_subscribe(self, client, mid, qos, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_subscribe.__name__), flush=True)

    # NOTE(review): takes a single argument, unlike the other handlers --
    # confirm this matches what gmqtt passes to on_unsubscribe.
    def _on_unsubscribe(self, topic):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_unsubscribe.__name__), flush=True)

    def _on_message(self, client, topic, payload, qos, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_message.__name__), flush=True)

 

 

mqtt_agent.py 코드

import asyncio
import time
import os
import sys

# Append the parent of the launching script's directory to sys.path.
# os.path.dirname is used instead of slicing at the last '/' so the result is
# correct whatever path separator the platform uses (a missing '/' made the
# old slice silently drop the final character of the path).
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(os.path.dirname(path))
# ----------------------------------------------------------------------
from mqtt_client import MqttClient, MakeMqttParamType
from typing import Union


class MqttAgent(object):
    """
    Application-side wrapper that owns one ``MqttClient``, registers its own
    handlers for the client's events, and forwards subscribe/publish calls.

    ``_connect_on_flag`` is None until the first connect/disconnect event,
    True after a connect event and False after a disconnect event.
    """
    def __init__(self, server_url="192.168.0.94", server_port=1883, auth_id='connect_mqtt_id',
                 auth_password='connect_mqtt_password', _client_id='client_id'):
        """Build the wrapped client from the connection settings and register the handlers."""
        self._connect_on_flag = None
        self._client_id = _client_id
        self._make_mqtt_param_type = MakeMqttParamType()
        self._mqtt_agent = MqttClient(
            self._make_mqtt_param_type(server_url=server_url, server_port=server_port,
                                       auth_id=auth_id, auth_password=auth_password,
                                       client_id=self._client_id)
        )
        self._set_up()

    def ask_exit(self):
        """Ask the wrapped client to stop."""
        self._mqtt_agent.ask_exit()

    def _set_up(self):
        """
        One-time setup hook run from ``__init__``; currently it only registers
        the event handlers.
        :return:
        """
        self._ready()

    def _ready(self):
        """
        Register this object's handlers as the wrapped client's callbacks.
        :return:
        """
        self._mqtt_agent.register_callback_func('connect', self._on_connect)
        self._mqtt_agent.register_callback_func('message', self._on_message)
        self._mqtt_agent.register_callback_func('disconnect', self._on_disconnect)
        self._mqtt_agent.register_callback_func('subscribe', self._on_subscribe)
        self._mqtt_agent.register_callback_func('unsubscribe', self._on_unsubscribe)

    def _on_connect(self, client, flags, rc, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_connect.__name__), flush=True)
        # Must be stored on the instance; a bare local assignment here left
        # the flag permanently at None.
        self._connect_on_flag = True

    def _on_disconnect(self, client, packet, exc=None):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_disconnect.__name__), flush=True)
        self._connect_on_flag = False

    def _on_subscribe(self, client, mid, qos, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_subscribe.__name__), flush=True)

    def _on_unsubscribe(self, topic):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_unsubscribe.__name__), flush=True)

    def _on_message(self, client, topic, payload, qos, properties):
        print('mqtt_client callback_funtion : [{}] on'.format(self._on_message.__name__), flush=True)
        print('for debugging! -> topic : [{}], payload : [{}], '
              'qos : [{}], properties : [{}] ,'.format(topic, payload, qos, properties), flush=True)

    async def run_main(self):
        """Run the wrapped client's ``main`` coroutine as a task and wait for it to finish."""
        task_1 = asyncio.create_task(self._mqtt_agent.main())
        await task_1

    def run_subscribe(self, topic: str, qos: int = 0, no_local=False,
                      retain_as_published=False, retain_handling_options=0, **kwargs):
        """
        Subscribe through the wrapped client. ``topic`` is the only required
        argument.

        Extra keyword arguments (e.g. ``subscription_identifier``) are accepted
        for forward compatibility but are not forwarded to the client.
        """
        self._mqtt_agent.run_subscribe(topic, qos, no_local, retain_as_published, retain_handling_options)

    def run_publish(self, topic: str, payload: Union[dict, str, None], qos: int = 0, properties: tuple = None):
        """Publish through the wrapped client with ``retain=False``. ``topic`` is required."""
        self._mqtt_agent.run_publish(topic=topic, payload=payload, qos=qos, retain=False, properties=properties)

    def get_client_id(self):
        """Return the client id this agent was created with."""
        return self._client_id


if __name__ == "__main__":
    # Manual test driver: subscribe to '/<client_id>/#' and publish two test
    # messages every 3 seconds. The print(1)..print(5) calls are progress
    # markers.
    print(1)
    mqtt_agent = MqttAgent(server_url="172.24.33.83", server_port=1883,
                           auth_id='test_id', auth_password='test_password',
                           _client_id='client_id')
    print(2)
    # NOTE(review): mqtt_client creates its STOP event at import time; on
    # Python 3.8 that event is presumably tied to the loop returned by
    # get_event_loop(), so that same loop is reused here rather than letting
    # asyncio.run() create a fresh one -- confirm before changing.
    loop = asyncio.get_event_loop()
    print(3)

    async def gather_task():
        async def mqtt_test():
            # run_main() registers the callbacks and connects; subscribing
            # before the connection exists fails. Wait for the client to
            # report a live connection instead of sleeping a fixed 1 ms and
            # hoping the broker answered in time. MqttAgent exposes no public
            # connection state, so this reads the wrapped gmqtt client's
            # is_connected directly.
            while not mqtt_agent._mqtt_agent._client.is_connected:
                await asyncio.sleep(0.01)
            _set_topic_name = ('/{}/#'.format(mqtt_agent.get_client_id()))
            mqtt_agent.run_subscribe(topic=_set_topic_name, qos=0, no_local=False, retain_as_published=False,
                                     retain_handling_options=0, subscription_identifier='1')
            while True:
                await asyncio.sleep(3)
                # String payloads are documented as JSON text, which requires
                # double quotes.
                mqtt_agent.run_publish('/test_publish/topic_1', '{"test1": "hi1"}')
                b = {'test2': 'hi2'}
                mqtt_agent.run_publish('/test_publish/topic_2', b)
        task_1 = asyncio.create_task(mqtt_agent.run_main())
        task_2 = asyncio.create_task(mqtt_test())
        await asyncio.gather(
            task_1,
            task_2
        )

    print(4)
    loop.run_until_complete(gather_task())
    print(5)  # reaching this line means gather_task returned, which is treated as an error

 

 

 

'네트워크 정리 > 통신관련(MQTT,ssh,rosbridge)' 카테고리의 다른 글

ssh 관련정리  (0) 2022.10.09
gmqtt 예제 모음 저장  (0) 2022.09.02
gmqtt ( Retain Message, Clean Session and Qos Table )  (0) 2022.06.29
MQTTbox 사용법  (0) 2022.06.14
rosbridge  (0) 2022.05.01

댓글