Browse Source

BackendMessageReceiver / FrontendMessagePushService

master
修宁 6 months ago
parent
commit
7cb0785f12
  1. 14
      servo/src/main/java/com/galaxis/rcs/plan/PlanTaskSequence.java
  2. 5
      servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java
  3. 31
      servo/src/main/java/com/yvan/entity/AgvStatusVo.java
  4. 97
      servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java
  5. 49
      servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java

14
servo/src/main/java/com/galaxis/rcs/plan/PlanTaskSequence.java

@ -93,7 +93,7 @@ public class PlanTaskSequence {
} }
// 添加取货动作 // 添加取货动作
public RcsTaskPlan addLoad( String rackId, int bay, int level, int cell) { public RcsTaskPlan addLoad(String rackId, int bay, int level, int cell) {
RcsTaskPlan task = this.createTaskPlanEntity(PlanTaskType.LOAD.toString()); RcsTaskPlan task = this.createTaskPlanEntity(PlanTaskType.LOAD.toString());
task.setTargetId(rackId); task.setTargetId(rackId);
task.setTargetBay(bay); task.setTargetBay(bay);
@ -199,7 +199,17 @@ public class PlanTaskSequence {
return this.taskList.isEmpty(); return this.taskList.isEmpty();
} }
public int size() { public int taskTotalCount() {
return this.taskList.size(); return this.taskList.size();
} }
public int completedCount() {
int count = 0;
for (RcsTaskPlan task : taskList) {
if (PlanTaskStatus.FINISHED.toString().equals(task.getPlanTaskStatus())) {
count++;
}
}
return count;
}
} }

5
servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java

