From edbbb451fb5e528263f01e587822245d2b68d877 Mon Sep 17 00:00:00 2001 From: luoyifan Date: Fri, 4 Jul 2025 14:55:43 +0800 Subject: [PATCH] =?UTF-8?q?Agv=20Event=20=E4=BD=93=E7=B3=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/galaxis/rcs/common/enums/AgvEventType.java | 10 - .../java/com/galaxis/rcs/ptr/AgvEventListener.java | 7 - .../java/com/galaxis/rcs/ptr/AgvEventManager.java | 66 ---- .../main/java/com/galaxis/rcs/ptr/PtrAgvItem.java | 52 ++- .../main/java/com/yvan/event/AgvEventListener.java | 5 + .../src/main/java/com/yvan/event/AgvEventType.java | 82 +++++ .../src/main/java/com/yvan/event/EventManager.java | 24 ++ .../com/yvan/logisticsModel/LogisticsRuntime.java | 21 +- .../com/yvan/mqtt/FrontendMessagePushService.java | 349 +++++++++++++++++++++ .../main/java/com/yvan/mqtt/LccMqttService.java | 349 --------------------- .../main/java/com/yvan/state/VariableMonitor.java | 114 +++++++ 11 files changed, 600 insertions(+), 479 deletions(-) delete mode 100644 servo/src/main/java/com/galaxis/rcs/common/enums/AgvEventType.java delete mode 100644 servo/src/main/java/com/galaxis/rcs/ptr/AgvEventListener.java delete mode 100644 servo/src/main/java/com/galaxis/rcs/ptr/AgvEventManager.java create mode 100644 servo/src/main/java/com/yvan/event/AgvEventListener.java create mode 100644 servo/src/main/java/com/yvan/event/AgvEventType.java create mode 100644 servo/src/main/java/com/yvan/event/EventManager.java create mode 100644 servo/src/main/java/com/yvan/mqtt/FrontendMessagePushService.java delete mode 100644 servo/src/main/java/com/yvan/mqtt/LccMqttService.java create mode 100644 servo/src/main/java/com/yvan/state/VariableMonitor.java diff --git a/servo/src/main/java/com/galaxis/rcs/common/enums/AgvEventType.java b/servo/src/main/java/com/galaxis/rcs/common/enums/AgvEventType.java deleted file mode 100644 index e7555c6..0000000 --- a/servo/src/main/java/com/galaxis/rcs/common/enums/AgvEventType.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.galaxis.rcs.common.enums; - -public enum AgvEventType { - OFFLINE, ONLINE, FREE, - PLAN_COMPLETE, PLAN_ACCEPT, PLAN_CANCEL, - PLAN_PAUSE, PLAN_RESUME, - DEVICE_TASK_COMPLETE, DEVICE_TASK_EXCEPTION, - POS_CHANGED, DIRECTION_CHANGED, - LOW_BATTERY, STUCK -} diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/AgvEventListener.java b/servo/src/main/java/com/galaxis/rcs/ptr/AgvEventListener.java deleted file mode 100644 index c3f3a9b..0000000 --- a/servo/src/main/java/com/galaxis/rcs/ptr/AgvEventListener.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.galaxis.rcs.ptr; - -import com.galaxis.rcs.common.enums.AgvEventType; - -public interface AgvEventListener { - void onEvent(AgvEventType type, Object... args); -} diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/AgvEventManager.java b/servo/src/main/java/com/galaxis/rcs/ptr/AgvEventManager.java deleted file mode 100644 index a0c3bc8..0000000 --- a/servo/src/main/java/com/galaxis/rcs/ptr/AgvEventManager.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.galaxis.rcs.ptr; - -import com.galaxis.rcs.common.enums.AgvEventType; -import com.yvan.logisticsModel.LogisticsRuntime; -import lombok.extern.slf4j.Slf4j; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.*; - -@Slf4j -public class AgvEventManager { - private final Map> listeners = new ConcurrentHashMap<>(); - private final Map> stuckMonitors = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4); - private final LogisticsRuntime runtime; - - public AgvEventManager(LogisticsRuntime runtime) { - this.runtime = runtime; - } - - public void registerListener(AgvEventType type, AgvEventListener listener) { - listeners.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(listener); - } - - public void unregisterListener(AgvEventType type, AgvEventListener listener) { - List list = listeners.get(type); - if (list != null) { - list.remove(listener); - } - } - - public void fireEvent(AgvEventType type, Object... args) { - List list = listeners.get(type); - if (list != null) { - for (AgvEventListener listener : list) { - try { - listener.onEvent(type, args); - } catch (Exception e) { - log.error("Error handling event {}", type, e); - } - } - } - } - - public void monitorMovement(String agvId, PosDirection startPos, int delayOfSeconds) { - // 取消现有的监控 - cancelStuckMonitor(agvId); - - ScheduledFuture future = scheduler.schedule(() -> { - PtrAgvItem agv = (PtrAgvItem) this.runtime.executorItemMap.get(agvId); - if (agv != null && agv.isSamePosition(startPos)) { - fireEvent(AgvEventType.STUCK, agv, delayOfSeconds); - } - }, delayOfSeconds, TimeUnit.SECONDS); - - stuckMonitors.put(agvId, future); - } - - public void cancelStuckMonitor(String agvId) { - ScheduledFuture future = stuckMonitors.remove(agvId); - if (future != null) { - future.cancel(false); - } - } -} diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java b/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java index 1d2b6ff..e53af26 100644 --- a/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java +++ b/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java @@ -2,7 +2,7 @@ package com.galaxis.rcs.ptr; import com.fasterxml.jackson.annotation.JsonIgnore; import com.galaxis.rcs.common.entity.RcsTaskPlan; -import com.galaxis.rcs.common.enums.AgvEventType; +import com.yvan.event.AgvEventType; import com.galaxis.rcs.common.enums.LCCDirection; import com.galaxis.rcs.common.enums.PlanTaskStatus; import com.galaxis.rcs.common.enums.PlanTaskType; @@ -15,6 +15,7 @@ import com.galaxis.rcs.ptr.sendEntity.RcsSRMessage; import com.galaxis.rcs.ptr.sendEntity.RcsSetLocationMessage; import com.google.common.base.Joiner; import com.google.common.collect.Queues; +import com.yvan.event.EventManager; import com.yvan.logisticsModel.ExecutorItem; import com.yvan.logisticsModel.LogisticsRuntime; import com.yvan.logisticsModel.StaticItem; @@ -73,8 +74,6 @@ public abstract class PtrAgvItem extends ExecutorItem { private volatile PosDirection lastPausedPosition; private volatile boolean isOnline = false; - private final Set eventListeners = new CopyOnWriteArraySet<>(); - // 执行中的任务 @JsonIgnore public List runningDeviceTaskList = new ArrayList<>(); @@ -91,6 +90,7 @@ public abstract class PtrAgvItem extends ExecutorItem { @JsonIgnore final BlockingQueue deviceTaskQueue = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_CAPACITY); + @JsonIgnore final Cl2DeviceConnector cl2DeviceConnector = new Cl2DeviceConnector(this.runtime); @JsonIgnore @@ -159,10 +159,14 @@ public abstract class PtrAgvItem extends ExecutorItem { this.planTaskSequence = taskSequence; buildPlanToDeviceTask(); - fireEvent(AgvEventType.PLAN_ACCEPT, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_ACCEPT, this, taskSequence); connectorThread.resumeProcessing(); } + private void fireEvent(AgvEventType agvEventType, Object... args) { + this.runtime.eventManager.fireEvent(agvEventType, args); + } + public synchronized void pauseTask() { if (planTaskSequence == null) { throw new IllegalStateException("No active task to pause"); @@ -184,7 +188,7 @@ public abstract class PtrAgvItem extends ExecutorItem { log.error("Failed to send stop command to AGV {}", this.getId(), e); } - fireEvent(AgvEventType.PLAN_PAUSE, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_PAUSE, this); } public synchronized void resumeTask() { @@ -202,7 +206,7 @@ public abstract class PtrAgvItem extends ExecutorItem { isPaused = false; connectorThread.resumeProcessing(); - fireEvent(AgvEventType.PLAN_RESUME, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_RESUME, this); } @SneakyThrows @@ -217,7 +221,7 @@ public abstract class PtrAgvItem extends ExecutorItem { deviceTaskQueue.clear(); } - fireEvent(AgvEventType.PLAN_CANCEL, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_CANCEL, this); } @SneakyThrows @@ -284,7 +288,7 @@ public abstract class PtrAgvItem extends ExecutorItem { } if (planTaskSequence != null && planTaskSequence.isAllCompleted()) { - fireEvent(AgvEventType.PLAN_COMPLETE, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_COMPLETE, this); this.runningDeviceTaskList.clear(); planTaskSequence = null; } @@ -306,13 +310,12 @@ public abstract class PtrAgvItem extends ExecutorItem { // 触发位置变化事件 if (oldX != logicX || oldY != logicY) { fireEvent(AgvEventType.POS_CHANGED, this, - new PosDirection(oldX, oldY, oldDirection), - new PosDirection(logicX, logicY, direction)); + new PosDirection(logicX, logicY, direction), + new PosDirection(oldX, oldY, oldDirection)); } if (oldDirection != direction) { - fireEvent(AgvEventType.DIRECTION_CHANGED, this, - oldDirection, direction); + fireEvent(AgvEventType.DIRECTION_CHANGED, this, direction, oldDirection); } boolean needCompute = false; @@ -341,14 +344,15 @@ public abstract class PtrAgvItem extends ExecutorItem { // 更新计划任务 RcsTaskPlan planTask = planTaskSequence.getByPlanTaskId(task.movePlanTaskId); - if (planTask != null) { + if (planTask != null && !PlanTaskStatus.FINISHED.toString().equals(planTask.getPlanTaskStatus())) { + fireEvent(AgvEventType.DEVICE_TASK_COMPLETE, this, planTaskSequence, planTask); planTask.setPlanTaskStatus(PlanTaskStatus.FINISHED.toString()); planTaskSequence.savePlanTask(planTask); } } if (planTaskSequence.isAllCompleted()) { - fireEvent(AgvEventType.PLAN_COMPLETE, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_COMPLETE, this); this.runningDeviceTaskList.clear(); planTaskSequence = null; } @@ -446,21 +450,6 @@ public abstract class PtrAgvItem extends ExecutorItem { fireEvent(AgvEventType.OFFLINE, this); } - // 事件监听管理 - public void addEventListener(AgvEventListener listener) { - eventListeners.add(listener); - } - - public void removeEventListener(AgvEventListener listener) { - eventListeners.remove(listener); - } - - private void fireEvent(AgvEventType type, Object... args) { - for (AgvEventListener listener : eventListeners) { - listener.onEvent(type, args); - } - } - private String getTaskStatus() { if (planTaskSequence == null) return "IDLE"; if (isPaused) return "PAUSED"; @@ -801,11 +790,6 @@ public abstract class PtrAgvItem extends ExecutorItem { // planQueue.addAll(sequence.taskList); deviceTaskQueue.addAll(deviceTaskList); - - String json = JsonWrapper.toJson(deviceTaskList); - log.info("deviceTaskList: {}", json); - - // TODO: 开启轮询线程,等待下一个待执行任务 } public boolean isSamePosition(PosDirection startPos) { diff --git a/servo/src/main/java/com/yvan/event/AgvEventListener.java b/servo/src/main/java/com/yvan/event/AgvEventListener.java new file mode 100644 index 0000000..19c7119 --- /dev/null +++ b/servo/src/main/java/com/yvan/event/AgvEventListener.java @@ -0,0 +1,5 @@ +package com.yvan.event; + +public interface AgvEventListener { + void onEvent(AgvEventType type, Object... args); +} diff --git a/servo/src/main/java/com/yvan/event/AgvEventType.java b/servo/src/main/java/com/yvan/event/AgvEventType.java new file mode 100644 index 0000000..579517d --- /dev/null +++ b/servo/src/main/java/com/yvan/event/AgvEventType.java @@ -0,0 +1,82 @@ +package com.yvan.event; + +public enum AgvEventType { + /** + * 设备离线 + */ + OFFLINE, + /** + * 设备上线 + */ + ONLINE, + /** + * 设备阻挡 + */ + BLOCKED, + /** + * 设备阻挡恢复 + */ + BLOCKED_RECOVER, + /** + * 设备空闲 + */ + FREE, + /** + * 规划任务序列全部完成 + * (ExecutorItem sender) + */ + PLAN_TASK_SEQUENCE_COMPLETE, + /** + * 某一个规划任务被完成 + * (ExecutorItem sender, PlanTaskSequence taskSequence, RcsTaskPlan taskPlan) + */ + PLAN_TASK_COMPLETE, + /** + * 规划任务异常 + */ + PLAN_TASK_EXCEPTION, + /** + * 规划任务已接受 + * (ExecutorItem sender, PlanTaskSequence taskSequence) + */ + PLAN_TASK_SEQUENCE_ACCEPT, + /** + * 规划任务已取消 + * (ExecutorItem sender) + */ + PLAN_TASK_SEQUENCE_CANCEL, + /** + * 规划任务已暂停 + * (ExecutorItem sender) + */ + PLAN_TASK_SEQUENCE_PAUSE, + /** + * 规划任务已恢复 + * (ExecutorItem sender) + */ + PLAN_TASK_SEQUENCE_RESUME, + /** + * 设备任务已完成 + * (ExecutorItem sender, PtrAgvDeviceTask deviceTask) + */ + DEVICE_TASK_COMPLETE, + /** + * 设备任务异常 + */ + DEVICE_TASK_EXCEPTION, + /** + * 设备位置改变 + * (ExecutorItem sender, PosDirection newPosDirection, PosDirection originPosDirection) + */ + POS_CHANGED, + /** + * 设备姿态改变 + * (ExecutorItem sender, PosDirection newPosDirection, PosDirection originPosDirection) + */ + DIRECTION_CHANGED, + /** + * 设备低电量 + * (ExecutorItem sender) + */ + LOW_BATTERY, +} diff --git a/servo/src/main/java/com/yvan/event/EventManager.java b/servo/src/main/java/com/yvan/event/EventManager.java new file mode 100644 index 0000000..7fb5ec6 --- /dev/null +++ b/servo/src/main/java/com/yvan/event/EventManager.java @@ -0,0 +1,24 @@ +package com.yvan.event; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +public class EventManager { + + private final Set eventListeners = new CopyOnWriteArraySet<>(); + + public void fireEvent(AgvEventType type, Object... args) { + for (AgvEventListener listener : eventListeners) { + listener.onEvent(type, args); + } + } + + // 事件监听管理 + public void subscribe(AgvEventListener listener) { + eventListeners.add(listener); + } + + public void unsubscribe(AgvEventListener listener) { + eventListeners.remove(listener); + } +} diff --git a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java index 820de2d..076b5e0 100644 --- a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java +++ b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java @@ -1,10 +1,8 @@ package com.yvan.logisticsModel; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.galaxis.rcs.connector.cl2.Cl2Item; import com.galaxis.rcs.plan.path.NavigationGraph; import com.galaxis.rcs.plan.path.PtrPathPlanner; -import com.galaxis.rcs.ptr.AgvEventManager; import com.galaxis.rcs.ptr.AmrMessageHandler; import com.galaxis.rcs.task.TaskDispatchFactory; import com.galaxis.rcs.task.TaskService; @@ -12,7 +10,8 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.yvan.mqtt.LccMqttService; +import com.yvan.event.EventManager; +import com.yvan.mqtt.FrontendMessagePushService; import com.yvan.redis.LccRedisService; import com.yvan.workbench.model.entity.LccProject; import com.yvan.workbench.model.entity.LccProjectEnv; @@ -28,7 +27,7 @@ import java.util.Map; import java.util.Set; /** - * 物流上下文运行时 + * 物流运行时 */ @Slf4j public class LogisticsRuntime { @@ -85,13 +84,11 @@ public class LogisticsRuntime { /** * 任务服务 */ - @JsonIgnore public final TaskService taskService = new TaskService(this); /** * 任务分配服务 */ - @JsonIgnore public final TaskDispatchFactory taskDispatchFactory = new TaskDispatchFactory(this); /** @@ -102,7 +99,6 @@ public class LogisticsRuntime { /** * AGV导航地图 车类型 t -> NavigationGraph */ - @JsonIgnore public final Map pathPlannerMap = Maps.newHashMap(); /** @@ -110,15 +106,14 @@ public class LogisticsRuntime { */ public final Map floorMap = Maps.newHashMap(); - @JsonIgnore public final AmrMessageHandler amrMessageHandler = new AmrMessageHandler(this); - public final AgvEventManager agvEventManager = new AgvEventManager(this); - - public final LccMqttService lccMqttService = new LccMqttService(this); + public final FrontendMessagePushService frontendMessagePushService = new FrontendMessagePushService(this); public final LccRedisService lccRedisService = new LccRedisService(this); + public final EventManager eventManager = new EventManager(); + public LogisticsRuntime(LccProject project, LccProjectEnv env, String serverId) { this.project = project; this.env = env; @@ -244,7 +239,7 @@ public class LogisticsRuntime { // 启动 MQTT 监听 this.amrMessageHandler.start(this.env.getEnvConfig().getMqtt(), this.serverId); - this.lccMqttService.start(this.env.getEnvConfig().getMqtt(), this.serverId + "_lcc_send"); + this.frontendMessagePushService.start(this.env.getEnvConfig().getMqtt(), this.serverId + "_lcc_send"); this.lccRedisService.start(this.env.getEnvConfig().getRedis(), this.serverId); // 开启所有机器人的任务处理 @@ -314,7 +309,7 @@ public class LogisticsRuntime { // 停止 MQTT 监听 this.amrMessageHandler.stop(); - this.lccMqttService.stop(); + this.frontendMessagePushService.stop(); this.lccRedisService.stop(); BannerUtils.printConfig(log, "LogisticsRuntime stop.", new String[]{ diff --git a/servo/src/main/java/com/yvan/mqtt/FrontendMessagePushService.java b/servo/src/main/java/com/yvan/mqtt/FrontendMessagePushService.java new file mode 100644 index 0000000..a22c7c5 --- /dev/null +++ b/servo/src/main/java/com/yvan/mqtt/FrontendMessagePushService.java @@ -0,0 +1,349 @@ +package com.yvan.mqtt; + +import com.yvan.logisticsEnv.EnvConfig; +import com.yvan.logisticsModel.LogisticsRuntime; +import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.clever.core.BannerUtils; +import org.clever.core.mapper.JacksonMapper; +import org.eclipse.paho.mqttv5.client.*; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Slf4j +public class FrontendMessagePushService implements MqttCallback { + private volatile EnvConfig.MqttConfig mqttConfig; + private volatile String clientId; + private volatile MqttClient mqttClient; + private final LogisticsRuntime runtime; + private final MemoryPersistence persistence = new MemoryPersistence(); + + private final Lock connectionLock = new ReentrantLock(); + + // QoS 配置 + private static final int QOS = 1; + + // 重连参数 + private static final int MAX_RETRIES = 5; + private static final int RETRY_INTERVAL = 3; // 秒 + + // 连接状态 + @Getter + private volatile boolean connected = false; + + public FrontendMessagePushService(LogisticsRuntime runtime) { + this.runtime = runtime; + } + + /** + * 启动MQTT服务 + */ + @SneakyThrows + public void start(EnvConfig.MqttConfig mqttConfig, String clientId) { + this.mqttConfig = mqttConfig; + this.clientId = clientId; + connectionLock.lock(); + try { + if (connected) { + log.warn("LCC_MQTT service is already started"); + return; + } + + log.info("Starting LCC_MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId); + + // 创建MQTT客户端 + mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence); + mqttClient.setCallback(this); + + // 配置连接选项 + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setServerURIs(new String[]{mqttConfig.getBrokerUrl()}); + options.setUserName(mqttConfig.getUsername()); + options.setPassword(mqttConfig.getPassword().getBytes()); + options.setAutomaticReconnect(true); + options.setConnectionTimeout(10); + options.setKeepAliveInterval(60); + options.setExecutorServiceTimeout(1); + + // 尝试连接 + int attempts = 0; + while (attempts < MAX_RETRIES && !connected) { + attempts++; + try { + log.info("Connecting to LCC_MQTT broker (attempt {}/{})", attempts, MAX_RETRIES); + mqttClient.connect(options); + connected = true; + log.info("LCC_MQTT connected successfully"); + } catch (MqttException e) { + log.error("LCC_MQTT connection failed (attempt " + attempts + "/{" + MAX_RETRIES + "})", e); + + // 重试前等待 + if (attempts < MAX_RETRIES) { + try { + TimeUnit.SECONDS.sleep(RETRY_INTERVAL); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + + if (!connected) { + log.error("Failed to connect to LCC_MQTT broker after {} attempts", MAX_RETRIES); + } + } finally { + connectionLock.unlock(); + } + } + + /** + * 停止MQTT服务 + */ + public void stop() { + connectionLock.lock(); + try { + if (mqttClient != null && mqttClient.isConnected()) { + try { + mqttClient.disconnect(); + mqttClient.close(); + log.info("LCC_MQTT disconnected"); + } catch (MqttException e) { + log.error("Error disconnecting LCC_MQTT", e); + } + } + connected = false; + } finally { + connectionLock.unlock(); + } + } + + // ==================== 消息推送方法 ==================== + + /** + * 推送服务器状态 + * /lcc/{proj_id}/{env_id}/server + * + * @param statusData 状态数据 + */ + public void pushServerState(Map statusData) { + String topic = buildTopic("server"); + publishJson(topic, statusData); + } + + /** + * 推送客户端状态 + * /lcc/{proj_id}/{env_id}/client + * + * @param clientData 客户端数据 + */ + public void pushClientState(Map clientData) { + String topic = buildTopic("client"); + publishJson(topic, clientData); + } + + /** + * 推送任务更新 + * /lcc/{proj_id}/{env_id}/task + * + * @param taskData 任务数据 + */ + public void pushTaskUpdate(Object taskData) { + String topic = buildTopic("task"); + publishJson(topic, taskData); + } + + /** + * 推送库存更新 + * /lcc/{proj_id}/{env_id}/inv/{catalogCode} + * + * @param catalogCode 货位目录编码 + * @param before 更新前库存 + * @param after 更新后库存 + */ + public void pushInventoryUpdate(String catalogCode, Object before, Object after) { + String topic = buildTopic("inv/" + catalogCode); + + Map data = new HashMap<>(); + data.put("before", before); + data.put("after", after); + + publishJson(topic, data); + } + + /** + * 推送设备状态 + * /lcc/{proj_id}/{env_id}/device/{id}/status + * + * @param deviceId 设备ID + * @param statusData 状态数据 + */ + public void pushDeviceStatus(String deviceId, Map statusData) { + String topic = buildTopic("device/" + deviceId + "/status"); + publishJson(topic, statusData); + } + + /** + * 推送设备存活状态 + * + * @param deviceId 设备ID + * @param online 是否在线 + */ + public void pushDeviceAlive(String deviceId, boolean online) { + String topic = buildTopic("device/" + deviceId + "/alive"); + publishString(topic, online ? "online" : "offline"); + } + + /** + * 推送日志 + * /lcc/{proj_id}/{env_id}/log/{type} + * + * @param logType 日志类型 + * @param logData 日志数据 + */ + public void pushLogs(String logType, Object logData) { + String topic = buildTopic("log/" + logType); + publishJson(topic, logData); + } + + /** + * 推送告警 + * /lcc/{proj_id}/{env_id}/alarm + * + * @param alarmData 告警数据 + */ + public void pushAlarm(Object alarmData) { + String topic = buildTopic("alarm"); + publishJson(topic, alarmData); + } + + /** + * 推送脚本更新 + * /lcc/{proj_id}/script + * + * @param scriptData 脚本数据 + */ + public void pushScriptUpdate(Object scriptData) { + // 脚本系统没有环境ID + String topic = "/lcc/" + this.runtime.projectUuid + "/script"; + publishJson(topic, scriptData); + } + + // ==================== 内部工具方法 ==================== + + /** + * 构建主题路径 + */ + private String buildTopic(String suffix) { + return "/lcc/" + this.runtime.projectUuid + "/" + this.runtime.envId + "/" + suffix; + } + + /** + * 发布JSON数据 + */ + private void publishJson(String topic, Object data) { + try { + String json = JacksonMapper.getInstance().toJson(data); + publish(topic, json); + } catch (Exception e) { + log.error("Failed to serialize JSON for topic: " + topic, e); + } + } + + /** + * 发布字符串数据 + */ + private void publishString(String topic, String message) { + publish(topic, message); + } + + /** + * 通用发布方法 + */ + private void publish(String topic, String payload) { + if (!connected) { + log.error("Attempted to publish while disconnected: {}", topic); + return; + } + + try { + MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); + message.setQos(QOS); + message.setRetained(false); + + mqttClient.publish(topic, message); + + log.debug("Published to {}: {}", topic, payload); + } catch (MqttException e) { + log.error("Failed to publish to topic: " + topic, e); + } + } + + // ==================== MQTT 回调方法 ==================== + + @Override + public void disconnected(MqttDisconnectResponse disconnectResponse) { + log.warn("LCC_MQTT disconnected: {}", disconnectResponse); + connectionLock.lock(); + try { + connected = false; + } finally { + connectionLock.unlock(); + } + } + + @Override + public void mqttErrorOccurred(MqttException exception) { + log.error("LCC_MQTT error occurred", exception); + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + log.debug("Message arrived on topic: {}", topic); + // 作为服务端,我们只发布消息,不接收消息 + // 可以记录或处理意外收到的消息 + } + + @Override + public void deliveryComplete(IMqttToken iMqttToken) { + try { + String[] topics = iMqttToken.getTopics(); + if (topics != null && topics.length > 0) { + log.info("Message delivery confirmed for topic: {}", topics[0]); + } else { + log.info("Message delivery confirmed (no topic info)"); + } + } catch (Exception e) { + log.warn("Error getting delivery token info", e); + } + } + + @Override + public void connectComplete(boolean reconnect, String serverURI) { + BannerUtils.printConfig(log, "LCC_MQTT 开启监听成功", new String[]{ + "brokerUrl: " + serverURI, + "userName: " + this.mqttConfig.getUsername(), + "clientId: " + clientId}); + connectionLock.lock(); + try { + connected = true; + } finally { + connectionLock.unlock(); + } + } + + @Override + public void authPacketArrived(int i, MqttProperties mqttProperties) { + log.info("LCC_MQTT authPacketArrived({}, {})", i, mqttProperties); + } +} diff --git a/servo/src/main/java/com/yvan/mqtt/LccMqttService.java b/servo/src/main/java/com/yvan/mqtt/LccMqttService.java deleted file mode 100644 index ee6bc14..0000000 --- a/servo/src/main/java/com/yvan/mqtt/LccMqttService.java +++ /dev/null @@ -1,349 +0,0 @@ -package com.yvan.mqtt; - -import com.yvan.logisticsEnv.EnvConfig; -import com.yvan.logisticsModel.LogisticsRuntime; -import lombok.Getter; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.clever.core.BannerUtils; -import org.clever.core.mapper.JacksonMapper; -import org.eclipse.paho.mqttv5.client.*; -import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; -import org.eclipse.paho.mqttv5.common.MqttException; -import org.eclipse.paho.mqttv5.common.MqttMessage; -import org.eclipse.paho.mqttv5.common.packet.MqttProperties; - -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -@Slf4j -public class LccMqttService implements MqttCallback { - private volatile EnvConfig.MqttConfig mqttConfig; - private volatile String clientId; - private volatile MqttClient mqttClient; - private final LogisticsRuntime runtime; - private final MemoryPersistence persistence = new MemoryPersistence(); - - private final Lock connectionLock = new ReentrantLock(); - - // QoS 配置 - private static final int QOS = 1; - - // 重连参数 - private static final int MAX_RETRIES = 5; - private static final int RETRY_INTERVAL = 3; // 秒 - - // 连接状态 - @Getter - private volatile boolean connected = false; - - public LccMqttService(LogisticsRuntime runtime) { - this.runtime = runtime; - } - - /** - * 启动MQTT服务 - */ - @SneakyThrows - public void start(EnvConfig.MqttConfig mqttConfig, String clientId) { - this.mqttConfig = mqttConfig; - this.clientId = clientId; - connectionLock.lock(); - try { - if (connected) { - log.warn("LCC_MQTT service is already started"); - return; - } - - log.info("Starting LCC_MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId); - - // 创建MQTT客户端 - mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence); - mqttClient.setCallback(this); - - // 配置连接选项 - MqttConnectionOptions options = new MqttConnectionOptions(); - options.setServerURIs(new String[]{mqttConfig.getBrokerUrl()}); - options.setUserName(mqttConfig.getUsername()); - options.setPassword(mqttConfig.getPassword().getBytes()); - options.setAutomaticReconnect(true); - options.setConnectionTimeout(10); - options.setKeepAliveInterval(60); - options.setExecutorServiceTimeout(1); - - // 尝试连接 - int attempts = 0; - while (attempts < MAX_RETRIES && !connected) { - attempts++; - try { - log.info("Connecting to LCC_MQTT broker (attempt {}/{})", attempts, MAX_RETRIES); - mqttClient.connect(options); - connected = true; - log.info("LCC_MQTT connected successfully"); - } catch (MqttException e) { - log.error("LCC_MQTT connection failed (attempt " + attempts + "/{" + MAX_RETRIES + "})", e); - - // 重试前等待 - if (attempts < MAX_RETRIES) { - try { - TimeUnit.SECONDS.sleep(RETRY_INTERVAL); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - break; - } - } - } - } - - if (!connected) { - log.error("Failed to connect to LCC_MQTT broker after {} attempts", MAX_RETRIES); - } - } finally { - connectionLock.unlock(); - } - } - - /** - * 停止MQTT服务 - */ - public void stop() { - connectionLock.lock(); - try { - if (mqttClient != null && mqttClient.isConnected()) { - try { - mqttClient.disconnect(); - mqttClient.close(); - log.info("LCC_MQTT disconnected"); - } catch (MqttException e) { - log.error("Error disconnecting LCC_MQTT", e); - } - } - connected = false; - } finally { - connectionLock.unlock(); - } - } - - // ==================== 消息推送方法 ==================== - - /** - * 推送服务器状态 - * /lcc/{proj_id}/{env_id}/server - * - * @param statusData 状态数据 - */ - public void pushServerState(Map statusData) { - String topic = buildTopic("server"); - publishJson(topic, statusData); - } - - /** - * 推送客户端状态 - * /lcc/{proj_id}/{env_id}/client - * - * @param clientData 客户端数据 - */ - public void pushClientState(Map clientData) { - String topic = buildTopic("client"); - publishJson(topic, clientData); - } - - /** - * 推送任务更新 - * /lcc/{proj_id}/{env_id}/task - * - * @param taskData 任务数据 - */ - public void pushTaskUpdate(Object taskData) { - String topic = buildTopic("task"); - publishJson(topic, taskData); - } - - /** - * 推送库存更新 - * /lcc/{proj_id}/{env_id}/inv/{catalogCode} - * - * @param catalogCode 货位目录编码 - * @param before 更新前库存 - * @param after 更新后库存 - */ - public void pushInventoryUpdate(String catalogCode, Object before, Object after) { - String topic = buildTopic("inv/" + catalogCode); - - Map data = new HashMap<>(); - data.put("before", before); - data.put("after", after); - - publishJson(topic, data); - } - - /** - * 推送设备状态 - * /lcc/{proj_id}/{env_id}/device/{id}/status - * - * @param deviceId 设备ID - * @param statusData 状态数据 - */ - public void pushDeviceStatus(String deviceId, Map statusData) { - String topic = buildTopic("device/" + deviceId + "/status"); - publishJson(topic, statusData); - } - - /** - * 推送设备存活状态 - * - * @param deviceId 设备ID - * @param online 是否在线 - */ - public void pushDeviceAlive(String deviceId, boolean online) { - String topic = buildTopic("device/" + deviceId + "/alive"); - publishString(topic, online ? "online" : "offline"); - } - - /** - * 推送日志 - * /lcc/{proj_id}/{env_id}/log/{type} - * - * @param logType 日志类型 - * @param logData 日志数据 - */ - public void pushLogs(String logType, Object logData) { - String topic = buildTopic("log/" + logType); - publishJson(topic, logData); - } - - /** - * 推送告警 - * /lcc/{proj_id}/{env_id}/alarm - * - * @param alarmData 告警数据 - */ - public void pushAlarm(Object alarmData) { - String topic = buildTopic("alarm"); - publishJson(topic, alarmData); - } - - /** - * 推送脚本更新 - * /lcc/{proj_id}/script - * - * @param scriptData 脚本数据 - */ - public void pushScriptUpdate(Object scriptData) { - // 脚本系统没有环境ID - String topic = "/lcc/" + this.runtime.projectUuid + "/script"; - publishJson(topic, scriptData); - } - - // ==================== 内部工具方法 ==================== - - /** - * 构建主题路径 - */ - private String buildTopic(String suffix) { - return "/lcc/" + this.runtime.projectUuid + "/" + this.runtime.envId + "/" + suffix; - } - - /** - * 发布JSON数据 - */ - private void publishJson(String topic, Object data) { - try { - String json = JacksonMapper.getInstance().toJson(data); - publish(topic, json); - } catch (Exception e) { - log.error("Failed to serialize JSON for topic: " + topic, e); - } - } - - /** - * 发布字符串数据 - */ - private void publishString(String topic, String message) { - publish(topic, message); - } - - /** - * 通用发布方法 - */ - private void publish(String topic, String payload) { - if (!connected) { - log.error("Attempted to publish while disconnected: {}", topic); - return; - } - - try { - MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); - message.setQos(QOS); - message.setRetained(false); - - mqttClient.publish(topic, message); - - log.debug("Published to {}: {}", topic, payload); - } catch (MqttException e) { - log.error("Failed to publish to topic: " + topic, e); - } - } - - // ==================== MQTT 回调方法 ==================== - - @Override - public void disconnected(MqttDisconnectResponse disconnectResponse) { - log.warn("LCC_MQTT disconnected: {}", disconnectResponse); - connectionLock.lock(); - try { - connected = false; - } finally { - connectionLock.unlock(); - } - } - - @Override - public void mqttErrorOccurred(MqttException exception) { - log.error("LCC_MQTT error occurred", exception); - } - - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - log.debug("Message arrived on topic: {}", topic); - // 作为服务端,我们只发布消息,不接收消息 - // 可以记录或处理意外收到的消息 - } - - @Override - public void deliveryComplete(IMqttToken iMqttToken) { - try { - String[] topics = iMqttToken.getTopics(); - if (topics != null && topics.length > 0) { - log.info("Message delivery confirmed for topic: {}", topics[0]); - } else { - log.info("Message delivery confirmed (no topic info)"); - } - } catch (Exception e) { - log.warn("Error getting delivery token info", e); - } - } - - @Override - public void connectComplete(boolean reconnect, String serverURI) { - BannerUtils.printConfig(log, "LCC_MQTT 开启监听成功", new String[]{ - "brokerUrl: " + serverURI, - "userName: " + this.mqttConfig.getUsername(), - "clientId: " + clientId}); - connectionLock.lock(); - try { - connected = true; - } finally { - connectionLock.unlock(); - } - } - - @Override - public void authPacketArrived(int i, MqttProperties mqttProperties) { - log.info("LCC_MQTT authPacketArrived({}, {})", i, mqttProperties); - } -} diff --git a/servo/src/main/java/com/yvan/state/VariableMonitor.java b/servo/src/main/java/com/yvan/state/VariableMonitor.java new file mode 100644 index 0000000..eb5bd3c --- /dev/null +++ b/servo/src/main/java/com/yvan/state/VariableMonitor.java @@ -0,0 +1,114 @@ +package com.yvan.state; + +import org.clever.core.mapper.JacksonMapper; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * VariableMonitor 类用于周期性检查某个变量的值是否发生变化。 + * 如果发生变化,就触发回调通知所有监听器。 + *

+ * 本实现使用 Jackson 序列化对象为 JSON 字符串进行比较,适用于: + * - Map + * - List + * - Set + * - 自定义实体类 + * - 嵌套结构 + *

+ * 并自动防止循环引用问题。 + * + * @param 被监控变量的类型 + */ +public class VariableMonitor { + + // 提供当前变量值的方法 + private final Supplier provider; + + // 轮询间隔时间(毫秒) + private final long intervalMillis; + + // 定时任务执行器 + private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + // 当前调度的任务对象 + private ScheduledFuture future; + + // 上一次变量的 JSON 表示 + private String lastJson; + + // 存储所有的监听器 + private final List> listeners = new ArrayList<>(); + + public VariableMonitor(Supplier provider, long intervalMillis) { + this.provider = provider; + this.intervalMillis = intervalMillis; + this.lastJson = JacksonMapper.getInstance().toJson(provider.get()); + } + + /** + * 启动定时轮询任务 + */ + public void start() { + future = scheduler.scheduleAtFixedRate(() -> { + T currentValue = provider.get(); + String currentJson; + currentJson = JacksonMapper.getInstance().toJson(currentValue); + + if (!Objects.equals(lastJson, currentJson)) { + notifyListeners(currentValue); + lastJson = currentJson; + } + }, 0, intervalMillis, TimeUnit.MILLISECONDS); + } + + /** + * 停止定时任务 + */ + public void stop() { + if (future != null) { + future.cancel(false); + } + scheduler.shutdownNow(); + } + + /** + * 添加一个监听器 + * + * @param listener 回调函数,接收新值作为参数 + */ + public void addListener(Consumer listener) { + listeners.add(listener); + } + + /** + * 移除一个监听器 + * + * @param listener 要移除的监听器 + */ + public void removeListener(Consumer listener) { + listeners.remove(listener); + } + + /** + * 通知所有监听器变量发生了变化 + * + * @param newValue 新的变量值 + */ + private void notifyListeners(T newValue) { + for (Consumer listener : listeners) { + try { + listener.accept(newValue); + } catch (Exception e) { + System.err.println("监听器执行出错: " + e.getMessage()); + } + } + } +}