This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 86c69e86db65c0e1d54a89dba41a6a2879b29767
Author: lipenghui <peng...@apache.org>
AuthorDate: Tue Jun 16 09:20:18 2020 +0800

    Avoid introducing a null read position in the managed cursor. (#7264)
    
    ### Motivation
    
    Avoid introducing a null read position in the managed cursor.
    
    Here is the error log related to null read position:
    ```
    18:52:13.366 [pulsar-stats-updater-23-1] ERROR 
org.apache.pulsar.broker.service.persistent.PersistentTopic - Got exception 
when creating consumer stats for subscription 
itom-di-dp-preload_chotest_2-reader-4bd4e3dd50: null
    java.lang.NullPointerException: null
        at 
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) 
~[com.google.guava-guava-25.1-jre.jar:?]
        at 
org.apache.bookkeeper.mledger.impl.PositionImpl.compareTo(PositionImpl.java:92) 
~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
        at 
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNumberOfEntriesSinceFirstNotAckedMessage(ManagedCursorImpl.java:721)
 ~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesSinceFirstNotAckedMessage(PersistentSubscription.java:790)
 ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$updateRates$46(PersistentTopic.java:1419)
 ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
 ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
 ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.updateRates(PersistentTopic.java:1387)
 ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.broker.service.PulsarStats.lambda$null$1(PulsarStats.java:134)
 ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
 ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
 ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.broker.service.PulsarStats.lambda$null$3(PulsarStats.java:131)
 ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
 ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
 ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.broker.service.PulsarStats.lambda$updateStats$4(PulsarStats.java:120)
 ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
 ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
 ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.broker.service.PulsarStats.updateStats(PulsarStats.java:110) 
~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at 
org.apache.pulsar.broker.service.BrokerService.updateRates(BrokerService.java:1145)
 ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at 
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) 
[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
        at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) 
[org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_242]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
[?:1.8.0_242]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_242]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 [?:1.8.0_242]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_242]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_242]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
    ```
    The most likely culprit is the `getNextValidPosition` method in
    ManagedLedgerImpl: when it is given a position greater than the last add
    position, it returns null, which may cause the cursor's read position to
    become null. I haven't yet found how this situation arises, so this PR adds
    an error log that prints the stack trace (to help find the root cause) and,
    when no next valid position can be found, falls back to the position right
    after the last confirmed entry.
    
    (cherry picked from commit 7955cef6c5dff2f22cfc91e53d1d29562f232846)
---
 .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java   |  2 +-
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java   | 13 ++++++++++++-
 .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java   |  2 ++
 .../pulsar/broker/service/persistent/PersistentTopic.java   |  2 +-
 4 files changed, 16 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 304429d..66e977f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -780,7 +780,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // validate it before preparing range
         PositionImpl markDeletePosition = this.markDeletePosition;
         PositionImpl readPosition = this.readPosition;
-        return (markDeletePosition.compareTo(readPosition) < 0)
+        return (markDeletePosition != null && readPosition != null && 
markDeletePosition.compareTo(readPosition) < 0)
                 ? 
ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, readPosition))
                 : 0;
     }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4b3937f..27d8848 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2864,11 +2864,22 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
     }
 
     public PositionImpl getNextValidPosition(final PositionImpl position) {
+        PositionImpl next;
+        try {
+            next = getNextValidPositionInternal(position);
+        } catch (NullPointerException e) {
+            next = lastConfirmedEntry.getNext();
+            log.error("[{}] Can't find next valid position, fail back to the 
next position of the last position.", name, e);
+        }
+        return next;
+    }
+
+    public PositionImpl getNextValidPositionInternal(final PositionImpl 
position) {
         PositionImpl nextPosition = position.getNext();
         while (!isValidPosition(nextPosition)) {
             Long nextLedgerId = ledgers.ceilingKey(nextPosition.getLedgerId() 
+ 1);
             if (nextLedgerId == null) {
-                return null;
+                throw new NullPointerException();
             }
             nextPosition = PositionImpl.get(nextLedgerId, 0);
         }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index b5b702f..e507c99 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1966,6 +1966,8 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         assertEquals(ledger.getNextValidPosition((PositionImpl) 
c1.getMarkDeletedPosition()), p1);
         assertEquals(ledger.getNextValidPosition(p1), p2);
         assertEquals(ledger.getNextValidPosition(p3), 
PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+        
assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), 
p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+        
assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, 
p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a7fa72d..d7fcd41 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -624,7 +624,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 future.completeExceptionally(e);
             }
         }).exceptionally(ex -> {
-            log.warn("[{}] Failed to create subscription: {} error: {}", 
topic, subscriptionName, ex.getMessage());
+            log.error("[{}] Failed to create subscription: {} error: {}", 
topic, subscriptionName, ex);
             USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
             future.completeExceptionally(new PersistenceException(ex));
             return null;

Reply via email to