This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b22769b  Fixed flaky test in managed ledger close path (#1539)
b22769b is described below

commit b22769bb57defe435cc4faedcbebe1d3260cdee4
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue Apr 10 12:33:22 2018 -0700

    Fixed flaky test in managed ledger close path (#1539)
    
    * Fixed flaky test in managed ledger close path: ManagedLedgerBkTest.managedLedgerClosed
    
    * Fixed cursorPersistenceAsyncMarkDeleteSameThread test
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 ++++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 26 +++++++++++-----------
 2 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b043dc8..2773090 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1368,7 +1368,10 @@ public class ManagedCursorImpl implements ManagedCursor {
         case Open:
             if (pendingReadOps > 0) {
                 // Wait until no read operation are pending
-                pendingMarkDeleteOps.add(mdEntry);
+                if (!pendingMarkDeleteOps.offer(mdEntry)) {
+                    callback.markDeleteFailed(new 
ManagedLedgerException("Cursor queue of mark-delete operations full"), ctx);
+                    return;
+                }
                 if (pendingReadOps == 0) {
                     // If the value changed while enqueuing, trigger a flush 
to make sure we don't delay current request
                     flushPendingMarkDeletes();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 77b1dff..bf2cd8c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -53,9 +53,8 @@ import 
org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -487,17 +486,6 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         if (log.isDebugEnabled()) {
             log.debug("[{}] asyncAddEntry size={} state={}", name, 
buffer.readableBytes(), state);
         }
-        final State state = STATE_UPDATER.get(this);
-        if (state == State.Fenced) {
-            callback.addFailed(new ManagedLedgerFencedException(), ctx);
-            return;
-        } else if (state == State.Terminated) {
-            callback.addFailed(new ManagedLedgerTerminatedException("Managed 
ledger was already terminated"), ctx);
-            return;
-        } else if (state == State.Closed) {
-            callback.addFailed(new ManagedLedgerException("Managed ledger was 
already closed"), ctx);
-            return;
-        }
 
         OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, 
ctx);
         pendingAddEntries.add(addOperation);
@@ -509,6 +497,18 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     }
 
     private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
+        final State state = STATE_UPDATER.get(this);
+        if (state == State.Fenced) {
+            addOperation.failed(new ManagedLedgerFencedException());
+            return;
+        } else if (state == State.Terminated) {
+            addOperation.failed(new ManagedLedgerTerminatedException("Managed 
ledger was already terminated"));
+            return;
+        } else if (state == State.Closed) {
+            addOperation.failed(new 
ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
+            return;
+        }
+
         if (state == State.ClosingLedger || state == State.CreatingLedger) {
             // We don't have a ready ledger to write into
             // We are waiting for a new ledger to be created

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to