Browse Source

Agv Event 体系

master
修宁 6 months ago
parent
commit
8f770cff9d
  1. 10
      servo/src/main/java/com/galaxis/rcs/plan/path/PathUtils.java
  2. 4
      servo/src/main/java/com/galaxis/rcs/ptr/PosDirection.java
  3. 150
      servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java
  4. 104
      servo/src/main/java/com/yvan/event/AgvEventManager.java
  5. 23
      servo/src/main/java/com/yvan/event/AgvEventType.java
  6. 24
      servo/src/main/java/com/yvan/event/EventManager.java
  7. 4
      servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java
  8. 14
      servo/src/main/java/com/yvan/state/VariableMonitor.java

10
servo/src/main/java/com/galaxis/rcs/plan/path/PathUtils.java

@ -208,4 +208,14 @@ public class PathUtils {
return 15;
}
public static LCCDirection getDirectionByArmDirection(short direction) {
return switch (direction) {
case 0 -> LCCDirection.RIGHT;
case 1 -> LCCDirection.DOWN;
case 2 -> LCCDirection.LEFT;
case 3 -> LCCDirection.UP;
default -> null;
};
}
}

4
servo/src/main/java/com/galaxis/rcs/ptr/PosDirection.java

@ -1,6 +1,8 @@
package com.galaxis.rcs.ptr;
import com.galaxis.rcs.common.enums.LCCDirection;
public record PosDirection(int logicX,
int logicY,
short direction) {
LCCDirection direction) {
}

150
servo/src/main/java/com/galaxis/rcs/ptr/PtrAgvItem.java

