mica

时间:2023-11-30 本站 点击:1

一、简介

mica-mqtt 基于 t-io 实现的简单、低延迟、高性能 的 mqtt 物联网开源组件。使用详见 mica-mqtt gitee 源码 mica-mqtt-example 模块。

在多个朋友咨询 mica-mqtt 集群之后我添加了一个 mica-mqtt-broker 模块演示了基于 redis pub/sub 实现集群实现。

二、功能

[x] 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。

[x] 支持 websocket mqtt 子协议(支持 mqtt.js)。

[x] 支持 http rest api,http api 文档详见。

[x] 支持 MQTT client 客户端。

[x] 支持 MQTT server 服务端。

[x] 支持 MQTT 遗嘱消息。

[x] 支持 MQTT 保留消息。

[x] 支持自定义消息(mq)处理转发实现集群。

[x] MQTT 客户端 阿里云 mqtt 连接 demo。

[x] 支持 GraalVM 编译成本机可执行程序。

[x] 支持 Spring boot 项目快速接入(mica-mqtt-spring-boot-starter)。

[x] mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。

[x] 基于 redis pub/sub 实现集群,详见 mica-mqtt-broker 模块。

三、待办

[ ] 优化处理 mqtt session,以及支持部分 mqtt v5.0 新特性。

四、更新记录

✨ 添加 mica-mqtt-broker 模块,基于 redis pub/sub 实现 mqtt 集群。

✨ mica-mqtt-broker 基于 redis 实现客户端状态存储。

✨ mica-mqtt-broker 基于 redis 实现遗嘱、保留消息存储。

✨ mqtt-server http api 调整订阅和取消订阅,方便集群处理。

✨ mica-mqtt-spring-boot-example 添加 mqtt 和 http api 认证示例。

✨ 添加 mqtt 5 所有 ReasonCode。

✨ 优化解码 PacketNeededLength 计算。

? 修复遗嘱消息,添加消息类型。

? 修复 mqtt-server 保留消息匹配规则。

五、Spring boot 快速接入

5.1 添加依赖

<dependency><groupId>net.dreamlu</groupId><artifactId>mica-mqtt-spring-boot-starter</artifactId><version>1.1.2</version></dependency>

5.2 服务端配置示例

mqtt:server:enabled:true#是否开启,默认:trueip:127.0.0.1#服务端ip默认:127.0.0.1port:5883#端口,默认:1883name:Mica-Mqtt-Server#名称,默认:Mica-Mqtt-Serverbuffer-allocator:HEAP#堆内存和堆外内存,默认:堆内存heartbeat-timeout:120000#心跳超时,单位毫秒,默认:1000*120read-buffer-size:8092#接收数据的buffersize,默认:8092max-bytes-in-message:8092#消息解析最大bytes长度,默认:8092debug:true#如果开启prometheus指标收集建议关闭websocket-enable:true#开启websocket子协议,默认开启websocket-port:8083#websocket端口,默认:8083

5.3 服务端可实现接口(注册成 Spring Bean 即可)

接口是否必须说明IMqttServerAuthHandler是用于客户端认证IMqttMessageListener是消息监听IMqttConnectStatusListener是连接状态监听IMqttSessionManager否session 管理IMqttMessageStore集群是,单机否遗嘱和保留消息存储AbstractMqttMessageDispatcher集群是,单机否消息转发,(遗嘱、保留消息转发)IpStatListener否t-io ip 状态监听

5.4 Prometheus + Grafana 监控对接

得益于 t-io 良好的设计,监控指标直接对接的 t-iostat,目前支持下列指标,后期会不断完善。

支持得指标说明mqtt_connections_accepted共接受过连接数mqtt_connections_closed关闭过的连接数mqtt_connections_size当前连接数mqtt_messages_handled_packets已处理消息数mqtt_messages_handled_bytes已处理消息字节数mqtt_messages_received_packets已接收消息数mqtt_messages_received_bytes已处理消息字节数mqtt_messages_send_packets已发送消息数mqtt_messages_send_bytes已发送消息字节数

关于 mica-mqtt-spring-boot-starter 更多请查看文档:https://gitee.com/596392912/mica-mqtt/tree/master/mica-mqtt-spring-boot-starter

六、普通 java 项目接入

6.1 maven 依赖

<dependency><groupId>net.dreamlu</groupId><artifactId>mica-mqtt-core</artifactId><version>1.1.2</version></dependency>

6.2 mica-mqtt 客户端

//初始化mqtt客户端MqttClientclient=MqttClient.create().ip("127.0.0.1").port(1883)//默认:1883.username("admin").password("123456").version(MqttVersion.MQTT_5)//默认:3_1_1.clientId("xxxxxx")//默认:MICA-MQTT-前缀和36进制的纳秒数.connect();//连接//消息订阅,同类方法subxxxclient.subQos0("/test/#",(topic,payload)->{logger.info(topic+'\t'+ByteBufferUtil.toString(payload));});//取消订阅client.unSubscribe("/test/#");//发送消息client.publish("/test/client",ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));//断开连接client.disconnect();//重连client.reconnect();//停止client.stop();

6.3 mica-mqtt 服务端

//注意:为了能接受更多链接(降低内存),请添加jvm参数-Xss129kMqttServermqttServer=MqttServer.create()//默认:127.0.0.1.ip("127.0.0.1")//默认:1883.port(1883)//默认为:8092(mqtt默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大t-io会尝试解析多次(建议根据实际业务情况而定).readBufferSize(512)//自定义认证.authHandler((clientId,userName,password)->true)//消息监听.messageListener((clientId,topic,mqttQoS,payload)->{logger.info("clientId:{}topic:{}mqttQoS:{}message:{}",clientId,topic,mqttQoS,ByteBufferUtil.toString(payload));}).debug()//开启t-iodebug信息日志.start();//发送给某个客户端mqttServer.publish("clientId","/test/123",ByteBuffer.wrap("mica最牛皮".getBytes()));//发送给所有在线监听这个topic的客户端mqttServer.publishAll("/test/123",ByteBuffer.wrap("mica最牛皮".getBytes()));//停止服务mqttServer.stop();

七、集群演示


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/IOT/2527.html