package com.yvan.pusher; import com.yvan.entity.AgvStatusVo; import com.yvan.entity.BasLocationVo; import com.yvan.entity.InvUpdateVo; import com.yvan.entity.ServerStatusVo; import com.yvan.logisticsEnv.EnvConfig; import com.yvan.logisticsModel.LogisticsRuntime; import com.yvan.workbench.autoconfigure.LccConfigProperties; import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.clever.core.BannerUtils; import org.clever.core.mapper.JacksonMapper; import org.eclipse.paho.mqttv5.client.*; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @Slf4j public class FrontendMessagePushService implements MqttCallback { public static final FrontendMessagePushService INSTANCE = new FrontendMessagePushService(); private volatile LccConfigProperties.FrontendMqtt mqttConfig; private volatile String clientId; private volatile MqttClient mqttClient; private final MemoryPersistence persistence = new MemoryPersistence(); private final Lock connectionLock = new ReentrantLock(); // QoS 配置 private static final int QOS = 1; // 重连参数 private static final int MAX_RETRIES = 5; private static final int RETRY_INTERVAL = 3; // 秒 // 连接状态 @Getter private volatile boolean connected = false; private FrontendMessagePushService() { } /** * 启动MQTT服务 */ @SneakyThrows public void start(LccConfigProperties.FrontendMqtt mqttConfig, String clientId) { this.mqttConfig = mqttConfig; this.clientId = clientId; connectionLock.lock(); try { if (connected) { log.warn("FRONTEND_MQTT service is already started"); return; } log.info("Starting FRONTEND_MQTT service for clientId: {}", this.clientId); // 创建MQTT客户端 mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence); mqttClient.setCallback(this); // 配置连接选项 MqttConnectionOptions options = new MqttConnectionOptions(); options.setServerURIs(new String[]{mqttConfig.getBrokerUrl()}); options.setUserName(mqttConfig.getUsername()); options.setPassword(mqttConfig.getPassword().getBytes()); options.setAutomaticReconnect(true); options.setConnectionTimeout(10); options.setKeepAliveInterval(60); options.setExecutorServiceTimeout(1); // 尝试连接 int attempts = 0; while (attempts < MAX_RETRIES && !connected) { attempts++; try { log.info("Connecting to FRONTEND_MQTT broker (attempt {}/{})", attempts, MAX_RETRIES); mqttClient.connect(options); connected = true; log.info("FRONTEND_MQTT connected successfully"); } catch (MqttException e) { log.error("FRONTEND_MQTT connection failed (attempt " + attempts + "/{" + MAX_RETRIES + "})", e); // 重试前等待 if (attempts < MAX_RETRIES) { try { TimeUnit.SECONDS.sleep(RETRY_INTERVAL); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; } } } } if (!connected) { log.error("Failed to connect to FRONTEND_MQTT broker after {} attempts", MAX_RETRIES); } } finally { connectionLock.unlock(); } } /** * 停止MQTT服务 */ public void stop() { connectionLock.lock(); try { if (mqttClient != null && mqttClient.isConnected()) { try { mqttClient.disconnect(); mqttClient.close(); log.info("FRONTEND_MQTT disconnected"); } catch (MqttException e) { log.error("Error disconnecting FRONTEND_MQTT", e); } } connected = false; } finally { connectionLock.unlock(); } } // ==================== 消息推送方法 ==================== /** * 推送服务器状态 * /lcc/{proj_id}/{env_id}/server * * @param runtime 运行时环境 * @param statusData 状态数据 */ public void pushServerState(LogisticsRuntime runtime, ServerStatusVo statusData) { String topic = buildTopic(runtime, "server"); publishJson(topic, statusData); } /** * 推送客户端状态 * /lcc/{proj_id}/{env_id}/client * * @param runtime 运行时环境 * @param clientData 客户端数据 */ public void pushClientState(LogisticsRuntime runtime, Map clientData) { String topic = buildTopic(runtime, "client"); publishJson(topic, clientData); } /** * 推送任务更新 * /lcc/{proj_id}/{env_id}/task * * @param runtime 运行时环境 * @param taskData 任务数据 */ public void pushTaskUpdate(LogisticsRuntime runtime, Object taskData) { String topic = buildTopic(runtime, "task"); publishJson(topic, taskData); } /** * 推送库存更新 * /lcc/{proj_id}/{env_id}/inv * * @param runtime 运行时环境 * @param lpn 容器号 * @param before 更新前库存 * @param after 更新后库存 * @param qty 更新数量 */ public void pushInvUpdate(LogisticsRuntime runtime, String lpn, BasLocationVo before, BasLocationVo after, int qty) { String topic = buildTopic(runtime, "inv"); publishJson(topic, new InvUpdateVo( lpn, before, after, qty )); } /** * 推送设备状态 * /lcc/{proj_id}/{env_id}/device/{id}/status * * @param runtime 运行时环境 * @param deviceId 设备ID * @param statusData 状态数据 */ public void pushDeviceStatus(LogisticsRuntime runtime, String deviceId, AgvStatusVo statusData) { String topic = buildTopic(runtime, "device/" + deviceId + "/status"); publishJson(topic, statusData); } /** * 推送设备存活状态 * * @param runtime 运行时环境 * @param deviceId 设备ID * @param type 设备类型 * @param online 是否在线 */ public void pushDeviceAlive(LogisticsRuntime runtime, String deviceId, String type, boolean online) { String topic = buildTopic(runtime, "device/" + deviceId + "/alive"); Map data = new HashMap<>(); data.put("id", deviceId); data.put("type", type); data.put("online", online); publishJson(topic, data); } /** * 推送日志 * /lcc/{proj_id}/{env_id}/log/{type} * * @param runtime 运行时环境 * @param logType 日志类型 * @param log 日志数据 */ public void pushLogs(LogisticsRuntime runtime, String logType, String log) { String topic = buildTopic(runtime, "log/" + logType); publish(topic, log); } /** * 推送告警 * /lcc/{proj_id}/{env_id}/alarm * * @param runtime 运行时环境 * @param alarmData 告警数据 */ public void pushAlarm(LogisticsRuntime runtime, Object alarmData) { String topic = buildTopic(runtime, "alarm"); publishJson(topic, alarmData); } /** * 推送脚本更新 * /lcc/{proj_id}/script * * @param runtime 运行时环境 * @param scriptData 脚本数据 */ public void pushScriptUpdate(LogisticsRuntime runtime, Object scriptData) { // 脚本系统没有环境ID String topic = "/lcc/" + runtime.projectUuid + "/script"; publishJson(topic, scriptData); } // ==================== 内部工具方法 ==================== /** * 构建主题路径 */ private String buildTopic(LogisticsRuntime runtime, String suffix) { return "/lcc/" + runtime.projectUuid + "/" + runtime.envId + "/" + suffix; } /** * 发布JSON数据 */ private void publishJson(String topic, Object data) { String json = JacksonMapper.getInstance().toJson(data); log.info("Publishing to topic: {}, data: {}", topic, json); publish(topic, json); } /** * 通用发布方法 */ private void publish(String topic, String payload) { if (!connected) { log.error("Attempted to publish while disconnected: {}", topic); return; } try { MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); message.setQos(QOS); message.setRetained(false); mqttClient.publish(topic, message); log.debug("Published to {}: {}", topic, payload); } catch (MqttException e) { log.error("Failed to publish to topic: " + topic, e); } } // ==================== MQTT 回调方法 ==================== @Override public void disconnected(MqttDisconnectResponse disconnectResponse) { log.warn("FRONTEND_MQTT disconnected: {}", disconnectResponse); connectionLock.lock(); try { connected = false; } finally { connectionLock.unlock(); } } @Override public void mqttErrorOccurred(MqttException exception) { log.error("FRONTEND_MQTT error occurred", exception); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.debug("Message arrived on topic: {}", topic); // 作为服务端,我们只发布消息,不接收消息 // 可以记录或处理意外收到的消息 } @Override public void deliveryComplete(IMqttToken iMqttToken) { try { String[] topics = iMqttToken.getTopics(); if (topics != null && topics.length > 0) { log.info("Message delivery confirmed for topic: {}", topics[0]); } else { log.info("Message delivery confirmed (no topic info)"); } } catch (Exception e) { log.warn("Error getting delivery token info", e); } } @Override public void connectComplete(boolean reconnect, String serverURI) { BannerUtils.printConfig(log, "FRONTEND_MQTT 开启监听成功", new String[]{ "brokerUrl: " + serverURI, "userName: " + this.mqttConfig.getUsername(), "clientId: " + clientId}); connectionLock.lock(); try { connected = true; } finally { connectionLock.unlock(); } } @Override public void authPacketArrived(int i, MqttProperties mqttProperties) { log.info("FRONTEND_MQTT authPacketArrived({}, {})", i, mqttProperties); } }