From 7cb0785f12e34d6750576a738c3c3f36b9e2edbc Mon Sep 17 00:00:00 2001 From: luoyifan Date: Sat, 5 Jul 2025 00:25:11 +0800 Subject: [PATCH] BackendMessageReceiver / FrontendMessagePushService --- .../com/galaxis/rcs/plan/PlanTaskSequence.java | 14 +++- .../main/java/com/galaxis/rcs/ptr/PtrAgvItem.java | 5 +- .../src/main/java/com/yvan/entity/AgvStatusVo.java | 31 +++++++ .../com/yvan/logisticsModel/LogisticsRuntime.java | 97 +++++++++++++++++++--- .../yvan/pusher/FrontendMessagePushService.java | 49 ++++++----- 5 files changed, 158 insertions(+), 38 deletions(-) create mode 100644 servo/src/main/java/com/yvan/entity/AgvStatusVo.java diff --git a/servo/src/main/java/com/galaxis/rcs/plan/PlanTaskSequence.java b/servo/src/main/java/com/galaxis/rcs/plan/PlanTaskSequence.java index 34cbac7..cd98efb 100644 --- a/servo/src/main/java/com/galaxis/rcs/plan/PlanTaskSequence.java +++ b/servo/src/main/java/com/galaxis/rcs/plan/PlanTaskSequence.java @@ -93,7 +93,7 @@ public class PlanTaskSequence { } // 添加取货动作 - public RcsTaskPlan addLoad( String rackId, int bay, int level, int cell) { + public RcsTaskPlan addLoad(String rackId, int bay, int level, int cell) { RcsTaskPlan task = this.createTaskPlanEntity(PlanTaskType.LOAD.toString()); task.setTargetId(rackId); task.setTargetBay(bay); @@ -199,7 +199,17 @@ public class PlanTaskSequence { return this.taskList.isEmpty(); } - public int size() { + public int taskTotalCount() { return this.taskList.size(); } + + public int completedCount() { + int count = 0; + for (RcsTaskPlan task : taskList) { + if (PlanTaskStatus.FINISHED.toString().equals(task.getPlanTaskStatus())) { + count++; + } + } + return count; + } } diff --git a/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java b/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java index 4441b44..05f3320 100644 --- a/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java +++ b/servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java @@ -16,6 +16,7 @@ import com.google.common.collect.Queues; import com.yvan.logisticsModel.ExecutorItem; import com.yvan.logisticsModel.LogisticsRuntime; import com.yvan.logisticsModel.StaticItem; +import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.clever.core.Conv; @@ -60,8 +61,10 @@ public abstract class PtrAgvItem extends ExecutorItem { public short direction; // agv当前转动角度值 public double orientation; + public boolean isBlocked = false; // 任务模式 + @Getter private AmrTaskMode __taskMode; private volatile boolean isPaused = false; @@ -446,7 +449,7 @@ public abstract class PtrAgvItem extends ExecutorItem { this.runtime.eventManager.fireOfflineEvent(this); } - private String getTaskStatus() { + public String getTaskStatus() { if (planTaskSequence == null) return "IDLE"; if (isPaused) return "PAUSED"; return "EXECUTING"; diff --git a/servo/src/main/java/com/yvan/entity/AgvStatusVo.java b/servo/src/main/java/com/yvan/entity/AgvStatusVo.java new file mode 100644 index 0000000..dcfd921 --- /dev/null +++ b/servo/src/main/java/com/yvan/entity/AgvStatusVo.java @@ -0,0 +1,31 @@ +package com.yvan.entity; + +import com.galaxis.rcs.common.enums.BizTaskStatus; +import com.galaxis.rcs.common.enums.BizTaskType; +import com.galaxis.rcs.common.enums.LCCDirection; +import com.galaxis.rcs.ptr.AmrTaskMode; + +public record AgvStatusVo(String id, + String type, + double x, + double y, + double z, + int logicX, + int logicY, + LCCDirection direction, + double orientation, + double soc, + AmrTaskMode mode, + String taskStatus, + boolean isBlocked, + int taskCompleted, + int taskTotalCount, + // 业务任务ID + Long bizTaskId, + BizTaskType bizTaskType, + BizTaskStatus bizTaskStatus, + String bizTaskFrom, + String bizTaskTo, + String bizLpn) { + +} diff --git a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java index e471c72..019c4b4 100644 --- a/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java +++ b/servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java @@ -1,25 +1,30 @@ package com.yvan.logisticsModel; import com.galaxis.rcs.common.entity.RcsTaskPlan; +import com.galaxis.rcs.common.enums.BizTaskStatus; +import com.galaxis.rcs.common.enums.BizTaskType; import com.galaxis.rcs.common.enums.PlanTaskType; import com.galaxis.rcs.connector.cl2.Cl2Item; import com.galaxis.rcs.inv.InvManager; import com.galaxis.rcs.plan.PlanTaskSequence; import com.galaxis.rcs.plan.path.NavigationGraph; +import com.galaxis.rcs.plan.path.PathUtils; import com.galaxis.rcs.plan.path.PtrPathPlanner; import com.galaxis.rcs.ptr.AmrMessageHandler; +import com.galaxis.rcs.ptr.PtrAgvItem; import com.galaxis.rcs.task.TaskDispatchFactory; import com.galaxis.rcs.task.TaskService; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.yvan.entity.AgvStatusVo; +import com.yvan.entity.LccProject; +import com.yvan.entity.LccProjectEnv; import com.yvan.event.AgvEventManager; import com.yvan.event.AgvEventType; import com.yvan.pusher.FrontendMessagePushService; import com.yvan.redis.LccRedisService; -import com.yvan.entity.LccProject; -import com.yvan.entity.LccProjectEnv; import lombok.extern.slf4j.Slf4j; import org.clever.core.BannerUtils; import org.clever.core.Conv; @@ -28,7 +33,10 @@ import org.clever.data.jdbc.QueryDSL; import org.clever.data.redis.Redis; import org.clever.data.redis.RedisAdmin; -import java.util.*; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * 物流运行时 @@ -128,10 +136,10 @@ public class LogisticsRuntime { this.isVirtual = env.getIsVirtual(); this.serverId = serverId; - this.setupEventHandle(); + this.setupAgvEventHandle(); } - private void setupEventHandle() { + private void setupAgvEventHandle() { eventManager.subscribe((AgvEventType type, Object... args) -> { ExecutorItem sender = (ExecutorItem) args[0]; @@ -139,6 +147,8 @@ public class LogisticsRuntime { for (int i = 1; i < args.length; i++) { eventArgs[i - 1] = Conv.asString(args[i]); } + + // ===================== 库存变化处理 ===================== if (type == AgvEventType.PLAN_TASK_COMPLETE) { PlanTaskSequence taskSequence = (PlanTaskSequence) args[1]; RcsTaskPlan taskPlan = (RcsTaskPlan) args[2]; @@ -150,7 +160,72 @@ public class LogisticsRuntime { // 处理库存变化 agv -> rack changeInvOfUnload(taskSequence, taskPlan); } + } + + // ===================== 推送 AGV 事件到前端 ===================== + switch (type) { + case ONLINE: + // AGV上线 + this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true); + break; + case OFFLINE: + // AGV下线 + this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true); + break; + + case FREE: + case PLAN_TASK_SEQUENCE_COMPLETE: + case PLAN_TASK_COMPLETE: + case PLAN_TASK_EXCEPTION: + case PLAN_TASK_SEQUENCE_ACCEPT: + case PLAN_TASK_SEQUENCE_CANCEL: + case PLAN_TASK_SEQUENCE_PAUSE: + case PLAN_TASK_SEQUENCE_RESUME: + case BLOCKED: + case BLOCKED_RECOVER: + case POS_CHANGED: + case DIRECTION_CHANGED: + case LOW_BATTERY: + case MODE_CHANGED: + // 设备状态变化 + if (sender instanceof PtrAgvItem) { + var ptr = (PtrAgvItem) sender; + int taskCompleted = 0; + int taskTotalCount = 0; + if (ptr.planTaskSequence != null) { + taskCompleted = ptr.planTaskSequence.completedCount(); + taskTotalCount = ptr.planTaskSequence.taskTotalCount(); + } + var status = new AgvStatusVo( + sender.id, + sender.getT(), + ptr.x, + ptr.y, + ptr.z, + ptr.logicX, + ptr.logicY, + PathUtils.getDirectionByArmDirection(ptr.direction), + ptr.orientation, + ptr.battery.SOC, + ptr.get__taskMode(), + ptr.getTaskStatus(), + ptr.isBlocked, + taskCompleted, + taskTotalCount, + ptr.planTaskSequence == null ? null : ptr.planTaskSequence.bizTask.getBizTaskId(), + ptr.planTaskSequence == null ? null : BizTaskType.fromString(ptr.planTaskSequence.bizTask.getBizType()), + ptr.planTaskSequence == null ? null : BizTaskStatus.fromString(ptr.planTaskSequence.bizTask.getBizTaskStatus()), + ptr.planTaskSequence == null ? null : ptr.planTaskSequence.bizTask.getTaskFrom(), + ptr.planTaskSequence == null ? null : ptr.planTaskSequence.bizTask.getTaskTo(), + ptr.planTaskSequence == null ? null : ptr.planTaskSequence.bizTask.getLpn() + ); + this.frontendMessagePushService.pushDeviceStatus(sender.id, status); + + } else { + log.error("AGV事件类型 {} 仅支持 PtrAgvItem 类型的执行器", type); + } + break; } BannerUtils.printConfig(log, this.projectUuid + "(" + this.envId + ") " + type + " AGV:" + sender.getId(), eventArgs); @@ -161,28 +236,24 @@ public class LogisticsRuntime { * 库存转移 AGV->货架 */ private void changeInvOfUnload(PlanTaskSequence taskSequence, RcsTaskPlan taskPlan) { + String lpn = taskSequence.carryLpn; queryDSL.beginTX(status -> { - String lpn = taskSequence.carryLpn; - InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.executorVo.getLocCode(), -taskSequence.carryQty); InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.unloadBasLocationVo.getLocCode(), taskSequence.carryQty); - - this.frontendMessagePushService.pushInventoryUpdate(lpn, taskSequence.executorVo, taskSequence.unloadBasLocationVo, taskSequence.carryQty); }); + this.frontendMessagePushService.pushInvUpdate(lpn, taskSequence.executorVo, taskSequence.unloadBasLocationVo, taskSequence.carryQty); } /** * 库存转移 货架->AGV */ private void changeInvOfLoad(PlanTaskSequence taskSequence, RcsTaskPlan taskPlan) { + String lpn = taskSequence.carryLpn; queryDSL.beginTX(status -> { - String lpn = taskSequence.carryLpn; - InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.loadBasLocationVo.getLocCode(), -taskSequence.carryQty); InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.executorVo.getLocCode(), taskSequence.carryQty); - - this.frontendMessagePushService.pushInventoryUpdate(lpn, taskSequence.loadBasLocationVo, taskSequence.executorVo, taskSequence.carryQty); }); + this.frontendMessagePushService.pushInvUpdate(lpn, taskSequence.loadBasLocationVo, taskSequence.executorVo, taskSequence.carryQty); } /** diff --git a/servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java b/servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java index c58ddff..7cbcbe6 100644 --- a/servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java +++ b/servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java @@ -1,5 +1,6 @@ package com.yvan.pusher; +import com.yvan.entity.AgvStatusVo; import com.yvan.entity.BasLocationVo; import com.yvan.logisticsEnv.EnvConfig; import com.yvan.logisticsModel.LogisticsRuntime; @@ -56,11 +57,11 @@ public class FrontendMessagePushService implements MqttCallback { connectionLock.lock(); try { if (connected) { - log.warn("LCC_MQTT service is already started"); + log.warn("FRONTEND_MQTT service is already started"); return; } - log.info("Starting LCC_MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId); + log.info("Starting FRONTEND_MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId); // 创建MQTT客户端 mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence); @@ -81,12 +82,12 @@ public class FrontendMessagePushService implements MqttCallback { while (attempts < MAX_RETRIES && !connected) { attempts++; try { - log.info("Connecting to LCC_MQTT broker (attempt {}/{})", attempts, MAX_RETRIES); + log.info("Connecting to FRONTEND_MQTT broker (attempt {}/{})", attempts, MAX_RETRIES); mqttClient.connect(options); connected = true; - log.info("LCC_MQTT connected successfully"); + log.info("FRONTEND_MQTT connected successfully"); } catch (MqttException e) { - log.error("LCC_MQTT connection failed (attempt " + attempts + "/{" + MAX_RETRIES + "})", e); + log.error("FRONTEND_MQTT connection failed (attempt " + attempts + "/{" + MAX_RETRIES + "})", e); // 重试前等待 if (attempts < MAX_RETRIES) { @@ -101,7 +102,7 @@ public class FrontendMessagePushService implements MqttCallback { } if (!connected) { - log.error("Failed to connect to LCC_MQTT broker after {} attempts", MAX_RETRIES); + log.error("Failed to connect to FRONTEND_MQTT broker after {} attempts", MAX_RETRIES); } } finally { connectionLock.unlock(); @@ -118,9 +119,9 @@ public class FrontendMessagePushService implements MqttCallback { try { mqttClient.disconnect(); mqttClient.close(); - log.info("LCC_MQTT disconnected"); + log.info("FRONTEND_MQTT disconnected"); } catch (MqttException e) { - log.error("Error disconnecting LCC_MQTT", e); + log.error("Error disconnecting FRONTEND_MQTT", e); } } connected = false; @@ -172,7 +173,7 @@ public class FrontendMessagePushService implements MqttCallback { * @param before 更新前库存 * @param after 更新后库存 */ - public void pushInventoryUpdate(String lpn, BasLocationVo before, BasLocationVo after, int qty) { + public void pushInvUpdate(String lpn, BasLocationVo before, BasLocationVo after, int qty) { String topic = buildTopic("inv"); Map data = new HashMap<>(); @@ -191,7 +192,7 @@ public class FrontendMessagePushService implements MqttCallback { * @param deviceId 设备ID * @param statusData 状态数据 */ - public void pushDeviceStatus(String deviceId, Map statusData) { + public void pushDeviceStatus(String deviceId, AgvStatusVo statusData) { String topic = buildTopic("device/" + deviceId + "/status"); publishJson(topic, statusData); } @@ -200,11 +201,18 @@ public class FrontendMessagePushService implements MqttCallback { * 推送设备存活状态 * * @param deviceId 设备ID + * @param type 设备类型 * @param online 是否在线 */ - public void pushDeviceAlive(String deviceId, boolean online) { + public void pushDeviceAlive(String deviceId, String type, boolean online) { String topic = buildTopic("device/" + deviceId + "/alive"); - publishString(topic, online ? "online" : "offline"); + + Map data = new HashMap<>(); + data.put("id", deviceId); + data.put("type", type); + data.put("online", online); + + publishJson(topic, data); } /** @@ -255,12 +263,8 @@ public class FrontendMessagePushService implements MqttCallback { * 发布JSON数据 */ private void publishJson(String topic, Object data) { - try { - String json = JacksonMapper.getInstance().toJson(data); - publish(topic, json); - } catch (Exception e) { - log.error("Failed to serialize JSON for topic: " + topic, e); - } + String json = JacksonMapper.getInstance().toJson(data); + publish(topic, json); } /** @@ -296,7 +300,7 @@ public class FrontendMessagePushService implements MqttCallback { @Override public void disconnected(MqttDisconnectResponse disconnectResponse) { - log.warn("LCC_MQTT disconnected: {}", disconnectResponse); + log.warn("FRONTEND_MQTT disconnected: {}", disconnectResponse); connectionLock.lock(); try { connected = false; @@ -307,7 +311,7 @@ public class FrontendMessagePushService implements MqttCallback { @Override public void mqttErrorOccurred(MqttException exception) { - log.error("LCC_MQTT error occurred", exception); + log.error("FRONTEND_MQTT error occurred", exception); } @Override @@ -333,7 +337,7 @@ public class FrontendMessagePushService implements MqttCallback { @Override public void connectComplete(boolean reconnect, String serverURI) { - BannerUtils.printConfig(log, "LCC_MQTT 开启监听成功", new String[]{ + BannerUtils.printConfig(log, "FRONTEND_MQTT 开启监听成功", new String[]{ "brokerUrl: " + serverURI, "userName: " + this.mqttConfig.getUsername(), "clientId: " + clientId}); @@ -347,6 +351,7 @@ public class FrontendMessagePushService implements MqttCallback { @Override public void authPacketArrived(int i, MqttProperties mqttProperties) { - log.info("LCC_MQTT authPacketArrived({}, {})", i, mqttProperties); + log.info("FRONTEND_MQTT authPacketArrived({}, {})", i, mqttProperties); } + }