0%

Subject

Rxjs Subject 源码片段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* Subject继承于Observable
*/
export class Subject extends Observable {
constructor() {
super();
this.observers = []; // 观察者列表
this.closed = false;
this.isStopped = false;
this.hasError = false;
this.thrownError = null;
}
next(value) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
if (!this.isStopped) {
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
copy[i].next(value);
}
}
}
error(err) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
this.hasError = true;
this.thrownError = err;
this.isStopped = true;
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) { // 循环调用观察者error方法
copy[i].error(err);
}
this.observers.length = 0;
}
complete() {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
this.isStopped = true;
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) { // 循环调用观察者complete方法
copy[i].complete();
}
this.observers.length = 0; // 清空内部观察者列表
}
}

因为 Subject 在订阅时,是把 observer 存放到观察者列表中,并在接收到新值的时候,遍历观察者列表并调用观察者上的 next 方法

Subject继承自Observable,将Observable的单路推送转换为多路推送。它就是讲单路Observable转变为多路Observable的桥梁。

Subject的几个衍生类:BehaviorSubject,ReplaySubject,AsyncSubject;

BehaviorSubject:保存最近向数据消费者发送的值,当一个Observer订阅后,他会立即收到最新的值;它非常适合表示随时间推移的值;BehaviorSubject 形容一个人的生日,随时间不断更新;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var subject = new Rx.BehaviorSubject(0) //初始值
subject.subscribe({
next:(v) => {
console.log('A' + v )
}
})
subject.next(1);
subject.next(2);
subject.subscribe({
next:(v) => {
console.log('B' + v )
}
})
subject.next(3);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var subject = new Rx.ReplaySubject(3); /* 回放数量 */
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

ReplaySubject 如同于 BehaviorSubject 是 Subject 的子类。通过 ReplaySubject 可以向新的订阅者推送旧数值,就像一个录像机 ReplaySubject 可以记录Observable的一部分状态(过去时间内推送的值);.一个 ReplaySubject 可以记录Observable执行过程中推送的多个值,并向新的订阅者回放它们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var subject = new Rx.AsyncSubject();

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

AsyncSubject是Subject的另外一个衍生类,Observable仅会在执行完成后(complete),推送执行环境中的最后一个值。业务上很少用

既然Subject是一个Observer,你可以把它作为subscribe(订阅)普通Observable时的参数

1
2
3
4
5
6
7
8
9
10
11
12
var subject = new Rx.Subject();

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你可以传递Subject来订阅observable

通过添加两个Observer到Observer列表中,之后Observable直接订阅Observer列表将普通的单路推送转换为多路推送

Cold & HOT

observable is default cold; cold: 表示只有 subscribe 出现 observer 才会被激活; 当有多个subscribe时,每一个都是一条独立的链;hot: 每个subscirbe共享一个链,不管什么时间插入subscribe,都不会重新开始。如何把一个cold 变成 hot?Subject则可以充当中介。multicast、refCount、publish、share则是通过Subject完成将cold转变为hot的方法。

1
2
3
4
5
6
7
8
9
10
11
let sub = new Subject();
let obs = sub.map(v => {
console.log("ajax call");
});
obs.subscribe(v => console.log("subscribe 1"));
obs.subscribe(v => console.log("subscribe 2"));
sub.next("value");
// ajax call
// subscribe 1
// ajax call
// subscribe 2

模拟异步请求数据的业务场景,如果有更多的subscribe的时候,则会对请求服务器多次,造成服务器负载严重,此时一般解决方法为以下两种

1
2
3
let obs = sub.map(v => {
console.log("ajax call");
}).share();
1
2
3
let obs = sub.map(v => {
console.log("ajax call");
}).publish().refCount();

引入multicast(组播)的概念,通过中介者订阅源序列在由它推送出去,下面是它的运作方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
var source = Rx.Observable.interval(1000).take(3);

var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}

var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}

var subject = {
observers: [],
subscribe: function(observer) { //addObserver
this.observers.push(observer)
},
next: function(value) {
this.observers.forEach(o => o.next(value))
},
error: function(error){
this.observers.forEach(o => o.error(error))
},
complete: function() {
this.observers.forEach(o => o.complete())
}
}

subject.subscribe(observerA)

source.subscribe(subject);

setTimeout(() => {
subject.subscribe(observerB);
}, 1000);

换一种形式,用multicast方法来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var source = Rx.Observable.interval(1000)
.take(3)
.multicast(new Rx.Subject());

var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}

var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}

source.subscribe(observerA); // subject.subscribe(observerA)

source.connect(); // source.subscribe(subject) //开始推送

setTimeout(() => {
source.subscribe(observerB); // subject.subscribe(observerA)
}, 1000);
1
2
3
4
5
6
7
var result = Observable.interval(1000).take(6)  //执行两次
.map(x => Math.random())
// .share() //不会因为订阅者数量而执行多次
// .publish().refCount()

var subA = result.subscribe(x => console.log('A: ' + x));
var subB = result.subscribe(x => console.log('B: ' + x));

常用应用场景

1
2
3
4
5
6
7
let sub = new Subject();
let obs = sub.map(v => {
console.log("ajax call"); //请求接口
});
obs.subscribe(v => console.log("subscribe 1")); //分发
obs.subscribe(v => console.log("subscribe 2"));
sub.next("value");

ajax会打印两次,增加服务器端负载; 调用share()方法;其中angular2中的http也是not share的,在类似场景中同样的问题;

建立一個 subject 先拿去訂閱 observable(source),再把我們真正的 observer 加到 subject 中,這樣一來就能完成訂閱,而每個加到 subject 中的 observer 都能整組的接收到相同的元素。

Observable.multicast(new Rx.Subject()) == Observable.publish();对于Subject三种衍生形式,publishReplay(1)、publishBehavior(0)、publishLast()

另外 Observable.publish().refCount() == Observable.share()

总结Subject!

  • 既是Observable又是Observer
  • 对内部的observers进行组播
  • observer default is cold and not share.(cold 表示只有 subscribe 出现 observer 才会被激活. not share 表示每一个 subscribe 都会激活 observer 链)

业务场景:窗口a接收到A,b接收到B,c接受到C,本窗口d则需要异步的捕获a和b窗口的值并乘c窗口的值,d = (a+b)*c;

我们可以把每个数据的变更定义成流,然后定义出这些流的组合关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const A = new Rx.Subject()
const B = new Rx.Subject()
const C = new Rx.Subject()

const D = Rx.Observable
.combineLatest(A, B, C)
.map(data => {
let [a, b, c] = data
return (a + b) * c
})

D.subscribe(result => console.log(result))

setTimeout(() => A.next(2), 3000)
setTimeout(() => B.next(3), 5000)
setTimeout(() => C.next(5), 2000)

setTimeout(() => C.next(11), 10000)

为了简单,我们用定时器来模拟异步消息。实际业务中,对每个Subject的赋值是可以跟AJAX或者WebSocket结合起来,而且对D的那段实现毫无影响。我们可以看到,在整个这个过程中,最大的便利性在于,一旦定义完整个规则,变动整个表达式树上任意一个点,整个过程都会重跑一遍,以确保最终得到正确结果。无论中间环节上哪个东西变了,它只要更新自己就可以了,别人怎么用它的,不必关心。而且,我们从D的角度看,他只关心自己的数据来源是如何组织的,这些来源最终形成了一棵树,从各叶子汇聚到树根,也就是我们的订阅者这里,树上每个节点变更,都会自动触发从它往下到树根的所有数据变动,这个过程是最精确的,不会触发无效的数据更新。