This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new ef4ce1385f AutoRecovery supports batch read (#4211)
ef4ce1385f is described below

commit ef4ce1385f8f59fe81cb9f47d72777356a99969d
Author: Hang Chen <chenh...@apache.org>
AuthorDate: Wed Feb 21 02:11:12 2024 +0800

    AutoRecovery supports batch read (#4211)
    
    * AutoRecovery supports batch read
    
    * Fix check style
    
    * address comments
---
 .../client/LedgerFragmentReplicator.java           | 142 +++++++++++++++++++--
 .../bookkeeper/conf/ClientConfiguration.java       |  23 ++++
 .../replication/TestReplicationWorker.java         |  63 +++++++++
 3 files changed, 216 insertions(+), 12 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 5cc22362ac..9f6c90d29e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -171,29 +171,41 @@ public class LedgerFragmentReplicator {
             return;
         }
 
-        /*
-         * Add all the entries to entriesToReplicate list from
-         * firstStoredEntryId to lastStoredEntryID.
-         */
-        List<Long> entriesToReplicate = new LinkedList<Long>();
-        long lastStoredEntryId = lf.getLastStoredEntryId();
-        for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId; i++) 
{
-            entriesToReplicate.add(i);
-        }
         /*
          * Now asynchronously replicate all of the entries for the ledger
          * fragment that were on the dead bookie.
          */
+        int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
         MultiCallback ledgerFragmentEntryMcb = new MultiCallback(
-                entriesToReplicate.size(), ledgerFragmentMcb, null, 
BKException.Code.OK,
+                entriesToReplicateCnt, ledgerFragmentMcb, null, 
BKException.Code.OK,
                 BKException.Code.LedgerRecoveryException);
         if (this.replicationThrottle != null) {
             
this.replicationThrottle.resetRate(this.conf.getReplicationRateByBytes());
         }
-        for (final Long entryId : entriesToReplicate) {
-            recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
+
+        if (conf.isRecoveryBatchReadEnabled()
+                && conf.getUseV2WireProtocol()
+                && conf.isBatchReadEnabled()
+                && lh.getLedgerMetadata().getEnsembleSize() == 
lh.getLedgerMetadata().getWriteQuorumSize()) {
+            batchRecoverLedgerFragmentEntry(startEntryId, endEntryId, lh, 
ledgerFragmentEntryMcb,
                     newBookies, onReadEntryFailureCallback);
+
+        } else {
+            /*
+             * Add all the entries to entriesToReplicate list from
+             * firstStoredEntryId to lastStoredEntryId.
+             */
+            List<Long> entriesToReplicate = new LinkedList<Long>();
+            long lastStoredEntryId = lf.getLastStoredEntryId();
+            for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId; 
i++) {
+                entriesToReplicate.add(i);
+            }
+            for (final Long entryId : entriesToReplicate) {
+                recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
+                        newBookies, onReadEntryFailureCallback);
+            }
         }
