package com.yvan.logisticsModel;

import com.yvan.logisticsEnv.EnvConfig;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * Pushes messages to the frontend topic of one project/environment over MQTT v5.
 *
 * <p>Two Paho clients are created by {@link #start(EnvConfig.MqttConfig)}: {@code client}, which has this
 * object registered as its {@link MqttCallback}, and {@code clientForSend}, which is the one used by
 * {@link #publish(String)}. Both connect with the same options, including a retained last-will message
 * announcing that the server went offline.
 */
@Slf4j
public class FrontendPushService implements MqttCallback {

    private final LogisticsRuntime runtime;

    /** Topic every message of this service is published to: {@code /rcs/<projectUUID>/<envId>/frontend}. */
    public final String rcsFrontendTopic;

    private volatile MqttClient clientForSend;
    private volatile MqttClient client;

    /** Released by {@link #connectComplete(boolean, String)}; {@link #start} waits on it before publishing. */
    private final CountDownLatch connectLatch = new CountDownLatch(1);

    /**
     * @param runtime runtime whose {@code projectUUID}, {@code envId} and {@code clientId} are used to build
     *                the topic name, the client ids and the status payloads
     */
    public FrontendPushService(LogisticsRuntime runtime) {
        this.runtime = runtime;
        this.rcsFrontendTopic = "/rcs/" + runtime.projectUUID + "/" + runtime.envId + "/frontend";
    }

    /** Logs the disconnect together with the response so the reason is visible in the log. */
    @Override
    public void disconnected(MqttDisconnectResponse disconnectResponse) {
        log.info("mqtt disconnected: {}", disconnectResponse);
    }

    /** Logs the error including the exception, so the cause and stack trace are not lost. */
    @Override
    public void mqttErrorOccurred(MqttException exception) {
        log.error("mqtt error occurred", exception);
    }

    /** Logs any incoming message; the payload is decoded as UTF-8 text. */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("Message arrived on topic {}: {}", topic, new String(message.getPayload(), StandardCharsets.UTF_8));
    }

    /** Logs the completed delivery token. */
    @Override
    public void deliveryComplete(IMqttToken token) {
        log.info("Message delivery complete: {}", token);
    }

    /** Logs the connection and releases the thread waiting in {@link #start}. */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("MQTT client connected to server: {}", serverURI);
        connectLatch.countDown(); // let start() proceed
    }

    /** Intentionally empty: AUTH packets are not handled by this service. */
    @Override
    public void authPacketArrived(int reasonCode, MqttProperties properties) {
    }

    /**
     * Shuts down both MQTT clients.
     *
     * <p>{@code clientForSend} is shut down even when shutting down {@code client} throws; the failure is
     * still propagated to the caller afterwards.
     *
     * <p>NOTE(review): no "serverOffline" message is published here, and a clean MQTT disconnect is not
     * expected to trigger the last-will message — confirm whether frontends should be told about a
     * graceful stop.
     */
    @SneakyThrows
    public void stop() {
        try {
            shutdown(client, "client");
        } finally {
            shutdown(clientForSend, "clientForSend");
        }
    }

    /**
     * Disconnects (when connected) and closes one client.
     *
     * @param target client to shut down; {@code null} when {@link #start} was never called
     * @param label  name used in log messages
     * @throws MqttException if disconnecting or closing a connected client fails
     */
    private static void shutdown(MqttClient target, String label) throws MqttException {
        if (target == null) {
            log.warn("MQTT {} is not connected, no action taken.", label);
            return;
        }
        if (target.isConnected()) {
            target.disconnect();
            target.close();
            log.info("MQTT {} disconnected and closed.", label);
            return;
        }
        // A client that is currently disconnected still holds resources; release them.
        // Closing is best-effort here because the original code treated this state as a no-op.
        try {
            target.close();
            log.warn("MQTT {} is not connected, closed without disconnect.", label);
        } catch (MqttException e) {
            log.warn("MQTT {} is not connected and could not be closed.", label, e);
        }
    }

    /**
     * Creates the last-will message announcing that the server is offline.
     *
     * @return a retained QoS 0 message whose payload is the "serverOffline" status JSON
     */
    public MqttMessage createWillMessage() {
        MqttMessage lastMessage = new MqttMessage();
        lastMessage.setPayload(statusPayload("serverOffline", "offline").getBytes(StandardCharsets.UTF_8));
        lastMessage.setQos(0);
        lastMessage.setRetained(true);
        return lastMessage;
    }

    /**
     * Builds the JSON status payload shared by the online and offline announcements.
     *
     * <p>{@code env} is emitted unquoted, as in the original payloads.
     * NOTE(review): values are inserted without JSON escaping — assumes they contain no quotes; confirm.
     *
     * @param id     value of the {@code id} field, e.g. {@code serverOnline}
     * @param status value of the {@code content.status} field, e.g. {@code online}
     */
    private String statusPayload(String id, String status) {
        return """
                {
                  "id": "%s",
                  "content": {
                    "project": "%s",
                    "env": %s,
                    "status": "%s",
                    "server": "%s"
                  }
                }
                """.formatted(id, runtime.projectUUID, runtime.envId, status, runtime.clientId);
    }

    /**
     * Creates and connects both clients, waits for the connect callback, then publishes the
     * "serverOnline" message.
     *
     * @param mqttConfig broker URL (e.g. {@code tcp://host:1885}), user name and password
     */
    @SneakyThrows
    public void start(EnvConfig.MqttConfig mqttConfig) {
        String brokerUrl = mqttConfig.getBrokerUrl();
        String username = mqttConfig.getUsername();
        String password = mqttConfig.getPassword();

        // Client ids are derived from the runtime client id, e.g. "<clientId>_front_send" / "<clientId>_front".
        clientForSend = new MqttClient(brokerUrl, runtime.clientId + "_front_send");
        client = new MqttClient(brokerUrl, runtime.clientId + "_front");

        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setServerURIs(new String[]{brokerUrl});
        options.setAutomaticReconnect(true);
        options.setUserName(username);
        if (password != null) {
            // Explicit charset: the platform default differs between hosts.
            options.setPassword(password.getBytes(StandardCharsets.UTF_8));
        }
        options.setConnectionTimeout(1);
        options.setKeepAliveInterval(20);
        options.setExecutorServiceTimeout(1);

        // Last-will message: tells subscribers the server went offline.
        options.setWill(this.rcsFrontendTopic, createWillMessage());

        client.setCallback(this);
        client.connect(options);
        clientForSend.connect(options);
        connectLatch.await();

        // Announce that the server is online.
        this.publish(statusPayload("serverOnline", "online"));
    }

    /**
     * Publishes a UTF-8 encoded, non-retained QoS 0 message to {@link #rcsFrontendTopic}.
     *
     * @param payloadString message body
     * @throws IllegalStateException if the sending client has not been started or is not connected
     */
    @SneakyThrows
    void publish(String payloadString) {
        MqttMessage message = new MqttMessage(payloadString.getBytes(StandardCharsets.UTF_8));
        message.setQos(0);
        message.setRetained(false);

        MqttClient sender = this.clientForSend;
        if (sender == null || !sender.isConnected()) {
            throw new IllegalStateException("MQTT client is not connected");
        }
        sender.publish(this.rcsFrontendTopic, message);
    }
}