Goober Blog


  • 首页

  • 归档

  • 标签

  • 分类

  • 关于
Goober Blog

Go-micro 框架解析

发表于 2020-08-01 | 分类于 Golang

Go-micro golang版本的微服务框架,其构建了go语言微服务框架的经典设计架构。其抽象出的结构定义轻量,灵活,方便企业做二次开发。

阅读全文 »
Goober Blog

限流器

发表于 2020-05-01 | 分类于 Golang

限流器 顾名思义用来对高并发的请求进行流量限制的组件。
为啥需要进行流量限制。首先后端服务由于各个业务的不同和复杂性,各自在容器部署的时候都可能会有单台的瓶颈,超过瓶颈会导致内存或者cpu的瓶颈,进而导致发生服务不可用或者单台容器直接挂掉或重启。
流量的限制在众多微服务和service mesh系统中多有应用。笔者在本文的程序示例均以golang作为示例。

阅读全文 »
Goober Blog

Promethues Golang 客户端 源码解析

发表于 2019-10-04 | 分类于 Golang

Promethues Golang 客户端 源码解析

监控利器 promethues,线上产品必备的监控组件。笔者此处不做过多的介绍,开始client端源码旅程。
源码一共分为三个部分

  • Register
  • Collector
  • Push Gateway client
    阅读全文 »
Goober Blog

TCP协议详解

发表于 2019-03-01 | 分类于 TCP

TCP协议详解
众所周知,TCP协议是一个可靠的的协议,也是一个流量控制协议

阅读全文 »
Goober Blog

Netty源码解析-Epoll篇

发表于 2019-02-01 | 分类于 Netty

Epoll是Linux系统中高性能IO的底层机制,由操作系统底层内核提供,Linux版本在2.6之后的版本才会有此功能及api暴露出来,应用层一般采用c或c++去编写。
本文主要讲述Netty源码中Epoll的实现原理和源码设计思想。

阅读全文 »
Goober Blog

Netty源码解析-ByteBuf篇

发表于 2019-02-01 | 分类于 Netty

Netty是如今互联网最流行的java通讯库,在众多优秀组件 dubbo,grpc-java,jetty,RocketMQ中都使用了netty作为其通讯组件。
在netty的使用中,最常见的就是ByteBuf类,这个类最常见是由于其是字节数据的容器,白话一点就是存放字节数据的数组。
因为所有的网络通信最终都是基于底层的字节流传输,所以需要一个方便、易用的数据接口进行字节流的基本操作,正好ByteBuf提供了
基础的操作。

阅读全文 »
Goober Blog

rxjava2 源码解析(二)

发表于 2019-01-06 | 分类于 rxjava2

前一篇文章我们讲述到rxjava2 的内部设计模式与原理机制,包括观察者模式和装饰者模式,其本质上都是rxjava2的事件驱动,那么本篇文章将会讲到rxjava2的另外一个功能:异步功能

rxjava2 源码解析

