sijie closed pull request #1548: Fixed race condition in managed ledger 
addEntry introduced in #1521
URL: https://github.com/apache/incubator-pulsar/pull/1548
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bf2cd8c587..c60bd49827 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -92,7 +92,6 @@
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
-import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -202,7 +201,7 @@
      * Queue of pending entries to be added to the managed ledger. Typically 
entries are queued when a new ledger is
      * created asynchronously and hence there is no ready ledger to write into.
      */
-    final GrowableArrayBlockingQueue<OpAddEntry> pendingAddEntries = new 
GrowableArrayBlockingQueue<>();
+    final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new 
ConcurrentLinkedQueue<>();
 
     // //////////////////////////////////////////////////////////////////////
 
@@ -488,10 +487,11 @@ public void asyncAddEntry(ByteBuf buffer, 
AddEntryCallback callback, Object ctx)
         }
 
         OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, 
ctx);
-        pendingAddEntries.add(addOperation);
 
         // Jump to specific thread to avoid contention from writers writing 
from different threads
         executor.executeOrdered(name, safeRun(() -> {
+            pendingAddEntries.add(addOperation);
+
             internalAsyncAddEntry(addOperation);
         }));
     }
@@ -1197,7 +1197,7 @@ public synchronized void updateLedgersIdsComplete(Stat 
stat) {
         }
 
         // Process all the pending addEntry requests
-        for (OpAddEntry op : pendingAddEntries.toList()) {
+        for (OpAddEntry op : pendingAddEntries) {
             op.setLedger(currentLedger);
             ++currentLedgerEntries;
             currentLedgerSize += op.data.readableBytes();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to