RocketMQ消息存储过程

/ RocketMQ / 40浏览

NameServer集群数据管理

NameServer管理着集群的信息。主要的存储结构如下:

QueueData

private String brokerName;
private int readQueueNums;
private int writeQueueNums;
private int perm;
private int topicSynFlag;

每个BrokerName下的小集群信息。对应的一个Master和多个Slave。

QueueData里面存储了Broker的名称、读写Queue的数量和同步标识。

BrokerData

private String cluster;
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

每个brokerName下的集群信息。包含了一个master broker 和 多个 slave broker的地址信息。

TopicRouteData

private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

Topic路由信息,包含了某个topic的QueueData信息和每个Broker地址映射信息。

RouteInfoManager中保存着集群状态信息。

HashMap<String/* topic */, List<QueueData>> topicQueueTable;
HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

topicQueueTable

存储了所有topic的信息。key是topic的名称,value是QueueData队列,队列的长度等于topic数据存储的master broker的个数。即:每个小集群里面包含了几个读写队列。

brokerAddrTable

以brokerName为key,存储了集群中所有的brokerName,每个brokerName下有几个机器,分别对应的地址是什么。

clusterAddrTable

所有的集群信息。以clusterName为key,value中存储了某个集群中都有哪些brokerName。

brokerLiveTable

以每一个broker的地址作为key,也就是对应着一台机器,value存储了这个机器下的实时状态,包括上次更新状态的时间戳,NameServer会定期检查这个时间戳,超过时间没有更新,就会把它剔除出去。

filterServerTable

每个机器中存在的过滤器。

Topic创建流程

手动创建

./mqadmin updateTopic -n localhost:9876  -b localhost:10911  -t tx-mq-TOPIC

-n NameServer地址

-b 在某个Broker上创建topic

-t topic名称

通过命令进入后,会去创建Topic。可以看到,在创建topic时,默认设置了8个读写队列。

UpdateTopicSubCommand.execute();
TopicConfig topicConfig = new TopicConfig();
topicConfig.setReadQueueNums(8);
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName(commandLine.getOptionValue('t').trim());

指定-b操作时,在指定的broker上来创建topic的读写队列。

如果指定的是-c,即在某个集群上来创建topic的读写队列。

具体实现代码:

if (commandLine.hasOption('b')) {
    String addr = commandLine.getOptionValue('b').trim();
    defaultMQAdminExt.start();
    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
    System.out.printf("create topic to %s success.%n", addr);
    return;

} else if (commandLine.hasOption('c')) {
    String clusterName = commandLine.getOptionValue('c').trim();
    defaultMQAdminExt.start();

    // 获取集群中所有的master broker
    Set<String> masterSet =
        CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
    // 循环在每个master broker中创建topic。
    for (String addr : masterSet) {
        defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
        System.out.printf("create topic to %s success.%n", addr);
    }
    System.out.printf("%s", topicConfig);
    return;
}

由此可以看出,创建topic时,会和每个master建立连接。

自动创建

发送消息时创建topic。类似手动创建。

发送消息流程

img

MessageQueue选择方式

消息在发送时,是如何选择把消息发到哪个MessageQueue中呢。

1. 手动指定

可以通过手动指定的方式来选择。例如:

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

手动创建一个MessageQueueSelector来选择。

2. 自动选择

如果不指定MessageQueueSelector,RocketMQ会自动选择一个。具体方式如下(核心逻辑,不相关的删了):

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 获取topic所对应的MessageQueue
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

    // 选择一个MessageQueue    
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

    //发送message到被选择的MessageQueue中。
    this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
}

MessageQueue选择机制

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        // 默认false,中间省去n行代码
        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    // 从已有的MessageQueue中顺序选取一个发送
    return this.messageQueueList.get(pos);
}

读消息流程

主从切换

在master宕机之后。consumer是如何选择去slave中读取数据的。

DefaultMQPullConsumer

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();

