From e66ff8a4c51185c353e6d33ab76fdf01283aab77 Mon Sep 17 00:00:00 2001 From: yvan Date: Fri, 27 Jun 2025 10:26:04 +0800 Subject: [PATCH] =?UTF-8?q?client-id=20=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/galaxis/rcs/RCSService.java | 4 ++ .../com/galaxis/rcs/ptr/AmrMessageHandler.java | 8 +-- .../java/com/galaxis/rcs/ptr/PtrMqttClient.java | 60 +++++++++++++++++++--- .../java/com/yvan/logisticsEnv/EnvPayload.java | 1 - .../java/com/yvan/logisticsEnv/EnvStartParam.java | 2 + .../com/yvan/logisticsModel/LogisticsRuntime.java | 2 +- 6 files changed, 63 insertions(+), 14 deletions(-) diff --git a/servo/src/main/java/com/galaxis/rcs/RCSService.java b/servo/src/main/java/com/galaxis/rcs/RCSService.java index c66103a..8ae56f4 100644 --- a/servo/src/main/java/com/galaxis/rcs/RCSService.java +++ b/servo/src/main/java/com/galaxis/rcs/RCSService.java @@ -28,6 +28,8 @@ import org.clever.data.jdbc.DaoFactory; import org.clever.data.jdbc.QueryDSL; import org.clever.data.jdbc.querydsl.utils.QueryDslUtils; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; import java.util.Map; import static com.galaxis.rcs.common.query.QLccEnvInfo.lccEnvInfo; @@ -52,6 +54,7 @@ public class RCSService { * 加载所有自动启动的环境和楼层数据 * 并启动对应的环境 */ + @SneakyThrows private void createAutoStartEnv() { QueryDSL queryDSL = DaoFactory.getQueryDSL(); var list = queryDSL.select(QueryDslUtils.linkedMap( @@ -98,6 +101,7 @@ public class RCSService { param.setTimeRate(1); param.setVirtual(false); param.setEnvPayload(envPayload); + param.setClientId(InetAddress.getLocalHost().getHostName()); runtime.start(param); } diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java b/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java index 5e24bee..8d3842b 100644 --- a/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java +++ b/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java @@ -68,7 +68,7 @@ public class AmrMessageHandler { private static final Redis redis = AppContextHolder.getBean("defaultRedis", Redis.class, true); public final LogisticsRuntime runtime; - public volatile PtrMqttClient ptrMqttClient; + private volatile PtrMqttClient ptrMqttClient; public int getNewSeqNo() { String redisKey = "lcc:rcs:" + runtime.projectUUID + ":" + runtime.envId + ":seqNo"; @@ -84,8 +84,8 @@ public class AmrMessageHandler { this.runtime = runtime; } - public void start(EnvPayload.MqttConfig mqttConfig) { - this.ptrMqttClient = new PtrMqttClient(this, mqttConfig); + public void start(EnvPayload.MqttConfig mqttConfig, String clientId) { + this.ptrMqttClient = new PtrMqttClient(this, mqttConfig, clientId); } public void stop() { @@ -209,7 +209,7 @@ public class AmrMessageHandler { BannerUtils.printConfig(log, "MQTT 发送报文 [" + id + "] RcsMessageType." + RcsMessageType.fromValue(id) + " - " + RcsMessageType.fromValue(id).description, ar); } - ptrMqttClient.client.publish(topic, payload.getBytes(StandardCharsets.UTF_8), 0, false); + ptrMqttClient.publish(topic, payload); } /** diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java b/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java index cea8883..ed49e01 100644 --- a/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java +++ b/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java @@ -12,14 +12,16 @@ import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import java.nio.charset.StandardCharsets; + @Slf4j public class PtrMqttClient implements MqttCallback { public final AmrMessageHandler amrMessageHandler; public final EnvPayload.MqttConfig mqttConfig; - public final MqttClient client; + private final MqttClient client; @SneakyThrows - public PtrMqttClient(AmrMessageHandler handler, EnvPayload.MqttConfig mqttConfig) { + public PtrMqttClient(AmrMessageHandler handler, EnvPayload.MqttConfig mqttConfig, String clientId) { this.amrMessageHandler = handler; this.mqttConfig = mqttConfig; @@ -27,17 +29,56 @@ public class PtrMqttClient implements MqttCallback { String username = mqttConfig.getUsername(); String password = mqttConfig.getPassword(); - client = new MqttClient(brokerUrl, mqttConfig.getClientId()); + client = new MqttClient(brokerUrl, clientId); + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setServerURIs(new String[]{brokerUrl}); + options.setAutomaticReconnect(true); + options.setUserName(username); + options.setPassword(password.getBytes()); + options.setConnectionTimeout(10); + options.setKeepAliveInterval(20); + client.setCallback(this); + client.connect(options); + client.subscribe("/agv_robot/status", 0); + } + + @SneakyThrows + public void publish(String topic, String payload) { + this.client.publish(topic, payload.getBytes(StandardCharsets.UTF_8), 0, false); + } + + @SneakyThrows + public static void main(String[] args) { + String brokerUrl = "tcp://10.10.203.239:1885"; + String clientId = "yvan-rcs-dev"; + String topic = "test/topic"; + String content = "Hello from Java"; + String username = "admin"; + String password = "admin"; + int qos = 0; + + MqttClient client = new MqttClient(brokerUrl, clientId); MqttConnectionOptions options = new MqttConnectionOptions(); options.setServerURIs(new String[]{brokerUrl}); - options.setAutomaticReconnect(true); // 启用自动重连 + options.setAutomaticReconnect(true); options.setUserName(username); options.setPassword(password.getBytes()); + options.setConnectionTimeout(10); + options.setKeepAliveInterval(20); + System.out.println("Connecting to broker..."); client.connect(options); - client.subscribe("/agv_robot/status", 0); + System.out.println("Connected"); + + System.out.println("Publishing message..."); + long start = System.currentTimeMillis(); + client.publish(topic, content.getBytes(), qos, false); + System.out.println("Published in " + (System.currentTimeMillis() - start) + " ms"); + + client.disconnect(); + System.out.println("Disconnected"); } @Override @@ -81,10 +122,13 @@ public class PtrMqttClient implements MqttCallback { @SneakyThrows public void stop() { - if (client.isConnected()) { + if (client != null && client.isConnected()) { client.disconnect(); + client.close(); + log.info("MQTT client disconnected and closed."); + } else { + log.warn("MQTT client is not connected, no action taken."); } - client.close(); - log.info("MQTT client stopped"); } + } diff --git a/servo/src/main/java/com/yvan/logisticsEnv/EnvPayload.java b/servo/src/main/java/com/yvan/logisticsEnv/EnvPayload.java index c5b8fea..2692634 100644 --- a/servo/src/main/java/com/yvan/logisticsEnv/EnvPayload.java +++ b/servo/src/main/java/com/yvan/logisticsEnv/EnvPayload.java @@ -12,7 +12,6 @@ public class EnvPayload implements Serializable { @Data public static class MqttConfig { private String brokerUrl; - private String clientId; private String username; private String password; } diff --git a/servo/src/main/java/com/yvan/logisticsEnv/EnvStartParam.java b/servo/src/main/java/com/yvan/logisticsEnv/EnvStartParam.java index f984317..b9ed166 100644 --- a/servo/src/main/java/com/yvan/logisticsEnv/EnvStartParam.java +++ b/servo/src/main/java/com/yvan/logisticsEnv/EnvStartParam.java @@ -10,6 +10,8 @@ import lombok.Setter; @Setter public class EnvStartParam { + public String clientId; + /** * 是否虚拟仿真环境 */ diff --git a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java index 7fdbee6..6694ba4 100644 --- a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java +++ b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java @@ -176,7 +176,7 @@ public class LogisticsRuntime { this.logisticsEnv.start(param); // 启动 MQTT 监听 - this.amrMessageHandler.start(param.getEnvPayload().getMqtt()); + this.amrMessageHandler.start(param.getEnvPayload().getMqtt(), param.clientId); // 开启所有机器人的任务处理 Set executorTypes = Sets.newHashSet();