Zamperini


  • 首页

  • 关于

  • 标签23

  • 分类11

  • 归档56

RocketMQ 消息发送&存储

发表于 2018-03-21 | 分类于 rocketmq

5. 消息发送并存储

发送分为同步和异步的方式,本章主要解析同步方式,异步方式与同步方式很相似。
发送的过程,Producer发送消息,Broker接收消息并存储。

发送

发送过程,最主要逻辑在DefaultMQProducerImpl.sendDefaultImpl中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) throws ... {
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
@SuppressWarnings("UnusedAssignment")
long endTimestamp = beginTimestampFirst;
// 获取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
// 最多重试timeTotal次
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 按一定策略 选择一个队列进行发送消息(目前是轮询法)
@SuppressWarnings("SpellCheckingInspection")
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
// 调用发送消息核心方法
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
// 更新Broker可用性信息
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
//重试
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch //...
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
//...
}
//...
}

可以看出整个过程:首先获取路由信息,然后按一定策略选择一个队列进行发送消息,当失败时会重试,最多重试timesTotal次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 跟据队列对应broker地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
// 是否使用VIP通道,否认使用,之前在介绍broker的时候介绍过broker会起两个server,端口号-2 的server为fast server
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
try {
// 生成msg唯一ID 时间差 + counter
MessageClientIDSetter.setUniqID(msg);
int sysFlag = 0;
// 消息压缩
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
}
// 事务
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
// ...发送前校验逻辑
// 构建发送消息请求
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// 消息重发
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
// 发送消息
SendResult sendResult = null;
switch (communicationMode) {
case SYNC:
// 调用MQclient进行发送消息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
case //...
}
// 返回发送结果
return sendResult;
} catch //...
}
}

接收

Broker的fast Server接收到消息后,交给SendMessageProcessor进行处理。以下为其主要核心逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
// 初始化响应
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
final byte[] body = request.getBody();
//.... 校验
// 如果队列小于0,从可用队列随机选择
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
String newTopic = requestHeader.getTopic();
//重试消息
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
//超过消费最大次数则放入死信队列中
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
if (reconsumeTimes >= maxReconsumeTimes) {
newTopic = MixAll.getDLQTopic(groupName);
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0);
}
}
// 创建MessageExtBrokerInner
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(sysFlag);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

// 校验是否不允许发送事务消息
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
return response;
}
}
// 调用MessageStore存储消息
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
//... 统计与响应
return response;
}
阅读全文 »

RocketMQ 存储

发表于 2018-03-20 | 分类于 rocketmq

4. store****

[TOC]

存储是RocketMQ中至关重要的一部分。
DefaultMessageStore是RocketMQ中用于存储消息的入口。一条消息存储过程大致如图所示:

首先通过调用CommitLog.putMessage方法将消息追加到MappedFile文件中,然后再以异步的形式进行构建ConsumeQueue,如果用户设置了Key则同时进行构建Index。

我们将从DefaultMessageStore的初始化和其追加消息来分析其工作原理。

初始化

首先进行成员变量的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
//当有新消息来时触发监听器(后面再讲Consumer的长轮询的时候会讲到)
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
//状态管理
this.brokerStatsManager = brokerStatsManager;
//MapppedFile文件空间申请服务
this.allocateMappedFileService = new AllocateMappedFileService(this);
//提交日志,用于存放消息的地方,最关键的部分
this.commitLog = new CommitLog(this);
//消费队列表,用于保存每个topic下,每个queue的消费进度
this.consumeQueueTable = new ConcurrentHashMap<>(32);
//负责将消费队列表进行刷盘的服务
this.flushConsumeQueueService = new FlushConsumeQueueService();
//负责定时清理旧消息文件的后台服务
this.cleanCommitLogService = new CleanCommitLogService();
//负责定时清理旧消费队列文件的后台服务
this.cleanConsumeQueueService = new CleanConsumeQueueService();
// 存储状态的服务
this.storeStatsService = new StoreStatsService();
//索引服务
this.indexService = new IndexService(this);
// 主从同步服务
this.haService = new HAService(this);
//重放消息线程服务,用于从CommitLog中构建新增消息的ConsumeQueue并构建Index
this.reputMessageService = new ReputMessageService();
//延时消息服务
this.scheduleMessageService = new ScheduleMessageService(this);
// 内存字节缓冲区
this.transientStorePool = new TransientStorePool(messageStoreConfig);
if (messageStoreConfig.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}
//开启文件分配后台线程
this.allocateMappedFileService.start();
this.indexService.start();
}