依旧是阅读源码,还是从源码开始带着疑惑去读,前一篇文章我们讲到subcribeOn方法内部的实现涉及线程池:Scheduler.Worker w = scheduler.createWorker() 这边涉及两个重要组件:

  1. scheduler调度器
  2. 自定义线程池
    scheduler调度器源码解析
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public final class Schedulers {
    @NonNull
    // 针对单一任务设置的单个定时线程池
    static final Scheduler SINGLE;
    @NonNull
    // 针对计算任务设置的定时线程池的资源池(数组)
    static final Scheduler COMPUTATION;
    @NonNull
    // 针对IO任务设置的单个可复用的定时线程池
    static final Scheduler IO;
    @NonNull
    // trampoline翻译是蹦床(佩服作者的脑洞)
    这个调度器的源码注释是:任务在当前线程工作(不是线程池)但是不会立即执行,任务会被放入队列并在当前
    的任务完成之后执行。简单点说其实就是入队然后慢慢线性执行(这里巧妙的方法其实和前面我们所讲的回压实现机制基本是一致的,值得借鉴)
    static final Scheduler TRAMPOLINE;
    @NonNull
    // 单个的周期线程池和single基本一致唯一不同的是single对thread进行了一个简单的NonBlocking封装,这个封装从源码来看基本没有作用,只是一个marker interface标志接口
    static final Scheduler NEW_THREAD;

一共五种调度器,分别对应不同的场景,当然企业可以针对自身的场景设置自己的调度器

computation调度器源码分析

computation调度器针对大量计算场景,在后端并发场景会更多的用到,那么其是如何实现的呢?接下来带着疑惑进行源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
// 资源池
final AtomicReference<FixedSchedulerPool> pool;
// 这是computationScheduler类中实现的createWork()方法
public Worker createWorker() {
// 创建EventLoop工作者,入参是一个PoolWorker
return new EventLoopWorker(pool.get().getEventLoop());
}
static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
final int cores;
// 资源池工作者,每个工作者其实都是一个定时线程池
final PoolWorker[] eventLoops;
long n;
// 对应前面的函数调用
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) {
return SHUTDOWN_WORKER;
}
// simple round robin, improvements to come
// 这里其实就是从工作者数组中轮询选出一个工作者
这里其实拥有提升和优化的空间,这里笔者可能会向开源社区提交一个pr
以此进行比较好的调度器调度
return eventLoops[(int)(n++ % c)];
}
// 此处是一个简单的封装
static final class PoolWorker extends NewThreadWorker {
PoolWorker(ThreadFactory threadFactory) {
super(threadFactory);
}
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
// 进行定时线程池的初始化
executor = SchedulerPoolFactory.create(threadFactory);
}
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec =
// 初始化一个定时线程池
Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}

