This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e33795b Fix: handle invalid markDelete position at managed-cursor (#1554) e33795b is described below commit e33795b7ce43a908d2e9d285d66e07cf21626510 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Apr 11 22:02:11 2018 -0700 Fix: handle invalid markDelete position at managed-cursor (#1554) * Fix: handle invalid markDelete position at managed-cursor * Fix position validation --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 ++++++++- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 56 ++++++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 34efb5c..1b7555e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1301,7 +1301,7 @@ public class ManagedCursorImpl implements ManagedCursor { final MarkDeleteCallback callback, final Object ctx) { checkNotNull(position); checkArgument(position instanceof PositionImpl); - + if (STATE_UPDATER.get(this) == State.Closed) { callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; @@ -1322,6 +1322,16 @@ public class ManagedCursorImpl implements ManagedCursor { log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position); } PositionImpl newPosition = (PositionImpl) position; + + if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) { + if (log.isDebugEnabled()) { + log.debug( + "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]", + ledger.getName(), position, ledger.getLastConfirmedEntry(), name); + } + callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx); + return; + } lock.writeLock().lock(); try { @@ -1537,6 +1547,16 @@ public class ManagedCursorImpl implements ManagedCursor { for (Position pos : positions) { PositionImpl position = (PositionImpl) checkNotNull(pos); + + if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) { + if (log.isDebugEnabled()) { + log.debug( + "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]", + ledger.getName(), position, ledger.getLastConfirmedEntry(), name); + } + callback.deleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx); + return; + } if (individualDeletedMessages.contains(position) || position.compareTo(markDeletePosition) <= 0) { if (log.isDebugEnabled()) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 8cf5ee4..da15f15 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -46,6 +46,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; @@ -2629,5 +2630,60 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertEquals(entries.size(), totalAddEntries / 2); } + @Test + public void testInvalidMarkDelete() throws Exception { + ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()); + + ManagedCursor cursor = ledger.openCursor("c1"); + Position readPosition = cursor.getReadPosition(); + Position markDeletePosition = cursor.getMarkDeletedPosition(); + + List<Position> addedPositions = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + Position p = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding)); + addedPositions.add(p); + } + + // validate: cursor.asyncMarkDelete(..) + CountDownLatch markDeleteCallbackLatch = new CountDownLatch(1); + Position position = PositionImpl.get(100, 100); + AtomicBoolean markDeleteCallFailed = new AtomicBoolean(false); + cursor.asyncMarkDelete(position, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + markDeleteCallbackLatch.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + markDeleteCallFailed.set(true); + markDeleteCallbackLatch.countDown(); + } + }, null); + markDeleteCallbackLatch.await(); + assertEquals(readPosition, cursor.getReadPosition()); + assertEquals(markDeletePosition, cursor.getMarkDeletedPosition()); + + // validate : cursor.asyncDelete(..) + CountDownLatch deleteCallbackLatch = new CountDownLatch(1); + markDeleteCallFailed.set(false); + cursor.asyncDelete(position, new DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + deleteCallbackLatch.countDown(); + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + markDeleteCallFailed.set(true); + deleteCallbackLatch.countDown(); + } + }, null); + + deleteCallbackLatch.await(); + assertEquals(readPosition, cursor.getReadPosition()); + assertEquals(markDeletePosition, cursor.getMarkDeletedPosition()); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } -- To stop receiving notification emails like this one, please contact mme...@apache.org.