刷盘方式 | 优点 | 缺点 | 适用场景 |
同步刷盘 | 保证了消息不丢失 | 吞吐率相对于异步刷盘要低 | 消息可靠性要求较高的场景 |
异步刷盘 | 系统的吞吐量提高 | 系统断电等异常时会有部分消息丢失 | 对吞吐量要求较高的场景 |
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush 同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; //客户端确认要等待刷盘成功 if (messageExt.isWaitStoreMsgOK()) { //封装刷盘请求对象 nextoffset : 当前内存写的位置 + 本次要写入的字节数 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); //添加刷盘请求(后台定时任务进行刷盘,每隔10毫秒批量刷盘。10毫秒中如果有多个请求,则多个请求一块刷盘) service.putRequest(request); //等待刷盘请求结果(最长等待5秒钟,刷盘成功后马上可以获取结果。) boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } }else {// Asynchronous flush 异步刷盘 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { //唤醒FlushRealTimeService服务线程 flushCommitLogService.wakeup(); } else { //唤醒CommitRealTimeService服务线程 commitLogService.wakeup(); } } }
private volatile List
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { //等待通知,如果数据过来,提前结束等待执行onWaitEnd()方法交换读写swapRequests() //刷盘请求的requestsWrite->requestsRead this.waitForRunning(10); //执行刷盘 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } //省略代码... }
/**
 * Exchanges the write-side and read-side request lists, so that requests
 * accumulated in {@code requestsWrite} become visible through
 * {@code requestsRead} and the former read list is reused for writing.
 */
private void swapRequests() {
    // The element type was lost in the original text ("Listtmp"); it is
    // GroupCommitRequest, matching how doCommit() iterates requestsRead.
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}
private void doCommit() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { //循环每一个刷盘请求 for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { //判断是否已经刷盘过了,刷盘的位置和当前消息下次刷盘需要的位置比较 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) { //0代码立刻刷盘,不管缓存中消息有多少 CommitLog.this.mappedFileQueue.flush(0); } } //返回刷盘的结果 req.wakeupCustomer(flushOK); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); //设置刷盘的时间点 if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } //清空requestsRead对象 this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } }
public boolean flush(final int flushLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); //刷盘,返回刷写到磁盘指针 int offset = mappedFile.flush(flushLeastPages); //计算当前的刷盘指针,之前的所有数据已经持久化到磁盘中 long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; }
// Asynchronous flush: wake the background thread that matches the storage mode.
if (this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    // Transient store pool enabled: wake the CommitRealTimeService thread.
    commitLogService.wakeup();
} else {
    // Transient store pool disabled: wake the FlushRealTimeService thread.
    flushCommitLogService.wakeup();
}
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 每次刷盘的间隔时间,默认 200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 每次commit最少的页数 默认4页 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); // 如果上次刷新的时间+该值 小于当前时间,则改变flushPhysicQueueLeastPages =0 默认为200 int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); //距离上一次刷盘时间超过200ms则立刻刷盘,commit最少的页数置为0 if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } try { //刷盘 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); } }
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 每次刷盘的间隔时间,默认 200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 每次commit最少的页数 默认4页 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); // 如果上次刷新的时间+该值 小于当前时间,则改变flushPhysicQueueLeastPages =0 默认为200 int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); //距离上一次刷盘时间超过200ms则立刻刷盘,commit最少的页数置为0 if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } try { //刷盘 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); //返回的是false说明数据已经commit到了fileChannel中 if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); } }
public int commit(final int commitLeastPages) { if (writeBuffer == null) { //no need to commit data to file channel, so just regard wrotePosition as committedPosition. return this.wrotePosition.get(); } //如果提交的数据不满commitLeastPages则不执行本次的提交,待下一次提交 if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { commit0(commitLeastPages); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } } // All dirty data has been committed to FileChannel. if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; } return this.committedPosition.get(); }
protected void commit0(final int commitLeastPages) { int writePos = this.wrotePosition.get(); int lastCommittedPosition = this.committedPosition.get(); if (writePos - this.committedPosition.get() > 0) { try { //创建writeBuffer的共享缓存区 ByteBuffer byteBuffer = writeBuffer.slice(); //将指针回退到上一次提交的位置 byteBuffer.position(lastCommittedPosition); //设置limit为writePos byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); //将committedPosition指针到wrotePosition的数据复制(写入)到fileChannel中 this.fileChannel.write(byteBuffer); //更新committedPosition指针为writePos this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } }
public int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { //We only append data to fileChannel or mappedByteBuffer, never both. if (writeBuffer != null || this.fileChannel.position() != 0) { this.fileChannel.force(false); } else { this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } //设置刷盘后的指针 this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); }
/**
 * Decides whether a flush should be performed now.
 *
 * @param flushLeastPages minimum number of whole pages that must be pending;
 *                        a value of 0 or less means any pending data qualifies
 * @return {@code true} if the file is full, or if enough pages (or, without a
 *         threshold, any data) lie between the flushed and the read position
 */
private boolean isAbleToFlush(final int flushLeastPages) {
    final int flushedPos = this.flushedPosition.get();
    final int readablePos = getReadPosition();

    // A full file is always eligible.
    if (this.isFull()) {
        return true;
    }

    // No page threshold: flush as soon as anything is pending.
    if (flushLeastPages <= 0) {
        return readablePos > flushedPos;
    }

    // Compare the page-index difference with the required threshold.
    return flushLeastPages <= ((readablePos / OS_PAGE_SIZE) - (flushedPos / OS_PAGE_SIZE));
}
异步刷盘时 flushLeastPages 默认为 4,即待刷盘的数据至少达到 4 个 PageCache 页(OS_PAGE_SIZE)时才会刷盘;或者距上一次刷盘时间 >= 200ms 时,将 flushLeastPages 置为 0,立刻刷盘
异步刷盘有两种方式,但其逻辑都是:待刷盘的数据达到 OS_PAGE_SIZE 的 4 倍,即 (1024 * 4) * 4 = 16KB,或者距上一次刷盘时间 >= 200ms 时才刷盘,以此提高数据的刷盘性能