Zamperini


  • 首页

  • 关于

  • 标签23

  • 分类11

  • 归档56

Netty NioEventLoop

发表于 2018-04-08 | 分类于 netty

NioEventLoop

在初始化过程中,Client和Server用的 EventLoopGroup是 NioEventLoopGroup,而其内部是NioEventLoop数组。
NioEventLoop并不是一个纯粹的I/O线程,还负责处理两类任务:

  • 系统Task: 当I/O线程和用户线程同时操作网路资源时,为了防止并发操作导致的锁竞争,将用户线程的操作封装成 Task放入消息队列中,由I/O线程负责执行,这样就实现了局部无锁化;
  • 定时任务: 通过schedule方法可以执行定时任务。
    继续来看其run方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
protected void run() {
for (;;) {
// 唤醒 Selector
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
// io任务比例,默认 50
final int ioRatio = this.ioRatio;
// 100时,则先执行io任务,再执行task
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
//记录IO时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
//反推任务能执行的时间长度
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
}
}

可以看到里面有个ioRatio,表示IO任务的比例。
当为100%时,先进行IO操作再执行任务;否则先进行IO操作,然后根据 IO操作的时间 来计算任务可以执行的时间长度。默认为50%。
再细看processSelectedKeys:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
//...
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

它遍历每个SelectionKey,查看有哪些已经准备好的操作,根据准备好的事件进行相应的操作。

runAllTasks方法从 taskQueue 中获取所有的任务进行执行。对于taskQueue维护着的业务方提交的任务。在执行其execute方法的时候即把任务放进taskQueue中。

阅读全文 »

Netty 初始化

发表于 2018-04-07 | 分类于 netty

初始化

对于Client的初始化过程,本章主要会介绍怎样初始化ChannelPipeline以及与Server建立连接。
Server初始化过程比Client端复杂一些,它分为两部分,分为Acceptor的处理链初始化过程和Reactor链的初始化过程。绑定端口提供服务的过程。

Client

首先来看Client的初始化代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 配置事件组,即线程组,会在构造函数中创建线程,默认情况下他会创建 CPU数*2个 线程
EventLoopGroup group = new NioEventLoopGroup();
try {
// Client端启动辅助类
Bootstrap b = new Bootstrap();
// 设置Client端线程池,一般一个线程负责至少一个Channel
b.group(group)
// 设置 Client 的 SocketChannel,也可以选择 像 EpollSocketChannel这种!
.channel(NioSocketChannel.class)
//添加配置项
.option(ChannelOption.TCP_NODELAY, true)
// 设置 ChannelHandler,这里的 ChannelInitializer不是最终最终编制在 Pipeline上的ChannelHandler,而只是起一个初始化作用的Handler,后文中会细说
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//在其中添加业务定制的Handler
p.addLast(new EchoClientHandler());
}
});

// 发起连接
ChannelFuture f = b.connect(HOST, PORT).sync();
// 阻塞等待,一直等到链接关闭
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}

Bootstrap类是Client端的启动辅助类。
整个过程中,分为两大步,第一步进行参数设置,设置Bootstrap的参数,第二步则是发起连接。
在Bootstrap参数设置里,可以看到业务方配置了group(客户端事件线程池EventLoopGroup,后面单领出来讲解)、Channel的类型(比如对于NIO和EpollChannel)和 ChannelHandler(这里的ChannelInitializer 是给业务放提供的入口,让业务将自定义的ChannelHandler编制进Pipeline)。

发起连接: 利用设置的参数进行初始化处理链,注册事件、开启事件线程、发起连接的过程:

1
2
3
4
5
6
7
8
9
Bootstrap.connect
--> Bootstrap.doResolveAndConnect
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 初始化Channel,并将Channel注册到事件线程中
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 解析地址 并创建连接
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
}

执行InitAndRegister方法主要是初始化Channel并与某个EventLoop绑定。执行doResolveAndConnect0方法用于解析地址并创建连接。下面就来分别详细解析这两个方法:

