jai1 closed pull request #1718: Fix: handle invalid markDelete position at managed-cursor
URL: https://github.com/apache/incubator-pulsar/pull/1718
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub does not show its
diff after the merge, so the diff is reproduced below:

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 22cdf3d0f4..81727841b6 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1291,7 +1291,7 @@ public void asyncMarkDelete(final Position position, 
Map<String, Long> propertie
             final MarkDeleteCallback callback, final Object ctx) {
         checkNotNull(position);
         checkArgument(position instanceof PositionImpl);
-
+        
         if (STATE_UPDATER.get(this) == State.Closed) {
             callback.markDeleteFailed(new ManagedLedgerException("Cursor was 
already closed"), ctx);
             return;
@@ -1312,6 +1312,16 @@ public void asyncMarkDelete(final Position position, 
Map<String, Long> propertie
             log.debug("[{}] Mark delete cursor {} up to position: {}", 
ledger.getName(), name, position);
         }
         PositionImpl newPosition = (PositionImpl) position;
+        
+        if (((PositionImpl) 
ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "[{}] Failed mark delete due to invalid markDelete {} 
is ahead of last-confirmed-entry {} for cursor [{}]",
+                        ledger.getName(), position, 
ledger.getLastConfirmedEntry(), name);
+            }
+            callback.markDeleteFailed(new ManagedLedgerException("Invalid mark 
deleted position"), ctx);
+            return;
+        }
 
         lock.writeLock().lock();
         try {
@@ -1509,6 +1519,16 @@ public void asyncDelete(Position pos, final 
AsyncCallbacks.DeleteCallback callba
                         ledger.getName(), name, pos, 
individualDeletedMessages, markDeletePosition, previousPosition);
             }
 
+            if (((PositionImpl) 
ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug(
+                            "[{}] Failed mark delete due to invalid markDelete 
{} is ahead of last-confirmed-entry {} for cursor [{}]",
+                            ledger.getName(), position, 
ledger.getLastConfirmedEntry(), name);
+                }
+                callback.deleteFailed(new ManagedLedgerException("Invalid mark 
deleted position"), ctx);
+                return;
+            }
+
             if (individualDeletedMessages.contains(position) || 
position.compareTo(markDeletePosition) <= 0) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Position was already deleted {}", 
ledger.getName(), name, position);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 61e9b3da78..59e159456c 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -49,6 +49,7 @@
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -2614,5 +2615,60 @@ public void operationFailed(MetaStoreException e) {
         assertEquals(entries.size(), totalAddEntries / 2);
     }
 
+    @Test
+    public void testInvalidMarkDelete() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger", new 
ManagedLedgerConfig());
+
+        ManagedCursor cursor = ledger.openCursor("c1");
+        Position readPosition = cursor.getReadPosition();
+        Position markDeletePosition = cursor.getMarkDeletedPosition();
+
+        List<Position> addedPositions = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            Position p = ledger.addEntry(("dummy-entry-" + 
i).getBytes(Encoding));
+            addedPositions.add(p);
+        }
+
+        // validate: cursor.asyncMarkDelete(..)
+        CountDownLatch markDeleteCallbackLatch = new CountDownLatch(1);
+        Position position = PositionImpl.get(100, 100);
+        AtomicBoolean markDeleteCallFailed = new AtomicBoolean(false);
+        cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                markDeleteCallbackLatch.countDown();
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
+                markDeleteCallFailed.set(true);
+                markDeleteCallbackLatch.countDown();
+            }
+        }, null);
+        markDeleteCallbackLatch.await();
+        assertEquals(readPosition, cursor.getReadPosition());
+        assertEquals(markDeletePosition, cursor.getMarkDeletedPosition());
+
+        // validate : cursor.asyncDelete(..)
+        CountDownLatch deleteCallbackLatch = new CountDownLatch(1);
+        markDeleteCallFailed.set(false);
+        cursor.asyncDelete(position, new DeleteCallback() {
+            @Override
+            public void deleteComplete(Object ctx) {
+                deleteCallbackLatch.countDown();
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object 
ctx) {
+                markDeleteCallFailed.set(true);
+                deleteCallbackLatch.countDown();
+            }
+        }, null);
+
+        deleteCallbackLatch.await();
+        assertEquals(readPosition, cursor.getReadPosition());
+        assertEquals(markDeletePosition, cursor.getMarkDeletedPosition());
+    }
+    
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to