@ -16,6 +16,7 @@ import com.google.common.collect.Queues;
import com.yvan.logisticsModel.ExecutorItem; import com.yvan.logisticsModel.ExecutorItem;
import com.yvan.logisticsModel.LogisticsRuntime; import com.yvan.logisticsModel.LogisticsRuntime;
import com.yvan.logisticsModel.StaticItem; import com.yvan.logisticsModel.StaticItem;
import lombok.Getter;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.clever.core.Conv; import org.clever.core.Conv;
@ -60,8 +61,10 @@ public abstract class PtrAgvItem extends ExecutorItem {
public short direction; public short direction;
// agv当前转动角度值 // agv当前转动角度值
public double orientation; public double orientation;
public boolean isBlocked = false;
// 任务模式 // 任务模式
@Getter
private AmrTaskMode __taskMode; private AmrTaskMode __taskMode;
private volatile boolean isPaused = false; private volatile boolean isPaused = false;
@ -446,7 +449,7 @@ public abstract class PtrAgvItem extends ExecutorItem {
this.runtime.eventManager.fireOfflineEvent(this); this.runtime.eventManager.fireOfflineEvent(this);
} }
private String getTaskStatus() { public String getTaskStatus() {
if (planTaskSequence == null) return "IDLE"; if (planTaskSequence == null) return "IDLE";
if (isPaused) return "PAUSED"; if (isPaused) return "PAUSED";
return "EXECUTING"; return "EXECUTING";

31
servo/src/main/java/com/yvan/entity/AgvStatusVo.java

@ -0,0 +1,31 @@
package com.yvan.entity;
import com.galaxis.rcs.common.enums.BizTaskStatus;
import com.galaxis.rcs.common.enums.BizTaskType;
import com.galaxis.rcs.common.enums.LCCDirection;
import com.galaxis.rcs.ptr.AmrTaskMode;
public record AgvStatusVo(String id,
String type,
double x,
double y,
double z,
int logicX,
int logicY,
LCCDirection direction,
double orientation,
double soc,
AmrTaskMode mode,
String taskStatus,
boolean isBlocked,
int taskCompleted,
int taskTotalCount,
// 业务任务ID
Long bizTaskId,
BizTaskType bizTaskType,
BizTaskStatus bizTaskStatus,
String bizTaskFrom,
String bizTaskTo,
String bizLpn) {
}

97
servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java

@ -1,25 +1,30 @@
package com.yvan.logisticsModel; package com.yvan.logisticsModel;
import com.galaxis.rcs.common.entity.RcsTaskPlan; import com.galaxis.rcs.common.entity.RcsTaskPlan;
import com.galaxis.rcs.common.enums.BizTaskStatus;
import com.galaxis.rcs.common.enums.BizTaskType;
import com.galaxis.rcs.common.enums.PlanTaskType; import com.galaxis.rcs.common.enums.PlanTaskType;
import com.galaxis.rcs.connector.cl2.Cl2Item; import com.galaxis.rcs.connector.cl2.Cl2Item;
import com.galaxis.rcs.inv.InvManager; import com.galaxis.rcs.inv.InvManager;
import com.galaxis.rcs.plan.PlanTaskSequence; import com.galaxis.rcs.plan.PlanTaskSequence;
import com.galaxis.rcs.plan.path.NavigationGraph; import com.galaxis.rcs.plan.path.NavigationGraph;
import com.galaxis.rcs.plan.path.PathUtils;
import com.galaxis.rcs.plan.path.PtrPathPlanner; import com.galaxis.rcs.plan.path.PtrPathPlanner;
import com.galaxis.rcs.ptr.AmrMessageHandler; import com.galaxis.rcs.ptr.AmrMessageHandler;
import com.galaxis.rcs.ptr.PtrAgvItem;
import com.galaxis.rcs.task.TaskDispatchFactory; import com.galaxis.rcs.task.TaskDispatchFactory;
import com.galaxis.rcs.task.TaskService; import com.galaxis.rcs.task.TaskService;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.yvan.entity.AgvStatusVo;
import com.yvan.entity.LccProject;
import com.yvan.entity.LccProjectEnv;
import com.yvan.event.AgvEventManager; import com.yvan.event.AgvEventManager;
import com.yvan.event.AgvEventType; import com.yvan.event.AgvEventType;
import com.yvan.pusher.FrontendMessagePushService; import com.yvan.pusher.FrontendMessagePushService;
import com.yvan.redis.LccRedisService; import com.yvan.redis.LccRedisService;
import com.yvan.entity.LccProject;
import com.yvan.entity.LccProjectEnv;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.clever.core.BannerUtils; import org.clever.core.BannerUtils;
import org.clever.core.Conv; import org.clever.core.Conv;
@ -28,7 +33,10 @@ import org.clever.data.jdbc.QueryDSL;
import org.clever.data.redis.Redis; import org.clever.data.redis.Redis;
import org.clever.data.redis.RedisAdmin; import org.clever.data.redis.RedisAdmin;
import java.util.*; import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* 物流运行时 * 物流运行时
@ -128,10 +136,10 @@ public class LogisticsRuntime {
this.isVirtual = env.getIsVirtual(); this.isVirtual = env.getIsVirtual();
this.serverId = serverId; this.serverId = serverId;
this.setupEventHandle(); this.setupAgvEventHandle();
} }
private void setupEventHandle() { private void setupAgvEventHandle() {
eventManager.subscribe((AgvEventType type, Object... args) -> { eventManager.subscribe((AgvEventType type, Object... args) -> {
ExecutorItem sender = (ExecutorItem) args[0]; ExecutorItem sender = (ExecutorItem) args[0];
@ -139,6 +147,8 @@ public class LogisticsRuntime {
for (int i = 1; i < args.length; i++) { for (int i = 1; i < args.length; i++) {
eventArgs[i - 1] = Conv.asString(args[i]); eventArgs[i - 1] = Conv.asString(args[i]);
} }
// ===================== 库存变化处理 =====================
if (type == AgvEventType.PLAN_TASK_COMPLETE) { if (type == AgvEventType.PLAN_TASK_COMPLETE) {
PlanTaskSequence taskSequence = (PlanTaskSequence) args[1]; PlanTaskSequence taskSequence = (PlanTaskSequence) args[1];
RcsTaskPlan taskPlan = (RcsTaskPlan) args[2]; RcsTaskPlan taskPlan = (RcsTaskPlan) args[2];
@ -150,7 +160,72 @@ public class LogisticsRuntime {
// 处理库存变化 agv -> rack // 处理库存变化 agv -> rack
changeInvOfUnload(taskSequence, taskPlan); changeInvOfUnload(taskSequence, taskPlan);
} }
}
// ===================== 推送 AGV 事件到前端 =====================
switch (type) {
case ONLINE:
// AGV上线
this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true);
break;
case OFFLINE:
// AGV下线
this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true);
break;
case FREE:
case PLAN_TASK_SEQUENCE_COMPLETE:
case PLAN_TASK_COMPLETE:
case PLAN_TASK_EXCEPTION:
case PLAN_TASK_SEQUENCE_ACCEPT:
case PLAN_TASK_SEQUENCE_CANCEL:
case PLAN_TASK_SEQUENCE_PAUSE:
case PLAN_TASK_SEQUENCE_RESUME:
case BLOCKED:
case BLOCKED_RECOVER:
case POS_CHANGED:
case DIRECTION_CHANGED:
case LOW_BATTERY:
case MODE_CHANGED:
// 设备状态变化
if (sender instanceof PtrAgvItem) {
var ptr = (PtrAgvItem) sender;
int taskCompleted = 0;
int taskTotalCount = 0;
if (ptr.planTaskSequence != null) {
taskCompleted = ptr.planTaskSequence.completedCount();
taskTotalCount = ptr.planTaskSequence.taskTotalCount();
}
var status = new AgvStatusVo(
sender.id,
sender.getT(),
ptr.x,
ptr.y,
ptr.z,
ptr.logicX,
ptr.logicY,
PathUtils.getDirectionByArmDirection(ptr.direction),
ptr.orientation,
ptr.battery.SOC,
ptr.get__taskMode(),
ptr.getTaskStatus(),
ptr.isBlocked,
taskCompleted,
taskTotalCount,
ptr.planTaskSequence == null ? null : ptr.planTaskSequence.bizTask.getBizTaskId(),
ptr.planTaskSequence == null ? null : BizTaskType.fromString(ptr.planTaskSequence.bizTask.getBizType()),
ptr.planTaskSequence == null ? null : BizTaskStatus.fromString(ptr.planTaskSequence.bizTask.getBizTaskStatus()),
ptr.planTaskSequence == null ? null : ptr.planTaskSequence.bizTask.getTaskFrom(),
ptr.planTaskSequence == null ? null : ptr.planTaskSequence.bizTask.getTaskTo(),
ptr.planTaskSequence == null ? null : ptr.planTaskSequence.bizTask.getLpn()
);
this.frontendMessagePushService.pushDeviceStatus(sender.id, status);
} else {
log.error("AGV事件类型 {} 仅支持 PtrAgvItem 类型的执行器", type);
}
break;
} }
BannerUtils.printConfig(log, this.projectUuid + "(" + this.envId + ") " + type + " AGV:" + sender.getId(), eventArgs); BannerUtils.printConfig(log, this.projectUuid + "(" + this.envId + ") " + type + " AGV:" + sender.getId(), eventArgs);
@ -161,28 +236,24 @@ public class LogisticsRuntime {
* 库存转移 AGV->货架 * 库存转移 AGV->货架
*/ */
private void changeInvOfUnload(PlanTaskSequence taskSequence, RcsTaskPlan taskPlan) { private void changeInvOfUnload(PlanTaskSequence taskSequence, RcsTaskPlan taskPlan) {
String lpn = taskSequence.carryLpn;
queryDSL.beginTX(status -> { queryDSL.beginTX(status -> {
String lpn = taskSequence.carryLpn;
InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.executorVo.getLocCode(), -taskSequence.carryQty); InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.executorVo.getLocCode(), -taskSequence.carryQty);
InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.unloadBasLocationVo.getLocCode(), taskSequence.carryQty); InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.unloadBasLocationVo.getLocCode(), taskSequence.carryQty);
this.frontendMessagePushService.pushInventoryUpdate(lpn, taskSequence.executorVo, taskSequence.unloadBasLocationVo, taskSequence.carryQty);
}); });
this.frontendMessagePushService.pushInvUpdate(lpn, taskSequence.executorVo, taskSequence.unloadBasLocationVo, taskSequence.carryQty);
} }
/** /**
* 库存转移 货架->AGV * 库存转移 货架->AGV
*/ */
private void changeInvOfLoad(PlanTaskSequence taskSequence, RcsTaskPlan taskPlan) { private void changeInvOfLoad(PlanTaskSequence taskSequence, RcsTaskPlan taskPlan) {
String lpn = taskSequence.carryLpn;
queryDSL.beginTX(status -> { queryDSL.beginTX(status -> {
String lpn = taskSequence.carryLpn;
InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.loadBasLocationVo.getLocCode(), -taskSequence.carryQty); InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.loadBasLocationVo.getLocCode(), -taskSequence.carryQty);
InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.executorVo.getLocCode(), taskSequence.carryQty); InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.executorVo.getLocCode(), taskSequence.carryQty);
this.frontendMessagePushService.pushInventoryUpdate(lpn, taskSequence.loadBasLocationVo, taskSequence.executorVo, taskSequence.carryQty);
}); });
this.frontendMessagePushService.pushInvUpdate(lpn, taskSequence.loadBasLocationVo, taskSequence.executorVo, taskSequence.carryQty);
} }
/** /**

49
servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java

@ -1,5 +1,6 @@
package com.yvan.pusher; package com.yvan.pusher;
import com.yvan.entity.AgvStatusVo;
import com.yvan.entity.BasLocationVo; import com.yvan.entity.BasLocationVo;
import com.yvan.logisticsEnv.EnvConfig; import com.yvan.logisticsEnv.EnvConfig;
import com.yvan.logisticsModel.LogisticsRuntime; import com.yvan.logisticsModel.LogisticsRuntime;
@ -56,11 +57,11 @@ public class FrontendMessagePushService implements MqttCallback {
connectionLock.lock(); connectionLock.lock();
try { try {
if (connected) { if (connected) {
log.warn("LCC_MQTT service is already started"); log.warn("FRONTEND_MQTT service is already started");
return; return;
} }
log.info("Starting LCC_MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId); log.info("Starting FRONTEND_MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId);
// 创建MQTT客户端 // 创建MQTT客户端
mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence); mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence);
@ -81,12 +82,12 @@ public class FrontendMessagePushService implements MqttCallback {
while (attempts < MAX_RETRIES && !connected) { while (attempts < MAX_RETRIES && !connected) {
attempts++; attempts++;
try { try {
log.info("Connecting to LCC_MQTT broker (attempt {}/{})", attempts, MAX_RETRIES); log.info("Connecting to FRONTEND_MQTT broker (attempt {}/{})", attempts, MAX_RETRIES);
mqttClient.connect(options); mqttClient.connect(options);
connected = true; connected = true;
log.info("LCC_MQTT connected successfully"); log.info("FRONTEND_MQTT connected successfully");
} catch (MqttException e) { } catch (MqttException e) {
log.error("LCC_MQTT connection failed (attempt " + attempts + "/{" + MAX_RETRIES + "})", e); log.error("FRONTEND_MQTT connection failed (attempt " + attempts + "/{" + MAX_RETRIES + "})", e);
// 重试前等待 // 重试前等待
if (attempts < MAX_RETRIES) { if (attempts < MAX_RETRIES) {
@ -101,7 +102,7 @@ public class FrontendMessagePushService implements MqttCallback {
} }
if (!connected) { if (!connected) {
log.error("Failed to connect to LCC_MQTT broker after {} attempts", MAX_RETRIES); log.error("Failed to connect to FRONTEND_MQTT broker after {} attempts", MAX_RETRIES);
} }
} finally { } finally {
connectionLock.unlock(); connectionLock.unlock();
@ -118,9 +119,9 @@ public class FrontendMessagePushService implements MqttCallback {
try { try {
mqttClient.disconnect(); mqttClient.disconnect();
mqttClient.close(); mqttClient.close();
log.info("LCC_MQTT disconnected"); log.info("FRONTEND_MQTT disconnected");
} catch (MqttException e) { } catch (MqttException e) {
log.error("Error disconnecting LCC_MQTT", e); log.error("Error disconnecting FRONTEND_MQTT", e);
} }
} }
connected = false; connected = false;
@ -172,7 +173,7 @@ public class FrontendMessagePushService implements MqttCallback {
* @param before 更新前库存 * @param before 更新前库存
* @param after 更新后库存 * @param after 更新后库存
*/ */
public void pushInventoryUpdate(String lpn, BasLocationVo before, BasLocationVo after, int qty) { public void pushInvUpdate(String lpn, BasLocationVo before, BasLocationVo after, int qty) {
String topic = buildTopic("inv"); String topic = buildTopic("inv");
Map<String, Object> data = new HashMap<>(); Map<String, Object> data = new HashMap<>();
@ -191,7 +192,7 @@ public class FrontendMessagePushService implements MqttCallback {
* @param deviceId 设备ID * @param deviceId 设备ID
* @param statusData 状态数据 * @param statusData 状态数据
*/ */
public void pushDeviceStatus(String deviceId, Map<String, Object> statusData) { public void pushDeviceStatus(String deviceId, AgvStatusVo statusData) {
String topic = buildTopic("device/" + deviceId + "/status"); String topic = buildTopic("device/" + deviceId + "/status");
publishJson(topic, statusData); publishJson(topic, statusData);
} }
@ -200,11 +201,18 @@ public class FrontendMessagePushService implements MqttCallback {
* 推送设备存活状态 * 推送设备存活状态
* *
* @param deviceId 设备ID * @param deviceId 设备ID
* @param type 设备类型
* @param online 是否在线 * @param online 是否在线
*/ */
public void pushDeviceAlive(String deviceId, boolean online) { public void pushDeviceAlive(String deviceId, String type, boolean online) {
String topic = buildTopic("device/" + deviceId + "/alive"); String topic = buildTopic("device/" + deviceId + "/alive");
publishString(topic, online ? "online" : "offline");
Map<String, Object> data = new HashMap<>();
data.put("id", deviceId);
data.put("type", type);
data.put("online", online);
publishJson(topic, data);
} }
/** /**
@ -255,12 +263,8 @@ public class FrontendMessagePushService implements MqttCallback {
* 发布JSON数据 * 发布JSON数据
*/ */
private void publishJson(String topic, Object data) { private void publishJson(String topic, Object data) {
try { String json = JacksonMapper.getInstance().toJson(data);
String json = JacksonMapper.getInstance().toJson(data); publish(topic, json);
publish(topic, json);
} catch (Exception e) {
log.error("Failed to serialize JSON for topic: " + topic, e);
}
} }
/** /**
@ -296,7 +300,7 @@ public class FrontendMessagePushService implements MqttCallback {
@Override @Override
public void disconnected(MqttDisconnectResponse disconnectResponse) { public void disconnected(MqttDisconnectResponse disconnectResponse) {
log.warn("LCC_MQTT disconnected: {}", disconnectResponse); log.warn("FRONTEND_MQTT disconnected: {}", disconnectResponse);
connectionLock.lock(); connectionLock.lock();
try { try {
connected = false; connected = false;
@ -307,7 +311,7 @@ public class FrontendMessagePushService implements MqttCallback {
@Override @Override
public void mqttErrorOccurred(MqttException exception) { public void mqttErrorOccurred(MqttException exception) {
log.error("LCC_MQTT error occurred", exception); log.error("FRONTEND_MQTT error occurred", exception);
} }
@Override @Override
@ -333,7 +337,7 @@ public class FrontendMessagePushService implements MqttCallback {
@Override @Override
public void connectComplete(boolean reconnect, String serverURI) { public void connectComplete(boolean reconnect, String serverURI) {
BannerUtils.printConfig(log, "LCC_MQTT 开启监听成功", new String[]{ BannerUtils.printConfig(log, "FRONTEND_MQTT 开启监听成功", new String[]{
"brokerUrl: " + serverURI, "brokerUrl: " + serverURI,
"userName: " + this.mqttConfig.getUsername(), "userName: " + this.mqttConfig.getUsername(),
"clientId: " + clientId}); "clientId: " + clientId});
@ -347,6 +351,7 @@ public class FrontendMessagePushService implements MqttCallback {
@Override @Override
public void authPacketArrived(int i, MqttProperties mqttProperties) { public void authPacketArrived(int i, MqttProperties mqttProperties) {
log.info("LCC_MQTT authPacketArrived({}, {})", i, mqttProperties); log.info("FRONTEND_MQTT authPacketArrived({}, {})", i, mqttProperties);
} }
} }

Loading…
Cancel
Save