Browse Source

client-id 问题

master
修宁 6 months ago
parent
commit
e66ff8a4c5
  1. 4
      servo/src/main/java/com/galaxis/rcs/RCSService.java
  2. 8
      servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java
  3. 60
      servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java
  4. 1
      servo/src/main/java/com/yvan/logisticsEnv/EnvPayload.java
  5. 2
      servo/src/main/java/com/yvan/logisticsEnv/EnvStartParam.java
  6. 2
      servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java

4
servo/src/main/java/com/galaxis/rcs/RCSService.java

@ -28,6 +28,8 @@ import org.clever.data.jdbc.DaoFactory;
import org.clever.data.jdbc.QueryDSL; import org.clever.data.jdbc.QueryDSL;
import org.clever.data.jdbc.querydsl.utils.QueryDslUtils; import org.clever.data.jdbc.querydsl.utils.QueryDslUtils;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.util.Map; import java.util.Map;
import static com.galaxis.rcs.common.query.QLccEnvInfo.lccEnvInfo; import static com.galaxis.rcs.common.query.QLccEnvInfo.lccEnvInfo;
@ -52,6 +54,7 @@ public class RCSService {
* 加载所有自动启动的环境和楼层数据 * 加载所有自动启动的环境和楼层数据
* 并启动对应的环境 * 并启动对应的环境
*/ */
@SneakyThrows
private void createAutoStartEnv() { private void createAutoStartEnv() {
QueryDSL queryDSL = DaoFactory.getQueryDSL(); QueryDSL queryDSL = DaoFactory.getQueryDSL();
var list = queryDSL.select(QueryDslUtils.linkedMap( var list = queryDSL.select(QueryDslUtils.linkedMap(
@ -98,6 +101,7 @@ public class RCSService {
param.setTimeRate(1); param.setTimeRate(1);
param.setVirtual(false); param.setVirtual(false);
param.setEnvPayload(envPayload); param.setEnvPayload(envPayload);
param.setClientId(InetAddress.getLocalHost().getHostName());
runtime.start(param); runtime.start(param);
} }

8
servo/src/main/java/com/galaxis/rcs/ptr/AmrMessageHandler.java

@ -68,7 +68,7 @@ public class AmrMessageHandler {
private static final Redis redis = AppContextHolder.getBean("defaultRedis", Redis.class, true); private static final Redis redis = AppContextHolder.getBean("defaultRedis", Redis.class, true);
public final LogisticsRuntime runtime; public final LogisticsRuntime runtime;
public volatile PtrMqttClient ptrMqttClient; private volatile PtrMqttClient ptrMqttClient;
public int getNewSeqNo() { public int getNewSeqNo() {
String redisKey = "lcc:rcs:" + runtime.projectUUID + ":" + runtime.envId + ":seqNo"; String redisKey = "lcc:rcs:" + runtime.projectUUID + ":" + runtime.envId + ":seqNo";
@ -84,8 +84,8 @@ public class AmrMessageHandler {
this.runtime = runtime; this.runtime = runtime;
} }
public void start(EnvPayload.MqttConfig mqttConfig) { public void start(EnvPayload.MqttConfig mqttConfig, String clientId) {
this.ptrMqttClient = new PtrMqttClient(this, mqttConfig); this.ptrMqttClient = new PtrMqttClient(this, mqttConfig, clientId);
} }
public void stop() { public void stop() {
@ -209,7 +209,7 @@ public class AmrMessageHandler {
BannerUtils.printConfig(log, "MQTT 发送报文 [" + id + "] RcsMessageType." + RcsMessageType.fromValue(id) + " - " + RcsMessageType.fromValue(id).description, ar); BannerUtils.printConfig(log, "MQTT 发送报文 [" + id + "] RcsMessageType." + RcsMessageType.fromValue(id) + " - " + RcsMessageType.fromValue(id).description, ar);
} }
ptrMqttClient.client.publish(topic, payload.getBytes(StandardCharsets.UTF_8), 0, false); ptrMqttClient.publish(topic, payload);
} }
/** /**

60
servo/src/main/java/com/galaxis/rcs/ptr/PtrMqttClient.java

@ -12,14 +12,16 @@ import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import java.nio.charset.StandardCharsets;
@Slf4j @Slf4j
public class PtrMqttClient implements MqttCallback { public class PtrMqttClient implements MqttCallback {
public final AmrMessageHandler amrMessageHandler; public final AmrMessageHandler amrMessageHandler;
public final EnvPayload.MqttConfig mqttConfig; public final EnvPayload.MqttConfig mqttConfig;
public final MqttClient client; private final MqttClient client;
@SneakyThrows @SneakyThrows
public PtrMqttClient(AmrMessageHandler handler, EnvPayload.MqttConfig mqttConfig) { public PtrMqttClient(AmrMessageHandler handler, EnvPayload.MqttConfig mqttConfig, String clientId) {
this.amrMessageHandler = handler; this.amrMessageHandler = handler;
this.mqttConfig = mqttConfig; this.mqttConfig = mqttConfig;
@ -27,17 +29,56 @@ public class PtrMqttClient implements MqttCallback {
String username = mqttConfig.getUsername(); String username = mqttConfig.getUsername();
String password = mqttConfig.getPassword(); String password = mqttConfig.getPassword();
client = new MqttClient(brokerUrl, mqttConfig.getClientId()); client = new MqttClient(brokerUrl, clientId);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setAutomaticReconnect(true);
options.setUserName(username);
options.setPassword(password.getBytes());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
client.setCallback(this); client.setCallback(this);
client.connect(options);
client.subscribe("/agv_robot/status", 0);
}
@SneakyThrows
public void publish(String topic, String payload) {
this.client.publish(topic, payload.getBytes(StandardCharsets.UTF_8), 0, false);
}
@SneakyThrows
public static void main(String[] args) {
String brokerUrl = "tcp://10.10.203.239:1885";
String clientId = "yvan-rcs-dev";
String topic = "test/topic";
String content = "Hello from Java";
String username = "admin";
String password = "admin";
int qos = 0;
MqttClient client = new MqttClient(brokerUrl, clientId);
MqttConnectionOptions options = new MqttConnectionOptions(); MqttConnectionOptions options = new MqttConnectionOptions();
options.setServerURIs(new String[]{brokerUrl}); options.setServerURIs(new String[]{brokerUrl});
options.setAutomaticReconnect(true); // 启用自动重连 options.setAutomaticReconnect(true);
options.setUserName(username); options.setUserName(username);
options.setPassword(password.getBytes()); options.setPassword(password.getBytes());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
System.out.println("Connecting to broker...");
client.connect(options); client.connect(options);
client.subscribe("/agv_robot/status", 0); System.out.println("Connected");
System.out.println("Publishing message...");
long start = System.currentTimeMillis();
client.publish(topic, content.getBytes(), qos, false);
System.out.println("Published in " + (System.currentTimeMillis() - start) + " ms");
client.disconnect();
System.out.println("Disconnected");
} }
@Override @Override
@ -81,10 +122,13 @@ public class PtrMqttClient implements MqttCallback {
@SneakyThrows @SneakyThrows
public void stop() { public void stop() {
if (client.isConnected()) { if (client != null && client.isConnected()) {
client.disconnect(); client.disconnect();
}
client.close(); client.close();
log.info("MQTT client stopped"); log.info("MQTT client disconnected and closed.");
} else {
log.warn("MQTT client is not connected, no action taken.");
} }
}
} }

1
servo/src/main/java/com/yvan/logisticsEnv/EnvPayload.java

@ -12,7 +12,6 @@ public class EnvPayload implements Serializable {
@Data @Data
public static class MqttConfig { public static class MqttConfig {
private String brokerUrl; private String brokerUrl;
private String clientId;
private String username; private String username;
private String password; private String password;
} }

2
servo/src/main/java/com/yvan/logisticsEnv/EnvStartParam.java

@ -10,6 +10,8 @@ import lombok.Setter;
@Setter @Setter
public class EnvStartParam { public class EnvStartParam {
public String clientId;
/** /**
* 是否虚拟仿真环境 * 是否虚拟仿真环境
*/ */

2
servo/src/main/java/com/yvan/logisticsModel/LogisticsRuntime.java

@ -176,7 +176,7 @@ public class LogisticsRuntime {
this.logisticsEnv.start(param); this.logisticsEnv.start(param);
// 启动 MQTT 监听 // 启动 MQTT 监听
this.amrMessageHandler.start(param.getEnvPayload().getMqtt()); this.amrMessageHandler.start(param.getEnvPayload().getMqtt(), param.clientId);
// 开启所有机器人的任务处理 // 开启所有机器人的任务处理
Set<String> executorTypes = Sets.newHashSet(); Set<String> executorTypes = Sets.newHashSet();

Loading…
Cancel
Save