|
|
@ -8,9 +8,6 @@ export interface MqttMessage { |
|
|
payload: Buffer | string |
|
|
payload: Buffer | string |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 定义MQTT消息处理器
|
|
|
|
|
|
export type MqttMessageHandler = (message: MqttMessage) => void |
|
|
|
|
|
|
|
|
|
|
|
// 定义MQTT连接状态
|
|
|
// 定义MQTT连接状态
|
|
|
enum ConnectionStatus { |
|
|
enum ConnectionStatus { |
|
|
DISCONNECTED = 'disconnected', |
|
|
DISCONNECTED = 'disconnected', |
|
|
@ -20,12 +17,21 @@ enum ConnectionStatus { |
|
|
ERROR = 'error' |
|
|
ERROR = 'error' |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
type ProcessFn = (message: MqttMessage, handler: BackendMessageHandler) => void |
|
|
|
|
|
|
|
|
|
|
|
interface HandlerNode { |
|
|
|
|
|
processFn: ProcessFn |
|
|
|
|
|
handler: BackendMessageHandler |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
/** |
|
|
/** |
|
|
* 服务端将变化数据推送给客户端的管理器类 |
|
|
* 服务端将变化数据推送给客户端的管理器类 |
|
|
*/ |
|
|
*/ |
|
|
export default class LccMqttManager { |
|
|
export default class BackendMessageReceiver { |
|
|
|
|
|
private projectUuid: string |
|
|
|
|
|
private envId: number |
|
|
private client: mqtt.MqttClient | null = null |
|
|
private client: mqtt.MqttClient | null = null |
|
|
private handlers: Map<string, MqttMessageHandler[]> = new Map() |
|
|
private handlers: Map<string, HandlerNode[]> = new Map() |
|
|
|
|
|
|
|
|
// 状态管理
|
|
|
// 状态管理
|
|
|
public state = reactive({ |
|
|
public state = reactive({ |
|
|
@ -37,7 +43,7 @@ export default class LccMqttManager { |
|
|
}) |
|
|
}) |
|
|
|
|
|
|
|
|
// 启动MQTT连接
|
|
|
// 启动MQTT连接
|
|
|
public async start(config: MqttConfig): Promise<boolean> { |
|
|
public async start(projectUuid: string, envId: number, config: MqttConfig): Promise<boolean> { |
|
|
// 如果已经连接,先断开
|
|
|
// 如果已经连接,先断开
|
|
|
if (this.client?.connected) { |
|
|
if (this.client?.connected) { |
|
|
await this.dispose() |
|
|
await this.dispose() |
|
|
@ -123,21 +129,50 @@ export default class LccMqttManager { |
|
|
}) |
|
|
}) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public getTopicStringByType(type: BackendTopicType): [string, ProcessFn] { |
|
|
|
|
|
// 根据类型生成订阅的 topic 字符串
|
|
|
|
|
|
const projId = this.projectUuid |
|
|
|
|
|
const envId = this.envId |
|
|
|
|
|
switch (type) { |
|
|
|
|
|
case 'ServerState': |
|
|
|
|
|
return [`lcc/${projId}/${envId}/server`, this.handleServerState] |
|
|
|
|
|
case 'ClientState': |
|
|
|
|
|
return [`lcc/${projId}/${envId}/client`, this.handleClientState] |
|
|
|
|
|
case 'TaskUpdate': |
|
|
|
|
|
return [`lcc/${projId}/${envId}/task`, this.handleTaskMonitor] |
|
|
|
|
|
case 'InvUpdate': |
|
|
|
|
|
return [`lcc/${projId}/${envId}/inv/#`, this.handleInventoryMonitor] |
|
|
|
|
|
case 'DeviceStatus': |
|
|
|
|
|
return [`lcc/${projId}/${envId}/device/+/status`, this.handleDeviceStatus] |
|
|
|
|
|
case 'DeviceAlive': |
|
|
|
|
|
return [`lcc/${projId}/${envId}/device/+/alive`, this.handleDeviceAlive] |
|
|
|
|
|
case 'Logs': |
|
|
|
|
|
return [`lcc/${projId}/${envId}/log/#`, this.handleLogMonitor] |
|
|
|
|
|
case 'Alarm': |
|
|
|
|
|
return [`lcc/${projId}/${envId}/alarm`, this.handleAlarmMonitor] |
|
|
|
|
|
case 'ScriptUpdate': |
|
|
|
|
|
return [`lcc/${projId}/script`, this.handleScriptSystem] |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
throw new Error(`Invalid topic for type ${type}`) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// 订阅主题
|
|
|
// 订阅主题
|
|
|
public subscribe(topic: string, handler: MqttMessageHandler): void { |
|
|
public subscribe(type: BackendTopicType, handler: BackendMessageHandler): void { |
|
|
if (!this.client?.connected) { |
|
|
if (!this.client?.connected) { |
|
|
console.warn('Cannot subscribe - MQTT not connected') |
|
|
throw new Error('Cannot subscribe - MQTT not connected') |
|
|
return |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
const [topic, processFn] = this.getTopicStringByType(type) |
|
|
|
|
|
|
|
|
// 添加消息处理器
|
|
|
// 添加消息处理器
|
|
|
if (!this.handlers.has(topic)) { |
|
|
if (!this.handlers.has(topic)) { |
|
|
this.handlers.set(topic, []) |
|
|
this.handlers.set(topic, []) |
|
|
} |
|
|
} |
|
|
this.handlers.get(topic)?.push(handler) |
|
|
this.handlers.get(topic)?.push({ processFn, handler }) |
|
|
|
|
|
|
|
|
// 如果尚未订阅该主题
|
|
|
// 如果尚未订阅该主题
|
|
|
if (!this.state.subscribedTopics.includes(topic)) { |
|
|
if (!this.state.subscribedTopics.includes(topic) && this.client) { |
|
|
const options: IClientSubscribeOptions = { qos: 1 } |
|
|
const options: IClientSubscribeOptions = { qos: 1 } |
|
|
|
|
|
|
|
|
this.client.subscribe(topic, options, (err) => { |
|
|
this.client.subscribe(topic, options, (err) => { |
|
|
@ -152,16 +187,20 @@ export default class LccMqttManager { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 取消订阅
|
|
|
// 取消订阅
|
|
|
public unsubscribe(topic: string, handler?: MqttMessageHandler): void { |
|
|
public unsubscribe(type: BackendTopicType, handler: BackendMessageHandler): void { |
|
|
if (!this.client?.connected) { |
|
|
// if (!this.client?.connected) {
|
|
|
console.warn('Cannot unsubscribe - MQTT not connected') |
|
|
// throw new Error('Cannot unsubscribe - MQTT not connected')
|
|
|
return |
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
|
const [topic, processFn] = this.getTopicStringByType(type) |
|
|
|
|
|
if (!topic) { |
|
|
|
|
|
throw new Error(`Invalid topic for type ${type}`) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 移除特定处理函数
|
|
|
// 移除特定处理函数
|
|
|
if (handler && this.handlers.has(topic)) { |
|
|
if (handler && this.handlers.has(topic)) { |
|
|
const handlers = this.handlers.get(topic) || [] |
|
|
const handlers = this.handlers.get(topic) || [] |
|
|
const newHandlers = handlers.filter(h => h !== handler) |
|
|
const newHandlers = handlers.filter(h => (h.processFn !== processFn && h.handler !== handler)) |
|
|
|
|
|
|
|
|
if (newHandlers.length > 0) { |
|
|
if (newHandlers.length > 0) { |
|
|
this.handlers.set(topic, newHandlers) |
|
|
this.handlers.set(topic, newHandlers) |
|
|
@ -173,7 +212,7 @@ export default class LccMqttManager { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 如果没有处理器了,取消订阅
|
|
|
// 如果没有处理器了,取消订阅
|
|
|
if (!this.handlers.has(topic)) { |
|
|
if (!this.handlers.has(topic) && this.client) { |
|
|
this.client.unsubscribe(topic, {}, (err) => { |
|
|
this.client.unsubscribe(topic, {}, (err) => { |
|
|
if (err) { |
|
|
if (err) { |
|
|
console.error(`Failed to unsubscribe from ${topic}:`, err) |
|
|
console.error(`Failed to unsubscribe from ${topic}:`, err) |
|
|
@ -191,7 +230,12 @@ export default class LccMqttManager { |
|
|
|
|
|
|
|
|
// 1. 查找该主题的精确匹配处理器
|
|
|
// 1. 查找该主题的精确匹配处理器
|
|
|
if (this.handlers.has(topic)) { |
|
|
if (this.handlers.has(topic)) { |
|
|
this.handlers.get(topic)?.forEach(handler => handler(message)) |
|
|
this.handlers.get(topic)?.forEach(node => { |
|
|
|
|
|
|
|
|
|
|
|
const { processFn, handler } = node |
|
|
|
|
|
processFn(message, handler) |
|
|
|
|
|
|
|
|
|
|
|
}) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 2. 查找通配符匹配的处理器
|
|
|
// 2. 查找通配符匹配的处理器
|
|
|
@ -199,7 +243,12 @@ export default class LccMqttManager { |
|
|
|
|
|
|
|
|
for (const pattern of wildcardTopics) { |
|
|
for (const pattern of wildcardTopics) { |
|
|
if (this.matchTopic(pattern, topic)) { |
|
|
if (this.matchTopic(pattern, topic)) { |
|
|
this.handlers.get(pattern)?.forEach(handler => handler(message)) |
|
|
this.handlers.get(pattern)?.forEach(node => { |
|
|
|
|
|
|
|
|
|
|
|
const { processFn, handler } = node |
|
|
|
|
|
processFn(message, handler) |
|
|
|
|
|
|
|
|
|
|
|
}) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
@ -232,129 +281,98 @@ export default class LccMqttManager { |
|
|
return patternParts.length === topicParts.length |
|
|
return patternParts.length === topicParts.length |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 注册LCC监控处理器
|
|
|
|
|
|
public registerLccHandlers(projId: string, envId: string) { |
|
|
|
|
|
// 服务器状态监控
|
|
|
|
|
|
this.subscribe(`lcc/${projId}/${envId}/server`, this.handleServerState) |
|
|
|
|
|
|
|
|
|
|
|
// 客户端状态监控
|
|
|
|
|
|
this.subscribe(`lcc/${projId}/${envId}/client`, this.handleClientState) |
|
|
|
|
|
|
|
|
|
|
|
// 任务监控
|
|
|
|
|
|
this.subscribe(`lcc/${projId}/${envId}/task`, this.handleTaskMonitor) |
|
|
|
|
|
|
|
|
|
|
|
// 库存监控
|
|
|
|
|
|
this.subscribe(`lcc/${projId}/${envId}/inv/#`, this.handleInventoryMonitor) |
|
|
|
|
|
|
|
|
|
|
|
// 设备监控
|
|
|
|
|
|
this.subscribe(`lcc/${projId}/${envId}/device/+/status`, this.handleDeviceStatus) |
|
|
|
|
|
this.subscribe(`lcc/${projId}/${envId}/device/+/alive`, this.handleDeviceAlive) |
|
|
|
|
|
|
|
|
|
|
|
// 日志监控
|
|
|
|
|
|
this.subscribe(`lcc/${projId}/${envId}/log/#`, this.handleLogMonitor) |
|
|
|
|
|
|
|
|
|
|
|
// 告警监控
|
|
|
|
|
|
this.subscribe(`lcc/${projId}/${envId}/alarm`, this.handleAlarmMonitor) |
|
|
|
|
|
|
|
|
|
|
|
// 脚本系统
|
|
|
|
|
|
this.subscribe(`lcc/${projId}/script`, this.handleScriptSystem) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// ==================== 消息处理器实现 ====================
|
|
|
// ==================== 消息处理器实现 ====================
|
|
|
|
|
|
|
|
|
private handleServerState(message: MqttMessage) { |
|
|
private handleServerState(message: MqttMessage, handler: BackendMessageHandler) { |
|
|
try { |
|
|
try { |
|
|
const data = JSON.parse(message.payload.toString()) |
|
|
const data = JSON.parse(message.payload.toString()) |
|
|
console.log('Server state update:', data) |
|
|
handler('ServerState', message.topic, _.cloneDeep(data)) |
|
|
// 这里可以分发到Vue store或其他状态管理系统
|
|
|
|
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|
console.error('Error parsing server state:', error) |
|
|
console.error('Error parsing server state:', error) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private handleClientState(message: MqttMessage) { |
|
|
private handleClientState(message: MqttMessage, handler: BackendMessageHandler) { |
|
|
try { |
|
|
try { |
|
|
const data = JSON.parse(message.payload.toString()) |
|
|
const data = JSON.parse(message.payload.toString()) |
|
|
console.log('Client state update:', data) |
|
|
handler('ClientState', message.topic, _.cloneDeep(data)) |
|
|
// 处理客户端状态更新
|
|
|
|
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|
console.error('Error parsing client state:', error) |
|
|
console.error('Error parsing client state:', error) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private handleTaskMonitor(message: MqttMessage) { |
|
|
private handleTaskMonitor(message: MqttMessage, handler: BackendMessageHandler) { |
|
|
try { |
|
|
try { |
|
|
const taskData = JSON.parse(message.payload.toString()) |
|
|
const data = JSON.parse(message.payload.toString()) |
|
|
console.log('Task update:', taskData) |
|
|
handler('TaskUpdate', message.topic, _.cloneDeep(data)) |
|
|
// 处理任务更新
|
|
|
|
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|
console.error('Error parsing task data:', error) |
|
|
console.error('Error parsing task data:', error) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private handleInventoryMonitor(message: MqttMessage) { |
|
|
private handleInventoryMonitor(message: MqttMessage, handler: BackendMessageHandler) { |
|
|
try { |
|
|
try { |
|
|
const [projId, envId, inv, catalogCode] = message.topic.split('/') |
|
|
const data = JSON.parse(message.payload.toString()) |
|
|
const inventoryData = JSON.parse(message.payload.toString()) |
|
|
handler('InvUpdate', message.topic, _.cloneDeep(data)) |
|
|
|
|
|
|
|
|
console.log(`Inventory update for ${catalogCode}:`, inventoryData) |
|
|
|
|
|
// 处理库存更新
|
|
|
// 处理库存更新
|
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|
console.error('Error parsing inventory data:', error) |
|
|
console.error('Error parsing inventory data:', error) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private handleDeviceStatus(message: MqttMessage) { |
|
|
private handleDeviceStatus(message: MqttMessage, handler: BackendMessageHandler) { |
|
|
try { |
|
|
try { |
|
|
const [, projId, envId, device, deviceId, status] = message.topic.split('/') |
|
|
const data = JSON.parse(message.payload.toString()) |
|
|
const deviceData = JSON.parse(message.payload.toString()) |
|
|
handler('DeviceStatus', message.topic, _.cloneDeep(data)) |
|
|
|
|
|
|
|
|
console.log(`Device status for ${deviceId}:`, deviceData) |
|
|
|
|
|
// 处理设备状态更新
|
|
|
// 处理设备状态更新
|
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|
console.error('Error parsing device status:', error) |
|
|
console.error('Error parsing device status:', error) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private handleDeviceAlive(message: MqttMessage) { |
|
|
private handleDeviceAlive(message: MqttMessage, handler: BackendMessageHandler) { |
|
|
try { |
|
|
try { |
|
|
const [, projId, envId, device, deviceId, alive] = message.topic.split('/') |
|
|
const data = JSON.parse(message.payload.toString()) |
|
|
const status = message.payload.toString() |
|
|
handler('DeviceAlive', message.topic, _.cloneDeep(data)) |
|
|
|
|
|
|
|
|
console.log(`Device ${deviceId} is ${status}`) |
|
|
|
|
|
// 处理设备存活状态
|
|
|
// 处理设备存活状态
|
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|
console.error('Error parsing device alive status:', error) |
|
|
console.error('Error parsing device alive status:', error) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private handleLogMonitor(message: MqttMessage) { |
|
|
private handleLogMonitor(message: MqttMessage, handler: BackendMessageHandler) { |
|
|
try { |
|
|
try { |
|
|
const [, projId, envId, log, logType] = message.topic.split('/') |
|
|
const data = JSON.parse(message.payload.toString()) |
|
|
const logData = JSON.parse(message.payload.toString()) |
|
|
handler('Logs', message.topic, _.cloneDeep(data)) |
|
|
|
|
|
|
|
|
console.log(`Logs for ${logType}:`, logData) |
|
|
|
|
|
// 处理日志更新
|
|
|
// 处理日志更新
|
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|
console.error('Error parsing log data:', error) |
|
|
console.error('Error parsing log data:', error) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private handleAlarmMonitor(message: MqttMessage) { |
|
|
private handleAlarmMonitor(message: MqttMessage, handler: BackendMessageHandler) { |
|
|
try { |
|
|
try { |
|
|
const alarmData = JSON.parse(message.payload.toString()) |
|
|
const data = JSON.parse(message.payload.toString()) |
|
|
console.log('Alarm update:', alarmData) |
|
|
handler('Alarm', message.topic, _.cloneDeep(data)) |
|
|
// 处理告警更新
|
|
|
|
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|
console.error('Error parsing alarm data:', error) |
|
|
console.error('Error parsing alarm data:', error) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private handleScriptSystem(message: MqttMessage) { |
|
|
private handleScriptSystem(message: MqttMessage, handler: BackendMessageHandler) { |
|
|
try { |
|
|
try { |
|
|
const scriptData = JSON.parse(message.payload.toString()) |
|
|
const data = JSON.parse(message.payload.toString()) |
|
|
console.log('Script system update:', scriptData) |
|
|
handler('ScriptUpdate', message.topic, _.cloneDeep(data)) |
|
|
// 处理脚本系统更新
|
|
|
|
|
|
} catch (error) { |
|
|
} catch (error) { |
|
|
console.error('Error parsing script data:', error) |
|
|
console.error('Error parsing script data:', error) |
|
|
} |
|
|
} |