+
     }
 
     /**
@@ -433,6 +445,112 @@ public class LedgerFragmentReplicator {
         }, null);
     }
 
+    void batchRecoverLedgerFragmentEntry(final long startEntryId,
+                                         final long endEntryId,
+                                         final LedgerHandle lh,
+                                         final AsyncCallback.VoidCallback 
ledgerFragmentMcb,
+                                         final Set<BookieId> newBookies,
+                                         final BiConsumer<Long, Long> 
onReadEntryFailureCallback)
+            throws InterruptedException {
+        int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
+        int maxBytesToReplicate = conf.getReplicationRateByBytes();
+        if (replicationThrottle != null) {
+            if (maxBytesToReplicate != -1 && maxBytesToReplicate > 
averageEntrySize.get() * entriesToReplicateCnt) {
+                maxBytesToReplicate = averageEntrySize.get() * 
entriesToReplicateCnt;
+            }
+            replicationThrottle.acquire(maxBytesToReplicate);
+        }
+
+        lh.asyncBatchReadEntries(startEntryId, entriesToReplicateCnt, 
maxBytesToReplicate,
+            new ReadCallback() {
+                @Override
+                public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> seq, Object ctx) {
+                    if (rc != BKException.Code.OK) {
+                        LOG.error("BK error reading ledger entries: {} - {}",
+                                startEntryId, endEntryId, 
BKException.create(rc));
+                        onReadEntryFailureCallback.accept(lh.getId(), 
startEntryId);
+                        for (int i = 0; i < entriesToReplicateCnt; i++) {
+                            ledgerFragmentMcb.processResult(rc, null, null);
+                        }
+                        return;
+                    }
+                    long lastEntryId = startEntryId;
+                    while (seq.hasMoreElements()) {
+                        LedgerEntry entry = seq.nextElement();
+                        lastEntryId = entry.getEntryId();
+                        byte[] data = entry.getEntry();
+                        final long dataLength = data.length;
+                        numEntriesRead.inc();
+                        numBytesRead.registerSuccessfulValue(dataLength);
+
+                        ReferenceCounted toSend = lh.getDigestManager()
+                                
.computeDigestAndPackageForSending(entry.getEntryId(),
+                                        lh.getLastAddConfirmed(), 
entry.getLength(),
+                                        Unpooled.wrappedBuffer(data, 0, 
data.length),
+                                        lh.getLedgerKey(),
+                                        BookieProtocol.FLAG_RECOVERY_ADD);
+                        if (replicationThrottle != null) {
+                            if (toSend instanceof ByteBuf) {
+                                updateAverageEntrySize(((ByteBuf) 
toSend).readableBytes());
+                            } else if (toSend instanceof ByteBufList) {
+                                updateAverageEntrySize(((ByteBufList) 
toSend).readableBytes());
+                            }
+                        }
+                        AtomicInteger numCompleted = new AtomicInteger(0);
+                        AtomicBoolean completed = new AtomicBoolean(false);
+
+                        WriteCallback multiWriteCallback = new WriteCallback() 
{
+                            @Override
+                            public void writeComplete(int rc, long ledgerId, 
long entryId, BookieId addr, Object ctx) {
+                                if (rc != BKException.Code.OK) {
+                                    LOG.error("BK error writing entry for 
ledgerId: {}, entryId: {}, bookie: {}",
+                                            ledgerId, entryId, addr, 
BKException.create(rc));
+                                    if (completed.compareAndSet(false, true)) {
+                                        ledgerFragmentMcb.processResult(rc, 
null, null);
+                                    }
+                                } else {
+                                    numEntriesWritten.inc();
+                                    if (ctx instanceof Long) {
+                                        
numBytesWritten.registerSuccessfulValue((Long) ctx);
+                                    }
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug("Success writing ledger id 
{}, entry id {} to a new bookie {}!",
+                                                ledgerId, entryId, addr);
+                                    }
+                                    if (numCompleted.incrementAndGet() == 
newBookies.size()
+                                            && completed.compareAndSet(false, 
true)) {
+                                        ledgerFragmentMcb.processResult(rc, 
null, null);
+                                    }
+                                }
+                            }
+                        };
+
+                        for (BookieId newBookie : newBookies) {
+                            long startWriteEntryTime = MathUtils.nowInNano();
+                            bkc.getBookieClient().addEntry(newBookie, 
lh.getId(),
+                                    lh.getLedgerKey(), entry.getEntryId(), 
toSend,
+                                    multiWriteCallback, dataLength, 
BookieProtocol.FLAG_RECOVERY_ADD,
+                                    false, WriteFlag.NONE);
+                            writeDataLatency.registerSuccessfulEvent(
+                                    
MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS);
+                        }
+                        toSend.release();
+                    }
+                    if (lastEntryId != endEntryId) {
+                        try {
+                            batchRecoverLedgerFragmentEntry(lastEntryId + 1, 
endEntryId, lh,
+                                    ledgerFragmentMcb, newBookies, 
onReadEntryFailureCallback);
+                        } catch (InterruptedException e) {
+                            int remainingEntries = (int) (endEntryId - 
lastEntryId);
+                            for (int i = 0; i < remainingEntries; i++) {
+                                
ledgerFragmentMcb.processResult(BKException.Code.InterruptedException, null, 
null);
+                            }
+                        }
+                    }
+                }
+            }, null);
+    }
+
     private void updateAverageEntrySize(int toSendSize) {
         averageEntrySize.updateAndGet(value -> (int) (value * 
AVERAGE_ENTRY_SIZE_RATIO
                 + (1 - AVERAGE_ENTRY_SIZE_RATIO) * toSendSize));
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 924dee4ada..03eb6d1abd 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -115,6 +115,7 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
     protected static final String RECOVERY_READ_BATCH_SIZE = 
"recoveryReadBatchSize";
     protected static final String REORDER_READ_SEQUENCE_ENABLED = 
"reorderReadSequenceEnabled";
     protected static final String STICKY_READS_ENABLED = "stickyReadSEnabled";
+    protected static final String RECOVERY_BATCH_READ_ENABLED = 
"recoveryBatchReadEnabled";
     // Add Parameters
     protected static final String OPPORTUNISTIC_STRIPING = 
"opportunisticStriping";
     protected static final String DELAY_ENSEMBLE_CHANGE = 
"delayEnsembleChange";
@@ -1203,6 +1204,23 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
         return this;
     }
 
+    /**
+     * If recovery batch read enabled or not.
+     * @return
+     */
+    public boolean isRecoveryBatchReadEnabled() {
+        return getBoolean(RECOVERY_BATCH_READ_ENABLED, false);
+    }
+
+    /**
+     * Enable/disable recovery batch read.
+     * @param enabled
+     * @return
+     */
+    public ClientConfiguration setRecoveryBatchReadEnabled(boolean enabled) {
+        setProperty(RECOVERY_BATCH_READ_ENABLED, enabled);
+        return this;
+    }
     /**
      * Get Ensemble Placement Policy Class.
      *
@@ -2084,6 +2102,11 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
         return getBoolean(BATCH_READ_ENABLED, true);
     }
 
+    public ClientConfiguration setBatchReadEnabled(boolean enabled) {
+        setProperty(BATCH_READ_ENABLED, enabled);
+        return this;
+    }
+
     @Override
     protected ClientConfiguration getThis() {
         return this;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index a991423389..f4a9245c76 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -437,6 +437,69 @@ public class TestReplicationWorker extends 
BookKeeperClusterTestCase {
 
     }
 
+    @Test
+    public void testMultipleLedgerReplicationWithReplicationWorkerBatchRead() 
throws Exception {
+        LedgerHandle lh1 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, 
TESTPASSWD);
+        for (int i = 0; i < 200; ++i) {
+            lh1.addEntry(data);
+        }
+        BookieId replicaToKillFromFirstLedger = 
lh1.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
+
+        LedgerHandle lh2 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, 
TESTPASSWD);
+        for (int i = 0; i < 200; ++i) {
+            lh2.addEntry(data);
+        }
+
+        BookieId replicaToKillFromSecondLedger = 
lh2.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
+
+        LOG.info("Killing Bookie : {}", replicaToKillFromFirstLedger);
+        killBookie(replicaToKillFromFirstLedger);
+        lh1.close();
+
+        LOG.info("Killing Bookie : {}", replicaToKillFromSecondLedger);
+        killBookie(replicaToKillFromSecondLedger);
+        lh2.close();
+
+        BookieId newBkAddr = startNewBookieAndReturnBookieId();
+        LOG.info("New Bookie addr : {}", newBkAddr);
+
+        if (replicaToKillFromFirstLedger != replicaToKillFromSecondLedger) {
+            BookieId newBkAddr2 = startNewBookieAndReturnBookieId();
+            LOG.info("New Bookie addr : {}", newBkAddr2);
+        }
+
+        ClientConfiguration clientConfiguration = new 
ClientConfiguration(baseClientConf);
+        clientConfiguration.setUseV2WireProtocol(true);
+        clientConfiguration.setRecoveryBatchReadEnabled(true);
+        clientConfiguration.setBatchReadEnabled(true);
+        clientConfiguration.setRereplicationEntryBatchSize(100);
+        clientConfiguration.setReplicationRateByBytes(3 * 1024);
+        ReplicationWorker rw = new ReplicationWorker(new 
ServerConfiguration(clientConfiguration));
+
+        rw.start();
+        try {
+            // Mark ledger1 and ledger2 as underreplicated
+            underReplicationManager.markLedgerUnderreplicated(lh1.getId(), 
replicaToKillFromFirstLedger.toString());
+            underReplicationManager.markLedgerUnderreplicated(lh2.getId(), 
replicaToKillFromSecondLedger.toString());
+
+            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, 
lh1.getId(), basePath)) {
+                Thread.sleep(100);
+            }
+
+            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, 
lh2.getId(), basePath)) {
+                Thread.sleep(100);
+            }
+
+            killAllBookies(lh1, newBkAddr);
+
+            // Should be able to read the entries from 0-199
+            verifyRecoveredLedgers(lh1, 0, 199);
+            verifyRecoveredLedgers(lh2, 0, 199);
+        } finally {
+            rw.shutdown();
+        }
+    }
+
     /**
      * Tests that ReplicationWorker should fence the ledger and release ledger
      * lock after timeout. Then replication should happen normally.

Reply via email to