如何将JavaScript请求调度器改写为支持长尾词查询的?
- 内容介绍
- 文章标签
- 相关推荐
本文共计1873个文字,预计阅读时间需要8分钟。
前言:JS 支持并行请求,但同时也可能带来一些问题,例如目标服务器压力过大。因此,引入请求调度器来调节并发度,优化并发请求。TLDR;直接跳转至《抽象和复用》章节。为了获取请求调度器。
前言:JS 天然支持并行请求,但与此同时会带来一些问题,比如会造成目标服务器压力过大,所以本文引入“请求调度器”来节制并发度。
TLDR; 直接跳转『抽象和复用』章节。
为了获取一批互不依赖的资源,通常从性能考虑可以用 Promise.all(arrayOfPromises)来并发执行。比如我们已有 100 个应用的 id,需求是聚合所有应用的 PV,我们通常会这么写:
const ids = [1001, 1002, 1003, 1004, 1005]; const urlPrefix = 'opensearch.example.com/api/apps'; // fetch 函数发送 HTTP 请求,返回 Promise const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch); Promise.all(appPromises) // 通过 reduce 做累加 .then(apps => apps.reduce((initial, current) => initial + current.pv, 0)) .catch((error) => console.log(error));
上面的代码在应用个数不多的情况下,可以运行正常。当应用个数达到成千上万时,对支持并发数不是很好的系统,你的「压测」会把第三放服务器搞挂,暂时无法响应请求:
<html> <head><title>502 Bad Gateway</title></head> <body bgcolor="white"> <center><h1>502 Bad Gateway</h1></center> <hr><center>nginx/1.10.1</center> </body> </html>
如何解决呢?
一个很自然的想法是,既然不支持这么多的并发请求,那就分割成几大块,每块为一个 chunk,chunk 内部的请求依然并发,但块的大小(chunkSize)限制在系统支持的最大并发数以内。前一个 chunk 结束后一个 chunk 才能继续执行,也就是说 chunk 内部的请求是并发的,但 chunk 之间是串行的。思路其实很简单,写起来却有一定难度。总结起来三个操作:分块、串行、聚合
难点在如何串行执行 Promise,Promise 仅提供了并行(Promise.all)功能,并没有提供串行功能。我们从简单的三个请求开始,看如何实现,启发式解决问题(heuristic)。
// task1, task2, task3 是三个返回 Promise 的工厂函数,模拟我们的异步请求 const task1 = () => new Promise((resolve) => { setTimeout(() => { resolve(1); console.log('task1 executed'); }, 1000); }); const task2 = () => new Promise((resolve) => { setTimeout(() => { resolve(2); console.log('task2 executed'); }, 1000); }); const task3 = () => new Promise((resolve) => { setTimeout(() => { resolve(3); console.log('task3 executed'); }, 1000); }); // 聚合结果 let result = 0; const resultPromise = [task1, task2, task3].reduce((current, next) => current.then((number) => { console.log('resolved with number', number); // task2, task3 的 Promise 将在这里被 resolve result += number; return next(); }), Promise.resolve(0)) // 聚合初始值 .then(function(last) { console.log('The last promise resolved with number', last); // task3 的 Promise 在这里被 resolve result += last; console.log('all executed with result', result); return Promise.resolve(result); });
运行结果如图 1:
代码解析:我们想要的效果,直观展示其实是 fn1().then(() => fn2()).then(() => fn3())。上面代码能让一组 Promise 按顺序执行的关键之处就在 reduce 这个“引擎”在一步步推动 Promise 工厂函数的执行。
难点解决了,我们看看最终代码:
/** * 模拟 HTTP 请求 * @param {String} url * @return {Promise} */ function fetch(url) { console.log(`Fetching ${url}`); return new Promise((resolve) => { setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000); }); } const urlPrefix = 'opensearch.example.com/api/apps'; const aggregator = { /** * 入口方法,开启定时任务 * * @return {Promise} */ start() { return this.fetchAppIds() .then(ids => this.fetchAppsSerially(ids, 2)) .then(apps => this.sumPv(apps)) .catch(error => console.error(error)); }, /** * 获取所有应用的 ID * * @private * * @return {Promise} */ fetchAppIds() { return Promise.resolve([1001, 1002, 1003, 1004, 1005]); }, promiseFactory(ids) { return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch)); }, /** * 获取所有应用的详情 * * 一次并发请求 `concurrency` 个应用,称为一个 chunk * 前一个 `chunk` 并发完成后一个才继续,直至所有应用获取完毕 * * @private * * @param {[Number]} ids * @param {Number} concurrency 一次并发的请求数量 * @return {[Object]} 所有应用的信息 */ fetchAppsSerially(ids, concurrency = 100) { // 分块 let chunkOfIds = ids.splice(0, concurrency); const tasks = []; while (chunkOfIds.length !== 0) { tasks.push(this.promiseFactory(chunkOfIds)); chunkOfIds = ids.splice(0, concurrency); } // 按块顺序执行 const result = []; return tasks.reduce((current, next) => current.then((chunkOfApps) => { console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n'); result.push(...chunkOfApps); // 拍扁数组 return next(); }), Promise.resolve([])) .then((lastchunkOfApps) => { console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n'); result.push(...lastchunkOfApps); // 再次拍扁它 console.info('All chunks has been executed with result', result); return result; }); }, /** * 聚合所有应用的 PV * * @private * * @param {[]} apps * @return {[type]} [description] */ sumPv(apps) { const initial = { pv: 0 }; return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial); } }; // 开始运行 aggregator.start().then(console.log);
运行结果如图 2:
抽象和复用
目的达到了,因具备通用性,下面开始抽象成一个模式以便复用。
串行
先模拟一个 jsonplaceholder.typicode.com/posts/${id}`) ); // 串行执行之 const tasks = await jsonplaceholder.typicode.com/posts/1 GET jsonplaceholder.typicode.com/posts/2 GET jsonplaceholder.typicode.com/posts/3 GET jsonplaceholder.typicode.com/posts/4 GET jsonplaceholder.typicode.com/posts/5 GET jsonplaceholder.typicode.com/posts/6 GET jsonplaceholder.typicode.com/posts/7
分段串行,段中并行
重点来了。本文的请求调度器实现
/** * Schedule promises. * @param {Array<(...arg: any[]) => Promise<any>>} factories * @param {number} concurrency */ function schedulePromises(factories, concurrency) { /** * chunk * @param {any[]} arr * @param {number} size * @returns {Array<any[]>} */ const chunk = (arr, size = 1) => { return arr.reduce((acc, cur, idx) => { const modulo = idx % size; if (modulo === 0) { acc[acc.length] = [cur]; } else { acc[acc.length - 1].push(cur); } return acc; }, []) }; const chunks = chunk(factories, concurrency); let resps = []; return chunks.reduce( (acc, cur) => { return acc .then(() => { console.log('---'); return Promise.all(cur.map(f => f())); }) .then((intermediateResponses) => { resps.push(...intermediateResponses); return resps; }) }, Promise.resolve() ); }
测试下,执行调度器:
// 分段串行,段中并行 schedulePromises(jsonplaceholder.typicode.com/posts/1 GET jsonplaceholder.typicode.com/posts/2 GET jsonplaceholder.typicode.com/posts/3 --- GET jsonplaceholder.typicode.com/posts/4 GET jsonplaceholder.typicode.com/posts/5 GET jsonplaceholder.typicode.com/posts/6 --- GET jsonplaceholder.typicode.com/posts/7 resps: [ { "url": "jsonplaceholder.typicode.com/posts/1", "delay": 733.010980640727, "at": 1615131322163 }, { "url": "jsonplaceholder.typicode.com/posts/2", "delay": 594.5056229848931, "at": 1615131322024 }, { "url": "jsonplaceholder.typicode.com/posts/3", "delay": 738.8230109146299, "at": 1615131322168 }, { "url": "jsonplaceholder.typicode.com/posts/4", "delay": 525.4604386109747, "at": 1615131322698 }, { "url": "jsonplaceholder.typicode.com/posts/5", "delay": 29.086379722201183, "at": 1615131322201 }, { "url": "jsonplaceholder.typicode.com/posts/6", "delay": 592.2345027398272, "at": 1615131322765 }, { "url": "jsonplaceholder.typicode.com/posts/7", "delay": 513.0684467560949, "at": 1615131323284 } ]
总结
- 如果并发请求的数量太大,可以考虑分块串行,块中请求并发。
- 问题看似复杂,不放先简化之,然后一步步推导出关键点,最后抽象,就能找到解决方案。
- 本文的精髓在于使用
reduce作为串行推动的引擎,故掌握其对我们日常开发遇到的迷局破解可提供新思路,reduce精通见上篇 你终于用 Reduce 了 🎉。
以上就是JS 实现请求调度器的详细内容,更多关于JS 请求调度器的资料请关注自由互联其它相关文章!
本文共计1873个文字,预计阅读时间需要8分钟。
前言:JS 支持并行请求,但同时也可能带来一些问题,例如目标服务器压力过大。因此,引入请求调度器来调节并发度,优化并发请求。TLDR;直接跳转至《抽象和复用》章节。为了获取请求调度器。
前言:JS 天然支持并行请求,但与此同时会带来一些问题,比如会造成目标服务器压力过大,所以本文引入“请求调度器”来节制并发度。
TLDR; 直接跳转『抽象和复用』章节。
为了获取一批互不依赖的资源,通常从性能考虑可以用 Promise.all(arrayOfPromises)来并发执行。比如我们已有 100 个应用的 id,需求是聚合所有应用的 PV,我们通常会这么写:
const ids = [1001, 1002, 1003, 1004, 1005]; const urlPrefix = 'opensearch.example.com/api/apps'; // fetch 函数发送 HTTP 请求,返回 Promise const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch); Promise.all(appPromises) // 通过 reduce 做累加 .then(apps => apps.reduce((initial, current) => initial + current.pv, 0)) .catch((error) => console.log(error));
上面的代码在应用个数不多的情况下,可以运行正常。当应用个数达到成千上万时,对支持并发数不是很好的系统,你的「压测」会把第三放服务器搞挂,暂时无法响应请求:
<html> <head><title>502 Bad Gateway</title></head> <body bgcolor="white"> <center><h1>502 Bad Gateway</h1></center> <hr><center>nginx/1.10.1</center> </body> </html>
如何解决呢?
一个很自然的想法是,既然不支持这么多的并发请求,那就分割成几大块,每块为一个 chunk,chunk 内部的请求依然并发,但块的大小(chunkSize)限制在系统支持的最大并发数以内。前一个 chunk 结束后一个 chunk 才能继续执行,也就是说 chunk 内部的请求是并发的,但 chunk 之间是串行的。思路其实很简单,写起来却有一定难度。总结起来三个操作:分块、串行、聚合
难点在如何串行执行 Promise,Promise 仅提供了并行(Promise.all)功能,并没有提供串行功能。我们从简单的三个请求开始,看如何实现,启发式解决问题(heuristic)。
// task1, task2, task3 是三个返回 Promise 的工厂函数,模拟我们的异步请求 const task1 = () => new Promise((resolve) => { setTimeout(() => { resolve(1); console.log('task1 executed'); }, 1000); }); const task2 = () => new Promise((resolve) => { setTimeout(() => { resolve(2); console.log('task2 executed'); }, 1000); }); const task3 = () => new Promise((resolve) => { setTimeout(() => { resolve(3); console.log('task3 executed'); }, 1000); }); // 聚合结果 let result = 0; const resultPromise = [task1, task2, task3].reduce((current, next) => current.then((number) => { console.log('resolved with number', number); // task2, task3 的 Promise 将在这里被 resolve result += number; return next(); }), Promise.resolve(0)) // 聚合初始值 .then(function(last) { console.log('The last promise resolved with number', last); // task3 的 Promise 在这里被 resolve result += last; console.log('all executed with result', result); return Promise.resolve(result); });
运行结果如图 1:
代码解析:我们想要的效果,直观展示其实是 fn1().then(() => fn2()).then(() => fn3())。上面代码能让一组 Promise 按顺序执行的关键之处就在 reduce 这个“引擎”在一步步推动 Promise 工厂函数的执行。
难点解决了,我们看看最终代码:
/** * 模拟 HTTP 请求 * @param {String} url * @return {Promise} */ function fetch(url) { console.log(`Fetching ${url}`); return new Promise((resolve) => { setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000); }); } const urlPrefix = 'opensearch.example.com/api/apps'; const aggregator = { /** * 入口方法,开启定时任务 * * @return {Promise} */ start() { return this.fetchAppIds() .then(ids => this.fetchAppsSerially(ids, 2)) .then(apps => this.sumPv(apps)) .catch(error => console.error(error)); }, /** * 获取所有应用的 ID * * @private * * @return {Promise} */ fetchAppIds() { return Promise.resolve([1001, 1002, 1003, 1004, 1005]); }, promiseFactory(ids) { return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch)); }, /** * 获取所有应用的详情 * * 一次并发请求 `concurrency` 个应用,称为一个 chunk * 前一个 `chunk` 并发完成后一个才继续,直至所有应用获取完毕 * * @private * * @param {[Number]} ids * @param {Number} concurrency 一次并发的请求数量 * @return {[Object]} 所有应用的信息 */ fetchAppsSerially(ids, concurrency = 100) { // 分块 let chunkOfIds = ids.splice(0, concurrency); const tasks = []; while (chunkOfIds.length !== 0) { tasks.push(this.promiseFactory(chunkOfIds)); chunkOfIds = ids.splice(0, concurrency); } // 按块顺序执行 const result = []; return tasks.reduce((current, next) => current.then((chunkOfApps) => { console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n'); result.push(...chunkOfApps); // 拍扁数组 return next(); }), Promise.resolve([])) .then((lastchunkOfApps) => { console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n'); result.push(...lastchunkOfApps); // 再次拍扁它 console.info('All chunks has been executed with result', result); return result; }); }, /** * 聚合所有应用的 PV * * @private * * @param {[]} apps * @return {[type]} [description] */ sumPv(apps) { const initial = { pv: 0 }; return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial); } }; // 开始运行 aggregator.start().then(console.log);
运行结果如图 2:
抽象和复用
目的达到了,因具备通用性,下面开始抽象成一个模式以便复用。
串行
先模拟一个 jsonplaceholder.typicode.com/posts/${id}`) ); // 串行执行之 const tasks = await jsonplaceholder.typicode.com/posts/1 GET jsonplaceholder.typicode.com/posts/2 GET jsonplaceholder.typicode.com/posts/3 GET jsonplaceholder.typicode.com/posts/4 GET jsonplaceholder.typicode.com/posts/5 GET jsonplaceholder.typicode.com/posts/6 GET jsonplaceholder.typicode.com/posts/7
分段串行,段中并行
重点来了。本文的请求调度器实现
/** * Schedule promises. * @param {Array<(...arg: any[]) => Promise<any>>} factories * @param {number} concurrency */ function schedulePromises(factories, concurrency) { /** * chunk * @param {any[]} arr * @param {number} size * @returns {Array<any[]>} */ const chunk = (arr, size = 1) => { return arr.reduce((acc, cur, idx) => { const modulo = idx % size; if (modulo === 0) { acc[acc.length] = [cur]; } else { acc[acc.length - 1].push(cur); } return acc; }, []) }; const chunks = chunk(factories, concurrency); let resps = []; return chunks.reduce( (acc, cur) => { return acc .then(() => { console.log('---'); return Promise.all(cur.map(f => f())); }) .then((intermediateResponses) => { resps.push(...intermediateResponses); return resps; }) }, Promise.resolve() ); }
测试下,执行调度器:
// 分段串行,段中并行 schedulePromises(jsonplaceholder.typicode.com/posts/1 GET jsonplaceholder.typicode.com/posts/2 GET jsonplaceholder.typicode.com/posts/3 --- GET jsonplaceholder.typicode.com/posts/4 GET jsonplaceholder.typicode.com/posts/5 GET jsonplaceholder.typicode.com/posts/6 --- GET jsonplaceholder.typicode.com/posts/7 resps: [ { "url": "jsonplaceholder.typicode.com/posts/1", "delay": 733.010980640727, "at": 1615131322163 }, { "url": "jsonplaceholder.typicode.com/posts/2", "delay": 594.5056229848931, "at": 1615131322024 }, { "url": "jsonplaceholder.typicode.com/posts/3", "delay": 738.8230109146299, "at": 1615131322168 }, { "url": "jsonplaceholder.typicode.com/posts/4", "delay": 525.4604386109747, "at": 1615131322698 }, { "url": "jsonplaceholder.typicode.com/posts/5", "delay": 29.086379722201183, "at": 1615131322201 }, { "url": "jsonplaceholder.typicode.com/posts/6", "delay": 592.2345027398272, "at": 1615131322765 }, { "url": "jsonplaceholder.typicode.com/posts/7", "delay": 513.0684467560949, "at": 1615131323284 } ]
总结
- 如果并发请求的数量太大,可以考虑分块串行,块中请求并发。
- 问题看似复杂,不放先简化之,然后一步步推导出关键点,最后抽象,就能找到解决方案。
- 本文的精髓在于使用
reduce作为串行推动的引擎,故掌握其对我们日常开发遇到的迷局破解可提供新思路,reduce精通见上篇 你终于用 Reduce 了 🎉。
以上就是JS 实现请求调度器的详细内容,更多关于JS 请求调度器的资料请关注自由互联其它相关文章!

