참고
https://github.com/wialon/gmqtt
GitHub - wialon/gmqtt: Python MQTT v5.0 async client
Python MQTT v5.0 async client . Contribute to wialon/gmqtt development by creating an account on GitHub.
github.com
EMQX 라는 브로커를 실행시켜주고 (roscore 실행하듯이),
gmqtt라는 python lib 를 사용해서 client_id 를 등록할 수 가 있다.
pip3 install gmqtt
이거로 lib 부터 다운 받아야 됨.
일단 mqtt 는 blocking 과 non-blocking 이 있는데 blocking은 모스키또 같은 것들
지금 하려는 EMQX는 non-blocking 이다.
EMQX 를 python code 로 제어하기위해서 gmqtt 라는 lib를 쓰는 모양세임
그래서 EMQX 사이트에 가면 가장큰 특징으로 "MQTTv5.0 버전에 비동기 클라이언트" 라고 소개함.
밑에 예제는 지혼자 커넥하고 지혼자 보내고 지혼자 받는 코드임
그래도 pub sub connect 의 목적지는 전부 EMQX 이다\
기본개념
https://www.hivemq.com/blog/mqtt-essentials-part-3-client-broker-connection-establishment/

리턴코드 : 0 = 연결 수락됨.
1~5 연결 거부
일단 코드먼저
따로 동작은 안해봄
client.py 코드
import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient
# gmqtt also compatibility with uvloop
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
STOP = asyncio.Event()
def on_connect(client, flags, rc, properties):
print('Connected')
client.subscribe('TEST/#', qos=0)
def on_message(client, topic, payload, qos, properties):
print('RECV MSG:', payload)
def on_disconnect(client, packet, exc=None):
print('Disconnected')
def on_subscribe(client, mid, qos, properties):
print('SUBSCRIBED')
def ask_exit(*args):
STOP.set()
async def main(broker_host, token):
client = MQTTClient("client-id")
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.on_subscribe = on_subscribe
client.set_auth_credentials(token, None)
await client.connect(broker_host)
client.publish('TEST/TIME', str(time.time()), qos=1)
await STOP.wait()
await client.disconnect()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
host = 'mqtt.flespi.io'
token = os.environ.get('FLESPI_TOKEN')
loop.add_signal_handler(signal.SIGINT, ask_exit)
loop.add_signal_handler(signal.SIGTERM, ask_exit)
loop.run_until_complete(main(host, token))
코드랑 주석으로 설명 달아놓은거임. 이거보삼
전체 설명하자면
1. client callback 함수 등록
ex) connect 되면 on_connect 함수가 콜백 됨
subscribe 되면 on_subscribe함수가 콜백됨