// 获取所有的MessageQueue
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
for (MessageQueue mq : mqs) {
    System.out.printf("Consume from the queue: %s%n", mq);
    SINGLE_MQ:
    while (true) {
        try {
            PullResult pullResult =
                consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
            System.out.printf("%s%n", pullResult);
            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    break;
                case NO_MATCHED_MSG:
                    break;
                case NO_NEW_MSG:
                    break SINGLE_MQ;
                case OFFSET_ILLEGAL:
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

consumer.shutdown();

遍历每个MessageQueue来获取数据。官方说优先从master来读取,如果master挂了,是如何来切换的呢。底层切换逻辑如下:

FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);

findBrokerAddressInSubscribe方法中有两个参数,一个是broker名称,它下面对应着一个master和多个slave。另一个是brokerId。该brokerId的获取方式如下:

public long recalculatePullFromWhichNode(final MessageQueue mq) {
    if (this.isConnectBrokerByUser()) {
        return this.defaultBrokerId;
    }

    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (suggest != null) {
        return suggest.get();
    }

    return MixAll.MASTER_ID;
}

如果缓存中有已经建立好的连接,就返回该brokerId。如果没有,取一个建议的brokerId,如果也没有建议的brokerId,就默认是master节点。

如果master挂了,findBrokerAddressInSubscribe执行逻辑如下:

public FindBrokerResult findBrokerAddressInSubscribe(
    final String brokerName,
    final long brokerId,
    final boolean onlyThisBroker
) {
    String brokerAddr = null;
    boolean slave = false;
    boolean found = false;

    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        brokerAddr = map.get(brokerId);        // 因为master挂了,所以get不到
        slave = brokerId != MixAll.MASTER_ID;
        found = brokerAddr != null;

        if (!found && !onlyThisBroker) {
            // get不到master节点后,从brokerAddrTable中随机取一个
            Entry<Long, String> entry = map.entrySet().iterator().next();
            brokerAddr = entry.getValue();
            slave = entry.getKey() != MixAll.MASTER_ID;
            found = true;
        }
    }

    if (found) {
        return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
    }

    return null;
}

选完之后,和该slave建立连接,读取数据。

DefaultMQPushConsumer

基于PullRequest,是一个长轮询的连接。循环检查是否有新消息。如果有新消息到达,就利用现有的连接立即返回给consumer。如果没有消息,就不返回,一直循环查看状态。

消息存储过程

RocketMQ将消息的存储抽象为MessageStore接口,默认实现是:DefaultMessageStore。它主要提供的方法:

  1. 保存消息
  2. 根据topic、queue和offset获取消息
  3. 根据messageKey查询消息

DefaultMessageStore的启动流程:

public void start() throws Exception {
    //1、写lock 文件,尝试获取lock文件锁,保证磁盘上的文件只会被一个messageStore读写
    lock = lockFile.getChannel().tryLock(0, 1, false);
    if (lock == null || lock.isShared() || !lock.isValid()) {
        throw new RuntimeException("Lock failed,MQ already started");
    }

    lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
    lockFile.getChannel().force(true);
    //2、启动FlushConsumeQueueService,是一个单线程的服务,定时将consumeQueue文件的数据刷新到磁盘,周期由参数flushIntervalConsumeQueue设置,默认1sec
    this.flushConsumeQueueService.start();
    //3、启动CommitLog
    this.commitLog.start();
    //4、消息存储指标统计服务,RT,TPS...
    this.storeStatsService.start();
    //5、针对master,启动延时消息调度服务
    if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
        this.scheduleMessageService.start();
    }
    //6、启动reputMessageService,该服务负责将CommitLog中的消息offset记录到cosumeQueue文件中
    if (this.getMessageStoreConfig().isDuplicationEnable()) {
        this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
    } else {
        this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
    }
    this.reputMessageService.start();
    //7、启动haService,数据主从同步的服务
    this.haService.start();
    //8、对于新的broker,初始化文件存储的目录
    this.createTempFile();
    //9、启动定时任务
    this.addScheduleTask();
    this.shutdown = false;
}