RocketMQ消息存储

前言

在看RocketMQ消息生产以及消息消费的过程中,总是对broker端做的动作有些模糊不解,在看broker端处理的代码时候,因为不理解RocketMQ的消息布局,所以也看不大懂。因此下定决心将RocketMQ中消息存储部分的源码进行梳理,从而对RocketMQ的核心部分有一个整体的了解,同时方便之后的复习。

所有TODO,都是原理待看的地方。这里先对主流程进行梳理, 细节稍后再看。
另外涉及到内存映射,同/异步刷盘,主从同步的地方也是稍微再看。现在暂时只对主流程进行处理。

理论部分

RocketMQ的内存存储分为三个部分: commitlog, consumeQueue, index.

commitlog

RocketMQ使用MappedFile以及MappedFileQueue来组织commitlog.
MappedFile是一个commitlog文件,而MappedFileQueue为MappedFile的管理器,是对文件目录的封装。

1
2
MappedQueue中有一个属性叫做:CopyOnWriteArrayList<MappedFile> mappedFiles
用来存储所管理的MappedFile. 有一点要注意的是,RocketMQ会定时删除所有消费过的消息。原因是:RocketMQ中都是基于内存映射的,所以存储目录下的所有文件都会在内存中开辟一块等大的空间,如果不定时将已消费的消息从存储文件中删除,那么会造成极大的内存压力与资源浪费。

源码部分

commitLog的处理逻辑

入口为DefaultMessageStore类.

DefaultMessageStore.putMessage(MessageExtBrokerInner msg)
  1. 校验store是否关闭,如果关闭,返回错误。SERVICE_NOT_AVAILABLE
  2. 校验当前broker的角色是否是slave,如果是slave,返回错误。 SERVICE_NOT_AVAILABLE
  3. 校验当前 runningFlag是否具备写权限,如果不具备,返回错误。SERVICE_NOT_AVAILABLE
  4. 校验消息topic是否超长,如果超长(128字符长度),返回错误。MESSAGE_ILLEGAL
  5. 校验消息的属性是否超长,如果超长(65536字符长度),返回错误。PROPERTIES_SIZE_EXCEEDED
  6. 校验当前OSPageCache是否繁忙,如果繁忙,返回错误。OS_PAGECACHE_BUSY
    TODO: 如何检验OSPageCache是否繁忙?
  7. 调用commitLog.putMessage(msg)方法,等待返回。 返回值如下:
PutMessageResult 简介
PutMessageStatus 处理状态, 如果成功为PUT_OK
AppendMessageResult 在处理MappedFile的时候包装的返回值
  1. 计算处理时间,如果大于500ms,打印warn级别日志。
  2. 统计处理时间。
  3. 如果result为空,或者不为成功,统计失败次数。
  4. 返回result。
CommitLog.putMessage(MessageExtBrokerInner msg)
  1. 设置消息的存储时间storeTimestamp。
  2. 设置消息的CRC。TODO: 什么是CRC?
  3. 根据当前消息的sysFlag拿到事务消息的flag.
    TODO: 这里消息的sysFlag是什么东西,和消息消费中的flag有什么区别?
  4. (不是事务消息 || 到了事务消息的commit阶段) && 同时消息为延时消息。 那么将消息的topic以及queueId放入消息属性中,key分别为:REAL_TOPICREAL_QID。 将消息的topic设置为:SCHEDULE_TOPIC_XXX, 然后根据消息的延时级别放入对应的queueId中。
  5. 从mappedFileQueue中拿到最新的mappedFile.
    TODO: 什么是mappedFileQueue, 什么是mappedFile
    TODO: 如何从mappedFileQueue中拿到最新的mappedFile
  6. 上锁,这里分为自旋锁,以及可重入锁。
    TODO:这里自旋锁以及可重入锁的实现。什么情况下用什么样的锁?
  7. 重新设置storeTimestamp,因为同一时间,只有一个线程可以进入commitLog,所以在临界区中再重新设置时间,可以保证消息的全局有序性。
    TODO:为什么上面要设置一遍存储时间?
  8. 如果mappedFile为空,或者已满。会创建一个新的mappedFile.
    • 如果mappedFile为空,那么新建的mappedFile的起始偏移量为0。
    • 如果已满,那么起始偏移量为当前mappedFile的起始迁移量+mappedFileSize。
  9. 如果mappedFile仍旧为空(多数情况下是因为磁盘的空间不足或者权限不够),返回错误。CREATE_MAPEDFILE_FAILED
  10. 执行,MappedFile.appendMessage(msg, this.appendMessageCallback), 等待返回,返回值在MappedFile中会提到。
  11. 拿到返回结果,释放锁。
    • 如果为PUT_OK
    • 如果为END_OF_FILE,说明当前的mappedFile的空间不足,需要创建一个新的mappedFile。如果在创建新mappedFile的时候出现异常,则返回错误。CREATE_MAPEDFILE_FAILED。 拿到新的mappedFile之后重新执行appendMessage()。 这一步会给unlockMappedFile复制,值为旧的mappedFile.
    • 如果是MESSAGE_SIZE_EXCEEDED或者PROPERTIES_SIZE_EXCEEDED 返回错误, MESSAGE_ILLEGAL
    • 如果是UNKNOWN_ERROR或者是别的错误, 返回错误,UNKNOWN_ERROR
  12. 计算总共锁的时间,如果超过了500ms,则打印warn日志。
  13. 如果unlockMappedFile不为空(在第11步会赋值) && messageStoreConfig.isWarmMapedFileEnable = true(默认为false)
    TODO: 这一步不影响主流程,目前看不出是做什么
  14. 包装返回信息。PutMessageResult 状态为: PUT_OK
  15. 调用storeStatsService进行统计。
  16. 调用handleDiskFlush(result, putMessageResult, msg)进行刷盘。
    TODO: 同步刷盘以及异步刷盘,很重要。
  17. handleHA(result, putMessageResult, msg);
    TODO: 主从同步,很重要
  18. 返回putMessageResult。

