外链论坛

 找回密码
 立即注册
搜索
查看: 16|回复: 0

MQTT 与 EMQ快速入门

[复制链接]

3004

主题

2万

回帖

9606万

积分

论坛元老

Rank: 8Rank: 8

积分
96066008
发表于 2024-10-3 12:57:47 | 显示全部楼层 |阅读模式

MQTT 与 EMQ

1. 物联网信息协议MQTT

1.1. MQTT简介

MQTT(Message Queuing Telemetry Transport,信息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议。

客户端服务端一个运用MQTT协议的应用程序设备,它总是创立到服务器的网络连接。客户端能够叫作为"信息代理"(Broker),能够是一个应用程序或一台设备。它是位置于信息发布者和订阅者之间,它能够:- (1)发布其他客户端可能会订阅的信息;- (1)接受来自客户的网络连接;- (2)订阅其它客户端发布的信息;- (2)接受客户发布的应用信息;- (3)退订或删除应用程序的信息;- (3)处理来自客户端的订阅和退订请求;- (4)断开与服务器连接。- (4)向订阅的客户转发应用程序信息

设计原则:

①精简,不添加可有可无的功能;

②发布/订阅(Pub/Sub)模式,方便信息在传感器之间传递,解耦Client/Server模式,带来的好处在于不必预先晓得对方的存在(ip/port),不必同期运行;

准许用户动态创建主题(不需要预先创建主题),零运维成本;

④把传输量降到最低以加强传输效率;

⑤把低带宽、高延迟、不稳定的网络等原因思虑在内;

⑥支持连续的会话保持和掌控(心跳);

⑦理解客户端计算能力可能很低;

供给服务质量( quality of service level:QoS)管理

⑨不强求传输数据的类型与格式,保持灵活性(指的是应用层业务数据)。

MQTT应用行业

①物联网M2M通信,物联网大数据采集

②Android信息推送,WEB信息推送

③移动即时信息,例如Facebook Messenger

②智能硬件、智能家居、智能电器

⑤车联网通信,电动车站桩采集

⑥智慧城市、远程医疗、远程教育

⑦电力、石油与能源等行业市场

1.2 MQTT协议关联概念

MQTT协议形成

MQTT协议中的办法

- (1)CONNECT:客户端连接到服务器- (9)SUBACK:订阅确认- (2)CONNACK:连接确认- (10)UNSUBSCRIBE:取消订阅- (3)PUBLISH:发布信息- (11)UNSUBACK:取消订阅确认- (4)PUBACK:发布确认- (12)PINGREQ:客户端发送心跳- (5)PUBREC:发布的信息已接收- (13)PINGRESP:服务端心跳响应- (6)PUBREL:发布的信息已释放- (14)DISCONNECT:断开连接- (7)PUBCOMP:发布完成- (15)AUTH:认证- (8)SUBSCRIBE:订阅请求

1.3. 信息服务质量QOS

MQTT 协议中规定了信息服务质量(Quality of Service),它保准了在区别的网络环境下信息传递的靠谱性,QoS 的设计是 MQTT 协议里的重点。

MQTT 设计了 3 个 QoS 等级。

- QoS 0:信息最多传递一次。

- QoS 1:信息传递最少 1 次。

- QoS 2:信息仅传送一次。

QoS0:"至多一次",信息发布完全依赖底层TCP/IP网络。会出现信息丢失。一个信息不会被接收端应答,不会被发送者存储并再发送。这个被叫做“即发即弃”

QoS1:"最少一次",保证信息到达,但信息重复可能会出现。发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。

QoS2:"仅有一次",保证信息到达一次。倘若接收端接收到了一个QoS为2的PUBLISH信息,它将相应地处理PUBLISH信息,并经过PUBREC信息向发送方确认。

PUBLISH:发布信息 PUBREC:发布收到 PUBREL:发布释放 PUBCOMP:发布完成

发送订阅区别QOS状况下是怎样生效的

1.4. topic通配符匹配规则

2. 物联网级信息中间件EMQ

2.1. EMQX 简介

EMQ X Broker 是基于高并发的 Erlang/OTP 语言平台研发,支持百万级连接和分布式集群架构,发布订阅模式的开源 MQTT 信息服务器。

EMQ官网:https://www.emqx.cn/

为何选取EMQ X ?从支持 MQTT5.0、稳定性、扩展性、集群能力等方面思虑,EMQX 的表现应该是最好的。

EMQX的特点

EMQ X 日前为开源社区中最流行的 MQTT 信息中间件;EMQ X 是开源社区中第1个支持 5.0协议规范的信息服务器,并且完全兼容 MQTT V3.1 和 V3.1.1 协议。除了 MQTT 协议之外,EMQ X 还支持MQTT-SN、CoAP、 LwM2M、LoRaWAN 和 WebSocket 等物联网协议单机支持百万连接,集群支持千万级连接;毫秒级信息转发。易于安装和运用;中国本地的技术支持服务;扩展模块和插件,EMQ X 供给了灵活的扩展机制,支持企业的有些定制场景;桥接共享订阅

2.2. 环境搭建与配置

Docker 运行与安装

拉取emqx镜像

[root@docker emqx]# docker pull emqx/emqx:v4.1.0

创建emqx容器

[root@docker emqx]# docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083  emqx/emqx:v4.1.0

EMQ X 目录结构

2.3. EMQDashboard

EMQ X 供给了 Dashboard 以方便用户管理设备与监控关联指标。经过 Dashboard能够查看服务器基本信息、负载状况和统计数据,能够查看某个客户端的连接状态等信息乃至断开其连接,能够动态加载和卸载指定插件。

拜访位置 http://192.168.150.102:18083 来查看Dashboard,**默认用户名是 admin,秘码是 public*

2.4. 客户端调试工具MQTTX

MQTT X 是 EMQ 开源的一款优雅的跨平台 MQTT 5.0 桌面客户端,它支持 macOS, Linux, Windows。

MQTT X 的 UI 采用了聊天界面形式,简化了页面操作规律,用户能够快速创建连接,准许保留多个客户端,方便用户快速测试 MQTT/MQTTS 连接,及 MQTT 信息的订阅和发布。

2.5. 延迟信息

EMQ X 的延迟发布功能能够实现根据用户配置的时间间隔延迟发布 PUBLISH 报文的功能。模块开启emqx_mod_delayed

延迟发布主题的详细格式如下:

$delayed

/{DelayInterval}/{TopicName}

- 运用delayed 做为主题前缀的信息都将被视为需要延迟发布的信息

- {DelayInterval}: 指定该 MQTT 信息延迟发布的时间间隔,单位是秒,准许的最大间隔是 4294967 秒。

- {TopicName}: MQTT 信息的主题名叫作

1.在Websocket上订阅主题:t2/a

2.在Websocket上发布信息主题:topic:  $delayed/10/t2/a

观察运行效果

2.6. 共享订阅

不带群组的共享订阅

格式:

$queue

/{TopicName}

带群组的共享订阅

格式:

$share

//{TopicName}

3.Eclipse Paho

3.1. Eclipse Paho是什么

Eclipse paho 是EMQx官方举荐的实现了mqtt协议java客户端。

其关系类似于Mysql于JDBC,咱们的项目代码要连接数据库需要用到JDBC,而咱们的项目需要连接EMQX需要用到Eclipse Paho  ,并且它供给基本信息收发。

3.2. Eclipse Paho技术调研

集成Eclipse paho

<dependency>    <groupId>org.eclipse.paho</groupId>    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>    <version>1.2.5</version></dependency>

发布信息到EMQ

(1)搭建基本的springBoot程序。

(2)编写controller,新增发布信息办法

@GetMapping("/publish"

)

public void publish() throws MqttException 

{

    MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化MqttClient client =new MqttClient("tcp://192.168.200.128:1883""abc"

, persistence);

    //连接选项中定义用户名秘码和其它配置    MqttConnectOptions options = new

 MqttConnectOptions();

options.setCleanSession(true);//参数为true暗示清除缓存,便是非持久化订阅者,这个时候只要参数设为true,必定是非持久化订阅者。而参数设为false时,暗示服务器保存客户端的连接记录    options.setAutomaticReconnect(true);//是不是自动重连options.setConnectionTimeout(30);//连接超时时间  秒    options.setKeepAliveInterval(10);//连接保持检测周期  秒    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本client.connect(options);//连接    client.publish("topic""发送内容".getBytes(), 2false

);

}

