JasonChen Blog

rxjava2 源码解析(二)

前一篇文章我们讲述到rxjava2 的内部设计模式与原理机制,包括观察者模式和装饰者模式,其本质上都是rxjava2的事件驱动,那么本篇文章将会讲到rxjava2的另外一个功能:异步功能

rxjava2 源码解析

依旧是阅读源码,还是从源码开始带着疑惑去读,前一篇文章我们讲到subcribeOn方法内部的实现涉及线程池:Scheduler.Worker w = scheduler.createWorker() 这边涉及两个重要组件:

  1. scheduler调度器
  2. 自定义线程池
    scheduler调度器源码解析
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public final class Schedulers {
    @NonNull
    // 针对单一任务设置的单个定时线程池
    static final Scheduler SINGLE;
    @NonNull
    // 针对计算任务设置的定时线程池的资源池(数组)
    static final Scheduler COMPUTATION;
    @NonNull
    // 针对IO任务设置的单个可复用的定时线程池
    static final Scheduler IO;
    @NonNull
    // trampoline翻译是蹦床(佩服作者的脑洞)
    这个调度器的源码注释是:任务在当前线程工作(不是线程池)但是不会立即执行,任务会被放入队列并在当前
    的任务完成之后执行。简单点说其实就是入队然后慢慢线性执行(这里巧妙的方法其实和前面我们所讲的回压实现机制基本是一致的,值得借鉴)
    static final Scheduler TRAMPOLINE;
    @NonNull
    // 单个的周期线程池和single基本一致唯一不同的是single对thread进行了一个简单的NonBlocking封装,这个封装从源码来看基本没有作用,只是一个marker interface标志接口
    static final Scheduler NEW_THREAD;

一共五种调度器,分别对应不同的场景,当然企业可以针对自身的场景设置自己的调度器

computation调度器源码分析

computation调度器针对大量计算场景,在后端并发场景会更多的用到,那么其是如何实现的呢?接下来带着疑惑进行源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
// 资源池
final AtomicReference<FixedSchedulerPool> pool;
// 这是computationScheduler类中实现的createWork()方法
public Worker createWorker() {
// 创建EventLoop工作者,入参是一个PoolWorker
return new EventLoopWorker(pool.get().getEventLoop());
}
static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
final int cores;
// 资源池工作者,每个工作者其实都是一个定时线程池
final PoolWorker[] eventLoops;
long n;
// 对应前面的函数调用
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) {
return SHUTDOWN_WORKER;
}
// simple round robin, improvements to come
// 这里其实就是从工作者数组中轮询选出一个工作者
这里其实拥有提升和优化的空间,这里笔者可能会向开源社区提交一个pr
以此进行比较好的调度器调度
return eventLoops[(int)(n++ % c)];
}
// 此处是一个简单的封装
static final class PoolWorker extends NewThreadWorker {
PoolWorker(ThreadFactory threadFactory) {
super(threadFactory);
}
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
// 进行定时线程池的初始化
executor = SchedulerPoolFactory.create(threadFactory);
}
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec =
// 初始化一个定时线程池
Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}