2. set_auth_credentials( token ) 이부분으로 접근권한체크? 하는 개념이다. (로그인 정도?)
해석하자면 인증 정보 설정 인데. token으로도 가능하고, id,pwd를 이용해서 많이한다.
3. await client.connect(broker_host) 이거로 1차연결. broker_host=='mqtt.flespi.io' 인데, 이런 형식 외에도 ip(host),port 로도 connect 많이한다.
4. client.publish('TEST/TIME', str(time.time()), qos=1) 이거는 payload 보내는거임 (메세지보냄)
일단 Topic이 같으면 보냄. client라고 subscribe만 하는건 아니다.
"publish(토픽,데이터,qos레벨,user_properties)" 인자는 이렇게 있다. ROS에 actionlib에 보면 feedback 느낌으로 사용할 수 도 있고(현재 상태에 대한 중간보고?), 뭐 암튼 이러저러하게 가능.
5. await STOP.wait()
6. await client.disconnect()
두개 같이 설명하자면, asyncio.Event().wait() <- 원형이고 딱히 뭐 말할 거 없을듯 보이는대로다.
기본적으로 MQTT클라이언트는 연결이 끊긴 후에 항상 다시 연결을 무제한으로 시도한다. <- 하자면 이정도
client.py 코드 ( 설명도 같이 )
import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient
## 1. uvloop 란 ?
## asyncio 의 low레벨 library 성능이 빠르다고 함.
## libuv는 nodejs에서 사용하는 고성능 멀티 플랫폼 비동기 I / O 라이브러리
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
## 2.
"""
coroutine
wait()
주로 이벤트가 켜지는걸 감시(wating)할 때 사용 // client
이벤트가 설정될 때까지 기다립니다.
이벤트가 설정되었으면 True를 즉시 반환합니다. 그렇지 않으면 다른 태스크가 set()을 호출할 때까지 블록합니다.
set()
주로 이벤트 waiting 블록을 멈추고 싶을 때 사용
이벤트를 설정합니다.
이벤트가 설정되기를 기다리는 모든 태스크는 즉시 깨어납니다.
clear()
주로 다시 wait 상태로 만들고 싶을 때 사용
이벤트를 지웁니다 (재설정).
이제 wait()를 기다리는 태스크는 set() 메서드가 다시 호출될 때까지 블록 됩니다.
is_set()
주로 "if is_set() : asyncio.event.clear()" 이런식으로 clear용도 조건으로 사용
이벤트가 설정되면 True를 반환합니다.
"""
STOP = asyncio.Event()
def on_connect(client, flags, rc, properties):
print('Connected')
client.subscribe('TEST/#', qos=0)
def on_message(client, topic, payload, qos, properties):
print('RECV MSG:', payload)
def on_disconnect(client, packet, exc=None):
print('Disconnected')
def on_subscribe(client, mid, qos, properties):
print('SUBSCRIBED')
def ask_exit(*args):
STOP.set()
async def main(broker_host, token):
## client 가 MQTTClient 이다.
client = MQTTClient("client-id")
## client 객체에 관련 기능들 엮어줌. 여기다 보통 로그 띄운다. info같은거
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.on_subscribe = on_subscribe
"""
async def connect(self, id=None, pwd=None):
if pwd:
self.client.set_auth_credentials(id if id else self.client_id, pwd)
await self.client.connect(host=self.host, port=self.port, ssl=False, keepalive=10)
다른 곳에선 이렇게도 씀. pwd 가 설정되있으면 id로 로그인(삼항연산자로) 하고,
pwd 설정 안되있으면, host(ip),port 로 타고 들어가서 connect 해라~ 란뜻
"""
## 여기서는 token or host로 connect 하는것.
client.set_auth_credentials(token, None)
await client.connect(broker_host)
## publish(토픽,데이터,qos레벨,user_properties) 이런식으로 넣음
## 프로퍼티는 기타정보(키-값)를 제공하는데 사용한다.(튜플형식으로 보낸다) <- 보통 프로퍼티객체를 이용해서 유저정보라든가 형태정보를 보냄.
client.publish('TEST/TIME', str(time.time()), qos=1)
## asyncio.Event().wait()
## 기본적으로 MQTT클라이언트는 연결이 끊긴 후에 항상 다시 연결을 시도한다. 무제한으로
## 따라서 무제한시도를 안하려면 client.set_config({'~~': 10, '~~' : 60}) 이런 함수로 제어해야함.
await STOP.wait()
await client.disconnect()
if __name__ == '__main__':
## main 루프에서
loop = asyncio.get_event_loop()
## 다른 곳에선 host,token으로 하기 보다는 id , ip(host) , port 로도 많이 연동시킨다.
host = 'mqtt.flespi.io'
token = os.environ.get('FLESPI_TOKEN')
## SIGINT 는 Ctrl + C, SIGTERM은 kill로 프로세스 종료시
## 위 두가지 경우 프로그램 종료.
loop.add_signal_handler(signal.SIGINT, ask_exit)
loop.add_signal_handler(signal.SIGTERM, ask_exit)
"""
await STOP.wait()
await client.disconnect()
이게 포함된 함수를 인자로 보통 넣음. 말그대로 인자에 들어간 함수가 전부 수행될때 까지 RUN
그래서 Event발생하기 전까지는 wait상태이다가, 발생시에 동작하고 연결종료
"""
loop.run_until_complete(main(host, token))
'네트워크 정리 > 통신관련(MQTT,ssh,rosbridge)' 카테고리의 다른 글
gmqtt 예제 모음 저장 (0) | 2022.09.02 |
---|---|
gmqtt ( Retain Message, Clean Session and Qos Table ) (0) | 2022.06.29 |
MQTTbox 사용법 (0) | 2022.06.14 |
rosbridge (0) | 2022.05.01 |
MQTT(Mosquitto) (0) | 2022.04.19 |
댓글