1
2
3
4
5
6
7
8
9
10
11
final ChannelFuture initAndRegister() {
Channel channel = null;
channel =
// 根据之前设置的Channel类型实例化一个Channel然后根绝配置的option设置参数
channelFactory.newChannel();
//参数设置,options
init(channel);
// 将channel和一个EventLoop绑定
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
阅读全文 »

Netty 简介

发表于 2018-04-06 | 分类于 netty

Netty

Netty可以说是Java领域不得不提的高性能通讯框架,就笔者之前所学习的开源代码,只要涉及到通讯的无不涉及。比如说RocketMQ、Dubbo、ZooKeeper、MtThrift以及后面要深入理解的Zuul2。
鉴于此,笔者决定深入挖掘一下Netty。笔者会从以下几方面来讲解:

  • Client、Server 初始化过程
  • NioEventLoop 内部执行原理
  • 客户端 发送请求与接收响应过程
  • 服务端 接收请求与响应请求过程
  • Netty的内存管理算法与实现
  • Epoll实现与其他实现差别

章节中主要以Netty.example.echo为例来讲解整个过程。另外,本章侧重于Netty的原理讲解,而不是使用。对于使用,可以查阅 官方文档。

阅读全文 »

RocketMQ 消费

发表于 2018-03-22 | 分类于 rocketmq

6. 消息拉取并消费

本章我们将分别从Consumer和Broker两个地方进行介绍。

Consumer 处理

在第四章初始化与启动中,介绍了Consumer的初始化启动过程,其中会启动一个RebalanceService服务。它的作用就是定时去做重负载。
重负载的内容就是去Broker去查询当前所有的消费者然后根据消息分配策略计算出分配给自己的队列有哪些。最后跟本地进行比较,如果是新增的就发起消息的消费,如果是本地不再存在的则从本地删除,其他的则不变。
对于Consumer刚启动的时候,也会触发一次重负载。
重负载的逻辑在RebalanceImpl.rebalanceByTopic方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case CLUSTERING: {
// 获取topic下的所有queue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//获取本消费组下的所有消费者
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
// 根据 队列分配策略 分配消费队列
List<MessageQueue> allocateResult;
// 根据指定分配策略分配出给本consumer的消费队列
allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
Set<MessageQueue> allocateResultSet = new HashSet<>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 跟据最新的分配结果去更新consumer本地的消费队列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
}
break;
}
case //...
}
}

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
// 筛选出不在最新消费队列中的本地队列进行移除,另外对于拉取超时的也同样移除
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
// 筛选出新加的
List<PullRequest> pullRequestList = new ArrayList<>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
// 有序消费的时候,需要锁定记录
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
}
}
}
// 发起消息拉取请求
this.dispatchPullRequest(pullRequestList);
return changed;
}

特别需要说的是:对于有序消费,在重负载的时候会尝试锁住资源;如果锁资源失败则不进行消费。
对于新增的消费队列,consumer会构造拉取参数PullRequest调用dispatchPullRequest进行提交消费。可以看到,最终是将拉取请求丢进PullMessageService的消息拉取队列中pullRequestQueue。

1
2
3
4
5
6
7
8
9
10
11
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
}
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
this.pullRequestQueue.put(pullRequest);
}

PullMessageService中有个后台线程会从pullRequestQueue不停取PullRequest开始拉取消息。
拉取消费过程大致如下:

  1. 调用pullAPIWrapper.pullKernelImpl去拉取消息
  2. 对拉取回来的消息调用PullCallback进行消费

拉取消息请求的是Broker的PullMessageProcessor,后面我们再来分析它。

阅读全文 »

RocketMQ 特性

发表于 2018-03-22 | 分类于 rocketmq

7. 特性

顺序消息

RocketMQ无法保证消息的绝对顺序,即所有消息都按发送顺序被消费。因为当有N多个消费者时,可能有的消息者拿到后来的消息但由于消费的比其他消费者快导致后来的消息先消费。
但是使用方可以保证局部有序。

RocketMQ提供API,让用户自己选择消息发送到哪个Queue:

1
2
3
4
5
6
7
8
9
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);

根据指定的id来选择发往哪个队列,这样同一标识的消息会按时间先后顺序发往队列。
再者,在Consumer重负载的时候,如果是顺序消息会锁住消息不然其他Consumer进行消费。
最后,当消息到达Consumer本地时,选择以顺序的形式进行消费ConsumeMessageOrderlyService,则可保证消息按时间顺序消费。

顺序性也带来一定的弊端:

  • 并行度会成为消息系统的瓶颈
  • 更多的异常处理。消费端出现问题,就会导致真个流程阻塞。

消息重复

RocketMQ并不能保证消息完全不重复,如果业务方需要此属性,需要业务方自己去重。

阅读全文 »
prev1…678…12next
Zamperini

Zamperini

GitHub E-Mail Weibo
© 2020 Zamperini
由 Hexo 强力驱动
|
主题 — NexT.Gemini v6.0.3