大家可以先到Spring 官方文档,学习整合mqtt ,地址是:Spring整合Mqtt
然后我们直接使用 Springboot 2.7.5 + Mqtt 5.5.9 搭建环境
1、启动我们的MQTT服务端
1 | mosquitto -d -v |
2、导入依赖
1 | <dependency> |
3、编写配置类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public class MqttConfig {
/**
* 1、先创建连接
*/
/**
* 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。
* @return factory
*/
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
// 设置代理端的URL地址,可以是多个
options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"});
factory.setConnectionOptions(options);
return factory;
}
/**
* 2、入站通道
*/
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* 入站
*/
public MessageProducer inbound() {
// Paho客户端消息驱动通道适配器,主要用来订阅主题
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClient-paho",
mqttClientFactory(), "boat", "collector", "battery", "+/sensor");
adapter.setCompletionTimeout(5000);
// Paho消息转换器
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
// 按字节接收消息
// defaultPahoMessageConverter.setPayloadAsBytes(true);
adapter.setConverter(defaultPahoMessageConverter);
adapter.setQos(1); // 设置QoS
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* 3、消息转化,中间站
*/
// ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。
public MessageHandler handler() {
return message -> {
String payload = message.getPayload().toString();
// byte[] bytes = (byte[]) message.getPayload(); // 收到的消息是字节格式
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// 根据主题分别进行消息处理。
if (topic.matches(".+/sensor")) { // 匹配:1/sensor
String sensorSn = topic.split("/")[0];
System.out.println("传感器" + sensorSn + ": 的消息: " + payload);
} else if (topic.equals("collector")) {
System.out.println("采集器的消息:" + payload);
} else {
System.out.println("丢弃消息:主题[" + topic + "],负载:" + payload);
}
};
}
/**
* 4、消息出去
*/
/**
* 出站通道
*/
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* 出站
*/
public MessageHandler outbound() {
// 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publishClient", mqttClientFactory());
messageHandler.setAsync(true); // 如果设置成true,即异步,发送消息时将不会阻塞。
messageHandler.setDefaultTopic("command");
messageHandler.setDefaultQos(1); // 设置默认QoS
// Paho消息转换器
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
// defaultPahoMessageConverter.setPayloadAsBytes(true); // 发送默认按字节类型发送消息
messageHandler.setConverter(defaultPahoMessageConverter);
return messageHandler;
}
}
4、编写实体类,用于接收Http请求1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class MyMessage {
private String topic;
private String content;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
写一个接口
1 |
|
编写controller,接收请求
1 |
|
启动项目,然后发送Http请求
1 | 127.0.0.1:8085/send |