RocketMQ同/异步刷盘以及故障恢复

在写这篇文章之前首先写几个疑问,期望在整理完这篇文章后,能够得到有效的解答,而不是模棱两可。

  1. broker将消息写入CommitLog的FileChannel之后,在还没有进行刷盘的时候,如果down机,是不是意味着会有部分消息丢失?
  2. broker在将消息成功刷盘之后,在做consumeQueue与IndexFile分发的时候,如果down机,那么怎么保证这些没有分发的消息继续被分发?

    会将最后一个有效文件的消息进行重新分发。

  3. 会不会存在消息成功被分发且刷盘,但是消息还没有持久化到磁盘?

    可能会存在这种问题,但是在重启的时候会被删除。

  4. 异步刷盘的运行原理是什么?

故障恢复:

DefaultMessageStore.load()

  1. 加载CommitLog
    查询MappedFileQueue目录下的所有文件。
    • 创建MappedFile,给wrotePosition,FlushedPosition,commitedPositon赋值为文件大小。该MappedFile为CommitLog的MappedFile.
    • 如果MappedFile的大小与broker配置的单个文件大小不同,那么不进行加载。
      TODO: 如果CommitLog的大小被修改,那么之前的消息如何处理?
  2. 加载ConsumeQueue
    • 查询topic列表
    • 遍历topic目录,拿到queue列表,创建ConsumeQueue
    • 将ConsumeQueue放入ConsumeQueueTable中
    • 执行consumeQueue.load()方法,将目录下的所有文件加载为MappedFile文件。
  3. TODO: 如果加载失败
  4. 加载checkPoint文件,如果checkPoint文件存在,加载commitlog,consumeQueue,indexFile文件的刷盘点。
    TODO: 什么时候记录的刷盘点?
  5. 加载IndexFile, 加载IndexHeader.以下是indexHeader所拥有的属性:
    TODO: 这里不太明白indexCount的作用
IndexHeader
beginTimestamp 文件起始时间戳
endTimestamp 文件结束时间戳
beginPhyOffset 起始消息的物理地址
endPhyOffset 结束消息的物理地址
hashSlotCount hash槽的个数
indexCount 索引的个数
  1. 如果broker非正常退出,如果索引文中消息最大时间戳大于索引文件的刷盘时间,那么将该索引文件销毁。
    • TODO: 为什么索引文件的时间比索引文件的刷盘时间还大,这说明了什么?
  2. 遍历consumeQueueTable, 拿到每个consumeQueue,执行recover()方法。并且拿到所有topic,queue的MappedFile文件,取到最大的物理偏移量。并且返回。
    TODO: 这里为啥要执行recover方法,而不像commitLog,直接创建呢?
  3. 根据上次是否是正常退出,来执行commitlog的recover逻辑。
ConsumeQueue.revocer()
  1. 拿到该mappedQueue的所有MappedFile,
    • 如果MappedFile为空,直接返回。
  2. 从第三个文件开始,读取最大偏移量。 processOffset
    • 赋值给MappedFileQueue的flushWhere,commitedWhere变量
    • 赋值给最后MappedFile的wrotePosition,CommitedPosition,flushedPosition.
    • 如果最大偏移量,小于最小,也小于最大,则说明是一个空文件,直接删除该文件。 TODO: 不知道理解的对不对
CommitLog.recoverNormally(maxPhyOffsetOfConsumeQueue)
  1. 读取配置,判断是否校验CRC
  2. 遍历MappedFile
  3. 从倒数第三个文件开始读取MappedFile.
  4. 调用checkMessageAndReturnSize(byteBuffer,checkCRCOnRecover)来获取消息。该方法的消息调用步骤在《RocketMQ消息存储》的创建ConsumeQueue章节有详细讲解。
  5. 如果读取成功,mappedFileOffset增加消息的大小。
  6. 如果success为true, size为0,则证明为blank,读取下一个文件。
  7. 处理:

    • 赋值给MappedFileQueue的flushWhere,commitedWhere变量
    • 赋值给最后MappedFile的wrotePosition,CommitedPosition,flushedPosition.
    • 如果最大偏移量,小于最小,也小于最大,则说明是一个空文件,直接删除该文件。 TODO: 不知道理解的对不对
  8. 如果这里读取的最大偏移量小于consumeQueue中的最大偏移量。。那么说明consumeQueue中存储的多了,

    • 遍历所有ConsumeQueue,获取最后的文件,如果文件起始值都要大于commitLog偏移量,那么直接删除文件。
    • 如果起始值要小于commitlog偏移量,但是结束值大于commitlog偏移量,那么将wrotePosition,flushPosition,CommitedPosition设置为最大偏移量。

TODO: 我记得刚刚初始化commitlog的时候也会设置一次值,那么和recover的设置有什么区别?

CommitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue)

处理commitLog的异常恢复逻辑与正恢复逻辑相似。
但是正常处理是从倒数第三个文件开始,然后往后找。异常恢复的逻辑是从最后一个文件开始,往前找,找到第一个正常的文件。

  1. 如果commitlog的mappedFiles为空,那么删除所有的consumeQueue.
  2. 组装消息,进行分发,建立consumeQueue以及IndexFile.
  3. 如果size为0,代表已经到了文件末尾,index++取下一个文件。
  4. 如果这里读取的最大偏移量小于consumeQueue中的最大偏移量。。那么说明consumeQueue中存储的多了,
    • 遍历所有ConsumeQueue,获取最后的文件,如果文件起始值都要大于commitLog偏移量,那么直接删除文件。
    • 如果起始值要小于commitlog偏移量,但是结束值大于commitlog偏移量,那么将wrotePosition,flushPosition,CommitedPosition设置为最大偏移量。

那么如何判断一个文件是否是正常的文件?

CommitLog.isMappedFileMatchedRecover(MappedFile)
  1. 首先判断文件的魔数,如果文件的魔数不对,说明不是消息的存储文件。直接返回false.
  2. 如果第一条消息的存储时间为0,返回false.
  3. 如果messageIndexSafe设置为了true,那么索引文件也参与进来。

总结:
存储启动的时候,文件恢复主要做了: 完成flushedPosition以及commitedWhere指针的设置,消息消费队列最大偏移量加载到内存中,删除flushedPosition之后的所有文件。在文件恢复的过程中,RocketMQ会将最后一个有效文件中的所有消息重新分发,确保消息不丢失。但是会带来消息重复消费的问题。

刷盘