2 消息通信

主题列表

发布主题不能带主题通配符号(MQTT标准),订阅主题允许带主题通配符。APP 不允许用通配符代替 ProductKey和 deviceId 两个字段。只有服务器端允许使用通配符代替deviceId字段。

Topic 权限 支持消息类型 描述
{ProductKey}/{deviceId}/control 发布 device.hex 发布消息给指定设备
{ProductKey}/{deviceId}/deviceSubscribe 订阅 device.status, device.state ,device.response 订阅指定设备的消息
{ProductKey}/{deviceId}/{custom} N/A N/A 创建自定义的主题,只要匹配{ProductKey},{deviceId} 即可收发

消息类型

消息类型 权限 消息格式 描述
device.hex 发布 JSON 发布16进制字符串控制设备
device.state 订阅 JSON 设备的在线离线状态
device.status 订阅 JSON 订阅设备状态消息
device.response 订阅 JSON 订阅设备响应消息

device.hex

body 的内容是16进制协议。
{
    "type":"device.hex",    
    "body": "FFFFFFFFFFFF"
}

device.state

body 如果等于 true,代表设备在线,等于 false,代表设备离线

{
    "type":"device.state",
    "body": false|true
}

device.status

body 的内容是16进制协议

{
    "type":"device.status",
    "body":"81352361000500ff00000700f035"
}

device.response

body 的内容是16进制协议

{
    "type":"device.response",
    "body":"FFFFFFFFFFF"
}

云端接入

云端接入需要使用 springboot-zengge-mqtt库。springboot-zengge-mqtt 实现了springboot与mqtt的集成。注解订阅与自动配置。

使用教程

springboot-zengge-mqtt 项目进行打包并且添加到本地创库

    git clone https://gitee.com/luoke365/springboot-zengge-mqtt.git

    cd springboot-zengge-mqtt

    mvn clean package

    cd target

    mvn install:install-file -Dfile=springboot-zengge-mqtt-1.0-SNAPSHOT-jar-with-dependencies.jar -DgroupId=com.zengge.mqtt -DartifactId=springboot-zengge-mqtt -Dversion=1.0-SNAPSHOT -Dpackaging=jar

添加Maven依赖到你的项目

    <dependency>
        <groupId>com.zengge.mqtt</groupId>
        <artifactId>springboot-zengge-mqtt</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

参数配置

mqtt:
  list:
    - instanceName: device_mqtt
      serverKey: ${serverKey}
      serverSecret: ${serverSecret}
      productKey: ${productKey}
      keepAlive: 60
      ssl: false
      host: cnwifidevsdk.magichue.net
      port: 1883
      poolConfig:
        initMax: 3
        maxClients: 4
        waitTime: 3000
        minIdle: 3
        keepAliveInterval: 60

参数说明

字段 类型 描述
instanceName String instanceName 用来区分多台不域名的服务器或者serverKey的连接
serverKey String serverKey
serverSecret String serverSecret
productKey Long productKey
keepAlive Integer MQTT 连接心跳
ssl Boolean 是否开启SSL
host String MQTT服务器的域名
port Integer MQTT服务器端口
poolConfig Object 发送消息连接池
initMax Integer 连接池初始大小
maxClients Integer 最大连接数
waitTime Integer 连接等待时间
minIdle Integer 空闲大小
keepAliveInterval Integer 连接池心跳

使用范例

1 启动发布者与订阅者

使用@StartupPublisher和@StartupSubscriber注解启动发布者和订阅者,订阅者需要添加扫描包路径,实现订阅者类的自动注入。


@SpringBootApplication
@StartupPublisher
@StartupSubscriber(basePackages = {
        "com.zengge.demo.subscriber"
})
public class MainApplication {
    public static void main(String[] args){

        SpringApplication springApplication = new SpringApplication(MainApplication.class);
        springApplication.run(args);
    }
}

2 实现消息订阅

  • @MQTTSubscriber注解标示订阅者,如果没有该注解的类会给@StartupSubscriber的自动注入忽略。
  • @SubscribeTopic 在订阅者的方法下添加@SubscribeTopic并且设置instanceName,topic,qos参数,MQTT消息会自动匹配该方法上的注解的Topic参数进行方法回调。

@MQTTSubscriber
public class Subscriber {
    private final static Logger logger = LoggerFactory.getLogger(Subscriber.class);

    @SubscribeTopic(instanceName = "device_mqtt",
            topic = {"1431502354/+/deviceSubscribe"}, qos = {0})
    public void deviceSubscribe(@ReceiveTopic String topic,
                                @Payload ReceivePayload payload){

        logger.info("topic : "+topic+" payload "+payload.toString());
    }
}

3 发布消息

  • @Resource 如果存在多个instanceName,@Resource的name参数必须要指定。name规则是instanceName_publisher

@RestController
public class MQTTController {

    @Resource(name = "device_mqtt_publisher")
    private MQTTPahoPublishers mqttPahoPublishers;

    public MQTTController() {

    }

    @PostMapping("sendMessage")
    public String sendMessage(@RequestBody  ReceivePayload payload){

        mqttPahoPublishers.publishQos("1431502354/7382E44D1BB34AC0A8099FDC89F7D50B/control",
                payload,false);
        return "ok";
    }

}

results matching ""

    No results matching ""