前几天接到任务要使用第三方API处理几千张图片,得到结果集。我的做法就是使用Rx.js结合node的读写流来完成数据读入、接口请求、数据处理、数据写入这些操作。本篇就来分享这个代码和其逻辑。
Rx.js是什么
Rx.js是一个响应式编程库,能简化事件/异步处理逻辑代码。其视所有的事件/数据为__流__,提供各种流处理的operators
,将输入与输出平滑的链接起来。可以类比为linux
上的pipe
操作符: ls | grep a*b | less
。
Node的读写流
readline
模块提供readline.createInterface
来创建行读取流,即读取文件的每一行作为持续的输入数据fs
模块提供fs.createWriteStream
来创建写入流, 其返回的writer
有write
和end
两个方法,来完成流式的写入与结束写入。
第三方接口的使用情况
并发数有限制,3个是出现其出现
并发错误
概率最低的最大并发数接口请求过于频繁,会较大概率出现连续的
并发错误
, 大概延迟400秒效果尚可提供给第三方的图片是链接,其需要服务器自己下载,会出现操作超时或者长时间不返回的情况。
任务列表
从文件读取图片文件名
拼接url
发送3个并发请求
请求出现超时问题重试3次,最后如果失败则放弃
出现非超时错误(如并发错误等)则一直重试,直到成功
请求成功后延迟400秒继续发起下一个请求
处理返回的数据
写入文件
代码分析
引入依赖,创建读取与写入流
const https = require('https');const querystring = require('querystring');const Rx = require('rxjs');const readline = require('readline');const fs = require('fs');const imgStream = readline.createInterface({ // 创建行读取流 input: fs.createReadStream('filelist.txt')});const writeStream = fs.createWriteStream('output.txt'); // 创建写入流
使用Rx处理读取并反馈结果给写入
Rx.Observable.fromEvent(imgStream, 'line') // 将行读取流转化为Rx的事件流.takeUntil(Rx.Observable.fromEvent(imgStream, 'close')) // 读取流截止时终止Rx流.map(img => generateData(img)) // 将文件名处理成post的数据 // 发起请求,并发3个,请求返回后延迟400ms后再进行下一步处理并发起下一个请求.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) .subscribe(data => { // 处理数据并写入文件 let str = data.url; if (data.status === 200 && data.data.xxx.length) { zzz = data.data.xxx.map(x => x.zzz); str += ` ${JSON.stringify(zzz)}`; } writeStream.write(`${str}\n`);}, (err) => { console.log(err); console.log('!!!!!!!!!!!ERROR!!!!!!!!!');}, () => { console.log('=====complete======'); writeStream.end();});
其中的需要关注的点在.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3)
,这里内部requestAPI
返回一个封装了http异步请求并延迟400ms的Rx流,当请求完成并延迟完成后将数据返回上一层继续进行处理(可以类比为Promise
的then
)
使用Rx的自定义流封装一个带错误重试机制的http请求
const requestFacepp = dataStr => { const options = { hostname: 'api.xxx.com', port: 443, path: '/xxx', method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded', 'Content-Length': Buffer.byteLength(dataStr) } }; const reqData = querystring.parse(dataStr); const retry$ = new Rx.Subject(); // 触发重试的流,当其发出数据时会使`retryWhen`触发重试错误流 let retryTimes = 3; // 设置非正常失败(超时)重试的上限 // 使用Rx的自定义流封装一个带错误重试机制的http请求,可以类比为new Promise // 但要注意的是Rx是流,即数据是可以持续的,而Promise则只有一个结果和状态 return Rx.Observable.create(observer => { const req = https.request(options, res => { let data = ''; res.setEncoding('utf8'); res.on('data', chunk => { data += chunk; }); res.on('end', () => { if (res.statusCode === 200) { // 请求正常返回,向流内推送结果并结束 observer.next({ status: res.statusCode, url: reqData.image_url, data: JSON.parse(data) }); observer.complete(); } else { // 请求正常返回,但不是正常结果,抛出错误并重试 console.log(`retring: ${reqData.image_url}`); observer.error({ status: res.statusCode, url: reqData.image_url }); retry$.next(true); } }); }); req.setTimeout(4000, () => { // 设置请求4s超时,超时后终止,引发请求抛错 req.abort(); }); req.on('error', err => { console.log(`retring(${retryTimes}): ${reqData.image_url}`); // 请求抛错时重试,超出次数则终止本次请求 observer.error(`error: ${err.message}`); if (retryTimes > 0) { retryTimes--; retry$.next(true); } else { retry$.complete(); } }); req.write(dataStr); req.end(); return () => { req.abort() }; // 返回终止流的处理回调 }) .retryWhen(errs => errs.switchMap(err => { // 未超过次数返回重试流,超出则返回错误数据并终止本次Rx流 return retryTimes > 0 ? retry$ : Rx.Observable.of({ status: 500, url: reqData.image_url }); }));};
收尾
到此就搬砖完毕,开个车让他慢慢跑就可以了。
本篇展示了Rx在流数据处理与异步处理上的方式,逻辑与代码都挺清晰、扁平。在处理交杂的逻辑时也不错(重试部分)。如果喜欢或者有帮助的话可以后面在发一篇Rx在复杂DOM事件处理上的应用。;-)本文始发于本人的公众号:枫之叶。公众号二维码