This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new f5e4a983e4 Improve DefaultEntryLogger read performance. (#4038) f5e4a983e4 is described below commit f5e4a983e48f90e40dad4115be43505d47434e0a Author: Yan Zhao <horizo...@apache.org> AuthorDate: Mon Feb 12 09:20:14 2024 +0800 Improve DefaultEntryLogger read performance. (#4038) * Avoid system call to improve read performance. * Fix ci. * Add comments for getCurrentWritingLogId * Fix ci. * Consider compacting log. * Fix checkstyle. * Address the comment. * Address comment. * Address the comments. * Add tests. * Fix checkstyle. * address the comments. * Fix concurrency problem. --- .../bookkeeper/bookie/BufferedReadChannel.java | 27 +++++++++++- .../bookkeeper/bookie/DefaultEntryLogger.java | 13 +++++- .../bookkeeper/bookie/EntryLogManagerBase.java | 6 ++- .../bookie/EntryLogManagerForSingleEntryLog.java | 5 ++- .../bookkeeper/bookie/EntryLoggerAllocator.java | 27 +++++++++--- .../bookie/TransactionalEntryLogCompactor.java | 11 +++++ .../bookkeeper/bookie/DefaultEntryLogTest.java | 48 ++++++++++++++++++++++ 7 files changed, 125 insertions(+), 12 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java index 22f5a81690..4de3890e08 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java @@ -30,7 +30,7 @@ import java.nio.channels.FileChannel; /** * A Buffered channel without a write buffer. Only reads are buffered. */ -public class BufferedReadChannel extends BufferedChannelBase { +public class BufferedReadChannel extends BufferedChannelBase { // The capacity of the read buffer. protected final int readCapacity; @@ -43,9 +43,16 @@ public class BufferedReadChannel extends BufferedChannelBase { long invocationCount = 0; long cacheHitCount = 0; + private volatile long fileSize = -1; + final boolean sealed; public BufferedReadChannel(FileChannel fileChannel, int readCapacity) { + this(fileChannel, readCapacity, false); + } + + public BufferedReadChannel(FileChannel fileChannel, int readCapacity, boolean sealed) { super(fileChannel); + this.sealed = sealed; this.readCapacity = readCapacity; this.readBuffer = Unpooled.buffer(readCapacity); } @@ -64,10 +71,26 @@ public class BufferedReadChannel extends BufferedChannelBase { return read(dest, pos, dest.writableBytes()); } + @Override + public long size() throws IOException { + if (sealed) { + if (fileSize == -1) { + synchronized (this) { + if (fileSize == -1) { + fileSize = validateAndGetFileChannel().size(); + } + } + } + return fileSize; + } else { + return validateAndGetFileChannel().size(); + } + } + public synchronized int read(ByteBuf dest, long pos, int length) throws IOException { invocationCount++; long currentPosition = pos; - long eof = validateAndGetFileChannel().size(); + long eof = size(); // return -1 if the given position is greater than or equal to the file's current size. if (pos >= eof) { return -1; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index d02ede52fb..c47c0411c2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -587,6 +587,10 @@ public class DefaultEntryLogger implements EntryLogger { } } + void clearCompactingLogId() { + entryLoggerAllocator.clearCompactingLogId(); + } + /** * Flushes all rotated log channels. After log channels are flushed, * move leastUnflushedLogId ptr to current logId. @@ -894,7 +898,8 @@ public class DefaultEntryLogger implements EntryLogger { } } - private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException { + @VisibleForTesting + BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException { BufferedReadChannel fc = getFromChannels(entryLogId); if (fc != null) { return fc; @@ -910,7 +915,11 @@ public class DefaultEntryLogger implements EntryLogger { } // We set the position of the write buffer of this buffered channel to Long.MAX_VALUE // so that there are no overlaps with the write buffer while reading - fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes()); + if (entryLogManager instanceof EntryLogManagerForSingleEntryLog) { + fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), entryLoggerAllocator.isSealed(entryLogId)); + } else { + fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), false); + } putInReadChannels(entryLogId, fc); return fc; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java index 36ce928a08..e997906c23 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java @@ -161,6 +161,7 @@ abstract class EntryLogManagerBase implements EntryLogManager { logChannel.appendLedgersMap(); BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()); + entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId()); setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel); log.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.", logChannel.getLogId(), rotatedLogChannels); @@ -168,8 +169,9 @@ abstract class EntryLogManagerBase implements EntryLogManager { listener.onRotateEntryLog(); } } else { - setCurrentLogForLedgerAndAddToRotate(ledgerId, - entryLoggerAllocator.createNewLog(selectDirForNextEntryLog())); + BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()); + entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId()); + setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java index 59bcc02a57..b784511868 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java @@ -262,6 +262,9 @@ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase { @Override public DefaultEntryLogger.BufferedLogChannel createNewLogForCompaction() throws IOException { - return entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog()); + BufferedLogChannel newLogForCompaction = entryLoggerAllocator.createNewLogForCompaction( + selectDirForNextEntryLog()); + entryLoggerAllocator.setWritingCompactingLogId(newLogForCompaction.getLogId()); + return newLogForCompaction; } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java index 68fc1eb3ca..aec2fb1cd0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java @@ -64,6 +64,8 @@ class EntryLoggerAllocator { private final boolean entryLogPreAllocationEnabled; private final ByteBufAllocator byteBufAllocator; final ByteBuf logfileHeader = Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE); + private volatile long writingLogId = -1; + private volatile long writingCompactingLogId = -1; EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId, @@ -91,16 +93,19 @@ class EntryLoggerAllocator { return preallocatedLogId; } + public boolean isSealed(long logId) { + return logId != writingLogId && logId != writingCompactingLogId; + } + BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException { synchronized (createEntryLogLock) { BufferedLogChannel bc; - if (!entryLogPreAllocationEnabled){ + if (!entryLogPreAllocationEnabled) { // create a new log directly - bc = allocateNewLog(dirForNextEntryLog); - return bc; + return allocateNewLog(dirForNextEntryLog); } else { // allocate directly to response request - if (null == preallocation){ + if (null == preallocation) { bc = allocateNewLog(dirForNextEntryLog); } else { // has a preallocated entry log @@ -116,7 +121,7 @@ class EntryLoggerAllocator { throw new IOException("Task to allocate a new entry log is cancelled.", ce); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); - throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie); + throw new IOException("Interrupted when waiting a new entry log to be allocated.", ie); } } // preallocate a new log in background upon every call @@ -132,6 +137,18 @@ class EntryLoggerAllocator { } } + void setWritingLogId(long lodId) { + this.writingLogId = lodId; + } + + void setWritingCompactingLogId(long logId) { + this.writingCompactingLogId = logId; + } + + void clearCompactingLogId() { + writingCompactingLogId = -1; + } + private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog) throws IOException { return allocateNewLog(dirForNextEntryLog, ".log"); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index 2b6fca30c1..9a27bcccd8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -199,6 +199,7 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor { LOG.info("No valid entry is found in entry log after scan, removing entry log now."); logRemovalListener.removeEntryLog(metadata.getEntryLogId()); compactionLog.abort(); + compactingLogWriteDone(); return false; } return true; @@ -209,6 +210,13 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor { offsets.clear(); // since we haven't flushed yet, we only need to delete the unflushed compaction file. compactionLog.abort(); + compactingLogWriteDone(); + } + } + + private void compactingLogWriteDone() { + if (entryLogger instanceof DefaultEntryLogger) { + ((DefaultEntryLogger) entryLogger).clearCompactingLogId(); } } @@ -241,6 +249,8 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor { } catch (IOException ioe) { LOG.warn("Error marking compaction as done", ioe); return false; + } finally { + compactingLogWriteDone(); } } @@ -249,6 +259,7 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor { offsets.clear(); // remove compaction log file and its hardlink compactionLog.abort(); + compactingLogWriteDone(); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java index 38a9ebaf21..3048ef33a8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java @@ -67,6 +67,7 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirExcepti import org.apache.bookkeeper.common.testing.annotations.FlakyTest; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; +import org.apache.bookkeeper.test.TestStatsProvider; import org.apache.bookkeeper.util.DiskChecker; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap; @@ -154,6 +155,53 @@ public class DefaultEntryLogTest { assertEquals(0L, entryLogManager.getCurrentLogId()); } + @Test + public void testEntryLogIsSealedWithPerLedgerDisabled() throws Exception { + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setEntryLogPerLedgerEnabled(false); + conf.setEntryLogFilePreAllocationEnabled(true); + + TestStatsProvider statsProvider = new TestStatsProvider(); + TestStatsProvider.TestStatsLogger statsLogger = + statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, dirsMgr, null, statsLogger, + UnpooledByteBufAllocator.DEFAULT); + EntryLogManagerBase entrylogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); + entrylogManager.createNewLog(0); + BufferedReadChannel channel = entryLogger.getChannelForLogId(0); + assertFalse(channel.sealed); + entrylogManager.createNewLog(1); + channel = entryLogger.getChannelForLogId(0); + assertFalse(channel.sealed); + entrylogManager.createNewLog(2); + channel = entryLogger.getChannelForLogId(1); + assertTrue(channel.sealed); + } + + @Test + public void testEntryLogIsSealedWithPerLedgerEnabled() throws Exception { + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + //If entryLogPerLedgerEnabled is true, the buffer channel sealed flag always false. + conf.setEntryLogPerLedgerEnabled(true); + conf.setEntryLogFilePreAllocationEnabled(true); + + TestStatsProvider statsProvider = new TestStatsProvider(); + TestStatsProvider.TestStatsLogger statsLogger = + statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, dirsMgr, null, statsLogger, + UnpooledByteBufAllocator.DEFAULT); + EntryLogManagerBase entrylogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); + entrylogManager.createNewLog(0); + BufferedReadChannel channel = entryLogger.getChannelForLogId(0); + assertFalse(channel.sealed); + entrylogManager.createNewLog(1); + channel = entryLogger.getChannelForLogId(0); + assertFalse(channel.sealed); + entrylogManager.createNewLog(2); + channel = entryLogger.getChannelForLogId(1); + assertFalse(channel.sealed); + } + @Test public void testDeferCreateNewLogWithoutEnoughDiskSpaces() throws Exception { entryLogger.close();