Zamperini


  • 首页

  • 关于

  • 标签23

  • 分类11

  • 归档56

Zookeeper 整体介绍

发表于 2018-03-03 | 分类于 zookeeper

ZK角色介绍

Leader Processor 处理

Follower Processor 处理



Observer Processor 处理

一次写操作

写操作包括 创建、修改、删除节点信息还包括 创建和删除会话操作。

阅读全文 »

Zookeeper 初始化

发表于 2018-03-03 | 分类于 zookeeper

2. 初始化过程

下面,将从客户端初始化和服务端初始化两个方面来讲解ZK的初始化过程:

客户端初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
// 观察者管理器
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;

// ClientCnxn: client的通讯组件,负责socket通讯(发送请求等,且支持透明地切换到不同的Server)
// ClientCnxnSocket : 真正通讯的组件,ClientCnxnSocketNIO、ClientCnxnSocketNetty
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),// 客户端隔离命名空间,相当于试图模式。只对某个path的进行操作
hostProvider,// 服务器提供者列表
sessionTimeout,//会话超时时间
this,
watchManager, // 监听管理器
getClientCnxnSocket(),// 客户端套接字通讯工具 ClientCnxnSocket 比如 ClientCnxnSocketNetty
canBeReadOnly);
// 初始化
cnxn.start();
}

从上面可以看到最主要的即是 ClientCnxn,深入进去可以看到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
// 发送请求的线程
sendThread = new SendThread(clientCnxnSocket);
// 事件线程,当注册的节点发生变化时,server端会通知client,EventThread用来获取通知,并触发相应的Watcher
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
}

public void start() {
sendThread.start();
eventThread.start();
}

SendThread是负责发送请求,而EventThread负责接收并处理事件。
ZK Client发送请求的过程是非常简单的:将请求进行包装成Packet,放进双向阻塞队列outgoingQueue中。然后SendThread不停地从其中拿出请求向Server发送请求。
其过程是在SendThread的run()方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public void run() {
// 引入 outgoingQueue
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
// 10 秒发一次 Ping 操作
final int MAX_SEND_PING_INTERVAL = 10000;
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
// 链接断开时,重连
if (!clientCnxnSocket.isConnected()) {
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
//如果所有的host都试了一遍,则停顿 1s
serverAddress = hostProvider.next(1000);
}
// 链接服务器,首次连接时同时
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
// ...
if (state.isConnected()) {
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
//当有 MAX_SEND_PING_INTERVAL秒没发送请求,则发送 ping 心跳
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
//对于只读的则重试ping到读写服务上
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
// clientCnxnSocket 从 outgoingQueue中取请求发送
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
}
// ...
}

SendThread中不仅有调用clientCnxnSocket发送请求的逻辑,同时也维护着心跳逻辑,10秒内如果没有发送请求则发送一次心跳。
另外,值得一提的是,通过startConnect(serverAddress)完成与服务端的连接后,会触发一次sendThread.primeConnection(),它的作用是向服务端发起一次创建Session请求或者续联。另外,如果配置了zookeeper.disableAutoWatchReset = false会将本地的Watcher重新向连接的服务端注册一遍。

当服务端返回时,则由ClientCnxnSocketNetty.ZKClientHandler进行处理,而其最终会调用SendThread.readResponse。在readResponse会根据返回的内容进行相应处理。如果返回的Xid为-1时,表示事件通知,反序列化后触发相应的Wacther。

阅读全文 »

Hystrix-插件

发表于 2017-08-15 | 分类于 hystrix

Hystrix插件

  • Event Notifier : 可以自定义事件通知器,用于报警和统计信息
  • Metrics Publisher : 用于收集测量数据,并开启一个后台线程处理这些计量数据,比如持久化(Servo)
  • Properties Strategy : 用于控制系统中的属性定义
  • Concurrency Strategy : 并发策略,在Hystrix中,ThreadLocal, Callable, Runnable, ThreadPoolExecutor、 BlockingQueue 是起实现线程隔离的基础。Hystrix已经实现了这些工具,并且开箱即用。但是在很大场景下,客户想要嵌入个性化的实现或者装饰行为,此时,可以实现HystrixConcurrencyStrategy,并覆盖一下方法:
    • 比如在 getThreadPool() and getBlockingQueue() 方法中嵌入Log,或者记录Metrics;
    • wrapCallable() 允许我们去装饰每个被Hystrix执行的Callable ,比如可以通过其来传递ThreadLocal中的数据;不用担心 Trace链被打断,2333
    • 方法getRequestVariable() 希望返回 HystrixRequestVariable<T>的实现类,其可以做到在一个请求的所有线程中共享上下文信息HystrixRequestContext等
  • Command Execution Hook :HystrixCommandExecutionHook的实现可以获取是命令HystrixInvokable (HystrixCommand or HystrixObservableCommand)的执行声明周期。基于此,我们可以嵌入一些行为、日志、覆盖返回、上报线程状态等。
