2 消息通信
主题列表
发布主题不能带主题通配符号(MQTT标准),订阅主题允许带主题通配符。APP 不允许用通配符代替 ProductKey和 deviceId 两个字段。只有服务器端允许使用通配符代替deviceId字段。
| Topic | 权限 | 支持消息类型 | 描述 |
|---|---|---|---|
| {ProductKey}/{deviceId}/control | 发布 | device.hex | 发布消息给指定设备 |
| {ProductKey}/{deviceId}/deviceSubscribe | 订阅 | device.status, device.state ,device.response | 订阅指定设备的消息 |
| {ProductKey}/{deviceId}/{custom} | N/A | N/A | 创建自定义的主题,只要匹配{ProductKey},{deviceId} 即可收发 |
消息类型
| 消息类型 | 权限 | 消息格式 | 描述 |
|---|---|---|---|
| device.hex | 发布 | JSON | 发布16进制字符串控制设备 |
| device.state | 订阅 | JSON | 设备的在线离线状态 |
| device.status | 订阅 | JSON | 订阅设备状态消息 |
| device.response | 订阅 | JSON | 订阅设备响应消息 |
device.hex
body 的内容是16进制协议。
{
"type":"device.hex",
"body": "FFFFFFFFFFFF"
}
device.state
body 如果等于 true,代表设备在线,等于 false,代表设备离线
{
"type":"device.state",
"body": false|true
}
device.status
body 的内容是16进制协议
{
"type":"device.status",
"body":"81352361000500ff00000700f035"
}
device.response
body 的内容是16进制协议
{
"type":"device.response",
"body":"FFFFFFFFFFF"
}
云端接入
云端接入需要使用 springboot-zengge-mqtt库。springboot-zengge-mqtt 实现了springboot与mqtt的集成。注解订阅与自动配置。
使用教程
springboot-zengge-mqtt 项目进行打包并且添加到本地创库
git clone https://gitee.com/luoke365/springboot-zengge-mqtt.git
cd springboot-zengge-mqtt
mvn clean package
cd target
mvn install:install-file -Dfile=springboot-zengge-mqtt-1.0-SNAPSHOT-jar-with-dependencies.jar -DgroupId=com.zengge.mqtt -DartifactId=springboot-zengge-mqtt -Dversion=1.0-SNAPSHOT -Dpackaging=jar
添加Maven依赖到你的项目
<dependency>
<groupId>com.zengge.mqtt</groupId>
<artifactId>springboot-zengge-mqtt</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
参数配置
mqtt:
list:
- instanceName: device_mqtt
serverKey: ${serverKey}
serverSecret: ${serverSecret}
productKey: ${productKey}
keepAlive: 60
ssl: false
host: cnwifidevsdk.magichue.net
port: 1883
poolConfig:
initMax: 3
maxClients: 4
waitTime: 3000
minIdle: 3
keepAliveInterval: 60
参数说明
| 字段 | 类型 | 描述 |
|---|---|---|
| instanceName | String | instanceName 用来区分多台不域名的服务器或者serverKey的连接 |
| serverKey | String | serverKey |
| serverSecret | String | serverSecret |
| productKey | Long | productKey |
| keepAlive | Integer | MQTT 连接心跳 |
| ssl | Boolean | 是否开启SSL |
| host | String | MQTT服务器的域名 |
| port | Integer | MQTT服务器端口 |
| poolConfig | Object | 发送消息连接池 |
| initMax | Integer | 连接池初始大小 |
| maxClients | Integer | 最大连接数 |
| waitTime | Integer | 连接等待时间 |
| minIdle | Integer | 空闲大小 |
| keepAliveInterval | Integer | 连接池心跳 |
使用范例
1 启动发布者与订阅者
使用@StartupPublisher和@StartupSubscriber注解启动发布者和订阅者,订阅者需要添加扫描包路径,实现订阅者类的自动注入。
@SpringBootApplication
@StartupPublisher
@StartupSubscriber(basePackages = {
"com.zengge.demo.subscriber"
})
public class MainApplication {
public static void main(String[] args){
SpringApplication springApplication = new SpringApplication(MainApplication.class);
springApplication.run(args);
}
}
2 实现消息订阅
- @MQTTSubscriber注解标示订阅者,如果没有该注解的类会给@StartupSubscriber的自动注入忽略。
- @SubscribeTopic 在订阅者的方法下添加@SubscribeTopic并且设置instanceName,topic,qos参数,MQTT消息会自动匹配该方法上的注解的Topic参数进行方法回调。
@MQTTSubscriber
public class Subscriber {
private final static Logger logger = LoggerFactory.getLogger(Subscriber.class);
@SubscribeTopic(instanceName = "device_mqtt",
topic = {"1431502354/+/deviceSubscribe"}, qos = {0})
public void deviceSubscribe(@ReceiveTopic String topic,
@Payload ReceivePayload payload){
logger.info("topic : "+topic+" payload "+payload.toString());
}
}
3 发布消息
- @Resource 如果存在多个instanceName,@Resource的name参数必须要指定。name规则是instanceName_publisher
@RestController
public class MQTTController {
@Resource(name = "device_mqtt_publisher")
private MQTTPahoPublishers mqttPahoPublishers;
public MQTTController() {
}
@PostMapping("sendMessage")
public String sendMessage(@RequestBody ReceivePayload payload){
mqttPahoPublishers.publishQos("1431502354/7382E44D1BB34AC0A8099FDC89F7D50B/control",
payload,false);
return "ok";
}
}