diff --git a/servo/src/main/java/com/galaxis/rcs/communication/amrCommunication/AmrMessageHandler.java b/servo/src/main/java/com/galaxis/rcs/communication/amrCommunication/AmrMessageHandler.java index c9640bb..6251d4c 100644 --- a/servo/src/main/java/com/galaxis/rcs/communication/amrCommunication/AmrMessageHandler.java +++ b/servo/src/main/java/com/galaxis/rcs/communication/amrCommunication/AmrMessageHandler.java @@ -89,6 +89,7 @@ public class AmrMessageHandler { break; case 20011: { int EventId = jw.asInt("content", "EventId"); + log.info("1-Received message: " + json); switch (EventId) { case 0: case 2: @@ -97,21 +98,27 @@ public class AmrMessageHandler { case 6: case 7: AmrMessage> taskStatusChangeDefault = JacksonUtils.parse(json, typeRef20011_defaultMessage); - + AmrTaskStatusMessage statusMessage = taskStatusChangeDefault.content; + agvItem.updateDeviceTaskStatus((int)statusMessage.SeqNo, 0, 0, statusMessage.EventId); break; case 1: AmrMessage> taskModeChange = JacksonUtils.parse(json, typeRef20011_1Message); + AmrTaskStatusMessage modeMessage = taskModeChange.content; + agvItem.updateDeviceTaskStatus((int)modeMessage.SeqNo, 0, 0, modeMessage.EventId); break; case 4: AmrMessage> taskCompleted = JacksonUtils.parse(json, typeRef20011_4Message); - agvItem.logicX = taskCompleted.content.Info.CurLogicX; - agvItem.logicY = taskCompleted.content.Info.CurLogicY; + AmrTaskStatusMessage completedMessage = taskCompleted.content; + agvItem.updateDeviceTaskStatus((int)completedMessage.SeqNo, completedMessage.Info.CurLogicX, completedMessage.Info.CurLogicY, completedMessage.EventId); + agvItem.logicX = completedMessage.Info.CurLogicX; + agvItem.logicY = completedMessage.Info.CurLogicY; // agvStatusAndInfo.orientation = landmarkMessage.content.CurOrientation; agvItem.direction = taskCompleted.content.Info.CurDirection; - log.info("1-Received message: " + json); break; case 8: AmrMessage> taskTypeChange = JacksonUtils.parse(json, typeRef20011_8Message); + AmrTaskStatusMessage typeMessage = taskTypeChange.content; + agvItem.updateDeviceTaskStatus((int)typeMessage.SeqNo, 0, 0, typeMessage.EventId); default: break; } @@ -129,7 +136,7 @@ public class AmrMessageHandler { // agvStatusAndInfo.orientation = landmarkMessage.content.CurOrientation; agvItem.direction = landmarkMessage.content.CurDirection; // if (landmarkMessage.content.VehicleId == 32) { - log.info("2-Received message: " + json); +// log.info("2-Received message: " + json); // } break; case 20050: diff --git a/servo/src/main/java/com/yvan/logisticsModel/PtrAgvConnectorThread.java b/servo/src/main/java/com/yvan/logisticsModel/PtrAgvConnectorThread.java index 4425a09..09fe072 100644 --- a/servo/src/main/java/com/yvan/logisticsModel/PtrAgvConnectorThread.java +++ b/servo/src/main/java/com/yvan/logisticsModel/PtrAgvConnectorThread.java @@ -7,22 +7,24 @@ import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.mqttv5.common.MqttException; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; @Slf4j public class PtrAgvConnectorThread extends Thread { private final AtomicBoolean running = new AtomicBoolean(false); - private final PtrAgvItem executorItem; private final Cl2DeviceConnector cl2DeviceConnector; private final PtrAgvItem ptrAgvItem; private final LogisticsRuntime logisticsRuntime; - public PtrAgvConnectorThread(PtrAgvItem executorItem, Cl2DeviceConnector cl2DeviceConnector, PtrAgvItem ptrAgvItem, LogisticsRuntime logisticsRuntime) { - super("ExecutorConnector-" + executorItem.getId()); - this.executorItem = executorItem; + + + public PtrAgvConnectorThread(PtrAgvItem ptrAgvItem, Cl2DeviceConnector cl2DeviceConnector, LogisticsRuntime logisticsRuntime) { + super("ExecutorConnector-" + ptrAgvItem.getId()); this.cl2DeviceConnector = cl2DeviceConnector; this.ptrAgvItem = ptrAgvItem; this.logisticsRuntime = logisticsRuntime; @@ -31,7 +33,7 @@ public class PtrAgvConnectorThread extends Thread { @Override public void run() { running.set(true); - log.info("Connector thread started for executor: {}", this.executorItem.getId()); + log.info("Connector thread started for executor: {}", this.ptrAgvItem.getId()); try { @@ -43,15 +45,26 @@ public class PtrAgvConnectorThread extends Thread { PtrAgvDeviceTask nextTask = null; AmrTaskMessage taskMessage = null; + // 计算中的任务 + List computingTaskList = new ArrayList<>(); while (running.get()) { + + for (PtrAgvDeviceTask task : this.ptrAgvItem.runningDeviceTaskList) { + if (task.taskGroupStatus < 3) { + LockSupport.park(); // 阻塞当前线程 + break; + } + } + // 从队列中获取任务,如果队列为空则阻塞等待 if (nextTask == null) { - nextTask = this.executorItem.deviceTaskQueue.take(); + nextTask = this.ptrAgvItem.deviceTaskQueue.take(); + } else { currentTask = nextTask; } - if (startTask == null) { + if (startTask == null && !nextTask.isLastTask) { taskMessage = new AmrTaskMessage(); startTask = nextTask; currentTask = nextTask; @@ -64,19 +77,23 @@ public class PtrAgvConnectorThread extends Thread { taskMessage.Link = new ArrayList<>(); } - if (currentTask == nextTask) { + if (currentTask == nextTask && taskMessage != null) { currentTask.seqNo = taskMessage.SeqNo; AmrTaskMessage.LinkData link = new AmrTaskMessage.LinkData(currentTask.endPoint.logicX, currentTask.endPoint.logicY, currentTask.speed); taskMessage.Link.add(link); taskCount++; distance += euclideanDistance(currentTask.startPoint.tf[0], currentTask.endPoint.tf[0]); + computingTaskList.add(currentTask); } - - if (currentTask.operationType > 0 || currentTask.pickMode > 0 - || (((startTask.speed > 0) != (nextTask.speed > 0) || startTask.direction != nextTask.direction) && taskMessage.Link.size() > 0) - // 单向移动距离大于2m时拆分任务指令 - || (distance > 2 && taskCount > 1)) { + // 组织设备任务并发送 + else if (taskMessage != null && + (currentTask.operationType > 0 // 当前任务不是移动任务 + || currentTask.pickMode > 0 // 当前作业不是默认作业(无) + || nextTask.isLastTask // 下一个任务是结束任务 + || (((startTask.speed > 0) != (nextTask.speed > 0) || startTask.direction != nextTask.direction) && taskMessage.Link.size() > 0) // 下一个任务和开始任务方向不一致 + // 单向移动距离大于2m时并且点位数量大于1 + || (distance > 2 && taskCount > 1))) { taskMessage.OperationType = currentTask.operationType; taskMessage.PickMode = currentTask.pickMode; @@ -84,7 +101,14 @@ public class PtrAgvConnectorThread extends Thread { taskMessage.EndY = currentTask.endPoint.logicY; try { + // 发送任务 cl2DeviceConnector.sendTask(ptrAgvItem.getId(), taskMessage); + this.ptrAgvItem.runningDeviceTaskList.addAll(computingTaskList); + for (PtrAgvDeviceTask task : computingTaskList) { + task.taskStatus = 1; + task.taskGroupStatus = 1; + } + computingTaskList.clear(); } catch (MqttException | JsonProcessingException e) { log.error("Cl2DeviceConnector robotMove: executorItem={}, task={}, error={}", ptrAgvItem.getId(), currentTask.planTaskId, e); @@ -95,14 +119,14 @@ public class PtrAgvConnectorThread extends Thread { startTask = null; } - if (nextTask == currentTask) { + if (nextTask == currentTask || nextTask.isLastTask) { nextTask = null; } } } catch (InterruptedException e) { - System.out.println("Connector thread interrupted for executor: " + this.executorItem.getId()); + System.out.println("Connector thread interrupted for executor: " + this.ptrAgvItem.getId()); } finally { - System.out.println("Connector thread stopped for executor: " + this.executorItem.getId()); + System.out.println("Connector thread stopped for executor: " + this.ptrAgvItem.getId()); } } diff --git a/servo/src/main/java/com/yvan/logisticsModel/PtrAgvDeviceTask.java b/servo/src/main/java/com/yvan/logisticsModel/PtrAgvDeviceTask.java index ed6bd44..b99d0b6 100644 --- a/servo/src/main/java/com/yvan/logisticsModel/PtrAgvDeviceTask.java +++ b/servo/src/main/java/com/yvan/logisticsModel/PtrAgvDeviceTask.java @@ -24,4 +24,11 @@ public class PtrAgvDeviceTask { public Long bizTaskId; // 作业序号 发送给小车的作业序号 public int seqNo; + + // 任务状态 0创建 1发送成功 2接收成功 3任务开始 4已完成 5已取消 6已暂停 7已恢复 8任务已经变更 + public int taskStatus = 0; + public int taskGroupStatus = 0; + + // 是否最后任务 + public boolean isLastTask = false; } diff --git a/servo/src/main/java/com/yvan/logisticsModel/PtrAgvItem.java b/servo/src/main/java/com/yvan/logisticsModel/PtrAgvItem.java index 2e1a5f2..679ef99 100644 --- a/servo/src/main/java/com/yvan/logisticsModel/PtrAgvItem.java +++ b/servo/src/main/java/com/yvan/logisticsModel/PtrAgvItem.java @@ -12,6 +12,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.locks.LockSupport; //0.4m/ss // a max 1.2m/s //90 = 3.5s cl2 @@ -60,6 +61,9 @@ public class PtrAgvItem extends ExecutorItem { public double orientation; + // 执行中的任务 + public List runningDeviceTaskList = new ArrayList<>(); + /** * 当前执行的任务规划列表 */ @@ -73,6 +77,37 @@ public class PtrAgvItem extends ExecutorItem { */ private final PtrAgvConnectorThread connectorThread; + /** + * 更新设备任务状态 暂时没有处理任务取消相关的状态 + * @param seqNo + * @param x + * @param y + * @param messageStatus + */ + public void updateDeviceTaskStatus(int seqNo, int x, int y, int messageStatus) { + + if (messageStatus < 2) { + return; + } + + boolean needCompute = true; + for (PtrAgvDeviceTask task : runningDeviceTaskList) { + if (task.seqNo == seqNo) { + task.taskGroupStatus = messageStatus; + if (task.x == x && task.y == y) { + task.taskStatus = 4; + } + } + if (task.taskGroupStatus < 3 /*|| task.taskStatus < 4*/) { + needCompute = false; + } + } + if (needCompute) { + LockSupport.unpark(connectorThread); + } + } + + public void mapReady() { this.isMapReady = true; this.startConnector(); @@ -202,6 +237,10 @@ public class PtrAgvItem extends ExecutorItem { } } + // 添加结束任务 + PtrAgvDeviceTask deviceTaskEnd = new PtrAgvDeviceTask(); + deviceTaskEnd.isLastTask = true; + deviceTaskList.add(deviceTaskEnd); planQueue.addAll(sequence.taskList); deviceTaskQueue.addAll(deviceTaskList); @@ -218,7 +257,7 @@ public class PtrAgvItem extends ExecutorItem { public PtrAgvItem(LogisticsRuntime logisticsRuntime, Map raw) { super(logisticsRuntime, raw); - this.connectorThread = new PtrAgvConnectorThread(this, this.cl2DeviceConnector, this, logisticsRuntime); + this.connectorThread = new PtrAgvConnectorThread(this, this.cl2DeviceConnector, logisticsRuntime); }