MappedFile.appendMessage(MessageExtBrokerInner msg, AppendMessageCallback cb)
  1. 如果当前mappedFile的写指针大于或者等于当前mappedFile的文件大小那么抛出UNKOWN_ERROR异常。
  2. 如果mappedFile的写指针小于mappedFile的大小,那么创建一个与mappedFile相同大小的内存映射,同时将缓存的位置与当前mappedFile的写指针的位置对齐。
    TODO:这里涉及到了内存映射,是一个核心知识。
  3. 根据消息的所属进行不同的处理,消息的所属是指:该消息是单笔消息还是批量消息。如果消息不属于这两个中的任何一个,返回异常。UNKNOWN_ERROR
  4. 如果是单笔消息,执行AppendMessageCallback.doAppend(fileFromOffset,byteBuffer, maxBlank,MessageExtBrokerInner) 等待返回。 返回值如下:
    • maxBlank = fileSize - currentPos
AppendMessageResult 中的属性 简介
AppendMessageStatus 处理状态,如果处理成功为PUT_OK
wroteOffset 消息在commitlog中的物理地址
wroteBytes 消息的长度
msgId 消息ID
storeTimestamp 存储时间
pagecacheRT 存入内存映射的耗时

需要注意的是在拿到第四步结果的时候,消息还没有持久到磁盘中,当前只是存在于内存中

  1. MapFile的wrotePositon增加对应消息的长度。
  2. 返回返回AppendMessageResult。

DefaultAppendMessageCallback.doAppend(long fileFromOffset,ByteBuffer byteBuffer,int maxBlank,MessageExtBrokerInner msgInner)
  1. 计算消息的物理偏移位置: long wroteOffset = fileFromOffset + byteBuffer.position();
  2. 将字节缓冲区的可读位置重置为0,同时限制只能读取八位。
    TODO: 这里涉及到了NIO中的ByteBuffer的flip函数以及limit函数
  3. 创建消息ID. ID的组成为: 4字节host地址+4字节端口号+8字节的消息偏移量。
  4. 查询当前commitLog实例的topicQueueTable变量,拿到当前topig-queueId待写入的偏移量。如果为空,那么从0开始,并添加到topicQueueTable中。

    • DefaultAppendMessageCallback是CommitLog类的内部类因此可以在DefaultAppendMessageCallBck对象中操作外部类的对象。
      这里运用了StringBuilder.setLength()方法重用了StringBuilder对象
      TODO: 这里的queueOffset与上面计算出来的物理偏移地址有什么区别?

      queueOffset是位于consumeQueue中的逻辑偏移地址。 1代表第一个消息,2代表是第二个消息。

  5. 检查当前消息是否是事务消息,如果是事务消息,阶段为预处理或者是撤销,那么将queueOffset设置为0,不加入consumeQueue中去。

  6. 将消息的properties进行序列化。如果消息属性的字节长度大于32768,那么返回错误。PROPERTIES_SIZE_EXCEEDED
    TODO: 这里有个问题就是在刚开的时候也会对消息的长度进行校验,但是那个时候是对String类型进行校验,而这里是在String类型序列化后进行校验。所以一个是字符长度,一个是字节长度?
  7. 拿到消息的topic字节数组,以及消息体的字节数组,再拿到上一步序列化后的properties的长度,计算消息的长度。如果消息的长度大于4M(默认),返回错误。MESSAGE_SIZE_EXCEEDED
  8. 如果消息长度+空字节数(固定位八个字节)大于当前mappedFile的剩余空间,将mappedFile对应的内存映射ByteBuffer剩余空间填空,返回错误 END_OF_FILE.
    • 当前mappedFile的剩余空间在调用方法的时候传过来,属性名为maxBlank
  9. 重置msgStoreItemMemory,将消息按照固定顺序写入msgStoreItemMemory中。再将msgStoreItemMemory写入byteBuffer中。
    TODO: 这里为什么不直接往byteBuffer中写呢?做个中间转移有什么作用?
  10. 包装返回结果,返回信息为PUT_OK
  11. 根据消息的sysFlag对消息的逻辑地址进行修改。如果消息不是事务消息,或者消息到了事务消息的commit阶段,那么将topicQueueTable中的ququeOffset加1.
  12. 返回AppendMessageResult。

