15 changed files with 915 additions and 238 deletions
@ -1,150 +0,0 @@ |
|||
package com.yvan.logisticsModel; |
|||
|
|||
import com.yvan.logisticsEnv.EnvConfig; |
|||
import lombok.SneakyThrows; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.eclipse.paho.mqttv5.client.*; |
|||
import org.eclipse.paho.mqttv5.common.MqttException; |
|||
import org.eclipse.paho.mqttv5.common.MqttMessage; |
|||
import org.eclipse.paho.mqttv5.common.packet.MqttProperties; |
|||
|
|||
import java.nio.charset.StandardCharsets; |
|||
import java.util.concurrent.CountDownLatch; |
|||
|
|||
@Slf4j |
|||
public class FrontendPushService implements MqttCallback { |
|||
private final LogisticsRuntime runtime; |
|||
public final String rcsFrontendTopic; |
|||
private volatile MqttClient clientForSend; |
|||
private volatile MqttClient client; |
|||
private CountDownLatch connectLatch = new CountDownLatch(1); |
|||
|
|||
public FrontendPushService(LogisticsRuntime runtime) { |
|||
this.runtime = runtime; |
|||
this.rcsFrontendTopic = "/rcs/" + runtime.projectUUID + "/" + runtime.envId + "/frontend"; |
|||
} |
|||
|
|||
@Override |
|||
public void disconnected(MqttDisconnectResponse disconnectResponse) { |
|||
log.info("mqtt disconnected"); |
|||
} |
|||
|
|||
@Override |
|||
public void mqttErrorOccurred(MqttException exception) { |
|||
log.error("mqtt error occurred"); |
|||
} |
|||
|
|||
@Override |
|||
public void messageArrived(String topic, MqttMessage message) throws Exception { |
|||
log.info("Message arrived on topic {}: {}", topic, new String(message.getPayload(), StandardCharsets.UTF_8)); |
|||
} |
|||
|
|||
@Override |
|||
public void deliveryComplete(IMqttToken token) { |
|||
log.info("Message delivery complete: {}", token); |
|||
} |
|||
|
|||
@Override |
|||
public void connectComplete(boolean reconnect, String serverURI) { |
|||
// BannerUtils.printConfig(log, "MQTT 开启监听", new String[]{serverURI + " topic: /agv_robot/status"});
|
|||
log.info("MQTT client connected to server: {}", serverURI); |
|||
connectLatch.countDown(); // 放行
|
|||
} |
|||
|
|||
@Override |
|||
public void authPacketArrived(int reasonCode, MqttProperties properties) { |
|||
} |
|||
|
|||
@SneakyThrows |
|||
public void stop() { |
|||
if (client != null && client.isConnected()) { |
|||
client.disconnect(); |
|||
client.close(); |
|||
log.info("MQTT client disconnected and closed."); |
|||
} else { |
|||
log.warn("MQTT client is not connected, no action taken."); |
|||
} |
|||
|
|||
if (clientForSend != null && clientForSend.isConnected()) { |
|||
clientForSend.disconnect(); |
|||
clientForSend.close(); |
|||
log.info("MQTT clientForSend disconnected and closed."); |
|||
} else { |
|||
log.warn("MQTT clientForSend is not connected, no action taken."); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 创建一个表示服务器离线的最后消息 |
|||
*/ |
|||
public MqttMessage createWillMessage() { |
|||
MqttMessage lastMessage = new MqttMessage(); |
|||
String msg = String.format(""" |
|||
{ |
|||
"id": "serverOffline", |
|||
"content": { |
|||
"project": "%s", |
|||
"env": %s, |
|||
"status": "offline", |
|||
"server": "%s", |
|||
} |
|||
} |
|||
""", runtime.projectUUID, runtime.envId, runtime.clientId); |
|||
lastMessage.setPayload(msg.getBytes(StandardCharsets.UTF_8)); |
|||
lastMessage.setQos(0); |
|||
lastMessage.setRetained(true); |
|||
return lastMessage; |
|||
} |
|||
|
|||
@SneakyThrows |
|||
public void start(EnvConfig.MqttConfig mqttConfig) { |
|||
String brokerUrl = mqttConfig.getBrokerUrl(); // "tcp://10.10.203.239:1885"
|
|||
String username = mqttConfig.getUsername(); // admin
|
|||
String password = mqttConfig.getPassword(); // admin
|
|||
|
|||
clientForSend = new MqttClient(brokerUrl, runtime.clientId + "_front_send"); // String clientId = "LUOYIFAN-PC_send"
|
|||
client = new MqttClient(brokerUrl, runtime.clientId + "_front"); // String clientId = "LUOYIFAN-PC"
|
|||
MqttConnectionOptions options = new MqttConnectionOptions(); |
|||
options.setServerURIs(new String[]{brokerUrl}); |
|||
options.setAutomaticReconnect(true); |
|||
options.setUserName(username); |
|||
options.setPassword(password.getBytes()); |
|||
options.setConnectionTimeout(1); |
|||
options.setKeepAliveInterval(20); |
|||
options.setExecutorServiceTimeout(1); |
|||
// 最后一条消息,表示服务离线
|
|||
options.setWill(this.rcsFrontendTopic, createWillMessage()); |
|||
|
|||
client.setCallback(this); |
|||
client.connect(options); |
|||
|
|||
clientForSend.connect(options); |
|||
connectLatch.await(); |
|||
|
|||
// 发送一条消息代表上线
|
|||
this.publish(String.format(""" |
|||
{ |
|||
"id": "serverOnline", |
|||
"content": { |
|||
"project": "%s", |
|||
"env": %s, |
|||
"status": "online", |
|||
"server": "%s" |
|||
} |
|||
} |
|||
""", runtime.projectUUID, runtime.envId, runtime.clientId)); |
|||
} |
|||
|
|||
@SneakyThrows |
|||
void publish(String payloadString) { |
|||
MqttMessage message = new MqttMessage(payloadString.getBytes(StandardCharsets.UTF_8)); |
|||
message.setQos(0); |
|||
message.setRetained(false); |
|||
|
|||
if (this.clientForSend.isConnected()) { |
|||
this.clientForSend.publish(this.rcsFrontendTopic, message); |
|||
} else { |
|||
throw new RuntimeException("MQTT client is not connected"); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,56 @@ |
|||
package com.yvan.logisticsModel; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
|
|||
public class RuntimeState { |
|||
public String projectUuid; |
|||
public long envId; |
|||
public boolean isVirtual; |
|||
public String serverId; |
|||
public boolean isRunning; |
|||
|
|||
public long startTime; |
|||
public long stopTime; |
|||
public float timeRate; |
|||
public String[] subSystemList; |
|||
|
|||
public float cpuUsage; |
|||
public float memoryUsage; |
|||
public float diskIoLoad; |
|||
/** |
|||
* 空闲内存,单位GB |
|||
*/ |
|||
public float freeMemory; |
|||
/** |
|||
* 磁盘剩余空间,单位GB |
|||
*/ |
|||
public float diskFreeSpace; |
|||
|
|||
public void fillSystemInfos() { |
|||
this.cpuUsage = SystemMetricsStore.cpuUsage; |
|||
this.memoryUsage = SystemMetricsStore.memoryUsage; |
|||
this.freeMemory = SystemMetricsStore.freeMemory; |
|||
this.diskIoLoad = SystemMetricsStore.diskIoLoad; |
|||
this.diskFreeSpace = SystemMetricsStore.diskFreeSpace; |
|||
} |
|||
|
|||
public Map<String, Object> toMap() { |
|||
Map<String, Object> map = new HashMap<>(); |
|||
map.put("projectUuid", projectUuid); |
|||
map.put("envId", envId); |
|||
map.put("isVirtual", isVirtual); |
|||
map.put("serverId", serverId); |
|||
map.put("isRunning", isRunning); |
|||
map.put("startTime", startTime); |
|||
map.put("stopTime", stopTime); |
|||
map.put("timeRate", timeRate); |
|||
map.put("subSystemList", subSystemList); |
|||
map.put("cpuUsage", cpuUsage); |
|||
map.put("memoryUsage", memoryUsage); |
|||
map.put("diskIoLoad", diskIoLoad); |
|||
map.put("freeMemory", freeMemory); |
|||
map.put("diskFreeSpace", diskFreeSpace); |
|||
return map; |
|||
} |
|||
} |
|||
@ -0,0 +1,10 @@ |
|||
package com.yvan.logisticsModel; |
|||
|
|||
|
|||
public class SystemMetricsStore { |
|||
public static volatile float cpuUsage; |
|||
public static volatile float memoryUsage; |
|||
public static volatile float diskIoLoad; |
|||
public static volatile float freeMemory; |
|||
public static volatile float diskFreeSpace; |
|||
} |
|||
@ -0,0 +1,345 @@ |
|||
package com.yvan.mqtt; |
|||
|
|||
import com.yvan.logisticsEnv.EnvConfig; |
|||
import com.yvan.logisticsModel.LogisticsRuntime; |
|||
import lombok.Getter; |
|||
import lombok.SneakyThrows; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.clever.core.mapper.JacksonMapper; |
|||
import org.eclipse.paho.mqttv5.client.*; |
|||
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; |
|||
import org.eclipse.paho.mqttv5.common.MqttException; |
|||
import org.eclipse.paho.mqttv5.common.MqttMessage; |
|||
import org.eclipse.paho.mqttv5.common.packet.MqttProperties; |
|||
|
|||
import java.nio.charset.StandardCharsets; |
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.locks.Lock; |
|||
import java.util.concurrent.locks.ReentrantLock; |
|||
|
|||
@Slf4j |
|||
public class LccMqttService implements MqttCallback { |
|||
private volatile EnvConfig.MqttConfig mqttConfig; |
|||
private volatile String clientId; |
|||
private volatile MqttClient mqttClient; |
|||
private final LogisticsRuntime runtime; |
|||
private final MemoryPersistence persistence = new MemoryPersistence(); |
|||
|
|||
private final Lock connectionLock = new ReentrantLock(); |
|||
|
|||
// QoS 配置
|
|||
private static final int QOS = 1; |
|||
|
|||
// 重连参数
|
|||
private static final int MAX_RETRIES = 5; |
|||
private static final int RETRY_INTERVAL = 3; // 秒
|
|||
|
|||
// 连接状态
|
|||
@Getter |
|||
private volatile boolean connected = false; |
|||
|
|||
public LccMqttService(LogisticsRuntime runtime) { |
|||
this.runtime = runtime; |
|||
} |
|||
|
|||
/** |
|||
* 启动MQTT服务 |
|||
*/ |
|||
@SneakyThrows |
|||
public void start(EnvConfig.MqttConfig mqttConfig, String clientId) { |
|||
this.mqttConfig = mqttConfig; |
|||
this.clientId = clientId; |
|||
connectionLock.lock(); |
|||
try { |
|||
if (connected) { |
|||
log.warn("MQTT service is already started"); |
|||
return; |
|||
} |
|||
|
|||
log.info("Starting MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId); |
|||
|
|||
// 创建MQTT客户端
|
|||
mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence); |
|||
mqttClient.setCallback(this); |
|||
|
|||
// 配置连接选项
|
|||
MqttConnectionOptions options = new MqttConnectionOptions(); |
|||
options.setServerURIs(new String[]{mqttConfig.getBrokerUrl()}); |
|||
options.setUserName(mqttConfig.getUsername()); |
|||
options.setPassword(mqttConfig.getPassword().getBytes()); |
|||
options.setAutomaticReconnect(true); |
|||
options.setConnectionTimeout(10); |
|||
options.setKeepAliveInterval(60); |
|||
options.setExecutorServiceTimeout(1); |
|||
|
|||
// 尝试连接
|
|||
int attempts = 0; |
|||
while (attempts < MAX_RETRIES && !connected) { |
|||
attempts++; |
|||
try { |
|||
log.info("Connecting to MQTT broker (attempt {}/{})", attempts, MAX_RETRIES); |
|||
mqttClient.connect(options); |
|||
connected = true; |
|||
log.info("MQTT connected successfully"); |
|||
} catch (MqttException e) { |
|||
log.error("MQTT connection failed (attempt " + attempts + "/{" + MAX_RETRIES + "})", e); |
|||
|
|||
// 重试前等待
|
|||
if (attempts < MAX_RETRIES) { |
|||
try { |
|||
TimeUnit.SECONDS.sleep(RETRY_INTERVAL); |
|||
} catch (InterruptedException ie) { |
|||
Thread.currentThread().interrupt(); |
|||
break; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
if (!connected) { |
|||
log.error("Failed to connect to MQTT broker after {} attempts", MAX_RETRIES); |
|||
} |
|||
} finally { |
|||
connectionLock.unlock(); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 停止MQTT服务 |
|||
*/ |
|||
public void stop() { |
|||
connectionLock.lock(); |
|||
try { |
|||
if (mqttClient != null && mqttClient.isConnected()) { |
|||
try { |
|||
mqttClient.disconnect(); |
|||
mqttClient.close(); |
|||
log.info("MQTT disconnected"); |
|||
} catch (MqttException e) { |
|||
log.error("Error disconnecting MQTT", e); |
|||
} |
|||
} |
|||
connected = false; |
|||
} finally { |
|||
connectionLock.unlock(); |
|||
} |
|||
} |
|||
|
|||
// ==================== 消息推送方法 ====================
|
|||
|
|||
/** |
|||
* 推送服务器状态 |
|||
* /lcc/{proj_id}/{env_id}/server |
|||
* |
|||
* @param statusData 状态数据 |
|||
*/ |
|||
public void pushServerState(Map<String, Object> statusData) { |
|||
String topic = buildTopic("server"); |
|||
publishJson(topic, statusData); |
|||
} |
|||
|
|||
/** |
|||
* 推送客户端状态 |
|||
* /lcc/{proj_id}/{env_id}/client |
|||
* |
|||
* @param clientData 客户端数据 |
|||
*/ |
|||
public void pushClientState(Map<String, Object> clientData) { |
|||
String topic = buildTopic("client"); |
|||
publishJson(topic, clientData); |
|||
} |
|||
|
|||
/** |
|||
* 推送任务更新 |
|||
* /lcc/{proj_id}/{env_id}/task |
|||
* |
|||
* @param taskData 任务数据 |
|||
*/ |
|||
public void pushTaskUpdate(Object taskData) { |
|||
String topic = buildTopic("task"); |
|||
publishJson(topic, taskData); |
|||
} |
|||
|
|||
/** |
|||
* 推送库存更新 |
|||
* /lcc/{proj_id}/{env_id}/inv/{catalogCode} |
|||
* |
|||
* @param catalogCode 货位目录编码 |
|||
* @param before 更新前库存 |
|||
* @param after 更新后库存 |
|||
*/ |
|||
public void pushInventoryUpdate(String catalogCode, Object before, Object after) { |
|||
String topic = buildTopic("inv/" + catalogCode); |
|||
|
|||
Map<String, Object> data = new HashMap<>(); |
|||
data.put("before", before); |
|||
data.put("after", after); |
|||
|
|||
publishJson(topic, data); |
|||
} |
|||
|
|||
/** |
|||
* 推送设备状态 |
|||
* /lcc/{proj_id}/{env_id}/device/{id}/status |
|||
* |
|||
* @param deviceId 设备ID |
|||
* @param statusData 状态数据 |
|||
*/ |
|||
public void pushDeviceStatus(String deviceId, Map<String, Object> statusData) { |
|||
String topic = buildTopic("device/" + deviceId + "/status"); |
|||
publishJson(topic, statusData); |
|||
} |
|||
|
|||
/** |
|||
* 推送设备存活状态 |
|||
* |
|||
* @param deviceId 设备ID |
|||
* @param online 是否在线 |
|||
*/ |
|||
public void pushDeviceAlive(String deviceId, boolean online) { |
|||
String topic = buildTopic("device/" + deviceId + "/alive"); |
|||
publishString(topic, online ? "online" : "offline"); |
|||
} |
|||
|
|||
/** |
|||
* 推送日志 |
|||
* /lcc/{proj_id}/{env_id}/log/{type} |
|||
* |
|||
* @param logType 日志类型 |
|||
* @param logData 日志数据 |
|||
*/ |
|||
public void pushLogs(String logType, Object logData) { |
|||
String topic = buildTopic("log/" + logType); |
|||
publishJson(topic, logData); |
|||
} |
|||
|
|||
/** |
|||
* 推送告警 |
|||
* /lcc/{proj_id}/{env_id}/alarm |
|||
* |
|||
* @param alarmData 告警数据 |
|||
*/ |
|||
public void pushAlarm(Object alarmData) { |
|||
String topic = buildTopic("alarm"); |
|||
publishJson(topic, alarmData); |
|||
} |
|||
|
|||
/** |
|||
* 推送脚本更新 |
|||
* /lcc/{proj_id}/script |
|||
* |
|||
* @param scriptData 脚本数据 |
|||
*/ |
|||
public void pushScriptUpdate(Object scriptData) { |
|||
// 脚本系统没有环境ID
|
|||
String topic = "/lcc/" + this.runtime.projectUuid + "/script"; |
|||
publishJson(topic, scriptData); |
|||
} |
|||
|
|||
// ==================== 内部工具方法 ====================
|
|||
|
|||
/** |
|||
* 构建主题路径 |
|||
*/ |
|||
private String buildTopic(String suffix) { |
|||
return "/lcc/" + this.runtime.projectUuid + "/" + this.runtime.envId + "/" + suffix; |
|||
} |
|||
|
|||
/** |
|||
* 发布JSON数据 |
|||
*/ |
|||
private void publishJson(String topic, Object data) { |
|||
try { |
|||
String json = JacksonMapper.getInstance().toJson(data); |
|||
publish(topic, json); |
|||
} catch (Exception e) { |
|||
log.error("Failed to serialize JSON for topic: " + topic, e); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 发布字符串数据 |
|||
*/ |
|||
private void publishString(String topic, String message) { |
|||
publish(topic, message); |
|||
} |
|||
|
|||
/** |
|||
* 通用发布方法 |
|||
*/ |
|||
private void publish(String topic, String payload) { |
|||
if (!connected) { |
|||
log.error("Attempted to publish while disconnected: {}", topic); |
|||
return; |
|||
} |
|||
|
|||
try { |
|||
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); |
|||
message.setQos(QOS); |
|||
message.setRetained(false); |
|||
|
|||
mqttClient.publish(topic, message); |
|||
|
|||
log.debug("Published to {}: {}", topic, payload); |
|||
} catch (MqttException e) { |
|||
log.error("Failed to publish to topic: " + topic, e); |
|||
} |
|||
} |
|||
|
|||
// ==================== MQTT 回调方法 ====================
|
|||
|
|||
@Override |
|||
public void disconnected(MqttDisconnectResponse disconnectResponse) { |
|||
log.warn("MQTT disconnected: {}", disconnectResponse); |
|||
connectionLock.lock(); |
|||
try { |
|||
connected = false; |
|||
} finally { |
|||
connectionLock.unlock(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void mqttErrorOccurred(MqttException exception) { |
|||
log.error("MQTT error occurred", exception); |
|||
} |
|||
|
|||
@Override |
|||
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { |
|||
log.debug("Message arrived on topic: {}", topic); |
|||
// 作为服务端,我们只发布消息,不接收消息
|
|||
// 可以记录或处理意外收到的消息
|
|||
} |
|||
|
|||
@Override |
|||
public void deliveryComplete(IMqttToken iMqttToken) { |
|||
try { |
|||
String[] topics = iMqttToken.getTopics(); |
|||
if (topics != null && topics.length > 0) { |
|||
log.info("Message delivery confirmed for topic: {}", topics[0]); |
|||
} else { |
|||
log.info("Message delivery confirmed (no topic info)"); |
|||
} |
|||
} catch (Exception e) { |
|||
log.warn("Error getting delivery token info", e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void connectComplete(boolean reconnect, String serverURI) { |
|||
log.info("MQTT connection complete: reconnect={}, serverURI={}", reconnect, serverURI); |
|||
connectionLock.lock(); |
|||
try { |
|||
connected = true; |
|||
} finally { |
|||
connectionLock.unlock(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void authPacketArrived(int i, MqttProperties mqttProperties) { |
|||
log.info("MQTT authPacketArrived({}, {})", i, mqttProperties); |
|||
} |
|||
} |
|||
@ -0,0 +1,238 @@ |
|||
package com.yvan.redis; |
|||
|
|||
import com.yvan.logisticsEnv.EnvConfig; |
|||
import com.yvan.logisticsModel.LogisticsRuntime; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.clever.data.redis.Redis; |
|||
import org.clever.data.redis.RedisAdmin; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
@Slf4j |
|||
public class LccRedisService { |
|||
private final LogisticsRuntime runtime; |
|||
private volatile Redis redis; |
|||
private String serverId; // 客户端ID
|
|||
private volatile EnvConfig.RedisConfig config; |
|||
|
|||
// 心跳管理
|
|||
private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(); |
|||
private final Map<String, Long> heartbeatKeys = new HashMap<>(); |
|||
|
|||
// 设备存活状态管理
|
|||
private final Map<String, Long> deviceAliveKeys = new HashMap<>(); |
|||
|
|||
public LccRedisService(LogisticsRuntime runtime) { |
|||
this.runtime = runtime; |
|||
} |
|||
|
|||
public void start(EnvConfig.RedisConfig config, String clientId) { |
|||
redis = RedisAdmin.getRedis(); |
|||
this.serverId = clientId; |
|||
|
|||
// 启动服务器心跳
|
|||
heartbeatScheduler.scheduleAtFixedRate( |
|||
this::updateServerHeartbeat, |
|||
0, 5, TimeUnit.SECONDS |
|||
); |
|||
|
|||
log.info("LccRedisService started for project: " + runtime.projectUuid + ", env: " + runtime.envId); |
|||
} |
|||
|
|||
/** |
|||
* 停止服务 |
|||
*/ |
|||
public void stop() { |
|||
// 停止心跳调度
|
|||
heartbeatScheduler.shutdown(); |
|||
try { |
|||
if (!heartbeatScheduler.awaitTermination(5, TimeUnit.SECONDS)) { |
|||
heartbeatScheduler.shutdownNow(); |
|||
} |
|||
} catch (InterruptedException e) { |
|||
heartbeatScheduler.shutdownNow(); |
|||
Thread.currentThread().interrupt(); |
|||
} |
|||
|
|||
// 清除所有心跳状态
|
|||
clearAllHeartbeats(); |
|||
|
|||
log.info("LccRedisService stopped for project: " + runtime.projectUuid + ", env: " + runtime.envId); |
|||
} |
|||
|
|||
// ==================== 心跳管理 ====================
|
|||
|
|||
/** |
|||
* 更新服务器心跳 |
|||
*/ |
|||
private void updateServerHeartbeat() { |
|||
// 服务器状态
|
|||
Map<String, Object> statusData = this.runtime.getState().toMap(); |
|||
|
|||
String statusKey = buildKey("server/rcs/status"); |
|||
redis.hPutAll(statusKey, statusData); |
|||
|
|||
// 服务器存活状态
|
|||
String aliveKey = buildKey("server/rcs/alive"); |
|||
redis.vSet(aliveKey, this.serverId); |
|||
redis.kExpire(aliveKey, 15); // 15秒过期
|
|||
|
|||
heartbeatKeys.put(statusKey, System.currentTimeMillis()); |
|||
heartbeatKeys.put(aliveKey, System.currentTimeMillis()); |
|||
} |
|||
|
|||
/** |
|||
* 更新客户端心跳 |
|||
*/ |
|||
private void updateClientHeartbeat(String clientId, String clientUser) { |
|||
// 客户端状态
|
|||
Map<String, String> statusData = new HashMap<>(); |
|||
statusData.put("clientId", clientId); |
|||
statusData.put("user", clientUser); |
|||
statusData.put("timestamp", String.valueOf(System.currentTimeMillis())); |
|||
|
|||
String statusKey = buildKey("client/status"); |
|||
redis.hPutAll(statusKey, statusData); |
|||
|
|||
// 客户端存活状态
|
|||
String aliveKey = buildKey("client/alive"); |
|||
redis.vSet(aliveKey, clientId); |
|||
redis.kExpire(aliveKey, 15); // 15秒过期
|
|||
|
|||
heartbeatKeys.put(statusKey, System.currentTimeMillis()); |
|||
heartbeatKeys.put(aliveKey, System.currentTimeMillis()); |
|||
} |
|||
|
|||
/** |
|||
* 更新设备存活状态 |
|||
* |
|||
* @param deviceId 设备ID |
|||
*/ |
|||
public void updateDeviceAlive(String deviceId) { |
|||
String aliveKey = buildKey("device/" + deviceId + "/alive"); |
|||
redis.vSet(aliveKey, "1"); |
|||
redis.kExpire(aliveKey, 5); // 5秒过期
|
|||
|
|||
deviceAliveKeys.put(aliveKey, System.currentTimeMillis()); |
|||
} |
|||
|
|||
/** |
|||
* 清除所有心跳状态 |
|||
*/ |
|||
private void clearAllHeartbeats() { |
|||
// 清除服务器心跳
|
|||
for (String key : heartbeatKeys.keySet()) { |
|||
redis.kDelete(key); |
|||
} |
|||
|
|||
// 清除设备心跳
|
|||
for (String key : deviceAliveKeys.keySet()) { |
|||
redis.kDelete(key); |
|||
} |
|||
} |
|||
|
|||
// ==================== 监控数据管理 ====================
|
|||
// 1. 服务器状态
|
|||
|
|||
/** |
|||
* 获取服务器状态 |
|||
*/ |
|||
public Map<String, String> getServerStatus() { |
|||
String key = buildKey("server/rcs/status"); |
|||
return (Map<String, String>) ((Map) redis.hEntries(key)); |
|||
} |
|||
|
|||
/** |
|||
* 检查服务器是否存活 |
|||
*/ |
|||
public boolean isServerAlive() { |
|||
String key = buildKey("server/rcs/alive"); |
|||
return redis.kHasKey(key); |
|||
} |
|||
|
|||
// 2. 客户端状态
|
|||
|
|||
/** |
|||
* 获取客户端状态 |
|||
*/ |
|||
public Map<String, String> getClientStatus() { |
|||
String key = buildKey("client/status"); |
|||
return (Map<String, String>) ((Map) redis.hEntries(key)); |
|||
} |
|||
|
|||
/** |
|||
* 检查客户端是否存活 |
|||
*/ |
|||
public boolean isClientAlive() { |
|||
String key = buildKey("client/alive"); |
|||
return redis.kHasKey(key); |
|||
} |
|||
|
|||
// 5. 设备监控
|
|||
|
|||
/** |
|||
* 保存设备状态 |
|||
* |
|||
* @param deviceId 设备ID |
|||
* @param statusData 状态数据 |
|||
*/ |
|||
public void saveDeviceStatus(String deviceId, Map<String, String> statusData) { |
|||
String key = buildKey("device/" + deviceId + "/status"); |
|||
redis.hPutAll(key, statusData); |
|||
} |
|||
|
|||
/** |
|||
* 获取设备状态 |
|||
* |
|||
* @param deviceId 设备ID |
|||
*/ |
|||
public Map<String, String> getDeviceStatus(String deviceId) { |
|||
String key = buildKey("device/" + deviceId + "/status"); |
|||
return (Map<String, String>) ((Map) redis.hEntries(key)); |
|||
} |
|||
|
|||
/** |
|||
* 检查设备是否存活 |
|||
* |
|||
* @param deviceId 设备ID |
|||
*/ |
|||
public boolean isDeviceAlive(String deviceId) { |
|||
String key = buildKey("device/" + deviceId + "/alive"); |
|||
return redis.kHasKey(key); |
|||
} |
|||
|
|||
// ==================== 工具方法 ====================
|
|||
|
|||
/** |
|||
* 构建完整的Redis Key |
|||
*/ |
|||
private String buildKey(String suffix) { |
|||
return "/lcc/" + runtime.projectUuid + "/" + runtime.envId + "/" + suffix; |
|||
} |
|||
|
|||
/* |
|||
private ScheduledExecutorService schedulerOfWriteToRedisAlive = Executors.newSingleThreadScheduledExecutor(); |
|||
private ScheduledExecutorService schedulerOfWriteToRedisStatus = Executors.newSingleThreadScheduledExecutor(); |
|||
|
|||
public void writeToRedisStatus() { |
|||
String redisKey = String.format("/lcc/%s/%s/server/rcs/status", this.projectUuid, this.envId); |
|||
|
|||
var currentMap = this.getState().toMap(); |
|||
var originMap = redis.hMultiGet(redisKey, currentMap.keySet()); |
|||
|
|||
// 对比两个 map 如果有不一致的地方, 通知状态变更
|
|||
|
|||
redis.hPutAll(redisKey, currentMap); |
|||
} |
|||
|
|||
public void writeToRedisAlive() { |
|||
String redisKey = String.format("/lcc/%s/%s/server/rcs/alive", this.projectUuid, this.envId); |
|||
redis.vSet(redisKey, this.serverId); |
|||
redis.kExpire(redisKey, 15); // 15秒过期
|
|||
} |
|||
*/ |
|||
} |
|||
@ -1,15 +1,106 @@ |
|||
package com.yvan.workbench.controller; |
|||
|
|||
import com.galaxis.rcs.RCSService; |
|||
import com.google.common.base.Strings; |
|||
import com.yvan.workbench.SpringContext; |
|||
import com.yvan.workbench.service.LccMapService; |
|||
import org.clever.core.Conv; |
|||
import org.clever.core.model.response.R; |
|||
import org.clever.data.jdbc.DaoFactory; |
|||
import org.clever.data.jdbc.QueryDSL; |
|||
import org.clever.data.jdbc.querydsl.utils.QueryDslUtils; |
|||
import org.clever.web.mvc.annotation.RequestBody; |
|||
|
|||
import java.util.Map; |
|||
|
|||
import static com.galaxis.rcs.common.query.QLccBasContainer.lccBasContainer; |
|||
import static com.galaxis.rcs.common.query.QLccBasLocation.lccBasLocation; |
|||
import static com.galaxis.rcs.common.query.QLccInvLpn.lccInvLpn; |
|||
|
|||
/** |
|||
* /api/workbench/LccController@get1 |
|||
* LCC API 服务端实现 |
|||
*/ |
|||
public class LccController { |
|||
static final QueryDSL queryDSL = DaoFactory.getQueryDSL(); |
|||
|
|||
public static R<?> getAllProjects() { |
|||
var mapService = SpringContext.HOLDER.getBean(LccMapService.class); |
|||
return R.success(mapService.getAllProjects()); |
|||
} |
|||
|
|||
public static R<?> projectStart(@RequestBody Map<String, Object> params) { |
|||
String projectUuid = Conv.asString(params.get("projectUUID")); |
|||
Long envId = Conv.asLong(params.get("envId")); |
|||
|
|||
if (Strings.isNullOrEmpty(projectUuid)) { |
|||
return R.fail("projectUUID Must not be empty"); |
|||
} |
|||
if (envId == null || envId < 0) { |
|||
return R.fail("envId Must not be empty"); |
|||
} |
|||
|
|||
// 启动 RCS 服务器
|
|||
RCSService.projectStart(projectUuid, envId); |
|||
|
|||
// 启动MFC服务器 / 启动WCS服务器 / 启动PES服务器 等等
|
|||
|
|||
return R.success("Project started successfully"); |
|||
} |
|||
|
|||
public static R<?> projectStop(@RequestBody Map<String, Object> params) { |
|||
String projectUuid = Conv.asString(params.get("projectUUID")); |
|||
Long envId = Conv.asLong(params.get("envId")); |
|||
|
|||
if (Strings.isNullOrEmpty(projectUuid)) { |
|||
return R.fail("projectUUID Must not be empty"); |
|||
} |
|||
if (envId == null || envId < 0) { |
|||
return R.fail("envId Must not be empty"); |
|||
} |
|||
|
|||
// 停止RCS服务器
|
|||
RCSService.projectStop(projectUuid, envId); |
|||
|
|||
// 停止MFC服务器 / 停止WCS服务器 / 停止PES服务器 等等
|
|||
return R.success("Project stopped successfully"); |
|||
} |
|||
|
|||
|
|||
public static R<?> getAllInv(@RequestBody Map<String, Object> params) { |
|||
String catalogCode = Conv.asString(params.get("catalogCode")); |
|||
Long envId = Conv.asLong(params.get("envId")); |
|||
|
|||
if (Strings.isNullOrEmpty(catalogCode)) { |
|||
return R.fail("catalogCode must not be null"); |
|||
} |
|||
if (envId == null) { |
|||
return R.fail("envId must not be null"); |
|||
} |
|||
|
|||
var list = queryDSL.select(QueryDslUtils.linkedMap( |
|||
lccInvLpn.lpn, |
|||
lccBasContainer.containerType, |
|||
lccInvLpn.locCode, |
|||
lccBasLocation.rack, |
|||
lccBasLocation.bay, |
|||
lccBasLocation.level, |
|||
lccBasLocation.cell |
|||
)) |
|||
.from(lccInvLpn) |
|||
.innerJoin(lccBasLocation).on( |
|||
lccInvLpn.locCode.eq(lccBasLocation.locCode).and( |
|||
lccBasLocation.envId.eq(lccBasLocation.envId) |
|||
) |
|||
) |
|||
.innerJoin(lccBasContainer).on( |
|||
lccInvLpn.lpn.eq(lccInvLpn.lpn).and( |
|||
lccBasContainer.envId.eq(lccInvLpn.envId) |
|||
) |
|||
) |
|||
.where(lccInvLpn.envId.eq(envId)) |
|||
.where(lccBasLocation.catalogCode.eq(catalogCode)) |
|||
.fetch(); |
|||
|
|||
return R.success(list); |
|||
} |
|||
} |
|||
|
|||
@ -0,0 +1,124 @@ |
|||
package com.yvan.workbench.service; |
|||
|
|||
import com.yvan.logisticsModel.SystemMetricsStore; |
|||
import oshi.SystemInfo; |
|||
import oshi.hardware.CentralProcessor; |
|||
import oshi.hardware.GlobalMemory; |
|||
import oshi.hardware.HardwareAbstractionLayer; |
|||
import oshi.software.os.FileSystem; |
|||
import oshi.software.os.OSFileStore; |
|||
import oshi.software.os.OperatingSystem; |
|||
|
|||
import org.springframework.context.SmartLifecycle; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
@Component |
|||
public class LccServerInfoConnect implements SmartLifecycle { |
|||
|
|||
private boolean running = false; |
|||
private final SystemInfo systemInfo = new SystemInfo(); |
|||
private final HardwareAbstractionLayer hardware = systemInfo.getHardware(); |
|||
private final OperatingSystem os = systemInfo.getOperatingSystem(); |
|||
private long[] lastTicks = null; |
|||
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); |
|||
|
|||
@Override |
|||
public void start() { |
|||
if (running) return; |
|||
|
|||
running = true; |
|||
scheduler.scheduleAtFixedRate(this::updateAllMetrics, 0, 30, TimeUnit.SECONDS); |
|||
} |
|||
|
|||
@Override |
|||
public void stop() { |
|||
running = false; |
|||
if (!scheduler.isShutdown()) { |
|||
scheduler.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public boolean isRunning() { |
|||
return running; |
|||
} |
|||
|
|||
private void updateAllMetrics() { |
|||
updateCpuUsage(); |
|||
updateMemoryUsage(); |
|||
updateDiskIoLoad(); |
|||
updateDiskFreeSpace(); |
|||
} |
|||
|
|||
private void updateCpuUsage() { |
|||
CentralProcessor processor = hardware.getProcessor(); |
|||
if (lastTicks == null) { |
|||
lastTicks = processor.getSystemCpuLoadTicks(); // 初始化一次
|
|||
try { |
|||
Thread.sleep(50); // 初次需要等待一点时间才能获取有效数据
|
|||
} catch (InterruptedException ignored) { |
|||
} |
|||
} |
|||
long[] currentTicks = processor.getSystemCpuLoadTicks(); |
|||
|
|||
double cpuLoad = processor.getSystemCpuLoadBetweenTicks(lastTicks); |
|||
lastTicks = currentTicks; |
|||
|
|||
SystemMetricsStore.cpuUsage = (float) (cpuLoad * 100); |
|||
} |
|||
|
|||
private void updateMemoryUsage() { |
|||
GlobalMemory memory = hardware.getMemory(); |
|||
long total = memory.getTotal(); |
|||
long available = memory.getAvailable(); |
|||
long used = total - available; |
|||
SystemMetricsStore.memoryUsage = (float) ((double) used / total * 100); |
|||
SystemMetricsStore.freeMemory = available / (1024f * 1024 * 1024); // GB
|
|||
} |
|||
|
|||
private void updateDiskIoLoad() { |
|||
CentralProcessor processor = hardware.getProcessor(); |
|||
long[] prevTicks = processor.getSystemCpuLoadTicks(); |
|||
try { |
|||
Thread.sleep(500); |
|||
} catch (InterruptedException ignored) { |
|||
} |
|||
long[] currTicks = processor.getSystemCpuLoadTicks(); |
|||
|
|||
long busyTime = (currTicks[CentralProcessor.TickType.USER.getIndex()] |
|||
+ currTicks[CentralProcessor.TickType.NICE.getIndex()] |
|||
+ currTicks[CentralProcessor.TickType.SYSTEM.getIndex()] |
|||
+ currTicks[CentralProcessor.TickType.IRQ.getIndex()]) |
|||
- (prevTicks[CentralProcessor.TickType.USER.getIndex()] |
|||
+ prevTicks[CentralProcessor.TickType.NICE.getIndex()] |
|||
+ prevTicks[CentralProcessor.TickType.SYSTEM.getIndex()] |
|||
+ prevTicks[CentralProcessor.TickType.IRQ.getIndex()]); |
|||
long idleTime = (currTicks[CentralProcessor.TickType.IDLE.getIndex()] |
|||
+ currTicks[CentralProcessor.TickType.IOWAIT.getIndex()]) |
|||
- (prevTicks[CentralProcessor.TickType.IDLE.getIndex()] |
|||
+ prevTicks[CentralProcessor.TickType.IOWAIT.getIndex()]); |
|||
double total = busyTime + idleTime; |
|||
float ioWait = total == 0 ? 0 : (float) ((currTicks[CentralProcessor.TickType.IOWAIT.getIndex()] |
|||
- prevTicks[CentralProcessor.TickType.IOWAIT.getIndex()]) / total * 100); |
|||
SystemMetricsStore.diskIoLoad = ioWait; |
|||
} |
|||
|
|||
private void updateDiskFreeSpace() { |
|||
FileSystem fileSystem = os.getFileSystem(); |
|||
float minFree = Float.MAX_VALUE; |
|||
for (OSFileStore fs : fileSystem.getFileStores()) { |
|||
long usableSpace = fs.getUsableSpace(); |
|||
float freeGb = usableSpace / (1024f * 1024 * 1024); |
|||
if (freeGb < minFree) { |
|||
minFree = freeGb; |
|||
} |
|||
} |
|||
if (minFree != Float.MAX_VALUE) { |
|||
SystemMetricsStore.diskFreeSpace = minFree; |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue