This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new f5e4a983e4 Improve DefaultEntryLogger read performance. (#4038)
f5e4a983e4 is described below

commit f5e4a983e48f90e40dad4115be43505d47434e0a
Author: Yan Zhao <horizo...@apache.org>
AuthorDate: Mon Feb 12 09:20:14 2024 +0800

    Improve DefaultEntryLogger read performance. (#4038)
    
    * Avoid system call to improve read performance.
    
    * Fix ci.
    
    * Add comments for getCurrentWritingLogId
    
    * Fix ci.
    
    * Consider compacting log.
    
    * Fix checkstyle.
    
    * Address the comment.
    
    * Address comment.
    
    * Address the comments.
    
    * Add tests.
    
    * Fix checkstyle.
    
    * address the comments.
    
    * Fix concurrency problem.
---
 .../bookkeeper/bookie/BufferedReadChannel.java     | 27 +++++++++++-
 .../bookkeeper/bookie/DefaultEntryLogger.java      | 13 +++++-
 .../bookkeeper/bookie/EntryLogManagerBase.java     |  6 ++-
 .../bookie/EntryLogManagerForSingleEntryLog.java   |  5 ++-
 .../bookkeeper/bookie/EntryLoggerAllocator.java    | 27 +++++++++---
 .../bookie/TransactionalEntryLogCompactor.java     | 11 +++++
 .../bookkeeper/bookie/DefaultEntryLogTest.java     | 48 ++++++++++++++++++++++
 7 files changed, 125 insertions(+), 12 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
index 22f5a81690..4de3890e08 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
@@ -30,7 +30,7 @@ import java.nio.channels.FileChannel;
 /**
  * A Buffered channel without a write buffer. Only reads are buffered.
  */
-public class BufferedReadChannel extends BufferedChannelBase  {
+public class BufferedReadChannel extends BufferedChannelBase {
 
     // The capacity of the read buffer.
     protected final int readCapacity;
@@ -43,9 +43,16 @@ public class BufferedReadChannel extends BufferedChannelBase 
 {
 
     long invocationCount = 0;
     long cacheHitCount = 0;
+    private volatile long fileSize = -1;
+    final boolean sealed;
 
     public BufferedReadChannel(FileChannel fileChannel, int readCapacity) {
+        this(fileChannel, readCapacity, false);
+    }
+
+    public BufferedReadChannel(FileChannel fileChannel, int readCapacity, 
boolean sealed) {
         super(fileChannel);
+        this.sealed = sealed;
         this.readCapacity = readCapacity;
         this.readBuffer = Unpooled.buffer(readCapacity);
     }
@@ -64,10 +71,26 @@ public class BufferedReadChannel extends 
BufferedChannelBase  {
         return read(dest, pos, dest.writableBytes());
     }
 
+    @Override
+    public long size() throws IOException {
+        if (sealed) {
+            if (fileSize == -1) {
+                synchronized (this) {
+                    if (fileSize == -1) {
+                        fileSize = validateAndGetFileChannel().size();
+                    }
+                }
+            }
+            return fileSize;
+        } else {
+            return validateAndGetFileChannel().size();
+        }
+    }
+
     public synchronized int read(ByteBuf dest, long pos, int length) throws 
IOException {
         invocationCount++;
         long currentPosition = pos;
-        long eof = validateAndGetFileChannel().size();
+        long eof = size();
         // return -1 if the given position is greater than or equal to the 
file's current size.
         if (pos >= eof) {
             return -1;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
index d02ede52fb..c47c0411c2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
@@ -587,6 +587,10 @@ public class DefaultEntryLogger implements EntryLogger {
         }
     }
 
+    void clearCompactingLogId() {
+        entryLoggerAllocator.clearCompactingLogId();
+    }
+
     /**
      * Flushes all rotated log channels. After log channels are flushed,
      * move leastUnflushedLogId ptr to current logId.
@@ -894,7 +898,8 @@ public class DefaultEntryLogger implements EntryLogger {
         }
     }
 
-    private BufferedReadChannel getChannelForLogId(long entryLogId) throws 
IOException {
+    @VisibleForTesting
+    BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException 
{
         BufferedReadChannel fc = getFromChannels(entryLogId);
         if (fc != null) {
             return fc;
@@ -910,7 +915,11 @@ public class DefaultEntryLogger implements EntryLogger {
         }
         // We set the position of the write buffer of this buffered channel to 
Long.MAX_VALUE
         // so that there are no overlaps with the write buffer while reading
-        fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes());
+        if (entryLogManager instanceof EntryLogManagerForSingleEntryLog) {
+            fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), 
entryLoggerAllocator.isSealed(entryLogId));
+        } else {
+            fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), 
false);
+        }
         putInReadChannels(entryLogId, fc);
         return fc;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
index 36ce928a08..e997906c23 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
@@ -161,6 +161,7 @@ abstract class EntryLogManagerBase implements 
EntryLogManager {
             logChannel.appendLedgersMap();
 
             BufferedLogChannel newLogChannel = 
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
+            entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId());
             setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
             log.info("Flushing entry logger {} back to filesystem, pending for 
syncing entry loggers : {}.",
                     logChannel.getLogId(), rotatedLogChannels);
@@ -168,8 +169,9 @@ abstract class EntryLogManagerBase implements 
EntryLogManager {
                 listener.onRotateEntryLog();
             }
         } else {
-            setCurrentLogForLedgerAndAddToRotate(ledgerId,
-                    
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()));
+            BufferedLogChannel newLogChannel = 
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
+            entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId());
+            setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
index 59bcc02a57..b784511868 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
@@ -262,6 +262,9 @@ class EntryLogManagerForSingleEntryLog extends 
EntryLogManagerBase {
 
     @Override
     public DefaultEntryLogger.BufferedLogChannel createNewLogForCompaction() 
throws IOException {
-        return 
entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog());
+        BufferedLogChannel newLogForCompaction = 
entryLoggerAllocator.createNewLogForCompaction(
+                selectDirForNextEntryLog());
+        
entryLoggerAllocator.setWritingCompactingLogId(newLogForCompaction.getLogId());
+        return newLogForCompaction;
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
index 68fc1eb3ca..aec2fb1cd0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
@@ -64,6 +64,8 @@ class EntryLoggerAllocator {
     private final boolean entryLogPreAllocationEnabled;
     private final ByteBufAllocator byteBufAllocator;
     final ByteBuf logfileHeader = 
Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE);
+    private volatile long writingLogId = -1;
+    private volatile long writingCompactingLogId = -1;
 
     EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager 
ledgerDirsManager,
                          DefaultEntryLogger.RecentEntryLogsStatus 
recentlyCreatedEntryLogsStatus, long logId,
@@ -91,16 +93,19 @@ class EntryLoggerAllocator {
         return preallocatedLogId;
     }
 
+    public boolean isSealed(long logId) {
+        return logId != writingLogId && logId != writingCompactingLogId;
+    }
+
     BufferedLogChannel createNewLog(File dirForNextEntryLog) throws 
IOException {
         synchronized (createEntryLogLock) {
             BufferedLogChannel bc;
-            if (!entryLogPreAllocationEnabled){
+            if (!entryLogPreAllocationEnabled) {
                 // create a new log directly
-                bc = allocateNewLog(dirForNextEntryLog);
-                return bc;
+                return allocateNewLog(dirForNextEntryLog);
             } else {
                 // allocate directly to response request
-                if (null == preallocation){
+                if (null == preallocation) {
                     bc = allocateNewLog(dirForNextEntryLog);
                 } else {
                     // has a preallocated entry log
@@ -116,7 +121,7 @@ class EntryLoggerAllocator {
                         throw new IOException("Task to allocate a new entry 
log is cancelled.", ce);
                     } catch (InterruptedException ie) {
                         Thread.currentThread().interrupt();
-                        throw new IOException("Intrrupted when waiting a new 
entry log to be allocated.", ie);
+                        throw new IOException("Interrupted when waiting a new 
entry log to be allocated.", ie);
                     }
                 }
                 // preallocate a new log in background upon every call
@@ -132,6 +137,18 @@ class EntryLoggerAllocator {
         }
     }
 
+    void setWritingLogId(long lodId) {
+        this.writingLogId = lodId;
+    }
+
+    void setWritingCompactingLogId(long logId) {
+        this.writingCompactingLogId = logId;
+    }
+
+    void clearCompactingLogId() {
+        writingCompactingLogId = -1;
+    }
+
     private synchronized BufferedLogChannel allocateNewLog(File 
dirForNextEntryLog) throws IOException {
         return allocateNewLog(dirForNextEntryLog, ".log");
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
index 2b6fca30c1..9a27bcccd8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
@@ -199,6 +199,7 @@ public class TransactionalEntryLogCompactor extends 
AbstractLogCompactor {
                 LOG.info("No valid entry is found in entry log after scan, 
removing entry log now.");
                 logRemovalListener.removeEntryLog(metadata.getEntryLogId());
                 compactionLog.abort();
+                compactingLogWriteDone();
                 return false;
             }
             return true;
@@ -209,6 +210,13 @@ public class TransactionalEntryLogCompactor extends 
AbstractLogCompactor {
             offsets.clear();
             // since we haven't flushed yet, we only need to delete the 
unflushed compaction file.
             compactionLog.abort();
+            compactingLogWriteDone();
+        }
+    }
+
+    private void compactingLogWriteDone() {
+        if (entryLogger instanceof DefaultEntryLogger) {
+            ((DefaultEntryLogger) entryLogger).clearCompactingLogId();
         }
     }
 
@@ -241,6 +249,8 @@ public class TransactionalEntryLogCompactor extends 
AbstractLogCompactor {
             } catch (IOException ioe) {
                 LOG.warn("Error marking compaction as done", ioe);
                 return false;
+            } finally {
+                compactingLogWriteDone();
             }
         }
 
@@ -249,6 +259,7 @@ public class TransactionalEntryLogCompactor extends 
AbstractLogCompactor {
             offsets.clear();
             // remove compaction log file and its hardlink
             compactionLog.abort();
+            compactingLogWriteDone();
         }
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
index 38a9ebaf21..3048ef33a8 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
@@ -67,6 +67,7 @@ import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirExcepti
 import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
@@ -154,6 +155,53 @@ public class DefaultEntryLogTest {
         assertEquals(0L, entryLogManager.getCurrentLogId());
     }
 
+    @Test
+    public void testEntryLogIsSealedWithPerLedgerDisabled() throws Exception {
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(false);
+        conf.setEntryLogFilePreAllocationEnabled(true);
+
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsProvider.TestStatsLogger statsLogger =
+                
statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, dirsMgr, 
null, statsLogger,
+                UnpooledByteBufAllocator.DEFAULT);
+        EntryLogManagerBase entrylogManager = (EntryLogManagerBase) 
entryLogger.getEntryLogManager();
+        entrylogManager.createNewLog(0);
+        BufferedReadChannel channel = entryLogger.getChannelForLogId(0);
+        assertFalse(channel.sealed);
+        entrylogManager.createNewLog(1);
+        channel = entryLogger.getChannelForLogId(0);
+        assertFalse(channel.sealed);
+        entrylogManager.createNewLog(2);
+        channel = entryLogger.getChannelForLogId(1);
+        assertTrue(channel.sealed);
+    }
+
+    @Test
+    public void testEntryLogIsSealedWithPerLedgerEnabled() throws Exception {
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        //If entryLogPerLedgerEnabled is true, the buffer channel sealed flag 
always false.
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setEntryLogFilePreAllocationEnabled(true);
+
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsProvider.TestStatsLogger statsLogger =
+                
statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, dirsMgr, 
null, statsLogger,
+                UnpooledByteBufAllocator.DEFAULT);
+        EntryLogManagerBase entrylogManager = (EntryLogManagerBase) 
entryLogger.getEntryLogManager();
+        entrylogManager.createNewLog(0);
+        BufferedReadChannel channel = entryLogger.getChannelForLogId(0);
+        assertFalse(channel.sealed);
+        entrylogManager.createNewLog(1);
+        channel = entryLogger.getChannelForLogId(0);
+        assertFalse(channel.sealed);
+        entrylogManager.createNewLog(2);
+        channel = entryLogger.getChannelForLogId(1);
+        assertFalse(channel.sealed);
+    }
+
     @Test
     public void testDeferCreateNewLogWithoutEnoughDiskSpaces() throws 
Exception {
         entryLogger.close();

Reply via email to