0%

Springboot增加MQTT协议

大家可以先到Spring 官方文档,学习整合mqtt ,地址是:Spring整合Mqtt

然后我们直接使用 Springboot 2.7.5 + Mqtt 5.5.9 搭建环境

1、启动我们的MQTT服务端

1
mosquitto -d -v

2、导入依赖

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.9</version>
</dependency>

<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>

3、编写配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
@Configuration
public class MqttConfig {

/**
* 1、先创建连接
*/

/**
* 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。
* @return factory
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();

// 设置代理端的URL地址,可以是多个
options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"});

factory.setConnectionOptions(options);
return factory;
}




/**
* 2、入站通道
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}

/**
* 入站
*/
@Bean
public MessageProducer inbound() {
// Paho客户端消息驱动通道适配器,主要用来订阅主题
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClient-paho",
mqttClientFactory(), "boat", "collector", "battery", "+/sensor");
adapter.setCompletionTimeout(5000);

// Paho消息转换器
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
// 按字节接收消息
// defaultPahoMessageConverter.setPayloadAsBytes(true);
adapter.setConverter(defaultPahoMessageConverter);
adapter.setQos(1); // 设置QoS
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}


/**
* 3、消息转化,中间站
*/

@Bean
// ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String payload = message.getPayload().toString();

// byte[] bytes = (byte[]) message.getPayload(); // 收到的消息是字节格式
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();

// 根据主题分别进行消息处理。
if (topic.matches(".+/sensor")) { // 匹配:1/sensor
String sensorSn = topic.split("/")[0];
System.out.println("传感器" + sensorSn + ": 的消息: " + payload);
} else if (topic.equals("collector")) {
System.out.println("采集器的消息:" + payload);
} else {
System.out.println("丢弃消息:主题[" + topic + "],负载:" + payload);
}

};
}


/**
* 4、消息出去
*/

/**
* 出站通道
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}

/**
* 出站
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler outbound() {

// 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publishClient", mqttClientFactory());
messageHandler.setAsync(true); // 如果设置成true,即异步,发送消息时将不会阻塞。
messageHandler.setDefaultTopic("command");
messageHandler.setDefaultQos(1); // 设置默认QoS

// Paho消息转换器
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();

// defaultPahoMessageConverter.setPayloadAsBytes(true); // 发送默认按字节类型发送消息
messageHandler.setConverter(defaultPahoMessageConverter);
return messageHandler;
}
}

4、编写实体类,用于接收Http请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MyMessage {
private String topic;
private String content;

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String getContent() {
return content;
}

public void setContent(String content) {
this.content = content;
}
}

写一个接口

1
2
3
4
5
6
7
8
9
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
// 定义重载方法,用于消息发送
void sendToMqtt(String payload);
// 指定topic进行消息发送
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}

编写controller,接收请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RestController
public class MqttController {

@Resource
private MqttGateway mqttGateway;

@PostMapping("/send")
public String send(@RequestBody MyMessage myMessage) {
// 发送消息到指定主题
mqttGateway.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());
return "send topic: " + myMessage.getTopic()+ ", message : " + myMessage.getContent();
}
}

启动项目,然后发送Http请求

1
127.0.0.1:8085/send