小程序中如何实现并发控制?
发布于 3 年前 作者 fli 2634 次浏览 来自 分享

小程序中如何实现并发控制?

一、性能之网络请求数

wx.requestwx.uploadFilewx.downloadFile 的最大并发限制是 10 个;

小程序中,短时间内发起太多请求会触发小程序并发请求数量的限制同时太多请求也可能导致加载慢等问题,应合理控制请求数量,甚至做请求的合并等。

上传多图使用Promise.all并发处理,很容易触发限制,导致请求失败。

故需做并发控制。实现并发控制此处将对p-limitasync-pool进行研究

二、p-limit并发控制的实现

2.1 p-limit的使用

const limit = pLimit(1);

const input = [
	limit(() => fetchSomething('foo')),
	limit(() => fetchSomething('bar')),
	limit(() => doSomething())
];

// 一次只运行一个promise
const result = await Promise.all(input);
console.log(result);

pLimit(concurrency)

返回一个limit函数。

  • concurrency(Number): 表示并发限制,最小值为1,默认值为Infinity

limit(fn, …args)

返回通过调用fn(...args)返回的promise

fn(Function): 表示Promise-returning/async function
args: 表示传递给fn的参数

limit.activeCount

当前正在运行的promise数量

limit.pendingCount

等待运行的promise数量(即它们的内部fn尚未被调用)。

limit.clearQueue()

丢弃等待运行的pending promises。

2.2 p-limit 的实现

要想理解p-limit必须先掌握浏览器中event-loop的执行顺序:

console.log('script start')

async function async1() {
  await async2()
  console.log('async1 end')
}
async function async2() {
  console.log('async2 end')
}
// 以上两个async函数可改写为以下代码
// new Promise((resolve, reject) => {
//   console.log('async2 end')
//   // Promise.resolve() 将代码插入微任务队列尾部
//   // resolve 再次插入微任务队列尾部
//   resolve(Promise.resolve())
// }).then(() => {
//   console.log('async1 end')
// })
// 如果 await 后面跟着 Promise 的话,async1 end 需要等待三个 tick 才能执行到
async1()

setTimeout(function() {
  console.log('setTimeout')
}, 0)

new Promise(resolve => {
  console.log('Promise')
  resolve()
})
  .then(function() {
    console.log('promise1')
  })
  .then(function() {
    console.log('promise2')
  })

console.log('script end')
// script start => async2 end => Promise => script end => promise1 => promise2 => async1 end => setTimeout

先介绍下宏任务和微任务具体有哪些内容,

微任务包括 process.nextTickpromiseMutationObserver,其中 process.nextTick 为 Node 独有。

宏任务包括 scriptsetTimeoutsetIntervalsetImmediateI/OUI rendering

Event Loop 执行顺序如下所示:

  • 首先执行同步代码,这属于宏任务
  • 当执行完所有同步代码后,执行栈为空,查询是否有异步代码需要执行
  • 执行所有微任务
  • 当执行完所有微任务后,如有必要会渲染页面
  • 然后开始下一轮 Event Loop,执行宏任务中的异步代码,也就是 setTimeout 中的回调函数
