实现 Promise 并发
极简实现
借助 Promise.race
js
// 模拟异步请求
const request = url => new Promise(resolve => {
setTimeout(() => {
resolve(`任务 ${url.padEnd(10, ' ')} 完成`)
}, 1000)
})
// 并发控制函数
async function asyncPool(tasks = [], max = 3, fn = () => Promise.resolve()) {
// 待执行任务数组
// 正在执行的任务数组
let pool = []
for (const item of tasks) {
// 生成异步任务
const task = fn(item)
// 添加到正在执行的任务数组
pool.push(task)
task.then(data => {
// 当任务执行完毕,将其从正在执行任务数组中移除
console.log(`${data}; 当前并发数: ${pool.length}`)
pool.splice(pool.indexOf(task), 1)
})
// 当并发池满了,就先去执行并发池中的任务,有任务执行完成后,再继续循环
if (pool.length === max) {
await Promise.race(pool)
}
}
}
const tasks = new Array(10).fill('').map((task, i) => `url - ${i + 1}`)
asyncPool(tasks, 3, request)
不借助 Promise.race
js
function asyncPool(urls, max) {
return new Promise((resolve, reject) => {
if (urls.length === 0)
resolve([])
let index = 0 // 下一次异步任务的下标
let count = 0 // 请求完成的数量
let active = 1 // 当前活跃的数量
const result = []
async function request() {
active++
const i = index
const url = urls[index++]
try {
const resp = await fetch(url)
result[i] = resp
} catch(err) {
result[i] = err
} finally {
active--
count++
console.log(`${urls[i]}; 当前并发数: ${active}`)
// 判断请求是否完成
if (count === urls.length) {
resolve(result)
}
// 处理下标边界
// 如果还有剩余请求且当前活跃请求未达到最大并发数,继续发起请求
if (index < urls.length && active <= max) {
request()
}
}
}
// 取数组长度和并发数量的最小值
for (let i = 0; i < Math.min(urls.length, max); i++) {
request()
}
})
}
// 测试
const urls = []
for (let i = 1; i < 100; i++) {
urls.push(`https://jsonplaceholder.typicode.com/albums/${i}`)
}
asyncPool(urls, 20).then(resp => {
console.log(resp)
})
async-pool 源码实现
- 先初始化 limit 个 promise 实例,将他们放到 executing 数组中
- 使用 Promise.race 等待这 limit 个 promise 实例的执行结果
- 一旦某一个 promise 的状态发生变更,就将其从 executing 中删除,然后再执行循环生成新的 promise,放入 executing 中
- 重复 2、3 两个步骤,直到所有的 promise 都被执行完
- 最后使用 Promise.all 返回所有 promise 实例的执行结果
es6 版本
js
function asyncPool(poolLimit, array, iteratorFn) {
let i = 0
const ret = []
const executing = []
const enqueue = function () {
if (i === array.length) return Promise.resolve()
const item = array[i++]
const p = Promise.resolve().then(() => iteratorFn(item, array))
ret.push(p)
let r = Promise.resolve()
if (poolLimit <= array.length) {
const e = p.then(() => executing.splice(executing.indexOf(e), 1))
executing.push(e)
if (executing.length >= poolLimit) {
r = Promise.race(executing)
}
}
return r.then(() => enqueue())
}
return enqueue().then(() => Promise.all(ret))
}
es7 版本
js
async function asyncPool(poolLimit, array, iteratorFn) {
const ret = []
const executing = []
for (const item of array) {
const p = Promise.resolve().then(() => iteratorFn(item, array))
ret.push(p)
if (poolLimit <= array.length) {
// then 回调中,当这个 promise 状态变为 fulfilled 后,将其从正在执行的 promise 列表 executing 中删除
const e = p.then(() => executing.splice(executing.indexOf(e), 1))
executing.push(e)
if (executing.length >= poolLimit) {
// 一旦正在执行的 promise 列表数量等于限制数,就使用 Promise.race 等待某一个 promise 状态发生变更
// 状态变更后,就会执行上面 then 的回调,将该 promise 从 executing 中删除
// 然后再进入到下一次 for 循环,生成新的 promise 进行补充
await Promise.race(executing)
}
}
}
return Promise.all(ret)
}