在微信小程序中调用 mqtt.js 的 Client.end() 方法时出现内部异常?
发布于 4 年前 作者 gangmao 2720 次浏览 来自 官方Issues

import BaseService from "./BaseService";
// import mqtt from "../utils/mqtt/mqtt.min"
import mqtt from "../utils/mqtt/mqtt-2.18.8"
import MqttConstant from "../constants/MqttConstant";
import LiveMessageError from "../dto/LiveMessageError";
import ErrorConst from "../constants/ErrorConst";
/**
 * Thin wrapper around a single mqtt.js client used to talk to an EMQX broker.
 *
 * The service keeps at most one live client at a time: a stale client is always
 * ended before a new one is created, so sockets are not leaked (the hosting
 * platform is reported to allow only a handful of concurrent sockets).
 */
export default class MqttClientService extends BaseService {
    /**
     * @param {Object} liveMessageApi Forwarded to BaseService. This class later reads
     *   `options.appId`, `clientId`, `userInfo.accId` and `userInfo.accToken` from
     *   `this.liveMessageApi` (presumably assigned by BaseService — confirm).
     */
    constructor(liveMessageApi) {
        super(liveMessageApi);
        this.mqttClient = null;
        this.topics = [];
        // Defaults merged underneath the caller-supplied options in connectMqtt.
        this.mqttOption = {
            keepalive: 60,
            clean: true,
            reconnectPeriod: 0,
            connectTimeout: 30 * 1000,
            username: "",
            password: "",
            protocolId: 'MQTT',
            protocolVersion: 4
        };
    }

    /**
     * Connect to the EMQX broker and subscribe to `topics` once connected.
     *
     * Does nothing when a connected client already exists. A client that exists
     * but is not connected is ended first so its socket is released before a
     * replacement is created. Errors are logged, never thrown.
     *
     * @param {string} serverUrl Broker URL handed to `mqtt.connect`.
     * @param {Object} mqttOption Overrides merged over `this.mqttOption`.
     * @param {Array} topics Topics subscribed to after the connection is up.
     * @returns {Promise<void>}
     */
    connectMqtt = async (serverUrl, mqttOption, topics) => {
        try {
            if (this.mqttClient && this.mqttClient.connected) {
                return;
            }
            if (this.mqttClient) {
                // Stale (connecting / offline) client: close it instead of
                // silently overwriting the reference and leaking its socket.
                this.disconnect();
            }
            this.topics = topics;
            const option = Object.assign({}, this.mqttOption, mqttOption);
            console.log("EMQX Connect option:", option);

            // Handlers close over `client` rather than `this.mqttClient`, which
            // may be cleared or replaced before an event fires.
            const client = mqtt.connect(serverUrl, option);
            this.mqttClient = client;

            client.on("connect", () => {
                console.log("EMQX服务连接成功");
                client.subscribe(topics, {
                    qos: MqttConstant.QOS_2
                }, (err, granted) => {
                    if (err != null) {
                        console.log("EMQX TOPIC订阅失败:", err);
                        return;
                    }
                    console.log("EMQX TOPIC订阅成功:");
                    for (const grantedData of granted) {
                        console.log("topic:", grantedData.topic, "qos:", grantedData.qos);
                    }
                    // Announce the successful subscription to the backend.
                    const message = {
                        eventName: MqttConstant.MQTT_CLIENT_EVENT_NAME_SUBSCRIBE_SUCCESS,
                        eventData: {
                            appId: this.liveMessageApi.options.appId,
                            clientId: this.liveMessageApi.clientId,
                            accId: this.liveMessageApi.userInfo.accId,
                            accToken: this.liveMessageApi.userInfo.accToken,
                        }
                    };
                    // The promise must not be left floating: a failed publish
                    // would otherwise surface as an unhandled rejection.
                    this.publishMessage(MqttConstant.MQTT_PUBLISH_TOPIC_CLIENT_EVENT, message)
                        .catch((publishError) => {
                            console.log("EMQX Client 订阅成功事件发布失败:", publishError);
                        });
                });
            });

            client.on("reconnect", () => {
                console.log("EMQX Client reconnect");
            });
            client.on("disconnect", () => {
                console.log("EMQX Client disconnect");
            });
            client.on("offline", () => {
                console.log("EMQX Client offline");
            });
            client.on("error", (error) => {
                console.log("EMQX Client error", error);
            });
            client.on("message", (topic, payload) => {
                console.log("EMQX Client 收到消息-topic:", topic);
                console.log("EMQX Client 收到消息-payload:", payload.toString());
            });
        } catch (error) {
            console.log("EMQX Client connectMqtt异常:", error);
        }
    }

    /**
     * Close the current client, if any, and forget it.
     *
     * The client is ended whether or not it is currently connected, and the
     * reference is cleared before `end()` is called so that it is dropped even
     * when `end()` throws. A failure is logged rather than rethrown.
     */
    disconnect = () => {
        const client = this.mqttClient;
        if (!client) {
            return;
        }
        this.mqttClient = null;
        try {
            client.end();
        } catch (error) {
            console.log("EMQX Client disconnect异常:", error);
        }
    }

    /**
     * Publish `message` (JSON-serialised) to `topic` with QoS 2.
     *
     * @param {string} topic Destination topic.
     * @param {*} message Value passed through `JSON.stringify` before sending.
     * @returns {Promise<boolean>} Resolves with `true` once the publish callback
     *   reports success. Rejects with the publish/serialisation error, or with a
     *   LiveMessageError when no connected client is available.
     */
    publishMessage = (topic, message) => {
        return new Promise((resolve, reject) => {
            try {
                if (!(this.mqttClient && this.mqttClient.connected)) {
                    reject(new LiveMessageError(ErrorConst.ERROR_CODE_MQTT_UNCONNECT, "客户端未连接EMQX服务"));
                    return;
                }
                const messageStr = JSON.stringify(message);
                this.mqttClient.publish(topic, messageStr, {
                    qos: 2
                }, (err) => {
                    if (err) {
                        console.log("EMQX Client 消息发布错误:", err);
                        reject(err);
                    } else {
                        console.log("EMQX Client 发布消息-topic:", topic, "payload:", message);
                        resolve(true);
                    }
                });
            } catch (error) {
                console.log("EMQX Client 消息发布异常:", error);
                reject(error);
            }
        });
    }
}
1 回复

小程序最多只有5个socket通道,超出5个就会出现问题。建议在app.js中做初始化,全局只使用同一个客户端实例,这样可以解决这个问题。

回到顶部