订阅信息

在controller新增办法,订阅信息

@GetMapping("/subscribe"

)

public void subscribe() throwsMqttException

{

    MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化    MqttClient client = new MqttClient("tcp://192.168.200.128:1883""abc"

, persistence);

    //连接选项中定义用户名秘码和其它配置    MqttConnectOptions options = new

 MqttConnectOptions();

    options.setCleanSession(true);//参数为true暗示清除缓存,便是非持久化订阅者,这个时候只要参数设为true,必定是非持久化订阅者。而参数设为false时,暗示服务器保存客户端的连接记录    options.setAutomaticReconnect(true);//是不是自动重连    options.setConnectionTimeout(30);//连接超时时间  秒options.setKeepAliveInterval(10);//连接保持检测周期  秒    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本    client.setCallback(new

 MqttCallbackExtended() {

        @Override        public void connectionLost(Throwable throwable) 

{

            System.out.println("连接丢失!"

);

        }

        @Override        public void messageArrived(String s, MqttMessage mqttMessage) throwsException

{

            System.out.println( "接收到信息  topic:" +s+"  id:"+mqttMessage.getId() +" message:"

+ mqttMessage.toString());

        }

        @Override        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) 

{

        }

        @Override        public void connectComplete(boolean b, String s) 

{

            System.out.println("连接成功!"

);

        }

    });

    client.connect(options);//连接    client.subscribe("test");  //订阅主题

}

倘若您觉得本文不错,欢迎关注,点赞,保藏支持

,您的关注是我保持的动力!

原创很难,转载请注明出处,感谢支持!倘若本文对您有用,欢迎转发分享!

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

站点统计|Archiver|手机版|小黑屋|外链论坛 ( 非经营性网站 )|网站地图

GMT+8, 2024-11-9 05:53 , Processed in 0.066527 second(s), 19 queries .

Powered by Discuz! X3.4

Copyright © 2001-2023, Tencent Cloud.