0%

rxjs常用操作符及项目应用

前言

RXJS全名Reactive Extensions for JavaScript,是JavaScript的响应式扩展。什么是响应式?响应式就是跟随时间不断变化的数据、状态、事件等转换成可被观察的序列,然后订阅那些变化,一旦变化则会执行业务逻辑。适用于异步场景。ReactiveX结合了观察者模式、迭代器模式和函数式编程构建一个管理事件序列的理想方式。

RxJS所能解决的问题:

时刻保持响应。这对于一个应用来说意味着当他处理用户的输入或者凭借AJAX从服务器接受一些数据时停止是一件不可能接受的事情。在JavaScript中解决问题的方案始终是大量运用回调函数来进行一些应用的处理。但回调的使用使内容丰富的大型应用变得凌乱,一旦你需要多块数据时你就陷入了回调地狱。Angular2中,组件间通讯@Output对应的EventEmitter实际上就是一个Subject;Http模块中Observable作为大部分API的交互对象使用。但是这只是官方推荐的外部扩展并不必须,也可以使用Promise,之后会介绍Observable和Promise的区别。

RxJS初探:

首先尝试一个简单的小例子:
  • 存在一个数组,里面含有多种数据类型的元素
  • 找到其中的数字及字数组成的字符串
  • 每一个符合标准的元素乘以2
  • 累加

需要同时满足以上四个要求,可以通过循环列表来筛选满足要求的元素在进一步操作

1
2
3
4
5
6
7
8
const source = [1, 5, 9, 3, 'hi', 'tb', 456, '11', 'yoyoyo'];
let total = 0;
for (let i = 0; i < source.length; i++) {
const num = parseInt(source[i], 10);
if (!isNaN(num)) {
total += num * 2;
}
}

如果拥有函数式编程的经验,相信大家一定会通过es6的映射函数进行操作,接下来通过这个例子说明一下命令式编程和函数式编程的区别。

声明式编程发轫于人工智能的研究,主要包括函数式编程(functional programming,简称FP)和逻辑式编程(logic programming,简称LP);
如果想探究声明式编程与函数式编程的具体关系请访问:函数式与声明式的关系,在此不是本篇的重点

1
2
3
4
5
6
const source = [1, 5, 9, 3, 'hi', 'tb', 456, '11', 'yoyoyo'];
let total = source
.map(x => parseInt(x as any, 10))
.filter(x => !isNaN(x))
.map(x => x * 2)
.reduce((total, value) => total + value);

函数式编程中的函数这个术语不是指计算机中的函数,而是指数学中的函数,即自变量的映射。也就是说一个函数返回的值仅决定于函数参数的值,不依赖其他状态。命令式编程注重的是函数执行的细节,函数式编程注重的是函数执行的结果。
函数式编程对函数的使用有一些特殊要求:

  • 声明式函数
  • 纯函数
  • 数据不可变性

声明式编程是人脑思维方式的抽象,即利用数理逻辑或既定规范对已知条件进行推理或运算。声明式的函数,让开发者只需要表达”想要做什么”,而不需要表达“怎么去做”。
纯函数指的是执行结果由输入参数决定,参数相同时结果相同,不受其他数据影响,并且不会带来副作用的函数。副作用指的是函数做了和本身运算返回值没有关系的事情,如修改外部变量或传入的参数对象,甚至是执行console.log都算是副作用。前端中常见的副作用有发送http请求、操作DOM、调用alert或者confirm函数等。
数据不可变就是指这个数据一旦产生,它的值就永远不会变。JavaScript中字符串类型和数字类型就是不可改变的,而对象基本都是可变的,可能会带来各种副作用。

函数式编程带来的好处主要可以总结为以下两点:

  • 相比命令式编程,少了非常多的状态变量的声明与维护
  • 代码更为简洁,可读性更强

进入RxJS

流(Stream)无非是随时间流逝的一系列事件。流可以用来处理任何类型的事件,如:鼠标点击,键盘按下等等。你可以把流作为变量,它有能力从数据角度对发生的改变做出反应。Stream在其时间轴中发出三样东西,一个值,一个错误和完整的信号。我们必须捕获此异步事件并相应地执行函数。

想要抓取事件,一般可以用 callback 或是 Promise 来达成,promise和observable都是为解决异步问题而设计的(避免“回调地狱”), 然而 Promise 主要設设计一次性的事件与单一回傳=传值,而RxJS除了包含Promise外,提供了observable可观察对象,以惰性的方式推送多值的集合

Pull拉取 VS Push推送

