This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new ef4ce1385f AutoRecovery supports batch read (#4211) ef4ce1385f is described below commit ef4ce1385f8f59fe81cb9f47d72777356a99969d Author: Hang Chen <chenh...@apache.org> AuthorDate: Wed Feb 21 02:11:12 2024 +0800 AutoRecovery supports batch read (#4211) * AutoRecovery support batch read * Fix check style * address comments --- .../client/LedgerFragmentReplicator.java | 142 +++++++++++++++++++-- .../bookkeeper/conf/ClientConfiguration.java | 23 ++++ .../replication/TestReplicationWorker.java | 63 +++++++++ 3 files changed, 216 insertions(+), 12 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index 5cc22362ac..9f6c90d29e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -171,29 +171,41 @@ public class LedgerFragmentReplicator { return; } - /* - * Add all the entries to entriesToReplicate list from - * firstStoredEntryId to lastStoredEntryID. - */ - List<Long> entriesToReplicate = new LinkedList<Long>(); - long lastStoredEntryId = lf.getLastStoredEntryId(); - for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId; i++) { - entriesToReplicate.add(i); - } /* * Now asynchronously replicate all of the entries for the ledger * fragment that were on the dead bookie. 
*/ + int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1); MultiCallback ledgerFragmentEntryMcb = new MultiCallback( - entriesToReplicate.size(), ledgerFragmentMcb, null, BKException.Code.OK, + entriesToReplicateCnt, ledgerFragmentMcb, null, BKException.Code.OK, BKException.Code.LedgerRecoveryException); if (this.replicationThrottle != null) { this.replicationThrottle.resetRate(this.conf.getReplicationRateByBytes()); } - for (final Long entryId : entriesToReplicate) { - recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb, + + if (conf.isRecoveryBatchReadEnabled() + && conf.getUseV2WireProtocol() + && conf.isBatchReadEnabled() + && lh.getLedgerMetadata().getEnsembleSize() == lh.getLedgerMetadata().getWriteQuorumSize()) { + batchRecoverLedgerFragmentEntry(startEntryId, endEntryId, lh, ledgerFragmentEntryMcb, newBookies, onReadEntryFailureCallback); + + } else { + /* + * Add all the entries to entriesToReplicate list from + * firstStoredEntryId to lastStoredEntryID. 
+ */ + List<Long> entriesToReplicate = new LinkedList<Long>(); + long lastStoredEntryId = lf.getLastStoredEntryId(); + for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId; i++) { + entriesToReplicate.add(i); + } + for (final Long entryId : entriesToReplicate) { + recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb, + newBookies, onReadEntryFailureCallback); + } } + } /** @@ -433,6 +445,112 @@ public class LedgerFragmentReplicator { }, null); } + void batchRecoverLedgerFragmentEntry(final long startEntryId, + final long endEntryId, + final LedgerHandle lh, + final AsyncCallback.VoidCallback ledgerFragmentMcb, + final Set<BookieId> newBookies, + final BiConsumer<Long, Long> onReadEntryFailureCallback) + throws InterruptedException { + int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1); + int maxBytesToReplicate = conf.getReplicationRateByBytes(); + if (replicationThrottle != null) { + if (maxBytesToReplicate != -1 && maxBytesToReplicate > averageEntrySize.get() * entriesToReplicateCnt) { + maxBytesToReplicate = averageEntrySize.get() * entriesToReplicateCnt; + } + replicationThrottle.acquire(maxBytesToReplicate); + } + + lh.asyncBatchReadEntries(startEntryId, entriesToReplicateCnt, maxBytesToReplicate, + new ReadCallback() { + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) { + if (rc != BKException.Code.OK) { + LOG.error("BK error reading ledger entries: {} - {}", + startEntryId, endEntryId, BKException.create(rc)); + onReadEntryFailureCallback.accept(lh.getId(), startEntryId); + for (int i = 0; i < entriesToReplicateCnt; i++) { + ledgerFragmentMcb.processResult(rc, null, null); + } + return; + } + long lastEntryId = startEntryId; + while (seq.hasMoreElements()) { + LedgerEntry entry = seq.nextElement(); + lastEntryId = entry.getEntryId(); + byte[] data = entry.getEntry(); + final long dataLength = data.length; + numEntriesRead.inc(); + 
numBytesRead.registerSuccessfulValue(dataLength); + + ReferenceCounted toSend = lh.getDigestManager() + .computeDigestAndPackageForSending(entry.getEntryId(), + lh.getLastAddConfirmed(), entry.getLength(), + Unpooled.wrappedBuffer(data, 0, data.length), + lh.getLedgerKey(), + BookieProtocol.FLAG_RECOVERY_ADD); + if (replicationThrottle != null) { + if (toSend instanceof ByteBuf) { + updateAverageEntrySize(((ByteBuf) toSend).readableBytes()); + } else if (toSend instanceof ByteBufList) { + updateAverageEntrySize(((ByteBufList) toSend).readableBytes()); + } + } + AtomicInteger numCompleted = new AtomicInteger(0); + AtomicBoolean completed = new AtomicBoolean(false); + + WriteCallback multiWriteCallback = new WriteCallback() { + @Override + public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) { + if (rc != BKException.Code.OK) { + LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}", + ledgerId, entryId, addr, BKException.create(rc)); + if (completed.compareAndSet(false, true)) { + ledgerFragmentMcb.processResult(rc, null, null); + } + } else { + numEntriesWritten.inc(); + if (ctx instanceof Long) { + numBytesWritten.registerSuccessfulValue((Long) ctx); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!", + ledgerId, entryId, addr); + } + if (numCompleted.incrementAndGet() == newBookies.size() + && completed.compareAndSet(false, true)) { + ledgerFragmentMcb.processResult(rc, null, null); + } + } + } + }; + + for (BookieId newBookie : newBookies) { + long startWriteEntryTime = MathUtils.nowInNano(); + bkc.getBookieClient().addEntry(newBookie, lh.getId(), + lh.getLedgerKey(), entry.getEntryId(), toSend, + multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD, + false, WriteFlag.NONE); + writeDataLatency.registerSuccessfulEvent( + MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS); + } + toSend.release(); + } + if (lastEntryId 
!= endEntryId) { + try { + batchRecoverLedgerFragmentEntry(lastEntryId + 1, endEntryId, lh, + ledgerFragmentMcb, newBookies, onReadEntryFailureCallback); + } catch (InterruptedException e) { + int remainingEntries = (int) (endEntryId - lastEntryId); + for (int i = 0; i < remainingEntries; i++) { + ledgerFragmentMcb.processResult(BKException.Code.InterruptedException, null, null); + } + } + } + } + }, null); + } + private void updateAverageEntrySize(int toSendSize) { averageEntrySize.updateAndGet(value -> (int) (value * AVERAGE_ENTRY_SIZE_RATIO + (1 - AVERAGE_ENTRY_SIZE_RATIO) * toSendSize)); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 924dee4ada..03eb6d1abd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -115,6 +115,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati protected static final String RECOVERY_READ_BATCH_SIZE = "recoveryReadBatchSize"; protected static final String REORDER_READ_SEQUENCE_ENABLED = "reorderReadSequenceEnabled"; protected static final String STICKY_READS_ENABLED = "stickyReadSEnabled"; + protected static final String RECOVERY_BATCH_READ_ENABLED = "recoveryBatchReadEnabled"; // Add Parameters protected static final String OPPORTUNISTIC_STRIPING = "opportunisticStriping"; protected static final String DELAY_ENSEMBLE_CHANGE = "delayEnsembleChange"; @@ -1203,6 +1204,23 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati return this; } + /** + * If recovery batch read enabled or not. + * @return + */ + public boolean isRecoveryBatchReadEnabled() { + return getBoolean(RECOVERY_BATCH_READ_ENABLED, false); + } + + /** + * Enable/disable recovery batch read. 
+ * @param enabled + * @return + */ + public ClientConfiguration setRecoveryBatchReadEnabled(boolean enabled) { + setProperty(RECOVERY_BATCH_READ_ENABLED, enabled); + return this; + } /** * Get Ensemble Placement Policy Class. * @@ -2084,6 +2102,11 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati return getBoolean(BATCH_READ_ENABLED, true); } + public ClientConfiguration setBatchReadEnabled(boolean enabled) { + setProperty(BATCH_READ_ENABLED, enabled); + return this; + } + @Override protected ClientConfiguration getThis() { return this; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java index a991423389..f4a9245c76 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java @@ -437,6 +437,69 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase { } + @Test + public void testMultipleLedgerReplicationWithReplicationWorkerBatchRead() throws Exception { + LedgerHandle lh1 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD); + for (int i = 0; i < 200; ++i) { + lh1.addEntry(data); + } + BookieId replicaToKillFromFirstLedger = lh1.getLedgerMetadata().getAllEnsembles().get(0L).get(0); + + LedgerHandle lh2 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD); + for (int i = 0; i < 200; ++i) { + lh2.addEntry(data); + } + + BookieId replicaToKillFromSecondLedger = lh2.getLedgerMetadata().getAllEnsembles().get(0L).get(0); + + LOG.info("Killing Bookie : {}", replicaToKillFromFirstLedger); + killBookie(replicaToKillFromFirstLedger); + lh1.close(); + + LOG.info("Killing Bookie : {}", replicaToKillFromSecondLedger); + killBookie(replicaToKillFromSecondLedger); + lh2.close(); + + BookieId newBkAddr = 
startNewBookieAndReturnBookieId(); + LOG.info("New Bookie addr : {}", newBkAddr); + + if (replicaToKillFromFirstLedger != replicaToKillFromSecondLedger) { + BookieId newBkAddr2 = startNewBookieAndReturnBookieId(); + LOG.info("New Bookie addr : {}", newBkAddr2); + } + + ClientConfiguration clientConfiguration = new ClientConfiguration(baseClientConf); + clientConfiguration.setUseV2WireProtocol(true); + clientConfiguration.setRecoveryBatchReadEnabled(true); + clientConfiguration.setBatchReadEnabled(true); + clientConfiguration.setRereplicationEntryBatchSize(100); + clientConfiguration.setReplicationRateByBytes(3 * 1024); + ReplicationWorker rw = new ReplicationWorker(new ServerConfiguration(clientConfiguration)); + + rw.start(); + try { + // Mark ledger1 and ledger2 as underreplicated + underReplicationManager.markLedgerUnderreplicated(lh1.getId(), replicaToKillFromFirstLedger.toString()); + underReplicationManager.markLedgerUnderreplicated(lh2.getId(), replicaToKillFromSecondLedger.toString()); + + while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh1.getId(), basePath)) { + Thread.sleep(100); + } + + while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh2.getId(), basePath)) { + Thread.sleep(100); + } + + killAllBookies(lh1, newBkAddr); + + // Should be able to read the entries from 0-199 + verifyRecoveredLedgers(lh1, 0, 199); + verifyRecoveredLedgers(lh2, 0, 199); + } finally { + rw.shutdown(); + } + } + /** * Tests that ReplicationWorker should fence the ledger and release ledger * lock after timeout. Then replication should happen normally.