#Hystrix原理浅析
[TOC]
带着问题看原理:
- 执行流程
- 如何做到信号量隔离和线程隔离
- 动态配置的实现
- 如何记录信号量数据
- 如何实现断路器
- 如何实现合并请求
1. Hystrix类结构以及命令执行流程
1. Hystrix类结构
Hystrix的命令实现主体部分是在抽象类AbstractCommnd
中,HystrixCommand
、HystrixObservableCommand
是其的实现类。
HystrixCommand:
最终的执行主体是在方法AbstractCommnd.toObservable()
中。
1 | public R execute() { |
HystrixObservableCommand:
1 | public Observable<R> observe() { |
2. 执行流程
执行过程分为以下几个步骤:
- 首先判断是否开启缓存,如果缓存开启,且可以从缓存中直接查询到结果则直接返回。否则进入下一步;
- 第二步 进行断路器判断逻辑:断路器通过健康统计信息来判断是否打开断路器,如果打开则调用降级方法或向外告知调用者,否则继续下一步;
- 第三步 隔离控制判断:当请求的数量达到阈值(信号隔离)或者达到线程池上界则拒绝请求调用降级方法或向外告知调用者;并记录此次事件。
- 第四步执行业务逻辑,如果此时业务逻辑抛出异常或者超市则会调用降级逻辑或…,并记录此次事件。
- 当正常运行结束后也会记录执行成功的事件和运行的时延等信息,一是供断路器使用、二是提供实时监控数据输出
整个执行过程:将每个步骤都放在一个监听器(Action~
)里。
1 | public Observable<R> toObservable() { |
执行命令:
1 | private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { |
1 | private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { |
AbstractCommand中有四大关键功能属性:
1 | //配置 |
怎样实现动态配置?
先来看怎么动态配置属性(这是Netflix形成的一套体系,不用注解、反射 ):
看Hystrix中线程池的🌰:
1 | class HystrixThreadPoolDefault implements HystrixThreadPool { |
每次获取线程池之前会刷新一次线程池的配置
HystrixThreadPoolProperties:
1 | HystrixProperty<Integer> corePoolSize; |
Hystrix 默认使用 Archaius
来实现属性的配置:HystrixProperty<R>
的实现是ArchaiusDynamicProperty
的子类(StringDynamicProperty
、IntegerDynamicProperty
、LongDynamicProperty
、BooleanDynamicProperty
)。ArchaiusDynamicProperty
有个静态容器:final static ConcurrentHashMap<String, DynamicProperty> ALL_PROPS
其用来包含所有动态属性。
- 属性获取:
当通过HystrixProperty<R>.getValue(name)
获取配置值时,会先去ALL_PROPS
中寻找动态配置的值,如果没有设置则会使用ArchaiusDynamicProperty
中初始化时设置的默认值。 - 属性修改:
当我们使用ConfigurationManager
去修改配置值时,会通过name
去ALL_PROPS
中找到DynamicProperty
并设置值即可生效。
HystrixCircuitBreaker:
1 |
|
HystrixCommandMetrics:
HystrixCommandMetrics
是Hystrix中比较精彩的一部分:
它定义了如下几个事件流:
1 | //健康数量流(滑动窗口) |
下面,本文将以HealthCountsStream
为例来介绍它是怎样获取命令执行信息,并通过滑动窗口的形式聚合信息成供断路器使用的HealthCounts
:
- 获取命令执行信息:
在每个命令执行完成(成功或失败)后,都会执行handleCommandEnd方法。此方法会记录命令执行时长信息到执行结果ExecutionResult
中,并把ExecutionResult
告诉Metrics。
Metrics会调用HystrixThreadEventStream.executionDone()
方法创建一个命令执行完成对象HystrixCommandCompletion
,并把这个对象写入事件流HystrixCommandCompletionStream
中(对于每个命令,Hystrix都会保存此命令的唯一HystrixCommandCompletionStream
用于接受和发送命令完成事件的流)。
1 | private void handleCommandEnd(boolean commandExecutionStarted) { |
此时,所有需要获取命令执行信息,只需要成为HystrixCommandCompletionStream
的订阅者即可。
HystrixCommandCompletionStream
中通过SerializedSubject
来保证多线程并发写不会引起乱序等问题。new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create())
。
- 消费命令信息
下面以HealthCountsStream
为例来介绍其是怎么消费命令执行信息,并输出可供断路器使用的信息的:
1 | private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs, |
跟进其父类BucketedCounterStream
和BucketedRollingCounterStream
,可以看到命令完成事件数据流是怎么被消费的。
首先,BucketedCounterStream
是用来按指定的时间片段长度来生成Bucketed数据流(保留一个时间片段内的所有执行结果类型命令请求数)。
1 | protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs, |
BucketedRollingCounterStream
会订阅BucketedCounterStream
生成的bucketedStream流,并基于此做了滑动窗口的功能,并聚合欢动窗口内的所有命令执行结果类型计量信息。
1 | protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs, |
再回过头来看断路器逻辑中,它监听的即是BucketedRollingCounterStream
中的sourceStream
。它向外推送appendRawEventToBucket
的结果数据。
HystrixCollapser:
Collapser的原理是:将一个时间窗内的所有请求合并批量发送到依赖方进行请求返回。HystrixCollapser
有两种域范围:用户请求范围、全局范围。用户请求范围只会把用户请求范围内的请求合并,全局范围则会合并所有请求
整个过程的时序图如下:
1 | HystrixCollapser->RequestCollapser: toObservable |
HystrixCommandProperties命令列表::
1 | HystrixProperty<Integer> circuitBreakerRequestVolumeThreshold; // number of requests that must be made within a statisticalWindow before open/close decisions are made using stats |