CommitLog是消息存储中最重要的部分,因为其他损失了可以根据此部分来补齐。CommitLog内部也需要实例化成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;
// 当开始同步刷盘时,使用GroupCommitService,非同步时使用FlushRealTimeService。
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
//当开启内存字节缓冲区时,使用此服务进行刷盘
this.commitLogService = new CommitRealTimeService();
// 日志写入详细内容
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}

初始化完成员变量后,会执行一次load()方法。判断是否有历史文件,如果有则加载历史文件。

阅读全文 »

分布式事务

发表于 2018-03-19 | 分类于 database

分布式事务

[TOC]

事务

Transactions provide an “all-or-nothing“ proposition, stating that each work-unit performed in a database must either complete in its entirety or have no effect whatsoever.

事务应该具有4个属性:原子性、一致性、隔离性、持久性

分布式事务场景

什么是分布式事务

A distributed transaction is a database transaction in which two or more network hosts are involved. Usually, hosts provide transactional resources… Wiki Definition
分布式事务是指多个提供事务资源的网络节点参与的数据库事务
跨越多个服务,操作多个数据库。保证对于多个资源服务器的数据的操作,要么全部成功,要么全部失败。本质上来说,是为了保证不同资源服务器的数据一致性。

阅读全文 »

RocketMQ 初始化

发表于 2018-03-19 | 分类于 rocketmq

3. 初始化

[TOC]

NameServer和Broker是RocetMQ中独立部署的集群。
下面我们将来分别介绍他们。

NameServer

NameServer管理Broker集群的注册信息并提供心跳来检测他们是否可用,持有关于Broker集群和队列的全部路由信息。接下来详细了解其具体功能和实现。

启动

NameServer的启动类为NamesrvController,其包含成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 后台线程,负责扫描失活的Broker,并销毁与其的链接
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
// 用于加载NameServer的配置参数,配置路径 $HOME/namesrv/kvConfig.json
private final KVConfigManager kvConfigManager;
// 路由信息管理
private final RouteInfoManager routeInfoManager;
// 基于Netty实现远端服务,监听请求并转发到`DefaultRequestProcessor`,并根据请求码进行不同请求逻辑的处理
private RemotingServer remotingServer;
// Broker连接管理服务,用于监听Netty的Channel事件,当异常时从RouteInfoManager中删除相应Channel对应的路由信息
private BrokerHousekeepingService brokerHousekeepingService;
private ExecutorService remotingExecutor;
private Configuration configuration;

NameServer的启动过程如下:

1
2
3
4
5
6
7
8
9
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
//初始化服务
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//启动Socekt链接,对外提供服务
controller.start();
阅读全文 »

RocketMQ 通信

发表于 2018-03-18 | 分类于 rocketmq

2. remoting

[TOC]

RocketMQ的通讯模块实现的比较简单,不像Dubbo那样给用户提供多种可选的方式,只有一种基于通讯框架Netty的实现方式。
其最主要的两个类是NettyRemotingServer、NettyRemotingClient。以下为其类图:

首先我们来分析Server部分,再看Client部分。

Server:

1. 初始化

首先初始化Server,初始化Netty 的AcceptorEventLoopGroup和SelectorEventLoopGroup,并设置限流阈值和通道事件监听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
// 限流,设置信号量,设置同一时间单向请求和异步请求最大的请求数。
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
// 设置通道事件监听器
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 通用的处理线程池实例化,当业务方没有设置processor的线程池时,使用共用的线程池publiceExecutor
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// 设置Netty Acceptor线程池大小
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
}
});
// 设置Netty Selector线程池大小,此处判断若是linux平台,使用Epoll线程池,否则使用
if (RemotingUtil.isLinuxPlatform() //
&& nettyServerConfig.isUseEpollNativeSelector()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
//否则使用一般的
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
}

2. 启动

调用start()方法,启动Server。监听端口,开启服务,并开启后台线程,扫描超时的响应并作回调。

阅读全文 »
prev1…789…12next
Zamperini

Zamperini

GitHub E-Mail Weibo
© 2020 Zamperini
由 Hexo 强力驱动
|
主题 — NexT.Gemini v6.0.3