拉和推是数据生产者和数据的消费者两种不同的交流协议;什么是”pull”?在”pull”体系中,数据的消费者决定何时从数据生产者那里获取数据,而生产者自身并不会意识到什么时候数据将会被发送给消费者。每一个JS函数都是一个“pull”体系.
什么是”push”?在push体系中,数据的生产者决定何时发送数据给消费者,消费者不会在接收数据之前意识到它将要接收这个数据。
Promise(承诺))是当今JS中最常见的Push体系,一个Promise(数据的生产者)发送一个resolved value(成功状态的值)来注册一个回调(数据消费者),但是不同于函数的地方的是:Promise决定着何时数据才被推送至这个回调函数。
RxJS引入了Observables(可观察对象),一个全新的”推体系”。一个可观察对象是一个产生多值的生产者,并”推送给”Observer(观察者)。

单值与多值

如果您通过Promise提出请求并等待回复。您可以确定对同一请求不会有多个响应。Observables允许您在调用observer.complete()函数之前解析多个值

总结RxJS VS Promise —— 三个最重要的区别

区别 Rxjs Promise
动作是否可以取消?
是否可以发射多个值?
各种工具函数?

开始了解RxJS中的几个重要成员

  • Observable(可观察对象):表示一个可调用的未来值或者时间序列上的事件集合
  • Observer(观察者):一个回调函数集合,它知道怎样去监听被Observable发送的值
  • Subscription(订阅): 表示一个可观察对象的执行,主要用于取消执行
  • Subject(主题):等同于一个事件驱动器,是将一个值或者事件广播到多个观察者的唯一途径
  • Operators(操作符): 纯函数,使得以函数编程的方式处理集合

Observable

Observable是一个具有一些特殊的特征的函数。它接收一个“观察者”(一个带有“next”,“error”和“complete”方法的对象)

  • Observable支持在应用程序中的发布者和订阅者之间传递消息。
  • Observable很懒惰。它不会开始生成数据,直到您订阅它为止。
  • subscribe()返回一个订阅,消费者可以在unsubscribe()取消订阅并销毁生产者。
  • RxJS提供了许多可用于创建Observable的函数。这些函数可以简化创建可观察对象的过程

Observer

什么是Observer?Observer是Observable传递过来的数据的消费者。Observers由一个带有“next”,“error”和“complete”方法的对象构成,next、error、和 complete用来传递数据。

1
2
3
4
5
6
var observer = {
next: x => console.log('Observable got a next value: ' + x),
error: err => console.log('Observable got and error: ' + err),
complete: () => console.log('Observable got a complete notification')
};
observable.subscribe(observer)

Subscription

一个Subscription代表了一个一次性的资源,通常表示的是一个Observable执行。一个Subscription有一个重要的方法,unsubscribe,它不需要参数,仅仅是取消订阅释放资源。

1
2
3
const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();

Subscriptions也可以放在一起,这样会导致使用一个unsubscribe()将取消多个Observable执行,通过add、remove方法维护关联的Subscription

1
2
3
4
5
6
7
8
const observable1 = interval(400);
const observable2 = interval(300);
const subscription = observable1.subscribe(x => console.log('first: ' + x));
const childSubscription = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubscription);
setTimeout(() => {
subscription.unsubscribe();
}, 1000);

Subject

  • Subject是一种特殊类型的Observable,允许将值多播到许多观察者。虽然普通的Observable是单播的(每个订阅的Observer都拥有Observable的独立执行),但Subject是多播的

  • 每一个Subject都是一个observable可观察对象,给定一个Subject后,你可以订阅它,提供的观察者将会正常的开始接收值。从观察者的角度来看,它不能判断一个可观察对象的执行时来自于单播的Observable还是来自于一个Subject.
    在Subject的内部,subscribe并不调用一个新的发送值得执行。它仅仅在观察者注册表中注册给定的观察者,类似addEventListener的工作方式。

  • 每一个Subject都是一个Observer观察者对象。它是一个拥有next()/error()/complete()方法的对象。要想Subject提供一个新的值,只需调用next(),它将会被多播至用来监听Subject的观察者。

Subject就是一个可观察对象,只不过可以被多播至多个观察者。同时Subject也类似于EventEmitter:维护者着众多事件监听器的注册表。

BehaviorSubject

Subjects的一个变体是BehaviorSubject,其有”当前值”的概念。它储存着要发射给消费者的最新的值。无论何时一个新的观察者订阅它,都会立即接受到这个来自BehaviorSubject的”当前值”

BehaviorSubject对于表示”随时间的值”是很有用的。举个例子,人的生日的事件流是一个Subject,然而人的年龄的流是一个BehaviorSubject。

ReplaySubject

一个ReplaySubject类似于一个BehaviorSubject,因为它可以发送一个过去的值(old values)给一个新的订阅者,但是它也可以记录可观察对象的一部分执行。

一个ReplaySubject 从一个可观察对象的执行中记录多个值,并且可以重新发送给新的订阅者。

AsyncSubject

AsyncSubject是另一个变体,它只发送给观察者可观察对象执行的最新值,并且仅在执行结束时。AsyncSubject类似于last()操作符,因为它为了发送单一值而等待complete通知。

常用的操作符