@ -1,13 +1,12 @@
package com.galaxis.rcs.ptr;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.galaxis.rcs.common.entity.RcsTaskPlan;
import com.yvan.event.AgvEventType;
import com.galaxis.rcs.common.enums.LCCDirection;
import com.galaxis.rcs.common.enums.PlanTaskStatus;
import com.galaxis.rcs.common.enums.PlanTaskType;
import com.galaxis.rcs.connector.cl2.Cl2DeviceConnector;
import com.galaxis.rcs.plan.PlanTaskSequence;
import com.galaxis.rcs.plan.path.PathUtils;
import com.galaxis.rcs.ptr.receiveEntity.AmrHeartbeatMessage;
import com.galaxis.rcs.ptr.receiveEntity.base.CurBatteryData;
import com.galaxis.rcs.ptr.sendEntity.RcsConfigMessage;
@ -15,25 +14,24 @@ import com.galaxis.rcs.ptr.sendEntity.RcsSRMessage;
import com.galaxis.rcs.ptr.sendEntity.RcsSetLocationMessage;
import com.google.common.base.Joiner;
import com.google.common.collect.Queues;
import com.yvan.event.EventManager;
import com.yvan.LccUtils;
import com.yvan.event.AgvEventType;
import com.yvan.logisticsModel.ExecutorItem;
import com.yvan.logisticsModel.LogisticsRuntime;
import com.yvan.logisticsModel.StaticItem;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.clever.core.BannerUtils;
import org.clever.core.json.JsonWrapper;
import org.clever.data.redis.Redis;
import org.clever.data.redis.RedisAdmin;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.LockSupport;
//0.4m/ss // a max 1.2m/s
//90 = 3.5s cl2
//90 = 5s // cLX
/**
* 侧叉式AGV执行器
*/
@Slf4j
public abstract class PtrAgvItem extends ExecutorItem {
private static final int BLOCKING_QUEUE_CAPACITY = 100;
@ -67,34 +65,28 @@ public abstract class PtrAgvItem extends ExecutorItem {
public double orientation;
// 任务模式
public AmrTaskMode taskMode;
private AmrTaskMode __taskMode;
private volatile boolean isManualMode = false;
private volatile boolean isPaused = false;
private volatile PosDirection lastPausedPosition;
private volatile boolean isOnline = false;
// 执行中的任务
@JsonIgnore
public List<PtrAgvDeviceTask> runningDeviceTaskList = new ArrayList<>();
/**
* 当前执行的任务规划列表
*/
@JsonIgnore
public volatile PlanTaskSequence planTaskSequence;
/**
* 当前执行的设备任务列表
*/
@JsonIgnore
final BlockingQueue<PtrAgvDeviceTask> deviceTaskQueue = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_CAPACITY);
@JsonIgnore
final Cl2DeviceConnector cl2DeviceConnector = new Cl2DeviceConnector(this.runtime);
@JsonIgnore
public final AmrMessageHandler amrMessageHandler;
final AmrMessageHandler amrMessageHandler;
/**
* 连接器线程
@ -107,23 +99,6 @@ public abstract class PtrAgvItem extends ExecutorItem {
this.amrMessageHandler = logisticsRuntime.amrMessageHandler;
}
@Override
public void stop() {
// 停止连接器线程
stopConnector();
// 清理任务序列
if (planTaskSequence != null) {
planTaskSequence = null;
}
// 清理设备任务队列
deviceTaskQueue.clear();
// 清理运行中的设备任务列表
runningDeviceTaskList.clear();
// 更新Redis状态
updateRedisStatus();
}
@Override
public boolean isRunning() {
return connectorThread.isRunning();
@ -131,11 +106,11 @@ public abstract class PtrAgvItem extends ExecutorItem {
public abstract RcsConfigMessage getConfig();
@Override
public void start() {
this.amrMessageHandler.registeHeartBeatSet(this);
// 查询当前状态
requestCurrentStatus();
this.isRunning = true;
@ -143,9 +118,23 @@ public abstract class PtrAgvItem extends ExecutorItem {
this.startConnector();
}
public synchronized void shutdown() {
this.stopConnector();
@Override
public void stop() {
// 停止连接器线程
stopConnector();
this.amrMessageHandler.unregisteHeartBeatSet(this);
// 清理任务序列
if (planTaskSequence != null) {
planTaskSequence = null;
}
// 清理设备任务队列
deviceTaskQueue.clear();
// 清理运行中的设备任务列表
runningDeviceTaskList.clear();
// 更新Redis状态
updateRedisStatus();
}
public synchronized void dispatchTask(PlanTaskSequence taskSequence) {
@ -153,27 +142,19 @@ public abstract class PtrAgvItem extends ExecutorItem {
throw new IllegalStateException("AGV is not free to accept new tasks");
}
if (isManualMode) {
throw new IllegalStateException("AGV is in manual mode and cannot accept tasks");
}
this.planTaskSequence = taskSequence;
buildPlanToDeviceTask();
fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_ACCEPT, this, taskSequence);
this.runtime.eventManager.firePlanTaskSequenceAcceptEvent(this, taskSequence);
connectorThread.resumeProcessing();
}
private void fireEvent(AgvEventType agvEventType, Object... args) {
this.runtime.eventManager.fireEvent(agvEventType, args);
}
public synchronized void pauseTask() {
if (planTaskSequence == null) {
throw new IllegalStateException("No active task to pause");
}
isPaused = true;
lastPausedPosition = new PosDirection(logicX, logicY, direction);
lastPausedPosition = new PosDirection(logicX, logicY, PathUtils.getDirectionByArmDirection(direction));
// 发送停止指令
RcsSRMessage stopMsg = new RcsSRMessage(this.runtime);
@ -188,7 +169,7 @@ public abstract class PtrAgvItem extends ExecutorItem {
log.error("Failed to send stop command to AGV {}", this.getId(), e);
}
fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_PAUSE, this);
this.runtime.eventManager.firePlanTaskSequencePauseEvent(this, planTaskSequence);
}
public synchronized void resumeTask() {
@ -199,14 +180,14 @@ public abstract class PtrAgvItem extends ExecutorItem {
// 检查当前位置是否与暂停位置一致
if (Math.abs(logicX - lastPausedPosition.logicX()) > 1 ||
Math.abs(logicY - lastPausedPosition.logicY()) > 1 ||
direction != lastPausedPosition.direction()) {
PathUtils.getDirectionByArmDirection(direction) != lastPausedPosition.direction()) {
// 需要返回暂停位置
throw new RuntimeException("AGV position has changed since pause, cannot resume task safely");
}
isPaused = false;
connectorThread.resumeProcessing();
fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_RESUME, this);
this.runtime.eventManager.firePlanTaskSequenceResumeEvent(this, planTaskSequence);
}
@SneakyThrows
@ -221,7 +202,7 @@ public abstract class PtrAgvItem extends ExecutorItem {
deviceTaskQueue.clear();
}
fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_CANCEL, this);
this.runtime.eventManager.firePlanTaskSequenceCancelEvent(this, planTaskSequence);
}
@SneakyThrows
@ -234,18 +215,6 @@ public abstract class PtrAgvItem extends ExecutorItem {
amrMessageHandler.sendCmdSetLocation(this.getId(), setLoc);
}
public void setControlMode(ControlMode mode) {
// 硬件控制模式设置逻辑
this.isManualMode = (mode == ControlMode.MANUAL);
if (mode != ControlMode.FULL_AUTO && planTaskSequence != null) {
cancelTask();
}
// 更新Redis状态
updateRedisStatus();
}
@SneakyThrows
public void requestCurrentStatus() {
amrMessageHandler.sendCmdQueryStatus(this.getId());
@ -265,9 +234,9 @@ public abstract class PtrAgvItem extends ExecutorItem {
if (this.isPaused) {
return false;
}
// if (this.taskMode != AmrTaskMode.AMR_FREE_MODE) {
// return false;
// }
// if (this.taskMode != AmrTaskMode.AMR_FREE_MODE) {
// return false;
// }
return this.isOnline;
}
@ -278,9 +247,17 @@ public abstract class PtrAgvItem extends ExecutorItem {
for (PtrAgvDeviceTask task : runningDeviceTaskList) {
task.taskGroupStatus = taskStatus;
if (taskStatus == 4) {
this.runtime.eventManager.fireDeviceTaskCompleteEvent(this, task);
task.taskStatus = 4;
// 更新计划任务
List<RcsTaskPlan> planTaskList = planTaskSequence.taskList.stream().filter(pt -> task.movePlanTaskId.equals(pt.getPlanTaskId()) || task.planTaskIdSet.contains(pt.getPlanTaskId())).toList();
for (RcsTaskPlan planTask : planTaskList) {
if (PlanTaskStatus.FINISHED.toString().equals(planTask.getPlanTaskStatus())) {
continue;
}
this.runtime.eventManager.firePlanTaskCompleteEvent(this, planTaskSequence, planTask);
planTask.setPlanTaskStatus(PlanTaskStatus.FINISHED.toString());
planTaskSequence.savePlanTask(planTask);
}
@ -288,7 +265,7 @@ public abstract class PtrAgvItem extends ExecutorItem {
}
if (planTaskSequence != null && planTaskSequence.isAllCompleted()) {
fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_COMPLETE, this);
this.runtime.eventManager.firePlanTaskSequenceCompleteEvent(this, planTaskSequence);
this.runningDeviceTaskList.clear();
planTaskSequence = null;
}
@ -304,18 +281,21 @@ public abstract class PtrAgvItem extends ExecutorItem {
this.logicY = logicY;
this.direction = direction;
LCCDirection oldLccDirection = PathUtils.getDirectionByArmDirection(oldDirection);
LCCDirection newLccDirection = PathUtils.getDirectionByArmDirection(direction);
// 更新Redis
updateRedisStatus();
// 触发位置变化事件
if (oldX != logicX || oldY != logicY) {
fireEvent(AgvEventType.POS_CHANGED, this,
new PosDirection(logicX, logicY, direction),
new PosDirection(oldX, oldY, oldDirection));
this.runtime.eventManager.firePosChangedEvent(this,
new PosDirection(logicX, logicY, newLccDirection),
new PosDirection(oldX, oldY, oldLccDirection));
}
if (oldDirection != direction) {
fireEvent(AgvEventType.DIRECTION_CHANGED, this, direction, oldDirection);
this.runtime.eventManager.fireDirectionChangedEvent(this, newLccDirection, oldLccDirection);
}
boolean needCompute = false;
@ -340,19 +320,18 @@ public abstract class PtrAgvItem extends ExecutorItem {
for (int i = 0; i <= finishTargetIndex; i++) {
PtrAgvDeviceTask task = runningDeviceTaskList.get(i);
task.taskStatus = 4; // 标记为完成
fireEvent(AgvEventType.DEVICE_TASK_COMPLETE, this, task);
this.runtime.eventManager.fireDeviceTaskCompleteEvent(this, task);
// 更新计划任务
RcsTaskPlan planTask = planTaskSequence.getByPlanTaskId(task.movePlanTaskId);
if (planTask != null && !PlanTaskStatus.FINISHED.toString().equals(planTask.getPlanTaskStatus())) {
fireEvent(AgvEventType.DEVICE_TASK_COMPLETE, this, planTaskSequence, planTask);
planTask.setPlanTaskStatus(PlanTaskStatus.FINISHED.toString());
planTaskSequence.savePlanTask(planTask);
this.runtime.eventManager.firePlanTaskCompleteEvent(this, planTaskSequence, planTask);
}
}
if (planTaskSequence.isAllCompleted()) {
fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_COMPLETE, this);
this.runtime.eventManager.firePlanTaskSequenceCompleteEvent(this, planTaskSequence);
this.runningDeviceTaskList.clear();
planTaskSequence = null;
}
@ -397,7 +376,14 @@ public abstract class PtrAgvItem extends ExecutorItem {
}
public void updateTaskMode(int taskMode) {
this.taskMode = AmrTaskMode.fromValue(taskMode);
this.setTaskMode(AmrTaskMode.fromValue(taskMode));
}
private void setTaskMode(AmrTaskMode taskMode) {
var originalMode = this.__taskMode;
this.__taskMode = taskMode;
this.runtime.eventManager.fireModeChangeEvent(this, taskMode, originalMode);
updateRedisStatus();
}
public void updateRedisStatus() {
@ -411,11 +397,10 @@ public abstract class PtrAgvItem extends ExecutorItem {
statusMap.put("direction", String.valueOf(direction));
statusMap.put("orientation", String.valueOf(orientation));
statusMap.put("soc", this.battery == null ? "-1" : String.valueOf(this.battery.SOC));
statusMap.put("mode", isManualMode ? "MANUAL" : "AUTO");
statusMap.put("mode", this.__taskMode.toString());
statusMap.put("taskStatus", getTaskStatus());
redis.hPutAll(statusKey, statusMap);
redis.kExpire(statusKey, 10); // 10秒过期
}
public void handleHeartbeat(AmrHeartbeatMessage heartbeat) {
@ -433,7 +418,7 @@ public abstract class PtrAgvItem extends ExecutorItem {
// 检查低电量
if (this.battery.SOC < 20) {
fireEvent(AgvEventType.LOW_BATTERY, this);
this.runtime.eventManager.fireLowBatteryEvent(this);
}
updateRedisStatus();
@ -441,13 +426,13 @@ public abstract class PtrAgvItem extends ExecutorItem {
public void handleOnlineEvent() {
isOnline = true;
fireEvent(AgvEventType.ONLINE, this);
this.runtime.eventManager.fireOnlineEvent(this);
requestCurrentStatus();
}
public void handleOfflineEvent() {
isOnline = false;
fireEvent(AgvEventType.OFFLINE, this);
this.runtime.eventManager.fireOfflineEvent(this);
}
private String getTaskStatus() {
@ -787,13 +772,12 @@ public abstract class PtrAgvItem extends ExecutorItem {
}
}
// planQueue.addAll(sequence.taskList);
deviceTaskQueue.addAll(deviceTaskList);
}
public boolean isSamePosition(PosDirection startPos) {
return this.logicX == startPos.logicX() && this.logicY == startPos.logicY() && this.direction == startPos.direction();
return this.logicX == startPos.logicX() && this.logicY == startPos.logicY() &&
PathUtils.getDirectionByArmDirection(this.direction) == startPos.direction();
}
private static class CDirection {

104
servo/src/main/java/com/yvan/event/AgvEventManager.java

@ -0,0 +1,104 @@
package com.yvan.event;
import com.galaxis.rcs.common.entity.RcsTaskPlan;
import com.galaxis.rcs.common.enums.LCCDirection;
import com.galaxis.rcs.plan.PlanTaskSequence;
import com.galaxis.rcs.ptr.AmrTaskMode;
import com.galaxis.rcs.ptr.PosDirection;
import com.galaxis.rcs.ptr.PtrAgvDeviceTask;
import com.yvan.logisticsModel.ExecutorItem;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
public class AgvEventManager {
private final Set<AgvEventListener> eventListeners = new CopyOnWriteArraySet<>();
private void fireEvent(AgvEventType type, Object... args) {
for (AgvEventListener listener : eventListeners) {
listener.onEvent(type, args);
}
}
// 事件监听管理
public void subscribe(AgvEventListener listener) {
eventListeners.add(listener);
}
public void unsubscribe(AgvEventListener listener) {
eventListeners.remove(listener);
}
public void fireOfflineEvent(ExecutorItem sender) {
fireEvent(AgvEventType.OFFLINE, sender);
}
public void fireOnlineEvent(ExecutorItem sender) {
fireEvent(AgvEventType.ONLINE, sender);
}
public void fireBlockedEvent(ExecutorItem sender) {
fireEvent(AgvEventType.BLOCKED, sender);
}
public void fireBlockedRecoverEvent(ExecutorItem sender) {
fireEvent(AgvEventType.BLOCKED_RECOVER, sender);
}
public void fireFreeEvent(ExecutorItem sender) {
fireEvent(AgvEventType.FREE, sender);
}
public void firePlanTaskSequenceCompleteEvent(ExecutorItem sender, PlanTaskSequence taskSequence) {
fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_COMPLETE, sender, taskSequence);
}
public void firePlanTaskCompleteEvent(ExecutorItem sender, PlanTaskSequence taskSequence, RcsTaskPlan taskPlan) {
fireEvent(AgvEventType.PLAN_TASK_COMPLETE, sender, taskSequence, taskPlan);
}
public void firePlanTaskExceptionEvent(ExecutorItem sender, PlanTaskSequence taskSequence, RcsTaskPlan taskPlan, Object exception) {
fireEvent(AgvEventType.PLAN_TASK_EXCEPTION, sender, taskSequence, exception);
}
public void firePlanTaskSequenceAcceptEvent(ExecutorItem sender, PlanTaskSequence taskSequence) {
fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_ACCEPT, sender, taskSequence);
}
public void firePlanTaskSequenceCancelEvent(ExecutorItem sender, PlanTaskSequence taskSequence) {
fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_CANCEL, sender, taskSequence);
}
public void firePlanTaskSequencePauseEvent(ExecutorItem sender, PlanTaskSequence taskSequence) {
fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_PAUSE, sender, taskSequence);
}
public void firePlanTaskSequenceResumeEvent(ExecutorItem sender, PlanTaskSequence taskSequence) {
fireEvent(AgvEventType.PLAN_TASK_SEQUENCE_RESUME, sender, taskSequence);
}
public void fireDeviceTaskCompleteEvent(ExecutorItem sender, PtrAgvDeviceTask deviceTask) {
fireEvent(AgvEventType.DEVICE_TASK_COMPLETE, sender, deviceTask);
}
public void fireDeviceTaskExceptionEvent(ExecutorItem sender, PtrAgvDeviceTask deviceTask, Object exception) {
fireEvent(AgvEventType.DEVICE_TASK_EXCEPTION, sender, deviceTask, exception);
}
public void firePosChangedEvent(ExecutorItem sender, PosDirection newPosDirection, PosDirection originPosDirection) {
fireEvent(AgvEventType.POS_CHANGED, sender, newPosDirection, originPosDirection);
}
public void fireDirectionChangedEvent(ExecutorItem sender, LCCDirection direction, LCCDirection originDirection) {
fireEvent(AgvEventType.DIRECTION_CHANGED, sender, direction, originDirection);
}
public void fireLowBatteryEvent(ExecutorItem sender) {
fireEvent(AgvEventType.LOW_BATTERY, sender);
}
public void fireModeChangeEvent(ExecutorItem sender, AmrTaskMode mode, AmrTaskMode originMode) {
fireEvent(AgvEventType.MODE_CHANGED, sender, mode, originMode);
}
}

23
servo/src/main/java/com/yvan/event/AgvEventType.java

@ -3,27 +3,32 @@ package com.yvan.event;
public enum AgvEventType {
/**
* 设备离线
* (ExecutorItem sender)
*/
OFFLINE,
/**
* 设备上线
* (ExecutorItem sender)
*/
ONLINE,
/**
* 设备阻挡
* (ExecutorItem sender)
*/
BLOCKED,
/**
* 设备阻挡恢复
* (ExecutorItem sender)
*/
BLOCKED_RECOVER,
/**
* 设备空闲
* (ExecutorItem sender)
*/
FREE,
/**
* 规划任务序列全部完成
* (ExecutorItem sender)
* (ExecutorItem sender, PlanTaskSequence taskSequence)
*/
PLAN_TASK_SEQUENCE_COMPLETE,
/**
@ -33,6 +38,7 @@ public enum AgvEventType {
PLAN_TASK_COMPLETE,
/**
* 规划任务异常
* (ExecutorItem sender, PlanTaskSequence taskSequence, RcsTaskPlan taskPlan, Object exception)
*/
PLAN_TASK_EXCEPTION,
/**
@ -42,17 +48,17 @@ public enum AgvEventType {
PLAN_TASK_SEQUENCE_ACCEPT,
/**
* 规划任务已取消
* (ExecutorItem sender)
* (ExecutorItem sender, PlanTaskSequence taskSequence)
*/
PLAN_TASK_SEQUENCE_CANCEL,
/**
* 规划任务已暂停
* (ExecutorItem sender)
* (ExecutorItem sender, PlanTaskSequence taskSequence)
*/
PLAN_TASK_SEQUENCE_PAUSE,
/**
* 规划任务已恢复
* (ExecutorItem sender)
* (ExecutorItem sender, PlanTaskSequence taskSequence)
*/
PLAN_TASK_SEQUENCE_RESUME,
/**
@ -62,6 +68,7 @@ public enum AgvEventType {
DEVICE_TASK_COMPLETE,
/**
* 设备任务异常
* (ExecutorItem sender, PtrAgvDeviceTask deviceTask, Object exception)
*/
DEVICE_TASK_EXCEPTION,
/**
@ -71,7 +78,7 @@ public enum AgvEventType {
POS_CHANGED,
/**
* 设备姿态改变
* (ExecutorItem sender, PosDirection newPosDirection, PosDirection originPosDirection)
* (ExecutorItem sender, LCCDirection direction, LCCDirection originDirection)
*/
DIRECTION_CHANGED,
/**
@ -79,4 +86,10 @@ public enum AgvEventType {
* (ExecutorItem sender)
*/
LOW_BATTERY,
/**
* 设备模式改变
* (ExecutorItem sender, AmrTaskMode newMode, AmrTaskMode originMode)
*/
MODE_CHANGED,
}

