从 v4.1 版本起始,EMQ X 供给了专门的多语言支持插件 emqx_extension_hook (https://github.com/emqx/emqx-extension-hook) ,现已支持运用其他编程语言来处理 EMQ X 中的钩子事件,研发者能够运用 Python 或 Java 快速研发自己的插件,在官方功能的基本上进行扩展,满足自己的业务场景。例如:
验证某客户端的登录权限:客户端连接时触发对应函数,经过参数获取客户端信息后经过读取数据库、比对等操作判定是不是有登录权限
记录客户端在线状态与上下线历史:客户端状态变动时触发对应函数,经过参数获取客户端信息,改写数据库中客户端在线状态
校验某客户端的 PUB/SUB 的操作权限:发布/订阅时触发对应函数,经过参数获取客户端信息与当前主题,判定客户端是不是有对应的操作权限
处理会话 (Sessions) 和 信息 (Message) 事件,实现订阅关系与信息处理/存储:信息发布、状态变动时触发对应函数,获取当前客户端信息、信息状态与消息内容,转发到 Kafka 或数据库进行存储。
注:信息(Message) 类钩子,仅在企业版中支持。
Python 和 Java 驱动基于 Erlang/OTP-Port (https://erlang.org/doc/tutorial/c_port.html)进程间通信实现,本身拥有非常高的吞吐性能,本文以 Java 拓展为例介绍 EMQ X 跨语言拓展运用方式。
Java 拓展运用示例 需求
EMQ X 所在服务器需安装 JDK 1.8 以上版本
起始运用
创建 Java 项目
下载 io.emqx.extension.jar (https://github.com/emqx/emqx-extension-java-sdk/releases)和 erlport.jar (https://github.com/emqx/emqx-extension-java-sdk/blob/master/deps/erlport-v1.1.1.jar) 文件
添加SDK io.emqx.extension.jar和 erlport.jar 到项目依赖
复制 examples/SampleHandler.java到您的项目中
按照 SDK SampleHandler.java 中的示例编写业务代码,保证能够成功编译
安排
编译所有源代码后,需要将 sdk 和代码文件安排到 EMQ X 中:
复制 io.emqx.extension.jar 到 emqx/data/extension 目录
将编译后的 .class 文件,例如 SampleHandler.class 复制到 emqx/data/extension目录
修改 emqx/etc/plugins/emqx_extension_hook.conf 配置文件: exhook.drivers = java## Search path for scripts or libraryexhook.drivers.java.path = data/extension/exhook.drivers.java.init_module = SampleHandler
起步 emqx_extension_hook 插件,倘若配置错误或 Java 代码编写错误将没法正常起步。起步后尝试创立 MQTT 连接并观察业务运行状况。
示例
以下为 SampleHandler.java (https://github.com/emqx/emqx-extension-hook/blob/master/test/scripts/SampleHandler.java)示例程序, 该程序继承自 SDK 中的 DefaultCommunicationHandler 类。该示例代码演示了怎样挂载 EMQ X 系统中所有的钩子: import emqx.extension.java.handler.*;import emqx.extension.java.handler.codec.*;import emqx.extension.java.handler.ActionOptionConfig.Keys;public class SampleHandler extends DefaultCommunicationHandler { @Override public ActionOptionConfig getActionOption() { ActionOptionConfig option = new ActionOptionConfig(); option.set(Keys.MESSAGE_PUBLISH_TOPICS, "#"); option.set(Keys.MESSAGE_DELIVERED_TOPICS, "#"); option.set(Keys.MESSAGE_ACKED_TOPICS, "#"); option.set(Keys.MESSAGE_DROPPED_TOPICS, "#"); return option; } // Clients @Override public void onClientConnect(ConnInfo connInfo, Property[] props) { System.err.printf("[Java] onClientConnect: connInfo: %s, props: %s\n", connInfo, props); } @Override public void onClientConnack(ConnInfo connInfo, ReturnCode rc, Property[] props) { System.err.printf("[Java] onClientConnack: connInfo: %s, rc: %s, props: %s\n", connInfo, rc, props); } @Override public void onClientConnected(ClientInfo clientInfo) { System.err.printf("[Java] onClientConnected: clientinfo: %s\n", clientInfo); } @Override public void onClientDisconnected(ClientInfo clientInfo, Reason reason) { System.err.printf("[Java] onClientDisconnected: clientinfo: %s, reason: %s\n", clientInfo, reason); } // 判定认证结果,返回 true 或 false @Override public boolean onClientAuthenticate(ClientInfo clientInfo,boolean authresult) { System.err.printf("[Java] onClientAuthenticate: clientinfo: %s, authresult: %s\n", clientInfo, authresult); return true; } // 判定 ACL 检测结果,返回 true 或 false @Override public boolean onClientCheckAcl(ClientInfo clientInfo, PubSub pubsub, Topic topic, boolean result) {System.err.printf("[Java] onClientCheckAcl: clientinfo: %s, pubsub: %s, topic: %s, result: %s\n", clientInfo, pubsub, topic, result); return true; } @Override public void onClientSubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) { System.err.printf("[Java] onClientSubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props); } @Override public void onClientUnsubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {System.err.printf("[Java] onClientUnsubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props); } // Sessions @Override public void onSessionCreated(ClientInfo clientInfo) { System.err.printf("[Java] onSessionCreated: clientinfo: %s\n", clientInfo); } @Override public void onSessionSubscribed(ClientInfo clientInfo, Topic topic, SubscribeOption opts) { System.err.printf("[Java] onSessionSubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic); } @Override public void onSessionUnsubscribed(ClientInfo clientInfo, Topic topic) { System.err.printf("[Java] onSessionUnsubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic); } @Override public void onSessionResumed(ClientInfo clientInfo) { System.err.printf("[Java] onSessionResumed: clientinfo: %s\n", clientInfo); } @Override public void onSessionDiscarded(ClientInfo clientInfo) { System.err.printf("[Java] onSessionDiscarded: clientinfo: %s\n", clientInfo); } @Override public void onSessionTakeovered(ClientInfo clientInfo) { System.err.printf("[Java] onSessionTakeovered: clientinfo: %s\n", clientInfo); } @Override public void onSessionTerminated(ClientInfo clientInfo, Reason reason) { System.err.printf("[Java] onSessionTerminated: clientinfo: %s, reason: %s\n", clientInfo, reason); } // Messages @Override public Message onMessagePublish(Message message) { System.err.printf("[Java] onMessagePublish: message: %s\n", message); return message; } @Override public void onMessageDropped(Message message, Reason reason) { System.err.printf("[Java] onMessageDropped: message: %s, reason: %s\n", message, reason); } @Override public void onMessageDelivered(ClientInfo clientInfo, Message message) { System.err.printf("[Java] onMessageDelivered: clientinfo: %s, message: %s\n", clientInfo, message); } @Override public void onMessageAcked(ClientInfo clientInfo, Message message) { System.err.printf("[Java] onMessageAcked: clientinfo: %s, message: %s\n", clientInfo, message); }}
SampleHandler 重点包括两部分:
重载了 getActionOption 办法。该办法对信息(Message)关联的钩子进行配置,指定了需要生效的主题列表。
重载了 on<hookName> 办法,这些办法是实质处理钩子事件的回调函数,函数命名方式为各个钩子名叫作变体后前面加 on 前缀,变体方式为钩子名叫作去掉下划线后运用骆驼拼写法(CamelCase),例如,钩子client_connect对应的函数名为onClientConnect。EMQ X 客户端产生的事件,例如:连接、发布、订阅等,都会最后分发到这些钩子事件回调函数上,而后回调函数可对各属性及状态进行关联操作。示例程序中仅对各参数进行了打印输出。倘若只关心部分钩子事件,只需对这部分钩子事件的回调函数进行重载就可,不需要重载所有回调函数。
各回调函数的执行机会和支持的钩子列表与 EMQ X 内置的钩子完全一致,参见:Hooks - EMQ X(https://docs.emqx.io/broker/latest/en/advanced/hooks.html#hookpoint)
在实现自己的扩展程序时,最简单的方式亦是继承 DefaultCommunicationHandler 父类,该类对各钩子与回调函数的绑定进行了封装,并进一步封装了回调函数触及到的参数数据结构,以方便快速上手运用。
进阶研发
倘若对 Java 扩展程序的可控性需求更高,DefaultCommunicationHandler 类已没法满足需要时,能够经过实现 CommunicationHandler 接口,从更底层掌控代码规律,编写更灵活的扩展程序。 package emqx.extension.java.handler;public interface CommunicationHandler { public Object init(); public void deinit();}
init() 办法:用于初始化,声明扩展需要挂载那些钩子,以及挂载的配置
deinit() 办法:用于注销。
仔细数据格式说明,参见:设计文档 (https://github.com/emqx/emqx-extension-hook/blob/master/docs/design.md) 。
点击"阅读原文" ,认识更加多
↓↓↓
|