MQTT 与 EMQ
1. 物联网信息协议MQTT
1.1. MQTT简介
MQTT(Message Queuing Telemetry Transport,信息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议。 客户端服务端一个运用MQTT协议的应用程序或设备,它总是创立到服务器的网络连接。客户端能够:亦叫作为"信息代理"(Broker),能够是一个应用程序或一台设备。它是位置于信息发布者和订阅者之间,它能够:- (1)发布其他客户端可能会订阅的信息;- (1)接受来自客户的网络连接;- (2)订阅其它客户端发布的信息;- (2)接受客户发布的应用信息;- (3)退订或删除应用程序的信息;- (3)处理来自客户端的订阅和退订请求;- (4)断开与服务器连接。- (4)向订阅的客户转发应用程序信息。设计原则:
①精简,不添加可有可无的功能;
②发布/订阅(Pub/Sub)模式,方便信息在传感器之间传递,解耦Client/Server模式,带来的好处在于不必预先晓得对方的存在(ip/port),不必同期运行;
③准许用户动态创建主题(不需要预先创建主题),零运维成本;
④把传输量降到最低以加强传输效率;
⑤把低带宽、高延迟、不稳定的网络等原因思虑在内;
⑥支持连续的会话保持和掌控(心跳);
⑦理解客户端计算能力可能很低;
⑧供给服务质量( quality of service level:QoS)管理
⑨不强求传输数据的类型与格式,保持灵活性(指的是应用层业务数据)。
MQTT应用行业:
①物联网M2M通信,物联网大数据采集
②Android信息推送,WEB信息推送
③移动即时信息,例如Facebook Messenger
②智能硬件、智能家居、智能电器
⑤车联网通信,电动车站桩采集
⑥智慧城市、远程医疗、远程教育
⑦电力、石油与能源等行业市场
1.2 MQTT协议关联概念
MQTT协议形成
MQTT协议中的办法 - (1)CONNECT:客户端连接到服务器- (9)SUBACK:订阅确认- (2)CONNACK:连接确认- (10)UNSUBSCRIBE:取消订阅- (3)PUBLISH:发布信息- (11)UNSUBACK:取消订阅确认- (4)PUBACK:发布确认- (12)PINGREQ:客户端发送心跳- (5)PUBREC:发布的信息已接收- (13)PINGRESP:服务端心跳响应- (6)PUBREL:发布的信息已释放- (14)DISCONNECT:断开连接- (7)PUBCOMP:发布完成- (15)AUTH:认证- (8)SUBSCRIBE:订阅请求1.3. 信息服务质量QOS
MQTT 协议中规定了信息服务质量(Quality of Service),它保准了在区别的网络环境下信息传递的靠谱性,QoS 的设计是 MQTT 协议里的重点。
MQTT 设计了 3 个 QoS 等级。
- QoS 0:信息最多传递一次。
- QoS 1:信息传递最少 1 次。
- QoS 2:信息仅传送一次。
QoS0:"至多一次",信息发布完全依赖底层TCP/IP网络。会出现信息丢失。一个信息不会被接收端应答,亦不会被发送者存储并再发送。这个亦被叫做“即发即弃”
QoS1:"最少一次",保证信息到达,但信息重复可能会出现。发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
QoS2:"仅有一次",保证信息到达一次。倘若接收端接收到了一个QoS为2的PUBLISH信息,它将相应地处理PUBLISH信息,并经过PUBREC信息向发送方确认。
PUBLISH:发布信息
PUBREC:发布收到
PUBREL:发布释放
PUBCOMP:发布完成
发送订阅区别QOS状况下是怎样生效的
1.4. topic通配符匹配规则
2. 物联网级信息中间件EMQ
2.1. EMQX 简介
EMQ X Broker 是基于高并发的 Erlang/OTP 语言平台研发,支持百万级连接和分布式集群架构,发布订阅模式的开源 MQTT 信息服务器。
EMQ官网:https://www.emqx.cn/
为何选取EMQ X ?从支持 MQTT5.0、稳定性、扩展性、集群能力等方面思虑,EMQX 的表现应该是最好的。
EMQX的特点 EMQ X 日前为开源社区中最流行的 MQTT 信息中间件;EMQ X 是开源社区中第1个支持 5.0协议规范的信息服务器,并且完全兼容 MQTT V3.1 和 V3.1.1 协议。除了 MQTT 协议之外,EMQ X 还支持MQTT-SN、CoAP、 LwM2M、LoRaWAN 和 WebSocket 等物联网协议单机支持百万连接,集群支持千万级连接;毫秒级信息转发。易于安装和运用;中国本地的技术支持服务;扩展模块和插件,EMQ X 供给了灵活的扩展机制,支持企业的有些定制场景;桥接共享订阅2.2. 环境搭建与配置
Docker 运行与安装
拉取emqx镜像
[root@docker emqx]# docker pull emqx/emqx:v4.1.0
创建emqx容器
[root@docker emqx]# docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.1.0
EMQ X 目录结构
2.3. EMQDashboard
EMQ X 供给了 Dashboard 以方便用户管理设备与监控关联指标。经过 Dashboard能够查看服务器基本信息、负载状况和统计数据,能够查看某个客户端的连接状态等信息乃至断开其连接,亦能够动态加载和卸载指定插件。
拜访位置 http://192.168.150.102:18083 来查看Dashboard,**默认用户名是 admin,秘码是 public*
2.4. 客户端调试工具MQTTX
MQTT X 是 EMQ 开源的一款优雅的跨平台 MQTT 5.0 桌面客户端,它支持 macOS, Linux, Windows。
MQTT X 的 UI 采用了聊天界面形式,简化了页面操作规律,用户能够快速创建连接,准许保留多个客户端,方便用户快速测试 MQTT/MQTTS 连接,及 MQTT 信息的订阅和发布。
2.5. 延迟信息
EMQ X 的延迟发布功能能够实现根据用户配置的时间间隔延迟发布 PUBLISH 报文的功能。模块开启emqx_mod_delayed
延迟发布主题的详细格式如下: $delayed
/{DelayInterval}/{TopicName}
- 运用delayed 做为主题前缀的信息都将被视为需要延迟发布的信息。
- {DelayInterval}: 指定该 MQTT 信息延迟发布的时间间隔,单位是秒,准许的最大间隔是 4294967 秒。
- {TopicName}: MQTT 信息的主题名叫作。
1.在Websocket上订阅主题:t2/a
2.在Websocket上发布信息主题:topic: $delayed/10/t2/a
观察运行效果
2.6. 共享订阅
不带群组的共享订阅
格式: $queue
/{TopicName}
带群组的共享订阅
格式: $share
//{TopicName}
3.Eclipse Paho
3.1. Eclipse Paho是什么
Eclipse paho 是EMQx官方举荐的实现了mqtt协议java客户端。
其关系类似于Mysql于JDBC,咱们的项目代码要连接数据库需要用到JDBC,而咱们的项目需要连接EMQX需要用到Eclipse Paho ,并且它供给了基本的信息收发。
3.2. Eclipse Paho技术调研
集成Eclipse paho <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version></dependency>
发布信息到EMQ
(1)搭建基本的springBoot程序。
(2)编写controller,新增发布信息办法。 @GetMapping("/publish"
) public void publish() throws MqttException
{ MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化MqttClient client =new MqttClient("tcp://192.168.200.128:1883", "abc"
, persistence); //连接选项中定义用户名秘码和其它配置 MqttConnectOptions options = new
MqttConnectOptions(); options.setCleanSession(true);//参数为true暗示清除缓存,亦便是非持久化订阅者,这个时候只要参数设为true,必定是非持久化订阅者。而参数设为false时,暗示服务器保存客户端的连接记录 options.setAutomaticReconnect(true);//是不是自动重连options.setConnectionTimeout(30);//连接超时时间 秒 options.setKeepAliveInterval(10);//连接保持检测周期 秒 options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本client.connect(options);//连接 client.publish("topic", "发送内容".getBytes(), 2, false
);
}
订阅信息
在controller新增办法,订阅信息 @GetMapping("/subscribe"
) public void subscribe() throwsMqttException
{ MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化 MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc"
, persistence); //连接选项中定义用户名秘码和其它配置 MqttConnectOptions options = new
MqttConnectOptions(); options.setCleanSession(true);//参数为true暗示清除缓存,亦便是非持久化订阅者,这个时候只要参数设为true,必定是非持久化订阅者。而参数设为false时,暗示服务器保存客户端的连接记录 options.setAutomaticReconnect(true);//是不是自动重连 options.setConnectionTimeout(30);//连接超时时间 秒options.setKeepAliveInterval(10);//连接保持检测周期 秒 options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本 client.setCallback(new
MqttCallbackExtended() { @Override public void connectionLost(Throwable throwable)
{ System.out.println("连接丢失!"
);
} @Override public void messageArrived(String s, MqttMessage mqttMessage) throwsException
{ System.out.println( "接收到信息 topic:" +s+" id:"+mqttMessage.getId() +" message:"
+ mqttMessage.toString());
} @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken)
{
} @Override public void connectComplete(boolean b, String s)
{ System.out.println("连接成功!"
);
}
}); client.connect(options);//连接 client.subscribe("test"); //订阅主题
}
倘若您觉得本文不错,欢迎关注,点赞,保藏支持
,您的关注是我保持的动力!
原创很难,转载请注明出处,感谢支持!倘若本文对您有用,欢迎转发分享!
|