mqtt
This commit is contained in:
parent
670740979c
commit
e0cb765540
|
|
@ -0,0 +1,9 @@
|
||||||
|
package com.ruoyi.common.mqtt.domain;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class MqttBody implements Serializable {
|
||||||
|
private int code;
|
||||||
|
private String msg;
|
||||||
|
private Object body;
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,10 @@
|
||||||
|
package com.ruoyi.common.mqtt.domain;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class MqttHead implements Serializable {
|
||||||
|
|
||||||
|
private int type;
|
||||||
|
private String topic;
|
||||||
|
private MqttBody body;
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,38 @@
|
||||||
|
package com.ruoyi.common.mqtt.service;
|
||||||
|
|
||||||
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
|
public class Callback implements MqttCallback {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(Callback.class);
|
||||||
|
/**
|
||||||
|
* MQTT 断开连接会执行此方法
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void connectionLost(Throwable throwable) {
|
||||||
|
log.info("断开了MQTT连接 :{}", throwable.getMessage());
|
||||||
|
log.error(throwable.getMessage(), throwable);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* publish发布成功后会执行到这里
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
||||||
|
log.info("发布消息成功");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* subscribe订阅后得到的消息会执行到这里
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||||
|
// TODO 此处可以将订阅得到的消息进行业务处理、数据存储
|
||||||
|
log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,123 @@
|
||||||
|
package com.ruoyi.common.mqtt.service;
|
||||||
|
|
||||||
|
import org.eclipse.paho.client.mqttv3.*;
|
||||||
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class MqttService {
|
||||||
|
|
||||||
|
private String HOST = "tcp://127.0.0.1:1883"; //mqtt服务器的地址和端口号
|
||||||
|
private final String clientId = "DC" + (int) (Math.random() * 100000000);
|
||||||
|
private MqttClient mqttClient;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 客户端connect连接mqtt服务器
|
||||||
|
*
|
||||||
|
* @param userName 用户名
|
||||||
|
* @param passWord 密码
|
||||||
|
* @param mqttCallback 回调函数
|
||||||
|
**/
|
||||||
|
public void setMqttClient(String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
|
||||||
|
MqttConnectOptions options = mqttConnectOptions(userName, passWord);
|
||||||
|
if (mqttCallback == null) {
|
||||||
|
mqttClient.setCallback(new Callback());
|
||||||
|
} else {
|
||||||
|
mqttClient.setCallback(mqttCallback);
|
||||||
|
}
|
||||||
|
mqttClient.connect(options);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MQTT连接参数设置
|
||||||
|
*/
|
||||||
|
private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException {
|
||||||
|
mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
|
||||||
|
MqttConnectOptions options = new MqttConnectOptions();
|
||||||
|
options.setUserName(userName);
|
||||||
|
options.setPassword(passWord.toCharArray());
|
||||||
|
options.setConnectionTimeout(10);///默认:30
|
||||||
|
options.setAutomaticReconnect(true);//默认:false
|
||||||
|
options.setCleanSession(false);//默认:true
|
||||||
|
//options.setKeepAliveInterval(20);//默认:60
|
||||||
|
return options;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 关闭MQTT连接
|
||||||
|
*/
|
||||||
|
public void close() throws MqttException {
|
||||||
|
mqttClient.close();
|
||||||
|
mqttClient.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 向某个主题发布消息 默认qos:1
|
||||||
|
*
|
||||||
|
* @param topic:发布的主题
|
||||||
|
* @param msg:发布的消息
|
||||||
|
*/
|
||||||
|
public void pub(String topic, String msg) throws MqttException {
|
||||||
|
MqttMessage mqttMessage = new MqttMessage();
|
||||||
|
//mqttMessage.setQos(2);
|
||||||
|
mqttMessage.setPayload(msg.getBytes());
|
||||||
|
MqttTopic mqttTopic = mqttClient.getTopic(topic);
|
||||||
|
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
|
||||||
|
token.waitForCompletion();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 向某个主题发布消息
|
||||||
|
*
|
||||||
|
* @param topic: 发布的主题
|
||||||
|
* @param msg: 发布的消息
|
||||||
|
* @param qos: 消息质量 Qos:0、1、2
|
||||||
|
*/
|
||||||
|
public void pub(String topic, String msg, int qos) throws MqttException {
|
||||||
|
MqttMessage mqttMessage = new MqttMessage();
|
||||||
|
mqttMessage.setQos(qos);
|
||||||
|
mqttMessage.setPayload(msg.getBytes());
|
||||||
|
MqttTopic mqttTopic = mqttClient.getTopic(topic);
|
||||||
|
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
|
||||||
|
token.waitForCompletion();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 订阅某一个主题 ,此方法默认的的Qos等级为:1
|
||||||
|
*
|
||||||
|
* @param topic 主题
|
||||||
|
*/
|
||||||
|
public void sub(String topic) throws MqttException {
|
||||||
|
mqttClient.subscribe(topic);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 订阅某一个主题,可携带Qos
|
||||||
|
*
|
||||||
|
* @param topic 所要订阅的主题
|
||||||
|
* @param qos 消息质量:0、1、2
|
||||||
|
*/
|
||||||
|
public void sub(String topic, int qos) throws MqttException {
|
||||||
|
mqttClient.subscribe(topic, qos);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* main函数自己测试用
|
||||||
|
*/
|
||||||
|
public static void main(String[] args) throws MqttException {
|
||||||
|
MqttService mqttConnect = new MqttService();
|
||||||
|
mqttConnect.setMqttClient("admin", "public", new Callback());
|
||||||
|
mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000));
|
||||||
|
//mqttConnect.sub("com/iot/init");
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* main函数自己测试用
|
||||||
|
*/
|
||||||
|
public static void main1(String[] args) throws MqttException {
|
||||||
|
MqttService mqttConnect = new MqttService();
|
||||||
|
mqttConnect.setMqttClient("admin", "public", new Callback());
|
||||||
|
mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000));
|
||||||
|
//mqttConnect.sub("com/iot/init");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,12 @@
|
||||||
|
package com.ruoyi.common.mqtt.service;
|
||||||
|
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
|
|
||||||
|
public class ss {
|
||||||
|
public static void main(String[] args) throws MqttException {
|
||||||
|
MqttService mqttConnect = new MqttService();
|
||||||
|
mqttConnect.setMqttClient("admin", "public", new Callback());
|
||||||
|
mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000));
|
||||||
|
//mqttConnect.sub("com/iot/init");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,7 +0,0 @@
|
||||||
package com.ruoyi.common.redis.service;
|
|
||||||
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
public class MqttService {
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue