This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 3de8358b22a [fix][broker] Fix cursor should use latest ledger config (#22644)
3de8358b22a is described below

commit 3de8358b22aa48bb3fba5d301938609868791924
Author: Zixuan Liu <node...@gmail.com>
AuthorDate: Fri May 10 10:37:44 2024 +0800

    [fix][broker] Fix cursor should use latest ledger config (#22644)
    
    Signed-off-by: Zixuan Liu <node...@gmail.com>
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 59 +++++++++++-----------
 .../mledger/impl/ManagedCursorMXBeanImpl.java      |  3 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  8 +--
 .../mledger/impl/NonDurableCursorImpl.java         |  5 +-
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  3 +-
 .../bookkeeper/mledger/impl/RangeSetWrapper.java   |  2 +-
 .../mledger/impl/ReadOnlyCursorImpl.java           |  5 +-
 .../mledger/impl/ReadOnlyManagedLedgerImpl.java    |  2 +-
 ...ManagedCursorIndividualDeletedMessagesTest.java |  3 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  7 ++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  2 +-
 .../broker/service/BrokerBkEnsemblesTests.java     |  8 +--
 .../service/persistent/PersistentTopicTest.java    | 25 +++++++++
 13 files changed, 76 insertions(+), 56 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 7065af203da..911eca48bac 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -119,7 +119,6 @@ public class ManagedCursorImpl implements ManagedCursor {
         return 0;
     };
     protected final BookKeeper bookkeeper;
-    protected final ManagedLedgerConfig config;
     protected final ManagedLedgerImpl ledger;
     private final String name;
 
@@ -299,31 +298,30 @@ public class ManagedCursorImpl implements ManagedCursor {
         void operationFailed(ManagedLedgerException exception);
     }
 
-    ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, 
ManagedLedgerImpl ledger, String cursorName) {
+    ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String 
cursorName) {
         this.bookkeeper = bookkeeper;
         this.cursorProperties = Collections.emptyMap();
-        this.config = config;
         this.ledger = ledger;
         this.name = cursorName;
         this.individualDeletedMessages = new 
RangeSetWrapper<>(positionRangeConverter,
                 positionRangeReverseConverter, this);
-        if (config.isDeletionAtBatchIndexLevelEnabled()) {
+        if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
             this.batchDeletedIndexes = new ConcurrentSkipListMap<>();
         } else {
             this.batchDeletedIndexes = null;
         }
-        this.digestType = 
BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
+        this.digestType = 
BookKeeper.DigestType.fromApiDigestType(getConfig().getDigestType());
         STATE_UPDATER.set(this, State.Uninitialized);
         PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0);
         PENDING_READ_OPS_UPDATER.set(this, 0);
         RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
         WAITING_READ_OP_UPDATER.set(this, null);
-        this.clock = config.getClock();
+        this.clock = getConfig().getClock();
         this.lastActive = this.clock.millis();
         this.lastLedgerSwitchTimestamp = this.clock.millis();
 
-        if (config.getThrottleMarkDelete() > 0.0) {
-            markDeleteLimiter = 
RateLimiter.create(config.getThrottleMarkDelete());
+        if (getConfig().getThrottleMarkDelete() > 0.0) {
+            markDeleteLimiter = 
RateLimiter.create(getConfig().getThrottleMarkDelete());
         } else {
             // Disable mark-delete rate limiter
             markDeleteLimiter = null;
@@ -602,7 +600,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
                     
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
                 }
-                if (config.isDeletionAtBatchIndexLevelEnabled()
+                if (getConfig().isDeletionAtBatchIndexLevelEnabled()
                     && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 
0) {
                     
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
                 }
@@ -611,7 +609,8 @@ public class ManagedCursorImpl implements ManagedCursor {
             }, null);
         };
         try {
-            bookkeeper.asyncOpenLedger(ledgerId, digestType, 
config.getPassword(), openCallback, null);
+            bookkeeper.asyncOpenLedger(ledgerId, digestType, 
getConfig().getPassword(), openCallback,
+                    null);
         } catch (Throwable t) {
             log.error("[{}] Encountered error on opening cursor ledger {} for 
cursor {}",
                 ledger.getName(), ledgerId, name, t);
@@ -968,10 +967,10 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             // Check again for new entries after the configured time, then if 
still no entries are available register
             // to be notified
-            if (config.getNewEntriesCheckDelayInMillis() > 0) {
+            if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
                 ledger.getScheduledExecutor()
                         .schedule(() -> checkForNewEntries(op, callback, ctx),
-                                config.getNewEntriesCheckDelayInMillis(), 
TimeUnit.MILLISECONDS);
+                                getConfig().getNewEntriesCheckDelayInMillis(), 
TimeUnit.MILLISECONDS);
             } else {
                 // If there's no delay, check directly from the same thread
                 checkForNewEntries(op, callback, ctx);
@@ -1319,7 +1318,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     lastMarkDeleteEntry = new 
MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
                             ? getProperties() : Collections.emptyMap(), null, 
null);
                     individualDeletedMessages.clear();
-                    if (config.isDeletionAtBatchIndexLevelEnabled()) {
+                    if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
                         
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
                         batchDeletedIndexes.clear();
                         long[] resetWords = newReadPosition.ackSet;
@@ -1578,7 +1577,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         lock.readLock().lock();
         try {
-            if (config.isUnackedRangesOpenCacheSetEnabled()) {
+            if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
                 int cardinality = individualDeletedMessages.cardinality(
                         range.lowerEndpoint().ledgerId, 
range.lowerEndpoint().entryId,
                         range.upperEndpoint().ledgerId, 
range.upperEndpoint().entryId);
@@ -1958,7 +1957,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         PositionImpl newPosition = (PositionImpl) position;
 
-        if (config.isDeletionAtBatchIndexLevelEnabled()) {
+        if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
             if (newPosition.ackSet != null) {
                 AtomicReference<BitSetRecyclable> bitSetRecyclable = new 
AtomicReference<>();
                 BitSetRecyclable givenBitSet = 
BitSetRecyclable.create().resetWords(newPosition.ackSet);
@@ -2141,7 +2140,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 try {
                     
individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
                             mdEntry.newPosition.getEntryId());
-                    if (config.isDeletionAtBatchIndexLevelEnabled()) {
+                    if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
                         Map<PositionImpl, BitSetRecyclable> subMap = 
batchDeletedIndexes.subMap(PositionImpl.EARLIEST,
                                 false, 
PositionImpl.get(mdEntry.newPosition.getLedgerId(),
                                 mdEntry.newPosition.getEntryId()), true);
@@ -2279,7 +2278,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 }
 
                 if (isMessageDeleted(position)) {
-                    if (config.isDeletionAtBatchIndexLevelEnabled()) {
+                    if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
                         BitSetRecyclable bitSetRecyclable = 
batchDeletedIndexes.remove(position);
                         if (bitSetRecyclable != null) {
                             bitSetRecyclable.recycle();
@@ -2291,7 +2290,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     continue;
                 }
                 if (position.ackSet == null) {
-                    if (config.isDeletionAtBatchIndexLevelEnabled()) {
+                    if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
                         BitSetRecyclable bitSetRecyclable = 
batchDeletedIndexes.remove(position);
                         if (bitSetRecyclable != null) {
                             bitSetRecyclable.recycle();
@@ -2308,7 +2307,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                         log.debug("[{}] [{}] Individually deleted messages: 
{}", ledger.getName(), name,
                             individualDeletedMessages);
                     }
-                } else if (config.isDeletionAtBatchIndexLevelEnabled()) {
+                } else if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
                     BitSetRecyclable givenBitSet = 
BitSetRecyclable.create().resetWords(position.ackSet);
                     BitSetRecyclable bitSet = 
batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet);
                     if (givenBitSet != bitSet) {
@@ -2655,8 +2654,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     private boolean shouldPersistUnackRangesToLedger() {
         return cursorLedger != null
                 && !isCursorLedgerReadOnly
-                && config.getMaxUnackedRangesToPersist() > 0
-                && individualDeletedMessages.size() > 
config.getMaxUnackedRangesToPersistInMetadataStore();
+                && getConfig().getMaxUnackedRangesToPersist() > 0
+                && individualDeletedMessages.size() > 
getConfig().getMaxUnackedRangesToPersistInMetadataStore();
     }
 
     private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl 
position, Map<String, Long> properties,
@@ -2681,7 +2680,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         
info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties));
         if (persistIndividualDeletedMessageRanges) {
             
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
-            if (config.isDeletionAtBatchIndexLevelEnabled()) {
+            if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
                 
info.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList());
             }
         }
@@ -2946,7 +2945,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
         CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
-        ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) 
-> {
+        ledger.asyncCreateLedger(bookkeeper, getConfig(), digestType, (rc, lh, 
ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
                 future.complete(null);
@@ -3051,7 +3050,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 acksSerializedSize.addAndGet(messageRange.getSerializedSize());
                 rangeList.add(messageRange);
 
-                return rangeList.size() <= 
config.getMaxUnackedRangesToPersist();
+                return rangeList.size() <= 
getConfig().getMaxUnackedRangesToPersist();
             });
 
             this.individualDeletedMessagesSerializedSize = 
acksSerializedSize.get();
@@ -3065,7 +3064,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     private List<MLDataFormats.BatchedEntryDeletionIndexInfo> 
buildBatchEntryDeletionIndexInfoList() {
         lock.readLock().lock();
         try {
-            if (!config.isDeletionAtBatchIndexLevelEnabled() || 
batchDeletedIndexes.isEmpty()) {
+            if (!getConfig().isDeletionAtBatchIndexLevelEnabled() || 
batchDeletedIndexes.isEmpty()) {
                 return Collections.emptyList();
             }
             MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = 
MLDataFormats.NestedPositionInfo
@@ -3074,7 +3073,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     .BatchedEntryDeletionIndexInfo.newBuilder();
             List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = new 
ArrayList<>();
             Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = 
batchDeletedIndexes.entrySet().iterator();
-            while (iterator.hasNext() && result.size() < 
config.getMaxBatchDeletedIndexToPersist()) {
+            while (iterator.hasNext() && result.size() < 
getConfig().getMaxBatchDeletedIndexToPersist()) {
                 Map.Entry<PositionImpl, BitSetRecyclable> entry = 
iterator.next();
                 
nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
                 nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
@@ -3170,8 +3169,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     boolean shouldCloseLedger(LedgerHandle lh) {
         long now = clock.millis();
         if (ledger.getFactory().isMetadataServiceAvailable()
-                && (lh.getLastAddConfirmed() >= 
config.getMetadataMaxEntriesPerLedger()
-                || lastLedgerSwitchTimestamp < (now - 
config.getLedgerRolloverTimeout() * 1000))
+                && (lh.getLastAddConfirmed() >= 
getConfig().getMetadataMaxEntriesPerLedger()
+                || lastLedgerSwitchTimestamp < (now - 
getConfig().getLedgerRolloverTimeout() * 1000))
                 && (STATE_UPDATER.get(this) != State.Closed && 
STATE_UPDATER.get(this) != State.Closing)) {
             // It's safe to modify the timestamp since this method will be 
only called from a callback, implying that
             // calls will be serialized on one single thread
@@ -3527,7 +3526,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     @Override
     public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
-        if (config.isDeletionAtBatchIndexLevelEnabled()) {
+        if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
             BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
             return bitSet == null ? null : bitSet.toLongArray();
         } else {
@@ -3628,7 +3627,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorImpl.class);
 
     public ManagedLedgerConfig getConfig() {
-        return config;
+        return getManagedLedger().getConfig();
     }
 
     /***
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
index 48465e6294b..a183c0d61ce 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
@@ -90,7 +90,8 @@ public class ManagedCursorMXBeanImpl implements 
ManagedCursorMXBean {
 
     @Override
     public void addWriteCursorLedgerSize(final long size) {
-        writeCursorLedgerSize.add(size * ((ManagedCursorImpl) 
managedCursor).config.getWriteQuorumSize());
+        writeCursorLedgerSize.add(
+                size * 
managedCursor.getManagedLedger().getConfig().getWriteQuorumSize());
         writeCursorLedgerLogicalSize.add(size);
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bd74629e605..fa2dc45357e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -578,7 +578,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     for (final String cursorName : consumers) {
                         log.info("[{}] Loading cursor {}", name, cursorName);
                         final ManagedCursorImpl cursor;
-                        cursor = new ManagedCursorImpl(bookKeeper, config, 
ManagedLedgerImpl.this, cursorName);
+                        cursor = new ManagedCursorImpl(bookKeeper, 
ManagedLedgerImpl.this, cursorName);
 
                         cursor.recover(new VoidCallback() {
                             @Override
@@ -609,7 +609,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                             log.debug("[{}] Recovering cursor {} lazily", 
name, cursorName);
                         }
                         final ManagedCursorImpl cursor;
-                        cursor = new ManagedCursorImpl(bookKeeper, config, 
ManagedLedgerImpl.this, cursorName);
+                        cursor = new ManagedCursorImpl(bookKeeper, 
ManagedLedgerImpl.this, cursorName);
                         CompletableFuture<ManagedCursor> cursorRecoveryFuture 
= new CompletableFuture<>();
                         uninitializedCursors.put(cursorName, 
cursorRecoveryFuture);
 
@@ -991,7 +991,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Creating new cursor: {}", name, cursorName);
         }
-        final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, 
config, this, cursorName);
+        final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, 
this, cursorName);
         CompletableFuture<ManagedCursor> cursorFuture = new 
CompletableFuture<>();
         uninitializedCursors.put(cursorName, cursorFuture);
         PositionImpl position = InitialPosition.Earliest == initialPosition ? 
getFirstPosition() : getLastPosition();
@@ -1124,7 +1124,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             return cachedCursor;
         }
 
-        NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, 
config, this, cursorName,
+        NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, 
this, cursorName,
                 (PositionImpl) startCursorPosition, initialPosition, 
isReadCompacted);
         cursor.setActive();
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 77216ce2e45..734eab20bc5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -25,7 +25,6 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
-import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.slf4j.Logger;
@@ -35,10 +34,10 @@ public class NonDurableCursorImpl extends ManagedCursorImpl 
{
 
     private final boolean readCompacted;
 
-    NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, 
ManagedLedgerImpl ledger, String cursorName,
+    NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, 
String cursorName,
                          PositionImpl startCursorPosition, 
CommandSubscribe.InitialPosition initialPosition,
                          boolean isReadCompacted) {
-        super(bookkeeper, config, ledger, cursorName);
+        super(bookkeeper, ledger, cursorName);
         this.readCompacted = isReadCompacted;
 
         // Compare with "latest" position marker by using only the ledger id. 
Since the C++ client is using 48bits to
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index a79ba3fb5e2..534ef3d76cb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -111,7 +111,8 @@ class OpReadEntry implements ReadEntriesCallback {
                 callback.readEntriesComplete(entries, ctx);
                 recycle();
             });
-        } else if (cursor.config.isAutoSkipNonRecoverableData() && exception 
instanceof NonRecoverableLedgerException) {
+        } else if (cursor.getConfig().isAutoSkipNonRecoverableData()
+                && exception instanceof NonRecoverableLedgerException) {
             log.warn("[{}][{}] read failed from ledger at position:{} : {}", 
cursor.ledger.getName(), cursor.getName(),
                     readPosition, exception.getMessage());
             final ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
cursor.getManagedLedger();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
index 02e43504482..f235ffc63ac 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
@@ -52,7 +52,7 @@ public class RangeSetWrapper<T extends Comparable<T>> 
implements LongPairRangeSe
                            RangeBoundConsumer<T> rangeBoundConsumer,
                            ManagedCursorImpl managedCursor) {
         requireNonNull(managedCursor);
-        this.config = managedCursor.getConfig();
+        this.config = managedCursor.getManagedLedger().getConfig();
         this.rangeConverter = rangeConverter;
         this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
                 ? new ConcurrentOpenLongPairRangeSet<>(4096, rangeConverter)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index 1661613f07d..2461bcf780e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Range;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
@@ -31,9 +30,9 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 @Slf4j
 public class ReadOnlyCursorImpl extends ManagedCursorImpl implements 
ReadOnlyCursor {
 
-    public ReadOnlyCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig 
config, ManagedLedgerImpl ledger,
+    public ReadOnlyCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger,
                               PositionImpl startPosition, String cursorName) {
-        super(bookkeeper, config, ledger, cursorName);
+        super(bookkeeper, ledger, cursorName);
 
         if (startPosition.equals(PositionImpl.EARLIEST)) {
             readPosition = ledger.getFirstPosition().getNext();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 707b71c9d9f..d8449635999 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -143,7 +143,7 @@ public class ReadOnlyManagedLedgerImpl extends 
ManagedLedgerImpl {
             }
         }
 
-        return new ReadOnlyCursorImpl(bookKeeper, config, this, startPosition, 
"read-only-cursor");
+        return new ReadOnlyCursorImpl(bookKeeper, this, startPosition, 
"read-only-cursor");
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java
index aa0d04783d9..864c25c6c43 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java
@@ -56,8 +56,9 @@ public class ManagedCursorIndividualDeletedMessagesTest {
 
         ManagedLedgerImpl ledger = mock(ManagedLedgerImpl.class);
         doReturn(ledgersInfo).when(ledger).getLedgersInfo();
+        doReturn(config).when(ledger).getConfig();
 
-        ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper, 
config, ledger, "test-cursor"));
+        ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper, 
ledger, "test-cursor"));
         LongPairRangeSet<PositionImpl> deletedMessages = 
cursor.getIndividuallyDeletedMessagesSet();
 
         Method recoverMethod = 
ManagedCursorImpl.class.getDeclaredMethod("recoverIndividualDeletedMessages",
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 5c10533e247..4c95454e33a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3465,10 +3465,10 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         when(ml.getNextValidLedger(markDeleteLedgerId)).thenReturn(3L);
         when(ml.getNextValidPosition(lastPosition)).thenReturn(nextPosition);
         when(ml.ledgerExists(markDeleteLedgerId)).thenReturn(false);
+        when(ml.getConfig()).thenReturn(new ManagedLedgerConfig());
 
         BookKeeper mockBookKeeper = mock(BookKeeper.class);
-        final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, 
new ManagedLedgerConfig(), ml,
-                cursorName);
+        final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, 
ml, cursorName);
 
         cursor.recover(new VoidCallback() {
             @Override
@@ -4772,8 +4772,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         // Reopen the ledger.
         ledger = (ManagedLedgerImpl) factory.open(mlName, config);
         BookKeeper mockBookKeeper = mock(BookKeeper.class);
-        final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, 
new ManagedLedgerConfig(), ledger,
-                cursorName);
+        final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, 
ledger, cursorName);
 
         CompletableFuture<Void> recoverFuture = new CompletableFuture<>();
         // Recover the cursor.
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index e983523c1b6..122bada487a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3159,7 +3159,7 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         // (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
         AtomicReference<ManagedLedgerException> responseException2 = new 
AtomicReference<>();
         PositionImpl readPositionRef = PositionImpl.EARLIEST;
-        ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, 
"cursor1");
+        ManagedCursorImpl cursor = new ManagedCursorImpl(bk, ledger, 
"cursor1");
         OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef, 
1, new ReadEntriesCallback() {
 
             @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 42b9358911a..82892ad353a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -210,10 +210,8 @@ public class BrokerBkEnsemblesTests extends 
BkEnsemblesTestBase {
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topic1).get();
         ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
         ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().iterator().next();
-        Field configField = ManagedCursorImpl.class.getDeclaredField("config");
-        configField.setAccessible(true);
         // Create multiple data-ledger
-        ManagedLedgerConfig config = (ManagedLedgerConfig) 
configField.get(cursor);
+        ManagedLedgerConfig config = ml.getConfig();
         config.setMaxEntriesPerLedger(entriesPerLedger);
         config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
         // bookkeeper client
@@ -323,10 +321,8 @@ public class BrokerBkEnsemblesTests extends 
BkEnsemblesTestBase {
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topic1).get();
         ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
         ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().iterator().next();
-        Field configField = ManagedCursorImpl.class.getDeclaredField("config");
-        configField.setAccessible(true);
         // Create multiple data-ledger
-        ManagedLedgerConfig config = (ManagedLedgerConfig) 
configField.get(cursor);
+        ManagedLedgerConfig config = ml.getConfig();
         config.setMaxEntriesPerLedger(entriesPerLedger);
         config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
         // bookkeeper client
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 8130c818e3a..9fa960796ea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -64,6 +64,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.PrometheusMetricsTestUtil;
@@ -753,6 +754,30 @@ public class PersistentTopicTest extends BrokerTestBase {
         admin.topics().delete(topicName);
     }
 
+    @Test
+    public void testCursorGetConfigAfterTopicPoliciesChanged() throws 
Exception {
+        final String topicName = "persistent://prop/ns-abc/" + 
UUID.randomUUID();
+        final String subName = "test_sub";
+
+        @Cleanup
+        Consumer<byte[]> subscribe = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        PersistentSubscription subscription = 
persistentTopic.getSubscription(subName);
+
+        int maxConsumers = 100;
+        admin.topicPolicies().setMaxConsumers(topicName, 100);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.topicPolicies().getMaxConsumers(topicName, 
false), maxConsumers);
+        });
+
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
subscription.getCursor();
+        assertEquals(cursor.getConfig(), 
persistentTopic.getManagedLedger().getConfig());
+
+        subscribe.close();
+        admin.topics().delete(topicName);
+    }
+
     @Test
     public void testAddWaitingCursorsForNonDurable() throws Exception {
         final String ns = "prop/ns-test";

Reply via email to