在微信小程序中调用 mqtt.js 的 Client.end() 方法时出现内部异常?
import BaseService from "./BaseService";
// import mqtt from "../utils/mqtt/mqtt.min"
import mqtt from "../utils/mqtt/mqtt-2.18.8"
import MqttConstant from "../constants/MqttConstant";
import LiveMessageError from "../dto/LiveMessageError";
import ErrorConst from "../constants/ErrorConst";
/**
 * Wrapper around a single mqtt.js client that talks to an EMQX broker.
 *
 * At most one client is held in `this.mqttClient`. Once connected, the client
 * subscribes to the requested topics and then publishes a
 * "subscribe success" client event. Publishing is exposed as a Promise.
 */
export default class MqttClientService extends BaseService {
  /**
   * @param {Object} liveMessageApi Forwarded to BaseService. This class later
   *   reads `options.appId`, `clientId`, `userInfo.accId` and
   *   `userInfo.accToken` from `this.liveMessageApi` (presumably assigned by
   *   BaseService - verify against BaseService).
   */
  constructor(liveMessageApi) {
    super(liveMessageApi);
    this.mqttClient = null;
    this.topics = [];
    // Defaults merged under the caller's options in connectMqtt().
    this.mqttOption = {
      keepalive: 60,
      clean: true,
      // 0 turns off mqtt.js automatic reconnection (see mqtt.js options).
      reconnectPeriod: 0,
      connectTimeout: 30 * 1000,
      username: "",
      password: "",
      protocolId: 'MQTT',
      protocolVersion: 4
    };
  }

  /**
   * Connect to the EMQX broker and subscribe to `topics` once connected.
   *
   * Does nothing when a connected client already exists. A leftover client
   * that is not connected is closed before a new one is created. Errors are
   * logged, never thrown.
   *
   * @param {string} serverUrl Broker URL handed to `mqtt.connect`.
   * @param {Object} mqttOption Overrides for the default connect options.
   * @param {Array} topics Topics to subscribe to after connecting.
   * @returns {Promise<void>}
   */
  connectMqtt = async (serverUrl, mqttOption, topics) => {
    try {
      if (this.mqttClient && this.mqttClient.connected) {
        return;
      }
      // A previous client that never connected (or lost its connection) would
      // otherwise be overwritten and leak its socket and listeners.
      this.disconnect();
      this.topics = topics;
      const option = Object.assign({}, this.mqttOption, mqttOption);
      // Keep the credential out of the log output.
      const loggedOption = Object.assign({}, option, {
        password: option.password ? "******" : option.password
      });
      console.log("EMQX Connect option:", loggedOption);
      // Handlers use this local reference so they keep working on the client
      // they were registered on even after `this.mqttClient` is cleared or
      // replaced.
      const client = mqtt.connect(serverUrl, option);
      this.mqttClient = client;
      client.on("connect", () => {
        console.log("EMQX服务连接成功");
        client.subscribe(topics, {
          qos: MqttConstant.QOS_2
        }, (err, granted) => {
          if (err != null) {
            console.log("EMQX TOPIC订阅失败:", err);
            return;
          }
          console.log("EMQX TOPIC订阅成功:");
          for (const grantedData of granted || []) {
            console.log("topic:", grantedData.topic, "qos:", grantedData.qos);
          }
          // Tell the server side that this client finished subscribing.
          const message = {
            eventName: MqttConstant.MQTT_CLIENT_EVENT_NAME_SUBSCRIBE_SUCCESS,
            eventData: {
              appId: this.liveMessageApi.options.appId,
              clientId: this.liveMessageApi.clientId,
              accId: this.liveMessageApi.userInfo.accId,
              accToken: this.liveMessageApi.userInfo.accToken,
            }
          };
          // publishMessage() rejects on failure; without a handler that would
          // surface as an unhandled promise rejection.
          this.publishMessage(MqttConstant.MQTT_PUBLISH_TOPIC_CLIENT_EVENT, message).catch((error) => {
            console.log("EMQX Client 订阅成功事件发布失败:", error);
          });
        });
      });
      client.on("reconnect", () => {
        console.log("EMQX Client reconnect");
      });
      client.on("disconnect", () => {
        console.log("EMQX Client disconnect");
      });
      client.on("offline", () => {
        console.log("EMQX Client offline");
      });
      client.on("error", (error) => {
        console.log("EMQX Client error:", error);
      });
      client.on("message", (topic, payload) => {
        console.log("EMQX Client 收到消息-topic:", topic);
        console.log("EMQX Client 收到消息-payload:", payload.toString());
      });
    } catch (error) {
      console.log("EMQX Client connectMqtt异常:", error);
    }
  }

  /**
   * Close the current client, if any, and forget it.
   *
   * A connected client is ended gracefully. A client that is not connected is
   * force-closed, since there is no live session to shut down. The reference is
   * always cleared, and an exception thrown by `end()` is logged instead of
   * being silently dropped.
   */
  disconnect = () => {
    const client = this.mqttClient;
    if (!client) {
      return;
    }
    // Clear first so the service never keeps a half-closed client around,
    // even when end() throws.
    this.mqttClient = null;
    try {
      if (client.connected) {
        client.end();
      } else {
        client.end(true);
      }
    } catch (error) {
      console.log("EMQX Client disconnect异常:", error);
    }
  }

  /**
   * Publish `message` (serialized with JSON.stringify) to `topic` at QoS 2.
   *
   * @param {string} topic Destination topic.
   * @param {*} message JSON-serializable payload.
   * @returns {Promise<boolean>} Resolves to `true` once the publish callback
   *   reports success. Rejects with the publish or serialization error, or with
   *   a LiveMessageError (ERROR_CODE_MQTT_UNCONNECT) when no connected client
   *   exists.
   */
  publishMessage = (topic, message) => {
    return new Promise((resolve, reject) => {
      try {
        if (!(this.mqttClient && this.mqttClient.connected)) {
          reject(new LiveMessageError(ErrorConst.ERROR_CODE_MQTT_UNCONNECT, "客户端未连接EMQX服务"));
          return;
        }
        const messageStr = JSON.stringify(message);
        this.mqttClient.publish(topic, messageStr, {
          qos: 2
        }, (err) => {
          if (err) {
            console.log("EMQX Client 消息发布错误:", err);
            reject(err);
          } else {
            console.log("EMQX Client 发布消息-topic:", topic, "payload:", message);
            resolve(true);
          }
        });
      } catch (error) {
        console.log("EMQX Client 消息发布异常:", error);
        reject(error);
      }
    });
  }
}