diff --git a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java index 6dc2a5b..a621959 100644 --- a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java +++ b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java @@ -123,8 +123,6 @@ public class LogisticsRuntime { public final AmrMessageHandler amrMessageHandler = new AmrMessageHandler(this); - public final FrontendMessagePushService frontendMessagePushService = new FrontendMessagePushService(this); - public final LccRedisService lccRedisService = new LccRedisService(this); public final AgvEventManager eventManager = new AgvEventManager(); @@ -168,25 +166,25 @@ public class LogisticsRuntime { case ONLINE: // AGV上线 sender.setIsOnline(true); - this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true); + FrontendMessagePushService.INSTANCE.pushDeviceAlive(this, sender.id, sender.getT(), true); break; case OFFLINE: // AGV下线 sender.setIsOnline(false); - this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true); + FrontendMessagePushService.INSTANCE.pushDeviceAlive(this, sender.id, sender.getT(), true); break; case BLOCKED: // AGV上线 sender.setIsBlocked(true); - this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true); + FrontendMessagePushService.INSTANCE.pushDeviceAlive(this, sender.id, sender.getT(), true); break; case BLOCKED_RECOVER: // AGV上线 sender.setIsBlocked(false); - this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true); + FrontendMessagePushService.INSTANCE.pushDeviceAlive(this, sender.id, sender.getT(), true); break; case FREE: @@ -205,7 +203,7 @@ public class LogisticsRuntime { // 设备状态变化 if (sender instanceof PtrAgvItem) { var ptr = (PtrAgvItem) sender; - this.frontendMessagePushService.pushDeviceStatus(sender.id, ptr.getState()); + FrontendMessagePushService.INSTANCE.pushDeviceStatus(this, sender.id, ptr.getState()); } else { log.error("AGV事件类型 {} 仅支持 PtrAgvItem 类型的执行器", type); @@ -226,7 +224,7 @@ public class LogisticsRuntime { InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.executorVo.getLocCode(), -taskSequence.carryQty); InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.unloadBasLocationVo.getLocCode(), taskSequence.carryQty); }); - this.frontendMessagePushService.pushInvUpdate(lpn, taskSequence.executorVo, taskSequence.unloadBasLocationVo, taskSequence.carryQty); + FrontendMessagePushService.INSTANCE.pushInvUpdate(this, lpn, taskSequence.executorVo, taskSequence.unloadBasLocationVo, taskSequence.carryQty); } /** @@ -238,7 +236,7 @@ public class LogisticsRuntime { InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.loadBasLocationVo.getLocCode(), -taskSequence.carryQty); InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.executorVo.getLocCode(), taskSequence.carryQty); }); - this.frontendMessagePushService.pushInvUpdate(lpn, taskSequence.loadBasLocationVo, taskSequence.executorVo, taskSequence.carryQty); + FrontendMessagePushService.INSTANCE.pushInvUpdate(this, lpn, taskSequence.loadBasLocationVo, taskSequence.executorVo, taskSequence.carryQty); } /** @@ -359,10 +357,6 @@ public class LogisticsRuntime { this.amrMessageHandler.start(this.env.getEnvConfig().getMqtt(), this.serverId); this.lccRedisService.start(this.env.getEnvConfig().getRedis(), this.serverId); - var lccMapService = SpringContext.HOLDER.getBean(LccMapService.class); - LccConfigProperties lccConfigProperties = lccMapService.config; - this.frontendMessagePushService.start(lccConfigProperties.getFrontendMqtt(), this.serverId + "_lcc_send"); - // 开启所有机器人的任务处理 Set executorTypes = Sets.newHashSet(); for (ExecutorItem executorItem : executorItemMap.values()) { @@ -382,7 +376,7 @@ public class LogisticsRuntime { this.taskDispatchFactory.startPolling(); // 推送服务状态 - this.frontendMessagePushService.pushServerState(this.getState()); + FrontendMessagePushService.INSTANCE.pushServerState(this, this.getState()); } public boolean isRunning() { @@ -435,8 +429,7 @@ public class LogisticsRuntime { this.amrMessageHandler.stop(); this.lccRedisService.stop(); - this.frontendMessagePushService.pushServerState(this.getState()); - this.frontendMessagePushService.stop(); + FrontendMessagePushService.INSTANCE.pushServerState(this, this.getState()); BannerUtils.printConfig(log, "LogisticsRuntime stop.", new String[]{ "projectUUID: " + this.projectUuid, diff --git a/servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java b/servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java index be942ff..376b3ec 100644 --- a/servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java +++ b/servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java @@ -27,10 +27,11 @@ import java.util.concurrent.locks.ReentrantLock; @Slf4j public class FrontendMessagePushService implements MqttCallback { + public static final FrontendMessagePushService INSTANCE = new FrontendMessagePushService(); + private volatile LccConfigProperties.FrontendMqtt mqttConfig; private volatile String clientId; private volatile MqttClient mqttClient; - private final LogisticsRuntime runtime; private final MemoryPersistence persistence = new MemoryPersistence(); private final Lock connectionLock = new ReentrantLock(); @@ -46,8 +47,7 @@ public class FrontendMessagePushService implements MqttCallback { @Getter private volatile boolean connected = false; - public FrontendMessagePushService(LogisticsRuntime runtime) { - this.runtime = runtime; + private FrontendMessagePushService() { } /** @@ -64,7 +64,7 @@ public class FrontendMessagePushService implements MqttCallback { return; } - log.info("Starting FRONTEND_MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId); + log.info("Starting FRONTEND_MQTT service for clientId: {}", this.clientId); // 创建MQTT客户端 mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence); @@ -139,10 +139,11 @@ public class FrontendMessagePushService implements MqttCallback { * 推送服务器状态 * /lcc/{proj_id}/{env_id}/server * + * @param runtime 运行时环境 * @param statusData 状态数据 */ - public void pushServerState(ServerStatusVo statusData) { - String topic = buildTopic("server"); + public void pushServerState(LogisticsRuntime runtime, ServerStatusVo statusData) { + String topic = buildTopic(runtime, "server"); publishJson(topic, statusData); } @@ -150,10 +151,11 @@ public class FrontendMessagePushService implements MqttCallback { * 推送客户端状态 * /lcc/{proj_id}/{env_id}/client * + * @param runtime 运行时环境 * @param clientData 客户端数据 */ - public void pushClientState(Map clientData) { - String topic = buildTopic("client"); + public void pushClientState(LogisticsRuntime runtime, Map clientData) { + String topic = buildTopic(runtime, "client"); publishJson(topic, clientData); } @@ -161,10 +163,11 @@ public class FrontendMessagePushService implements MqttCallback { * 推送任务更新 * /lcc/{proj_id}/{env_id}/task * + * @param runtime 运行时环境 * @param taskData 任务数据 */ - public void pushTaskUpdate(Object taskData) { - String topic = buildTopic("task"); + public void pushTaskUpdate(LogisticsRuntime runtime, Object taskData) { + String topic = buildTopic(runtime, "task"); publishJson(topic, taskData); } @@ -172,13 +175,14 @@ public class FrontendMessagePushService implements MqttCallback { * 推送库存更新 * /lcc/{proj_id}/{env_id}/inv * + * @param runtime 运行时环境 * @param lpn 容器号 * @param before 更新前库存 * @param after 更新后库存 * @param qty 更新数量 */ - public void pushInvUpdate(String lpn, BasLocationVo before, BasLocationVo after, int qty) { - String topic = buildTopic("inv"); + public void pushInvUpdate(LogisticsRuntime runtime, String lpn, BasLocationVo before, BasLocationVo after, int qty) { + String topic = buildTopic(runtime, "inv"); publishJson(topic, new InvUpdateVo( lpn, @@ -192,23 +196,25 @@ public class FrontendMessagePushService implements MqttCallback { * 推送设备状态 * /lcc/{proj_id}/{env_id}/device/{id}/status * + * @param runtime 运行时环境 * @param deviceId 设备ID * @param statusData 状态数据 */ - public void pushDeviceStatus(String deviceId, AgvStatusVo statusData) { - String topic = buildTopic("device/" + deviceId + "/status"); + public void pushDeviceStatus(LogisticsRuntime runtime, String deviceId, AgvStatusVo statusData) { + String topic = buildTopic(runtime, "device/" + deviceId + "/status"); publishJson(topic, statusData); } /** * 推送设备存活状态 * + * @param runtime 运行时环境 * @param deviceId 设备ID * @param type 设备类型 * @param online 是否在线 */ - public void pushDeviceAlive(String deviceId, String type, boolean online) { - String topic = buildTopic("device/" + deviceId + "/alive"); + public void pushDeviceAlive(LogisticsRuntime runtime, String deviceId, String type, boolean online) { + String topic = buildTopic(runtime, "device/" + deviceId + "/alive"); Map data = new HashMap<>(); data.put("id", deviceId); @@ -222,11 +228,12 @@ public class FrontendMessagePushService implements MqttCallback { * 推送日志 * /lcc/{proj_id}/{env_id}/log/{type} * + * @param runtime 运行时环境 * @param logType 日志类型 * @param logData 日志数据 */ - public void pushLogs(String logType, Object logData) { - String topic = buildTopic("log/" + logType); + public void pushLogs(LogisticsRuntime runtime, String logType, Object logData) { + String topic = buildTopic(runtime, "log/" + logType); publishJson(topic, logData); } @@ -234,10 +241,11 @@ public class FrontendMessagePushService implements MqttCallback { * 推送告警 * /lcc/{proj_id}/{env_id}/alarm * + * @param runtime 运行时环境 * @param alarmData 告警数据 */ - public void pushAlarm(Object alarmData) { - String topic = buildTopic("alarm"); + public void pushAlarm(LogisticsRuntime runtime, Object alarmData) { + String topic = buildTopic(runtime, "alarm"); publishJson(topic, alarmData); } @@ -245,11 +253,12 @@ public class FrontendMessagePushService implements MqttCallback { * 推送脚本更新 * /lcc/{proj_id}/script * + * @param runtime 运行时环境 * @param scriptData 脚本数据 */ - public void pushScriptUpdate(Object scriptData) { + public void pushScriptUpdate(LogisticsRuntime runtime, Object scriptData) { // 脚本系统没有环境ID - String topic = "/lcc/" + this.runtime.projectUuid + "/script"; + String topic = "/lcc/" + runtime.projectUuid + "/script"; publishJson(topic, scriptData); } @@ -258,8 +267,8 @@ public class FrontendMessagePushService implements MqttCallback { /** * 构建主题路径 */ - private String buildTopic(String suffix) { - return "/lcc/" + this.runtime.projectUuid + "/" + this.runtime.envId + "/" + suffix; + private String buildTopic(LogisticsRuntime runtime, String suffix) { + return "/lcc/" + runtime.projectUuid + "/" + runtime.envId + "/" + suffix; } /** diff --git a/servo/src/main/java/com/yvan/workbench/controller/InvController.java b/servo/src/main/java/com/yvan/workbench/controller/InvController.java index 1da1fa0..12c1cd8 100644 --- a/servo/src/main/java/com/yvan/workbench/controller/InvController.java +++ b/servo/src/main/java/com/yvan/workbench/controller/InvController.java @@ -7,6 +7,7 @@ import com.google.common.base.Strings; import com.yvan.entity.BasLocationVo; import com.yvan.logisticsModel.LogisticsRuntime; import com.yvan.logisticsModel.LogisticsRuntimeService; +import com.yvan.pusher.FrontendMessagePushService; import org.clever.core.Conv; import org.clever.core.model.response.R; import org.clever.data.jdbc.DaoFactory; @@ -95,7 +96,7 @@ public class InvController { // 推送前端更新 LogisticsRuntime runtime = LogisticsRuntimeService.INSTANCE.getByProjectEnv(projectUuid, envId); if (runtime != null) { - runtime.frontendMessagePushService.pushInvUpdate( + FrontendMessagePushService.INSTANCE.pushInvUpdate(runtime, lpn, null, new BasLocationVo(basLocation), qty ); } @@ -151,7 +152,7 @@ public class InvController { // 推送前端更新 LogisticsRuntime runtime = LogisticsRuntimeService.INSTANCE.getByProjectEnv(projectUuid, envId); if (runtime != null) { - runtime.frontendMessagePushService.pushInvUpdate( + FrontendMessagePushService.INSTANCE.pushInvUpdate(runtime, lpn, new BasLocationVo(basLocation), null, -inv.getQty() ); } @@ -231,7 +232,7 @@ public class InvController { // 推送前端更新 LogisticsRuntime runtime = LogisticsRuntimeService.INSTANCE.getByProjectEnv(projectUuid, envId); if (runtime != null) { - runtime.frontendMessagePushService.pushInvUpdate( + FrontendMessagePushService.INSTANCE.pushInvUpdate(runtime, lpn, new BasLocationVo(sourceLocation), new BasLocationVo(basLocation), inv.getQty() ); } diff --git a/servo/src/main/java/com/yvan/workbench/service/LccServerInfoConnect.java b/servo/src/main/java/com/yvan/workbench/service/LccServerInfoConnect.java index 59a3773..3cb9c9b 100644 --- a/servo/src/main/java/com/yvan/workbench/service/LccServerInfoConnect.java +++ b/servo/src/main/java/com/yvan/workbench/service/LccServerInfoConnect.java @@ -1,6 +1,10 @@ package com.yvan.workbench.service; import com.yvan.logisticsModel.SystemMetricsStore; +import com.yvan.pusher.FrontendMessagePushService; +import com.yvan.workbench.SpringContext; +import com.yvan.workbench.autoconfigure.LccConfigProperties; +import lombok.SneakyThrows; import oshi.SystemInfo; import oshi.hardware.CentralProcessor; import oshi.hardware.GlobalMemory; @@ -12,6 +16,7 @@ import oshi.software.os.OperatingSystem; import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; +import java.net.InetAddress; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -26,12 +31,18 @@ public class LccServerInfoConnect implements SmartLifecycle { private long[] lastTicks = null; private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + @SneakyThrows @Override public void start() { if (running) return; running = true; scheduler.scheduleAtFixedRate(this::updateAllMetrics, 0, 30, TimeUnit.SECONDS); + + var lccMapService = SpringContext.HOLDER.getBean(LccMapService.class); + LccConfigProperties lccConfigProperties = lccMapService.config; + + FrontendMessagePushService.INSTANCE.start(lccConfigProperties.getFrontendMqtt(), InetAddress.getLocalHost().getHostName() + "_lcc_send"); } @Override @@ -40,6 +51,7 @@ public class LccServerInfoConnect implements SmartLifecycle { if (!scheduler.isShutdown()) { scheduler.shutdownNow(); } + FrontendMessagePushService.INSTANCE.stop(); } @Override