上述代码清晰的展示了computation调度器的实现细节,这里需要说明的是定时线程池的core设置为1,线程池的个数最多为cpu数量,这里涉及到ScheduledThreadPoolExecutor定时线程池的原理,简单的说起内部是一个可自动增长的数组(队列)类似于ArrayList,也就是说队列永远不会满,线程池中的线程数不会增加。
接下来结合订阅线程和发布线程分析其之间如何进行沟通的本质
发布线程在上一篇的文章已经提到,内部是一个worker,那么订阅线程也是么,很显然必须是的,接下来我们来看下源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
// 还是从subscribeActul开始(原因见上一篇文章)
public void subscribeActual(Subscriber<? super T> s) {
Worker worker = scheduler.createWorker();
if (s instanceof ConditionalSubscriber) {
source.subscribe(new ObserveOnConditionalSubscriber<T>(
(ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
} else {
// 其内部封装了一个ObserveOnsubcriber,这是个对下流订阅者的封装,主要什么作用呢,为什么要这个呢,
// 其实这个涉及订阅线程内部的机制,接着看源代码了解其内部机制
source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
}
}
// 基类
abstract static class BaseObserveOnSubscriber<T>
extends BasicIntQueueSubscription<T>
implements FlowableSubscriber<T>, Runnable {
private static final long serialVersionUID = -8241002408341274697L;
final Worker worker;
final boolean delayError;
final int prefetch;
final int limit;
final AtomicLong requested;
Subscription upstream;
SimpleQueue<T> queue;
volatile boolean cancelled;
volatile boolean done;
Throwable error;
int sourceMode;
long produced;
boolean outputFused;
BaseObserveOnSubscriber(
Worker worker,
boolean delayError,
int prefetch) {
this.worker = worker;
this.delayError = delayError;
this.prefetch = prefetch;
this.requested = new AtomicLong();
this.limit = prefetch - (prefetch >> 2);
}
@Override
public final void onNext(T t) {
if (done) {
return;
}
if (sourceMode == ASYNC) {
trySchedule();
return;
}
/**
* 当上游的装修者(上一篇提到的装修者模式)调用onNext方 * 法时,这时并没有类似的去调用下游的onNext方法,那这个
* 时候其实就是订阅者线程模式的核心原理:采用queue队列
* 进行数据的store,这里尝试将数据放进队列
*/
if (!queue.offer(t)) {
upstream.cancel();
error = new MissingBackpressureException("Queue is full?!");
done = true;
}
// 开启订阅者线程池模式的调度,具体实现在子类中实现
trySchedule();
}
@Override
public final void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
trySchedule();
}
@Override
public final void onComplete() {
if (!done) {
done = true;
trySchedule();
}
}
// 这里并没有向上传递request请求,而是把自己当做数据发射者进行request计数
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(requested, n);
// 开启调度
trySchedule();
}
}
@Override
public final void cancel() {
if (cancelled) {
return;
}
cancelled = true;
upstream.cancel();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
// 调度代码
final void trySchedule() {
// 上一篇文章讲过这个的用法
if (getAndIncrement() != 0) {
return;
}
// 启用一个work来进行任务的执行 this对象说明实现了runable接口
worker.schedule(this);
}
// 调度实现的代码
@Override
public final void run() {
if (outputFused) {
runBackfused();
} else if (sourceMode == SYNC) {
runSync();
} else {
// 一般会调用runAsync方法
runAsync();
}
}
abstract void runBackfused();
abstract void runSync();
abstract void runAsync();
final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
if (cancelled) {
clear();
return true;
}
if (d) {
if (delayError) {
if (empty) {
cancelled = true;
Throwable e = error;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
Throwable e = error;
if (e != null) {
cancelled = true;
clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
cancelled = true;
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
@Override
public final int requestFusion(int requestedMode) {
if ((requestedMode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Override
public final void clear() {
queue.clear();
}
@Override
public final boolean isEmpty() {
return queue.isEmpty();
}
}
// 具体实现类
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
implements FlowableSubscriber<T> {
private static final long serialVersionUID = -4547113800637756442L;
final Subscriber<? super T> downstream;
ObserveOnSubscriber(
Subscriber<? super T> actual,
Worker worker,
boolean delayError,
int prefetch) {
super(worker, delayError, prefetch);
this.downstream = actual;
}
//这是上游回调这个subscriber时调用的方法,详情见上一篇文章
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<T> f = (QueueSubscription<T>) s;
int m = f.requestFusion(ANY | BOUNDARY);
if (m == SYNC) {
sourceMode = SYNC;
queue = f;
done = true;
downstream.onSubscribe(this);
return;
} else
if (m == ASYNC) {
sourceMode = ASYNC;
queue = f;
downstream.onSubscribe(this);
s.request(prefetch);
return;
}
}
// 设置缓存队列
// 这里涉及一个特别之处就是预获取(提前获取数据)
queue = new SpscArrayQueue<T>(prefetch);
// 触发下游subscriber 如果有request则会触发下游对上游数据的request
downstream.onSubscribe(this);
// 请求上游数据 上面的代码和这行代码就是起到承上启下的一个作用,也就是预获取,放在队列中
s.request(prefetch);
}
}
@Override
void runSync() {
int missed = 1;
final Subscriber<? super T> a = downstream;
final SimpleQueue<T> q = queue;
long e = produced;
for (;;) {
long r = requested.get();
while (e != r) {
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
upstream.cancel();
a.onError(ex);
worker.dispose();
return;
}
if (cancelled) {
return;
}
if (v == null) {
cancelled = true;
a.onComplete();
worker.dispose();
return;
}
a.onNext(v);
e++;
}
if (cancelled) {
return;
}
if (q.isEmpty()) {
cancelled = true;
a.onComplete();
worker.dispose();
return;
}
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
@Override
void runAsync() {
int missed = 1;
final Subscriber<? super T> a = downstream;
final SimpleQueue<T> q = queue;
long e = produced;
for (;;) {
long r = requested.get();
while (e != r) {
boolean d = done;
T v;
try {
// 获取数据
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
upstream.cancel();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
e++;
// limit = prefetch - (prefetch >> 2)
// prefetch = BUFFER_SIZE(上一篇文章提到的默认128)
// 前面说过,订阅者把自己当成一个发射者,那数/据从哪里来呢,而且还要持续有数据,那么下面代码说明了数据来源,当数据达到limit,开始新的数据的prefetch,每次preftch的数量是limit
if (e == limit) {
if (r != Long.MAX_VALUE) {
r = requested.addAndGet(-e);
}
upstream.request(e);
e = 0L;
}
}
if (e == r && checkTerminated(done, q.isEmpty(), a)) {
return;
}
// 下面的代码机制在上一篇讲过主要涉及异步编程技巧
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
@Override
void runBackfused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
downstream.onNext(null);
if (d) {
cancelled = true;
Throwable e = error;
if (e != null) {
downstream.onError(e);
} else {
downstream.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
if (v != null && sourceMode != SYNC) {
long p = produced + 1;
if (p == limit) {
produced = 0;
upstream.request(p);
} else {
produced = p;
}
}
return v;
}
}

为何要将订阅者这样区别设置呢,其实原因很简单,订阅者和发布者需要不同的线程机制异步地执行,比如订阅者需要computation的线程机制来进行大量的耗时数据计算,但又要保持一致的装修者模式,所以源码的做法是订阅者这边打破回调的调用流,采用数据队列进行两个线程池之间的数据传送。

本文总结

笔者喜欢总结,总结意味着我们反思和学习前面的知识点,应用点以及自身的不足。

  1. rxjava2线程调度的原理机制,不同场景下线程机制需要进行定制
  2. rxjava2生产和消费的异步原理和实现方式
Goober Blog

rxjava2 源码解析(一)

发表于 2018-12-19 | 分类于 rxjava2

ReactiveX 响应式编程库,这是一个程序库,通过使用可观察的事件序列来构成异步和事件驱动的程序。
其简化了异步多线程编程,在以前多线程编程的世界中,锁,可重入锁,同步队列器,信号量,并发同步器,同步计数器,
并行框架等都是具有一定的使用门槛,稍有不慎或者使用不成熟或对其源码理解不深入都会造成相应的程序错误和程序性能的低下。

观察者模型

24种设计模式的一种,观察者Observer和主题Subject之间建立组合关系:Subject类实例中包含观察者Observer的引用,
增加引用的目的就是为了通知notify,重要点就是要在Subject的notify功能中调用Observer的接受处理函数receiveAndHandle
个人理解:观察者模型其实是一种异步回调通知,将数据的处理者先注册到数据的输入者那边,这样通过数据输入者执行某个函数
去调用数据处理者的某个处理方法。

RxJava2

Rx有很多语言的实现库,目前比较出名的就是RxJava2。这里主讲Rxjava2的部门源码解读,内部设计机制和内部执行的线程模型

基本使用

使用rxjava2大致分为四个操作:

  1. 建立数据发布者
  2. 添加数据变换函数
  3. 设置数据发布线程池机制,订阅线程池机制
  4. 添加数据订阅者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 创建flowable
    Flowable<Map<String, Map<String,Object>>> esFlowable = Flowable.create(new ElasticSearchAdapter(), BackpressureStrategy.BUFFER);
    Disposable disposeable = esFlowable
    // map操作 1.采集、2.清洗
    .map(DataProcess::dataProcess)
    .subscribeOn(Schedulers.single())
    //计算任务调度器
    .observeOn(Schedulers.computation())
    // 订阅者 consumer 执行运算
    .subscribe(keyMaps -> new PredictEntranceForkJoin().predictLogic(keyMaps));

以上就是一个实际的例子,里面的ElasticSearchAdapter实际隐藏了一个用户自定义实现数据生产的subscribe接口:FlowableOnSubscribe source
用户需要实现这个接口函数:void subscribe(@NonNull FlowableEmitter emitter) throws Exception 这个接口主要用于内部回调,后面会有具体分析,
emitter 英文翻译发射器,很形象,数据就是由它产生的,也是业务系统需要对接的地方,一般业务代码实现这个接口类然后发射出需要处理的原始数据。
map函数作为数据变换处理的功能函数将原来的数据输入变换为另外的数据集合,然后设置发布的线程池机制subscribeOn(Schedulers.single()),订阅的线程池
机制observeOn(Schedulers.computation()),最后添加数据订阅函数,也就是业务系统需要实现另外一个地方,从而实现数据的自定义处理消费。

rxjava2支持的lambda语法

  • 创建操作符:just fromArray empty error never fromIterable timer interval intervalRange range/rangeLong defer
  • 变换操作符:map flatMap flatmapIterable concatMap switchmap cast scan buffer toList groupBy toMap
  • 过滤操作符:filter take takeLast firstElement/lastElement first/last firstOrError/lastOrError elementAt/elementAtOrError ofType skip/skipLast
    ignoreElements distinct/distinctUntilChanged timeout throttleFirst throttleLast/sample throttleWithTimeout/debounce
  • 合并聚合操作符:startWith/startWithArray concat/concatArray merge/mergeArray concatDelayError/mergeDelayError zip combineLatest combineLatestDelayError
    reduce count collect
  • 条件操作符:all ambArray contains any isEmpty defaultIfEmpty switchIfEmpty sequenceEqual takeUntil takeWhile skipUntil skipWhile

有一篇博客详细介绍了rxjava的各种操作符,链接https://maxwell-nc.github.io/android/rxjava2-1.html

rxjava2 源码解析

阅读源码个人比较喜欢带着疑惑去看,这样与目标有方向。接下来的分析以Flowable为例,这里所有的例子都是按照Flowable为例,因为Flowable在实际
项目中比Observable可能用的多,因为实际场景中数据生产速度和数据消费速度都会有一定的不一致甚至数据生产速度远大于数据消费速度。

数据发布和订阅

首先从数据订阅者开始,点进源码看进一步解析,里面有很多subscribe重载接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
//组装成FlowableSubscriber
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
//调用核心的订阅方法
subscribe(ls);
return ls;
}
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
//注册一些钩子这里对此不进行讲解,主要不是核心方法
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//核心订阅方法,从名字也能读出是指订阅实际调用处
//不同的数据产生类也就是实现Flowable抽象类的类
//比如FlowableCreate,FlowSingle,FlowMap等等去实现自己的实际方法
subscribeActual(z);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Subscription has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
//选择FlowCreate的subscribeActual(Subscriber<? super T> t)方法进行剖析
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
//根据不同的回压模式选择不一样的数据发射类
//神奇的回压模式其实本质上就是一个个数据发射-消费模式
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
//回调注册的FlowableSubscriber的onSubscribe方法
//这里非常重要,因为这里涉及了rxjava特有的 request请求再消费数据的模式
//也就是说如果没有request数据,那么就不会调用数据发射(发布)者的onNext方法,
//那么数据订阅者也就不会消费到数据
t.onSubscribe(emitter);
try {
//回调注册的FlowableOnSubscribe<T> source的subscribe方法
//这个source其实就是在创建Flow流时注册的数据产生类,进一步验证了上文中
//提及的其需要实现FlowableOnSubscribe<T>接口
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
//重点分析BufferAsyncEmitter这个类,看字面意思这是一个switch的默认选择类,
//但其实它是回压策略为BUFFER时的数据发射类
//首先这个类的构造函数具有两个参数,很明显这是 actul就是前面的t这个变量,也就是
//注册的数据消费(订阅)者,capacityHint则是设置容量大小的,默认是128,如果需要扩大需要
//自行设置环境变量 rx2.buffer-size
BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
super(actual);
this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
this.wip = new AtomicInteger();
}
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// queue 是存储元素的队列,也就是buffer的核心存储。
// 当我们开始向下游发送数据的时候首先存入队列,然后下面的drain则是进行核心的
queue.offer(t);
drain();
}
//核心的类
void drain() {
//关键的地方 解决生产速率和消费速率不一致的关键地方,也是我们写并发程序值得借鉴的地方。
//当数据的产生者(发布)频繁调用onNext方法时,这里产生并发调用关系,wip变量是atomic变量,
//当第一次执行drain函数时,为0继续执行后面的流程,当快速的继续调用onNext方法时,wip不为0然后返回
//那么后面的流程我们其实已经很大概率会猜测到应该是去取队列的数据然后做一些操作
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
//这里的downstream其实就是注册的数据订阅者,它是基类BaseEmitter的变量,前面初始化时调用了基类的构造函数
final Subscriber<? super T> a = downstream;
final SpscLinkedArrayQueue<T> q = queue;
for (;;) {
long r = get();
long e = 0L;
while (e != r) {
if (isCancelled()) {
q.clear();
return;
}
boolean d = done;
//取队列中的数据
T o = q.poll();
boolean empty = o == null;
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
if (empty) {
break;
}
//此处回调订阅者的onNext方法去真正的执行数据实例程序
//到此数据从产生到消费其生命周期已经走完
a.onNext(o);
e++;
}
if (e == r) {
if (isCancelled()) {
q.clear();
return;
}
boolean d = done;
boolean empty = q.isEmpty();
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
}
if (e != 0) {
//标记已经消费的个数
BackpressureHelper.produced(this, e);
}
//前面说过wip会原子性的增加,而且是每调用一次onNext增加一次
//missed从其名解释是指错过的意思,个人理解是错过消费的数据个数,错过消费
//的意思其实就是指没有进行a.onNext数据消费处理的数据
missed = wip.addAndGet(-missed);
if (missed == 0) {
//如果没有错过的数据也就是全部都消费完那就跳出for循环
//此处for循环方式和JUC源码中Doug Lea的做法都有类似之处
break;
}
}
}

操作符与线程池机制原理剖析

首先在进行源码分析之前讲述一下一种模式:装饰者模式 24种模式中的一种,在java io源码包中广泛应用
简单的来说是与被装饰者具有相同接口父类同时又对被装饰者进行一层封装(持有被装饰者的引用),以此用来加上自身的特性。
回归主题,当我们使用操作符和线程池机制的时候做法都是在数据发布者后面进行相应的函数操作:

1
2
3
4
Disposable disposeable = scheduleObservable
.map(aLong -> dataAdapter.handlerDpti())
.map(DataProcess::dataProcess)
.subscribeOn(Schedulers.single())

那么为何这么做,接下来我们进行源码分析:

  1. subscribeOn map 方法都在Flowable类中:
    1
    2
    3
    4
    5
    6
    7
    8
    public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
    }
    public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
    }

这里是实例方法调用,传进了this对象这个很关键,这里其实就是我们前面提到的装修者模式,持有上游对象也就是数据源source的引用
以FlowableSubscribeOn为例进行分析,这个类经常会用到,因为其内部设置了线程池的机制所以在实际使用项目中会大量使用,那么是如何
做到线程池方式的呢?进一步利用源码进行分析:

  1. 装饰者的内部代码分析
    以subscribeOn 为例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    //很明显 实现的抽象类其实是装修者抽象类
    public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T>
    // 这个在前面我们重点分析过这是实际订阅执行的类方法,其实也就是我们说的装饰方法,里面实现了每个类自己的特定“装修”方法
    @Override
    public void subscribeActual(final Subscriber<? super T> s) {
    // 获取订阅者,下一篇文章会重点讲述rxjava的线程池分配机制
    Scheduler.Worker w = scheduler.createWorker();
    final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
    // 跟前面一样调用数据订阅者的onSubscribe方法
    s.onSubscribe(sos);
    // 由分配的调度者进行订阅任务的执行
    w.schedule(sos);
    }
    // 开始分析SubscribeOnSubscriber这个静态内部类的内部代码
    // 实现了Runable用来异步执行
    static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
    implements FlowableSubscriber<T>, Subscription, Runnable
    // 下游订阅引用
    final Subscriber<? super T> downstream;
    // 上游发射类引用
    final AtomicReference<Subscription> upstream;
    // 上游数据源引用 跟上游引用有区别,简单的说每个上游数据源引用有自己的上游发射类
    Publisher<T> source;
    // 这里是装饰的核心代码
    @Override
    public void run() {
    lazySet(Thread.currentThread());
    // source即为上游,表示其所装饰的源
    Publisher<T> src = source;
    source = null;
    // 调用上游的自身的subscribe方法,在上面一开始我们说这个方法内部会去调用自身实现的subscribeActual方法
    // 从而实现上游自己的特定方法,比如假设source是FlowCreate那么此处就会调用前面一开始我们所讲到的数据的发射
    src.subscribe(this);
    }
    // 既然已经保证了数据的发射那么数据的处理是不是也要处理
    // 很明显这是调用了下游订阅者的onNext方法
    @Override
    public void onNext(T t) {
    downstream.onNext(t);
    }

本文总结

笔者喜欢总结,总结意味着我们反思和学习前面的知识点,应用点以及自身的不足。

  • 设计模式:观察者模式和装修者模式
  • 并发处理技巧:回压策略(其实本质是缓存)的实现原理以及细节点
Goober Blog

分布式CAP理论与BASE理论

发表于 2018-05-19 | 分类于 分布式

CAP理论

CAP理论是分布式工程项目邻域的基石理论。
CAP一共三个概念:C:Consistency 一致性;A:Availability 可用性;P:Partition tolerance 分区容忍性

  • 一致性(C):在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点保持同一份数据的最新版本)
  • 可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(读,写均需要满足)
  • 分区容忍性(P):以实际效果而言,分区相当于对通信的时限要求。(不会出现延迟,丢包等)

C与A的抉择

由于当前的网络不能保证一定不会出现延迟,丢包等问题所以P是必须选择的一项,那么剩下的C和A需要选择其中一个。
那么为啥不能全选呢?

CAP三者不能全部满足

在分布式系统的设计中,没有一种设计可以同时满足一致性,可用性,分区容错性 3个特性
我们来看一个简单的问题:
我们来看一个简单的问题, 一个DB服务 搭建在两个机房(北京,广州),两个DB实例同时提供写入和读取

  1. 假设DB的更新操作是同时写北京和广州的DB都成功才返回成功
    在没有出现网络故障的时候,满足CA原则,C 即我的任何一个写入,更新操作成功并返回客户端完成后,分布式的所有节点在同一时间的数据完全一致,A 即我的读写操作都能够成功,但是当出现网络故障时,不能同时保证CA,即P条件无法满足

  2. 假设DB的更新操作是只写本地机房成功就返回,通过binlog/oplog回放方式同步至侧边机房
    这种操作保证了在出现网络故障时,双边机房都是可以提供服务的,且读写操作都能成功,意味着他满足了AP ,但是它不满足C,因为更新操作返回成功后,双边机房的DB看到的数据会存在短暂不一致,且在网络故障时,不一致的时间差会很大(仅能保证最终一致性)

  3. 假设DB的更新操作是同时写北京和广州的DB都成功才返回成功且网络故障时提供降级服务
    降级服务,如停止写入,只提供读取功能,这样能保证数据是一致的,且网络故障时能提供服务,满足CP原则,但是他无法满足可用性原则

C与A

一致性,这里的一致性并不是弱一致性或者强一致性,一致性指的是完全一致性。
对于大多数互联网应用来说,因为机器数量庞大,部署节点分散,网络故障是常态,可用性是必须需要保证的,所以只有设置一致性来保证服务的AP,通常常见的高可用服务追求稳定性本质都是放弃C选择AP,对于需要确保强一致性的场景,如银行,通常会权衡CA和CP模型,CA模型网络故障时完全不可用,CP模型具备部分可用性,实际的选择需要通过业务场景来权衡(并不是所有情况CP都好于CA,只能查看信息不能更新信息有时候从产品层面还不如直接拒绝服务)

BASE理论

BASE是 Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性) 三个短语的简写,BASE是对CAP中一致性和可用性权衡的结果,其来源于对大规模互联网系统分布式实践的结论,是基于CAP定理逐步演化而来的,其核心思想是即使无法做到强一致性(Strong consistency),但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。接下来我们着重对BASE中的三要素进行详细讲解。

基本可用

基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性——但请注意,这绝不等价于系统不可用,以下两个就是“基本可用”的典型例子。

响应时间上的损失:正常情况下,一个在线搜索引擎需要0.5秒内返回给用户相应的查询结果,但由于出现异常(比如系统部分机房发生断电或断网故障),查询结果的响应时间增加到了1~2秒。

功能上的损失:正常情况下,在一个电子商务网站上进行购物,消费者几乎能够顺利地完成每一笔订单,但是在一些节日大促购物高峰的时候,由于消费者的购物行为激增,为了保护购物系统的稳定性,部分消费者可能会被引导到一个降级页面。

弱状态也称为软状态,和硬状态相对,是指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时。

最终一致性

最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能够达到一个一致的状态。因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性。亚马逊首席技术官Werner Vogels在于2008年发表的一篇文章中对最终一致性进行了非常详细的介绍。他认为最终一致性时一种特殊的弱一致性:系统能够保证在没有其他新的更新操作的情况下,数据最终一定能够达到一致的状态,因此所有客户端对系统的数据访问都能够获取到最新的值。同时,在没有发生故障的前提下,数据达到一致状态的时间延迟,取决于网络延迟,系统负载和数据复制方案设计等因素。在实际工程实践中,最终一致性存在以下五类主要变种。

  1. 因果一致性:
    因果一致性是指,如果进程A在更新完某个数据项后通知了进程B,那么进程B之后对该数据项的访问都应该能够获取到进程A更新后的最新值,并且如果进程B要对该数据项进行更新操作的话,务必基于进程A更新后的最新值,即不能发生丢失更新情况。与此同时,与进程A无因果关系的进程C的数据访问则没有这样的限制。
  2. 读己之所写:
    读己之所写是指,进程A更新一个数据项之后,它自己总是能够访问到更新过的最新值,而不会看到旧值。也就是说,对于单个数据获取者而言,其读取到的数据一定不会比自己上次写入的值旧。因此,读己之所写也可以看作是一种特殊的因果一致性。
  3. 会话一致性:
    会话一致性将对系统数据的访问过程框定在了一个会话当中:系统能保证在同一个有效的会话中实现“读己之所写”的一致性,也就是说,执行更新操作之后,客户端能够在同一个会话中始终读取到该数据项的最新值。
  4. 单调读一致性:
    单调读一致性是指如果一个进程从系统中读取出一个数据项的某个值后,那么系统对于该进程后续的任何数据访问都不应该返回更旧的值。
  5. 单调写一致性:
    单调写一致性是指,一个系统需要能够保证来自同一个进程的写操作被顺序地执行。

以上就是最终一致性的五类常见的变种,在实践中其实可以将其中的若干个变种相互结合起来,以构建一个具有最终一致性特性的分布式系统。
其实,最终一致性并不是只有那些大型分布式系统才设计的特性,许多现代的关系型数据库都采用了最终一致性模型。在现代关系型数据库中,大多都会采用同步和异步方式来实现主备数据复制技术。在同步方式中,数据的复制通常是更新事务的一部分,因此在事务完成后,主备数据库的数据就会达到一致。而在异步方式中,备库的更新往往存在延时,这取决于事务日志在主备数据库之间传输的时间长短,如果传输时间过长或者甚至在日志传输过程中出现异常导致无法及时将事务应用到备库上,那么很显然,从备库中读取的的数据将是旧的,因此就出现了不一致的情况。当然,无论是采用多次重试还是人为数据订正,关系型数据库还是能搞保证最终数据达到一致——这就是系统提供最终一致性保证的经典案例。

总结

总的来说,BASE理论面向的是大型高可用可扩展的分布式系统,和传统事务的ACID特性使相反的,它完全不同于ACID的强一致性模型,而是提出通过牺牲强一致性来获得可用性,并允许数据在一段时间内是不一致的,但最终达到一致状态。但同时,在实际的分布式场景中,不同业务单元和组件对数据一致性的要求是不同的,因此在具体的分布式系统架构设计过程中,ACID特性与BASE理论往往又会结合在一起使用。

Goober Blog

浅谈智能运维

发表于 2017-12-30 | 分类于 AI运维

浅谈智能运维

现在智能运维很火,很热门,大家都在研究这个课题项目。在国内学术界,清华大学裴丹教授已经
研究此课题一年有余,也在APM大会上进行了相关落地的阐述。在国外工业界,SIGCOMM 2016大会,
微软提出基于数据中心的故障定位方案,先用实验床把所有可能故障都模拟一下,同时收集各类监控
指标,然后通过机器学习建立模型,这个模型可以根据实际发生的监控指标的症状, 推断根因的
大致位置,以便加速止损。 在相关文献中用到的基础算法包括随机森林,故障指纹构建,逻辑回归,
马尔科夫链,狄利克雷过程等方法来进行故障定位。

阅读全文 »
12
Goober -- a backend programmer who passion for technology

Goober -- a backend programmer who passion for technology

14 日志
10 分类
3 标签
© 2021 Goober -- a backend programmer who passion for technology
由 Hexo 强力驱动
主题 - NexT.Muse