package com.galaxis.rcs.ptr; import com.yvan.logisticsEnv.EnvPayload; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.clever.core.BannerUtils; import org.eclipse.paho.mqttv5.client.*; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import java.net.InetAddress; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; @Slf4j public class PtrMqttClient implements MqttCallback { public final AmrMessageHandler amrMessageHandler; public final EnvPayload.MqttConfig mqttConfig; private final MqttClient client; private CountDownLatch connectLatch = new CountDownLatch(1); @SneakyThrows public PtrMqttClient(AmrMessageHandler handler, EnvPayload.MqttConfig mqttConfig, String clientId) { this.amrMessageHandler = handler; this.mqttConfig = mqttConfig; String brokerUrl = mqttConfig.getBrokerUrl(); // "tcp://10.10.203.239:1885" String username = mqttConfig.getUsername(); // admin String password = mqttConfig.getPassword(); // admin client = new MqttClient(brokerUrl, clientId); // String clientId = "LUOYIFAN-PC" MqttConnectionOptions options = new MqttConnectionOptions(); options.setServerURIs(new String[]{brokerUrl}); options.setAutomaticReconnect(true); options.setUserName(username); options.setPassword(password.getBytes()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); client.setCallback(this); BannerUtils.printConfig(log, "MQTT 开启监听", new String[]{ "brokerUrl: " + brokerUrl, "userName: " + username, "clientId: " + clientId, "topic: /agv_robot/status"}); client.connect(options); connectLatch.await(); client.subscribe("/agv_robot/status", 0); } @SneakyThrows public void publish(String topic, String payload) { // 这个一发送就阻塞 this.client.publish(topic, payload.getBytes(StandardCharsets.UTF_8), 0, false); } @SneakyThrows public static void main(String[] args) { // 这个测试是好的 String brokerUrl = "tcp://10.10.203.239:1885"; String clientId = InetAddress.getLocalHost().getHostName(); // LUOYIFAN-PC String topic = "/agv_robot/status"; String content = "Hello from Java"; String username = "admin"; String password = "admin"; int qos = 0; MqttClient client = new MqttClient(brokerUrl, clientId); MqttConnectionOptions options = new MqttConnectionOptions(); options.setServerURIs(new String[]{brokerUrl}); options.setAutomaticReconnect(true); options.setUserName(username); options.setPassword(password.getBytes()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); System.out.println("Connecting to broker..."); client.connect(options); System.out.println("Connected"); System.out.println("Publishing message..."); long start = System.currentTimeMillis(); client.publish(topic, content.getBytes(), qos, false); // 这个发送是好的 System.out.println("Published in " + (System.currentTimeMillis() - start) + " ms"); client.disconnect(); System.out.println("Disconnected"); } @Override public void disconnected(MqttDisconnectResponse disconnectResponse) { log.info("mqtt disconnected"); } @Override public void mqttErrorOccurred(MqttException exception) { log.error("mqtt error occurred"); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { switch (topic) { case "/agv_robot/status": try { amrMessageHandler.handleAgvRobotStatusMessage(message); } catch (Exception e) { log.error("amrMessageHandler.handleAgvRobotStatusMessage 异常", e); } break; default: break; } } @Override public void deliveryComplete(IMqttToken token) { } @Override public void connectComplete(boolean reconnect, String serverURI) { // BannerUtils.printConfig(log, "MQTT 开启监听", new String[]{serverURI + " topic: /agv_robot/status"}); log.info("MQTT client connected to server: {}", serverURI); connectLatch.countDown(); // 放行 } @Override public void authPacketArrived(int reasonCode, MqttProperties properties) { } @SneakyThrows public void stop() { if (client != null && client.isConnected()) { client.disconnect(); client.close(); log.info("MQTT client disconnected and closed."); } else { log.warn("MQTT client is not connected, no action taken."); } } }