Browse Source

BackendMessageReceiver 失误关闭问题

master
修宁 5 months ago
parent
commit
06f60f60ca
  1. 38
      src/core/manager/BackendMessageReceiver.ts

38
src/core/manager/BackendMessageReceiver.ts

@ -1,6 +1,5 @@
import mqtt, { type IClientOptions, type IClientSubscribeOptions } from 'mqtt'
import { reactive } from 'vue'
import type Viewport from '@/core/engine/Viewport.ts'
// 定义MQTT消息类型
export interface MqttMessage {
@ -22,6 +21,7 @@ type ProcessFn = (message: MqttMessage, handler: BackendMessageHandler) => void
interface HandlerNode {
processFn: ProcessFn
handler: BackendMessageHandler
handlerId: string
}
/**
@ -86,6 +86,7 @@ export default class BackendMessageReceiver {
this.client.on('close', () => {
this.state.status = ConnectionStatus.DISCONNECTED
this.state.isConnected = false
debugger
console.log('backendMQTT disconnected')
})
@ -175,7 +176,8 @@ export default class BackendMessageReceiver {
if (!this.handlers.has(topic)) {
this.handlers.set(topic, [])
}
this.handlers.get(topic)?.push({ processFn, handler })
const handlerId = system.createUUID()
this.handlers.get(topic)?.push({ processFn, handler, handlerId })
// 如果尚未订阅该主题
if (!this.state.subscribedTopics.includes(topic) && this.client) {
@ -185,44 +187,37 @@ export default class BackendMessageReceiver {
this.client.subscribe(topic, options, (err) => {
if (err) {
console.error(`Failed to subscribe to ${topic}:`, err)
} else {
console.log(`BackendMQTT Subscribed to ${topic}`)
}
})
}
return () => {
// 取消订阅
this.unsubscribe(type, handler)
this.unsubscribe(handlerId)
}
}
// 取消订阅
public unsubscribe(type: BackendTopicType, handler: BackendMessageHandler): void {
private unsubscribe(handlerId: string): void {
// if (!this.client?.connected) {
// throw new Error('Cannot unsubscribe - backendMQTT not connected')
// }
const [topic, processFn] = this.getTopicStringByType(type)
if (!topic) {
throw new Error(`Invalid topic for type ${type}`)
}
// 移除特定处理函数
if (handler && this.handlers.has(topic)) {
const handlers = this.handlers.get(topic) || []
const newHandlers = handlers.filter(h => (h.processFn !== processFn && h.handler !== handler))
for (const [topic, handlers] of this.handlers.entries()) {
const newHandlers = handlers.filter(node => node.handlerId !== handlerId)
if (newHandlers.length > 0) {
this.handlers.set(topic, newHandlers)
} else {
this.handlers.delete(topic)
}
} else {
this.handlers.delete(topic)
}
// 如果没有处理器了,取消订阅
if (!this.handlers.has(topic) && this.client) {
this.handlers.delete(topic)
if (this.client) {
const options = {}
console.log(`client.unsubscribe(${topic},`, options)
this.client.unsubscribe(topic, options, (err) => {
@ -230,10 +225,13 @@ export default class BackendMessageReceiver {
console.error(`Failed to unsubscribe from ${topic}:`, err)
} else {
this.state.subscribedTopics = this.state.subscribedTopics.filter(t => t !== topic)
console.log(`Unsubscribed from ${topic}`)
console.log(`BackendMQTT Unsubscribed from ${topic}`)
}
})
}
}
}
}
// 处理消息分发

Loading…
Cancel
Save