|
|
@ -5,6 +5,7 @@ import com.yvan.logisticsModel.LogisticsRuntime; |
|
|
import lombok.Getter; |
|
|
import lombok.Getter; |
|
|
import lombok.SneakyThrows; |
|
|
import lombok.SneakyThrows; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
|
|
import org.clever.core.BannerUtils; |
|
|
import org.clever.core.mapper.JacksonMapper; |
|
|
import org.clever.core.mapper.JacksonMapper; |
|
|
import org.eclipse.paho.mqttv5.client.*; |
|
|
import org.eclipse.paho.mqttv5.client.*; |
|
|
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; |
|
|
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; |
|
|
@ -54,11 +55,11 @@ public class LccMqttService implements MqttCallback { |
|
|
connectionLock.lock(); |
|
|
connectionLock.lock(); |
|
|
try { |
|
|
try { |
|
|
if (connected) { |
|
|
if (connected) { |
|
|
log.warn("MQTT service is already started"); |
|
|
log.warn("LCC_MQTT service is already started"); |
|
|
return; |
|
|
return; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
log.info("Starting MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId); |
|
|
log.info("Starting LCC_MQTT service for project: {}, env: {}", this.runtime.projectUuid, this.runtime.envId); |
|
|
|
|
|
|
|
|
// 创建MQTT客户端
|
|
|
// 创建MQTT客户端
|
|
|
mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence); |
|
|
mqttClient = new MqttClient(mqttConfig.getBrokerUrl(), clientId, persistence); |
|
|
@ -79,12 +80,12 @@ public class LccMqttService implements MqttCallback { |
|
|
while (attempts < MAX_RETRIES && !connected) { |
|
|
while (attempts < MAX_RETRIES && !connected) { |
|
|
attempts++; |
|
|
attempts++; |
|
|
try { |
|
|
try { |
|
|
log.info("Connecting to MQTT broker (attempt {}/{})", attempts, MAX_RETRIES); |
|
|
log.info("Connecting to LCC_MQTT broker (attempt {}/{})", attempts, MAX_RETRIES); |
|
|
mqttClient.connect(options); |
|
|
mqttClient.connect(options); |
|
|
connected = true; |
|
|
connected = true; |
|
|
log.info("MQTT connected successfully"); |
|
|
log.info("LCC_MQTT connected successfully"); |
|
|
} catch (MqttException e) { |
|
|
} catch (MqttException e) { |
|
|
log.error("MQTT connection failed (attempt " + attempts + "/{" + MAX_RETRIES + "})", e); |
|
|
log.error("LCC_MQTT connection failed (attempt " + attempts + "/{" + MAX_RETRIES + "})", e); |
|
|
|
|
|
|
|
|
// 重试前等待
|
|
|
// 重试前等待
|
|
|
if (attempts < MAX_RETRIES) { |
|
|
if (attempts < MAX_RETRIES) { |
|
|
@ -99,7 +100,7 @@ public class LccMqttService implements MqttCallback { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (!connected) { |
|
|
if (!connected) { |
|
|
log.error("Failed to connect to MQTT broker after {} attempts", MAX_RETRIES); |
|
|
log.error("Failed to connect to LCC_MQTT broker after {} attempts", MAX_RETRIES); |
|
|
} |
|
|
} |
|
|
} finally { |
|
|
} finally { |
|
|
connectionLock.unlock(); |
|
|
connectionLock.unlock(); |
|
|
@ -116,9 +117,9 @@ public class LccMqttService implements MqttCallback { |
|
|
try { |
|
|
try { |
|
|
mqttClient.disconnect(); |
|
|
mqttClient.disconnect(); |
|
|
mqttClient.close(); |
|
|
mqttClient.close(); |
|
|
log.info("MQTT disconnected"); |
|
|
log.info("LCC_MQTT disconnected"); |
|
|
} catch (MqttException e) { |
|
|
} catch (MqttException e) { |
|
|
log.error("Error disconnecting MQTT", e); |
|
|
log.error("Error disconnecting LCC_MQTT", e); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
connected = false; |
|
|
connected = false; |
|
|
@ -292,7 +293,7 @@ public class LccMqttService implements MqttCallback { |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
public void disconnected(MqttDisconnectResponse disconnectResponse) { |
|
|
public void disconnected(MqttDisconnectResponse disconnectResponse) { |
|
|
log.warn("MQTT disconnected: {}", disconnectResponse); |
|
|
log.warn("LCC_MQTT disconnected: {}", disconnectResponse); |
|
|
connectionLock.lock(); |
|
|
connectionLock.lock(); |
|
|
try { |
|
|
try { |
|
|
connected = false; |
|
|
connected = false; |
|
|
@ -303,7 +304,7 @@ public class LccMqttService implements MqttCallback { |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
public void mqttErrorOccurred(MqttException exception) { |
|
|
public void mqttErrorOccurred(MqttException exception) { |
|
|
log.error("MQTT error occurred", exception); |
|
|
log.error("LCC_MQTT error occurred", exception); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
@ -329,7 +330,10 @@ public class LccMqttService implements MqttCallback { |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
public void connectComplete(boolean reconnect, String serverURI) { |
|
|
public void connectComplete(boolean reconnect, String serverURI) { |
|
|
log.info("MQTT connection complete: reconnect={}, serverURI={}", reconnect, serverURI); |
|
|
BannerUtils.printConfig(log, "LCC_MQTT 开启监听成功", new String[]{ |
|
|
|
|
|
"brokerUrl: " + serverURI, |
|
|
|
|
|
"userName: " + this.mqttConfig.getUsername(), |
|
|
|
|
|
"clientId: " + clientId}); |
|
|
connectionLock.lock(); |
|
|
connectionLock.lock(); |
|
|
try { |
|
|
try { |
|
|
connected = true; |
|
|
connected = true; |
|
|
@ -340,6 +344,6 @@ public class LccMqttService implements MqttCallback { |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
public void authPacketArrived(int i, MqttProperties mqttProperties) { |
|
|
public void authPacketArrived(int i, MqttProperties mqttProperties) { |
|
|
log.info("MQTT authPacketArrived({}, {})", i, mqttProperties); |
|
|
log.info("LCC_MQTT authPacketArrived({}, {})", i, mqttProperties); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|