每一个操作符都会产生一个新的Observable,不会对上游的Observable做任何修改,这完全符合函数式编程“数据不可变”的要求。pipe方法就是数据管道,会对数据流进行处理,可以添加操作符作为参数。

interval 创建一个无限长度的周期性序列

1
interval(1000)  // 输出: 0,1,2...

timer 指定一个额外的参数来调节第一值的静默时长,第二个参数可选,若无则仅仅在规定的静默时长后输出一个值,然后结束序列

1
timer(0,1000) // 输出:0,1,2...

from 可以将已有的数据转化为Observable,参数为iterable数据集对象
1
from([1,2,3,4])   // 输出:1,2,3,4

of 不在同一个数据集中的多个来源的数据
1
2
of([1,2,3])    //    [1,2,3]
from([1,2,3]) // 1,2,3

fromEvent 将事件流转化为Observable,
1
2
const el = document.getElementById("btn"); 
fromEvent(el,"click");

delay 推迟 参数为数字或Date对象

startWith 可以在源序列之前添加额外的元素

map 对源序列进行变换,并返回新的序列(改变了源)

1
2
const source = of(1,2,3); // 输出: 1 2 3
const target = source.map(x => x * 2); //输出: 2 4 6

concat有序拼接 , merge无序拼接

mergeMap 平坦化映射:首先将一个序列的各元素映射为序列,然后将各序列融合,参数是一个映射函数,返回值为序列

1
2
3
4
5
const source = fromEvent(document, 'click');
const target = source.pipe(
mapTo(1),
mergeMap(() => interval(1000).pipe(take(3)))
).subscribe(res => console.log(res)); //输出: 0 1 2 0 ...

switchMap 与mergeMap的区别在于将最新的序列中的元素输出

concatMap 将源序列各元素映射为序列,然后按顺序拼接 (与mergeMap的区别所在)

filter 筛选源序列中满足条件的元素,并返回新的序列

1
2
const source = of(1,2,3,4,5); //序列: 1 2 3 4 5
const target = source.filter(x => x < 4) //序列: 1 2 3

take 截取序列头部元素数量输出

distinct 去重,并返回一个新序列

1
2
const source = of(1,2,2,3,4,2,1); //序列: 1 2 2 3 4 2 1
const target = source.distinct(); //序列:1 2 3 4

distinctUntilChanged 相邻元素去重,并返回一个新序列
1
2
const source = of(1,2,2,3,4,2,1); //序列: 1 2 2 3 4 2 1
const target = source.distinctUntilChanged(); //序列:1 2 3 4 2 1

debounce 去抖动,一段时间内只取最新数据作为一次发射数据,其他数据取消发射

throttle (和debounce唯一区别是debounce取一段时间内最新的,而throttle忽略这段时间后,发现新值才发送, 通俗讲,都设定一个时间周期,持续触发事件,throttle为每到时间周期便会触发一次,bebounce为触发周期小于设定时间周期不予事件触发)

zip 支持可变数量的序列作为参数,最后一个参数应当是一个组合函数, 其返回值将作为目标序列的元素

1
2
3
4
5
const source1 = of(1, 2, 3);
const source2 = of(4, 5, 6);
const target = zip(source1, source2).subscribe(([val1, val2]) => { // 序列: 1-4 2-5 3-6
console.log(val1 + '-' + val2)
});

forkJoin 将多个序列的最后一个元素组合为一个数组后,作为目标序列的唯一元素,一个常见用例是在页面加载时你希望发起多个请求,并在所有请求都响应后再采取行动
1
2
3
4
5
const source1 = of(1, 2, 3);
const source2 = of(4, 5, 6);
const target = forkJoin(source1, source2).subscribe(([val1, val2]) => { // 序列: 3-6
console.log(val1 + '-' + val2)
});

combineLatest 将多个序列的最后一个元素,使用组合函数构成目标序列的一个新元素

const setHtml = id => val => document.getElementById(id).innerHTML = val;
const addOneClick$ = id => fromEvent(document.getElementById(id), 'click')
    .pipe(
        mapTo(1),
        startWith(0),
        scan((acc, curr) => acc + curr, 0),
        tap(setHtml(`${id}Total`))
    );
    const combineTotal$ = combineLatest(
        addOneClick$('red'),
        addOneClick$('black')
    ).pipe(
        map(([val1, val2]) => val1 + val2)
    )
    .subscribe(setHtml('total'));

map用于对自身对象数值进行映射,将发射对象转换成另一个发射对象发射, 返回一个包含映射结果的Observable对象 而mergeMap是把自身对象里的数值进行映射并转换成一个新的Observable对象.返回一个内部元素为映射的Observable对象的Observable对象

marble diagrams

为了解释operators是如何工作的,光是文本解释是不够的。许多operators和时间有关,它们可能会延迟执行,例如,throttle等。图标往往能够比文字更多表达清楚。Marble Diagrams能够可视化的表现出operators是如何工作的,包括输入的Observable(s),operator和它的参数,以及输出的Observable. Marble diagrams