上述代码清晰的展示了computation调度器的实现细节,这里需要说明的是定时线程池的core设置为1,线程池的个数最多为cpu数量,这里涉及到ScheduledThreadPoolExecutor定时线程池的原理,简单的说起内部是一个可自动增长的数组(队列)类似于ArrayList,也就是说队列永远不会满,线程池中的线程数不会增加。
接下来结合订阅线程和发布线程分析其之间如何进行沟通的本质
发布线程在上一篇的文章已经提到,内部是一个worker,那么订阅线程也是么,很显然必须是的,接下来我们来看下源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
// 还是从subscribeActul开始(原因见上一篇文章)
public void subscribeActual(Subscriber<? super T> s) {
Worker worker = scheduler.createWorker();
if (s instanceof ConditionalSubscriber) {
source.subscribe(new ObserveOnConditionalSubscriber<T>(
(ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
} else {
// 其内部封装了一个ObserveOnsubcriber,这是个对下流订阅者的封装,主要什么作用呢,为什么要这个呢,
// 其实这个涉及订阅线程内部的机制,接着看源代码了解其内部机制
source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
}
}
// 基类
abstract static class BaseObserveOnSubscriber<T>
extends BasicIntQueueSubscription<T>
implements FlowableSubscriber<T>, Runnable {
private static final long serialVersionUID = -8241002408341274697L;
final Worker worker;
final boolean delayError;
final int prefetch;
final int limit;
final AtomicLong requested;
Subscription upstream;
SimpleQueue<T> queue;
volatile boolean cancelled;
volatile boolean done;
Throwable error;
int sourceMode;
long produced;
boolean outputFused;
BaseObserveOnSubscriber(
Worker worker,
boolean delayError,
int prefetch) {
this.worker = worker;
this.delayError = delayError;
this.prefetch = prefetch;
this.requested = new AtomicLong();
this.limit = prefetch - (prefetch >> 2);
}
@Override
public final void onNext(T t) {
if (done) {
return;
}
if (sourceMode == ASYNC) {
trySchedule();
return;
}
/**
* 当上游的装修者(上一篇提到的装修者模式)调用onNext方 * 法时,这时并没有类似的去调用下游的onNext方法,那这个
* 时候其实就是订阅者线程模式的核心原理:采用queue队列
* 进行数据的store,这里尝试将数据放进队列
*/
if (!queue.offer(t)) {
upstream.cancel();
error = new MissingBackpressureException("Queue is full?!");
done = true;
}
// 开启订阅者线程池模式的调度,具体实现在子类中实现
trySchedule();
}
@Override
public final void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
trySchedule();
}
@Override
public final void onComplete() {
if (!done) {
done = true;
trySchedule();
}
}
// 这里并没有向上传递request请求,而是把自己当做数据发射者进行request计数
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(requested, n);
// 开启调度
trySchedule();
}
}
@Override
public final void cancel() {
if (cancelled) {
return;
}
cancelled = true;
upstream.cancel();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
// 调度代码
final void trySchedule() {
// 上一篇文章讲过这个的用法
if (getAndIncrement() != 0) {
return;
}
// 启用一个work来进行任务的执行 this对象说明实现了runable接口
worker.schedule(this);
}
// 调度实现的代码
@Override
public final void run() {
if (outputFused) {
runBackfused();
} else if (sourceMode == SYNC) {
runSync();
} else {
// 一般会调用runAsync方法
runAsync();
}
}
abstract void runBackfused();
abstract void runSync();
abstract void runAsync();
final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
if (cancelled) {
clear();
return true;
}
if (d) {
if (delayError) {
if (empty) {
cancelled = true;
Throwable e = error;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
Throwable e = error;
if (e != null) {
cancelled = true;
clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
cancelled = true;
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
@Override
public final int requestFusion(int requestedMode) {
if ((requestedMode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Override
public final void clear() {
queue.clear();
}
@Override
public final boolean isEmpty() {
return queue.isEmpty();
}
}
// 具体实现类
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
implements FlowableSubscriber<T> {
private static final long serialVersionUID = -4547113800637756442L;
final Subscriber<? super T> downstream;
ObserveOnSubscriber(
Subscriber<? super T> actual,
Worker worker,
boolean delayError,
int prefetch) {
super(worker, delayError, prefetch);
this.downstream = actual;
}
//这是上游回调这个subscriber时调用的方法,详情见上一篇文章
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<T> f = (QueueSubscription<T>) s;
int m = f.requestFusion(ANY | BOUNDARY);
if (m == SYNC) {
sourceMode = SYNC;
queue = f;
done = true;
downstream.onSubscribe(this);
return;
} else
if (m == ASYNC) {
sourceMode = ASYNC;
queue = f;
downstream.onSubscribe(this);
s.request(prefetch);
return;
}
}
// 设置缓存队列
// 这里涉及一个特别之处就是预获取(提前获取数据)
queue = new SpscArrayQueue<T>(prefetch);
// 触发下游subscriber 如果有request则会触发下游对上游数据的request
downstream.onSubscribe(this);
// 请求上游数据 上面的代码和这行代码就是起到承上启下的一个作用,也就是预获取,放在队列中
s.request(prefetch);
}
}
@Override
void runSync() {
int missed = 1;
final Subscriber<? super T> a = downstream;
final SimpleQueue<T> q = queue;
long e = produced;
for (;;) {
long r = requested.get();
while (e != r) {
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
upstream.cancel();
a.onError(ex);
worker.dispose();
return;
}
if (cancelled) {
return;
}
if (v == null) {
cancelled = true;
a.onComplete();
worker.dispose();
return;
}
a.onNext(v);
e++;
}
if (cancelled) {
return;
}
if (q.isEmpty()) {
cancelled = true;
a.onComplete();
worker.dispose();
return;
}
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
@Override
void runAsync() {
int missed = 1;
final Subscriber<? super T> a = downstream;
final SimpleQueue<T> q = queue;
long e = produced;
for (;;) {
long r = requested.get();
while (e != r) {
boolean d = done;
T v;
try {
// 获取数据
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
upstream.cancel();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
e++;
// limit = prefetch - (prefetch >> 2)
// prefetch = BUFFER_SIZE(上一篇文章提到的默认128)
// 前面说过,订阅者把自己当成一个发射者,那数/据从哪里来呢,而且还要持续有数据,那么下面代码说明了数据来源,当数据达到limit,开始新的数据的prefetch,每次preftch的数量是limit
if (e == limit) {
if (r != Long.MAX_VALUE) {
r = requested.addAndGet(-e);
}
upstream.request(e);
e = 0L;
}
}
if (e == r && checkTerminated(done, q.isEmpty(), a)) {
return;
}
// 下面的代码机制在上一篇讲过主要涉及异步编程技巧
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
@Override
void runBackfused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
downstream.onNext(null);
if (d) {
cancelled = true;
Throwable e = error;
if (e != null) {
downstream.onError(e);
} else {
downstream.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
if (v != null && sourceMode != SYNC) {
long p = produced + 1;
if (p == limit) {
produced = 0;
upstream.request(p);
} else {
produced = p;
}
}
return v;
}
}

为何要将订阅者这样区别设置呢,其实原因很简单,订阅者和发布者需要不同的线程机制异步地执行,比如订阅者需要computation的线程机制来进行大量的耗时数据计算,但又要保持一致的装修者模式,所以源码的做法是订阅者这边打破回调的调用流,采用数据队列进行两个线程池之间的数据传送

本文总结

笔者喜欢总结,总结意味着我们反思和学习前面的知识点,应用点以及自身的不足。

  1. rxjava2线程调度的原理机制,不同场景下线程机制需要进行定制
  2. rxjava2生产和消费的异步原理和实现方式