From dd4e27d0d0e2db1ca316d021b8b475c9bb6a1db3 Mon Sep 17 00:00:00 2001 From: luoyifan Date: Fri, 27 Jun 2025 11:34:34 +0800 Subject: [PATCH] =?UTF-8?q?clientForSend=20=E5=92=8C=20subscribe=20?= =?UTF-8?q?=E6=98=AF=E4=B8=A4=E5=A5=97=20MQTT?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/galaxis/rcs/ptr/AmrMessageHandler.java | 5 +- .../java/com/galaxis/rcs/ptr/PtrMqttClient.java | 76 +++++++++++----------- 2 files changed, 42 insertions(+), 39 deletions(-) diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java b/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java index 8d3842b..2777562 100644 --- a/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java +++ b/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java @@ -19,6 +19,7 @@ import org.clever.core.BannerUtils; import org.clever.core.json.JsonWrapper; import org.clever.core.mapper.BeanCopyUtils; import org.clever.data.redis.Redis; +import org.clever.data.redis.RedisAdmin; import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; @@ -65,7 +66,7 @@ public class AmrMessageHandler { private static final TypeReference> typeRef20250Message = new TypeReference>() { }; - private static final Redis redis = AppContextHolder.getBean("defaultRedis", Redis.class, true); + private static final Redis redis = RedisAdmin.getRedis(); public final LogisticsRuntime runtime; private volatile PtrMqttClient ptrMqttClient; @@ -162,7 +163,7 @@ public class AmrMessageHandler { break; case AMR_HEARTBEAT: AmrMessage heartbeatMessage = JacksonUtils.parse(json, typeRef20100Message); - sendCmdHeartBeat(heartbeatMessage.content.VehicleId + ""); + // sendCmdHeartBeat(heartbeatMessage.content.VehicleId + ""); handleHeartbeatMessage(agvItem, heartbeatMessage.content); break; case AMR_BOOT: diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java b/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java index 084def4..bf43cb4 100644 --- a/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java +++ b/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java @@ -17,77 +17,74 @@ import java.util.concurrent.CountDownLatch; public class PtrMqttClient implements MqttCallback { public final AmrMessageHandler amrMessageHandler; public final EnvPayload.MqttConfig mqttConfig; + private final MqttClient clientForSend; private final MqttClient client; + private final String clientId; private CountDownLatch connectLatch = new CountDownLatch(1); + @SneakyThrows public PtrMqttClient(AmrMessageHandler handler, EnvPayload.MqttConfig mqttConfig, String clientId) { this.amrMessageHandler = handler; this.mqttConfig = mqttConfig; + this.clientId = clientId; String brokerUrl = mqttConfig.getBrokerUrl(); // "tcp://10.10.203.239:1885" String username = mqttConfig.getUsername(); // admin String password = mqttConfig.getPassword(); // admin + clientForSend = new MqttClient(brokerUrl, clientId + "_send"); // String clientId = "LUOYIFAN-PC_send" client = new MqttClient(brokerUrl, clientId); // String clientId = "LUOYIFAN-PC" MqttConnectionOptions options = new MqttConnectionOptions(); options.setServerURIs(new String[]{brokerUrl}); options.setAutomaticReconnect(true); options.setUserName(username); options.setPassword(password.getBytes()); - options.setConnectionTimeout(10); + options.setConnectionTimeout(1); options.setKeepAliveInterval(20); + options.setExecutorServiceTimeout(1); client.setCallback(this); + client.connect(options); - BannerUtils.printConfig(log, "MQTT 开启监听", new String[]{ + clientForSend.connect(options); + connectLatch.await(); + + BannerUtils.printConfig(log, "MQTT 开启监听成功", new String[]{ "brokerUrl: " + brokerUrl, "userName: " + username, "clientId: " + clientId, "topic: /agv_robot/status"}); - client.connect(options); - connectLatch.await(); client.subscribe("/agv_robot/status", 0); } @SneakyThrows - public void publish(String topic, String payload) { - // 这个一发送就阻塞 - this.client.publish(topic, payload.getBytes(StandardCharsets.UTF_8), 0, false); + public void publish(String topic, String payloadString) { + MqttMessage message = new MqttMessage(payloadString.getBytes(StandardCharsets.UTF_8)); + message.setQos(0); + message.setRetained(false); + + if (this.clientForSend.isConnected()) { + this.clientForSend.publish(topic, message); + } else { + throw new RuntimeException("MQTT client is not connected, cannot publish message."); + } + log.info("Message published to topic {}: finish", topic); } @SneakyThrows public static void main(String[] args) { - // 这个测试是好的 - String brokerUrl = "tcp://10.10.203.239:1885"; - String clientId = InetAddress.getLocalHost().getHostName(); // LUOYIFAN-PC - String topic = "/agv_robot/status"; - String content = "Hello from Java"; - String username = "admin"; - String password = "admin"; - int qos = 0; - - MqttClient client = new MqttClient(brokerUrl, clientId); - MqttConnectionOptions options = new MqttConnectionOptions(); - options.setServerURIs(new String[]{brokerUrl}); - options.setAutomaticReconnect(true); - options.setUserName(username); - options.setPassword(password.getBytes()); - options.setConnectionTimeout(10); - options.setKeepAliveInterval(20); - - System.out.println("Connecting to broker..."); - client.connect(options); - System.out.println("Connected"); - - System.out.println("Publishing message..."); - long start = System.currentTimeMillis(); - client.publish(topic, content.getBytes(), qos, false); // 这个发送是好的 - System.out.println("Published in " + (System.currentTimeMillis() - start) + " ms"); - - client.disconnect(); - System.out.println("Disconnected"); + var config = new EnvPayload.MqttConfig(); + config.setBrokerUrl("tcp://10.10.203.239:1885"); + config.setUsername("admin"); + config.setPassword("admin"); + + PtrMqttClient mqttClient = new PtrMqttClient(null, config, "LUOYIFAN-PC"); + for (int i = 0; i < 100; i++) { + mqttClient.publish("/agv_robot/status", "Hello from Java " + i); + } + log.info("Message published successfully."); } @Override @@ -105,7 +102,11 @@ public class PtrMqttClient implements MqttCallback { switch (topic) { case "/agv_robot/status": try { - amrMessageHandler.handleAgvRobotStatusMessage(message); + if (this.amrMessageHandler == null) { + log.info("amrMessageHandler is null, skipping message handling {}", new String(message.getPayload(), StandardCharsets.UTF_8)); + return; + } + this.amrMessageHandler.handleAgvRobotStatusMessage(message); } catch (Exception e) { log.error("amrMessageHandler.handleAgvRobotStatusMessage 异常", e); @@ -118,6 +119,7 @@ public class PtrMqttClient implements MqttCallback { @Override public void deliveryComplete(IMqttToken token) { + log.info("Message delivery complete: {}", token); } @Override