RocketMQ Source Code Analysis: Message Storage
Published: 2019-06-19



1. Overview

This article continues from the preceding article in this series.

It focuses on how CommitLog stores messages.

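2. CommitLog Structure

The relationship among CommitLog, MappedFileQueue, and MappedFile is as follows:

[Figure: relationship among CommitLog, MappedFileQueue, and MappedFile]
CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N.

On the file system, this looks like: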

Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd
/Users/yunai/store/commitlog
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -l
total 10485760
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:27 00000000000000000000
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:29 00000000001073741824
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000002147483648
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:33 00000000003221225472
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000004294967296

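CommitLog, MappedFileQueue, and MappedFile are defined as follows:

  • MappedFile: the files 00000000000000000000, 00000000001073741824, 00000000002147483648, and so on.
  • MappedFileQueue: the directory that holds the MappedFiles. It wraps them into a file queue and offers the upper layer seemingly unlimited file capacity.
    • Every MappedFile has the same size.
    • File naming: fileName[n] = fileName[n - 1] + mappedFileSize. For CommitLog, mappedFileSize defaults to 1 GB.
  • CommitLog: a wrapper around MappedFileQueue.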
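The naming rule can be checked with a few lines of Java. This is only a sketch: the class and method names (CommitLogFileNames, offsetToFileName, fileNameForIndex) are hypothetical, and it assumes nothing beyond the 20-digit zero-padded names in the listing above and the 1 GB default file size.

```java
final class CommitLogFileNames {
    /** CommitLog file size stated in the article: 1 GB. */
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024;

    /** Formats a start offset as a 20-digit, zero-padded file name. */
    static String offsetToFileName(long offset) {
        return String.format("%020d", offset);
    }

    /** Name of the n-th file (0-based), assuming the first file starts at offset 0. */
    static String fileNameForIndex(int n) {
        return offsetToFileName(n * MAPPED_FILE_SIZE);
    }

    public static void main(String[] args) {
        // Prints the same five names that appear in the directory listing.
        for (int n = 0; n < 5; n++) {
            System.out.println(fileNameForIndex(n));
        }
    }
}
```

Because each name is the file's start offset, the file that contains a given physical offset can be found from the offset alone.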

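CommitLog currently stores two kinds of content in a MappedFile:

  1. MESSAGE: a message.
  2. BLANK: blank padding written when the file does not have enough room left for a message.

Layout of CommitLog content inside a MappedFile:

MESSAGE[1] MESSAGE[2] ... MESSAGE[n - 1] MESSAGE[n] BLANK

Storage layout of a MESSAGE in CommitLog: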

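| No. | Field | Description | Data type | Bytes |
| --- | --- | --- | --- | --- |
| 1 | MsgLen | Total message length | Int | 4 |
| 2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
| 3 | BodyCRC | CRC of the message body | Int | 4 |
| 4 | QueueId | Message queue id | Int | 4 |
| 5 | Flag | flag | Int | 4 |
| 6 | QueueOffset | Position in the message queue | Long | 8 |
| 7 | PhysicalOffset | Physical position, i.e. the sequential storage position in CommitLog | Long | 8 |
| 8 | SysFlag | MessageSysFlag | Int | 4 |
| 9 | BornTimestamp | Timestamp at which the message was produced | Long | 8 |
| 10 | BornHost | Address + port of the host that produced the message | Long | 8 |
| 11 | StoreTimestamp | Timestamp at which the message was stored | Long | 8 |
| 12 | StoreHost | Address + port of the host that stored the message | Long | 8 |
| 13 | ReconsumeTimes | Number of times the message has been re-consumed | Int | 4 |
| 14 | PreparedTransactionOffset | | Long | 8 |
| 15 | BodyLength + Body | Body length + body | Int + Bytes | 4 + bodyLength |
| 16 | TopicLength + Topic | Topic length + topic | Byte + Bytes | 1 + topicLength |
| 17 | PropertiesLength + Properties | Properties length + properties | Short + Bytes | 2 + PropertiesLength |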
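Adding up the byte counts in the table gives the total message length. The sketch below only illustrates that arithmetic; the class name MessageLengthSketch and its methods are hypothetical and are not the RocketMQ implementation.

```java
import java.nio.charset.StandardCharsets;

final class MessageLengthSketch {
    /** Fixed-width fields 1 to 14 plus the three length prefixes of fields 15 to 17. */
    static final int FIXED_OVERHEAD =
        4 + 4 + 4 + 4 + 4     // MsgLen, MagicCode, BodyCRC, QueueId, Flag
        + 8 + 8               // QueueOffset, PhysicalOffset
        + 4                   // SysFlag
        + 8 + 8 + 8 + 8       // BornTimestamp, BornHost, StoreTimestamp, StoreHost
        + 4                   // ReconsumeTimes
        + 8                   // PreparedTransactionOffset
        + 4 + 1 + 2;          // BodyLength (Int), TopicLength (Byte), PropertiesLength (Short)

    /** Total stored length for the given variable-length parts, in bytes. */
    static int messageLength(int bodyLength, int topicLength, int propertiesLength) {
        return FIXED_OVERHEAD + bodyLength + topicLength + propertiesLength;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] topic = "TopicTest".getBytes(StandardCharsets.UTF_8);
        System.out.println("fixed overhead = " + FIXED_OVERHEAD);
        System.out.println("total length   = " + messageLength(body.length, topic.length, 0));
    }
}
```

The one-byte topic length prefix and the two-byte properties length prefix are what cap the topic at 255 bytes and the properties at Short.MAX_VALUE bytes.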

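Storage layout of a BLANK in CommitLog:

| No. | Field | Description | Data type | Bytes |
| --- | --- | --- | --- | --- |
| 1 | maxBlank | Length of the blank area | Int | 4 |
| 2 | MagicCode | BLANK_MAGIC_CODE | Int | 4 |

3. Storing Messages in CommitLog

[Figure: sequence diagram of the Broker storing a sent message]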

CommitLog#putMessage(...)

1: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
2:     // Set the storage time
3:     msg.setStoreTimestamp(System.currentTimeMillis());
4:     // Set the message body BODY CRC (consider the most appropriate setting
5:     // on the client)
6:     msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
7:     // Back to Results
8:     AppendMessageResult result = null;
9:
10:     StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
11:
12:     String topic = msg.getTopic();
13:     int queueId = msg.getQueueId();
14:
15:     // Transaction-related. TODO: still to be read
16:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
17:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
18:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
19:         // Delay Delivery
20:         if (msg.getDelayTimeLevel() > 0) {
21:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
22:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
23:             }
24:
25:             topic = ScheduleMessageService.SCHEDULE_TOPIC;
26:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
27:
28:             // Backup real topic, queueId
29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
30:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
31:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
32:
33:             msg.setTopic(topic);
34:             msg.setQueueId(queueId);
35:         }
36:     }
37:
38:     long eclipseTimeInLock = 0;
39:
40:     // Get the mapped file to write to
41:     MappedFile unlockMappedFile = null;
42:     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
43:
44:     // Acquire the write lock
45:     lockForPutMessage(); //spin...
46:     try {
47:         long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
48:         this.beginTimeInLock = beginLockTimestamp;
49:
50:         // Here settings are stored timestamp, in order to ensure an orderly
51:         // global
52:         msg.setStoreTimestamp(beginLockTimestamp);
53:
54:         // Create a mapped file when none exists
55:         if (null == mappedFile || mappedFile.isFull()) {
56:             mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
57:         }
58:         if (null == mappedFile) {
59:             log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
60:             beginTimeInLock = 0;
61:             return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
62:         }
63:
64:         // Store the message
65:         result = mappedFile.appendMessage(msg, this.appendMessageCallback);
66:         switch (result.getStatus()) {
67:             case PUT_OK:
68:                 break;
69:             case END_OF_FILE: // End of file reached: get a new mapped file and append again
70:                 unlockMappedFile = mappedFile;
71:                 // Create a new file, re-write the message
72:                 mappedFile = this.mappedFileQueue.getLastMappedFile(0);
73:                 if (null == mappedFile) {
74:                     // XXX: warn and notify me
75:                     log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
76:                     beginTimeInLock = 0;
77:                     return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
78:                 }
79:                 result = mappedFile.appendMessage(msg, this.appendMessageCallback);
80:                 break;
81:             case MESSAGE_SIZE_EXCEEDED:
82:             case PROPERTIES_SIZE_EXCEEDED:
83:                 beginTimeInLock = 0;
84:                 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
85:             case UNKNOWN_ERROR:
86:                 beginTimeInLock = 0;
87:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
88:             default:
89:                 beginTimeInLock = 0;
90:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
91:         }
92:
93:         eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
94:         beginTimeInLock = 0;
95:     } finally {
96:         // Release the write lock
97:         releasePutMessageLock();
98:     }
99:
100:     if (eclipseTimeInLock > 500) {
101:         log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
102:     }
103:
104:     //
105:     if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
106:         this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
107:     }
108:
109:     PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
110:
111:     // Statistics
112:     storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
113:     storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
114:
115:     // Perform a synchronous or asynchronous flush or commit
116:     GroupCommitRequest request = null;
117:     // Synchronization flush
118:     if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
119:         final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
120:         if (msg.isWaitStoreMsgOK()) {
121:             request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
122:             service.putRequest(request);
123:             boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
124:             if (!flushOK) {
125:                 log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
126:                     + " client address: " + msg.getBornHostString());
127:                 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
128:             }
129:         } else {
130:             service.wakeup();
131:         }
132:     }
133:     // Asynchronous flush
134:     else {
135:         if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
136:             flushCommitLogService.wakeup(); // important: wake the CommitLog flush thread to flush
137:         } else {
138:             commitLogService.wakeup();
139:         }
140:     }
141:
142:     // Synchronous double write: if this is a SYNC_MASTER, replicate to the slave // TODO: still to be read (data synchronization)
143:     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
144:         HAService service = this.defaultMessageStore.getHaService();
145:         if (msg.isWaitStoreMsgOK()) {
146:             // Determine whether to wait
147:             if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
148:                 if (null == request) {
149:                     request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
150:                 }
151:                 service.putRequest(request);
152:
153:                 service.getWaitNotifyObject().wakeupAll();
154:
155:                 boolean flushOK =
156:                     // TODO
157:                     request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
158:                 if (!flushOK) {
159:                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
160:                         + msg.getTags() + " client address: " + msg.getBornHostString());
161:                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
162:                 }
163:             }
164:             // Slave problem
165:             else {
166:                 // Tell the producer, slave not available
167:                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
168:             }
169:         }
170:     }
171:
172:     return putMessageResult;
173: }
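  • Purpose: stores a message and returns the storage result.
  • Lines 2 to 6: set the store timestamp and the body CRC.
  • Lines 16 to 36: read the transaction type. For non-transactional and commit messages with a delay level, the topic and queue id are replaced by the schedule topic and the delay-level queue, and the real topic and queue id are backed up in the message properties. Transactional messages themselves are not covered in this article.
  • Lines 45 & 97: acquire and release the lock.
  • Line 52: set the store timestamp again. The store timestamp is currently set in several places.
  • Lines 55 to 62: get the MappedFile, creating one if none exists or the last one is full. For details, see MappedFileQueue#getLastMappedFile(...).
  • Line 65: append the message to the MappedFile. For details, see MappedFile#appendMessage(...).
  • Lines 69 to 80: the MappedFile is full, so create a new one and append the message again.
  • Lines 116 to 140: flush the message, i.e. persist it to the file. The append above has not yet written anything to disk. What happens here depends on the configured flush policy. For details, see FlushCommitLogService.
  • Lines 143 to 173: Broker master-slave synchronization, to be covered in a later article.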
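The END_OF_FILE retry (lines 65 to 80) can be modelled without any I/O. The sketch below is a simplified, hypothetical model (RollingAppendSketch, FixedFile and put are made-up names): it tracks only positions, treats a file as full once the BLANK padding is written, and assumes every message fits in an empty file.

```java
import java.util.ArrayList;
import java.util.List;

final class RollingAppendSketch {
    /** Space reserved for the BLANK header: maxBlank (4 bytes) + magic code (4 bytes). */
    static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;

    enum Status { PUT_OK, END_OF_FILE }

    /** A fixed-size file that tracks only its write position. */
    static final class FixedFile {
        final long fromOffset;
        final int size;
        int wrotePosition = 0;

        FixedFile(long fromOffset, int size) {
            this.fromOffset = fromOffset;
            this.size = size;
        }

        boolean isFull() {
            return wrotePosition == size;
        }

        Status append(int msgLen) {
            int remaining = size - wrotePosition;
            if (msgLen + END_FILE_MIN_BLANK_LENGTH > remaining) {
                wrotePosition = size;          // BLANK padding fills the rest of the file
                return Status.END_OF_FILE;
            }
            wrotePosition += msgLen;
            return Status.PUT_OK;
        }
    }

    private final int fileSize;
    private final List<FixedFile> files = new ArrayList<>();

    RollingAppendSketch(int fileSize) {
        this.fileSize = fileSize;
    }

    private FixedFile last() {
        return files.isEmpty() ? null : files.get(files.size() - 1);
    }

    private FixedFile createNext() {
        FixedFile prev = last();
        long from = (prev == null) ? 0L : prev.fromOffset + fileSize;
        FixedFile next = new FixedFile(from, fileSize);
        files.add(next);
        return next;
    }

    /** Appends a message of msgLen bytes and returns its physical offset. */
    long put(int msgLen) {
        if (msgLen + END_FILE_MIN_BLANK_LENGTH > fileSize) {
            throw new IllegalArgumentException("message does not fit in an empty file");
        }
        FixedFile file = last();
        if (file == null || file.isFull()) {
            file = createNext();
        }
        long offset = file.fromOffset + file.wrotePosition;
        if (file.append(msgLen) == Status.END_OF_FILE) {
            file = createNext();               // roll over and write the message again
            offset = file.fromOffset + file.wrotePosition;
            file.append(msgLen);
        }
        return offset;
    }

    int fileCount() {
        return files.size();
    }

    public static void main(String[] args) {
        RollingAppendSketch log = new RollingAppendSketch(100);
        System.out.println(log.put(40));
        System.out.println(log.put(40));
        System.out.println(log.put(40));       // does not fit, lands in the second file
    }
}
```

A message is never split across two files: the tail of the old file is sacrificed as BLANK and the whole message goes into the new one.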

MappedFileQueue#getLastMappedFile(...)

1: public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
2:     long createOffset = -1; // Start offset of the file to create; -1 means do not create
3:     MappedFile mappedFileLast = getLastMappedFile();
4:
5:     if (mappedFileLast == null) { // No mapped file exists yet
6:         createOffset = startOffset - (startOffset % this.mappedFileSize);
7:     }
8:
9:     if (mappedFileLast != null && mappedFileLast.isFull()) { // The last file is full
10:         createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
11:     }
12:
13:     if (createOffset != -1 && needCreate) { // Create the file
14:         String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
15:         String nextNextFilePath = this.storePath + File.separator
16:             + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
17:         MappedFile mappedFile = null;
18:
19:         if (this.allocateMappedFileService != null) {
20:             mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
21:                 nextNextFilePath, this.mappedFileSize);
22:         } else {
23:             try {
24:                 mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
25:             } catch (IOException e) {
26:                 log.error("create mappedFile exception", e);
27:             }
28:         }
29:
30:         if (mappedFile != null) {
31:             if (this.mappedFiles.isEmpty()) {
32:                 mappedFile.setFirstCreateInQueue(true);
33:             }
34:             this.mappedFiles.add(mappedFile);
35:         }
36:
37:         return mappedFile;
38:     }
39:
40:     return mappedFileLast;
41: }
  • Purpose: returns the last MappedFile, creating one if none exists or the last one is full.
  • Lines 5 to 11: compute createOffset, the start offset of the file to create when no file exists or the last file is full.
  • Line 14: compute the file name. From this we can derive the naming rule for MappedFile:

    fileName[n] = fileName[n - 1] + mappedFileSize

    fileName[0] = startOffset - (startOffset % this.mappedFileSize)

    CommitLog currently passes a startOffset of 0.

    One question here is why (startOffset % this.mappedFileSize) is needed. For example:

    | startOffset | mappedFileSize | createOffset |
    | --- | :-- | :-- |
    | 5 | 1 | 5 |
    | 5 | 2 | 4 |
    | 5 | 3 | 3 |
    | 5 | 4 | 4 |
    | 5 | > 5 | 0 |

    Answer: fileName[0] = startOffset - (startOffset % this.mappedFileSize) is the start offset of the file that contains startOffset when every file is this.mappedFileSize bytes long.

  • Lines 30 to 35: mark whether the MappedFile is the first file created in the queue. This flag is used by the MappedFile behind ConsumeQueue; see ConsumeQueue#fillPreBlank.
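The createOffset arithmetic from lines 5 to 11 can be reproduced in isolation. The helper names below (CreateOffsetSketch, firstFileOffset, nextFileOffset) are hypothetical; the formulas are the two from the method above.

```java
final class CreateOffsetSketch {
    /** Start offset of the file containing startOffset, when no file exists yet. */
    static long firstFileOffset(long startOffset, long mappedFileSize) {
        return startOffset - (startOffset % mappedFileSize);
    }

    /** Start offset of the file that follows a full last file. */
    static long nextFileOffset(long lastFileFromOffset, long mappedFileSize) {
        return lastFileFromOffset + mappedFileSize;
    }

    public static void main(String[] args) {
        // Reproduces the rows of the table above for startOffset = 5.
        long[] sizes = {1, 2, 3, 4, 6};
        for (long size : sizes) {
            System.out.println("mappedFileSize=" + size
                + " createOffset=" + firstFileOffset(5, size));
        }
    }
}
```

Rounding down to a multiple of mappedFileSize keeps every file name aligned to a file boundary, whatever startOffset the caller passes.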

MappedFile#appendMessage(...)

1: public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
2:     assert msg != null;
3:     assert cb != null;
4:
5:     int currentPos = this.wrotePosition.get();
6:
7:     if (currentPos < this.fileSize) {
8:         ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
9:         byteBuffer.position(currentPos);
10:         AppendMessageResult result =
11:             cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
12:         this.wrotePosition.addAndGet(result.getWroteBytes());
13:         this.storeTimestamp = result.getStoreTimestamp();
14:         return result;
15:     }
16:
17:     log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: "
18:         + this.fileSize);
19:     return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
20: }
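  • Purpose: appends a message to the MappedFile and returns the append result.
  • Line 8: obtain the byte buffer to write to. Why a different buffer is used depending on writeBuffer != null is explained in the FlushCommitLogService section.
  • Lines 9 to 12: set the write position, perform the write, and update wrotePosition (the current write position, which is where the next write starts).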

DefaultAppendMessageCallback#doAppend(...)

1: class DefaultAppendMessageCallback implements AppendMessageCallback {
2:     // File at the end of the minimum fixed length empty
3:     private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
4:     /**
5:      * In-memory byte buffer for the message id
6:      */
7:     private final ByteBuffer msgIdMemory;
8:     /**
9:      * Store the message content
10:      * In-memory byte buffer for the message.
11:      * A message passed to {@link #doAppend(long, ByteBuffer, int, MessageExtBrokerInner)} is ultimately written into this buffer
12:      */
13:     private final ByteBuffer msgStoreItemMemory;
14:     /**
15:      * The maximum length of the message
16:      */
17:
18:     private final int maxMessageSize;
19:     /**
20:      * Build Message Key
21:      * Key of {@link #topicQueueTable}
22:      * Computed as: topic + "-" + queueId
23:      */
24:     private final StringBuilder keyBuilder = new StringBuilder();
25:     /**
26:      * Byte buffer for the host
27:      * Reused whenever the host bytes are computed
28:      */
29:     private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
30:
31:     DefaultAppendMessageCallback(final int size) {
32:         this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
33:         this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
34:         this.maxMessageSize = size;
35:     }
36:
37:     public ByteBuffer getMsgStoreItemMemory() {
38:         return msgStoreItemMemory;
39:     }
40:
41:     public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {
42:         // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
43:
44:         // PHY OFFSET
45:         long wroteOffset = fileFromOffset + byteBuffer.position();
46:
47:         // Compute the msgId used inside CommitLog
48:         this.resetByteBuffer(hostHolder, 8);
49:         String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
50:
51:         // Record ConsumeQueue information: get the queue offset
52:         keyBuilder.setLength(0);
53:         keyBuilder.append(msgInner.getTopic());
54:         keyBuilder.append('-');
55:         keyBuilder.append(msgInner.getQueueId());
56:         String key = keyBuilder.toString();
57:         Long queueOffset = CommitLog.this.topicQueueTable.get(key);
58:         if (null == queueOffset) {
59:             queueOffset = 0L;
60:             CommitLog.this.topicQueueTable.put(key, queueOffset);
61:         }
62:
63:         // Transaction messages that require special handling // TODO question: purpose
64:         final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
65:         switch (tranType) {
66:             // Prepared and Rollback message is not consumed, will not enter the
67:             // consumer queue
68:             case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
69:             case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
70:                 queueOffset = 0L;
71:                 break;
72:             case MessageSysFlag.TRANSACTION_NOT_TYPE:
73:             case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
74:             default:
75:                 break;
76:         }
77:
78:         // Compute the message length
79:         final byte[] propertiesData =
80:             msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
81:         final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
82:         if (propertiesLength > Short.MAX_VALUE) {
83:             log.warn("putMessage message properties length too long. length={}", propertiesData.length);
84:             return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
85:         }
86:         final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
87:         final int topicLength = topicData.length;
88:         final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
89:         final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
90:         // Exceeds the maximum message
91:         if (msgLen > this.maxMessageSize) {
92:             CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
93:                 + ", maxMessageSize: " + this.maxMessageSize);
94:             return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
95:         }
96:
97:         // Determines whether there is sufficient free space
98:         if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
99:             this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
100:             // 1 TOTAL_SIZE
101:             this.msgStoreItemMemory.putInt(maxBlank);
102:             // 2 MAGIC_CODE
103:             this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
104:             // 3 The remaining space may be any value
105:             //
106:
107:             // Here the length of the specially set maxBlank
108:             final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
109:             byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
110:             return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
111:                 queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
112:         }
113:
114:         // Initialization of storage space
115:         this.resetByteBuffer(msgStoreItemMemory, msgLen);
116:         // 1 TOTAL_SIZE
117:         this.msgStoreItemMemory.putInt(msgLen);
118:         // 2 MAGIC_CODE
119:         this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
120:         // 3 BODY_CRC
121:         this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
122:         // 4 QUEUE_ID
123:         this.msgStoreItemMemory.putInt(msgInner.getQueueId());
124:         // 5 FLAG
125:         this.msgStoreItemMemory.putInt(msgInner.getFlag());
126:         // 6 QUEUE_OFFSET
127:         this.msgStoreItemMemory.putLong(queueOffset);
128:         // 7 PHYSICAL_OFFSET
129:         this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
130:         // 8 SYS_FLAG
131:         this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
132:         // 9 BORN_TIMESTAMP
133:         this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
134:         // 10 BORN_HOST
135:         this.resetByteBuffer(hostHolder, 8);
136:         this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
137:         // 11 STORE_TIMESTAMP
138:         this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
139:         // 12 STORE_HOST_ADDRESS
140:         this.resetByteBuffer(hostHolder, 8);
141:         this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
142:         //this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
143:         // 13 RECONSUME_TIMES
144:         this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
145:         // 14 Prepared Transaction Offset
146:         this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
147:         // 15 BODY
148:         this.msgStoreItemMemory.putInt(bodyLength);
149:         if (bodyLength > 0)
150:             this.msgStoreItemMemory.put(msgInner.getBody());
151:         // 16 TOPIC
152:         this.msgStoreItemMemory.put((byte) topicLength);
153:         this.msgStoreItemMemory.put(topicData);
154:         // 17 PROPERTIES
155:         this.msgStoreItemMemory.putShort((short) propertiesLength);
156:         if (propertiesLength > 0)
157:             this.msgStoreItemMemory.put(propertiesData);
158:
159:         final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
160:         // Write messages to the queue buffer
161:         byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
162:
163:         AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
164:             msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
165:
166:         switch (tranType) {
167:             case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
168:             case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
169:                 break;
170:             case MessageSysFlag.TRANSACTION_NOT_TYPE:
171:             case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
172:                 // The next update ConsumeQueue information: advance the queue offset
173:                 CommitLog.this.topicQueueTable.put(key, ++queueOffset);
174:                 break;
175:             default:
176:                 break;
177:         }
178:         return result;
179:     }
180:
181:     /**
182:      * Resets the byte buffer
183:      *
184:      * @param byteBuffer the byte buffer
185:      * @param limit the length
186:      */
187:     private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
188:         byteBuffer.flip();
189:         byteBuffer.limit(limit);
190:     }
191: }
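  • Purpose: appends a message to the byte buffer.
  • Line 45: compute the physical offset, i.e. the sequential storage position in CommitLog.
  • Lines 47 to 49: compute the offsetMsgId used inside CommitLog. Be sure to distinguish it from msgId:

    | Id | Generated | Computation | Length |
    | --- | --- | --- | --- |
    | offsetMsgId | By the Broker when storing the message | Hex(storeHostBytes, wroteOffset) | 32 |
    | msgId | By the Client when sending the message | Hex(process id, IP, ClassLoader, startTime, currentTime, auto-increment sequence) | 32 |

  • Lines 51 to 61: get the queue position (offset).
  • Lines 78 to 95: compute the total message length.
  • Lines 98 to 112: when the remaining space in the file is insufficient, write a BLANK placeholder and return the result.
  • Lines 114 to 161: write the MESSAGE.
  • Line 173: update the queue position (offset).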
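To make the Hex(storeHostBytes, wroteOffset) computation concrete, here is a rough sketch. It assumes the 8 host bytes are a 4-byte IPv4 address followed by a 4-byte port (consistent with hostHolder being 8 bytes) and that the hex digits are upper case; the class OffsetMsgIdSketch and its methods are hypothetical, not MessageDecoder itself.

```java
import java.nio.ByteBuffer;

final class OffsetMsgIdSketch {
    /** Builds a 32-character id from 4 IP bytes, a 4-byte port and an 8-byte offset. */
    static String offsetMsgId(byte[] ipv4, int port, long wroteOffset) {
        if (ipv4.length != 4) {
            throw new IllegalArgumentException("expected a 4-byte IPv4 address");
        }
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put(ipv4);
        buf.putInt(port);
        buf.putLong(wroteOffset);

        StringBuilder hex = new StringBuilder(32);
        for (byte b : buf.array()) {
            hex.append(String.format("%02X", b & 0xFF));
        }
        return hex.toString();
    }

    /** Recovers the physical offset from the last 16 hex characters of the id. */
    static long offsetOf(String offsetMsgId) {
        return Long.parseUnsignedLong(offsetMsgId.substring(16), 16);
    }

    public static void main(String[] args) {
        byte[] ip = {127, 0, 0, 1};
        String id = offsetMsgId(ip, 10911, 1073741824L);
        System.out.println(id + " -> offset " + offsetOf(id));
    }
}
```

Since the id embeds the store host and the physical offset, a message can be located from its offsetMsgId alone, which is not possible with the client-generated msgId.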

FlushCommitLogService

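[Figure: FlushCommitLogService class diagram]

| Thread service | Scenario | Append performance |
| --- | --- | --- |
| CommitRealTimeService | Asynchronous flush with the in-memory byte buffer enabled | 1st |
| FlushRealTimeService | Asynchronous flush with the in-memory byte buffer disabled | 2nd |
| GroupCommitService | Synchronous flush | 3rd |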

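MappedFile: Writing to Disk

| Mode | Step 1 | Step 2 | Step 3 |
| --- | --- | --- | --- |
| Mode 1 | Write to the in-memory byte buffer (writeBuffer) | Commit from the in-memory byte buffer (writeBuffer) to the file channel (fileChannel) | Flush the file channel (fileChannel) |
| Mode 2 | Write to the mapped-file byte buffer (mappedByteBuffer) | Flush the mapped-file byte buffer (mappedByteBuffer) | |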
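Both modes map onto plain java.nio calls. The sketch below writes the same bytes to two temporary files, one per mode. The class WritePathsSketch and its methods are hypothetical, and unlike MappedFile it commits the whole buffer at once instead of tracking wrote, committed and flushed positions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

final class WritePathsSketch {
    /** Mode 1: write to writeBuffer, commit to fileChannel, flush fileChannel. */
    static void viaWriteBuffer(Path file, byte[] data) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
            writeBuffer.put(data);                 // step 1: write into memory
            writeBuffer.flip();
            channel.position(0);
            while (writeBuffer.hasRemaining()) {
                channel.write(writeBuffer);        // step 2: commit to the file channel
            }
            channel.force(false);                  // step 3: flush to disk
        }
    }

    /** Mode 2: write to mappedByteBuffer, flush mappedByteBuffer. */
    static void viaMappedBuffer(Path file, byte[] data) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped =
                channel.map(FileChannel.MapMode.READ_WRITE, 0, data.length);
            mapped.put(data);                      // step 1: write into the mapping
            mapped.force();                        // step 2: flush to disk
        }
    }

    /** Writes via both modes and checks that both files hold the same bytes. */
    static boolean roundTrips() {
        byte[] data = "hello commitlog".getBytes(StandardCharsets.UTF_8);
        try {
            Path first = Files.createTempFile("mode1", ".bin");
            Path second = Files.createTempFile("mode2", ".bin");
            first.toFile().deleteOnExit();
            second.toFile().deleteOnExit();
            viaWriteBuffer(first, data);
            viaMappedBuffer(second, data);
            return Arrays.equals(data, Files.readAllBytes(first))
                && Arrays.equals(data, Files.readAllBytes(second));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("both modes round-trip: " + roundTrips());
    }
}
```

Mode 1 needs the extra commit step because data in writeBuffer is not visible to the file until it has been written through the channel.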

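[Figure: position transitions in MappedFile]

Flush-related code:

For write performance, a flush is performed only once at least flushLeastPages * OS_PAGE_SIZE of data is waiting to be flushed.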

1: /**
2:  * flush
3:  *
4:  * @param flushLeastPages minimum number of pages to flush
5:  * @return The current flushed position
6:  */
7: public int flush(final int flushLeastPages) {
8:     if (this.isAbleToFlush(flushLeastPages)) {
9:         if (this.hold()) {
10:             int value = getReadPosition();
11:
12:             try {
13:                 //We only append data to fileChannel or mappedByteBuffer, never both.
14:                 if (writeBuffer != null || this.fileChannel.position() != 0) {
15:                     this.fileChannel.force(false);
16:                 } else {
17:                     this.mappedByteBuffer.force();
18:                 }
19:             } catch (Throwable e) {
20:                 log.error("Error occurred when force data to disk.", e);
21:             }
22:
23:             this.flushedPosition.set(value);
24:             this.release();
25:         } else {
26:             log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
27:             this.flushedPosition.set(getReadPosition());
28:         }
29:     }
30:     return this.getFlushedPosition();
31: }
32:
33: /**
34:  * Whether a flush is possible. Any one of the following conditions suffices:
35:  * 1. The mapped file is full
36:  * 2. flushLeastPages > 0 && the unflushed part spans at least flushLeastPages pages
37:  * 3. flushLeastPages = 0 && there is newly written data
38:  *
39:  * @param flushLeastPages minimum number of pages to flush
40:  * @return whether a flush is possible
41:  */
42: private boolean isAbleToFlush(final int flushLeastPages) {
43:     int flush = this.flushedPosition.get();
44:     int write = getReadPosition();
45:
46:     if (this.isFull()) {
47:         return true;
48:     }
49:
50:     if (flushLeastPages > 0) {
51:         return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
52:     }
53:
54:     return write > flush;
55: }

Commit-related code:

For write performance, a commit is performed only once at least commitLeastPages * OS_PAGE_SIZE of data is waiting to be committed.

1: /**
2:  * commit
3:  * When {@link #writeBuffer} is null, returns {@link #wrotePosition} directly
4:  *
5:  * @param commitLeastPages minimum number of pages to commit
6:  * @return the current commit position
7:  */
8: public int commit(final int commitLeastPages) {
9:     if (writeBuffer == null) {
10:         //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
11:         return this.wrotePosition.get();
12:     }
13:     if (this.isAbleToCommit(commitLeastPages)) {
14:         if (this.hold()) {
15:             commit0(commitLeastPages);
16:             this.release();
17:         } else {
18:             log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
19:         }
20:     }
21:
22:     // All dirty data has been committed to FileChannel. Once the end of the file is reached, return the writeBuffer to the pool.
23:     if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
24:         this.transientStorePool.returnBuffer(writeBuffer);
25:         this.writeBuffer = null;
26:     }
27:
28:     return this.committedPosition.get();
29: }
30:
31: /**
32:  * Commit implementation: writes writeBuffer into fileChannel.
33:  * @param commitLeastPages minimum number of pages to commit. This parameter is not used
34:  */
35: protected void commit0(final int commitLeastPages) {
36:     int writePos = this.wrotePosition.get();
37:     int lastCommittedPosition = this.committedPosition.get();
38:
39:     if (writePos - this.committedPosition.get() > 0) {
40:         try {
41:             // Set up the byteBuffer to be written
42:             ByteBuffer byteBuffer = writeBuffer.slice();
43:             byteBuffer.position(lastCommittedPosition);
44:             byteBuffer.limit(writePos);
45:             // Write to fileChannel
46:             this.fileChannel.position(lastCommittedPosition);
47:             this.fileChannel.write(byteBuffer);
48:             // Update the position
49:             this.committedPosition.set(writePos);
50:         } catch (Throwable e) {
51:             log.error("Error occurred when commit data to FileChannel.", e);
52:         }
53:     }
54: }
55:
56: /**
57:  * Whether a commit is possible. Any one of the following conditions suffices:
58:  * 1. The mapped file is full
59:  * 2. commitLeastPages > 0 && the uncommitted part spans at least commitLeastPages pages
60:  * 3. commitLeastPages = 0 && there is newly written data
61:  *
62:  * @param commitLeastPages minimum number of pages to commit
63:  * @return whether a commit is possible
64:  */
65: protected boolean isAbleToCommit(final int commitLeastPages) {
66:     int flush = this.committedPosition.get();
67:     int write = this.wrotePosition.get();
68:
69:     if (this.isFull()) {
70:         return true;
71:     }
72:
73:     if (commitLeastPages > 0) {
74:         return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
75:     }
76:
77:     return write > flush;
78: }

FlushRealTimeService

Used for asynchronous flushing after a message has been appended successfully.

1: class FlushRealTimeService extends FlushCommitLogService {
2:     /**
3:      * Timestamp of the last flush
4:      */
5:     private long lastFlushTimestamp = 0;
6:     /**
7:      * Print counter.
8:      * When the print count is reached, {@link #printFlushProgress()} is called
9:      */
10:     private long printTimes = 0;
11:
12:     public void run() {
13:         CommitLog.log.info(this.getServiceName() + " service started");
14:
15:         while (!this.isStopped()) {
16:             boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
17:             int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
18:             int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
19:             int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
20:
21:             // Print flush progress
22:             // Once flushPhysicQueueThoroughInterval has elapsed, flush even if less than flushPhysicQueueLeastPages has been written
23:             boolean printFlushProgress = false;
24:             long currentTimeMillis = System.currentTimeMillis();
25:             if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
26:                 this.lastFlushTimestamp = currentTimeMillis;
27:                 flushPhysicQueueLeastPages = 0;
28:                 printFlushProgress = (printTimes++ % 10) == 0;
29:             }
30:
31:             try {
32:                 // Wait before running
33:                 if (flushCommitLogTimed) {
34:                     Thread.sleep(interval);
35:                 } else {
36:                     this.waitForRunning(interval);
37:                 }
38:
39:                 if (printFlushProgress) {
40:                     this.printFlushProgress();
41:                 }
42:
43:                 // flush commitLog
44:                 long begin = System.currentTimeMillis();
45:                 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
46:                 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
47:                 if (storeTimestamp > 0) {
48:                     CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
49:                 }
50:                 long past = System.currentTimeMillis() - begin;
51:                 if (past > 500) {
52:                     log.info("Flush data to disk costs {} ms", past);
53:                 }
54:             } catch (Throwable e) {
55:                 CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
56:                 this.printFlushProgress();
57:             }
58:         }
59:
60:         // Normal shutdown, to ensure that all the flush before exit
61:         boolean result = false;
62:         for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
63:             result = CommitLog.this.mappedFileQueue.flush(0);
64:             CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
65:         }
66:
67:         this.printFlushProgress();
68:
69:         CommitLog.log.info(this.getServiceName() + " service end");
70:     }
71:
72:     @Override
73:     public String getServiceName() {
74:         return FlushRealTimeService.class.getSimpleName();
75:     }
76:
77:     private void printFlushProgress() {
78:         // CommitLog.log.info("how much disk fall behind memory, "
79:         // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
80:     }
81:
82:     @Override
83:     @SuppressWarnings("SpellCheckingInspection")
84:     public long getJointime() {
85:         return 1000 * 60 * 5;
86:     }
87: }
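  • Purpose: the real-time flush thread service, which drives the MappedFile#flush logic.
  • Lines 23 to 29: once every flushPhysicQueueThoroughInterval, perform a forced flush. Not every loop iteration accumulates flushCommitLogLeastPages pages of data, so a forced flush is needed at regular intervals. It cannot be forced on every iteration, because that would hurt performance.
  • Lines 33 to 37: depending on flushCommitLogTimed, each iteration either sleeps for a fixed interval or waits to be woken up. The default is the latter, so after every append putMessage calls flushCommitLogService.wakeup(), or commitLogService.wakeup() when the transient store pool is enabled.
  • Line 45: flush the MappedFile through MappedFileQueue#flush.
  • Lines 61 to 65: when the Broker shuts down, force a flush so that no data is left unflushed.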
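The interplay between the page threshold and the periodic forced flush can be sketched as follows. FlushScheduleSketch is a hypothetical class; the values 4 pages, 10 seconds and a 4096-byte page are example inputs chosen for illustration, not values taken from the article.

```java
final class FlushScheduleSketch {
    private final int leastPages;
    private final long thoroughIntervalMs;
    private long lastFlushTimestamp = 0;

    FlushScheduleSketch(int leastPages, long thoroughIntervalMs) {
        this.leastPages = leastPages;
        this.thoroughIntervalMs = thoroughIntervalMs;
    }

    /** Page threshold for this iteration: 0 forces a flush of whatever is pending. */
    int leastPagesFor(long nowMs) {
        if (nowMs >= lastFlushTimestamp + thoroughIntervalMs) {
            lastFlushTimestamp = nowMs;
            return 0;
        }
        return leastPages;
    }

    /** Same decision as isAbleToFlush, ignoring the "file is full" shortcut. */
    static boolean isAbleToFlush(int write, int flushed, int leastPages, int pageSize) {
        if (leastPages > 0) {
            return (write / pageSize) - (flushed / pageSize) >= leastPages;
        }
        return write > flushed;
    }

    public static void main(String[] args) {
        FlushScheduleSketch schedule = new FlushScheduleSketch(4, 10_000);
        int pageSize = 4096;
        int pending = 3 * pageSize;                 // three pages written, none flushed
        long[] ticks = {10_000, 10_500, 20_000};
        for (long now : ticks) {
            int threshold = schedule.leastPagesFor(now);
            System.out.println("t=" + now + " threshold=" + threshold
                + " flush=" + isAbleToFlush(pending, 0, threshold, pageSize));
        }
    }
}
```

With three pending pages and a threshold of four, only the iterations that hit the thorough interval actually flush, which is the trade-off the bullets above describe.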

CommitRealTimeService

Used for asynchronous flushing after a message has been appended successfully.

It is similar to FlushRealTimeService, with better performance.

    class CommitRealTimeService extends FlushCommitLogService {

        /**
         * Timestamp of the last commit
         */
        private long lastCommitTimestamp = 0;

        @Override
        public String getServiceName() {
            return CommitRealTimeService.class.getSimpleName();
        }

        @Override
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
                int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
                int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

                // Once commitDataThoroughInterval has elapsed, commit even if fewer than
                // commitDataLeastPages pages have been written
                long begin = System.currentTimeMillis();
                if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                    this.lastCommitTimestamp = begin;
                    commitDataLeastPages = 0;
                }

                try {
                    // commit
                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                    long end = System.currentTimeMillis();
                    // result == false does not mean failure here (see the next comment),
                    // which is why the flush thread is woken up in this branch
                    if (!result) {
                        this.lastCommitTimestamp = end; // result = false means some data committed.
                        //now wake up flush thread.
                        flushCommitLogService.wakeup();
                    }

                    if (end - begin > 500) {
                        log.info("Commit data to file costs {} ms", end - begin);
                    }

                    // Wait for the next round
                    this.waitForRunning(interval);
                } catch (Throwable e) {
                    CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
                }
            }

            // On shutdown, retry the commit until it reports that nothing is left to commit
            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.commit(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            CommitLog.log.info(this.getServiceName() + " service end");
        }
    }
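The return convention in the listing above (false means some data was committed, so the flush thread is woken) is easy to misread. Here is a minimal in-memory sketch of it. The class `CommitThenFlushSketch`, the three position fields, `PAGE_SIZE`, and `runOnce` are hypothetical names for illustration, and `flush()` is called directly where the real loop would call flushCommitLogService.wakeup().

```java
/** Hypothetical in-memory model of the commit-then-flush hand-off. */
final class CommitThenFlushSketch {
    static final int PAGE_SIZE = 4096; // assumed OS page size
    private long wrotePosition = 0;     // bytes appended so far
    private long committedPosition = 0; // bytes handed over to the file
    private long flushedPosition = 0;   // bytes forced to disk

    void append(long bytes) {
        wrotePosition += bytes;
    }

    /** Returns false when some data was committed, true when nothing was moved. */
    boolean commit(int leastPages) {
        long dirty = wrotePosition - committedPosition;
        boolean able = leastPages > 0 ? dirty / PAGE_SIZE >= leastPages : dirty > 0;
        if (!able) {
            return true;
        }
        committedPosition = wrotePosition;
        return false;
    }

    /** Stand-in for the work done by the flush thread once it is woken up. */
    void flush() {
        flushedPosition = committedPosition;
    }

    long flushedPosition() {
        return flushedPosition;
    }

    /** One loop iteration; returns true when the flush side was triggered. */
    boolean runOnce(int leastPages) {
        boolean result = commit(leastPages);
        if (!result) {
            flush(); // the real loop wakes the flush thread here instead
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CommitThenFlushSketch log = new CommitThenFlushSketch();
        log.append(100);
        System.out.println(log.runOnce(4));        // false: under 4 pages, nothing committed
        System.out.println(log.runOnce(0));        // true: forced commit, flush triggered
        System.out.println(log.flushedPosition()); // 100
    }
}
```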

GroupCommitService

Used for synchronous flushing after a message has been inserted successfully.

      1: class GroupCommitService extends FlushCommitLogService {
      2:     /**
      3:      * Write request queue
      4:      */
      5:     private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<>();
      6:     /**
      7:      * Read request queue
      8:      */
      9:     private volatile List<GroupCommitRequest> requestsRead = new ArrayList<>();
     10: 
     11:     /**
     12:      * Add a write request
     13:      *
     14:      * @param request the write request
     15:      */
     16:     public synchronized void putRequest(final GroupCommitRequest request) {
     17:         // Add the write request
     18:         synchronized (this.requestsWrite) {
     19:             this.requestsWrite.add(request);
     20:         }
     21:         // Wake up the service thread; the queues are swapped when its wait ends (see onWaitEnd)
     22:         if (hasNotified.compareAndSet(false, true)) {
     23:             waitPoint.countDown(); // notify
     24:         }
     25:     }
     26: 
     27:     /**
     28:      * Swap the read and write queues
     29:      */
     30:     private void swapRequests() {
     31:         List<GroupCommitRequest> tmp = this.requestsWrite;
     32:         this.requestsWrite = this.requestsRead;
     33:         this.requestsRead = tmp;
     34:     }
     35: 
     36:     private void doCommit() {
     37:         synchronized (this.requestsRead) {
     38:             if (!this.requestsRead.isEmpty()) {
     39:                 for (GroupCommitRequest req : this.requestsRead) {
     40:                     // There may be a message in the next file, so a maximum of
     41:                     // two times the flush (a batch of messages may span two MappedFiles)
     42:                     boolean flushOK = false;
     43:                     for (int i = 0; i < 2 && !flushOK; i++) {
     44:                         // A flush is needed while the request's offset is beyond the flushed offset
     45:                         flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
     46:                         if (!flushOK) {
     47:                             CommitLog.this.mappedFileQueue.flush(0);
     48:                         }
     49:                     }
     50:                     // Wake up the waiting requester
     51:                     req.wakeupCustomer(flushOK);
     52:                 }
     53: 
     54:                 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
     55:                 if (storeTimestamp > 0) {
     56:                     CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
     57:                 }
     58: 
     59:                 // Clear the read queue
     60:                 this.requestsRead.clear();
     61:             } else {
     62:                 // Because of individual messages is set to not sync flush, it
     63:                 // will come to this process. Example: isWaitStoreMsgOK is not set on the message.
     64:                 // Reaching this branch means one flush per message sent, so there is no batching effect.
     65:                 CommitLog.this.mappedFileQueue.flush(0);
     66:             }
     67:         }
     68:     }
     69: 
     70:     public void run() {
     71:         CommitLog.log.info(this.getServiceName() + " service started");
     72: 
     73:         while (!this.isStopped()) {
     74:             try {
     75:                 this.waitForRunning(10);
     76:                 this.doCommit();
     77:             } catch (Exception e) {
     78:                 CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
     79:             }
     80:         }
     81: 
     82:         // Under normal circumstances shutdown, wait for the arrival of the
     83:         // request, and then flush
     84:         try {
     85:             Thread.sleep(10);
     86:         } catch (InterruptedException e) {
     87:             CommitLog.log.warn("GroupCommitService Exception, ", e);
     88:         }
     89: 
     90:         synchronized (this) {
     91:             this.swapRequests();
     92:         }
     93: 
     94:         this.doCommit();
     95: 
     96:         CommitLog.log.info(this.getServiceName() + " service end");
     97:     }
     98: 
     99:     /**
    100:      * Swap the read and write queues after every wait
    101:      */
    102:     @Override
    103:     protected void onWaitEnd() {
    104:         this.swapRequests();
    105:     }
    106: 
    107:     @Override
    108:     public String getServiceName() {
    109:         return GroupCommitService.class.getSimpleName();
    110:     }
    111: 
    112:     @Override
    113:     public long getJointime() {
    114:         return 1000 * 60 * 5;
    115:     }
    116: }
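  • Summary: a service thread that commits writes in batches.
  • Lines 16 to 25: add a write request. The method itself is declared synchronized because this.requestsWrite and this.requestsRead are swapped continually, so locking on this.requestsWrite alone does not give stable synchronization.
  • Lines 27 to 34: swap the read and write queues.
  • Lines 38 to 60: iterate over the read queue (the requests that were swapped in from the write queue) and flush.
    • Line 43: the messages handled in one pass may be spread across two MappedFile instances (the current MappedFile was full when the Nth message was written, so a new one was created), hence the loop of up to 2 iterations.
    • Line 51: wake up the thread waiting on the write request, implemented with CountDownLatch.
  • Lines 61 to 66: flush directly. This branch is reached when isWaitStoreMsgOK on the sent message is not set to TRUE, so the message bypasses the batch commit.
  • Lines 73 to 80: run a batch commit every 10 ms. If wakeup() is called in the meantime, a batch commit runs immediately. By the author's estimate, when the Broker is configured for synchronous flushing and the message has isWaitStoreMsgOK=true, a send can need slightly more than 10 ms to complete. Synchronous flushing performs worse than asynchronous flushing but is more reliable, which is a trade-off to weigh in real deployments.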
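The two-queue hand-off and the CountDownLatch wake-up described above can be sketched in a few lines. This is a simplified illustration, not the RocketMQ implementation: `GroupCommitSketch`, `Request`, `put`, `flushCount`, and the offset counters are hypothetical names, the "flush" is only a counter update, and every method locks on the same monitor, whereas the original locks on the individual lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of group commit with swapped read/write request lists. */
final class GroupCommitSketch {
    /** One pending request; the producer blocks on the latch until it is flushed. */
    static final class Request {
        final long nextOffset;
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean flushOk = false;

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        void wakeup(boolean ok) {
            this.flushOk = ok;
            this.latch.countDown();
        }

        /** Returns true only if the request was flushed within the timeout. */
        boolean await(long timeoutMs) {
            try {
                return latch.await(timeoutMs, TimeUnit.MILLISECONDS) && flushOk;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    private List<Request> requestsWrite = new ArrayList<>();
    private List<Request> requestsRead = new ArrayList<>();
    private long writtenWhere = 0; // bytes appended so far
    private long flushedWhere = 0; // bytes "flushed" so far
    private int flushCount = 0;

    /** Producer side: append bytes and register a request for the resulting offset. */
    synchronized Request put(int bytes) {
        writtenWhere += bytes;
        Request req = new Request(writtenWhere);
        requestsWrite.add(req);
        return req;
    }

    /** Service side: swap lists so producers keep filling the other one. */
    synchronized void swapRequests() {
        List<Request> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    /** Service side: a single flush can satisfy many queued requests. */
    synchronized void doCommit() {
        for (Request req : requestsRead) {
            if (flushedWhere < req.nextOffset) {
                flushedWhere = writtenWhere; // pretend everything written so far hit the disk
                flushCount++;
            }
            req.wakeup(flushedWhere >= req.nextOffset);
        }
        requestsRead.clear();
    }

    synchronized int flushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        GroupCommitSketch service = new GroupCommitSketch();
        Request a = service.put(100);
        Request b = service.put(200);
        Request c = service.put(50);
        service.swapRequests();
        service.doCommit();
        System.out.println(a.await(0) && b.await(0) && c.await(0)); // true
        System.out.println(service.flushCount());                   // 1
    }
}
```

Three requests are acknowledged with one flush, which is the batching effect the service exists for. A real producer would call `await` with a timeout on its own thread while the service thread runs the swap and commit.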

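Conclusion

This is my second post on the RocketMQ source code. Seeing reads, likes, bookmarks and even subscriptions has been very encouraging.

"Message Storage" is harder than "Message Sending & Receiving", and also more interesting. If I have misunderstood something or explained it unclearly, please bear with me. If you can, please add QQ: 7685413 and point it out, so that my mistakes do not mislead other readers.

I also recommend reading work by authors who stand far, far higher than I do; as Li Xiaolu would put it, a whole Himalaya higher. Meanwhile I am working carefully through 《Linux内核设计与实现(原书第3版)》 (Linux Kernel Development, 3rd edition), day day up.

Thanks again for reading, liking and bookmarking.

Next post: setting sail!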

Reposted from: http://ykljx.baihongyu.com/