ConsumeQueue的处理逻辑

ReputMessageServcie可以准实时的转发CommitLog文件的更新事件,相应的任务处理器根据转发的消息及时更新ConsumeQueue和IndexFile文件。

ReputMessageService.run()

ReputMessageService.start()

  1. 每隔一秒钟执行一次doReput()方法,将commitLog中尚未提交到consumeQueue的消息进行处理。

  2. 根据reputFromOffset查找所属的commitLog文件,然后拿到该commitLog文件从reputFromOffset开始之后的所有数据。

    • 如果数据为为空,设置doNext为false,暂停一秒后进行下一次循环。
    • 如果数据不为空,执行3.
  3. 简单介绍一下返回值 SelectMappedBufferResult的属性
SelectMappedBufferResult
startOffset 消息在commitlog中的物理起始偏移量
byteBuffer 消息的字节缓冲区
size 消息的大小
mappedFile commitlog所对应的mappedFile的对象
  1. 调用CommitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false) 创建分发请求: DispatchRequest.
  2. 对结果进行判断
    • 如果 success=false && size>0 读到的消息的长度与消息存入时的总长度不同,打印error级别日志,同时跳过该消息。
    • 如果 success=false && size=0 代表魔数不对,如果当前broker为master,打印error级别日志, TODO:这里没有理解
    • DefaultMessageStore.this.doDispatch(dispatchRequest);

6.

CommitLog.checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody)

按照当初存储消息的顺序来进行读取消息。

  1. 读取消息总大小
  2. 读取消息魔数
    • 如果魔数是MESSAGE_MAGIC_CODE,那么走到3
    • 如果魔数是BLANK_MAGIC_CODE, 说明是空白填充,直接返回。size = 0, success = true.
    • 如果是别的那么打印warn级别日志,并返回异常。 size = -1 , success = false.
  3. 读取一系列的msg属性
    • CRC
    • queueId
    • flag
    • queueOffset
    • physicOffset
    • sysFlag
    • bornTimestamp
    • host
    • storeTimestamp
    • port
    • reconsumeTimes
    • preparedTransactionOffset
    • 获取body体的长度
      • 如果需要检查body的CRC验证,则进行CRC验证
    • 获取topic的长度
    • 获取消息topic
    • 获取消息属性的长度
    • 获取消息的属性
      • 拿到索引的keys
      • 获取消息唯一键
      • 获取消息的tags
      • 对tags进行hash赋值给tagsCode
  4. 获取消息的延迟级别,如果延迟消息不为空,同时延迟级别不为0,那么tagsCode的值为提交时间。
  5. 计算上面拿到的总的消息长度,并与totalSize进行对比,如果不等则打印warn级别日志,并且seccess为false. 同时携带消息的总长度。
  6. 返回DispatchRequest对象,一个DispatchRequest对象代表了一条待处理的消息。
CommitLogDispatcherBuildConsumeQueue.dispatch(DispatchRequest request)
  1. 拿到当前消息的tranType, 如果是事务消息的prepared阶段 那么直接返回。如果不是事务消息,或者到了事务消息的commit阶段,继续处理。
  2. 查询当前topic以及queueId对应的consumeQueue.
  3. ConsumeQueue.putMessagePositionInfoWrapper(dispatchRequest).
  4. 判断是否写入成功:
    • 如果写入成功: 拿到DefaultMessageStore对象中storeCheckPoint, 给LogicsMsgTimestamp赋值为消息的落库时间。
    • 如果写入失败:打印warn级别日志,线程睡眠一秒.
ConsumeQueue.putMessagePositionInfoWrapper(DispatchRequest

request)

  1. 检查当前messageStore是否可写。
  2. 拿到上面计算出的tagsCode.
  3. 如果开启了consumeQueueExt(EnableConsumeQueueExt=true),做一些特殊处理。TODO: 与主流程无关
  4. 判断消息的物理偏移量与当前consumeQueue的最大物理偏移量,如果消息的物理偏移量小那么直接返回true.
  5. 根据消息中的consumeQueue,全文搜索topicQueueTable可以查到该值的来源。 使用consumeQueueOffset * msgsize(20) 就可以计算出该消息在consumeQueue中的逻辑偏移地址 。
  6. 使用该逻辑便宜地址去MappedFileQueue中拿到最新的MappedFile.
  7. 如果MappedFile为空,返回false.
  8. 更新consumeQueue的最大物理偏移地址(maxPhysicOffset), 并执行mappedFile.appendMessage(this.byteBufferIndex.array()). : 将数据写入MappedFile对应的fileChannel,并更新写指针
    TODO: 内存映射中,直接写入fileChannel,写入ByteBuffer, 写入MappedByteBuffer, 这三个究竟有什么区别,,?
  9. 根据写入情况,返回true或者false.