24
servo/src/main/java/com/yvan/event/EventManager.java

@ -1,24 +0,0 @@
package com.yvan.event;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
public class EventManager {
private final Set<AgvEventListener> eventListeners = new CopyOnWriteArraySet<>();
public void fireEvent(AgvEventType type, Object... args) {
for (AgvEventListener listener : eventListeners) {
listener.onEvent(type, args);
}
}
// 事件监听管理
public void subscribe(AgvEventListener listener) {
eventListeners.add(listener);
}
public void unsubscribe(AgvEventListener listener) {
eventListeners.remove(listener);
}
}

4
servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java

@ -10,7 +10,7 @@ import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.yvan.event.EventManager;
import com.yvan.event.AgvEventManager;
import com.yvan.mqtt.FrontendMessagePushService;
import com.yvan.redis.LccRedisService;
import com.yvan.workbench.model.entity.LccProject;
@ -112,7 +112,7 @@ public class LogisticsRuntime {
public final LccRedisService lccRedisService = new LccRedisService(this);
public final EventManager eventManager = new EventManager();
public final AgvEventManager eventManager = new AgvEventManager();
public LogisticsRuntime(LccProject project, LccProjectEnv env, String serverId) {
this.project = project;

14
servo/src/main/java/com/yvan/state/VariableMonitor.java

@ -50,13 +50,13 @@ public class VariableMonitor<T> {
public VariableMonitor(Supplier<T> provider, long intervalMillis) {
this.provider = provider;
this.intervalMillis = intervalMillis;
this.lastJson = JacksonMapper.getInstance().toJson(provider.get());
}
/**
* 启动定时轮询任务
*/
public void start() {
public VariableMonitor<T> start() {
this.lastJson = JacksonMapper.getInstance().toJson(provider.get());
future = scheduler.scheduleAtFixedRate(() -> {
T currentValue = provider.get();
String currentJson;
@ -67,16 +67,18 @@ public class VariableMonitor<T> {
lastJson = currentJson;
}
}, 0, intervalMillis, TimeUnit.MILLISECONDS);
return this;
}
/**
* 停止定时任务
*/
public void stop() {
public VariableMonitor<T> stop() {
if (future != null) {
future.cancel(false);
}
scheduler.shutdownNow();
return this;
}
/**
@ -84,8 +86,9 @@ public class VariableMonitor<T> {
*
* @param listener 回调函数接收新值作为参数
*/
public void addListener(Consumer<T> listener) {
public VariableMonitor<T> addListener(Consumer<T> listener) {
listeners.add(listener);
return this;
}
/**
@ -93,8 +96,9 @@ public class VariableMonitor<T> {
*
* @param listener 要移除的监听器
*/
public void removeListener(Consumer<T> listener) {
public VariableMonitor<T> removeListener(Consumer<T> listener) {
listeners.remove(listener);
return this;
}
/**

Loading…
Cancel
Save