阅读全文 »

Hystrix-原理解析

发表于 2017-08-15 | 分类于 hystrix

#Hystrix原理浅析

[TOC]

带着问题看原理:

  1. 执行流程
  2. 如何做到信号量隔离和线程隔离
  3. 动态配置的实现
  4. 如何记录信号量数据
  5. 如何实现断路器
  6. 如何实现合并请求

1. Hystrix类结构以及命令执行流程

1. Hystrix类结构

Hystrix的命令实现主体部分是在抽象类AbstractCommnd中,HystrixCommand、HystrixObservableCommand是其的实现类。
set up-w600

HystrixCommand:
最终的执行主体是在方法AbstractCommnd.toObservable()中。

1
2
3
4
5
6
7
8
9
10
11
 public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
public Future<R> queue() {
//第一步执行
final Future<R> delegate = **toObservable()**.toBlocking().toFuture();
}

HystrixObservableCommand:

阅读全文 »

Hystrix-RxJava入门

发表于 2017-08-14 | 分类于 hystrix

RxJava入门

ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持Java、.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。

在ReactiveX中,一个观察者(Observer)订阅一个可观察对象(Observable)。观察者对Observable发射的数据或数据序列作出响应。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,在未来某个时刻响应Observable的通知,不需要阻塞等待Observable发射数据。

本文主要聚焦于RxJava
先来简单介绍 观察者模式里一些概念:

Observable:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;

Observer:接收源,英文释义“观察者”,就是观察者模式中的“观察者”,可接收Observable、Subject发射的数据;

Subject:Subject是一个比较特殊的对象,既可充当发射源,也可充当接收源。它是消息收发的桥梁。

Subscriber:“订阅者”,也是接收源,Subscriber实现了Observer接口,比Observer多了一个方法unsubscribe(),用来取消订阅,当你不再想接收数据了,可以调用unsubscribe()方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源;

Subscription :Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件;

Action~:RxJava中的一个接口,Action~是用来代替并简化Subscriber,它只有无返回值的call()方法,对应 ~ 为09时,它们有09个入参;

Func~:与Action~非常相似,也有call()方法。但是它是有返回值的,同样也有Func0、Func1…Func9;

首先来看RxJva最简单的使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//消息发送者
Observable<String> sender = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
while(下载进度<100%) {
subscriber.onNext("当前已下载" + 下载进度);
}

subscriber.onNext("Hello World");
}
});
//消息订阅者
Observer<String> receiver = new Observer<String>() {
@Override
public void onCompleted() {
//数据接收完成时调用
}
@Override
public void onError(Throwable e) {
//发生错误调用
}
@Override
public void onNext(String s) {
// 获取页面句柄,渲染后展示给用户看当前下载进度
viewHander.show(s);
}
};
//消息订阅
sender.subscribe(receiver);

这样就形成RxJava一个简单的用法,sender发射”Hello World!”,将会被receiver的onNext的接收,通过这个例子,这就是 “观察者模式”。RxJava让观察者模式变得更简单和简洁,而RxJava所有的一切都将围绕这两个点展开,一个是发射数据,一个是接收数据。

1. Observable的创建:

  • just( ) — 将一个或多个对象转换成发射这些对象的Observable,并触发onNext( )操作
  • from( ) — 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
  • repeat( ) — 创建一个重复发射指定数据或数据序列的Observable
  • repeatWhen( ) — 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据
  • create( ) — 使用一个函数从头创建一个Observable
  • defer( ) — 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable
  • range( ) — 创建一个发射指定范围的整数序列的Observable
  • interval( ) — 创建一个按照给定的时间间隔发射整数序列的Observable
  • timer( ) — 创建一个在给定的延时之后发射单个数据的Observable
  • empty( ) — 创建一个什么都不做直接通知完成的Observable
  • error( ) — 创建一个什么都不做直接通知错误的Observable
  • never( ) — 创建一个不发射任何数据的Observable
阅读全文 »
prev1…9101112next
Zamperini

Zamperini

GitHub E-Mail Weibo
© 2020 Zamperini
由 Hexo 强力驱动
|
主题 — NexT.Gemini v6.0.3