ZK角色介绍
Leader Processor 处理
Follower Processor 处理
Observer Processor 处理
一次写操作
写操作包括 创建、修改、删除节点信息还包括 创建和删除会话操作。
下面,将从客户端初始化和服务端初始化两个方面来讲解ZK的初始化过程:
1 | public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException { |
从上面可以看到最主要的即是 ClientCnxn,深入进去可以看到:
1 | public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, |
SendThread
是负责发送请求,而EventThread
负责接收并处理事件。
ZK Client发送请求的过程是非常简单的:将请求进行包装成Packet
,放进双向阻塞队列outgoingQueue中。然后SendThread
不停地从其中拿出请求向Server
发送请求。
其过程是在SendThread
的run()
方法中:
1 | public void run() { |
SendThread
中不仅有调用clientCnxnSocket
发送请求的逻辑,同时也维护着心跳逻辑,10秒内如果没有发送请求则发送一次心跳。
另外,值得一提的是,通过startConnect(serverAddress)
完成与服务端的连接后,会触发一次sendThread.primeConnection()
,它的作用是向服务端发起一次创建Session请求或者续联。另外,如果配置了zookeeper.disableAutoWatchReset = false
会将本地的Watcher
重新向连接的服务端注册一遍。
当服务端返回时,则由ClientCnxnSocketNetty.ZKClientHandler
进行处理,而其最终会调用SendThread.readResponse
。在readResponse
会根据返回的内容进行相应处理。如果返回的Xid为-1时,表示事件通知,反序列化后触发相应的Wacther
。
ThreadLocal
, Callable
, Runnable
, ThreadPoolExecutor
、 BlockingQueue
是起实现线程隔离的基础。Hystrix已经实现了这些工具,并且开箱即用。但是在很大场景下,客户想要嵌入个性化的实现或者装饰行为,此时,可以实现HystrixConcurrencyStrategy
,并覆盖一下方法:getThreadPool()
and getBlockingQueue()
方法中嵌入Log,或者记录Metrics;wrapCallable()
允许我们去装饰每个被Hystrix执行的Callable
,比如可以通过其来传递ThreadLocal
中的数据;不用担心 Trace链被打断,2333getRequestVariable()
希望返回 HystrixRequestVariable<T>
的实现类,其可以做到在一个请求的所有线程中共享上下文信息HystrixRequestContext
等HystrixCommandExecutionHook
的实现可以获取是命令HystrixInvokable
(HystrixCommand
or HystrixObservableCommand
)的执行声明周期。基于此,我们可以嵌入一些行为、日志、覆盖返回、上报线程状态等。#Hystrix原理浅析
[TOC]
带着问题看原理:
Hystrix的命令实现主体部分是在抽象类AbstractCommnd
中,HystrixCommand
、HystrixObservableCommand
是其的实现类。
HystrixCommand:
最终的执行主体是在方法AbstractCommnd.toObservable()
中。
1 | public R execute() { |
HystrixObservableCommand:
ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持Java、.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。
在ReactiveX中,一个观察者(Observer)订阅一个可观察对象(Observable)。观察者对Observable发射的数据或数据序列作出响应。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,在未来某个时刻响应Observable的通知,不需要阻塞等待Observable发射数据。
本文主要聚焦于RxJava
先来简单介绍 观察者模式里一些概念:
Observable:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;
Observer:接收源,英文释义“观察者”,就是观察者模式中的“观察者”,可接收Observable、Subject发射的数据;
Subject:Subject是一个比较特殊的对象,既可充当发射源,也可充当接收源。它是消息收发的桥梁。
Subscriber:“订阅者”,也是接收源,Subscriber实现了Observer接口,比Observer多了一个方法unsubscribe(),用来取消订阅,当你不再想接收数据了,可以调用unsubscribe()方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源;
Subscription :Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件;
Action~:RxJava中的一个接口,
Action~
是用来代替并简化Subscriber
,它只有无返回值的call()
方法,对应 ~ 为09时,它们有09个入参;Func~:与
Action~
非常相似,也有call()方法。但是它是有返回值的,同样也有Func0、Func1…Func9;
首先来看RxJva最简单的使用方式:
1 | //消息发送者 |
这样就形成RxJava一个简单的用法,sender发射”Hello World!”,将会被receiver的onNext的接收,通过这个例子,这就是 “观察者模式”。RxJava让观察者模式变得更简单和简洁,而RxJava所有的一切都将围绕这两个点展开,一个是发射数据,一个是接收数据。
just( )
— 将一个或多个对象转换成发射这些对象的Observable,并触发onNext( )
操作from( )
— 将一个Iterable, 一个Future, 或者一个数组转换成一个Observablerepeat( )
— 创建一个重复发射指定数据或数据序列的ObservablerepeatWhen( )
— 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据create( )
— 使用一个函数从头创建一个Observabledefer( )
— 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observablerange( )
— 创建一个发射指定范围的整数序列的Observableinterval( )
— 创建一个按照给定的时间间隔发射整数序列的Observabletimer( )
— 创建一个在给定的延时之后发射单个数据的Observableempty( )
— 创建一个什么都不做直接通知完成的Observableerror( )
— 创建一个什么都不做直接通知错误的Observablenever( )
— 创建一个不发射任何数据的Observable