博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Rx.js使用之结合node的读写流进行数据处理
阅读量:6449 次
发布时间:2019-06-23

本文共 4317 字,大约阅读时间需要 14 分钟。

前几天接到任务要使用第三方API处理几千张图片,得到结果集。我的做法就是使用Rx.js结合node的读写流来完成数据读入、接口请求、数据处理、数据写入这些操作。本篇就来分享这个代码和其逻辑。

Rx.js是什么

Rx.js是一个响应式编程库,能简化事件/异步处理逻辑代码。其视所有的事件/数据为__流__,提供各种流处理的operators,将输入与输出平滑的链接起来。可以类比为linux上的pipe操作符: ls | grep a*b | less

Node的读写流

  • readline模块提供readline.createInterface来创建行读取流,即读取文件的每一行作为持续的输入数据

  • fs模块提供fs.createWriteStream来创建写入流, 其返回的writerwriteend两个方法,来完成流式的写入与结束写入。

第三方接口的使用情况

  • 并发数有限制,3个是出现其出现并发错误概率最低的最大并发数

  • 接口请求过于频繁,会较大概率出现连续的并发错误, 大概延迟400秒效果尚可

  • 提供给第三方的图片是链接,其需要服务器自己下载,会出现操作超时或者长时间不返回的情况。

任务列表

  1. 从文件读取图片文件名

  2. 拼接url

  3. 发送3个并发请求

  4. 请求出现超时问题重试3次,最后如果失败则放弃

  5. 出现非超时错误(如并发错误等)则一直重试,直到成功

  6. 请求成功后延迟400秒继续发起下一个请求

  7. 处理返回的数据

  8. 写入文件

代码分析

引入依赖,创建读取与写入流

const https = require('https');const querystring = require('querystring');const Rx = require('rxjs');const readline = require('readline');const fs = require('fs');const imgStream = readline.createInterface({  // 创建行读取流    input: fs.createReadStream('filelist.txt')});const writeStream = fs.createWriteStream('output.txt');  // 创建写入流

使用Rx处理读取并反馈结果给写入

Rx.Observable.fromEvent(imgStream, 'line')  // 将行读取流转化为Rx的事件流.takeUntil(Rx.Observable.fromEvent(imgStream, 'close'))  // 读取流截止时终止Rx流.map(img => generateData(img))  // 将文件名处理成post的数据 // 发起请求,并发3个,请求返回后延迟400ms后再进行下一步处理并发起下一个请求.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) .subscribe(data => {    // 处理数据并写入文件    let str = data.url;    if (data.status === 200 && data.data.xxx.length) {        zzz = data.data.xxx.map(x => x.zzz);        str += `    ${JSON.stringify(zzz)}`;    }    writeStream.write(`${str}\n`);}, (err) => {    console.log(err);    console.log('!!!!!!!!!!!ERROR!!!!!!!!!');}, () => {    console.log('=====complete======');    writeStream.end();});

其中的需要关注的点在.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) ,这里内部requestAPI返回一个封装了http异步请求并延迟400ms的Rx流,当请求完成并延迟完成后将数据返回上一层继续进行处理(可以类比为Promisethen)

使用Rx的自定义流封装一个带错误重试机制的http请求

const requestFacepp = dataStr => {    const options = {        hostname: 'api.xxx.com',        port: 443,        path: '/xxx',        method: 'POST',        headers: {            'Content-Type': 'application/x-www-form-urlencoded',            'Content-Length': Buffer.byteLength(dataStr)        }    };    const reqData = querystring.parse(dataStr);    const retry$ = new Rx.Subject();  // 触发重试的流,当其发出数据时会使`retryWhen`触发重试错误流    let retryTimes = 3;  // 设置非正常失败(超时)重试的上限    // 使用Rx的自定义流封装一个带错误重试机制的http请求,可以类比为new Promise    // 但要注意的是Rx是流,即数据是可以持续的,而Promise则只有一个结果和状态    return Rx.Observable.create(observer => {        const req = https.request(options, res => {            let data = '';            res.setEncoding('utf8');            res.on('data', chunk => {                data += chunk;            });            res.on('end', () => {                if (res.statusCode === 200) {                    // 请求正常返回,向流内推送结果并结束                    observer.next({                        status: res.statusCode,                        url: reqData.image_url,                        data: JSON.parse(data)                    });                    observer.complete();                } else {                    // 请求正常返回,但不是正常结果,抛出错误并重试                    console.log(`retring: ${reqData.image_url}`);                    observer.error({                        status: res.statusCode,                        url: reqData.image_url                    });                    retry$.next(true);                }            });        });        req.setTimeout(4000, () => {            // 设置请求4s超时,超时后终止,引发请求抛错            req.abort();        });        req.on('error', err => {            console.log(`retring(${retryTimes}): ${reqData.image_url}`);            // 请求抛错时重试,超出次数则终止本次请求            observer.error(`error: ${err.message}`);            if (retryTimes > 0) {                retryTimes--;                retry$.next(true);            } else {                retry$.complete();            }        });        req.write(dataStr);        req.end();        return () => { req.abort() };  // 返回终止流的处理回调    })    .retryWhen(errs => errs.switchMap(err => {        // 未超过次数返回重试流,超出则返回错误数据并终止本次Rx流        return retryTimes > 0 ? retry$ : Rx.Observable.of({            status: 500,            url: reqData.image_url        });    }));};

收尾

到此就搬砖完毕,开个车让他慢慢跑就可以了。

本篇展示了Rx在流数据处理与异步处理上的方式,逻辑与代码都挺清晰、扁平。在处理交杂的逻辑时也不错(重试部分)。如果喜欢或者有帮助的话可以后面在发一篇Rx在复杂DOM事件处理上的应用。;-)


本文始发于本人的公众号:枫之叶。公众号二维码

640?wx_fmt=jpeg&wxfrom=5&wx_lazy=1

转载地址:http://qrlwo.baihongyu.com/

你可能感兴趣的文章
headfirst python 03, 04
查看>>
Git在Githib和Github上的使用
查看>>
mysql使用学习的帮助文档
查看>>
Apache Rewrite规则详解
查看>>
SQL细小知识点
查看>>
linux系统调用的三种方法
查看>>
bzoj 2818 欧拉函数
查看>>
【cisco探索之路】
查看>>
Python条件语句
查看>>
JavaScript小结
查看>>
python Web开发你要理解的WSGI & uwsgi详解
查看>>
基于CentOS与VmwareStation10搭建Oracle11G RAC 64集群环境
查看>>
SQL语言:DDL/DML/DQL/DCL
查看>>
swift代理使用
查看>>
代数几何
查看>>
大牛网站
查看>>
Ajax传数据到servlet
查看>>
springMVC和struts2有什么不同?为什么要用springMVC或者struts2?让你实现一个MVC框架大概如何设计?...
查看>>
微信JSApi支付~坑和如何填坑
查看>>
使用 iview Table 表格组件修改操作的显示隐藏
查看>>