Browse Source

clientForSend 和 subscribe 是两套 MQTT

master
修宁 6 months ago
parent
commit
dd4e27d0d0
  1. 5
      servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java
  2. 76
      servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java

5
servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java

@ -19,6 +19,7 @@ import org.clever.core.BannerUtils;
import org.clever.core.json.JsonWrapper;
import org.clever.core.mapper.BeanCopyUtils;
import org.clever.data.redis.Redis;
import org.clever.data.redis.RedisAdmin;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
@ -65,7 +66,7 @@ public class AmrMessageHandler {
private static final TypeReference<AmrMessage<AmrExceptionMessage>> typeRef20250Message = new TypeReference<AmrMessage<AmrExceptionMessage>>() {
};
private static final Redis redis = AppContextHolder.getBean("defaultRedis", Redis.class, true);
private static final Redis redis = RedisAdmin.getRedis();
public final LogisticsRuntime runtime;
private volatile PtrMqttClient ptrMqttClient;
@ -162,7 +163,7 @@ public class AmrMessageHandler {
break;
case AMR_HEARTBEAT:
AmrMessage<AmrHeartbeatMessage> heartbeatMessage = JacksonUtils.parse(json, typeRef20100Message);
sendCmdHeartBeat(heartbeatMessage.content.VehicleId + "");
// sendCmdHeartBeat(heartbeatMessage.content.VehicleId + "");
handleHeartbeatMessage(agvItem, heartbeatMessage.content);
break;
case AMR_BOOT:

76
servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java

@ -17,77 +17,74 @@ import java.util.concurrent.CountDownLatch;
public class PtrMqttClient implements MqttCallback {
public final AmrMessageHandler amrMessageHandler;
public final EnvPayload.MqttConfig mqttConfig;
private final MqttClient clientForSend;
private final MqttClient client;
private final String clientId;
private CountDownLatch connectLatch = new CountDownLatch(1);
@SneakyThrows
public PtrMqttClient(AmrMessageHandler handler, EnvPayload.MqttConfig mqttConfig, String clientId) {
this.amrMessageHandler = handler;
this.mqttConfig = mqttConfig;
this.clientId = clientId;
String brokerUrl = mqttConfig.getBrokerUrl(); // "tcp://10.10.203.239:1885"
String username = mqttConfig.getUsername(); // admin
String password = mqttConfig.getPassword(); // admin
clientForSend = new MqttClient(brokerUrl, clientId + "_send"); // String clientId = "LUOYIFAN-PC_send"
client = new MqttClient(brokerUrl, clientId); // String clientId = "LUOYIFAN-PC"
MqttConnectionOptions options = new MqttConnectionOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setAutomaticReconnect(true);
options.setUserName(username);
options.setPassword(password.getBytes());
options.setConnectionTimeout(10);
options.setConnectionTimeout(1);
options.setKeepAliveInterval(20);
options.setExecutorServiceTimeout(1);
client.setCallback(this);
client.connect(options);
BannerUtils.printConfig(log, "MQTT 开启监听", new String[]{
clientForSend.connect(options);
connectLatch.await();
BannerUtils.printConfig(log, "MQTT 开启监听成功", new String[]{
"brokerUrl: " + brokerUrl,
"userName: " + username,
"clientId: " + clientId,
"topic: /agv_robot/status"});
client.connect(options);
connectLatch.await();
client.subscribe("/agv_robot/status", 0);
}
@SneakyThrows
public void publish(String topic, String payload) {
// 这个一发送就阻塞
this.client.publish(topic, payload.getBytes(StandardCharsets.UTF_8), 0, false);
public void publish(String topic, String payloadString) {
MqttMessage message = new MqttMessage(payloadString.getBytes(StandardCharsets.UTF_8));
message.setQos(0);
message.setRetained(false);
if (this.clientForSend.isConnected()) {
this.clientForSend.publish(topic, message);
} else {
throw new RuntimeException("MQTT client is not connected, cannot publish message.");
}
log.info("Message published to topic {}: finish", topic);
}
@SneakyThrows
public static void main(String[] args) {
// 这个测试是好的
String brokerUrl = "tcp://10.10.203.239:1885";
String clientId = InetAddress.getLocalHost().getHostName(); // LUOYIFAN-PC
String topic = "/agv_robot/status";
String content = "Hello from Java";
String username = "admin";
String password = "admin";
int qos = 0;
MqttClient client = new MqttClient(brokerUrl, clientId);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setAutomaticReconnect(true);
options.setUserName(username);
options.setPassword(password.getBytes());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
System.out.println("Connecting to broker...");
client.connect(options);
System.out.println("Connected");
System.out.println("Publishing message...");
long start = System.currentTimeMillis();
client.publish(topic, content.getBytes(), qos, false); // 这个发送是好的
System.out.println("Published in " + (System.currentTimeMillis() - start) + " ms");
client.disconnect();
System.out.println("Disconnected");
var config = new EnvPayload.MqttConfig();
config.setBrokerUrl("tcp://10.10.203.239:1885");
config.setUsername("admin");
config.setPassword("admin");
PtrMqttClient mqttClient = new PtrMqttClient(null, config, "LUOYIFAN-PC");
for (int i = 0; i < 100; i++) {
mqttClient.publish("/agv_robot/status", "Hello from Java " + i);
}
log.info("Message published successfully.");
}
@Override
@ -105,7 +102,11 @@ public class PtrMqttClient implements MqttCallback {
switch (topic) {
case "/agv_robot/status":
try {
amrMessageHandler.handleAgvRobotStatusMessage(message);
if (this.amrMessageHandler == null) {
log.info("amrMessageHandler is null, skipping message handling {}", new String(message.getPayload(), StandardCharsets.UTF_8));
return;
}
this.amrMessageHandler.handleAgvRobotStatusMessage(message);
} catch (Exception e) {
log.error("amrMessageHandler.handleAgvRobotStatusMessage 异常", e);
@ -118,6 +119,7 @@ public class PtrMqttClient implements MqttCallback {
@Override
public void deliveryComplete(IMqttToken token) {
log.info("Message delivery complete: {}", token);
}
@Override

Loading…
Cancel
Save