Browse Source

FrontendMessagePushService 不允许关闭

master
修宁 5 months ago
parent
commit
3faf4a4cc3
  1. 25
      servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java
  2. 57
      servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java
  3. 7
      servo/src/main/java/com/yvan/workbench/controller/InvController.java
  4. 12
      servo/src/main/java/com/yvan/workbench/service/LccServerInfoConnect.java

25
servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java

@ -123,8 +123,6 @@ public class LogisticsRuntime {
public final AmrMessageHandler amrMessageHandler = new AmrMessageHandler(this);
public final FrontendMessagePushService frontendMessagePushService = new FrontendMessagePushService(this);
public final LccRedisService lccRedisService = new LccRedisService(this);
public final AgvEventManager eventManager = new AgvEventManager();
@ -168,25 +166,25 @@ public class LogisticsRuntime {
case ONLINE:
// AGV上线
sender.setIsOnline(true);
this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true);
FrontendMessagePushService.INSTANCE.pushDeviceAlive(this, sender.id, sender.getT(), true);
break;
case OFFLINE:
// AGV下线
sender.setIsOnline(false);
this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true);
FrontendMessagePushService.INSTANCE.pushDeviceAlive(this, sender.id, sender.getT(), true);
break;
case BLOCKED:
// AGV上线
sender.setIsBlocked(true);
this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true);
FrontendMessagePushService.INSTANCE.pushDeviceAlive(this, sender.id, sender.getT(), true);
break;
case BLOCKED_RECOVER:
// AGV上线
sender.setIsBlocked(false);
this.frontendMessagePushService.pushDeviceAlive(sender.id, sender.getT(), true);
FrontendMessagePushService.INSTANCE.pushDeviceAlive(this, sender.id, sender.getT(), true);
break;
case FREE:
@ -205,7 +203,7 @@ public class LogisticsRuntime {
// 设备状态变化
if (sender instanceof PtrAgvItem) {
var ptr = (PtrAgvItem) sender;
this.frontendMessagePushService.pushDeviceStatus(sender.id, ptr.getState());
FrontendMessagePushService.INSTANCE.pushDeviceStatus(this, sender.id, ptr.getState());
} else {
log.error("AGV事件类型 {} 仅支持 PtrAgvItem 类型的执行器", type);
@ -226,7 +224,7 @@ public class LogisticsRuntime {
InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.executorVo.getLocCode(), -taskSequence.carryQty);
InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.unloadBasLocationVo.getLocCode(), taskSequence.carryQty);
});
this.frontendMessagePushService.pushInvUpdate(lpn, taskSequence.executorVo, taskSequence.unloadBasLocationVo, taskSequence.carryQty);
FrontendMessagePushService.INSTANCE.pushInvUpdate(this, lpn, taskSequence.executorVo, taskSequence.unloadBasLocationVo, taskSequence.carryQty);
}
/**
@ -238,7 +236,7 @@ public class LogisticsRuntime {
InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.loadBasLocationVo.getLocCode(), -taskSequence.carryQty);
InvManager.invSave(this.envId, taskSequence.bizTask.getBizTaskId(), lpn, taskSequence.executorVo.getLocCode(), taskSequence.carryQty);
});
this.frontendMessagePushService.pushInvUpdate(lpn, taskSequence.loadBasLocationVo, taskSequence.executorVo, taskSequence.carryQty);
FrontendMessagePushService.INSTANCE.pushInvUpdate(this, lpn, taskSequence.loadBasLocationVo, taskSequence.executorVo, taskSequence.carryQty);
}
/**
@ -359,10 +357,6 @@ public class LogisticsRuntime {
this.amrMessageHandler.start(this.env.getEnvConfig().getMqtt(), this.serverId);
this.lccRedisService.start(this.env.getEnvConfig().getRedis(), this.serverId);
var lccMapService = SpringContext.HOLDER.getBean(LccMapService.class);
LccConfigProperties lccConfigProperties = lccMapService.config;
this.frontendMessagePushService.start(lccConfigProperties.getFrontendMqtt(), this.serverId + "_lcc_send");
// 开启所有机器人的任务处理
Set<String> executorTypes = Sets.newHashSet();
for (ExecutorItem executorItem : executorItemMap.values()) {
@ -382,7 +376,7 @@ public class LogisticsRuntime {
this.taskDispatchFactory.startPolling();
// 推送服务状态
this.frontendMessagePushService.pushServerState(this.getState());
FrontendMessagePushService.INSTANCE.pushServerState(this, this.getState());
}
public boolean isRunning() {
@ -435,8 +429,7 @@ public class LogisticsRuntime {
this.amrMessageHandler.stop();
this.lccRedisService.stop();
this.frontendMessagePushService.pushServerState(this.getState());
this.frontendMessagePushService.stop();
FrontendMessagePushService.INSTANCE.pushServerState(this, this.getState());
BannerUtils.printConfig(log, "LogisticsRuntime stop.", new String[]{
"projectUUID: " + this.projectUuid,

57
servo/src/main/java/com/yvan/pusher/FrontendMessagePushService.java

@ -27,10 +27,11 @@ import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class FrontendMessagePushService implements MqttCallback {
public static final FrontendMessagePushService INSTANCE = new FrontendMessagePushService();
private volatile LccConfigProperties.FrontendMqtt mqttConfig;
private volatile String clientId;
private volatile MqttClient mqttClient;
private final LogisticsRuntime runtime;
private final MemoryPersistence persistence = new MemoryPersistence();
private final Lock connectionLock = new ReentrantLock();
@ -46,8 +47,7 @@ public class FrontendMessagePushService implements MqttCallback {
@Getter
private volatile boolean connected = false;
public FrontendMessagePushService(LogisticsRuntime runtime) {
this.runtime = runtime;
private FrontendMessagePushService() {
}
/**
@ -64,7 +64,7 @@ public class FrontendMessagePushService implements MqttCallback {
return;
}
log.info("Starting FRONTEND_MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId);
log.info("Starting FRONTEND_MQTT service for clientId: {}", this.clientId);
// 创建MQTT客户端
mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence);
@ -139,10 +139,11 @@ public class FrontendMessagePushService implements MqttCallback {
* 推送服务器状态
* /lcc/{proj_id}/{env_id}/server
*
* @param runtime 运行时环境
* @param statusData 状态数据
*/
public void pushServerState(ServerStatusVo statusData) {
String topic = buildTopic("server");
public void pushServerState(LogisticsRuntime runtime, ServerStatusVo statusData) {
String topic = buildTopic(runtime, "server");
publishJson(topic, statusData);
}
@ -150,10 +151,11 @@ public class FrontendMessagePushService implements MqttCallback {
* 推送客户端状态
* /lcc/{proj_id}/{env_id}/client
*
* @param runtime 运行时环境
* @param clientData 客户端数据
*/
public void pushClientState(Map<String, Object> clientData) {
String topic = buildTopic("client");
public void pushClientState(LogisticsRuntime runtime, Map<String, Object> clientData) {
String topic = buildTopic(runtime, "client");
publishJson(topic, clientData);
}
@ -161,10 +163,11 @@ public class FrontendMessagePushService implements MqttCallback {
* 推送任务更新
* /lcc/{proj_id}/{env_id}/task
*
* @param runtime 运行时环境
* @param taskData 任务数据
*/
public void pushTaskUpdate(Object taskData) {
String topic = buildTopic("task");
public void pushTaskUpdate(LogisticsRuntime runtime, Object taskData) {
String topic = buildTopic(runtime, "task");
publishJson(topic, taskData);
}
@ -172,13 +175,14 @@ public class FrontendMessagePushService implements MqttCallback {
* 推送库存更新
* /lcc/{proj_id}/{env_id}/inv
*
* @param runtime 运行时环境
* @param lpn 容器号
* @param before 更新前库存
* @param after 更新后库存
* @param qty 更新数量
*/
public void pushInvUpdate(String lpn, BasLocationVo before, BasLocationVo after, int qty) {
String topic = buildTopic("inv");
public void pushInvUpdate(LogisticsRuntime runtime, String lpn, BasLocationVo before, BasLocationVo after, int qty) {
String topic = buildTopic(runtime, "inv");
publishJson(topic, new InvUpdateVo(
lpn,
@ -192,23 +196,25 @@ public class FrontendMessagePushService implements MqttCallback {
* 推送设备状态
* /lcc/{proj_id}/{env_id}/device/{id}/status
*
* @param runtime 运行时环境
* @param deviceId 设备ID
* @param statusData 状态数据
*/
public void pushDeviceStatus(String deviceId, AgvStatusVo statusData) {
String topic = buildTopic("device/" + deviceId + "/status");
public void pushDeviceStatus(LogisticsRuntime runtime, String deviceId, AgvStatusVo statusData) {
String topic = buildTopic(runtime, "device/" + deviceId + "/status");
publishJson(topic, statusData);
}
/**
* 推送设备存活状态
*
* @param runtime 运行时环境
* @param deviceId 设备ID
* @param type 设备类型
* @param online 是否在线
*/
public void pushDeviceAlive(String deviceId, String type, boolean online) {
String topic = buildTopic("device/" + deviceId + "/alive");
public void pushDeviceAlive(LogisticsRuntime runtime, String deviceId, String type, boolean online) {
String topic = buildTopic(runtime, "device/" + deviceId + "/alive");
Map<String, Object> data = new HashMap<>();
data.put("id", deviceId);
@ -222,11 +228,12 @@ public class FrontendMessagePushService implements MqttCallback {
* 推送日志
* /lcc/{proj_id}/{env_id}/log/{type}
*
* @param runtime 运行时环境
* @param logType 日志类型
* @param logData 日志数据
*/
public void pushLogs(String logType, Object logData) {
String topic = buildTopic("log/" + logType);
public void pushLogs(LogisticsRuntime runtime, String logType, Object logData) {
String topic = buildTopic(runtime, "log/" + logType);
publishJson(topic, logData);
}
@ -234,10 +241,11 @@ public class FrontendMessagePushService implements MqttCallback {
* 推送告警
* /lcc/{proj_id}/{env_id}/alarm
*
* @param runtime 运行时环境
* @param alarmData 告警数据
*/
public void pushAlarm(Object alarmData) {
String topic = buildTopic("alarm");
public void pushAlarm(LogisticsRuntime runtime, Object alarmData) {
String topic = buildTopic(runtime, "alarm");
publishJson(topic, alarmData);
}
@ -245,11 +253,12 @@ public class FrontendMessagePushService implements MqttCallback {
* 推送脚本更新
* /lcc/{proj_id}/script
*
* @param runtime 运行时环境
* @param scriptData 脚本数据
*/
public void pushScriptUpdate(Object scriptData) {
public void pushScriptUpdate(LogisticsRuntime runtime, Object scriptData) {
// 脚本系统没有环境ID
String topic = "/lcc/" + this.runtime.projectUuid + "/script";
String topic = "/lcc/" + runtime.projectUuid + "/script";
publishJson(topic, scriptData);
}
@ -258,8 +267,8 @@ public class FrontendMessagePushService implements MqttCallback {
/**
* 构建主题路径
*/
private String buildTopic(String suffix) {
return "/lcc/" + this.runtime.projectUuid + "/" + this.runtime.envId + "/" + suffix;
private String buildTopic(LogisticsRuntime runtime, String suffix) {
return "/lcc/" + runtime.projectUuid + "/" + runtime.envId + "/" + suffix;
}
/**

7
servo/src/main/java/com/yvan/workbench/controller/InvController.java

@ -7,6 +7,7 @@ import com.google.common.base.Strings;
import com.yvan.entity.BasLocationVo;
import com.yvan.logisticsModel.LogisticsRuntime;
import com.yvan.logisticsModel.LogisticsRuntimeService;
import com.yvan.pusher.FrontendMessagePushService;
import org.clever.core.Conv;
import org.clever.core.model.response.R;
import org.clever.data.jdbc.DaoFactory;
@ -95,7 +96,7 @@ public class InvController {
// 推送前端更新
LogisticsRuntime runtime = LogisticsRuntimeService.INSTANCE.getByProjectEnv(projectUuid, envId);
if (runtime != null) {
runtime.frontendMessagePushService.pushInvUpdate(
FrontendMessagePushService.INSTANCE.pushInvUpdate(runtime,
lpn, null, new BasLocationVo(basLocation), qty
);
}
@ -151,7 +152,7 @@ public class InvController {
// 推送前端更新
LogisticsRuntime runtime = LogisticsRuntimeService.INSTANCE.getByProjectEnv(projectUuid, envId);
if (runtime != null) {
runtime.frontendMessagePushService.pushInvUpdate(
FrontendMessagePushService.INSTANCE.pushInvUpdate(runtime,
lpn, new BasLocationVo(basLocation), null, -inv.getQty()
);
}
@ -231,7 +232,7 @@ public class InvController {
// 推送前端更新
LogisticsRuntime runtime = LogisticsRuntimeService.INSTANCE.getByProjectEnv(projectUuid, envId);
if (runtime != null) {
runtime.frontendMessagePushService.pushInvUpdate(
FrontendMessagePushService.INSTANCE.pushInvUpdate(runtime,
lpn, new BasLocationVo(sourceLocation), new BasLocationVo(basLocation), inv.getQty()
);
}

12
servo/src/main/java/com/yvan/workbench/service/LccServerInfoConnect.java

@ -1,6 +1,10 @@
package com.yvan.workbench.service;
import com.yvan.logisticsModel.SystemMetricsStore;
import com.yvan.pusher.FrontendMessagePushService;
import com.yvan.workbench.SpringContext;
import com.yvan.workbench.autoconfigure.LccConfigProperties;
import lombok.SneakyThrows;
import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.GlobalMemory;
@ -12,6 +16,7 @@ import oshi.software.os.OperatingSystem;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import java.net.InetAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -26,12 +31,18 @@ public class LccServerInfoConnect implements SmartLifecycle {
private long[] lastTicks = null;
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
@SneakyThrows
@Override
public void start() {
if (running) return;
running = true;
scheduler.scheduleAtFixedRate(this::updateAllMetrics, 0, 30, TimeUnit.SECONDS);
var lccMapService = SpringContext.HOLDER.getBean(LccMapService.class);
LccConfigProperties lccConfigProperties = lccMapService.config;
FrontendMessagePushService.INSTANCE.start(lccConfigProperties.getFrontendMqtt(), InetAddress.getLocalHost().getHostName() + "_lcc_send");
}
@Override
@ -40,6 +51,7 @@ public class LccServerInfoConnect implements SmartLifecycle {
if (!scheduler.isShutdown()) {
scheduler.shutdownNow();
}
FrontendMessagePushService.INSTANCE.stop();
}
@Override

Loading…
Cancel
Save