diff --git a/servo/src/main/java/com/galaxis/rcs/common/enums/AgvEventType.java b/servo/src/main/java/com/galaxis/rcs/common/enums/AgvEventType.java deleted file mode 100644 index e7555c6..0000000 --- a/servo/src/main/java/com/galaxis/rcs/common/enums/AgvEventType.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.galaxis.rcs.common.enums; - -public enum AgvEventType { - OFFLINE, ONLINE, FREE, - PLAN_COMPLETE, PLAN_ACCEPT, PLAN_CANCEL, - PLAN_PAUSE, PLAN_RESUME, - DEVICE_TASK_COMPLETE, DEVICE_TASK_EXCEPTION, - POS_CHANGED, DIRECTION_CHANGED, - LOW_BATTERY, STUCK -} diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/AgvEventManager.java b/servo/src/main/java/com/galaxis/rcs/ptr/AgvEventManager.java deleted file mode 100644 index a0c3bc8..0000000 --- a/servo/src/main/java/com/galaxis/rcs/ptr/AgvEventManager.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.galaxis.rcs.ptr; - -import com.galaxis.rcs.common.enums.AgvEventType; -import com.yvan.logisticsModel.LogisticsRuntime; -import lombok.extern.slf4j.Slf4j; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.*; - -@Slf4j -public class AgvEventManager { - private final Map> listeners = new ConcurrentHashMap<>(); - private final Map> stuckMonitors = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4); - private final LogisticsRuntime runtime; - - public AgvEventManager(LogisticsRuntime runtime) { - this.runtime = runtime; - } - - public void registerListener(AgvEventType type, AgvEventListener listener) { - listeners.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(listener); - } - - public void unregisterListener(AgvEventType type, AgvEventListener listener) { - List list = listeners.get(type); - if (list != null) { - list.remove(listener); - } - } - - public void fireEvent(AgvEventType type, Object... args) { - List list = listeners.get(type); - if (list != null) { - for (AgvEventListener listener : list) { - try { - listener.onEvent(type, args); - } catch (Exception e) { - log.error("Error handling event {}", type, e); - } - } - } - } - - public void monitorMovement(String agvId, PosDirection startPos, int delayOfSeconds) { - // 取消现有的监控 - cancelStuckMonitor(agvId); - - ScheduledFuture future = scheduler.schedule(() -> { - PtrAgvItem agv = (PtrAgvItem) this.runtime.executorItemMap.get(agvId); - if (agv != null && agv.isSamePosition(startPos)) { - fireEvent(AgvEventType.STUCK, agv, delayOfSeconds); - } - }, delayOfSeconds, TimeUnit.SECONDS); - - stuckMonitors.put(agvId, future); - } - - public void cancelStuckMonitor(String agvId) { - ScheduledFuture future = stuckMonitors.remove(agvId); - if (future != null) { - future.cancel(false); - } - } -} diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java b/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java index 1d2b6ff..e53af26 100644 --- a/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java +++ b/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java @@ -2,7 +2,7 @@ package com.galaxis.rcs.ptr; import com.fasterxml.jackson.annotation.JsonIgnore; import com.galaxis.rcs.common.entity.RcsTaskPlan; -import com.galaxis.rcs.common.enums.AgvEventType; +import com.yvan.event.AgvEventType; import com.galaxis.rcs.common.enums.LCCDirection; import com.galaxis.rcs.common.enums.PlanTaskStatus; import com.galaxis.rcs.common.enums.PlanTaskType; @@ -15,6 +15,7 @@ import com.galaxis.rcs.ptr.sendEntity.RcsSRMessage; import com.galaxis.rcs.ptr.sendEntity.RcsSetLocationMessage; import com.google.common.base.Joiner; import com.google.common.collect.Queues; +import com.yvan.event.EventManager; import com.yvan.logisticsModel.ExecutorItem; import com.yvan.logisticsModel.LogisticsRuntime; import com.yvan.logisticsModel.StaticItem; @@ -73,8 +74,6 @@ public abstract class PtrAgvItem extends ExecutorItem { private volatile PosDirection lastPausedPosition; private volatile boolean isOnline = false; - private final Set eventListeners = new CopyOnWriteArraySet<>(); - // 执行中的任务 @JsonIgnore public List runningDeviceTaskList = new ArrayList<>(); @@ -91,6 +90,7 @@ public abstract class PtrAgvItem extends ExecutorItem { @JsonIgnore final BlockingQueue deviceTaskQueue = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_CAPACITY); + @JsonIgnore final Cl2DeviceConnector cl2DeviceConnector = new Cl2DeviceConnector(this.runtime); @JsonIgnore @@ -159,10 +159,14 @@ public abstract class PtrAgvItem extends ExecutorItem { this.planTaskSequence = taskSequence; buildPlanToDeviceTask(); - fireEvent(AgvEventType.PLAN_ACCEPT, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_ACCEPT, this, taskSequence); connectorThread.resumeProcessing(); } + private void fireEvent(AgvEventType agvEventType, Object... args) { + this.runtime.eventManager.fireEvent(agvEventType, args); + } + public synchronized void pauseTask() { if (planTaskSequence == null) { throw new IllegalStateException("No active task to pause"); @@ -184,7 +188,7 @@ public abstract class PtrAgvItem extends ExecutorItem { log.error("Failed to send stop command to AGV {}", this.getId(), e); } - fireEvent(AgvEventType.PLAN_PAUSE, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_PAUSE, this); } public synchronized void resumeTask() { @@ -202,7 +206,7 @@ public abstract class PtrAgvItem extends ExecutorItem { isPaused = false; connectorThread.resumeProcessing(); - fireEvent(AgvEventType.PLAN_RESUME, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_RESUME, this); } @SneakyThrows @@ -217,7 +221,7 @@ public abstract class PtrAgvItem extends ExecutorItem { deviceTaskQueue.clear(); } - fireEvent(AgvEventType.PLAN_CANCEL, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_CANCEL, this); } @SneakyThrows @@ -284,7 +288,7 @@ public abstract class PtrAgvItem extends ExecutorItem { } if (planTaskSequence != null && planTaskSequence.isAllCompleted()) { - fireEvent(AgvEventType.PLAN_COMPLETE, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_COMPLETE, this); this.runningDeviceTaskList.clear(); planTaskSequence = null; } @@ -306,13 +310,12 @@ public abstract class PtrAgvItem extends ExecutorItem { // 触发位置变化事件 if (oldX != logicX || oldY != logicY) { fireEvent(AgvEventType.POS_CHANGED, this, - new PosDirection(oldX, oldY, oldDirection), - new PosDirection(logicX, logicY, direction)); + new PosDirection(logicX, logicY, direction), + new PosDirection(oldX, oldY, oldDirection)); } if (oldDirection != direction) { - fireEvent(AgvEventType.DIRECTION_CHANGED, this, - oldDirection, direction); + fireEvent(AgvEventType.DIRECTION_CHANGED, this, direction, oldDirection); } boolean needCompute = false; @@ -341,14 +344,15 @@ public abstract class PtrAgvItem extends ExecutorItem { // 更新计划任务 RcsTaskPlan planTask = planTaskSequence.getByPlanTaskId(task.movePlanTaskId); - if (planTask != null) { + if (planTask != null && !PlanTaskStatus.FINISHED.toString().equals(planTask.getPlanTaskStatus())) { + fireEvent(AgvEventType.DEVICE_TASK_COMPLETE, this, planTaskSequence, planTask); planTask.setPlanTaskStatus(PlanTaskStatus.FINISHED.toString()); planTaskSequence.savePlanTask(planTask); } } if (planTaskSequence.isAllCompleted()) { - fireEvent(AgvEventType.PLAN_COMPLETE, this); + fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_COMPLETE, this); this.runningDeviceTaskList.clear(); planTaskSequence = null; } @@ -446,21 +450,6 @@ public abstract class PtrAgvItem extends ExecutorItem { fireEvent(AgvEventType.OFFLINE, this); } - // 事件监听管理 - public void addEventListener(AgvEventListener listener) { - eventListeners.add(listener); - } - - public void removeEventListener(AgvEventListener listener) { - eventListeners.remove(listener); - } - - private void fireEvent(AgvEventType type, Object... args) { - for (AgvEventListener listener : eventListeners) { - listener.onEvent(type, args); - } - } - private String getTaskStatus() { if (planTaskSequence == null) return "IDLE"; if (isPaused) return "PAUSED"; @@ -801,11 +790,6 @@ public abstract class PtrAgvItem extends ExecutorItem { // planQueue.addAll(sequence.taskList); deviceTaskQueue.addAll(deviceTaskList); - - String json = JsonWrapper.toJson(deviceTaskList); - log.info("deviceTaskList: {}", json); - - // TODO: 开启轮询线程,等待下一个待执行任务 } public boolean isSamePosition(PosDirection startPos) { diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/AgvEventListener.java b/servo/src/main/java/com/yvan/event/AgvEventListener.java similarity index 53% rename from servo/src/main/java/com/galaxis/rcs/ptr/AgvEventListener.java rename to servo/src/main/java/com/yvan/event/AgvEventListener.java index c3f3a9b..19c7119 100644 --- a/servo/src/main/java/com/galaxis/rcs/ptr/AgvEventListener.java +++ b/servo/src/main/java/com/yvan/event/AgvEventListener.java @@ -1,6 +1,4 @@ -package com.galaxis.rcs.ptr; - -import com.galaxis.rcs.common.enums.AgvEventType; +package com.yvan.event; public interface AgvEventListener { void onEvent(AgvEventType type, Object... args); diff --git a/servo/src/main/java/com/yvan/event/AgvEventType.java b/servo/src/main/java/com/yvan/event/AgvEventType.java new file mode 100644 index 0000000..579517d --- /dev/null +++ b/servo/src/main/java/com/yvan/event/AgvEventType.java @@ -0,0 +1,82 @@ +package com.yvan.event; + +public enum AgvEventType { + /** + * 设备离线 + */ + OFFLINE, + /** + * 设备上线 + */ + ONLINE, + /** + * 设备阻挡 + */ + BLOCKED, + /** + * 设备阻挡恢复 + */ + BLOCKED_RECOVER, + /** + * 设备空闲 + */ + FREE, + /** + * 规划任务序列全部完成 + * (ExecutorItem sender) + */ + PLAN_TASK_SEQUENCE_COMPLETE, + /** + * 某一个规划任务被完成 + * (ExecutorItem sender, PlanTaskSequence taskSequence, RcsTaskPlan taskPlan) + */ + PLAN_TASK_COMPLETE, + /** + * 规划任务异常 + */ + PLAN_TASK_EXCEPTION, + /** + * 规划任务已接受 + * (ExecutorItem sender, PlanTaskSequence taskSequence) + */ + PLAN_TASK_SEQUENCE_ACCEPT, + /** + * 规划任务已取消 + * (ExecutorItem sender) + */ + PLAN_TASK_SEQUENCE_CANCEL, + /** + * 规划任务已暂停 + * (ExecutorItem sender) + */ + PLAN_TASK_SEQUENCE_PAUSE, + /** + * 规划任务已恢复 + * (ExecutorItem sender) + */ + PLAN_TASK_SEQUENCE_RESUME, + /** + * 设备任务已完成 + * (ExecutorItem sender, PtrAgvDeviceTask deviceTask) + */ + DEVICE_TASK_COMPLETE, + /** + * 设备任务异常 + */ + DEVICE_TASK_EXCEPTION, + /** + * 设备位置改变 + * (ExecutorItem sender, PosDirection newPosDirection, PosDirection originPosDirection) + */ + POS_CHANGED, + /** + * 设备姿态改变 + * (ExecutorItem sender, PosDirection newPosDirection, PosDirection originPosDirection) + */ + DIRECTION_CHANGED, + /** + * 设备低电量 + * (ExecutorItem sender) + */ + LOW_BATTERY, +} diff --git a/servo/src/main/java/com/yvan/event/EventManager.java b/servo/src/main/java/com/yvan/event/EventManager.java new file mode 100644 index 0000000..7fb5ec6 --- /dev/null +++ b/servo/src/main/java/com/yvan/event/EventManager.java @@ -0,0 +1,24 @@ +package com.yvan.event; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +public class EventManager { + + private final Set eventListeners = new CopyOnWriteArraySet<>(); + + public void fireEvent(AgvEventType type, Object... args) { + for (AgvEventListener listener : eventListeners) { + listener.onEvent(type, args); + } + } + + // 事件监听管理 + public void subscribe(AgvEventListener listener) { + eventListeners.add(listener); + } + + public void unsubscribe(AgvEventListener listener) { + eventListeners.remove(listener); + } +} diff --git a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java index 820de2d..076b5e0 100644 --- a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java +++ b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java @@ -1,10 +1,8 @@ package com.yvan.logisticsModel; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.galaxis.rcs.connector.cl2.Cl2Item; import com.galaxis.rcs.plan.path.NavigationGraph; import com.galaxis.rcs.plan.path.PtrPathPlanner; -import com.galaxis.rcs.ptr.AgvEventManager; import com.galaxis.rcs.ptr.AmrMessageHandler; import com.galaxis.rcs.task.TaskDispatchFactory; import com.galaxis.rcs.task.TaskService; @@ -12,7 +10,8 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.yvan.mqtt.LccMqttService; +import com.yvan.event.EventManager; +import com.yvan.mqtt.FrontendMessagePushService; import com.yvan.redis.LccRedisService; import com.yvan.workbench.model.entity.LccProject; import com.yvan.workbench.model.entity.LccProjectEnv; @@ -28,7 +27,7 @@ import java.util.Map; import java.util.Set; /** - * 物流上下文运行时 + * 物流运行时 */ @Slf4j public class LogisticsRuntime { @@ -85,13 +84,11 @@ public class LogisticsRuntime { /** * 任务服务 */ - @JsonIgnore public final TaskService taskService = new TaskService(this); /** * 任务分配服务 */ - @JsonIgnore public final TaskDispatchFactory taskDispatchFactory = new TaskDispatchFactory(this); /** @@ -102,7 +99,6 @@ public class LogisticsRuntime { /** * AGV导航地图 车类型 t -> NavigationGraph */ - @JsonIgnore public final Map pathPlannerMap = Maps.newHashMap(); /** @@ -110,15 +106,14 @@ public class LogisticsRuntime { */ public final Map floorMap = Maps.newHashMap(); - @JsonIgnore public final AmrMessageHandler amrMessageHandler = new AmrMessageHandler(this); - public final AgvEventManager agvEventManager = new AgvEventManager(this); - - public final LccMqttService lccMqttService = new LccMqttService(this); + public final FrontendMessagePushService frontendMessagePushService = new FrontendMessagePushService(this); public final LccRedisService lccRedisService = new LccRedisService(this); + public final EventManager eventManager = new EventManager(); + public LogisticsRuntime(LccProject project, LccProjectEnv env, String serverId) { this.project = project; this.env = env; @@ -244,7 +239,7 @@ public class LogisticsRuntime { // 启动 MQTT 监听 this.amrMessageHandler.start(this.env.getEnvConfig().getMqtt(), this.serverId); - this.lccMqttService.start(this.env.getEnvConfig().getMqtt(), this.serverId + "_lcc_send"); + this.frontendMessagePushService.start(this.env.getEnvConfig().getMqtt(), this.serverId + "_lcc_send"); this.lccRedisService.start(this.env.getEnvConfig().getRedis(), this.serverId); // 开启所有机器人的任务处理 @@ -314,7 +309,7 @@ public class LogisticsRuntime { // 停止 MQTT 监听 this.amrMessageHandler.stop(); - this.lccMqttService.stop(); + this.frontendMessagePushService.stop(); this.lccRedisService.stop(); BannerUtils.printConfig(log, "LogisticsRuntime stop.", new String[]{ diff --git a/servo/src/main/java/com/yvan/mqtt/LccMqttService.java b/servo/src/main/java/com/yvan/mqtt/FrontendMessagePushService.java similarity index 98% rename from servo/src/main/java/com/yvan/mqtt/LccMqttService.java rename to servo/src/main/java/com/yvan/mqtt/FrontendMessagePushService.java index ee6bc14..a22c7c5 100644 --- a/servo/src/main/java/com/yvan/mqtt/LccMqttService.java +++ b/servo/src/main/java/com/yvan/mqtt/FrontendMessagePushService.java @@ -21,7 +21,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @Slf4j -public class LccMqttService implements MqttCallback { +public class FrontendMessagePushService implements MqttCallback { private volatile EnvConfig.MqttConfig mqttConfig; private volatile String clientId; private volatile MqttClient mqttClient; @@ -41,7 +41,7 @@ public class LccMqttService implements MqttCallback { @Getter private volatile boolean connected = false; - public LccMqttService(LogisticsRuntime runtime) { + public FrontendMessagePushService(LogisticsRuntime runtime) { this.runtime = runtime; } diff --git a/servo/src/main/java/com/yvan/state/VariableMonitor.java b/servo/src/main/java/com/yvan/state/VariableMonitor.java new file mode 100644 index 0000000..eb5bd3c --- /dev/null +++ b/servo/src/main/java/com/yvan/state/VariableMonitor.java @@ -0,0 +1,114 @@ +package com.yvan.state; + +import org.clever.core.mapper.JacksonMapper; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * VariableMonitor 类用于周期性检查某个变量的值是否发生变化。 + * 如果发生变化,就触发回调通知所有监听器。 + *

+ * 本实现使用 Jackson 序列化对象为 JSON 字符串进行比较,适用于: + * - Map + * - List + * - Set + * - 自定义实体类 + * - 嵌套结构 + *

+ * 并自动防止循环引用问题。 + * + * @param 被监控变量的类型 + */ +public class VariableMonitor { + + // 提供当前变量值的方法 + private final Supplier provider; + + // 轮询间隔时间(毫秒) + private final long intervalMillis; + + // 定时任务执行器 + private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + // 当前调度的任务对象 + private ScheduledFuture future; + + // 上一次变量的 JSON 表示 + private String lastJson; + + // 存储所有的监听器 + private final List> listeners = new ArrayList<>(); + + public VariableMonitor(Supplier provider, long intervalMillis) { + this.provider = provider; + this.intervalMillis = intervalMillis; + this.lastJson = JacksonMapper.getInstance().toJson(provider.get()); + } + + /** + * 启动定时轮询任务 + */ + public void start() { + future = scheduler.scheduleAtFixedRate(() -> { + T currentValue = provider.get(); + String currentJson; + currentJson = JacksonMapper.getInstance().toJson(currentValue); + + if (!Objects.equals(lastJson, currentJson)) { + notifyListeners(currentValue); + lastJson = currentJson; + } + }, 0, intervalMillis, TimeUnit.MILLISECONDS); + } + + /** + * 停止定时任务 + */ + public void stop() { + if (future != null) { + future.cancel(false); + } + scheduler.shutdownNow(); + } + + /** + * 添加一个监听器 + * + * @param listener 回调函数,接收新值作为参数 + */ + public void addListener(Consumer listener) { + listeners.add(listener); + } + + /** + * 移除一个监听器 + * + * @param listener 要移除的监听器 + */ + public void removeListener(Consumer listener) { + listeners.remove(listener); + } + + /** + * 通知所有监听器变量发生了变化 + * + * @param newValue 新的变量值 + */ + private void notifyListeners(T newValue) { + for (Consumer listener : listeners) { + try { + listener.accept(newValue); + } catch (Exception e) { + System.err.println("监听器执行出错: " + e.getMessage()); + } + } + } +}