从 v4.1 版本起始,EMQ X 供给了专门的多语言支持插件 (emqx_extension_hook(https://github.com/emqx/emqx-extension-hook) ,现已支持运用其他编程语言来处理 EMQ X 中的钩子事件,研发者能够运用 Python 或 Java 快速研发自己的插件,在官方功能的基本上进行扩展,满足自己的业务场景。例如:
验证某客户端的登录权限:客户端连接时触发对应函数,经过参数获取客户端信息后经过读取数据库、比对等操作判定是不是有登录权限
记录客户端在线状态与上下线历史:客户端状态变动时触发对应函数,经过参数获取客户端信息,改写数据库中客户端在线状态
校验某客户端的 PUB/SUB 的操作权限:发布/订阅时触发对应函数,经过参数获取客户端信息与当前主题,判定客户端是不是有对应的操作权限
处理会话 (Sessions) 和 信息 (Message) 事件,实现订阅关系与信息处理/存储:信息发布、状态变动时触发对应函数,获取当前客户端信息、信息状态与消息内容,转发到 Kafka 或数据库进行存储。
注:信息(Message) 类钩子,仅在企业版中支持。
Python 和 Java 驱动基于 Erlang/OTP-Port (https://erlang.org/doc/tutorial/c_port.html)进程间通信实现,本身拥有非常高的吞吐性能,本文以 Python 拓展为例介绍 EMQ X 跨语言拓展运用方式。
Python 拓展运用示例
需求
EMQ X 所在服务器需安装 Python 3.6 以上版本
运用过程
经过 pip 安装 Python SDK (https://pypi.org/project/emqx-extension-sdk/)
调整 EMQ X 配置,保证关联配置项正确指向 Python 项目
引入 SDK 编写代码
安装
经过 pip 命令在本地安装 SDK,保证运用 pip3 进行安装: pip3 install emqx-extension-sdk
修改配置
修改 emqx-extension-hook 插件配置,正确运用拓展: ## Setup the supported drivers#### Value: python2 | python3 | javaexhook.drivers = python3## Search path for scripts/libraryexhook.drivers.python3.path = data/extension/hooks.py## Call timeout#### Value: Duration##exhook.drivers.python3.call_timeout = 5s## Initial module name## Your filename or module nameexhook.drivers.python3.init_module = hooks
编写代码
在 emqx/data/extension 目录下新建 hooks.py 文件,引入 SDK 编写业务规律,示例程序如下: ## data/extension/hooks.pyfromemqx_extension.hooksimport EmqxHookSdk, hooks_handlerfrom emqx_extension.types import EMQX_CLIENTINFO_PARSE_T, EMQX_MESSAGE_PARSE_T# 继承 SDK HookSdk 类class CustomHook(EmqxHookSdk): # 运用装饰器注册 hooks @hooks_handler() def on_client_connect(self, conninfo: EMQX_CLIENTINFO_PARSE_T = None, props: dict = None,state: list = None): print(f[Python SDK] [on_client_connect] {conninfo.clientid} connecte) @hooks_handler() def on_client_connected(self,clientinfo: EMQX_CLIENTINFO_PARSE_T, state: list = None): print( f[Python SDK] [on_client_connected] {clientinfo.clientid} connected) @hooks_handler() def on_client_check_acl(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, pubsub: str, topic: str, result: bool, state: tuple) -> bool: print( f[Python SDK] [on_client_check_acl] {clientinfo.username}check ACL:{pubsub} {topic}) # 用户名为空时,ACL 验证不经过 if clientinfo.username == : return False return True @hooks_handler() def on_client_authenticate(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, authresult, state) -> bool: print( f[Python SDK] [on_client_authenticate] {clientinfo.clientid} authenticate) # clientid 不为空时,验证经过 ifclientinfo.clientid != : return True return False # on_message_* 仅支持企业版 @hooks_handler() def on_message_publish(self, message: EMQX_MESSAGE_PARSE_T, state): print( f[Python SDK] [on_message_publish]{message.topic} {message.payload})emqx_hook = CustomHook(hook_module=f{__name__}.emqx_hook)def init(): returnemqx_hook.start()def deinit(): return
起步
起步 emqx_extension_hook 插件,倘若配置错误或代码编写错误将没法正常起步。起步后尝试创立 MQTT 连接并观察业务运行状况。 ./bin/emqx_ctl plugins load emqx_extension_hook 进阶研发 日前 EMQ X Python 拓展 SDK 是开源的,倘若对可控性、性能需求更高,或需要运用 Python 2.7 版本的运行环境,欢迎贡献代码或基于原始示例进行研发:
代码仓库:emqx-extension-python-sdk (https://github.com/emqx/emqx-extension-python-sdk)
Python 原始示例,可运用该示例自动封装:emqx-extension-hook main.py (https://github.com/emqx/emqx-extension-hook/blob/master/test/scripts/main.py)
点击"阅读原文" ,认识更加多
↓↓↓
|