Rxjs Subject 源码片段
1 | /** |
因为 Subject 在订阅时,是把 observer 存放到观察者列表中,并在接收到新值的时候,遍历观察者列表并调用观察者上的 next 方法
Subject继承自Observable,将Observable的单路推送转换为多路推送。它就是讲单路Observable转变为多路Observable的桥梁。
Subject的几个衍生类:BehaviorSubject,ReplaySubject,AsyncSubject;
BehaviorSubject:保存最近向数据消费者发送的值,当一个Observer订阅后,他会立即收到最新的值;它非常适合表示随时间推移的值;BehaviorSubject 形容一个人的生日,随时间不断更新;
1 | var subject = new Rx.BehaviorSubject(0) //初始值 |
1 | var subject = new Rx.ReplaySubject(3); /* 回放数量 */ |
ReplaySubject 如同于 BehaviorSubject 是 Subject 的子类。通过 ReplaySubject 可以向新的订阅者推送旧数值,就像一个录像机 ReplaySubject 可以记录Observable的一部分状态(过去时间内推送的值);.一个 ReplaySubject 可以记录Observable执行过程中推送的多个值,并向新的订阅者回放它们。
1 | var subject = new Rx.AsyncSubject(); |
AsyncSubject是Subject的另外一个衍生类,Observable仅会在执行完成后(complete),推送执行环境中的最后一个值。业务上很少用
既然Subject是一个Observer,你可以把它作为subscribe(订阅)普通Observable时的参数
1 | var subject = new Rx.Subject(); |
通过添加两个Observer到Observer列表中,之后Observable直接订阅Observer列表将普通的单路推送转换为多路推送
Cold & HOT
observable is default cold; cold: 表示只有 subscribe 出现 observer 才会被激活; 当有多个subscribe时,每一个都是一条独立的链;hot: 每个subscirbe共享一个链,不管什么时间插入subscribe,都不会重新开始。如何把一个cold 变成 hot?Subject则可以充当中介。multicast、refCount、publish、share则是通过Subject完成将cold转变为hot的方法。
1 | let sub = new Subject(); |
模拟异步请求数据的业务场景,如果有更多的subscribe的时候,则会对请求服务器多次,造成服务器负载严重,此时一般解决方法为以下两种
1 | let obs = sub.map(v => { |
1 | let obs = sub.map(v => { |
引入multicast(组播)的概念,通过中介者订阅源序列在由它推送出去,下面是它的运作方式
1 | var source = Rx.Observable.interval(1000).take(3); |
换一种形式,用multicast方法来实现
1 | var source = Rx.Observable.interval(1000) |
1 | var result = Observable.interval(1000).take(6) //执行两次 |
常用应用场景
1 | let sub = new Subject(); |
ajax会打印两次,增加服务器端负载; 调用share()方法;其中angular2中的http也是not share的,在类似场景中同样的问题;
建立一個 subject 先拿去訂閱 observable(source),再把我們真正的 observer 加到 subject 中,這樣一來就能完成訂閱,而每個加到 subject 中的 observer 都能整組的接收到相同的元素。
Observable.multicast(new Rx.Subject()) == Observable.publish();对于Subject三种衍生形式,publishReplay(1)、publishBehavior(0)、publishLast()
另外 Observable.publish().refCount() == Observable.share()
总结Subject!
- 既是Observable又是Observer
- 对内部的observers进行组播
- observer default is cold and not share.(cold 表示只有 subscribe 出现 observer 才会被激活. not share 表示每一个 subscribe 都会激活 observer 链)
业务场景:窗口a接收到A,b接收到B,c接受到C,本窗口d则需要异步的捕获a和b窗口的值并乘c窗口的值,d = (a+b)*c;
我们可以把每个数据的变更定义成流,然后定义出这些流的组合关系
1 | const A = new Rx.Subject() |