ReactiveX 响应式编程库,这是一个程序库,通过使用可观察的事件序列来构成异步和事件驱动的程序。
其简化了异步多线程编程,在以前多线程编程的世界中,锁,可重入锁,同步队列器,信号量,并发同步器,同步计数器,
并行框架等都是具有一定的使用门槛,稍有不慎或者使用不成熟或对其源码理解不深入都会造成相应的程序错误和程序性能的低下。
观察者模型
24种设计模式的一种,观察者Observer和主题Subject之间建立组合关系:Subject类实例中包含观察者Observer的引用,
增加引用的目的就是为了通知notify,重要点就是要在Subject的notify功能中调用Observer的接受处理函数receiveAndHandle
个人理解:观察者模型其实是一种异步回调通知,将数据的处理者先注册到数据的输入者那边,这样通过数据输入者执行某个函数
去调用数据处理者的某个处理方法。
RxJava2
Rx有很多语言的实现库,目前比较出名的就是RxJava2。这里主讲Rxjava2的部门源码解读,内部设计机制和内部执行的线程模型
基本使用
使用rxjava2大致分为四个操作:
- 建立数据发布者
- 添加数据变换函数
- 设置数据发布线程池机制,订阅线程池机制
- 添加数据订阅者
1
2
3
4
5
6
7
8
9
10// 创建flowable
Flowable<Map<String, Map<String,Object>>> esFlowable = Flowable.create(new ElasticSearchAdapter(), BackpressureStrategy.BUFFER);
Disposable disposeable = esFlowable
// map操作 1.采集、2.清洗
.map(DataProcess::dataProcess)
.subscribeOn(Schedulers.single())
//计算任务调度器
.observeOn(Schedulers.computation())
// 订阅者 consumer 执行运算
.subscribe(keyMaps -> new PredictEntranceForkJoin().predictLogic(keyMaps));
以上就是一个实际的例子,里面的ElasticSearchAdapter实际隐藏了一个用户自定义实现数据生产的subscribe接口:FlowableOnSubscribe
用户需要实现这个接口函数:void subscribe(@NonNull FlowableEmitter
emitter 英文翻译发射器,很形象,数据就是由它产生的,也是业务系统需要对接的地方,一般业务代码实现这个接口类然后发射出需要处理的原始数据。
map函数作为数据变换处理的功能函数将原来的数据输入变换为另外的数据集合,然后设置发布的线程池机制subscribeOn(Schedulers.single()),订阅的线程池
机制observeOn(Schedulers.computation()),最后添加数据订阅函数,也就是业务系统需要实现另外一个地方,从而实现数据的自定义处理消费。
rxjava2支持的lambda语法
- 创建操作符:just fromArray empty error never fromIterable timer interval intervalRange range/rangeLong defer
- 变换操作符:map flatMap flatmapIterable concatMap switchmap cast scan buffer toList groupBy toMap
- 过滤操作符:filter take takeLast firstElement/lastElement first/last firstOrError/lastOrError elementAt/elementAtOrError ofType skip/skipLast
ignoreElements distinct/distinctUntilChanged timeout throttleFirst throttleLast/sample throttleWithTimeout/debounce - 合并聚合操作符:startWith/startWithArray concat/concatArray merge/mergeArray concatDelayError/mergeDelayError zip combineLatest combineLatestDelayError
reduce count collect - 条件操作符:all ambArray contains any isEmpty defaultIfEmpty switchIfEmpty sequenceEqual takeUntil takeWhile skipUntil skipWhile
有一篇博客详细介绍了rxjava的各种操作符,链接https://maxwell-nc.github.io/android/rxjava2-1.html
rxjava2 源码解析
阅读源码个人比较喜欢带着疑惑去看,这样与目标有方向。接下来的分析以Flowable为例,这里所有的例子都是按照Flowable为例,因为Flowable在实际
项目中比Observable可能用的多,因为实际场景中数据生产速度和数据消费速度都会有一定的不一致甚至数据生产速度远大于数据消费速度。
数据发布和订阅
首先从数据订阅者开始,点进源码看进一步解析,里面有很多subscribe重载接口:
1 | public final Disposable subscribe(Consumer<? super T> onNext) { |
操作符与线程池机制原理剖析
首先在进行源码分析之前讲述一下一种模式:装饰者模式 24种模式中的一种,在java io源码包中广泛应用
简单的来说是与被装饰者具有相同接口父类同时又对被装饰者进行一层封装(持有被装饰者的引用),以此用来加上自身的特性。
回归主题,当我们使用操作符和线程池机制的时候做法都是在数据发布者后面进行相应的函数操作:
1 | Disposable disposeable = scheduleObservable |
那么为何这么做,接下来我们进行源码分析:
- subscribeOn map 方法都在Flowable类中:这里是实例方法调用,传进了this对象这个很关键,这里其实就是我们前面提到的装修者模式,持有上游对象也就是数据源source的引用
1
2
3
4
5
6
7
8public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
}
public final Flowable<T> subscribeOn(boolean requestOn) { Scheduler scheduler,
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
}
以FlowableSubscribeOn为例进行分析,这个类经常会用到,因为其内部设置了线程池的机制所以在实际使用项目中会大量使用,那么是如何
做到线程池方式的呢?进一步利用源码进行分析: - 装饰者的内部代码分析
以subscribeOn 为例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43//很明显 实现的抽象类其实是装修者抽象类
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T>
// 这个在前面我们重点分析过这是实际订阅执行的类方法,其实也就是我们说的装饰方法,里面实现了每个类自己的特定“装修”方法
public void subscribeActual(final Subscriber<? super T> s) {
// 获取订阅者,下一篇文章会重点讲述rxjava的线程池分配机制
Scheduler.Worker w = scheduler.createWorker();
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
// 跟前面一样调用数据订阅者的onSubscribe方法
s.onSubscribe(sos);
// 由分配的调度者进行订阅任务的执行
w.schedule(sos);
}
// 开始分析SubscribeOnSubscriber这个静态内部类的内部代码
// 实现了Runable用来异步执行
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
implements FlowableSubscriber<T>, Subscription, Runnable
// 下游订阅引用
final Subscriber<? super T> downstream;
// 上游发射类引用
final AtomicReference<Subscription> upstream;
// 上游数据源引用 跟上游引用有区别,简单的说每个上游数据源引用有自己的上游发射类
Publisher<T> source;
// 这里是装饰的核心代码
public void run() {
lazySet(Thread.currentThread());
// source即为上游,表示其所装饰的源
Publisher<T> src = source;
source = null;
// 调用上游的自身的subscribe方法,在上面一开始我们说这个方法内部会去调用自身实现的subscribeActual方法
// 从而实现上游自己的特定方法,比如假设source是FlowCreate那么此处就会调用前面一开始我们所讲到的数据的发射
src.subscribe(this);
}
// 既然已经保证了数据的发射那么数据的处理是不是也要处理
// 很明显这是调用了下游订阅者的onNext方法
public void onNext(T t) {
downstream.onNext(t);
}本文总结
笔者喜欢总结,总结意味着我们反思和学习前面的知识点,应用点以及自身的不足。
- 设计模式:观察者模式和装修者模式
- 并发处理技巧:回压策略(其实本质是缓存)的实现原理以及细节点