sijie closed pull request #1548: Fixed race condition in managed ledger addEntry introduced in #1521 URL: https://github.com/apache/incubator-pulsar/pull/1548
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index bf2cd8c587..c60bd49827 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -92,7 +92,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; -import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -202,7 +201,7 @@ * Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is * created asynchronously and hence there is no ready ledger to write into. 
*/ - final GrowableArrayBlockingQueue<OpAddEntry> pendingAddEntries = new GrowableArrayBlockingQueue<>(); + final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new ConcurrentLinkedQueue<>(); // ////////////////////////////////////////////////////////////////////// @@ -488,10 +487,11 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) } OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx); - pendingAddEntries.add(addOperation); // Jump to specific thread to avoid contention from writers writing from different threads executor.executeOrdered(name, safeRun(() -> { + pendingAddEntries.add(addOperation); + internalAsyncAddEntry(addOperation); })); } @@ -1197,7 +1197,7 @@ public synchronized void updateLedgersIdsComplete(Stat stat) { } // Process all the pending addEntry requests - for (OpAddEntry op : pendingAddEntries.toList()) { + for (OpAddEntry op : pendingAddEntries) { op.setLedger(currentLedger); ++currentLedgerEntries; currentLedgerSize += op.data.readableBytes(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services