전제로
wsl : 버전 2 ( 버전2 필요함 docker 랑 쓰려면 )
리눅스 : 우분투 20.04 LTS
python : 3.8 version
lib : uvloop, gmqtt 설치 필요
docker 에 EMQX 구현 후, 셋업까지 해서 컨테이너 start 상태
TEST 확인용 프로그램 : MQTT BOX
코드 설명 :
3초마다 특정 topic으로 publish (2개) -> mqtt box로 sub해서 확인
특정 topic sub -> mqtt box 로 publish 해서 message 찍어보는 TEST 코드
agent 파일 밑에 __main__ 코드로 실행
mqtt box 결과창
mqtt_client.py 코드
# !/usr/bin/python
# -*- coding: utf-8 -*-
#----------------------------------------------------------------------
import os
import sys
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
#----------------------------------------------------------------------
import asyncio
from gmqtt.client import Client as MQTTClient
from gmqtt.client import Message
from gmqtt.client import Subscription
from gmqtt.client import SubscriptionsHandler
import uvloop
from dataclasses import dataclass
from typing import Any, Union, Sequence
@dataclass
class MqttConfigInfo:
server_url: str
server_port: Union[int, str]
auth_id: str
auth_password: str
client_id: str
class MakeMqttParamType:
def __init__(self):
pass
def __call__(self, server_url: str, server_port: Union[int, str], auth_id: str, auth_password: str,
client_id: str) -> MqttConfigInfo:
if isinstance(server_port, str):
server_port = int(server_port)
_maker_mqtt_info = MqttConfigInfo(
server_url=server_url,
server_port=server_port,
auth_id=auth_id,
auth_password=auth_password,
client_id=client_id,
)
return _maker_mqtt_info
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
STOP = asyncio.Event()
class MqttClient(object):
"""
gmqtt lib를 이용해서 EMQX에 접속 및 data송수신을 위한 class
"""
def __init__(self, mqtt_info: MqttConfigInfo):
self._on_connect = None
self.mqtt_info = mqtt_info
self._client = MQTTClient(client_id=self.mqtt_info.client_id,
clean_session=True,
optimistic_acknowledgement=True,
will_message=None,
maximum_packet_size=2000000,
receive_maximum=24000)
def ask_exit(self):
STOP.set()
async def connect_emqx(self):
print('trying connect_EMQX NOW', flush=True)
self._client.set_auth_credentials(username=self.mqtt_info.auth_id,
password=self.mqtt_info.auth_password)
await self._client.connect(host=self.mqtt_info.server_url,
port=self.mqtt_info.server_port,
ssl=False, keepalive=10)
print('trying connect_EMQX END ', flush=True)
# 서버접속 해제
async def disconnect_emqx(self):
await self._client.disconnect()
async def main(self):
"""
콜백함수id = 커스텀콜백함수id
getter setter 구조 , 코드 : if not callable(cb): 로 조건 체크해서 set해줌.
self._client.on_connect = self.on_connect 이거는
== self._client.on_connect(self.on_connect) 랑 같음 "@_on_connect.setter" 이니까
"""
self._client.on_connect = self._on_connect
self._client.on_message = self._on_message
self._client.on_disconnect = self._on_disconnect
self._client.on_subscribe = self._on_subscribe
self._client.on_unsubscribe = self._on_unsubscribe
await self.connect_emqx()
await STOP.wait()
print('mqtt main close now', flush=True)
await self.disconnect_emqx()
# def subscribe(self, subscription_or_topic: Union[str, Subscription, Sequence[Subscription]],
# qos=0, no_local=False, retain_as_published=False, retain_handling_options=0, **kwargs):
def run_subscribe(self, topic: str, qos: int = 0, no_local=False, retain_as_published=False, retain_handling_options=0):
""" topic 명으로 안넣고 subscription형식으로 진행함 """
print("mqtt_client subscribe this [{}] topic".format(topic),flush=True)
self._client.subscribe(topic, qos)
# 이부분은 구현을 아직 못함 emqx 식별자쓰려고 한건데 좀더 헤딩이 필요함
# make_subscription_list = SubscriptionsHandler()
# subscriptions_list = make_subscription_list.update_subscriptions_with_subscription_or_topic(topic, qos,
# no_local,
# retain_as_published,
# retain_handling_options)
# print('11111111111111111',type(subscriptions_list))
# self._client.subscribe(subscriptions_list)
# def unsubscribe(self, topic: Union[str, Sequence[str]], **kwargs):
def run_unsubscribe(self, topic):
"""지금은 안쓰는중"""
self._client.unsubscribe(topic=topic)
# def publish(self, message_or_topic, payload=None, qos=0, retain=False, **kwargs):
def run_publish(self, topic: str, payload: Union[dict, str, None], qos: int = 0, retain=False, properties: tuple = None):
"""
payload 에 지원하는 형시은 list, tuple, dict, int, float, str, None 이지만,
dict, json형식의 str, None 만 지원하게끔 제한함
payload의 내부 형식은 json 형식이여야 하고 len으로 데이터를 셀 수 있어야함
"""
if not self._client.is_connected:
print("EMQX not connected")
return -1
if len(payload) < 268435455:
publish_msg_type = Message(topic=topic, payload=payload, qos=qos, retain=retain, properties=properties)
self._client.publish(publish_msg_type)
print('published now ')
else:
print('run_publish parameter Error, topic :', topic, flush=True)
# Callback function 등록
# def register_callback_func(self, _event: str, _handler: callable()):
def register_callback_func(self, _event: str, _handler: callable):
if isinstance(_event, str) or callable(_handler):
print('register the callback [{}]'.format(_event), flush=True)
else:
raise ValueError
if 'connect' == _event:
self._on_connect = _handler
elif 'message' == _event:
self._on_message = _handler
elif 'disconnect' == _event:
self._on_disconnect = _handler
elif 'subscribe' == _event:
self._on_subscribe = _handler
elif 'unsubscribe' == _event:
self._on_unsubscribe = _handler
def _on_connect(self, client, flags, rc, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_connect.__name__), flush=True)
def _on_disconnect(self, client, packet, exc=None):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_disconnect.__name__), flush=True)
def _on_subscribe(self, client, mid, qos, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_subscribe.__name__), flush=True)
def _on_unsubscribe(self, topic):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_unsubscribe.__name__), flush=True)
def _on_message(self, client, topic, payload, qos, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_message.__name__), flush=True)
mqtt_agent.py 코드
import asyncio
import time
import os
import sys
path = os.path.abspath(os.path.dirname(sys.argv[0]))
sys.path.append(path[:path.rfind('/')])
# ----------------------------------------------------------------------
from mqtt_client import MqttClient, MakeMqttParamType
from typing import Union
class MqttAgent(object):
"""
"""
def __init__(self, server_url="192.168.0.94", server_port=1883, auth_id='connect_mqtt_id',
auth_password='connect_mqtt_password', _client_id='client_id'):
self._connect_on_flag = None
self._client_id = _client_id
self._make_mqtt_param_type = MakeMqttParamType()
self._mqtt_agent = MqttClient(
self._make_mqtt_param_type(server_url=server_url, server_port=server_port,
auth_id=auth_id, auth_password=auth_password,
client_id=self._client_id)
)
self._set_up()
def ask_exit(self):
self._mqtt_agent.ask_exit()
def _set_up(self):
"""
emqx 접속정보, sub정보, pub정보를 전달
agnet쪽에서 연결
:return:
"""
self._ready()
def _ready(self):
"""
agent쪽에 비동기 콜백함수를 manager쪽에서 관리
:return:
"""
self._mqtt_agent.register_callback_func('connect', self._on_connect)
self._mqtt_agent.register_callback_func('message', self._on_message)
self._mqtt_agent.register_callback_func('disconnect', self._on_disconnect)
self._mqtt_agent.register_callback_func('subscribe', self._on_subscribe)
self._mqtt_agent.register_callback_func('unsubscribe', self._on_unsubscribe)
def _on_connect(self, client, flags, rc, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_connect.__name__), flush=True)
_connect_on_flag = True
def _on_disconnect(self, client, packet, exc=None):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_disconnect.__name__), flush=True)
self._connect_on_flag = False
def _on_subscribe(self, client, mid, qos, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_subscribe.__name__), flush=True)
def _on_unsubscribe(self, topic):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_unsubscribe.__name__), flush=True)
def _on_message(self, client, topic, payload, qos, properties):
print('mqtt_client callback_funtion : [{}] on'.format(self._on_message.__name__), flush=True)
print('for debugging! -> topic : [{}], payload : [{}], '
'qos : [{}], properties : [{}] ,'.format(topic, payload, qos, properties), flush=True)
async def run_main(self):
task_1 = asyncio.create_task(self._mqtt_agent.main())
await task_1
def run_subscribe(self, topic: str, qos: int = 0, no_local=False,
retain_as_published=False, retain_handling_options=0, **kwargs):
""" subscribe 필수 조건 : topic 명 """
subscription_identifier = kwargs.get('subscription_identifier')
self._mqtt_agent.run_subscribe(topic, qos, no_local, retain_as_published, retain_handling_options)
def run_publish(self, topic: str, payload: Union[dict, str, None], qos: int = 0, properties: tuple = None):
""" publish 필수조건 : topic 명"""
self._mqtt_agent.run_publish(topic=topic, payload=payload, qos=qos, retain=False, properties=properties)
def get_client_id(self):
return self._client_id
if __name__ == "__main__":
print(1)
mqtt_agent = MqttAgent(server_url="172.24.33.83", server_port=1883,
auth_id='test_id', auth_password='test_password',
_client_id='client_id')
print(2)
loop = asyncio.get_event_loop()
print(3)
async def gather_task():
async def mqtt_test():
await asyncio.sleep(0.001) # run_main(callback 등록 및 mqtt connect) 이 끝나고 나서 mqtt_test (mqtt의 pub, sub)가 실행 되야 되는데, connect 되기도 전에 pub, sub 하려해서 에러 발생 -> 싱크 맞춰주기 위해서 0.001초 슬립함.
_set_topic_name = ('/{}/#'.format(mqtt_agent.get_client_id()))
# (self, topic: str, qos: int = 0, no_local=False, retain_as_published=False, retain_handling_options=0, ** kwargs):
mqtt_agent.run_subscribe(topic=_set_topic_name, qos=0, no_local=False, retain_as_published=False,
retain_handling_options=0, subscription_identifier='1')
while True:
await asyncio.sleep(3)
mqtt_agent.run_publish('/test_publish/topic_1', "{'test1': 'hi1'}")
b = {'test2': 'hi2'}
mqtt_agent.run_publish('/test_publish/topic_2', b)
task_1 = asyncio.create_task(mqtt_agent.run_main())
task_2 = asyncio.create_task(mqtt_test())
await asyncio.gather(
task_1,
task_2
)
print(4)
loop.run_until_complete(gather_task())
print(5) # 5 발생 하면 에러
'네트워크 정리 > 통신관련(MQTT,ssh,rosbridge)' 카테고리의 다른 글
ssh 관련정리 (0) | 2022.10.09 |
---|---|
gmqtt 예제 모음 저장 (0) | 2022.09.02 |
gmqtt ( Retain Message, Clean Session and Qos Table ) (0) | 2022.06.29 |
MQTTbox 사용법 (0) | 2022.06.14 |
rosbridge (0) | 2022.05.01 |
댓글