function pLimit(concurrency) {
    if(!((Number.isInteger(concurrency)||concurrency===Number.POSITIVE_INFINITY)&&concurrency>0)) {
        throw new TypeError('Expected `concurrency` to be a number from 1 and up');
    }
    // 用一个queue队列维护所有Promise异步函数
    const queue=[];
    let activeCount=0;

    const next=() => {
        // 某异步函数完成后,需要将activeCount--
        activeCount--;

        if(queue.length>0) {
            // 再次从队列中出队并执行异步函数,activeCount维持在concurrency
            queue.shift()();
        }
    };

    const run=async (fn,resolve,args) => {
        // 
        activeCount++;
        // 进一步将fn封装为异步函数并运行
        const result=(async () => fn(...args))();
        // 此处返回generator函数 的resolve值,即Promise.all
        resolve(result);

        try {
            // 等待result异步函数完成(例如某请求完成)
            await result;
        } catch {}

        next();
    };

    const enqueue=(fn,resolve,args) => {

        queue.push(run.bind(undefined,fn,resolve,args));
        // setTimeout(()=>{
        //     // 正在运行的Promise数量activeCount始终不大于concurrency,从而达到控制并发的目的
        //     if(activeCount<concurrency&&queue.length>0) {
        //         // 队列出队并执行改函数
        //         queue.shift()();
        //     }
        // },0);
        (async () => {
			// 这个函数需要等到下一个微任务再比较 `activeCount` 和 `concurrency`
            // 因为 `activeCount` 在 run 函数出列和调用时异步更新。 
            // if 语句中的比较也需要异步进行,以获取 `activeCount` 的最新值。
			await Promise.resolve();
            // 正在运行的Promise数量activeCount始终不大于concurrency,从而达到控制并发的目的
			if (activeCount < concurrency && queue.length > 0) {
                // 队列出队并执行改函数
				queue.shift()();
			}
		})();
    };

    const generator = (fn,...args) => new Promise(resolve => {
        enqueue(fn,resolve,args);
    });

    Object.defineProperties(generator,{
        // 正在运行的Promise数量
        activeCount: {
            get: () => activeCount,
        },
        // 等待运行的Promise数量
        pendingCount: {
            get: () => queue.length,
        },
        // 清空queue队列中的异步函数
        clearQueue: {
            value: () => {
                while(queue.length!=0) {
                    pueue.shift();
                }
            },
        },
    });
    return generator;
}

三、asyncPool并发控制的实现

async-pool 这个库提供了 ES7 和 ES6 两种不同版本的实现,在分析其具体实现之前,我们来看一下它如何使用。

3.1 asyncPool 的使用

function asyncPool(poolLimit, array, iteratorFn){ ... }

该函数接收 3 个参数:

  • poolLimit(Number):表示限制的并发数;
  • array(Array):表示任务数组;
  • iteratorFn(Function):表示迭代函数,用于实现对每个任务项进行处理,该函数会返回一个 Promise 对象或异步函数。

asyncPool在有限的并发池中运行多个promise-returning & async functions。一旦其中一个承诺被拒绝,它就会立即拒绝。当所有 Promise 完成时,它就会resolves。它尽快调用迭代器函数(在并发限制下)。例如:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.

通过观察以上的注释信息,我们可以大致地了解 asyncPool 函数内部的控制流程

3.2 asyncPool 实现

async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = []; // 存储所有的异步任务
  const executing = []; // 存储正在执行的异步任务
  for (const item of array) {
    // 调用iteratorFn函数创建异步任务
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p); // 保存新的异步任务

    // 当poolLimit值小于或等于总任务个数时,进行并发控制
    if (poolLimit <= array.length) {
      // 当任务完成后,从正在执行的任务数组中移除已完成的任务
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e); // 保存正在执行的异步任务
      if (executing.length >= poolLimit) {
        await Promise.race(executing); // 等待较快的任务执行完成
      }
    }
  }
  return Promise.all(ret);
}

在以上代码中,充分利用了 Promise.allPromise.race 函数特点,再结合 ES7 中提供的 async await 特性,最终实现了并发控制的功能。利用 await Promise.race(executing); 这行语句,我们会等待 正在执行任务列表 中较快的任务执行完成之后,才会继续执行下一次循环。

四、实践运用

使用p-limit

const limit=pLimit(5);
const fn=() => {
     return axios({
         method: 'get',
         url: 'https://www.fastmock.site/mock/883c2b5177653e4ef705b7ffdc680af1/daily/story',
     })
         .then(function(response) {
         return response.data;
     });
};
const input=[];
for(let i=0;i<50;i++) {
    input.push(limit(fn))
}
Promise.all(input).then(res => {
    console.log('res',res)
});

使用asyncPool

const fn=() => {
    return axios({
        method: 'get',
        url: 'https://www.fastmock.site/mock/883c2b5177653e4ef705b7ffdc680af1/daily/story',
    })
        .then(function(response) {
        return response.data;
    });
}
const input=[];
for(let i=0;i<50;i++) {
    input.push(fn);
}
const timeout= f => new Promise(resolve => setTimeout(() => resolve(f())));
asyncPool(5,input,timeout).then(res => {
    console.log('res',res);
})

1 回复

并发超过10以后,会请求失败吗?不是说超出的会进入队列执行吗

回到顶部