From 5e2dcbeb0816431871314d1d0635ea8bcedfe85c Mon Sep 17 00:00:00 2001 From: luoyifan Date: Wed, 2 Jul 2025 21:26:32 +0800 Subject: [PATCH] LccRedisService / LccMqttService --- servo/build.gradle.kts | 2 + .../com/galaxis/rcs/ptr/AmrMessageHandler.java | 4 +- .../main/java/com/galaxis/rcs/ptr/PtrAgvItem.java | 2 +- .../main/java/com/yvan/logisticsEnv/EnvConfig.java | 2 + .../yvan/logisticsModel/FrontendPushService.java | 150 --------- .../com/yvan/logisticsModel/LogisticsRuntime.java | 55 +++- .../java/com/yvan/logisticsModel/RuntimeState.java | 56 ++++ .../yvan/logisticsModel/SystemMetricsStore.java | 10 + .../main/java/com/yvan/mqtt/LccMqttService.java | 345 +++++++++++++++++++++ .../main/java/com/yvan/redis/LccRedisService.java | 238 ++++++++++++++ .../yvan/workbench/controller/EnvController.java | 41 --- .../yvan/workbench/controller/LccController.java | 93 +++++- .../yvan/workbench/controller/RcsController.java | 30 -- .../yvan/workbench/model/entity/LccProject.java | 1 + .../workbench/service/LccServerInfoConnect.java | 124 ++++++++ 15 files changed, 915 insertions(+), 238 deletions(-) delete mode 100644 servo/src/main/java/com/yvan/logisticsModel/FrontendPushService.java create mode 100644 servo/src/main/java/com/yvan/logisticsModel/RuntimeState.java create mode 100644 servo/src/main/java/com/yvan/logisticsModel/SystemMetricsStore.java create mode 100644 servo/src/main/java/com/yvan/mqtt/LccMqttService.java create mode 100644 servo/src/main/java/com/yvan/redis/LccRedisService.java create mode 100644 servo/src/main/java/com/yvan/workbench/service/LccServerInfoConnect.java diff --git a/servo/build.gradle.kts b/servo/build.gradle.kts index 4738706..67d7b82 100644 --- a/servo/build.gradle.kts +++ b/servo/build.gradle.kts @@ -19,6 +19,8 @@ dependencies { // api("org.clever:clever-task-ext") api("org.clever:clever-js-graaljs") api("org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5") + // https://mvnrepository.com/artifact/com.github.oshi/oshi-core + api("com.github.oshi:oshi-core:6.8.2") runtimeOnly("org.postgresql:postgresql") runtimeOnly("mysql:mysql-connector-java") diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java b/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java index 4f0120c..2d2e240 100644 --- a/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java +++ b/servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java @@ -68,7 +68,7 @@ public class AmrMessageHandler { private volatile PtrMqttClient ptrMqttClient; public int getNewSeqNo() { - String redisKey = "lcc:rcs:" + runtime.projectUUID + ":" + runtime.envId + ":seqNo"; + String redisKey = "lcc:rcs:" + runtime.projectUuid + ":" + runtime.envId + ":seqNo"; long seqNo = redis.vIncrement(redisKey); if (seqNo > Integer.MAX_VALUE) { redis.kDelete(redisKey); @@ -512,7 +512,7 @@ public class AmrMessageHandler { } private void updateRedisNetDelay(String agvId, long netDelay) { - String statusKey = "lcc:" + runtime.projectUUID + ":" + runtime.envId + ":rcs:id_" + agvId; + String statusKey = "lcc:" + runtime.projectUuid + ":" + runtime.envId + ":rcs:id_" + agvId; redis.hPut(statusKey, "NetDelay", String.valueOf(netDelay)); } diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java b/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java index f65e718..cd16edb 100644 --- a/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java +++ b/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java @@ -863,6 +863,6 @@ public abstract class PtrAgvItem extends ExecutorItem { private String getRedisKey(String type) { return String.format("lcc:%s:%s:device:%s:%s", - runtime.projectUUID, runtime.envId, this.getId(), type); + runtime.projectUuid, runtime.envId, this.getId(), type); } } diff --git a/servo/src/main/java/com/yvan/logisticsEnv/EnvConfig.java b/servo/src/main/java/com/yvan/logisticsEnv/EnvConfig.java index 5e19ffa..59c2c61 100644 --- a/servo/src/main/java/com/yvan/logisticsEnv/EnvConfig.java +++ b/servo/src/main/java/com/yvan/logisticsEnv/EnvConfig.java @@ -9,6 +9,8 @@ public class EnvConfig implements Serializable { private MqttConfig mqtt; + private MqttConfig frontendMqtt; + private MysqlConfig mysql; private RedisConfig redis; diff --git a/servo/src/main/java/com/yvan/logisticsModel/FrontendPushService.java b/servo/src/main/java/com/yvan/logisticsModel/FrontendPushService.java deleted file mode 100644 index db467c0..0000000 --- a/servo/src/main/java/com/yvan/logisticsModel/FrontendPushService.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.yvan.logisticsModel; - -import com.yvan.logisticsEnv.EnvConfig; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.mqttv5.client.*; -import org.eclipse.paho.mqttv5.common.MqttException; -import org.eclipse.paho.mqttv5.common.MqttMessage; -import org.eclipse.paho.mqttv5.common.packet.MqttProperties; - -import java.nio.charset.StandardCharsets; -import java.util.concurrent.CountDownLatch; - -@Slf4j -public class FrontendPushService implements MqttCallback { - private final LogisticsRuntime runtime; - public final String rcsFrontendTopic; - private volatile MqttClient clientForSend; - private volatile MqttClient client; - private CountDownLatch connectLatch = new CountDownLatch(1); - - public FrontendPushService(LogisticsRuntime runtime) { - this.runtime = runtime; - this.rcsFrontendTopic = "/rcs/" + runtime.projectUUID + "/" + runtime.envId + "/frontend"; - } - - @Override - public void disconnected(MqttDisconnectResponse disconnectResponse) { - log.info("mqtt disconnected"); - } - - @Override - public void mqttErrorOccurred(MqttException exception) { - log.error("mqtt error occurred"); - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - log.info("Message arrived on topic {}: {}", topic, new String(message.getPayload(), StandardCharsets.UTF_8)); - } - - @Override - public void deliveryComplete(IMqttToken token) { - log.info("Message delivery complete: {}", token); - } - - @Override - public void connectComplete(boolean reconnect, String serverURI) { - // BannerUtils.printConfig(log, "MQTT 开启监听", new String[]{serverURI + " topic: /agv_robot/status"}); - log.info("MQTT client connected to server: {}", serverURI); - connectLatch.countDown(); // 放行 - } - - @Override - public void authPacketArrived(int reasonCode, MqttProperties properties) { - } - - @SneakyThrows - public void stop() { - if (client != null && client.isConnected()) { - client.disconnect(); - client.close(); - log.info("MQTT client disconnected and closed."); - } else { - log.warn("MQTT client is not connected, no action taken."); - } - - if (clientForSend != null && clientForSend.isConnected()) { - clientForSend.disconnect(); - clientForSend.close(); - log.info("MQTT clientForSend disconnected and closed."); - } else { - log.warn("MQTT clientForSend is not connected, no action taken."); - } - } - - /** - * 创建一个表示服务器离线的最后消息 - */ - public MqttMessage createWillMessage() { - MqttMessage lastMessage = new MqttMessage(); - String msg = String.format(""" - { - "id": "serverOffline", - "content": { - "project": "%s", - "env": %s, - "status": "offline", - "server": "%s", - } - } - """, runtime.projectUUID, runtime.envId, runtime.clientId); - lastMessage.setPayload(msg.getBytes(StandardCharsets.UTF_8)); - lastMessage.setQos(0); - lastMessage.setRetained(true); - return lastMessage; - } - - @SneakyThrows - public void start(EnvConfig.MqttConfig mqttConfig) { - String brokerUrl = mqttConfig.getBrokerUrl(); // "tcp://10.10.203.239:1885" - String username = mqttConfig.getUsername(); // admin - String password = mqttConfig.getPassword(); // admin - - clientForSend = new MqttClient(brokerUrl, runtime.clientId + "_front_send"); // String clientId = "LUOYIFAN-PC_send" - client = new MqttClient(brokerUrl, runtime.clientId + "_front"); // String clientId = "LUOYIFAN-PC" - MqttConnectionOptions options = new MqttConnectionOptions(); - options.setServerURIs(new String[]{brokerUrl}); - options.setAutomaticReconnect(true); - options.setUserName(username); - options.setPassword(password.getBytes()); - options.setConnectionTimeout(1); - options.setKeepAliveInterval(20); - options.setExecutorServiceTimeout(1); - // 最后一条消息,表示服务离线 - options.setWill(this.rcsFrontendTopic, createWillMessage()); - - client.setCallback(this); - client.connect(options); - - clientForSend.connect(options); - connectLatch.await(); - - // 发送一条消息代表上线 - this.publish(String.format(""" - { - "id": "serverOnline", - "content": { - "project": "%s", - "env": %s, - "status": "online", - "server": "%s" - } - } - """, runtime.projectUUID, runtime.envId, runtime.clientId)); - } - - @SneakyThrows - void publish(String payloadString) { - MqttMessage message = new MqttMessage(payloadString.getBytes(StandardCharsets.UTF_8)); - message.setQos(0); - message.setRetained(false); - - if (this.clientForSend.isConnected()) { - this.clientForSend.publish(this.rcsFrontendTopic, message); - } else { - throw new RuntimeException("MQTT client is not connected"); - } - } -} diff --git a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java index bcd50c1..22b83a3 100644 --- a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java +++ b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java @@ -12,11 +12,15 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.yvan.mqtt.LccMqttService; +import com.yvan.redis.LccRedisService; import com.yvan.workbench.model.entity.LccProject; import com.yvan.workbench.model.entity.LccProjectEnv; import lombok.extern.slf4j.Slf4j; import org.clever.core.BannerUtils; import org.clever.core.Conv; +import org.clever.data.redis.Redis; +import org.clever.data.redis.RedisAdmin; import java.util.Date; import java.util.List; @@ -31,7 +35,7 @@ public class LogisticsRuntime { /** * 项目UUID */ - public final String projectUUID; + public final String projectUuid; /** * 环境ID */ @@ -52,9 +56,9 @@ public class LogisticsRuntime { public final boolean isVirtual; /** - * 是否为虚拟环境 + * 服务器名_PID */ - public final String clientId; + public final String serverId; /** * 是否正在运行 @@ -76,6 +80,8 @@ public class LogisticsRuntime { */ private float timeRate; + private final Redis redis = RedisAdmin.getRedis(); + /** * 任务服务 */ @@ -109,15 +115,17 @@ public class LogisticsRuntime { public final AgvEventManager agvEventManager = new AgvEventManager(this); - public final FrontendPushService frontendPushService = new FrontendPushService(this); + public final LccMqttService lccMqttService = new LccMqttService(this); - public LogisticsRuntime(LccProject project, LccProjectEnv env, String clientId) { + public final LccRedisService lccRedisService = new LccRedisService(this); + + public LogisticsRuntime(LccProject project, LccProjectEnv env, String serverId) { this.project = project; this.env = env; - this.projectUUID = project.getProjectUuid(); + this.projectUuid = project.getProjectUuid(); this.envId = env.getEnvId(); this.isVirtual = env.getIsVirtual(); - this.clientId = clientId; + this.serverId = serverId; } /** @@ -217,14 +225,14 @@ public class LogisticsRuntime { } BannerUtils.printConfig(log, "LogisticsRuntime Start", new String[]{ - "projectUUID: " + this.projectUUID, + "projectUUID: " + this.projectUuid, "envId: " + this.envId, "projectPath: " + this.project.getProjectFileLocation(), "envPath: " + this.env.getFileLocation(), "virtual: " + this.isVirtual, "floor: " + Joiner.on(",").join(this.floorMap.values().stream().map(Floor::toString).toList()), "mqtt: " + this.env.getEnvConfig().getMqtt().getBrokerUrl(), - "clientId: " + this.clientId, + "serverId: " + this.serverId, "executors: " + this.executorItemMap.size(), }); @@ -234,8 +242,9 @@ public class LogisticsRuntime { this.timeRate = 1.0f; // 启动 MQTT 监听 - this.amrMessageHandler.start(this.env.getEnvConfig().getMqtt(), this.clientId); - this.frontendPushService.start(this.env.getEnvConfig().getMqtt()); + this.amrMessageHandler.start(this.env.getEnvConfig().getMqtt(), this.serverId); + this.lccMqttService.start(this.env.getEnvConfig().getMqtt(), this.serverId); + this.lccRedisService.start(this.env.getEnvConfig().getRedis(), this.serverId); // 开启所有机器人的任务处理 Set executorTypes = Sets.newHashSet(); @@ -304,14 +313,34 @@ public class LogisticsRuntime { // 停止 MQTT 监听 this.amrMessageHandler.stop(); - this.frontendPushService.stop(); + this.lccMqttService.stop(); + this.lccRedisService.stop(); BannerUtils.printConfig(log, "LogisticsRuntime stop.", new String[]{ - "projectUUID: " + this.projectUUID, + "projectUUID: " + this.projectUuid, "envId: " + this.envId, "virtual: " + this.isVirtual, + "serverId: " + this.serverId, "stopTime: " + new Date(this.stopTime), "timeRate: " + this.timeRate, }); } + + public RuntimeState getState() { + RuntimeState state = new RuntimeState(); + + state.projectUuid = this.projectUuid; + state.envId = this.envId; + state.isVirtual = this.isVirtual; + state.serverId = this.serverId; + state.isRunning = this.isRunning; + + state.startTime = this.startTime; + state.stopTime = this.stopTime; + state.timeRate = this.timeRate; + state.subSystemList = this.project.getSubSystemList(); + + state.fillSystemInfos(); + return state; + } } diff --git a/servo/src/main/java/com/yvan/logisticsModel/RuntimeState.java b/servo/src/main/java/com/yvan/logisticsModel/RuntimeState.java new file mode 100644 index 0000000..cb09e64 --- /dev/null +++ b/servo/src/main/java/com/yvan/logisticsModel/RuntimeState.java @@ -0,0 +1,56 @@ +package com.yvan.logisticsModel; + +import java.util.HashMap; +import java.util.Map; + +public class RuntimeState { + public String projectUuid; + public long envId; + public boolean isVirtual; + public String serverId; + public boolean isRunning; + + public long startTime; + public long stopTime; + public float timeRate; + public String[] subSystemList; + + public float cpuUsage; + public float memoryUsage; + public float diskIoLoad; + /** + * 空闲内存,单位GB + */ + public float freeMemory; + /** + * 磁盘剩余空间,单位GB + */ + public float diskFreeSpace; + + public void fillSystemInfos() { + this.cpuUsage = SystemMetricsStore.cpuUsage; + this.memoryUsage = SystemMetricsStore.memoryUsage; + this.freeMemory = SystemMetricsStore.freeMemory; + this.diskIoLoad = SystemMetricsStore.diskIoLoad; + this.diskFreeSpace = SystemMetricsStore.diskFreeSpace; + } + + public Map toMap() { + Map map = new HashMap<>(); + map.put("projectUuid", projectUuid); + map.put("envId", envId); + map.put("isVirtual", isVirtual); + map.put("serverId", serverId); + map.put("isRunning", isRunning); + map.put("startTime", startTime); + map.put("stopTime", stopTime); + map.put("timeRate", timeRate); + map.put("subSystemList", subSystemList); + map.put("cpuUsage", cpuUsage); + map.put("memoryUsage", memoryUsage); + map.put("diskIoLoad", diskIoLoad); + map.put("freeMemory", freeMemory); + map.put("diskFreeSpace", diskFreeSpace); + return map; + } +} diff --git a/servo/src/main/java/com/yvan/logisticsModel/SystemMetricsStore.java b/servo/src/main/java/com/yvan/logisticsModel/SystemMetricsStore.java new file mode 100644 index 0000000..bc1fc96 --- /dev/null +++ b/servo/src/main/java/com/yvan/logisticsModel/SystemMetricsStore.java @@ -0,0 +1,10 @@ +package com.yvan.logisticsModel; + + +public class SystemMetricsStore { + public static volatile float cpuUsage; + public static volatile float memoryUsage; + public static volatile float diskIoLoad; + public static volatile float freeMemory; + public static volatile float diskFreeSpace; +} diff --git a/servo/src/main/java/com/yvan/mqtt/LccMqttService.java b/servo/src/main/java/com/yvan/mqtt/LccMqttService.java new file mode 100644 index 0000000..1a78177 --- /dev/null +++ b/servo/src/main/java/com/yvan/mqtt/LccMqttService.java @@ -0,0 +1,345 @@ +package com.yvan.mqtt; + +import com.yvan.logisticsEnv.EnvConfig; +import com.yvan.logisticsModel.LogisticsRuntime; +import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.clever.core.mapper.JacksonMapper; +import org.eclipse.paho.mqttv5.client.*; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Slf4j +public class LccMqttService implements MqttCallback { + private volatile EnvConfig.MqttConfig mqttConfig; + private volatile String clientId; + private volatile MqttClient mqttClient; + private final LogisticsRuntime runtime; + private final MemoryPersistence persistence = new MemoryPersistence(); + + private final Lock connectionLock = new ReentrantLock(); + + // QoS 配置 + private static final int QOS = 1; + + // 重连参数 + private static final int MAX_RETRIES = 5; + private static final int RETRY_INTERVAL = 3; // 秒 + + // 连接状态 + @Getter + private volatile boolean connected = false; + + public LccMqttService(LogisticsRuntime runtime) { + this.runtime = runtime; + } + + /** + * 启动MQTT服务 + */ + @SneakyThrows + public void start(EnvConfig.MqttConfig mqttConfig, String clientId) { + this.mqttConfig = mqttConfig; + this.clientId = clientId; + connectionLock.lock(); + try { + if (connected) { + log.warn("MQTT service is already started"); + return; + } + + log.info("Starting MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId); + + // 创建MQTT客户端 + mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence); + mqttClient.setCallback(this); + + // 配置连接选项 + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setServerURIs(new String[]{mqttConfig.getBrokerUrl()}); + options.setUserName(mqttConfig.getUsername()); + options.setPassword(mqttConfig.getPassword().getBytes()); + options.setAutomaticReconnect(true); + options.setConnectionTimeout(10); + options.setKeepAliveInterval(60); + options.setExecutorServiceTimeout(1); + + // 尝试连接 + int attempts = 0; + while (attempts < MAX_RETRIES && !connected) { + attempts++; + try { + log.info("Connecting to MQTT broker (attempt {}/{})", attempts, MAX_RETRIES); + mqttClient.connect(options); + connected = true; + log.info("MQTT connected successfully"); + } catch (MqttException e) { + log.error("MQTT connection failed (attempt " + attempts + "/{" + MAX_RETRIES + "})", e); + + // 重试前等待 + if (attempts < MAX_RETRIES) { + try { + TimeUnit.SECONDS.sleep(RETRY_INTERVAL); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + + if (!connected) { + log.error("Failed to connect to MQTT broker after {} attempts", MAX_RETRIES); + } + } finally { + connectionLock.unlock(); + } + } + + /** + * 停止MQTT服务 + */ + public void stop() { + connectionLock.lock(); + try { + if (mqttClient != null && mqttClient.isConnected()) { + try { + mqttClient.disconnect(); + mqttClient.close(); + log.info("MQTT disconnected"); + } catch (MqttException e) { + log.error("Error disconnecting MQTT", e); + } + } + connected = false; + } finally { + connectionLock.unlock(); + } + } + + // ==================== 消息推送方法 ==================== + + /** + * 推送服务器状态 + * /lcc/{proj_id}/{env_id}/server + * + * @param statusData 状态数据 + */ + public void pushServerState(Map statusData) { + String topic = buildTopic("server"); + publishJson(topic, statusData); + } + + /** + * 推送客户端状态 + * /lcc/{proj_id}/{env_id}/client + * + * @param clientData 客户端数据 + */ + public void pushClientState(Map clientData) { + String topic = buildTopic("client"); + publishJson(topic, clientData); + } + + /** + * 推送任务更新 + * /lcc/{proj_id}/{env_id}/task + * + * @param taskData 任务数据 + */ + public void pushTaskUpdate(Object taskData) { + String topic = buildTopic("task"); + publishJson(topic, taskData); + } + + /** + * 推送库存更新 + * /lcc/{proj_id}/{env_id}/inv/{catalogCode} + * + * @param catalogCode 货位目录编码 + * @param before 更新前库存 + * @param after 更新后库存 + */ + public void pushInventoryUpdate(String catalogCode, Object before, Object after) { + String topic = buildTopic("inv/" + catalogCode); + + Map data = new HashMap<>(); + data.put("before", before); + data.put("after", after); + + publishJson(topic, data); + } + + /** + * 推送设备状态 + * /lcc/{proj_id}/{env_id}/device/{id}/status + * + * @param deviceId 设备ID + * @param statusData 状态数据 + */ + public void pushDeviceStatus(String deviceId, Map statusData) { + String topic = buildTopic("device/" + deviceId + "/status"); + publishJson(topic, statusData); + } + + /** + * 推送设备存活状态 + * + * @param deviceId 设备ID + * @param online 是否在线 + */ + public void pushDeviceAlive(String deviceId, boolean online) { + String topic = buildTopic("device/" + deviceId + "/alive"); + publishString(topic, online ? "online" : "offline"); + } + + /** + * 推送日志 + * /lcc/{proj_id}/{env_id}/log/{type} + * + * @param logType 日志类型 + * @param logData 日志数据 + */ + public void pushLogs(String logType, Object logData) { + String topic = buildTopic("log/" + logType); + publishJson(topic, logData); + } + + /** + * 推送告警 + * /lcc/{proj_id}/{env_id}/alarm + * + * @param alarmData 告警数据 + */ + public void pushAlarm(Object alarmData) { + String topic = buildTopic("alarm"); + publishJson(topic, alarmData); + } + + /** + * 推送脚本更新 + * /lcc/{proj_id}/script + * + * @param scriptData 脚本数据 + */ + public void pushScriptUpdate(Object scriptData) { + // 脚本系统没有环境ID + String topic = "/lcc/" + this.runtime.projectUuid + "/script"; + publishJson(topic, scriptData); + } + + // ==================== 内部工具方法 ==================== + + /** + * 构建主题路径 + */ + private String buildTopic(String suffix) { + return "/lcc/" + this.runtime.projectUuid + "/" + this.runtime.envId + "/" + suffix; + } + + /** + * 发布JSON数据 + */ + private void publishJson(String topic, Object data) { + try { + String json = JacksonMapper.getInstance().toJson(data); + publish(topic, json); + } catch (Exception e) { + log.error("Failed to serialize JSON for topic: " + topic, e); + } + } + + /** + * 发布字符串数据 + */ + private void publishString(String topic, String message) { + publish(topic, message); + } + + /** + * 通用发布方法 + */ + private void publish(String topic, String payload) { + if (!connected) { + log.error("Attempted to publish while disconnected: {}", topic); + return; + } + + try { + MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); + message.setQos(QOS); + message.setRetained(false); + + mqttClient.publish(topic, message); + + log.debug("Published to {}: {}", topic, payload); + } catch (MqttException e) { + log.error("Failed to publish to topic: " + topic, e); + } + } + + // ==================== MQTT 回调方法 ==================== + + @Override + public void disconnected(MqttDisconnectResponse disconnectResponse) { + log.warn("MQTT disconnected: {}", disconnectResponse); + connectionLock.lock(); + try { + connected = false; + } finally { + connectionLock.unlock(); + } + } + + @Override + public void mqttErrorOccurred(MqttException exception) { + log.error("MQTT error occurred", exception); + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + log.debug("Message arrived on topic: {}", topic); + // 作为服务端,我们只发布消息,不接收消息 + // 可以记录或处理意外收到的消息 + } + + @Override + public void deliveryComplete(IMqttToken iMqttToken) { + try { + String[] topics = iMqttToken.getTopics(); + if (topics != null && topics.length > 0) { + log.info("Message delivery confirmed for topic: {}", topics[0]); + } else { + log.info("Message delivery confirmed (no topic info)"); + } + } catch (Exception e) { + log.warn("Error getting delivery token info", e); + } + } + + @Override + public void connectComplete(boolean reconnect, String serverURI) { + log.info("MQTT connection complete: reconnect={}, serverURI={}", reconnect, serverURI); + connectionLock.lock(); + try { + connected = true; + } finally { + connectionLock.unlock(); + } + } + + @Override + public void authPacketArrived(int i, MqttProperties mqttProperties) { + log.info("MQTT authPacketArrived({}, {})", i, mqttProperties); + } +} diff --git a/servo/src/main/java/com/yvan/redis/LccRedisService.java b/servo/src/main/java/com/yvan/redis/LccRedisService.java new file mode 100644 index 0000000..5349b31 --- /dev/null +++ b/servo/src/main/java/com/yvan/redis/LccRedisService.java @@ -0,0 +1,238 @@ +package com.yvan.redis; + +import com.yvan.logisticsEnv.EnvConfig; +import com.yvan.logisticsModel.LogisticsRuntime; +import lombok.extern.slf4j.Slf4j; +import org.clever.data.redis.Redis; +import org.clever.data.redis.RedisAdmin; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class LccRedisService { + private final LogisticsRuntime runtime; + private volatile Redis redis; + private String serverId; // 客户端ID + private volatile EnvConfig.RedisConfig config; + + // 心跳管理 + private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(); + private final Map heartbeatKeys = new HashMap<>(); + + // 设备存活状态管理 + private final Map deviceAliveKeys = new HashMap<>(); + + public LccRedisService(LogisticsRuntime runtime) { + this.runtime = runtime; + } + + public void start(EnvConfig.RedisConfig config, String clientId) { + redis = RedisAdmin.getRedis(); + this.serverId = clientId; + + // 启动服务器心跳 + heartbeatScheduler.scheduleAtFixedRate( + this::updateServerHeartbeat, + 0, 5, TimeUnit.SECONDS + ); + + log.info("LccRedisService started for project: " + runtime.projectUuid + ", env: " + runtime.envId); + } + + /** + * 停止服务 + */ + public void stop() { + // 停止心跳调度 + heartbeatScheduler.shutdown(); + try { + if (!heartbeatScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + heartbeatScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + heartbeatScheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + + // 清除所有心跳状态 + clearAllHeartbeats(); + + log.info("LccRedisService stopped for project: " + runtime.projectUuid + ", env: " + runtime.envId); + } + + // ==================== 心跳管理 ==================== + + /** + * 更新服务器心跳 + */ + private void updateServerHeartbeat() { + // 服务器状态 + Map statusData = this.runtime.getState().toMap(); + + String statusKey = buildKey("server/rcs/status"); + redis.hPutAll(statusKey, statusData); + + // 服务器存活状态 + String aliveKey = buildKey("server/rcs/alive"); + redis.vSet(aliveKey, this.serverId); + redis.kExpire(aliveKey, 15); // 15秒过期 + + heartbeatKeys.put(statusKey, System.currentTimeMillis()); + heartbeatKeys.put(aliveKey, System.currentTimeMillis()); + } + + /** + * 更新客户端心跳 + */ + private void updateClientHeartbeat(String clientId, String clientUser) { + // 客户端状态 + Map statusData = new HashMap<>(); + statusData.put("clientId", clientId); + statusData.put("user", clientUser); + statusData.put("timestamp", String.valueOf(System.currentTimeMillis())); + + String statusKey = buildKey("client/status"); + redis.hPutAll(statusKey, statusData); + + // 客户端存活状态 + String aliveKey = buildKey("client/alive"); + redis.vSet(aliveKey, clientId); + redis.kExpire(aliveKey, 15); // 15秒过期 + + heartbeatKeys.put(statusKey, System.currentTimeMillis()); + heartbeatKeys.put(aliveKey, System.currentTimeMillis()); + } + + /** + * 更新设备存活状态 + * + * @param deviceId 设备ID + */ + public void updateDeviceAlive(String deviceId) { + String aliveKey = buildKey("device/" + deviceId + "/alive"); + redis.vSet(aliveKey, "1"); + redis.kExpire(aliveKey, 5); // 5秒过期 + + deviceAliveKeys.put(aliveKey, System.currentTimeMillis()); + } + + /** + * 清除所有心跳状态 + */ + private void clearAllHeartbeats() { + // 清除服务器心跳 + for (String key : heartbeatKeys.keySet()) { + redis.kDelete(key); + } + + // 清除设备心跳 + for (String key : deviceAliveKeys.keySet()) { + redis.kDelete(key); + } + } + + // ==================== 监控数据管理 ==================== + // 1. 服务器状态 + + /** + * 获取服务器状态 + */ + public Map getServerStatus() { + String key = buildKey("server/rcs/status"); + return (Map) ((Map) redis.hEntries(key)); + } + + /** + * 检查服务器是否存活 + */ + public boolean isServerAlive() { + String key = buildKey("server/rcs/alive"); + return redis.kHasKey(key); + } + + // 2. 客户端状态 + + /** + * 获取客户端状态 + */ + public Map getClientStatus() { + String key = buildKey("client/status"); + return (Map) ((Map) redis.hEntries(key)); + } + + /** + * 检查客户端是否存活 + */ + public boolean isClientAlive() { + String key = buildKey("client/alive"); + return redis.kHasKey(key); + } + + // 5. 设备监控 + + /** + * 保存设备状态 + * + * @param deviceId 设备ID + * @param statusData 状态数据 + */ + public void saveDeviceStatus(String deviceId, Map statusData) { + String key = buildKey("device/" + deviceId + "/status"); + redis.hPutAll(key, statusData); + } + + /** + * 获取设备状态 + * + * @param deviceId 设备ID + */ + public Map getDeviceStatus(String deviceId) { + String key = buildKey("device/" + deviceId + "/status"); + return (Map) ((Map) redis.hEntries(key)); + } + + /** + * 检查设备是否存活 + * + * @param deviceId 设备ID + */ + public boolean isDeviceAlive(String deviceId) { + String key = buildKey("device/" + deviceId + "/alive"); + return redis.kHasKey(key); + } + + // ==================== 工具方法 ==================== + + /** + * 构建完整的Redis Key + */ + private String buildKey(String suffix) { + return "/lcc/" + runtime.projectUuid + "/" + runtime.envId + "/" + suffix; + } + + /* + private ScheduledExecutorService schedulerOfWriteToRedisAlive = Executors.newSingleThreadScheduledExecutor(); + private ScheduledExecutorService schedulerOfWriteToRedisStatus = Executors.newSingleThreadScheduledExecutor(); + + public void writeToRedisStatus() { + String redisKey = String.format("/lcc/%s/%s/server/rcs/status", this.projectUuid, this.envId); + + var currentMap = this.getState().toMap(); + var originMap = redis.hMultiGet(redisKey, currentMap.keySet()); + + // 对比两个 map 如果有不一致的地方, 通知状态变更 + + redis.hPutAll(redisKey, currentMap); + } + + public void writeToRedisAlive() { + String redisKey = String.format("/lcc/%s/%s/server/rcs/alive", this.projectUuid, this.envId); + redis.vSet(redisKey, this.serverId); + redis.kExpire(redisKey, 15); // 15秒过期 + } + */ +} diff --git a/servo/src/main/java/com/yvan/workbench/controller/EnvController.java b/servo/src/main/java/com/yvan/workbench/controller/EnvController.java index fe2c6ab..ff21e1e 100644 --- a/servo/src/main/java/com/yvan/workbench/controller/EnvController.java +++ b/servo/src/main/java/com/yvan/workbench/controller/EnvController.java @@ -12,10 +12,7 @@ import org.clever.web.mvc.annotation.RequestBody; import java.util.Map; -import static com.galaxis.rcs.common.query.QLccBasContainer.lccBasContainer; import static com.galaxis.rcs.common.query.QLccBasExecutor.lccBasExecutor; -import static com.galaxis.rcs.common.query.QLccBasLocation.lccBasLocation; -import static com.galaxis.rcs.common.query.QLccInvLpn.lccInvLpn; public class EnvController { static final QueryDSL queryDSL = DaoFactory.getQueryDSL(); @@ -38,44 +35,6 @@ public class EnvController { return R.success(list); } - public static R getAllInv(@RequestBody Map params) { - String catalogCode = Conv.asString(params.get("catalogCode")); - Long envId = Conv.asLong(params.get("envId")); - - if (Strings.isNullOrEmpty(catalogCode)) { - return R.fail("catalogCode must not be null"); - } - if (envId == null) { - return R.fail("envId must not be null"); - } - - var list = queryDSL.select(QueryDslUtils.linkedMap( - lccBasContainer.containerType, - lccInvLpn.lpn, - lccInvLpn.locCode, - lccBasLocation.rack, - lccBasLocation.bay, - lccBasLocation.level, - lccBasLocation.cell - )) - .from(lccInvLpn) - .innerJoin(lccBasLocation).on( - lccInvLpn.locCode.eq(lccBasLocation.locCode).and( - lccBasLocation.envId.eq(lccBasLocation.envId) - ) - ) - .innerJoin(lccBasContainer).on( - lccInvLpn.lpn.eq(lccInvLpn.lpn).and( - lccBasContainer.envId.eq(lccInvLpn.envId) - ) - ) - .where(lccInvLpn.envId.eq(envId)) - .where(lccBasLocation.catalogCode.eq(catalogCode)) - .fetch(); - - return R.success(list); - } - public static R getAllExecutor(@RequestBody Map params) { String catalogCode = Conv.asString(params.get("catalogCode")); Long envId = Conv.asLong(params.get("envId")); diff --git a/servo/src/main/java/com/yvan/workbench/controller/LccController.java b/servo/src/main/java/com/yvan/workbench/controller/LccController.java index 56b64e5..57cff06 100644 --- a/servo/src/main/java/com/yvan/workbench/controller/LccController.java +++ b/servo/src/main/java/com/yvan/workbench/controller/LccController.java @@ -1,15 +1,106 @@ package com.yvan.workbench.controller; +import com.galaxis.rcs.RCSService; +import com.google.common.base.Strings; import com.yvan.workbench.SpringContext; import com.yvan.workbench.service.LccMapService; +import org.clever.core.Conv; import org.clever.core.model.response.R; +import org.clever.data.jdbc.DaoFactory; +import org.clever.data.jdbc.QueryDSL; +import org.clever.data.jdbc.querydsl.utils.QueryDslUtils; +import org.clever.web.mvc.annotation.RequestBody; + +import java.util.Map; + +import static com.galaxis.rcs.common.query.QLccBasContainer.lccBasContainer; +import static com.galaxis.rcs.common.query.QLccBasLocation.lccBasLocation; +import static com.galaxis.rcs.common.query.QLccInvLpn.lccInvLpn; /** - * /api/workbench/LccController@get1 + * LCC API 服务端实现 */ public class LccController { + static final QueryDSL queryDSL = DaoFactory.getQueryDSL(); + public static R getAllProjects() { var mapService = SpringContext.HOLDER.getBean(LccMapService.class); return R.success(mapService.getAllProjects()); } + + public static R projectStart(@RequestBody Map params) { + String projectUuid = Conv.asString(params.get("projectUUID")); + Long envId = Conv.asLong(params.get("envId")); + + if (Strings.isNullOrEmpty(projectUuid)) { + return R.fail("projectUUID Must not be empty"); + } + if (envId == null || envId < 0) { + return R.fail("envId Must not be empty"); + } + + // 启动 RCS 服务器 + RCSService.projectStart(projectUuid, envId); + + // 启动MFC服务器 / 启动WCS服务器 / 启动PES服务器 等等 + + return R.success("Project started successfully"); + } + + public static R projectStop(@RequestBody Map params) { + String projectUuid = Conv.asString(params.get("projectUUID")); + Long envId = Conv.asLong(params.get("envId")); + + if (Strings.isNullOrEmpty(projectUuid)) { + return R.fail("projectUUID Must not be empty"); + } + if (envId == null || envId < 0) { + return R.fail("envId Must not be empty"); + } + + // 停止RCS服务器 + RCSService.projectStop(projectUuid, envId); + + // 停止MFC服务器 / 停止WCS服务器 / 停止PES服务器 等等 + return R.success("Project stopped successfully"); + } + + + public static R getAllInv(@RequestBody Map params) { + String catalogCode = Conv.asString(params.get("catalogCode")); + Long envId = Conv.asLong(params.get("envId")); + + if (Strings.isNullOrEmpty(catalogCode)) { + return R.fail("catalogCode must not be null"); + } + if (envId == null) { + return R.fail("envId must not be null"); + } + + var list = queryDSL.select(QueryDslUtils.linkedMap( + lccInvLpn.lpn, + lccBasContainer.containerType, + lccInvLpn.locCode, + lccBasLocation.rack, + lccBasLocation.bay, + lccBasLocation.level, + lccBasLocation.cell + )) + .from(lccInvLpn) + .innerJoin(lccBasLocation).on( + lccInvLpn.locCode.eq(lccBasLocation.locCode).and( + lccBasLocation.envId.eq(lccBasLocation.envId) + ) + ) + .innerJoin(lccBasContainer).on( + lccInvLpn.lpn.eq(lccInvLpn.lpn).and( + lccBasContainer.envId.eq(lccInvLpn.envId) + ) + ) + .where(lccInvLpn.envId.eq(envId)) + .where(lccBasLocation.catalogCode.eq(catalogCode)) + .fetch(); + + return R.success(list); + } } diff --git a/servo/src/main/java/com/yvan/workbench/controller/RcsController.java b/servo/src/main/java/com/yvan/workbench/controller/RcsController.java index 1da1349..c79b177 100644 --- a/servo/src/main/java/com/yvan/workbench/controller/RcsController.java +++ b/servo/src/main/java/com/yvan/workbench/controller/RcsController.java @@ -25,36 +25,6 @@ import java.util.Map; public class RcsController { static final SnowFlake snowFlake = new SnowFlake(); - public static R projectStart(@RequestBody Map params) { - String projectUuid = Conv.asString(params.get("projectUUID")); - Long envId = Conv.asLong(params.get("envId")); - - if (Strings.isNullOrEmpty(projectUuid)) { - return R.fail("projectUUID Must not be empty"); - } - if (envId == null || envId < 0) { - return R.fail("envId Must not be empty"); - } - - RCSService.projectStart(projectUuid, envId); - return R.success("Project started successfully"); - } - - public static R projectStop(@RequestBody Map params) { - String projectUuid = Conv.asString(params.get("projectUUID")); - Long envId = Conv.asLong(params.get("envId")); - - if (Strings.isNullOrEmpty(projectUuid)) { - return R.fail("projectUUID Must not be empty"); - } - if (envId == null || envId < 0) { - return R.fail("envId Must not be empty"); - } - - RCSService.projectStop(projectUuid, envId); - return R.success("Project stopped successfully"); - } - public static R agvToCharger(@RequestBody Map params) { Object ret = getCommonParamAndCreateBizTask(params); if (ret instanceof R) { diff --git a/servo/src/main/java/com/yvan/workbench/model/entity/LccProject.java b/servo/src/main/java/com/yvan/workbench/model/entity/LccProject.java index 6361b97..9bc29be 100644 --- a/servo/src/main/java/com/yvan/workbench/model/entity/LccProject.java +++ b/servo/src/main/java/com/yvan/workbench/model/entity/LccProject.java @@ -9,6 +9,7 @@ import java.util.Map; public class LccProject { private String projectUuid; private String projectLabel; + private String[] subSystemList; private String projectFileLocation; private CatalogGroup[] directoryData; private Map otherData; diff --git a/servo/src/main/java/com/yvan/workbench/service/LccServerInfoConnect.java b/servo/src/main/java/com/yvan/workbench/service/LccServerInfoConnect.java new file mode 100644 index 0000000..59a3773 --- /dev/null +++ b/servo/src/main/java/com/yvan/workbench/service/LccServerInfoConnect.java @@ -0,0 +1,124 @@ +package com.yvan.workbench.service; + +import com.yvan.logisticsModel.SystemMetricsStore; +import oshi.SystemInfo; +import oshi.hardware.CentralProcessor; +import oshi.hardware.GlobalMemory; +import oshi.hardware.HardwareAbstractionLayer; +import oshi.software.os.FileSystem; +import oshi.software.os.OSFileStore; +import oshi.software.os.OperatingSystem; + +import org.springframework.context.SmartLifecycle; +import org.springframework.stereotype.Component; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Component +public class LccServerInfoConnect implements SmartLifecycle { + + private boolean running = false; + private final SystemInfo systemInfo = new SystemInfo(); + private final HardwareAbstractionLayer hardware = systemInfo.getHardware(); + private final OperatingSystem os = systemInfo.getOperatingSystem(); + private long[] lastTicks = null; + private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + @Override + public void start() { + if (running) return; + + running = true; + scheduler.scheduleAtFixedRate(this::updateAllMetrics, 0, 30, TimeUnit.SECONDS); + } + + @Override + public void stop() { + running = false; + if (!scheduler.isShutdown()) { + scheduler.shutdownNow(); + } + } + + @Override + public boolean isRunning() { + return running; + } + + private void updateAllMetrics() { + updateCpuUsage(); + updateMemoryUsage(); + updateDiskIoLoad(); + updateDiskFreeSpace(); + } + + private void updateCpuUsage() { + CentralProcessor processor = hardware.getProcessor(); + if (lastTicks == null) { + lastTicks = processor.getSystemCpuLoadTicks(); // 初始化一次 + try { + Thread.sleep(50); // 初次需要等待一点时间才能获取有效数据 + } catch (InterruptedException ignored) { + } + } + long[] currentTicks = processor.getSystemCpuLoadTicks(); + + double cpuLoad = processor.getSystemCpuLoadBetweenTicks(lastTicks); + lastTicks = currentTicks; + + SystemMetricsStore.cpuUsage = (float) (cpuLoad * 100); + } + + private void updateMemoryUsage() { + GlobalMemory memory = hardware.getMemory(); + long total = memory.getTotal(); + long available = memory.getAvailable(); + long used = total - available; + SystemMetricsStore.memoryUsage = (float) ((double) used / total * 100); + SystemMetricsStore.freeMemory = available / (1024f * 1024 * 1024); // GB + } + + private void updateDiskIoLoad() { + CentralProcessor processor = hardware.getProcessor(); + long[] prevTicks = processor.getSystemCpuLoadTicks(); + try { + Thread.sleep(500); + } catch (InterruptedException ignored) { + } + long[] currTicks = processor.getSystemCpuLoadTicks(); + + long busyTime = (currTicks[CentralProcessor.TickType.USER.getIndex()] + + currTicks[CentralProcessor.TickType.NICE.getIndex()] + + currTicks[CentralProcessor.TickType.SYSTEM.getIndex()] + + currTicks[CentralProcessor.TickType.IRQ.getIndex()]) + - (prevTicks[CentralProcessor.TickType.USER.getIndex()] + + prevTicks[CentralProcessor.TickType.NICE.getIndex()] + + prevTicks[CentralProcessor.TickType.SYSTEM.getIndex()] + + prevTicks[CentralProcessor.TickType.IRQ.getIndex()]); + long idleTime = (currTicks[CentralProcessor.TickType.IDLE.getIndex()] + + currTicks[CentralProcessor.TickType.IOWAIT.getIndex()]) + - (prevTicks[CentralProcessor.TickType.IDLE.getIndex()] + + prevTicks[CentralProcessor.TickType.IOWAIT.getIndex()]); + double total = busyTime + idleTime; + float ioWait = total == 0 ? 0 : (float) ((currTicks[CentralProcessor.TickType.IOWAIT.getIndex()] + - prevTicks[CentralProcessor.TickType.IOWAIT.getIndex()]) / total * 100); + SystemMetricsStore.diskIoLoad = ioWait; + } + + private void updateDiskFreeSpace() { + FileSystem fileSystem = os.getFileSystem(); + float minFree = Float.MAX_VALUE; + for (OSFileStore fs : fileSystem.getFileStores()) { + long usableSpace = fs.getUsableSpace(); + float freeGb = usableSpace / (1024f * 1024 * 1024); + if (freeGb < minFree) { + minFree = freeGb; + } + } + if (minFree != Float.MAX_VALUE) { + SystemMetricsStore.diskFreeSpace = minFree; + } + } +}