package com.galaxis.rcs.ptr; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.galaxis.rcs.ptr.receiveEntity.*; import com.galaxis.rcs.ptr.receiveEntity.base.*; import com.galaxis.rcs.ptr.sendEntity.*; import com.google.common.base.Splitter; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.yvan.logisticsEnv.EnvPayload; import com.yvan.logisticsModel.LogisticsRuntime; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.clever.core.BannerUtils; import org.clever.core.json.JsonWrapper; import org.clever.core.mapper.BeanCopyUtils; import org.clever.data.redis.Redis; import org.clever.data.redis.RedisAdmin; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @Slf4j public class AmrMessageHandler { private static final TypeReference> typeRef20010Message = new TypeReference>() { }; private static final TypeReference>> typeRef20011_defaultMessage = new TypeReference>>() { }; private static final TypeReference>> typeRef20011_1Message = new TypeReference>>() { }; private static final TypeReference>> typeRef20011_4Message = new TypeReference>>() { }; private static final TypeReference>> typeRef20011_8Message = new TypeReference>>() { }; private static final TypeReference> typeRef20012Message = new TypeReference>() { }; private static final TypeReference> typeRef20020Message = new TypeReference>() { }; private static final TypeReference> typeRef20050Message = new TypeReference>() { }; private static final TypeReference> typeRef20060Message = new TypeReference>() { }; private static final TypeReference> typeRef20100Message = new TypeReference>() { }; private static final TypeReference> typeRef20147Message = new TypeReference>() { }; private static final TypeReference> typeRef20148Message = new TypeReference>() { }; private static final TypeReference> typeRef20149Message = new TypeReference>() { }; private static final TypeReference> typeRef20150Message = new TypeReference>() { }; private static final TypeReference> typeRef20200Message = new TypeReference>() { }; private static final TypeReference> typeRef20250Message = new TypeReference>() { }; private static final Redis redis = RedisAdmin.getRedis(); public final LogisticsRuntime runtime; private volatile PtrMqttClient ptrMqttClient; public int getNewSeqNo() { String redisKey = "lcc:rcs:" + runtime.projectUUID + ":" + runtime.envId + ":seqNo"; long seqNo = redis.vIncrement(redisKey); if (seqNo > Integer.MAX_VALUE) { redis.kDelete(redisKey); seqNo = redis.vIncrement(redisKey).intValue(); } return (int) seqNo; } public AmrMessageHandler(LogisticsRuntime runtime) { this.runtime = runtime; } public void start(EnvPayload.MqttConfig mqttConfig, String clientId) { this.ptrMqttClient = new PtrMqttClient(this, mqttConfig, clientId); } public void stop() { if (ptrMqttClient != null) { ptrMqttClient.stop(); } } public void handleAgvRobotStatusMessage(MqttMessage message) throws MqttException, JsonProcessingException { byte[] messageData = message.getPayload(); String json = new String(messageData, StandardCharsets.UTF_8); JsonWrapper jw = new JsonWrapper(json); int id = jw.asInt("id"); String agvId = jw.asInt("content", "VehicleId") + ""; int seqNo = jw.asInt("content", "SeqNo"); PtrAgvItem agvItem = getPtrAgvItem(agvId); if (agvItem == null) { return; } // 计算网络延迟 long receiveTime = System.currentTimeMillis(); Long sentTime = lastMessageTimeMap.get(agvId); if (sentTime != null) { long netDelay = receiveTime - sentTime; updateRedisNetDelay(agvId, netDelay); } /* * 消息标识 * 小车作业完成 20010 AmrTaskCompletedMessage * 任务状态上报 20011 AmrTaskStatusMessage * 小车子模块任务状态 20012 AmrModuleTaskStatusMessage * 地标报告 20020 AmrLandmarkMessage * 消息应答 20050 AmrAckMessage * 状态上报 20060 AmrStatusMessage * 心跳 20100 AmrHeartbeatMessage * 开机上报 20147 AmrBootMessage * 关机上报 20148 AmrShutdownMessage * 小车主程序启动 20149 AmrAppStartMessage * 小车上线 20150 AmrOnlineMessage * 小车离线 20200 AmrOfflineMessage * 异常上报 20250 AmrExceptionMessage */ if (id != ArmMessageType.AMR_ACK.value && id != ArmMessageType.AMR_HEARTBEAT.value) { var list = Splitter.on("\n").splitToList(jw.toString()); String[] ar = new String[list.size()]; list.toArray(ar); // log.info("3-Received message: " + json); BannerUtils.printConfig(log, "MQTT 收到报文 [" + id + "] ArmMessageType." + ArmMessageType.fromValue(id) + " - " + ArmMessageType.fromValue(id).description, ar); } AmrMessage amrMessage = null; switch (ArmMessageType.fromValue(id)) { case AMR_TASK_COMPLETED: AmrMessage taskCompletedMessage = JacksonUtils.parse(json, typeRef20010Message); handleTaskCompletedMessage(agvItem, taskCompletedMessage.content); break; case AMR_TASK_STATUS: handleTaskStatusMessage(agvItem, jw, json); break; case AMR_MODULE_TASK_STATUS: amrMessage = JacksonUtils.parse(json, typeRef20012Message); break; case AMR_LANDMARK: AmrMessage landmarkMessage = JacksonUtils.parse(json, typeRef20020Message); handleLandmarkMessage(agvItem, landmarkMessage.content); break; case AMR_ACK: amrMessage = JacksonUtils.parse(json, typeRef20050Message); break; case AMR_STATUS: AmrMessage statusMessage = JacksonUtils.parse(json, typeRef20060Message); handleStatusMessage(agvItem, statusMessage.content); break; case AMR_HEARTBEAT: AmrMessage heartbeatMessage = JacksonUtils.parse(json, typeRef20100Message); // sendCmdHeartBeat(heartbeatMessage.content.VehicleId + ""); handleHeartbeatMessage(agvItem, heartbeatMessage.content); break; case AMR_BOOT: amrMessage = JacksonUtils.parse(json, typeRef20147Message); break; case AMR_SHUTDOWN: amrMessage = JacksonUtils.parse(json, typeRef20148Message); break; case AMR_APP_START: amrMessage = JacksonUtils.parse(json, typeRef20149Message); break; case AMR_ONLINE: // amrMessage = JacksonUtils.parse(json, typeRef20150Message); this.sendCmdConfig(agvId, agvItem.getConfig()); break; case AMR_OFFLINE: amrMessage = JacksonUtils.parse(json, typeRef20200Message); break; case AMR_EXCEPTION: amrMessage = JacksonUtils.parse(json, typeRef20250Message); break; default: // log.error("未知消息:{}", json); break; } if (id != ArmMessageType.AMR_ACK.value && id != ArmMessageType.AMR_HEARTBEAT.value) { sendCmdAck(agvId, seqNo); } } public PtrAgvItem getPtrAgvItem(String vehicleId) { var executorItem = runtime.executorItemMap.get(vehicleId); return (PtrAgvItem) executorItem; } @SneakyThrows private void publish(String topic, String payload, int id) { if (id != RcsMessageType.RCS_ACK.value && id != RcsMessageType.RCS_HEARTBEAT.value) { var list = Splitter.on("\n").splitToList(new JsonWrapper(payload).toString()); String[] ar = new String[list.size()]; list.toArray(ar); BannerUtils.printConfig(log, "MQTT 发送报文 [" + id + "] RcsMessageType." + RcsMessageType.fromValue(id) + " - " + RcsMessageType.fromValue(id).description, ar); } ptrMqttClient.publish(topic, payload); } /** * 发送任务 * * @param vehicleId 设备id * @param rcsTaskMessage 任务信息 * @throws JsonProcessingException * @throws MqttException */ public void sendCmdTask(String vehicleId, RcsTaskMessage rcsTaskMessage) throws JsonProcessingException, MqttException { // 记录发送时间 lastMessageTimeMap.put(vehicleId, System.currentTimeMillis()); BaseMessage baseMessage = new BaseMessage(); baseMessage.id = RcsMessageType.RCS_TASK.value; baseMessage.content = rcsTaskMessage; String json = JacksonUtils.toJson(baseMessage); // log.info("sendCmd10010: {}", json); // log.debug("Sending task to {}: {}", vehicleId, json); publish("/wcs_server/" + vehicleId, json, baseMessage.id); } /** * 发送SR消息 * * @param vehicleId 设备id * @param rcsSRMessage SR消息 * @throws JsonProcessingException * @throws MqttException */ public void sendCmdSR(String vehicleId, RcsSRMessage rcsSRMessage) throws JsonProcessingException, MqttException { BaseMessage baseMessage = new BaseMessage(); baseMessage.id = RcsMessageType.RCS_SR.value; baseMessage.content = rcsSRMessage; String json = JacksonUtils.toJson(baseMessage); publish("/wcs_server/" + vehicleId, json, baseMessage.id); } /** * 发送应答消息 * * @param vehicleId 设备id * @param seqNo 被应答的消息序号 * @throws JsonProcessingException * @throws MqttException */ public void sendCmdAck(String vehicleId, int seqNo) throws JsonProcessingException, MqttException { BaseMessage baseMessage = new BaseMessage(); RcsAckMessage rcsAckMsg = new RcsAckMessage(); rcsAckMsg.SeqNo = seqNo; baseMessage.id = RcsMessageType.RCS_ACK.value; baseMessage.content = rcsAckMsg; String json = JacksonUtils.toJson(baseMessage); publish("/wcs_server/" + vehicleId, json, baseMessage.id); } /** * 发送旋转货架消息 * * @param vehicleId 设备id * @param rcsRotateRackMessage 旋转货架消息 * @throws JsonProcessingException * @throws MqttException */ public void sendCmdRotateRack(String vehicleId, RcsRotateRackMessage rcsRotateRackMessage) throws JsonProcessingException, MqttException { BaseMessage baseMessage = new BaseMessage(); baseMessage.id = RcsMessageType.RCS_ROTATE_RACK.value; baseMessage.content = rcsRotateRackMessage; String json = JacksonUtils.toJson(baseMessage); publish("/wcs_server/" + vehicleId, json, baseMessage.id); } /** * 发送旋转车身消息 * * @param vehicleId 设备id * @param rcsRotateBodyMessage 旋转车身消息 * @throws JsonProcessingException * @throws MqttException */ public void sendCmdRotateBody(String vehicleId, RcsRotateBodyMessage rcsRotateBodyMessage) throws JsonProcessingException, MqttException { BaseMessage baseMessage = new BaseMessage(); baseMessage.id = RcsMessageType.RCS_ROTATE_BODY.value; baseMessage.content = rcsRotateBodyMessage; String json = JacksonUtils.toJson(baseMessage); publish("/wcs_server/" + vehicleId, json, baseMessage.id); } // 发送控制卷帘门消息 10082 public void sendCmdControlDoor(String vehicleId, RcsControlDoorMessage rcsControlDoorMessage) throws MqttException, JsonProcessingException { BaseMessage baseMessage = new BaseMessage(); baseMessage.id = RcsMessageType.RCS_CONTROL_DOOR.value; baseMessage.content = rcsControlDoorMessage; String json = JacksonUtils.toJson(baseMessage); publish("/wcs_server/" + vehicleId, json, baseMessage.id); } /** * 发送配置信息 * * @param vehicleId 设备id * @param rcsConfigMessage 配置信息 * @throws MqttException * @throws JsonProcessingException */ public void sendCmdConfig(String vehicleId, RcsConfigMessage rcsConfigMessage) throws MqttException, JsonProcessingException { AmrMessage baseMessage = new AmrMessage<>(); baseMessage.id = RcsMessageType.RCS_CONFIG.value; baseMessage.content = rcsConfigMessage; String json = JacksonUtils.toJson(baseMessage); publish("/wcs_server/" + vehicleId, json, baseMessage.id); } /** * 发送心跳 * * @param vehicleId 设备id * @throws JsonProcessingException * @throws MqttException */ public void sendCmdHeartBeat(String vehicleId) throws JsonProcessingException, MqttException { BaseMessage baseMessage = new BaseMessage(); baseMessage.id = RcsMessageType.RCS_HEARTBEAT.value; baseMessage.content = new RcsHeartBeatMessage(); String json = JacksonUtils.toJson(baseMessage); publish("/wcs_server/" + vehicleId, json, baseMessage.id); } /** * 发送状态查询消息 * * @param vehicleId 设备id * @throws MqttException * @throws JsonProcessingException */ public void sendCmdQueryStatus(String vehicleId) throws MqttException, JsonProcessingException { BaseMessage baseMessage = new BaseMessage(); baseMessage.id = RcsMessageType.RCS_QUERY_STATUS.value; RcsQueryStatusMessage rcsQueryStatusMessage = new RcsQueryStatusMessage(this.runtime); rcsQueryStatusMessage.SeqNo = getNewSeqNo(); baseMessage.content = rcsQueryStatusMessage; String json = JacksonUtils.toJson(baseMessage); publish("/wcs_server/" + vehicleId, json, baseMessage.id); log.info("发送查询设备状态消息: {}", json); } /** * 发送取消任务消息 * * @param vehicleId 设备id * @param seqNo 待取消的任务序号 * @throws JsonProcessingException * @throws MqttException */ public void sendCmdCancelTask(String vehicleId, int seqNo) throws JsonProcessingException, MqttException { BaseMessage baseMessage = new BaseMessage(); baseMessage.id = RcsMessageType.RCS_CANCEL_TASK.value; baseMessage.content = new RcsCancelTaskMessage(seqNo); String json = JacksonUtils.toJson(baseMessage); publish("/wcs_server/" + vehicleId, json, baseMessage.id); } /** * 发送设置小车坐标消息 * * @param vehicleId 设备id * @param rcsSetLocationMessage 位置信息 * @throws JsonProcessingException * @throws MqttException */ public void sendCmdSetLocation(String vehicleId, RcsSetLocationMessage rcsSetLocationMessage) throws JsonProcessingException, MqttException { BaseMessage baseMessage = new BaseMessage(); baseMessage.id = RcsMessageType.RCS_SET_LOCATION.value; baseMessage.content = rcsSetLocationMessage; String json = JacksonUtils.toJson(baseMessage); publish("/wcs_server/" + vehicleId, json, baseMessage.id); } /** * 发送等待就绪消息 * * @param vehicleId 设备id * @param rcsWaitMessage 等待消息 * @throws JsonProcessingException * @throws MqttException */ public void sendCmdWait(String vehicleId, RcsWaitMessage rcsWaitMessage) throws JsonProcessingException, MqttException { BaseMessage baseMessage = new BaseMessage(); baseMessage.id = RcsMessageType.RCS_WAIT.value; baseMessage.content = rcsWaitMessage; String json = JacksonUtils.toJson(baseMessage); publish("/wcs_server/" + vehicleId, json, baseMessage.id); } /////////////=============================== 新加入的代码 =========================================== private final Map lastMessageTimeMap = Maps.newConcurrentMap(); private final ScheduledExecutorService delayCalculator = Executors.newScheduledThreadPool(4); private void handleHeartbeatMessage(PtrAgvItem agvItem, AmrHeartbeatMessage message) { agvItem.handleHeartbeat(message); // 计算延迟 long now = System.currentTimeMillis(); long netDelay = now - message.SendTime; updateRedisNetDelay(agvItem.getId(), netDelay); } private void handleTaskCompletedMessage(PtrAgvItem agvItem, AmrTaskCompletedMessage message) { agvItem.taskCompleted(message.CurX, message.CurY, message.CurDirection, 4); } private void handleLandmarkMessage(PtrAgvItem agvItem, AmrLandmarkMessage message) { // 这是源逻辑,CurLogicX / CurLogicY / CurDirection 需要到 PtrAgvItem 中更新, 因为要触发事件 agvItem.x = message.X; agvItem.y = message.Y; agvItem.orientation = message.CurOrientation; agvItem.updatePosition(message.CurLogicX, message.CurLogicY, message.CurDirection); } private void handleStatusMessage(PtrAgvItem agvItem, AmrStatusMessage message) { // 更新位置. TODO 貌似不包含 direction 信息 agvItem.updatePosition(message.CurLogicX, message.CurLogicY, agvItem.direction); agvItem.battery = new CurBatteryData(); BeanCopyUtils.copyTo(message.CurBattery, agvItem.battery); agvItem.x = message.X; agvItem.y = message.Y; agvItem.orientation = message.CurOrientation; agvItem.logicX = message.CurLogicX; agvItem.logicY = message.CurLogicY; // 更新Redis agvItem.updateRedisStatus(); } private void handleTaskStatusMessage(PtrAgvItem agvItem, JsonWrapper jw, String json) { // int eventId = jw.asInt("content", "EventId"); // int seqNo = jw.asInt("content", "SeqNo"); // // switch (eventId) { // case 4: // 任务完成 // int curLogicX = jw.asInt("content", "Info", "CurLogicX"); // int curLogicY = jw.asInt("content", "Info", "CurLogicY"); // agvItem.updateDeviceTaskStatus(seqNo, curLogicX, curLogicY, eventId); // break; // default: // agvItem.updateDeviceTaskStatus(seqNo, 0, 0, eventId); // break; // } int EventId = jw.asInt("content", "EventId"); // log.info("1-Received message: " + json); switch (EventId) { case 0: case 2: case 3: case 5: case 6: case 7: AmrMessage> taskStatusChangeDefault = JacksonUtils.parse(json, typeRef20011_defaultMessage); AmrTaskStatusMessage statusMessage = taskStatusChangeDefault.content; agvItem.updateDeviceTaskStatus((int) statusMessage.SeqNo, 0, 0, statusMessage.EventId); break; case 1: AmrMessage> taskModeChange = JacksonUtils.parse(json, typeRef20011_1Message); AmrTaskStatusMessage modeMessage = taskModeChange.content; agvItem.updateDeviceTaskStatus((int) modeMessage.SeqNo, 0, 0, modeMessage.EventId); agvItem.updateTaskMode(modeMessage.Info.TaskMode); break; case 4: AmrMessage> taskCompleted = JacksonUtils.parse(json, typeRef20011_4Message); AmrTaskStatusMessage completedMessage = taskCompleted.content; agvItem.updateDeviceTaskStatus((int) completedMessage.SeqNo, completedMessage.Info.CurLogicX, completedMessage.Info.CurLogicY, completedMessage.EventId); // agvItem.logicX = completedMessage.Info.CurLogicX; // agvItem.logicY = completedMessage.Info.CurLogicY; // // agvStatusAndInfo.orientation = landmarkMessage.content.CurOrientation; // agvItem.direction = taskCompleted.content.Info.CurDirection; // agvItem.updateTask(completedMessage.Info.CurLogicX, completedMessage.Info.CurLogicY, taskCompleted.content.Info.CurDirection, 4); agvItem.updatePosition(completedMessage.Info.CurLogicX, completedMessage.Info.CurLogicY, taskCompleted.content.Info.CurDirection); break; case 8: AmrMessage> taskTypeChange = JacksonUtils.parse(json, typeRef20011_8Message); AmrTaskStatusMessage typeMessage = taskTypeChange.content; agvItem.updateDeviceTaskStatus((int) typeMessage.SeqNo, 0, 0, typeMessage.EventId); default: break; } } private void updateRedisNetDelay(String agvId, long netDelay) { String statusKey = "lcc:" + runtime.projectUUID + ":" + runtime.envId + ":rcs:id_" + agvId; redis.hPut(statusKey, "NetDelay", String.valueOf(netDelay)); } private final Set heartBeatSet = Sets.newConcurrentHashSet(); /** * 注册心跳单元 */ public void registeHeartBeatSet(PtrAgvItem ptrAgvItem) { this.heartBeatSet.add(ptrAgvItem.getId()); } /** * 注销心跳单元 */ public void unregisteHeartBeatSet(PtrAgvItem ptrAgvItem) { this.heartBeatSet.remove(ptrAgvItem.getId()); } // private final ScheduledExecutorService heartBeatMonitor = Executors.newSingleThreadScheduledExecutor(); // // private void startHeartBeatSendAndStatusMonitor() { // heartBeatMonitor.scheduleAtFixedRate(() -> { // String aliveKey = getRedisKey("alive"); // if (!redis.kExists(aliveKey)) { // handleOfflineEvent(); // } else if (!isOnline) { // handleOnlineEvent(); // } // }, 0, 1, TimeUnit.SECONDS); // } }