|
|
|
@ -1,84 +1,90 @@ |
|
|
|
package com.galaxis.rcs.ptr; |
|
|
|
|
|
|
|
import com.google.common.base.Splitter; |
|
|
|
import com.yvan.logisticsEnv.EnvPayload; |
|
|
|
import com.yvan.logisticsModel.LogisticsRuntime; |
|
|
|
import lombok.SneakyThrows; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.clever.core.BannerUtils; |
|
|
|
import org.clever.core.json.JsonWrapper; |
|
|
|
import org.eclipse.paho.mqttv5.client.*; |
|
|
|
import org.eclipse.paho.mqttv5.common.MqttException; |
|
|
|
import org.eclipse.paho.mqttv5.common.MqttMessage; |
|
|
|
import org.eclipse.paho.mqttv5.common.packet.MqttProperties; |
|
|
|
|
|
|
|
import java.net.InetAddress; |
|
|
|
import java.nio.charset.StandardCharsets; |
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
|
|
|
|
@Slf4j |
|
|
|
public class PtrMqttClient implements MqttCallback { |
|
|
|
public final AmrMessageHandler amrMessageHandler; |
|
|
|
public final EnvPayload.MqttConfig mqttConfig; |
|
|
|
private final MqttClient clientForSend; |
|
|
|
private final MqttClient client; |
|
|
|
private final String clientId; |
|
|
|
private CountDownLatch connectLatch = new CountDownLatch(1); |
|
|
|
|
|
|
|
|
|
|
|
@SneakyThrows |
|
|
|
public PtrMqttClient(AmrMessageHandler handler, EnvPayload.MqttConfig mqttConfig, String clientId) { |
|
|
|
this.amrMessageHandler = handler; |
|
|
|
this.mqttConfig = mqttConfig; |
|
|
|
this.clientId = clientId; |
|
|
|
|
|
|
|
String brokerUrl = mqttConfig.getBrokerUrl(); |
|
|
|
String username = mqttConfig.getUsername(); |
|
|
|
String password = mqttConfig.getPassword(); |
|
|
|
String brokerUrl = mqttConfig.getBrokerUrl(); // "tcp://10.10.203.239:1885"
|
|
|
|
String username = mqttConfig.getUsername(); // admin
|
|
|
|
String password = mqttConfig.getPassword(); // admin
|
|
|
|
|
|
|
|
client = new MqttClient(brokerUrl, clientId); |
|
|
|
clientForSend = new MqttClient(brokerUrl, clientId + "_send"); // String clientId = "LUOYIFAN-PC_send"
|
|
|
|
client = new MqttClient(brokerUrl, clientId); // String clientId = "LUOYIFAN-PC"
|
|
|
|
MqttConnectionOptions options = new MqttConnectionOptions(); |
|
|
|
options.setServerURIs(new String[]{brokerUrl}); |
|
|
|
options.setAutomaticReconnect(true); |
|
|
|
options.setUserName(username); |
|
|
|
options.setPassword(password.getBytes()); |
|
|
|
options.setConnectionTimeout(10); |
|
|
|
options.setConnectionTimeout(1); |
|
|
|
options.setKeepAliveInterval(20); |
|
|
|
options.setExecutorServiceTimeout(1); |
|
|
|
|
|
|
|
client.setCallback(this); |
|
|
|
|
|
|
|
client.connect(options); |
|
|
|
|
|
|
|
clientForSend.connect(options); |
|
|
|
connectLatch.await(); |
|
|
|
|
|
|
|
BannerUtils.printConfig(log, "MQTT 开启监听成功", new String[]{ |
|
|
|
"brokerUrl: " + brokerUrl, |
|
|
|
"userName: " + username, |
|
|
|
"clientId: " + clientId, |
|
|
|
"topic: /agv_robot/status"}); |
|
|
|
|
|
|
|
client.subscribe("/agv_robot/status", 0); |
|
|
|
} |
|
|
|
|
|
|
|
@SneakyThrows |
|
|
|
public void publish(String topic, String payload) { |
|
|
|
this.client.publish(topic, payload.getBytes(StandardCharsets.UTF_8), 0, false); |
|
|
|
public void publish(String topic, String payloadString) { |
|
|
|
MqttMessage message = new MqttMessage(payloadString.getBytes(StandardCharsets.UTF_8)); |
|
|
|
message.setQos(0); |
|
|
|
message.setRetained(false); |
|
|
|
|
|
|
|
if (this.clientForSend.isConnected()) { |
|
|
|
this.clientForSend.publish(topic, message); |
|
|
|
} else { |
|
|
|
throw new RuntimeException("MQTT client is not connected, cannot publish message."); |
|
|
|
} |
|
|
|
log.info("Message published to topic {}: finish", topic); |
|
|
|
} |
|
|
|
|
|
|
|
@SneakyThrows |
|
|
|
public static void main(String[] args) { |
|
|
|
String brokerUrl = "tcp://10.10.203.239:1885"; |
|
|
|
String clientId = "yvan-rcs-dev"; |
|
|
|
String topic = "test/topic"; |
|
|
|
String content = "Hello from Java"; |
|
|
|
String username = "admin"; |
|
|
|
String password = "admin"; |
|
|
|
int qos = 0; |
|
|
|
|
|
|
|
MqttClient client = new MqttClient(brokerUrl, clientId); |
|
|
|
MqttConnectionOptions options = new MqttConnectionOptions(); |
|
|
|
options.setServerURIs(new String[]{brokerUrl}); |
|
|
|
options.setAutomaticReconnect(true); |
|
|
|
options.setUserName(username); |
|
|
|
options.setPassword(password.getBytes()); |
|
|
|
options.setConnectionTimeout(10); |
|
|
|
options.setKeepAliveInterval(20); |
|
|
|
|
|
|
|
System.out.println("Connecting to broker..."); |
|
|
|
client.connect(options); |
|
|
|
System.out.println("Connected"); |
|
|
|
|
|
|
|
System.out.println("Publishing message..."); |
|
|
|
long start = System.currentTimeMillis(); |
|
|
|
client.publish(topic, content.getBytes(), qos, false); |
|
|
|
System.out.println("Published in " + (System.currentTimeMillis() - start) + " ms"); |
|
|
|
|
|
|
|
client.disconnect(); |
|
|
|
System.out.println("Disconnected"); |
|
|
|
var config = new EnvPayload.MqttConfig(); |
|
|
|
config.setBrokerUrl("tcp://10.10.203.239:1885"); |
|
|
|
config.setUsername("admin"); |
|
|
|
config.setPassword("admin"); |
|
|
|
|
|
|
|
PtrMqttClient mqttClient = new PtrMqttClient(null, config, "LUOYIFAN-PC"); |
|
|
|
for (int i = 0; i < 100; i++) { |
|
|
|
mqttClient.publish("/agv_robot/status", "Hello from Java " + i); |
|
|
|
} |
|
|
|
log.info("Message published successfully."); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -96,7 +102,11 @@ public class PtrMqttClient implements MqttCallback { |
|
|
|
switch (topic) { |
|
|
|
case "/agv_robot/status": |
|
|
|
try { |
|
|
|
amrMessageHandler.handleAgvRobotStatusMessage(message); |
|
|
|
if (this.amrMessageHandler == null) { |
|
|
|
log.info("amrMessageHandler is null, skipping message handling {}", new String(message.getPayload(), StandardCharsets.UTF_8)); |
|
|
|
return; |
|
|
|
} |
|
|
|
this.amrMessageHandler.handleAgvRobotStatusMessage(message); |
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
log.error("amrMessageHandler.handleAgvRobotStatusMessage 异常", e); |
|
|
|
@ -109,11 +119,14 @@ public class PtrMqttClient implements MqttCallback { |
|
|
|
|
|
|
|
@Override |
|
|
|
public void deliveryComplete(IMqttToken token) { |
|
|
|
log.info("Message delivery complete: {}", token); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void connectComplete(boolean reconnect, String serverURI) { |
|
|
|
BannerUtils.printConfig(log, "MQTT 开启监听", new String[]{serverURI + " topic: /agv_robot/status"}); |
|
|
|
// BannerUtils.printConfig(log, "MQTT 开启监听", new String[]{serverURI + " topic: /agv_robot/status"});
|
|
|
|
log.info("MQTT client connected to server: {}", serverURI); |
|
|
|
connectLatch.countDown(); // 放行
|
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
|