diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java b/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java index 0c24f48..37bf0a6 100644 --- a/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java +++ b/servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java @@ -141,6 +141,14 @@ public class PtrMqttClient implements MqttCallback { } else { log.warn("MQTT client is not connected, no action taken."); } + + if(clientForSend != null && clientForSend.isConnected()) { + clientForSend.disconnect(); + clientForSend.close(); + log.info("MQTT clientForSend disconnected and closed."); + } else { + log.warn("MQTT clientForSend is not connected, no action taken."); + } } } diff --git a/servo/src/main/java/com/yvan/logisticsModel/FrontendPushService.java b/servo/src/main/java/com/yvan/logisticsModel/FrontendPushService.java new file mode 100644 index 0000000..db467c0 --- /dev/null +++ b/servo/src/main/java/com/yvan/logisticsModel/FrontendPushService.java @@ -0,0 +1,150 @@ +package com.yvan.logisticsModel; + +import com.yvan.logisticsEnv.EnvConfig; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.mqttv5.client.*; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; + +@Slf4j +public class FrontendPushService implements MqttCallback { + private final LogisticsRuntime runtime; + public final String rcsFrontendTopic; + private volatile MqttClient clientForSend; + private volatile MqttClient client; + private CountDownLatch connectLatch = new CountDownLatch(1); + + public FrontendPushService(LogisticsRuntime runtime) { + this.runtime = runtime; + this.rcsFrontendTopic = "/rcs/" + runtime.projectUUID + "/" + runtime.envId + "/frontend"; + } + + @Override + public void disconnected(MqttDisconnectResponse disconnectResponse) { + log.info("mqtt disconnected"); + } + + @Override + public void mqttErrorOccurred(MqttException exception) { + log.error("mqtt error occurred"); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + log.info("Message arrived on topic {}: {}", topic, new String(message.getPayload(), StandardCharsets.UTF_8)); + } + + @Override + public void deliveryComplete(IMqttToken token) { + log.info("Message delivery complete: {}", token); + } + + @Override + public void connectComplete(boolean reconnect, String serverURI) { + // BannerUtils.printConfig(log, "MQTT 开启监听", new String[]{serverURI + " topic: /agv_robot/status"}); + log.info("MQTT client connected to server: {}", serverURI); + connectLatch.countDown(); // 放行 + } + + @Override + public void authPacketArrived(int reasonCode, MqttProperties properties) { + } + + @SneakyThrows + public void stop() { + if (client != null && client.isConnected()) { + client.disconnect(); + client.close(); + log.info("MQTT client disconnected and closed."); + } else { + log.warn("MQTT client is not connected, no action taken."); + } + + if (clientForSend != null && clientForSend.isConnected()) { + clientForSend.disconnect(); + clientForSend.close(); + log.info("MQTT clientForSend disconnected and closed."); + } else { + log.warn("MQTT clientForSend is not connected, no action taken."); + } + } + + /** + * 创建一个表示服务器离线的最后消息 + */ + public MqttMessage createWillMessage() { + MqttMessage lastMessage = new MqttMessage(); + String msg = String.format(""" + { + "id": "serverOffline", + "content": { + "project": "%s", + "env": %s, + "status": "offline", + "server": "%s", + } + } + """, runtime.projectUUID, runtime.envId, runtime.clientId); + lastMessage.setPayload(msg.getBytes(StandardCharsets.UTF_8)); + lastMessage.setQos(0); + lastMessage.setRetained(true); + return lastMessage; + } + + @SneakyThrows + public void start(EnvConfig.MqttConfig mqttConfig) { + String brokerUrl = mqttConfig.getBrokerUrl(); // "tcp://10.10.203.239:1885" + String username = mqttConfig.getUsername(); // admin + String password = mqttConfig.getPassword(); // admin + + clientForSend = new MqttClient(brokerUrl, runtime.clientId + "_front_send"); // String clientId = "LUOYIFAN-PC_send" + client = new MqttClient(brokerUrl, runtime.clientId + "_front"); // String clientId = "LUOYIFAN-PC" + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setServerURIs(new String[]{brokerUrl}); + options.setAutomaticReconnect(true); + options.setUserName(username); + options.setPassword(password.getBytes()); + options.setConnectionTimeout(1); + options.setKeepAliveInterval(20); + options.setExecutorServiceTimeout(1); + // 最后一条消息,表示服务离线 + options.setWill(this.rcsFrontendTopic, createWillMessage()); + + client.setCallback(this); + client.connect(options); + + clientForSend.connect(options); + connectLatch.await(); + + // 发送一条消息代表上线 + this.publish(String.format(""" + { + "id": "serverOnline", + "content": { + "project": "%s", + "env": %s, + "status": "online", + "server": "%s" + } + } + """, runtime.projectUUID, runtime.envId, runtime.clientId)); + } + + @SneakyThrows + void publish(String payloadString) { + MqttMessage message = new MqttMessage(payloadString.getBytes(StandardCharsets.UTF_8)); + message.setQos(0); + message.setRetained(false); + + if (this.clientForSend.isConnected()) { + this.clientForSend.publish(this.rcsFrontendTopic, message); + } else { + throw new RuntimeException("MQTT client is not connected"); + } + } +} diff --git a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java index 074ff9e..bcd50c1 100644 --- a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java +++ b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java @@ -1,10 +1,8 @@ package com.yvan.logisticsModel; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.galaxis.rcs.common.enums.LCCDirection; import com.galaxis.rcs.connector.cl2.Cl2Item; import com.galaxis.rcs.plan.path.NavigationGraph; -import com.galaxis.rcs.plan.path.PathUtils; import com.galaxis.rcs.plan.path.PtrPathPlanner; import com.galaxis.rcs.ptr.AgvEventManager; import com.galaxis.rcs.ptr.AmrMessageHandler; @@ -19,18 +17,12 @@ import com.yvan.workbench.model.entity.LccProjectEnv; import lombok.extern.slf4j.Slf4j; import org.clever.core.BannerUtils; import org.clever.core.Conv; -import org.clever.core.json.JsonWrapper; -import org.clever.data.jdbc.DaoFactory; -import org.clever.data.jdbc.QueryDSL; -import org.clever.data.jdbc.querydsl.utils.QueryDslUtils; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Set; -import static com.galaxis.rcs.common.query.QLccBasExecutor.lccBasExecutor; - /** * 物流上下文运行时 */ @@ -117,6 +109,8 @@ public class LogisticsRuntime { public final AgvEventManager agvEventManager = new AgvEventManager(this); + public final FrontendPushService frontendPushService = new FrontendPushService(this); + public LogisticsRuntime(LccProject project, LccProjectEnv env, String clientId) { this.project = project; this.env = env; @@ -241,6 +235,7 @@ public class LogisticsRuntime { // 启动 MQTT 监听 this.amrMessageHandler.start(this.env.getEnvConfig().getMqtt(), this.clientId); + this.frontendPushService.start(this.env.getEnvConfig().getMqtt()); // 开启所有机器人的任务处理 Set executorTypes = Sets.newHashSet(); @@ -309,6 +304,7 @@ public class LogisticsRuntime { // 停止 MQTT 监听 this.amrMessageHandler.stop(); + this.frontendPushService.stop(); BannerUtils.printConfig(log, "LogisticsRuntime stop.", new String[]{ "projectUUID: " + this.projectUUID,