Re: [PR] [fix] Fix concurrent containers concurrency issue [pulsar]

2024-04-26 Thread via GitHub


codecov-commenter commented on PR #22604:
URL: https://github.com/apache/pulsar/pull/22604#issuecomment-2080372023

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22604?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report
   Attention: Patch coverage is `93.3%`, with `2 lines` in your changes missing coverage. Please review.
   > Project coverage is 73.96%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) to head [(`3a2c24a`)](https://app.codecov.io/gh/apache/pulsar/pull/22604?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   > Report is 201 commits behind head on master.
   
   Additional details and impacted files
   
   
   [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22604/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22604?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
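   ```diff
   @@             Coverage Diff              @@
   ##             master   #22604      +/-   ##
   ============================================
   + Coverage     73.57%   73.96%   +0.38%
   - Complexity    32624    33148     +524
   ============================================
     Files          1877     1885       +8
     Lines        139502   140543    +1041
     Branches      15299    15431     +132
   ============================================
   + Hits         102638   103946    +1308
   + Misses        28908    28563     -345
   - Partials       7956     8034      +78
   ```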
   
   | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22604/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
   |---|---|---|
   | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22604/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `27.27% <53.33%> (+2.68%)` | :arrow_up: |
   | [systests](https://app.codecov.io/gh/apache/pulsar/pull/22604/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.61% <56.66%> (+0.28%)` | :arrow_up: |
   | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22604/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `73.24% <93.33%> (+0.39%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files](https://app.codecov.io/gh/apache/pulsar/pull/22604?dropdown=coverage=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...common/util/collections/ConcurrentLongHashMap.java](https://app.codecov.io/gh/apache/pulsar/pull/22604?src=pr=tree=pulsar-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fcommon%2Futil%2Fcollections%2FConcurrentLongHashMap.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi91dGlsL2NvbGxlY3Rpb25zL0NvbmN1cnJlbnRMb25nSGFzaE1hcC5qYXZh) | `90.40% <100.00%> (+1.51%)` | :arrow_up: |
   | [...til/collections/ConcurrentLongLongPairHashMap.java](https://app.codecov.io/gh/apache/pulsar/pull/22604?src=pr=tree=pulsar-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fcommon%2Futil%2Fcollections%2FConcurrentLongLongPairHashMap.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi91dGlsL2NvbGxlY3Rpb25zL0NvbmN1cnJlbnRMb25nTG9uZ1BhaXJIYXNoTWFwLmphdmE=) | `89.23% <100.00%> (+0.71%)` | :arrow_up: |
   | [...common/util/collections/ConcurrentOpenHashMap.java](https://app.codecov.io/gh/apache/pulsar/pull/22604?src=pr=tree=pulsar-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fcommon%2Futil%2Fcollections%2FConcurrentOpenHashMap.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi91dGlsL2NvbGxlY3Rpb25zL0NvbmN1cnJlbnRPcGVuSGFzaE1hcC5qYXZh) | `94.59% <100.00%> (-0.30%)` | :arrow_down: |
   | 

[PR] [fix] Fix concurrent containers concurrency issue [pulsar]

2024-04-26 Thread via GitHub


dao-jun opened a new pull request, #22604:
URL: https://github.com/apache/pulsar/pull/22604

   Fixes https://github.com/apache/pulsar/issues/22603
   
   
   ### Motivation
   
   Fix concurrent containers concurrency issue
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[I] [Bug] Pulsar concurrent containers has concurrency issues [pulsar]

2024-04-26 Thread via GitHub


dao-jun opened a new issue, #22603:
URL: https://github.com/apache/pulsar/issues/22603

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [X] I understand that unsupported versions don't get bug fixes. I will 
attempt to reproduce the issue on a supported version of Pulsar client and 
Pulsar broker.
   
   
   ### Version
   
   master branch
   
   ### Minimal reproduce step
   
   See: https://github.com/apache/bookkeeper/issues/4318
   
   `ConcurrentLongHashMap`, `ConcurrentLongLongPairHashMap`, 
`ConcurrentLongPairSet`, `ConcurrentOpenHashMap`, and `ConcurrentOpenHashSet` in 
the `org.apache.pulsar.common.util.collections` package are involved.
   
   ### What did you expect to see?
   
   NONE
   
   ### What did you see instead?
   
   NONE
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!





(pulsar) branch master updated: [improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions (#22536)

2024-04-26 Thread ayegorov
This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new bf5d6aac1b6 [improve][broker] perf: Reduce stickyHash calculations of 
non-persistent topics in SHARED subscriptions (#22536)
bf5d6aac1b6 is described below

commit bf5d6aac1b62d195c544a486bcefec676948a3a4
Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com>
AuthorDate: Fri Apr 26 13:22:19 2024 -0700

[improve][broker] perf: Reduce stickyHash calculations of non-persistent 
topics in SHARED subscriptions (#22536)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 21 ---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 30 +-
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |  8 +++---
 3 files changed, 45 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 6b2028095e2..b1c3687b3a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -286,16 +286,29 @@ public class Consumer {
 totalChunkedMessages, redeliveryTracker, 
DEFAULT_CONSUMER_EPOCH);
 }
 
+public Future sendMessages(final List entries, 
EntryBatchSizes batchSizes,
+ EntryBatchIndexesAcks batchIndexesAcks,
+ int totalMessages, long totalBytes, long 
totalChunkedMessages,
+ RedeliveryTracker redeliveryTracker, long 
epoch) {
+return sendMessages(entries, null, batchSizes, batchIndexesAcks, 
totalMessages, totalBytes,
+totalChunkedMessages, redeliveryTracker, epoch);
+}
+
 /**
  * Dispatch a list of entries to the consumer. 
  * It is also responsible to release entries data and recycle entries 
object.
  *
  * @return a SendMessageInfo object that contains the detail of what was 
sent to consumer
  */
-public Future sendMessages(final List entries, 
EntryBatchSizes batchSizes,
+public Future sendMessages(final List entries,
+ final List stickyKeyHashes,
+ EntryBatchSizes batchSizes,
  EntryBatchIndexesAcks batchIndexesAcks,
- int totalMessages, long totalBytes, long 
totalChunkedMessages,
- RedeliveryTracker redeliveryTracker, long 
epoch) {
+ int totalMessages,
+ long totalBytes,
+ long totalChunkedMessages,
+ RedeliveryTracker redeliveryTracker,
+ long epoch) {
 this.lastConsumedTimestamp = System.currentTimeMillis();
 
 if (entries.isEmpty() || totalMessages == 0) {
@@ -323,7 +336,7 @@ public class Consumer {
 // because this consumer is possible to disconnect at this 
time.
 if (pendingAcks != null) {
 int batchSize = batchSizes.getBatchSize(i);
-int stickyKeyHash = getStickyKeyHash(entry);
+int stickyKeyHash = stickyKeyHashes == null ? 
getStickyKeyHash(entry) : stickyKeyHashes.get(i);
 long[] ackSet = batchIndexesAcks == null ? null : 
batchIndexesAcks.getAckSet(i);
 if (ackSet != null) {
 unackedMessages -= (batchSize - 
BitSet.valueOf(ackSet).cardinality());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 2cad253f96e..fb7bd22de94 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -126,6 +126,14 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
 }
 };
 
+private static final FastThreadLocal>> 
localGroupedStickyKeyHashes =
+new FastThreadLocal>>() {
+@Override
+protected Map> initialValue() throws 
Exception {
+return new HashMap<>();
+}
+};
+
 @Override
 public void sendMessages(List entries) {
 if (entries.isEmpty()) {
@@ 
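
The hunks above pass a list of already-computed sticky key hashes into `Consumer.sendMessages`, which falls back to `getStickyKeyHash(entry)` only when that list is `null`. The idea can be sketched in isolation as follows. This is a minimal illustration, not the broker code: the class name, the string keys, the modulo-based consumer selection, and the `hashCode`-based hash are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StickyHashReuseSketch {
    static int hashCalls = 0; // counts hash computations, for illustration only

    // Hypothetical stand-in for the broker's sticky-key hash (not the real hash function).
    static int stickyKeyHash(String key) {
        hashCalls++;
        return key.hashCode() & Integer.MAX_VALUE; // force a non-negative value
    }

    // Dispatcher side: hash each key once, pick a consumer, and remember the hash.
    static Map<Integer, List<Integer>> groupHashesByConsumer(List<String> keys, int consumerCount) {
        Map<Integer, List<Integer>> hashesByConsumer = new HashMap<>();
        for (String key : keys) {
            int hash = stickyKeyHash(key);
            int consumer = hash % consumerCount; // hypothetical selection rule
            hashesByConsumer.computeIfAbsent(consumer, c -> new ArrayList<>()).add(hash);
        }
        return hashesByConsumer;
    }

    // Consumer side: reuse the precomputed hash when one is supplied, else compute it.
    static int hashAt(List<String> keys, List<Integer> precomputed, int i) {
        return precomputed == null ? stickyKeyHash(keys.get(i)) : precomputed.get(i);
    }
}
```

With a precomputed list, `hashAt` performs no further hash computation; passing `null` keeps the old per-entry behaviour, which is what lets existing callers go through the shorter overload unchanged.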

Re: [PR] [improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions [pulsar]

2024-04-26 Thread via GitHub


dlg99 merged PR #22536:
URL: https://github.com/apache/pulsar/pull/22536





Re: [PR] [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics [pulsar]

2024-04-26 Thread via GitHub


lhotari merged PR #22598:
URL: https://github.com/apache/pulsar/pull/22598





(pulsar) branch master updated: [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598)

2024-04-26 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 8323a3c4991 [improve][broker] Don't use forkjoin pool by default for 
deleting partitioned topics (#22598)
8323a3c4991 is described below

commit 8323a3c49912976aee723787fa67bee4d7d8d846
Author: Lari Hotari 
AuthorDate: Fri Apr 26 23:17:51 2024 +0300

[improve][broker] Don't use forkjoin pool by default for deleting 
partitioned topics (#22598)
---
 .../apache/pulsar/broker/resources/NamespaceResources.java | 14 +++---
 .../apache/pulsar/broker/resources/PulsarResources.java| 12 ++--
 .../main/java/org/apache/pulsar/broker/PulsarService.java  |  2 +-
 3 files changed, 22 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 1ba353dccaa..975b23192f9 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -54,10 +56,14 @@ public class NamespaceResources extends 
BaseResources {
 private static final String NAMESPACE_BASE_PATH = "/namespace";
 
 public NamespaceResources(MetadataStore configurationStore, int 
operationTimeoutSec) {
+this(configurationStore, operationTimeoutSec, 
ForkJoinPool.commonPool());
+}
+
+public NamespaceResources(MetadataStore configurationStore, int 
operationTimeoutSec, Executor executor) {
 super(configurationStore, Policies.class, operationTimeoutSec);
 this.configurationStore = configurationStore;
 isolationPolicies = new IsolationPolicyResources(configurationStore, 
operationTimeoutSec);
-partitionedTopicResources = new 
PartitionedTopicResources(configurationStore, operationTimeoutSec);
+partitionedTopicResources = new 
PartitionedTopicResources(configurationStore, operationTimeoutSec, executor);
 }
 
 public CompletableFuture> listNamespacesAsync(String tenant) {
@@ -234,9 +240,11 @@ public class NamespaceResources extends 
BaseResources {
 
 public static class PartitionedTopicResources extends 
BaseResources {
 private static final String PARTITIONED_TOPIC_PATH = 
"/admin/partitioned-topics";
+private final Executor executor;
 
-public PartitionedTopicResources(MetadataStore configurationStore, int 
operationTimeoutSec) {
+public PartitionedTopicResources(MetadataStore configurationStore, int 
operationTimeoutSec, Executor executor) {
 super(configurationStore, PartitionedTopicMetadata.class, 
operationTimeoutSec);
+this.executor = executor;
 }
 
 public CompletableFuture updatePartitionedTopicAsync(TopicName 
tn, Function {
 future.complete(deleteResult);
 }
 });
-});
+}, executor);
 
 return future;
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index fe7ffe0bc7b..cc64eeb52f6 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.resources;
 
 import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import lombok.Getter;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -57,13 +59,19 @@ public class PulsarResources {
 public PulsarResources(MetadataStore localMetadataStore, MetadataStore 
configurationMetadataStore) {
 this(localMetadataStore, configurationMetadataStore, 
DEFAULT_OPERATION_TIMEOUT_SEC);
 }
+
+public PulsarResources(MetadataStore localMetadataStore, MetadataStore 
configurationMetadataStore,
+   int operationTimeoutSec) {
+this(localMetadataStore, configurationMetadataStore, 
operationTimeoutSec, ForkJoinPool.commonPool());
+}
+
 public PulsarResources(MetadataStore localMetadataStore, MetadataStore 
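
The constructors added above keep `ForkJoinPool.commonPool()` as the default while letting callers supply their own `Executor` for the asynchronous callback. A minimal sketch of that constructor-chaining pattern, assuming a hypothetical `ResourcesSketch` class and a made-up `callbackThreadName` method that simply reports which thread ran the callback:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

final class ResourcesSketch {
    private final Executor executor;

    // Old-style constructor: keeps the previous behaviour for existing callers.
    ResourcesSketch() {
        this(ForkJoinPool.commonPool());
    }

    // New constructor: the caller decides where asynchronous callbacks run.
    ResourcesSketch(Executor executor) {
        this.executor = executor;
    }

    // Hypothetical operation: runs its callback on the injected executor
    // and returns the name of the thread that executed it.
    CompletableFuture<String> callbackThreadName(String topic) {
        return CompletableFuture.completedFuture(topic)
                .thenApplyAsync(t -> Thread.currentThread().getName(), executor);
    }
}
```

For example, `new ResourcesSketch(r -> new Thread(r, "sketch-executor").start())` runs the callback on a thread named `sketch-executor` rather than on a common-pool worker.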

(pulsar) branch master updated: [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown (#22589)

2024-04-26 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 7a44c801f86 [improve][broker] Close TopicPoliciesService to allow 
Pulsar broker graceful shutdown (#22589)
7a44c801f86 is described below

commit 7a44c801f86c4276533b0f008e768fb8deba4abc
Author: Lari Hotari 
AuthorDate: Fri Apr 26 23:17:18 2024 +0300

[improve][broker] Close TopicPoliciesService to allow Pulsar broker 
graceful shutdown (#22589)
---
 .../org/apache/pulsar/broker/PulsarService.java|  5 ++
 .../SystemTopicBasedTopicPoliciesService.java  | 63 +++---
 .../SystemTopicTxnBufferSnapshotService.java   | 20 ++-
 .../broker/service/TopicPoliciesService.java   |  7 ++-
 4 files changed, 87 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c21c7dc771e..51dffc20d07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -565,6 +565,11 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 transactionBufferClient.close();
 }
 
+if (topicPoliciesService != null) {
+topicPoliciesService.close();
+topicPoliciesService = null;
+}
+
 if (client != null) {
 client.close();
 client = null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 0449e5c885c..6d18d6d61b0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
@@ -72,6 +73,7 @@ public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesServic
 private final PulsarService pulsarService;
 private final HashSet localCluster;
 private final String clusterName;
+private final AtomicBoolean closed = new AtomicBoolean(false);
 
 private final ConcurrentInitializer
 namespaceEventsSystemTopicFactoryLazyInitializer = new 
LazyInitializer<>() {
@@ -110,12 +112,18 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 this.writerCaches = Caffeine.newBuilder()
 .expireAfterAccess(5, TimeUnit.MINUTES)
 .removalListener((namespaceName, writer, cause) -> {
-((SystemTopicClient.Writer) 
writer).closeAsync().exceptionally(ex -> {
-log.error("[{}] Close writer error.", namespaceName, 
ex);
-return null;
-});
+try {
+((SystemTopicClient.Writer) writer).close();
+} catch (Exception e) {
+log.error("[{}] Close writer error.", namespaceName, 
e);
+}
 })
+.executor(pulsarService.getExecutor())
 .buildAsync((namespaceName, executor) -> {
+if (closed.get()) {
+return CompletableFuture.failedFuture(
+new 
BrokerServiceException(getClass().getName() + " is closed."));
+}
 SystemTopicClient systemTopicClient = 
getNamespaceEventsSystemTopicFactory()
 
.createTopicPoliciesSystemTopicClient(namespaceName);
 return systemTopicClient.newWriterAsync();
@@ -382,6 +390,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
 protected CompletableFuture> 
createSystemTopicClient(
 NamespaceName namespace) {
+if (closed.get()) {
+return CompletableFuture.failedFuture(
+new BrokerServiceException(getClass().getName() + " is 
closed."));
+}
 try {
 createSystemTopicFactoryIfNeeded();
 } catch (PulsarServerException ex) {
@@ -430,6 +442,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements 
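
The hunks shown above guard asynchronous entry points with an `AtomicBoolean closed` flag and return a failed future once the service is closed. A minimal sketch of that guard follows. The class and method names are hypothetical, `IllegalStateException` stands in for the broker's `BrokerServiceException`, and the `close()` body is an assumption, since the real one is not visible in this truncated message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class ClosableServiceSketch implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Hypothetical async operation: fails fast once the service has been closed.
    CompletableFuture<String> createClient(String namespace) {
        if (closed.get()) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException(getClass().getName() + " is closed."));
        }
        return CompletableFuture.completedFuture("client-for-" + namespace);
    }

    @Override
    public void close() {
        // compareAndSet makes close() idempotent: only the first call does the cleanup.
        if (closed.compareAndSet(false, true)) {
            // release resources here (assumption: readers, writers, caches)
        }
    }
}
```

Returning a failed future instead of throwing keeps the error on the same asynchronous path that callers already handle.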

Re: [PR] [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown [pulsar]

2024-04-26 Thread via GitHub


lhotari merged PR #22589:
URL: https://github.com/apache/pulsar/pull/22589





(pulsar) branch master updated: [fix][broker] Continue closing even when executor is shut down (#22599)

2024-04-26 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new f8f256cfbdc [fix][broker] Continue closing even when executor is shut 
down (#22599)
f8f256cfbdc is described below

commit f8f256cfbdcd780c81442dc5566b6ed071141645
Author: Lari Hotari 
AuthorDate: Fri Apr 26 23:16:54 2024 +0300

[fix][broker] Continue closing even when executor is shut down (#22599)
---
 .../pulsar/broker/service/persistent/PersistentTopic.java  | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c7d762d595c..155b6777882 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1429,7 +1430,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
         FutureUtil.waitForAll(futures).thenRunAsync(() -> {
             closeClientFuture.complete(null);
-        }, getOrderedExecutor()).exceptionally(ex -> {
+        }, command -> {
+            try {
+                getOrderedExecutor().execute(command);
+            } catch (RejectedExecutionException e) {
+                // executor has been shut down, execute in current thread
+                command.run();
+            }
+        }).exceptionally(ex -> {
             log.error("[{}] Error closing clients", topic, ex);
             unfenceTopicToResume();
             closeClientFuture.completeExceptionally(ex);
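
The change above wraps the ordered executor in a lambda `Executor` that runs the task on the calling thread when the real executor rejects it. The same pattern can be tried standalone. This is a sketch under assumptions: the class and method names are hypothetical, and a plain single-thread `ExecutorService` stands in for Pulsar's ordered executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

final class FallbackExecutorSketch {
    // Wraps a delegate so that a rejected task runs on the calling thread instead.
    static Executor runInCallerWhenRejected(Executor delegate) {
        return command -> {
            try {
                delegate.execute(command);
            } catch (RejectedExecutionException e) {
                // delegate has been shut down, execute in current thread
                command.run();
            }
        };
    }

    // Shuts an executor down, then checks that an async continuation still completes.
    static boolean continuationRunsAfterShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown(); // every later execute() call is rejected
        CompletableFuture<String> done = CompletableFuture.completedFuture("closed")
                .thenApplyAsync(s -> s + "!", runInCallerWhenRejected(pool));
        return "closed!".equals(done.join());
    }
}
```

The trade-off is that the continuation may then run on whichever thread happens to complete the future, which is acceptable during shutdown but gives up the ordering guarantee the executor normally provides.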



Re: [PR] [fix][broker] Continue closing even when executor is shut down [pulsar]

2024-04-26 Thread via GitHub


lhotari merged PR #22599:
URL: https://github.com/apache/pulsar/pull/22599





Re: [PR] [improve][misc] Upgrade slf4j to 2.0.13 [pulsar]

2024-04-26 Thread via GitHub


nodece commented on PR #22391:
URL: https://github.com/apache/pulsar/pull/22391#issuecomment-2079804762

   Could you provide the log4j/log4j2 and slf4j versions? 
   I can help you solve the issues you are facing.





Re: [D] What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}? [pulsar]

2024-04-26 Thread via GitHub


GitHub user visortelle edited a comment on the discussion: What is the intended 
result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}?

@dameiss-tibco hi 

You'll need to use these cluster names in further operations. For example, the 
"create tenant" operation requires at least one allowed cluster. The "create 
namespace" operation also requires clusters when you have at least two clusters 
in your Pulsar instance.

> with completely bogus values for brokerServiceUrl and serviceUrl, and the 
> cluster is created. But it is obviously not useable since the URLs are invalid

https://github.com/apache/pulsar/assets/9302460/662547af-26c7-4061-98dc-0ed85e096da6

From the screenshot above you can see that there is some URL validation. At 
the same time, you're right that the cluster you register may be unavailable 
at the moment of its registration in another cluster.

In practice, it doesn't introduce any problems.

GitHub link: 
https://github.com/apache/pulsar/discussions/22602#discussioncomment-9239830


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [D] What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}? [pulsar]

2024-04-26 Thread via GitHub


GitHub user visortelle edited a comment on the discussion: What is the intended 
result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}?

@dameiss-tibco hi 

You'll need to use these cluster names in further operations. For example, the 
"create tenant" operation requires at least one allowed cluster. The "create 
namespace" operation also requires clusters when you have at least two clusters 
in your Pulsar instance.

> with completely bogus values for brokerServiceUrl and serviceUrl, and the 
> cluster is created. But it is obviously not useable since the URLs are invalid

https://github.com/apache/pulsar/assets/9302460/662547af-26c7-4061-98dc-0ed85e096da6

From the screenshot above you can see that there is some URL validation. At 
the same time, you're right that the cluster you register may be unavailable 
at the moment of its registration in another cluster.

In practice, it doesn't introduce any problems. This seems like completely valid 
behavior to me for a distributed system, which should continue to work when some 
component unexpectedly becomes unavailable, e.g. when one of the clusters is down.

Here is an example unrelated to Pulsar. Imagine two services, A and B, where each 
must know about the other to work properly. If you perform the availability check 
at service start, you're in trouble: if service A checks on startup that service B 
is available, and service B checks that service A is available, neither can start 
because of the cyclic dependency.

You can consider cluster registration a similar case to the example above, 
except that the configuration is dynamic.
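
The A/B example can be made concrete with a small sketch. Everything here is hypothetical and unrelated to Pulsar's code: `Service`, `start`, and the `requirePeerUp` switch exist only to contrast a strict availability check at startup with a check that only requires the peer to be registered.

```java
import java.util.HashMap;
import java.util.Map;

final class StartupCheckSketch {
    // Hypothetical service that needs to know about exactly one peer.
    static final class Service {
        final String name;
        final String peer;
        boolean running;

        Service(String name, String peer) {
            this.name = name;
            this.peer = peer;
        }
    }

    // Starts a service. With requirePeerUp the peer must already be running;
    // without it, the peer only has to be registered.
    static boolean start(Service s, Map<String, Service> registry, boolean requirePeerUp) {
        Service peer = registry.get(s.peer);
        if (peer == null) {
            return false; // unknown peer: the registration itself is missing
        }
        if (requirePeerUp && !peer.running) {
            return false; // strict check: the peer is registered but not up yet
        }
        s.running = true;
        return true;
    }

    // Registers A and B, then tries to start both under the given policy.
    static boolean bothStart(boolean requirePeerUp) {
        Map<String, Service> registry = new HashMap<>();
        Service a = new Service("A", "B");
        Service b = new Service("B", "A");
        registry.put(a.name, a);
        registry.put(b.name, b);
        boolean aStarted = start(a, registry, requirePeerUp);
        boolean bStarted = start(b, registry, requirePeerUp);
        return aStarted && bStarted;
    }
}
```

Under the strict policy `bothStart(true)` returns `false` because A refuses to start before B is up, while `bothStart(false)` returns `true` because registration alone is enough.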

GitHub link: 
https://github.com/apache/pulsar/discussions/22602#discussioncomment-9239830





Re: [D] What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}? [pulsar]

2024-04-26 Thread via GitHub


GitHub user visortelle edited a comment on the discussion: What is the intended 
result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}?

@dameiss-tibco hi 

You'll need to use these cluster names in further operations. For example, the 
"create tenant" operation requires at least one allowed cluster. The "create 
namespace" operation also requires clusters when you have at least two clusters 
in your Pulsar instance.

> with completely bogus values for brokerServiceUrl and serviceUrl, and the 
> cluster is created. But it is obviously not useable since the URLs are invalid

https://github.com/apache/pulsar/assets/9302460/662547af-26c7-4061-98dc-0ed85e096da6

From the screenshot above you can see that there is some URL validation. At 
the same time, you're right that the cluster you register may be unavailable 
at the moment of its registration in another cluster.

In practice, it doesn't introduce any problems and seems like completely valid 
behavior to me.

Here is an example unrelated to Pulsar. Imagine two services, A and B, where each 
must know about the other to work properly. If you perform the availability check 
at service start, you're in trouble: if service A checks on startup that service B 
is available, and service B checks that service A is available, neither can start 
because of the cyclic dependency.

You can consider cluster registration a similar case to the example above, 
except that the configuration is dynamic.

GitHub link: 
https://github.com/apache/pulsar/discussions/22602#discussioncomment-9239830





Re: [D] What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}? [pulsar]

2024-04-26 Thread via GitHub


GitHub user visortelle edited a comment on the discussion: What is the intended 
result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}?

@dameiss-tibco hi 

You'll need to use these cluster names in further operations. For example, the 
"create tenant" operation requires at least one allowed cluster. The "create 
namespace" operation also requires clusters when you have at least two clusters 
in your Pulsar instance.

> with completely bogus values for brokerServiceUrl and serviceUrl, and the 
> cluster is created. But it is obviously not useable since the URLs are invalid

https://github.com/apache/pulsar/assets/9302460/662547af-26c7-4061-98dc-0ed85e096da6

From the screenshot above you can see that there is some URL validation. At 
the same time, you're right that the cluster you register may be unavailable 
at the moment of its registration in another cluster.

In practice, this doesn't cause any problems, and it seems like completely 
valid behavior to me.

Here is an example unrelated to Pulsar. Imagine two services, A and B, where 
each must know about the other to work properly. If you perform the 
availability check at service start, you're in trouble: if service A checks on 
startup that service B is available, and service B checks that service A is 
available, neither can start. That is a cyclic dependency.

You can consider the cluster registration a similar case to the example above, 
but the configuration is dynamic.

GitHub link: 
https://github.com/apache/pulsar/discussions/22602#discussioncomment-9239830





Re: [D] What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}? [pulsar]

2024-04-26 Thread via GitHub


GitHub user visortelle added a comment to the discussion: What is the intended 
result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}?

@dameiss-tibco hi 

You'll need to use these cluster names in further operations. For example, the 
"create tenant" operation requires at least one allowed cluster. The "create 
namespace" operation also requires clusters when you have at least two clusters 
in your Pulsar instance.

> with completely bogus values for brokerServiceUrl and serviceUrl, and the 
> cluster is created. But it is obviously not useable since the URLs are invalid

https://github.com/apache/pulsar/assets/9302460/662547af-26c7-4061-98dc-0ed85e096da6

From the screenshot above you can see that there is some URL validation. At 
the same time, you're right that the cluster you register may be unavailable 
at the moment of its registration in another cluster.

In practice, this doesn't cause any problems, and it seems like completely 
valid behavior to me.

Here is an example unrelated to Pulsar. Imagine two services, A and B, where 
each must know about the other to work properly: a cyclic dependency. If 
service A checked on startup that service B is available, and service B 
checked that service A is available, neither would be able to start.

You can consider the cluster registration a similar case to the example above, 
but the configuration is dynamic.

GitHub link: 
https://github.com/apache/pulsar/discussions/22602#discussioncomment-9239830





Re: [PR] [fix][broker] One topic can be closed multiple times concurrently [pulsar]

2024-04-26 Thread via GitHub


poorbarcode commented on PR #17524:
URL: https://github.com/apache/pulsar/pull/17524#issuecomment-2079693146

   > @poorbarcode looks like OneWayReplicatorTest.testUnFenceTopicToReuse fails
   
   Sorry, I found a behavior change (before: the broker tries to unfence the 
topic for reuse when closing clients fails; after: this mechanism does not 
work), and it is difficult to fix **gracefully**. I will try to fix it tomorrow.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][meta] Fix invalid use of drain API and race condition in closing metadata store [pulsar]

2024-04-26 Thread via GitHub


merlimat commented on code in PR #22585:
URL: https://github.com/apache/pulsar/pull/22585#discussion_r1581199791


##
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java:
##
@@ -99,7 +103,13 @@ public void close() throws Exception {
 private void flush() {
 while (!readOps.isEmpty()) {
 List ops = new ArrayList<>();
-readOps.drain(ops::add, maxOperations);
+for (int i = 0; i < maxOperations; i++) {

Review Comment:
   This one should be ok, since it's already done in a loop: 
`while (!readOps.isEmpty()) {...}`
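
A minimal Java sketch of the poll-based pattern discussed in this review, using `java.util.concurrent.ConcurrentLinkedQueue` as a stand-in for the queue type in `AbstractBatchedMetadataStore`. The class and method names are hypothetical, and the early exit on an empty queue is an assumption, since the quoted diff is cut off after the `for` line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration; the queue type is a stand-in, not Pulsar's.
class PollDrainSketch {

    // Takes at most maxOperations items, stopping early if the queue is empty.
    static <T> List<T> takeBatch(Queue<T> queue, int maxOperations) {
        List<T> batch = new ArrayList<>();
        for (int i = 0; i < maxOperations; i++) {
            T item = queue.poll();
            if (item == null) {
                break; // assumption: nothing left to batch right now
            }
            batch.add(item);
        }
        return batch;
    }

    // Fails every pending future, one poll() at a time, and counts them.
    static int failAll(Queue<CompletableFuture<String>> pending, Throwable ex) {
        int failed = 0;
        CompletableFuture<String> future;
        while ((future = pending.poll()) != null) {
            future.completeExceptionally(ex);
            failed++;
        }
        return failed;
    }

    public static void main(String[] args) {
        Queue<Integer> ops = new ConcurrentLinkedQueue<>();
        for (int i = 1; i <= 5; i++) {
            ops.add(i);
        }
        // Mirrors the outer loop: keep batching until the queue is empty.
        while (!ops.isEmpty()) {
            System.out.println(takeBatch(ops, 2));
        }
    }
}
```

Because each `poll()` removes exactly one element, the outer `while (!ops.isEmpty())` loop picks up whatever a single bounded batch leaves behind.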



##
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java:
##
@@ -87,9 +87,13 @@ public void close() throws Exception {
 // Fail all the pending items
 MetadataStoreException ex =
 new MetadataStoreException.AlreadyClosedException("Metadata store is getting closed");
-readOps.drain(op -> op.getFuture().completeExceptionally(ex));
-writeOps.drain(op -> op.getFuture().completeExceptionally(ex));
-
+MetadataOp op;
+while ((op = readOps.poll()) != null) {
+op.getFuture().completeExceptionally(ex);
+}
+while ((op = writeOps.poll()) != null) {
+op.getFuture().completeExceptionally(ex);
+}

Review Comment:
    






[PR] [fix][doc] Fixes example code in Cluster Failover Java Client [pulsar-site]

2024-04-26 Thread via GitHub


sandeep-mst opened a new pull request, #891:
URL: https://github.com/apache/pulsar-site/pull/891

   Cluster failover Java client is available for 2.10.0 and later versions.
   This PR fixes example code shown in 
[Automatic-failover](https://pulsar.apache.org/docs/3.2.x/client-libraries-cluster-level-failover/#automatic-failover)
 and 
[Controlled-failover](https://pulsar.apache.org/docs/3.2.x/client-libraries-cluster-level-failover/#controlled-failover)
   
   It adds the required ServiceUrlProvider when building the PulsarClient:
   `.serviceUrlProvider(failover)`
   
   ### ✅ Contribution Checklist
   
   
   
   - [x] I read the [contribution 
guide](https://pulsar.apache.org/contribute/document-contribution/)
   - [x] I updated the [versioned 
docs](https://pulsar.apache.org/contribute/document-contribution/#update-versioned-docs)
   





Re: [I] [Bug] [ARM] Partially acknowledged batches are not redelivered [pulsar-client-cpp]

2024-04-26 Thread via GitHub


bph-sag commented on issue #424:
URL: 
https://github.com/apache/pulsar-client-cpp/issues/424#issuecomment-2079446323

   So, looking at the internal stats for the topic, it does appear that the 
mark-delete position indicates that the full batch is indeed acknowledged for 
our aarch32 runs.
   
   ###  Unsuccessful run, with missed events
    Log lines before termination of first consumer (last acknowledgement 
and the message associated with that acknowledgement, aarch32) 
   (Ledger 17, entry 1, 190/309)
   ```log
   2024-04-26 14:44:32.322 INFO  [3944048800] - 
 Got Message with batch 
index: 190, of batch size: 309, ledger: 17, entry: 1, full message was: [...]
   2024-04-26 14:44:32.324 INFO  [3944048800] - 
 SUCCESS ACK - 
partition: -1, ledgerId: 17, entryId: 1, batchIndex: 190, topicName: 
persistent://public/default/ConnPlugincor24546ea9837f45e49629df60696e54502e4
   ```
    Mark delete position from 
`persistent/:tenant/:namespace/:topic/internalStats?authoritative=false=false`
   ```json
   "markDeletePosition": "17:1",
   ```
   
    First message received on the 2nd consumer
   (Ledger 17, entry 2, 1/309)
   ```
   2024-04-26 14:45:03.577 INFO  [3948247200] - 
 Got Message with batch 
index: 0, of batch size: 309, ledger: 17, entry: 2, full message was: [...]
   ```
   ### Successful run, no missing messages (x86_64)
    Last messages prior to shutdown
   (Ledger 36, entry 2, 198/309)
   ```log
   2024-04-25 14:18:49.657 INFO  [3946145952] - 
 SUCCESS ACK - 
partition: -1, ledgerId: 36, entryId: 2, batchIndex: 198, topicName: 
persistent://public/default/ConnPlugincor245ae0980739d4344e98f3f3b29f0f07d8b
   2024-04-25 14:18:49.657 INFO  [3946145952] - 
 Got Message with batch 
index: 198, of batch size: 309, ledger: 36, entry: 2, full message was: [...]
   ```
   
     Mark delete position from 
`persistent/:tenant/:namespace/:topic/internalStats?authoritative=false=false`
   ```json
   
   "markDeletePosition": "36:1",
   ```
    First message received on 2nd consumer
   (Ledger 36, entry 2, 1/309)
   ```
   2024-04-25 14:19:11.523 INFO  [3948247200] - 
 Got Message with batch 
index: 0, of batch size: 309, ledger: 36, entry: 2, full message was: [...]
   ```





(pulsar) branch branch-3.1 updated: [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)

2024-04-26 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new d00d715f487 [improve][broker] Propagate cause exception in 
TopicBusyException when applicable (#22596)
d00d715f487 is described below

commit d00d715f487c1a12d9756c0460f8648e3920903d
Author: Lari Hotari 
AuthorDate: Fri Apr 26 16:23:36 2024 +0300

[improve][broker] Propagate cause exception in TopicBusyException when 
applicable (#22596)

(cherry picked from commit 3b9602c04db5a6577e2dc2fabddbf7a6e1d1a4a2)
---
 .../java/org/apache/pulsar/broker/service/BrokerServiceException.java | 4 
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java   | 3 ++-
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 3 ++-
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 831d6068e20..6abe40f811d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -146,6 +146,10 @@ public class BrokerServiceException extends Exception {
 public TopicBusyException(String msg) {
 super(msg);
 }
+
+public TopicBusyException(String msg, Throwable t) {
+super(msg, t);
+}
 }
 
 public static class TopicNotFoundException extends BrokerServiceException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 911ea7896b3..2bea208cc6a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -472,7 +472,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
 }
 }).exceptionally(ex -> {
 deleteFuture.completeExceptionally(
-new TopicBusyException("Failed to close clients before deleting topic."));
+new TopicBusyException("Failed to close clients before deleting topic.",
+FutureUtil.unwrapCompletionException(ex)));
 return null;
 });
 } finally {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ee367750ad6..a473257e191 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1442,7 +1442,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 }).exceptionally(ex->{
 unfenceTopicToResume();
 deleteFuture.completeExceptionally(
-new TopicBusyException("Failed to close clients before deleting topic."));
+new TopicBusyException("Failed to close clients before deleting topic.",
+FutureUtil.unwrapCompletionException(ex)));
 return null;
 });
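
The effect of this change can be sketched in a self-contained Java example. The nested `TopicBusyException` below is a local copy for illustration, and `unwrap` is a hypothetical stand-in for `FutureUtil.unwrapCompletionException`, whose exact behavior is assumed here rather than taken from the Pulsar source.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical illustration of cause propagation; not the broker code.
class CausePropagationSketch {

    // Local copy of the two constructors shown in the diff.
    static class TopicBusyException extends Exception {
        TopicBusyException(String msg) {
            super(msg);
        }

        TopicBusyException(String msg, Throwable t) {
            super(msg, t);
        }
    }

    // Assumed behavior: strip one CompletionException wrapper if present.
    static Throwable unwrap(Throwable t) {
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    // Completes the returned future once clients are closed, or fails it
    // with a TopicBusyException that keeps the original failure as cause.
    static CompletableFuture<Void> delete(CompletableFuture<Void> closeClients) {
        CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
        closeClients.thenRun(() -> deleteFuture.complete(null))
                .exceptionally(ex -> {
                    deleteFuture.completeExceptionally(
                            new TopicBusyException(
                                    "Failed to close clients before deleting topic.",
                                    unwrap(ex)));
                    return null;
                });
        return deleteFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> closeClients = new CompletableFuture<>();
        closeClients.completeExceptionally(new IllegalStateException("producer still connected"));
        try {
            delete(closeClients).join();
        } catch (CompletionException e) {
            // The root cause is now visible in the exception chain.
            e.getCause().printStackTrace();
        }
    }
}
```

With the single-argument constructor, the stack trace would end at the `TopicBusyException`; with the cause attached, the reason the clients could not be closed appears as a `Caused by:` entry.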
 



(pulsar) branch branch-2.11 updated: [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)

2024-04-26 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
 new 82308b95f59 [improve][broker] Propagate cause exception in 
TopicBusyException when applicable (#22596)
82308b95f59 is described below

commit 82308b95f5935a65f0098e77e143d48cf2ef2d8c
Author: Lari Hotari 
AuthorDate: Fri Apr 26 16:23:36 2024 +0300

[improve][broker] Propagate cause exception in TopicBusyException when 
applicable (#22596)

(cherry picked from commit 3b9602c04db5a6577e2dc2fabddbf7a6e1d1a4a2)
---
 .../java/org/apache/pulsar/broker/service/BrokerServiceException.java | 4 
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java   | 3 ++-
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 3 ++-
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index c6d8ffabcec..04146383c1f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -132,6 +132,10 @@ public class BrokerServiceException extends Exception {
 public TopicBusyException(String msg) {
 super(msg);
 }
+
+public TopicBusyException(String msg, Throwable t) {
+super(msg, t);
+}
 }
 
 public static class TopicNotFoundException extends BrokerServiceException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index dd7815ef98b..5e3b3d8bd39 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -455,7 +455,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
 }
 }).exceptionally(ex -> {
 deleteFuture.completeExceptionally(
-new TopicBusyException("Failed to close clients before deleting topic."));
+new TopicBusyException("Failed to close clients before deleting topic.",
+FutureUtil.unwrapCompletionException(ex)));
 return null;
 });
 } finally {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d8c1629003b..71696007049 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1311,7 +1311,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 }).exceptionally(ex->{
 unfenceTopicToResume();
 deleteFuture.completeExceptionally(
-new TopicBusyException("Failed to close clients before deleting topic."));
+new TopicBusyException("Failed to close clients before deleting topic.",
+FutureUtil.unwrapCompletionException(ex)));
 return null;
 });
 } finally {



(pulsar) branch branch-3.2 updated: [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)

2024-04-26 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 7cc99dfff63 [improve][broker] Propagate cause exception in 
TopicBusyException when applicable (#22596)
7cc99dfff63 is described below

commit 7cc99dfff6397108890e671df52404645e1cbdd3
Author: Lari Hotari 
AuthorDate: Fri Apr 26 16:23:36 2024 +0300

[improve][broker] Propagate cause exception in TopicBusyException when 
applicable (#22596)

(cherry picked from commit 3b9602c04db5a6577e2dc2fabddbf7a6e1d1a4a2)
---
 .../java/org/apache/pulsar/broker/service/BrokerServiceException.java | 4 
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java   | 3 ++-
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 3 ++-
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 831d6068e20..6abe40f811d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -146,6 +146,10 @@ public class BrokerServiceException extends Exception {
 public TopicBusyException(String msg) {
 super(msg);
 }
+
+public TopicBusyException(String msg, Throwable t) {
+super(msg, t);
+}
 }
 
 public static class TopicNotFoundException extends BrokerServiceException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 5c75ec28c50..b0c6443332b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -477,7 +477,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
 }
 }).exceptionally(ex -> {
 deleteFuture.completeExceptionally(
-new TopicBusyException("Failed to close clients before deleting topic."));
+new TopicBusyException("Failed to close clients before deleting topic.",
+FutureUtil.unwrapCompletionException(ex)));
 return null;
 });
 } finally {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a76ac548e0c..3a1da8ccee7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1470,7 +1470,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 }).exceptionally(ex->{
 unfenceTopicToResume();
 deleteFuture.completeExceptionally(
-new TopicBusyException("Failed to close clients before deleting topic."));
+new TopicBusyException("Failed to close clients before deleting topic.",
+FutureUtil.unwrapCompletionException(ex)));
 return null;
 });
 



(pulsar) branch branch-3.0 updated: [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)

2024-04-26 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 6e0d0d98b29 [improve][broker] Propagate cause exception in 
TopicBusyException when applicable (#22596)
6e0d0d98b29 is described below

commit 6e0d0d98b29f7cfff42464d104987bc5408c56a5
Author: Lari Hotari 
AuthorDate: Fri Apr 26 16:23:36 2024 +0300

[improve][broker] Propagate cause exception in TopicBusyException when 
applicable (#22596)

(cherry picked from commit 3b9602c04db5a6577e2dc2fabddbf7a6e1d1a4a2)
---
 .../java/org/apache/pulsar/broker/service/BrokerServiceException.java | 4 
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java   | 3 ++-
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 3 ++-
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 831d6068e20..6abe40f811d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -146,6 +146,10 @@ public class BrokerServiceException extends Exception {
 public TopicBusyException(String msg) {
 super(msg);
 }
+
+public TopicBusyException(String msg, Throwable t) {
+super(msg, t);
+}
 }
 
 public static class TopicNotFoundException extends BrokerServiceException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 4efaee2a1fa..a994c6336ad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -475,7 +475,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
 }
 }).exceptionally(ex -> {
 deleteFuture.completeExceptionally(
-new TopicBusyException("Failed to close clients before deleting topic."));
+new TopicBusyException("Failed to close clients before deleting topic.",
+FutureUtil.unwrapCompletionException(ex)));
 return null;
 });
 } finally {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 85fdc182ccc..6a97ac92625 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1439,7 +1439,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 }).exceptionally(ex->{
 unfenceTopicToResume();
 deleteFuture.completeExceptionally(
-new TopicBusyException("Failed to close clients before deleting topic."));
+new TopicBusyException("Failed to close clients before deleting topic.",
+FutureUtil.unwrapCompletionException(ex)));
 return null;
 });
 



(pulsar) branch master updated (3b9602c04db -> f411e3c0f26)

2024-04-26 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 3b9602c04db [improve][broker] Propagate cause exception in 
TopicBusyException when applicable (#22596)
 add f411e3c0f26 [fix][broker] Avoid being stuck when closing the broker 
with extensible load manager (#22573)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java|   3 +
 .../store/TableViewLoadDataStoreImpl.java  |   6 +-
 .../pulsar/broker/service/BrokerService.java   |  11 +++
 .../extensions/ExtensibleLoadManagerCloseTest.java | 107 +
 4 files changed, 122 insertions(+), 5 deletions(-)
 create mode 100644 
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java



Re: [I] [Bug] Broker could take 30+ seconds to close with extensible load manager [pulsar]

2024-04-26 Thread via GitHub


lhotari closed issue #22569: [Bug] Broker could take 30+ seconds to close with 
extensible load manager
URL: https://github.com/apache/pulsar/issues/22569





Re: [PR] [fix][broker] Avoid being stuck when closing the broker with extensible load manager [pulsar]

2024-04-26 Thread via GitHub


lhotari merged PR #22573:
URL: https://github.com/apache/pulsar/pull/22573





Re: [PR] [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown [pulsar]

2024-04-26 Thread via GitHub


lhotari commented on code in PR #22589:
URL: https://github.com/apache/pulsar/pull/22589#discussion_r1581034618


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##
@@ -110,11 +111,13 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
 this.writerCaches = Caffeine.newBuilder()
 .expireAfterAccess(5, TimeUnit.MINUTES)
 .removalListener((namespaceName, writer, cause) -> {
-((SystemTopicClient.Writer) writer).closeAsync().exceptionally(ex -> {
-log.error("[{}] Close writer error.", namespaceName, ex);
-return null;
-});
+try {
+((SystemTopicClient.Writer) writer).close();
+} catch (IOException e) {

Review Comment:
   done



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java:
##
@@ -142,8 +143,26 @@ private SystemTopicClient getTransactionBufferSystemTopicClient(NamespaceName
 
 public void close() throws Exception {
 for (Map.Entry> entry : clients.entrySet()) {
-entry.getValue().close();
+try {
+entry.getValue().close();
+} catch (Exception e) {
+log.error("Failed to close system topic client for namespace {}", entry.getKey(), e);
+}
+}
+clients.clear();
+for (Map.Entry> entry : refCountedWriterMap.entrySet()) {
+CompletableFuture> future = entry.getValue().getFuture();
+if (!future.isCompletedExceptionally()) {
+future.thenAccept(writer -> {
+try {
+writer.close();
+} catch (IOException e) {

Review Comment:
   done



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##
@@ -740,4 +743,21 @@ protected AsyncLoadingCache
 }
 
 private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
+
+@Override
+public void close() throws Exception {
+writerCaches.synchronous().invalidateAll();
+readerCaches.values().forEach(future -> {
+if (future != null && !future.isCompletedExceptionally()) {
+future.thenAccept(reader -> {
+try {
+reader.close();
+} catch (IOException e) {

Review Comment:
   done
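
The close logic in the hunks above follows a best-effort pattern: skip futures that already failed, close whatever the remaining futures hold, and record a failure instead of aborting. A minimal sketch of that pattern, with hypothetical names and plain `java.io.Closeable` in place of the Pulsar reader and writer types:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of best-effort closing; not the broker code.
class BestEffortCloseSketch {

    // Closes the resource behind every future that did not fail.
    // Returns the close failures observed so far; a future that is still
    // pending will close its resource (and may add a failure) later.
    static List<IOException> closeAll(Collection<CompletableFuture<Closeable>> futures) {
        List<IOException> failures = Collections.synchronizedList(new ArrayList<>());
        for (CompletableFuture<Closeable> future : futures) {
            if (future == null || future.isCompletedExceptionally()) {
                continue; // nothing was created, so nothing to close
            }
            future.thenAccept(resource -> {
                try {
                    resource.close();
                } catch (IOException e) {
                    failures.add(e); // record and keep going
                }
            });
        }
        return failures;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Closeable>> futures = new ArrayList<>();
        Closeable quiet = () -> System.out.println("closed");
        Closeable broken = () -> {
            throw new IOException("close failed");
        };
        futures.add(CompletableFuture.completedFuture(quiet));
        futures.add(CompletableFuture.completedFuture(broken));
        System.out.println(closeAll(futures).size());
    }
}
```

One failing `close()` does not prevent the remaining resources from being closed, which is the property a graceful shutdown path needs.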






(pulsar) branch master updated (69839c72f13 -> 3b9602c04db)

2024-04-26 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 69839c72f13 [improve][meta] Log a warning when ZK batch fails with 
connectionloss (#22566)
 add 3b9602c04db [improve][broker] Propagate cause exception in 
TopicBusyException when applicable (#22596)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/pulsar/broker/service/BrokerServiceException.java | 4 
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java   | 3 ++-
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 3 ++-
 3 files changed, 8 insertions(+), 2 deletions(-)



Re: [PR] [improve][broker] Propagate cause exception in TopicBusyException when applicable [pulsar]

2024-04-26 Thread via GitHub


lhotari merged PR #22596:
URL: https://github.com/apache/pulsar/pull/22596





(pulsar) branch branch-3.2 updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)

2024-04-26 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new b764afa2bb3 [improve][meta] Log a warning when ZK batch fails with 
connectionloss (#22566)
b764afa2bb3 is described below

commit b764afa2bb367e20849c1297b282beb5d746b474
Author: Lari Hotari 
AuthorDate: Fri Apr 26 15:16:47 2024 +0300

[improve][meta] Log a warning when ZK batch fails with connectionloss 
(#22566)

(cherry picked from commit 69839c72f1375d141b56734bc5e041c13e366c57)
---
 .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java  | 15 ++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 079ae3e2ae5..2e88cb33324 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -192,7 +192,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
 Code code = Code.get(rc);
 if (code == Code.CONNECTIONLOSS) {
 // There is the chance that we caused a connection reset by sending or requesting a batch
-// that passed the max ZK limit. Retry with the individual operations
+// that passed the max ZK limit.
+
+// Build the log warning message
+// summarize the operations by type
+String countsByType = ops.stream().collect(
+Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+.entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+.collect(Collectors.joining(", "));
+Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+log.warn("Connection loss while executing batch operation of {} "
++ "of total data size of {}. "
++ "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+// Retry with the individual operations
 executor.schedule(() -> {
 ops.forEach(o -> batchOperation(Collections.singletonList(o)));
 }, 100, TimeUnit.MILLISECONDS);
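
The warning added in this commit summarizes the failed batch by operation type and total size. The following is a minimal, standalone sketch of that summary step only. `Op` and `OpType` are hypothetical stand-ins for the store's own operation classes, and the sketch groups into a `TreeMap` so the output order is deterministic, whereas the commit uses the default map of `Collectors.groupingBy`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class BatchSummarySketch {
    // Hypothetical stand-ins for the store's operation type and operation classes.
    enum OpType { GET, PUT, DELETE }

    static final class Op {
        final OpType type;
        final long size;

        Op(OpType type, long size) {
            this.type = type;
            this.size = size;
        }

        OpType getType() {
            return type;
        }

        long size() {
            return size;
        }
    }

    // Counts operations per type and renders them as "<count> <TYPE> entries", comma-separated.
    static String countsByType(List<Op> ops) {
        Map<OpType, Integer> counts = ops.stream().collect(
                Collectors.groupingBy(Op::getType, TreeMap::new, Collectors.summingInt(op -> 1)));
        return counts.entrySet().stream()
                .map(e -> e.getValue() + " " + e.getKey().name() + " entries")
                .collect(Collectors.joining(", "));
    }

    // Sums the payload sizes of all operations in the batch.
    static long totalSize(List<Op> ops) {
        return ops.stream().collect(Collectors.summingLong(Op::size));
    }

    public static void main(String[] args) {
        List<Op> ops = List.of(new Op(OpType.GET, 10), new Op(OpType.PUT, 200), new Op(OpType.GET, 5));
        // prints "2 GET entries, 1 PUT entries; total size 215"
        System.out.println(countsByType(ops) + "; total size " + totalSize(ops));
    }
}
```

Sorting by enum order is only a convenience for reading and testing the message; it does not change which counts are reported.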



(pulsar) branch branch-3.1 updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)

2024-04-26 Thread lhotari

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 701d4cfe21b [improve][meta] Log a warning when ZK batch fails with 
connectionloss (#22566)
701d4cfe21b is described below

commit 701d4cfe21b8bc291d8530f7510ccdaa6c6280f0
Author: Lari Hotari 
AuthorDate: Fri Apr 26 15:16:47 2024 +0300

[improve][meta] Log a warning when ZK batch fails with connectionloss 
(#22566)

(cherry picked from commit 69839c72f1375d141b56734bc5e041c13e366c57)
---
 .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java  | 15 ++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 079ae3e2ae5..2e88cb33324 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -192,7 +192,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                     Code code = Code.get(rc);
                     if (code == Code.CONNECTIONLOSS) {
                         // There is the chance that we caused a connection reset by sending or requesting a batch
-                        // that passed the max ZK limit. Retry with the individual operations
+                        // that passed the max ZK limit.
+
+                        // Build the log warning message
+                        // summarize the operations by type
+                        String countsByType = ops.stream().collect(
+                                        Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+                                .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+                                .collect(Collectors.joining(", "));
+                        Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+                        log.warn("Connection loss while executing batch operation of {} "
+                                + "of total data size of {}. "
+                                + "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+                        // Retry with the individual operations
                         executor.schedule(() -> {
                             ops.forEach(o -> batchOperation(Collections.singletonList(o)));
                         }, 100, TimeUnit.MILLISECONDS);



(pulsar) branch branch-2.11 updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)

2024-04-26 Thread lhotari

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
 new 66a1edb64b9 [improve][meta] Log a warning when ZK batch fails with 
connectionloss (#22566)
66a1edb64b9 is described below

commit 66a1edb64b96ecac39d1191536520b11149da0dc
Author: Lari Hotari 
AuthorDate: Fri Apr 26 15:16:47 2024 +0300

[improve][meta] Log a warning when ZK batch fails with connectionloss 
(#22566)

(cherry picked from commit 69839c72f1375d141b56734bc5e041c13e366c57)
---
 .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java  | 15 ++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 2dbe0a7c3f3..64148d898fc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -188,7 +188,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                     Code code = Code.get(rc);
                     if (code == Code.CONNECTIONLOSS) {
                         // There is the chance that we caused a connection reset by sending or requesting a batch
-                        // that passed the max ZK limit. Retry with the individual operations
+                        // that passed the max ZK limit.
+
+                        // Build the log warning message
+                        // summarize the operations by type
+                        String countsByType = ops.stream().collect(
+                                        Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+                                .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+                                .collect(Collectors.joining(", "));
+                        Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+                        log.warn("Connection loss while executing batch operation of {} "
+                                + "of total data size of {}. "
+                                + "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+                        // Retry with the individual operations
                         executor.schedule(() -> {
                             ops.forEach(o -> batchOperation(Collections.singletonList(o)));
                         }, 100, TimeUnit.MILLISECONDS);



(pulsar) branch branch-3.0 updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)

2024-04-26 Thread lhotari

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new ec646f90c48 [improve][meta] Log a warning when ZK batch fails with 
connectionloss (#22566)
ec646f90c48 is described below

commit ec646f90c48d9b2664c5d3484d1beab726fdad20
Author: Lari Hotari 
AuthorDate: Fri Apr 26 15:16:47 2024 +0300

[improve][meta] Log a warning when ZK batch fails with connectionloss 
(#22566)

(cherry picked from commit 69839c72f1375d141b56734bc5e041c13e366c57)
---
 .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java  | 15 ++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 079ae3e2ae5..2e88cb33324 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -192,7 +192,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                     Code code = Code.get(rc);
                     if (code == Code.CONNECTIONLOSS) {
                         // There is the chance that we caused a connection reset by sending or requesting a batch
-                        // that passed the max ZK limit. Retry with the individual operations
+                        // that passed the max ZK limit.
+
+                        // Build the log warning message
+                        // summarize the operations by type
+                        String countsByType = ops.stream().collect(
+                                        Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+                                .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+                                .collect(Collectors.joining(", "));
+                        Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+                        log.warn("Connection loss while executing batch operation of {} "
+                                + "of total data size of {}. "
+                                + "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+                        // Retry with the individual operations
                         executor.schedule(() -> {
                             ops.forEach(o -> batchOperation(Collections.singletonList(o)));
                         }, 100, TimeUnit.MILLISECONDS);



(pulsar) branch master updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)

2024-04-26 Thread lhotari

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 69839c72f13 [improve][meta] Log a warning when ZK batch fails with 
connectionloss (#22566)
69839c72f13 is described below

commit 69839c72f1375d141b56734bc5e041c13e366c57
Author: Lari Hotari 
AuthorDate: Fri Apr 26 15:16:47 2024 +0300

[improve][meta] Log a warning when ZK batch fails with connectionloss 
(#22566)
---
 .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java  | 15 ++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 079ae3e2ae5..2e88cb33324 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -192,7 +192,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                     Code code = Code.get(rc);
                     if (code == Code.CONNECTIONLOSS) {
                         // There is the chance that we caused a connection reset by sending or requesting a batch
-                        // that passed the max ZK limit. Retry with the individual operations
+                        // that passed the max ZK limit.
+
+                        // Build the log warning message
+                        // summarize the operations by type
+                        String countsByType = ops.stream().collect(
+                                        Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+                                .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+                                .collect(Collectors.joining(", "));
+                        Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+                        log.warn("Connection loss while executing batch operation of {} "
+                                + "of total data size of {}. "
+                                + "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+                        // Retry with the individual operations
                         executor.schedule(() -> {
                             ops.forEach(o -> batchOperation(Collections.singletonList(o)));
                         }, 100, TimeUnit.MILLISECONDS);
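
The fallback that this hunk keeps in place re-submits every operation of the failed batch as its own one-element batch after a short delay. Below is a rough, self-contained sketch of that pattern using only `java.util.concurrent`. The names `retryOneByOne` and `demo` are hypothetical, and the `Consumer` parameter stands in for the store's `batchOperation` method.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SingletonRetrySketch {
    // Schedules a task that re-submits each operation as a single-element batch after the delay.
    static <T> ScheduledFuture<?> retryOneByOne(List<T> ops, Consumer<List<T>> batchOperation,
                                                ScheduledExecutorService executor, long delayMillis) {
        return executor.schedule(
                () -> ops.forEach(o -> batchOperation.accept(Collections.singletonList(o))),
                delayMillis, TimeUnit.MILLISECONDS);
    }

    // Runs the retry against an in-memory recorder and returns the batches it received, in order.
    static List<List<String>> demo(List<String> ops) {
        List<List<String>> received = new CopyOnWriteArrayList<>();
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            retryOneByOne(ops, received::add, executor, 100).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        // prints "[[put /a], [put /b], [delete /c]]"
        System.out.println(demo(List.of("put /a", "put /b", "delete /c")));
    }
}
```

Splitting into singletons trades throughput for a request size that cannot exceed a per-request limit because of batching alone, which matches the assumption stated in the code comment of the hunk.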



Re: [PR] [improve][meta] Log a warning when ZK batch fails with connectionloss [pulsar]

2024-04-26 Thread via GitHub


lhotari merged PR #22566:
URL: https://github.com/apache/pulsar/pull/22566


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.2 updated: [fix][test] Clear fields in test cleanup to reduce memory consumption (#22583)

2024-04-26 Thread lhotari

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 1be23d00295 [fix][test] Clear fields in test cleanup to reduce memory 
consumption (#22583)
1be23d00295 is described below

commit 1be23d00295e9021d66bf38058a18cd2fcb38f31
Author: Lari Hotari 
AuthorDate: Thu Apr 25 21:54:55 2024 +0300

[fix][test] Clear fields in test cleanup to reduce memory consumption 
(#22583)

(cherry picked from commit 3de14c55de138770ed61d1a14cd883048ea1915c)
---
 .../pulsar/broker/MultiBrokerTestZKBaseTest.java   |  1 +
 .../apache/pulsar/broker/SLAMonitoringTest.java| 21 +++--
 .../loadbalance/LeaderElectionServiceTest.java |  5 +-
 .../broker/loadbalance/LoadBalancerTest.java   | 15 --
 .../loadbalance/SimpleLoadManagerImplTest.java | 30 +---
 .../loadbalance/extensions/BrokerRegistryTest.java | 10 +++-
 .../ExtensibleLoadManagerImplBaseTest.java |  9 +++-
 .../loadbalance/impl/BundleSplitterTaskTest.java   | 10 +++-
 .../impl/ModularLoadManagerImplTest.java   | 35 ++
 .../broker/service/AdvertisedAddressTest.java  | 10 +++-
 .../pulsar/broker/service/BkEnsemblesTestBase.java | 15 --
 .../broker/service/BrokerBookieIsolationTest.java  |  6 ++-
 .../CanReconnectZKClientPulsarServiceBaseTest.java | 24 +++---
 .../pulsar/broker/service/MaxMessageSizeTest.java  | 15 --
 .../broker/service/OneWayReplicatorTestBase.java   | 44 ++
 .../pulsar/broker/service/ReplicatorTest.java  |  8 ++--
 .../pulsar/broker/service/ReplicatorTestBase.java  | 53 --
 .../pulsar/broker/service/TopicOwnerTest.java  | 15 --
 .../coordinator/TransactionMetaStoreTestBase.java  | 17 ---
 .../client/api/ClientDeduplicationFailureTest.java | 20 ++--
 .../worker/PulsarFunctionE2ESecurityTest.java  | 25 --
 .../worker/PulsarFunctionPublishTest.java  | 25 --
 .../functions/worker/PulsarFunctionTlsTest.java|  8 +++-
 .../worker/PulsarWorkerAssignmentTest.java | 25 --
 .../apache/pulsar/io/AbstractPulsarE2ETest.java| 42 +
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  | 25 --
 .../apache/pulsar/io/PulsarFunctionTlsTest.java| 20 ++--
 .../pulsar/zookeeper/ZookeeperServerTest.java  | 13 --
 .../org/apache/pulsar/metadata/TestZKServer.java   |  1 +
 29 files changed, 417 insertions(+), 130 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
index d6a39fadec4..f07c8f8398a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
@@ -59,6 +59,7 @@ public abstract class MultiBrokerTestZKBaseTest extends MultiBrokerBaseTest {
             } catch (Exception e) {
                 log.error("Error in stopping ZK server", e);
             }
+            testZKServer = null;
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 4a6524bf245..941229fc3d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -126,15 +126,26 @@ public class SLAMonitoringTest {
     @AfterClass(alwaysRun = true)
     public void shutdown() throws Exception {
         log.info("--- Shutting down ---");
-        executor.shutdownNow();
-        executor = null;
+        if (executor != null) {
+            executor.shutdownNow();
+            executor = null;
+        }
 
         for (int i = 0; i < BROKER_COUNT; i++) {
-            pulsarAdmins[i].close();
-            pulsarServices[i].close();
+            if (pulsarAdmins[i] != null) {
+                pulsarAdmins[i].close();
+                pulsarAdmins[i] = null;
+            }
+            if (pulsarServices[i] != null) {
+                pulsarServices[i].close();
+                pulsarServices[i] = null;
+            }
         }
 
-        bkEnsemble.stop();
+        if (bkEnsemble != null) {
+            bkEnsemble.stop();
+            bkEnsemble = null;
+        }
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index ded4ee8e58d..358410f1f28 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ 

(pulsar) 02/02: [fix][test] Clear fields in test cleanup to reduce memory consumption (#22583)

2024-04-26 Thread lhotari

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 72833feba28740b7f02e1978edae7baf36a462bf
Author: Lari Hotari 
AuthorDate: Thu Apr 25 21:54:55 2024 +0300

[fix][test] Clear fields in test cleanup to reduce memory consumption 
(#22583)

(cherry picked from commit 3de14c55de138770ed61d1a14cd883048ea1915c)
---
 .../pulsar/broker/MultiBrokerTestZKBaseTest.java   |  1 +
 .../apache/pulsar/broker/SLAMonitoringTest.java| 21 +++--
 .../loadbalance/LeaderElectionServiceTest.java |  5 +-
 .../broker/loadbalance/LoadBalancerTest.java   | 11 -
 .../loadbalance/SimpleLoadManagerImplTest.java | 30 +---
 .../loadbalance/extensions/BrokerRegistryTest.java | 10 +++-
 .../ExtensibleLoadManagerImplBaseTest.java |  8 +++-
 .../loadbalance/impl/BundleSplitterTaskTest.java   | 10 +++-
 .../impl/ModularLoadManagerImplTest.java   | 35 ++
 .../broker/service/AdvertisedAddressTest.java  | 10 +++-
 .../pulsar/broker/service/BkEnsemblesTestBase.java | 15 --
 .../broker/service/BrokerBookieIsolationTest.java  |  6 ++-
 .../CanReconnectZKClientPulsarServiceBaseTest.java | 24 +++---
 .../pulsar/broker/service/MaxMessageSizeTest.java  | 14 +-
 .../broker/service/OneWayReplicatorTestBase.java   | 44 ++
 .../pulsar/broker/service/ReplicatorTest.java  |  8 ++--
 .../pulsar/broker/service/ReplicatorTestBase.java  | 53 --
 .../pulsar/broker/service/TopicOwnerTest.java  | 15 --
 .../coordinator/TransactionMetaStoreTestBase.java  | 17 ---
 .../client/api/ClientDeduplicationFailureTest.java | 20 ++--
 .../worker/PulsarFunctionE2ESecurityTest.java  | 25 --
 .../worker/PulsarFunctionPublishTest.java  | 25 --
 .../functions/worker/PulsarFunctionTlsTest.java|  8 +++-
 .../worker/PulsarWorkerAssignmentTest.java | 25 --
 .../apache/pulsar/io/AbstractPulsarE2ETest.java| 42 +
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  | 25 --
 .../apache/pulsar/io/PulsarFunctionTlsTest.java| 20 ++--
 .../pulsar/zookeeper/ZookeeperServerTest.java  | 15 --
 .../org/apache/pulsar/metadata/TestZKServer.java   |  1 +
 29 files changed, 414 insertions(+), 129 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
index 0cd5bce5d51..b1e608b620a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
@@ -48,6 +48,7 @@ public abstract class MultiBrokerTestZKBaseTest extends MultiBrokerBaseTest {
             } catch (Exception e) {
                 log.error("Error in stopping ZK server", e);
             }
+            testZKServer = null;
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 4a6524bf245..941229fc3d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -126,15 +126,26 @@ public class SLAMonitoringTest {
     @AfterClass(alwaysRun = true)
     public void shutdown() throws Exception {
         log.info("--- Shutting down ---");
-        executor.shutdownNow();
-        executor = null;
+        if (executor != null) {
+            executor.shutdownNow();
+            executor = null;
+        }
 
         for (int i = 0; i < BROKER_COUNT; i++) {
-            pulsarAdmins[i].close();
-            pulsarServices[i].close();
+            if (pulsarAdmins[i] != null) {
+                pulsarAdmins[i].close();
+                pulsarAdmins[i] = null;
+            }
+            if (pulsarServices[i] != null) {
+                pulsarServices[i].close();
+                pulsarServices[i] = null;
+            }
         }
 
-        bkEnsemble.stop();
+        if (bkEnsemble != null) {
+            bkEnsemble.stop();
+            bkEnsemble = null;
+        }
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index ded4ee8e58d..358410f1f28 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -59,7 +59,10 @@ public class LeaderElectionServiceTest {
 
     @AfterMethod(alwaysRun = true)
     void shutdown() throws Exception {
-

(pulsar) 01/02: [fix][test] Fix resource leak in TransactionCoordinatorClientTest (#21380)

2024-04-26 Thread lhotari

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8ecfd64a8993e93a2ff4a45b3ca32b74f7a04997
Author: Lari Hotari 
AuthorDate: Tue Oct 17 21:50:16 2023 +0300

[fix][test] Fix resource leak in TransactionCoordinatorClientTest (#21380)

(cherry picked from commit cb7c98a7f56c3115a9e73bff96b8b4daac2c8180)
---
 .../transaction/coordinator/TransactionMetaStoreTestBase.java   | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index eb714dd848a..7a0fb48f911 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -120,6 +120,9 @@ public abstract class TransactionMetaStoreTestBase extends TestRetrySupport {
 
     @Override
     protected void cleanup() throws Exception {
+        if (transactionCoordinatorClient != null) {
+            transactionCoordinatorClient.close();
+        }
         for (PulsarAdmin admin : pulsarAdmins) {
             if (admin != null) {
                 admin.close();
@@ -133,6 +136,9 @@ public abstract class TransactionMetaStoreTestBase extends TestRetrySupport {
                 service.close();
             }
         }
+        if (bkEnsemble != null) {
+            bkEnsemble.stop();
+        }
         Mockito.reset();
     }
 }
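
The cleanup changes in this commit and in #22583 follow one pattern: close a resource only if it was created, and (in #22583) clear the field afterwards so the test instance no longer references it. A minimal sketch of that pattern follows. `Resource`, `client`, and `admins` are hypothetical names, not the fields of the real test classes, and the memory benefit assumes the test framework keeps test instances reachable after cleanup, which the commit title suggests but the diff itself does not state.

```java
public class CleanupSketch {
    // Hypothetical resource standing in for a client, admin handle, or embedded test cluster.
    static final class Resource implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    Resource client;
    final Resource[] admins = new Resource[2];

    void setup() {
        client = new Resource();
        admins[0] = new Resource();
        // admins[1] stays null to mimic a setup that failed part-way through.
    }

    // Safe to call after a partial setup and safe to call more than once.
    void cleanup() {
        if (client != null) {
            client.close();
            client = null;
        }
        for (int i = 0; i < admins.length; i++) {
            if (admins[i] != null) {
                admins[i].close();
                admins[i] = null;
            }
        }
    }

    public static void main(String[] args) {
        CleanupSketch test = new CleanupSketch();
        test.cleanup(); // nothing was set up: no NullPointerException
        test.setup();
        Resource client = test.client;
        test.cleanup();
        test.cleanup(); // second call is a no-op
        // prints "closed=true, field cleared=true"
        System.out.println("closed=" + client.closed + ", field cleared=" + (test.client == null));
    }
}
```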



(pulsar) branch branch-3.0 updated (5ffec8a3cd8 -> 72833feba28)

2024-04-26 Thread lhotari

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 5ffec8a3cd8 [fix][admin] Fix namespace admin api exception response 
(#22587)
 new 8ecfd64a899 [fix][test] Fix resource leak in 
TransactionCoordinatorClientTest (#21380)
 new 72833feba28 [fix][test] Clear fields in test cleanup to reduce memory 
consumption (#22583)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pulsar/broker/MultiBrokerTestZKBaseTest.java   |  1 +
 .../apache/pulsar/broker/SLAMonitoringTest.java| 21 +++--
 .../loadbalance/LeaderElectionServiceTest.java |  5 +-
 .../broker/loadbalance/LoadBalancerTest.java   | 11 -
 .../loadbalance/SimpleLoadManagerImplTest.java | 30 +---
 .../loadbalance/extensions/BrokerRegistryTest.java | 10 +++-
 .../ExtensibleLoadManagerImplBaseTest.java |  8 +++-
 .../loadbalance/impl/BundleSplitterTaskTest.java   | 10 +++-
 .../impl/ModularLoadManagerImplTest.java   | 35 ++
 .../broker/service/AdvertisedAddressTest.java  | 10 +++-
 .../pulsar/broker/service/BkEnsemblesTestBase.java | 15 --
 .../broker/service/BrokerBookieIsolationTest.java  |  6 ++-
 .../CanReconnectZKClientPulsarServiceBaseTest.java | 24 +++---
 .../pulsar/broker/service/MaxMessageSizeTest.java  | 14 +-
 .../broker/service/OneWayReplicatorTestBase.java   | 44 ++
 .../pulsar/broker/service/ReplicatorTest.java  |  8 ++--
 .../pulsar/broker/service/ReplicatorTestBase.java  | 53 --
 .../pulsar/broker/service/TopicOwnerTest.java  | 15 --
 .../coordinator/TransactionMetaStoreTestBase.java  | 23 +++---
 .../client/api/ClientDeduplicationFailureTest.java | 20 ++--
 .../worker/PulsarFunctionE2ESecurityTest.java  | 25 --
 .../worker/PulsarFunctionPublishTest.java  | 25 --
 .../functions/worker/PulsarFunctionTlsTest.java|  8 +++-
 .../worker/PulsarWorkerAssignmentTest.java | 25 --
 .../apache/pulsar/io/AbstractPulsarE2ETest.java| 42 +
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  | 25 --
 .../apache/pulsar/io/PulsarFunctionTlsTest.java| 20 ++--
 .../pulsar/zookeeper/ZookeeperServerTest.java  | 15 --
 .../org/apache/pulsar/metadata/TestZKServer.java   |  1 +
 29 files changed, 420 insertions(+), 129 deletions(-)



Re: [PR] [improve][misc] Upgrade slf4j to 2.0.13 [pulsar]

2024-04-26 Thread via GitHub


BewareMyPower commented on PR #22391:
URL: https://github.com/apache/pulsar/pull/22391#issuecomment-2079249332

   I tried adding the `reload4j` dependency and it didn't work.
   
   ```
   [INFO] \- org.slf4j:slf4j-reload4j:jar:2.0.13:test
   [INFO]\- ch.qos.reload4j:reload4j:jar:1.2.22:test
   ```
   
   ```
   SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
   SLF4J: Defaulting to no-operation (NOP) logger implementation
   ```
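
One way to narrow this down, offered as an assumption rather than a diagnosis of the project above: the `StaticLoggerBinder` message is the lookup performed by the slf4j-api 1.7.x line, whereas slf4j-api 2.x discovers providers through `org.slf4j.spi.SLF4JServiceProvider`. So the message hints that a 1.7.x API jar may still be the one loaded at runtime. The sketch below only probes which of those two classes is visible on the classpath. The class and method names are hypothetical, and it uses nothing beyond the JDK.

```java
public class Slf4jBindingProbe {
    // Returns true if the named class can be located by this class's loader (without initializing it).
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, Slf4jBindingProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class that slf4j-api 1.7.x expects a binding jar to supply.
        System.out.println("StaticLoggerBinder present: "
                + isOnClasspath("org.slf4j.impl.StaticLoggerBinder"));
        // Service interface shipped with slf4j-api 2.x and implemented by 2.x providers.
        System.out.println("SLF4JServiceProvider present: "
                + isOnClasspath("org.slf4j.spi.SLF4JServiceProvider"));
    }
}
```

Run with the same classpath as the failing application; the output depends entirely on which jars are resolved there.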





Re: [PR] [improve][misc] Upgrade slf4j to 2.0.13 [pulsar]

2024-04-26 Thread via GitHub


BewareMyPower commented on PR #22391:
URL: https://github.com/apache/pulsar/pull/22391#issuecomment-2079238842

   It's a breaking change that affects the down
   
   In our KSN project (the private version of 
[KoP](https://github.com/streamnative/kop)), after upgrading to include this 
commit, logs cannot be printed anymore.





Re: [PR] [fix][broker] One topic can be closed multiple times concurrently [pulsar]

2024-04-26 Thread via GitHub


lhotari commented on PR #17524:
URL: https://github.com/apache/pulsar/pull/17524#issuecomment-2079199183

   @poorbarcode looks like OneWayReplicatorTest.testUnFenceTopicToReuse fails





(pulsar) branch master updated: [cleanup] [test] remove useless TestAuthorizationProvider2 (#22595)

2024-04-26 Thread lhotari

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d19860c706e [cleanup] [test] remove useless TestAuthorizationProvider2 
(#22595)
d19860c706e is described below

commit d19860c706e47b3f2525678da5edfad3f6adafd1
Author: thetumbled <52550727+thetumb...@users.noreply.github.com>
AuthorDate: Fri Apr 26 19:21:45 2024 +0800

[cleanup] [test] remove useless TestAuthorizationProvider2 (#22595)
---
 .../api/AuthorizationProducerConsumerTest.java | 28 ++
 1 file changed, 2 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 3ead51ad7fc..2638709abc5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -350,7 +350,8 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         } catch (Exception e) {
             // my-sub1 has no msg backlog, so expire message won't be issued on that subscription
             assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic"));
-        }sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
+        }
+        sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
         sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
         sub1Admin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest);
 
@@ -992,31 +993,6 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         }
     }
 
-    /**
-     * This provider always fails authorization on consumer and passes on producer
-     *
-     */
-    public static class TestAuthorizationProvider2 extends TestAuthorizationProvider {
-
-        @Override
-        public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
-                AuthenticationDataSource authenticationData) {
-            return CompletableFuture.completedFuture(true);
-        }
-
-        @Override
-        public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
-                AuthenticationDataSource authenticationData, String subscription) {
-            return CompletableFuture.completedFuture(false);
-        }
-
-        @Override
-        public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
-                AuthenticationDataSource authenticationData) {
-            return CompletableFuture.completedFuture(true);
-        }
-    }
-
     public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
         @Override
         public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,



Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]

2024-04-26 Thread via GitHub


lhotari merged PR #22595:
URL: https://github.com/apache/pulsar/pull/22595





Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]

2024-04-26 Thread via GitHub


thetumbled commented on PR #22595:
URL: https://github.com/apache/pulsar/pull/22595#issuecomment-2079048232

   /pulsarbot rerun-failure-checks





[I] [Bug] parseMessageMetadata error when broker entry metadata enable with high loading [pulsar]

2024-04-26 Thread via GitHub


semistone opened a new issue, #22601:
URL: https://github.com/apache/pulsar/issues/22601

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [X] I understand that unsupported versions don't get bug fixes. I will 
attempt to reproduce the issue on a supported version of Pulsar client and 
Pulsar broker.
   
   
   ### Version
   
   - 3.2.2
   - 3.1.2
   
   ### Minimal reproduce step
   
   Publish events at about 6k QPS and 100 Mbit/s,
   with metadata,
   in BatcherBuilder.KEY_BASED mode,
   sending messages from highly concurrent/parallel producer processes.
   It happens only with near-real-time consumers (almost zero backlog).
   
   ### What did you expect to see?
   
   No lost events.
   
   ### What did you see instead?
   
   The broker logs the error
   Failed to peek sticky key from the message metadata
   
   It looks like a thread-safety issue, because it happens randomly.
   In 1M events, it happens only a few times.
   
   
   
   ### Anything else?
   
   The error is similar to
   https://github.com/apache/pulsar/issues/10967, but I think it's a different issue.
   
   The data in BookKeeper is correct.
   I can download the event from BookKeeper and parse it successfully,
   or consume the same event a few minutes later, and it is consumed successfully.
   But all subscriptions get the same error on the same event when consuming in real time (zero backlog).
   
   
   I have trace source code.
   it happens in 
   PersistentDispatcherMultipleConsumers.readEntriesComplete -> 
AbstractBaseDispatcher.filterEntriesForConsumer
-> Commands.peekAndCopyMessageMetadata
   
   and I also print the ByteBuf contents,
   it's I could clear see the data isn't the same in bookkeeper
   
   in normal event , the hex code usually start by 010e (magicCrc32c)
   
   000  010e95295fbc0a033a0a6e697267
   
   In one of our error events, the ByteBuf has about 48 bytes of strange data and then continues with normal data:
   
   
   000  a61002007239 <== from here
   
   020  02001339ea17a8b08b8efa5e
   
   040  2af02675f6451623d17edc34526def44 <=== until here is garbage
   
   060  010e95295fbc0a033a0a6e697267 <== from here is normal data
   -
   
   This is just one example; sometimes the first few bytes are correct and the corruption appears after them.
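
One way to narrow this down is to log, for every entry that fails to parse, the hex dump together with the offset at which the expected leading bytes first appear. The sketch below is an illustration only: it uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf` so that it runs standalone, the names `EntryPrefixCheck`, `toHex` and `firstOffsetOf` are hypothetical, and the expected prefix `01 0e` is simply taken from the dump quoted above.

```java
import java.nio.ByteBuffer;

final class EntryPrefixCheck {
    // Hex-encodes the readable bytes without moving the buffer's position.
    static String toHex(ByteBuffer buf) {
        StringBuilder sb = new StringBuilder();
        for (int i = buf.position(); i < buf.limit(); i++) {
            sb.append(String.format("%02x", buf.get(i) & 0xff));
        }
        return sb.toString();
    }

    // Returns the offset (relative to the current position) of the first
    // occurrence of `prefix`, or -1 if it never occurs.
    static int firstOffsetOf(ByteBuffer buf, byte[] prefix) {
        int start = buf.position();
        for (int i = start; i + prefix.length <= buf.limit(); i++) {
            boolean match = true;
            for (int j = 0; j < prefix.length; j++) {
                if (buf.get(i + j) != prefix[j]) {
                    match = false;
                    break;
                }
            }
            if (match) {
                return i - start;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Assumption: these are the leading bytes seen in a healthy entry dump.
        byte[] expected = {0x01, 0x0e};
        ByteBuffer good = ByteBuffer.wrap(new byte[] {0x01, 0x0e, (byte) 0x95, 0x29});
        ByteBuffer bad = ByteBuffer.wrap(
                new byte[] {(byte) 0xa6, 0x10, 0x02, 0x00, 0x01, 0x0e, (byte) 0x95});
        System.out.println(toHex(good) + " -> offset " + firstOffsetOf(good, expected));
        System.out.println(toHex(bad) + " -> offset " + firstOffsetOf(bad, expected));
    }
}
```

An offset of 0 means the buffer starts as expected; a positive offset gives the length of the unexpected prefix. The garbage could contain the same two bytes by chance, so the offset is a hint rather than proof.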
   
   
   I am still trying to debug when and how the ByteBuf returns incorrect data, 
and why it only happens during stress testing. It is still not easy to 
reproduce using the perf tool, but we can 100% reproduce it in our producer 
code.
   
   Does anyone have any idea what could be causing this issue, and any 
suggestions on which library or class may have potential issues? Additionally, 
any suggestions on how to debug this issue or if I need to print any specific 
information to help identify the root cause would be appreciated. Thank you.
   
   
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][test] Flaky-test: ManagedLedgerTest.testTimestampOnWorkingLedger [pulsar]

2024-04-26 Thread via GitHub


Technoboy- commented on code in PR #22600:
URL: https://github.com/apache/pulsar/pull/22600#discussion_r1580764559


##
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##
@@ -2473,19 +2473,17 @@ public void testTimestampOnWorkingLedger() throws Exception {
 
 ml.addEntry("msg02".getBytes());
 
+// reopen a new ml2
 ml.close();
-// Thread.sleep(1000);
-iter = ml.getLedgersInfoAsList().iterator();
-ts = -1;
-while (iter.hasNext()) {
-LedgerInfo i = iter.next();
-if (iter.hasNext()) {
-assertTrue(ts <= i.getTimestamp(), i.toString());
-ts = i.getTimestamp();
-} else {
-assertTrue(i.getTimestamp() > 0, "well closed LedgerInfo should set a timestamp > 0");
-}
-}
+ManagedLedgerImpl ml2 = (ManagedLedgerImpl) factory.open("my_test_ledger", conf);

Review Comment:
   `ml2` needs to be closed.






Re: [PR] [fix][test] Flaky-test: ManagedLedgerTest.testTimestampOnWorkingLedger [pulsar]

2024-04-26 Thread via GitHub


codecov-commenter commented on PR #22600:
URL: https://github.com/apache/pulsar/pull/22600#issuecomment-2079013686

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22600?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Project coverage is 73.92%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) to head [(`d8cdd6e`)](https://app.codecov.io/gh/apache/pulsar/pull/22600?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   > Report is 193 commits behind head on master.
   
   Additional details and impacted files
   
   
   [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22600/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22600?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22600      +/-   ##
   
   + Coverage   73.57%   73.92%   +0.34%
   + Complexity  32624    32606      -18
   
     Files        1877     1885       +8
     Lines      139502   140524    +1022
     Branches    15299    15452     +153
   
   + Hits       102638   103877    +1239
   + Misses      28908    28600     -308
   - Partials     7956     8047      +91
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22600/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
   |---|---|---|
   | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22600/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `26.99% <ø> (+2.41%)` | :arrow_up: |
   | [systests](https://app.codecov.io/gh/apache/pulsar/pull/22600/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.39% <ø> (+0.07%)` | :arrow_up: |
   | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22600/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `73.21% <ø> (+0.37%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   [see 250 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22600/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   





Re: [D] Apache Beam support for Pulsar [pulsar]

2024-04-26 Thread via GitHub


GitHub user hpvd added a comment to the discussion: Apache Beam support for 
Pulsar

If you are interested in Beam support for Pulsar, please upvote this discussion 
topic to increase its visibility, 
and take a look at the open issues in the Beam repository summarized in 
https://github.com/apache/beam/issues/31078

GitHub link: 
https://github.com/apache/pulsar/discussions/18453#discussioncomment-9234229


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [PR] [fix][broker] Continue closing even when executor is shut down [pulsar]

2024-04-26 Thread via GitHub


lhotari commented on code in PR #22599:
URL: https://github.com/apache/pulsar/pull/22599#discussion_r1580671136


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -1429,7 +1430,14 @@ private CompletableFuture delete(boolean failIfHasSubscriptions,
 }
 FutureUtil.waitForAll(futures).thenRunAsync(() -> {
 closeClientFuture.complete(null);
-}, getOrderedExecutor()).exceptionally(ex -> {
+}, command -> {
+try {
+getOrderedExecutor().execute(command);

Review Comment:
   Sure, but that would require more logic. This would address the current 
retry loop.






Re: [PR] [fix][broker] Continue closing even when executor is shut down [pulsar]

2024-04-26 Thread via GitHub


poorbarcode commented on code in PR #22599:
URL: https://github.com/apache/pulsar/pull/22599#discussion_r1580626116


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -1429,7 +1430,14 @@ private CompletableFuture delete(boolean failIfHasSubscriptions,
 }
 FutureUtil.waitForAll(futures).thenRunAsync(() -> {
 closeClientFuture.complete(null);
-}, getOrderedExecutor()).exceptionally(ex -> {
+}, command -> {
+try {
+getOrderedExecutor().execute(command);

Review Comment:
   This executor is only closed when the broker is shut down, so we can skip 
closing this topic here.






Re: [PR] [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes [pulsar]

2024-04-26 Thread via GitHub


freeznet commented on PR #22501:
URL: https://github.com/apache/pulsar/pull/22501#issuecomment-2078827890

   @lhotari could you please help to review this PR when you have time, thanks.





Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]

2024-04-26 Thread via GitHub


thetumbled commented on code in PR #22595:
URL: https://github.com/apache/pulsar/pull/22595#discussion_r1580519788


##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java:
##
@@ -349,7 +349,8 @@ public void testSubscriberPermission() throws Exception {
 } catch (Exception e) {
 // my-sub1 has no msg backlog, so expire message won't be issued on that subscription
 assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic"));
-}sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
+}
+sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);

Review Comment:
   This just corrects the code formatting along the way; it is not a logic change.






[PR] [fix][test] Flaky-test: ManagedLedgerTest.testTimestampOnWorkingLedger [pulsar]

2024-04-26 Thread via GitHub


shibd opened a new pull request, #22600:
URL: https://github.com/apache/pulsar/pull/22600

   ### Motivation
   #22430
   
   After #22034, when a ledger is closed because it is full, the creation of a 
new ledger is triggered. However, since ledger creation is asynchronous, 
the new ledger is sometimes not yet created when the function returns.
   
   
https://github.com/apache/pulsar/blob/57a616eaa79096af5b49db89c99cd39ccc94ec00/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1775-L1778
   
   
   In this test, `ml.close();` does not guarantee that the `currentLedger` switch 
has completed, so there are two possible cases:
   
   - Case1:  **[Ledger01{entry1}], [Ledger02{entry1}], [Ledger03{}]**  
   - Case2:  **[Ledger01{entry1}], [Ledger02{entry1}]** 
   
   **Case 1** will cause the test to fail.
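
The two cases can be illustrated with a toy model. The sketch below is not Pulsar's `ManagedLedgerImpl`: `ToyLedgerRollover` and its methods are hypothetical, and a manually completed `CompletableFuture` stands in for the asynchronous ledger-creation callback so that the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ToyLedgerRollover {
    private final List<String> ledgers = new ArrayList<>();

    ToyLedgerRollover() {
        ledgers.add("Ledger01");
    }

    // Closes the current (full) ledger and only schedules the next one.
    // The new ledger becomes visible when the returned future is completed,
    // which models the asynchronous create callback.
    CompletableFuture<Void> rollover(String nextName) {
        CompletableFuture<Void> created = new CompletableFuture<>();
        created.thenRun(() -> ledgers.add(nextName));
        return created;
    }

    List<String> snapshot() {
        return new ArrayList<>(ledgers);
    }

    public static void main(String[] args) {
        ToyLedgerRollover ml = new ToyLedgerRollover();
        ml.rollover("Ledger02").complete(null);

        // Second rollover: the caller returns before the callback has fired.
        CompletableFuture<Void> pending = ml.rollover("Ledger03");
        System.out.println("callback pending: " + ml.snapshot()); // two ledgers (Case2)

        pending.complete(null);
        System.out.println("callback done:    " + ml.snapshot()); // three ledgers (Case1)
    }
}
```

Which of the two snapshots a test observes depends only on whether it reads the ledger list before or after the callback runs, which is why an assertion on the last ledger is timing-dependent.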
   
   ### Modifications
   After `ml.close()`, reopen a new `ManagedLedger`, which guarantees that the test is in 
`Case1`, and then assert that the timestamp of `Ledger02` is greater than 0.
   
   ### Verifying this change
   - Ran this test locally 1000 times; it passed every time.
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   





[PR] [fix][broker] Continue closing even when executor is shut down [pulsar]

2024-04-26 Thread via GitHub


lhotari opened a new pull request, #22599:
URL: https://github.com/apache/pulsar/pull/22599

   ### Motivation
   
   ```
   2024-04-26T09:36:28,397 - ERROR - [ForkJoinPool.commonPool-worker-10:PersistentTopic] - [persistent://pulsar/global/removeClusterTest/__change_events] Error closing clients
   java.util.concurrent.CompletionException: java.util.concurrent.RejectedExecutionException: Executor is shutting down
       at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
       at java.base/java.util.concurrent.CompletableFuture.uniRunNow(CompletableFuture.java:823) ~[?:?]
       at java.base/java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:803) ~[?:?]
       at java.base/java.util.concurrent.CompletableFuture.thenRunAsync(CompletableFuture.java:2204) ~[?:?]
       at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$44(PersistentTopic.java:1430) ~[classes/:?]
       at org.apache.pulsar.broker.resources.NamespaceResources$PartitionedTopicResources.lambda$runWithMarkDeleteAsync$9(NamespaceResources.java:359) ~[pulsar-broker-common-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
       at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) [?:?]
       at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) [?:?]
       at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483) [?:?]
       at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?]
       at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) [?:?]
       at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) [?:?]
       at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?]
       at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) [?:?]
   Caused by: java.util.concurrent.RejectedExecutionException: Executor is shutting down
       at org.apache.bookkeeper.common.util.SingleThreadExecutor.execute(SingleThreadExecutor.java:208) ~[bookkeeper-common-4.17.0.jar:4.17.0]
       at java.base/java.util.concurrent.CompletableFuture.uniRunNow(CompletableFuture.java:817) ~[?:?]
       ... 12 more
   ```
   
   ### Modifications
   
   - Provide a custom Executor that catches RejectedExecutionException and runs 
the task in the current thread in that case.
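
A minimal sketch of this idea, assuming a plain `java.util.concurrent` executor in place of the broker's ordered executor; the names `FallbackExecutorSketch` and `runInCallerWhenRejected` are made up for illustration and do not appear in the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

final class FallbackExecutorSketch {
    // Wraps `delegate`; if it rejects the task (for example because it is
    // shutting down), the task runs on the calling thread instead.
    static Executor runInCallerWhenRejected(Executor delegate) {
        return command -> {
            try {
                delegate.execute(command);
            } catch (RejectedExecutionException e) {
                command.run();
            }
        };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown(); // from now on, execute() rejects new tasks

        CompletableFuture<Void> closed = new CompletableFuture<>();
        CompletableFuture.completedFuture(null)
                .thenRunAsync(() -> closed.complete(null), runInCallerWhenRejected(pool));

        System.out.println("close callback ran: " + closed.isDone());
    }
}
```

Without the wrapper, `thenRunAsync` would return a future completed exceptionally with the `RejectedExecutionException`, and the callback would never run.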
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 





Re: [PR] [fix][broker] Avoid being stuck when closing the broker with extensible load manager [pulsar]

2024-04-26 Thread via GitHub


BewareMyPower commented on code in PR #22573:
URL: https://github.com/apache/pulsar/pull/22573#discussion_r1580574803


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -926,9 +927,13 @@ public void unloadNamespaceBundlesGracefully() {
 }
 
 public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean closeWithoutWaitingClientDisconnect) {
+if (unloaded) {
+return;
+}

Review Comment:
   It's not in a critical path. Here I use a volatile field just because I 
realized shutdown could also be triggered via an admin API (see 
`BrokersBase#shutDownBrokerGracefully`). Therefore, to avoid thread-safety 
issues, I used volatile here.
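
For comparison, a sketch of a guard that lets only the first caller proceed. A `volatile` flag makes a write visible to other threads, but a separate check followed by a set is not atomic, whereas `AtomicBoolean.compareAndSet` is. This is not what the PR does; `RunOnceGuard` and `unloadOnce` are hypothetical names, and the counter merely stands in for the real unload work.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class RunOnceGuard {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger runs = new AtomicInteger();

    // Only the first caller passes the guard; later or concurrent callers
    // return false immediately without repeating the work.
    boolean unloadOnce() {
        if (!started.compareAndSet(false, true)) {
            return false;
        }
        runs.incrementAndGet(); // stand-in for the real unload work
        return true;
    }

    int runs() {
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        RunOnceGuard guard = new RunOnceGuard();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(guard::unloadOnce);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("unload executed " + guard.runs() + " time(s)");
    }
}
```

Note that a caller who loses the race returns before the winner has finished, so this pattern fits only when later callers do not need to wait for completion.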






Re: [PR] [fix][broker][WIP] Don't check replication clusters for ownership of system topics [pulsar]

2024-04-26 Thread via GitHub


heesung-sn commented on PR #22597:
URL: https://github.com/apache/pulsar/pull/22597#issuecomment-2078763715

   To answer your question: I'm not sure. I assume all system topics should 
live in the local cluster only, but I could be wrong.





Re: [PR] [fix][broker][WIP] Don't check replication clusters for ownership of system topics [pulsar]

2024-04-26 Thread via GitHub


heesung-sn commented on PR #22597:
URL: https://github.com/apache/pulsar/pull/22597#issuecomment-2078761854

   I realized that Pulsar has some loopholes that unnecessarily try to 
replicate, migrate or do other operations on system topics.
   
   We need to check all system topics and define their behaviors for each 
feature.
   
   System namespace and system topics
   - load report
   - blue-green migration 
   - geo-replication 
   - cross-cluster ownership check
   - and others
   
   
   I am checking the system topics from the new load balancer.





Re: [PR] [fix][broker] Avoid being stuck when closing the broker with extensible load manager [pulsar]

2024-04-26 Thread via GitHub


lhotari commented on code in PR #22573:
URL: https://github.com/apache/pulsar/pull/22573#discussion_r1580566781


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -926,9 +927,13 @@ public void unloadNamespaceBundlesGracefully() {
 }
 
 public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean closeWithoutWaitingClientDisconnect) {
+if (unloaded) {
+return;
+}

Review Comment:
   Is there a need to prevent multiple concurrent calls to this method? 
This solution doesn't seem very effective.






Re: [PR] [fix][broker][WIP] Don't check replication clusters for ownership of system topics [pulsar]

2024-04-26 Thread via GitHub


lhotari commented on PR #22597:
URL: https://github.com/apache/pulsar/pull/22597#issuecomment-2078730928

   @poorbarcode @heesung-sn Do you think that relaxing the 
checkLocalOrGetPeerReplicationCluster checks for system topics would be useful? 





[PR] [fix][broker][WIP] Don't check replication clusters for ownership of system topics [pulsar]

2024-04-26 Thread via GitHub


lhotari opened a new pull request, #22597:
URL: https://github.com/apache/pulsar/pull/22597

   WIP, not ready for review. PR opened in draft mode for initial discussion 
and feedback.
   
   ### Motivation
   
   When investigating another issue and reproducing the problem using these 
instructions, 
https://github.com/apache/pulsar/pull/21948#issuecomment-2078264388, I noticed 
this WARN log message:
   
   ```
   2024-04-26T09:34:01,535 - WARN  - [pulsar-io-35-3:ServerCnx] - Failed to get Partitioned Metadata [/127.0.0.1:56820] persistent://pulsar/global/removeClusterTest/__change_events: Namespace missing local cluster name in clusters list: local_cluster=r1 ns=pulsar/global/removeClusterTest clusters=[r2, r3]
   org.apache.pulsar.broker.web.RestException: Namespace missing local cluster name in clusters list: local_cluster=r1 ns=pulsar/global/removeClusterTest clusters=[r2, r3]
       at org.apache.pulsar.broker.web.PulsarWebResource.lambda$checkLocalOrGetPeerReplicationCluster$27(PulsarWebResource.java:922) ~[classes/:?]
       at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) ~[?:?]
       at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735) ~[?:?]
       at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182) ~[?:?]
       at org.apache.pulsar.broker.web.PulsarWebResource.lambda$checkLocalOrGetPeerReplicationCluster$29(PulsarWebResource.java:912) ~[classes/:?]
       at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) ~[?:?]
       at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735) ~[?:?]
       at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182) ~[?:?]
       at org.apache.pulsar.broker.web.PulsarWebResource.checkLocalOrGetPeerReplicationCluster(PulsarWebResource.java:896) ~[classes/:?]
       at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync(PersistentTopicsBase.java:4304) ~[classes/:?]
       at org.apache.pulsar.broker.service.ServerCnx.lambda$handlePartitionMetadataRequest$7(ServerCnx.java:610) ~[classes/:?]
       at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684) [?:?]
       at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) [?:?]
       at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168) [?:?]
       at org.apache.pulsar.broker.service.ServerCnx.handlePartitionMetadataRequest(ServerCnx.java:607) [classes/:?]
       at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:134) [pulsar-common-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
   ```
   
   ### Modifications
   
   - Don't limit system topics with the `checkLocalOrGetPeerReplicationCluster` 
checks.
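
As a rough illustration of the intended behavior, the sketch below skips a simplified local-cluster check for system topics. Everything in it is an assumption made for the example: `ClusterCheckSketch`, `isSystemTopic` and `checkLocalCluster` are hypothetical, the "local name starts with `__`" rule is only a stand-in for however the broker actually identifies system topics, the topic `my-topic` is invented, and the real `checkLocalOrGetPeerReplicationCluster` does more than this.

```java
import java.util.Set;

final class ClusterCheckSketch {
    // ASSUMPTION: any topic whose local name starts with "__" is treated as a
    // system topic. This is a placeholder rule, not Pulsar's implementation.
    static boolean isSystemTopic(String topic) {
        String localName = topic.substring(topic.lastIndexOf('/') + 1);
        return localName.startsWith("__");
    }

    // Simplified check: reject the request when the namespace's cluster list
    // does not contain the local cluster, unless the topic is a system topic.
    static void checkLocalCluster(String topic, String localCluster, Set<String> clusters) {
        if (isSystemTopic(topic)) {
            return;
        }
        if (!clusters.contains(localCluster)) {
            throw new IllegalStateException(
                    "Namespace missing local cluster name in clusters list: local_cluster="
                            + localCluster);
        }
    }

    public static void main(String[] args) {
        Set<String> clusters = Set.of("r2", "r3");
        String ns = "persistent://pulsar/global/removeClusterTest/";

        checkLocalCluster(ns + "__change_events", "r1", clusters);
        System.out.println("system topic: check skipped");

        try {
            checkLocalCluster(ns + "my-topic", "r1", clusters);
        } catch (IllegalStateException e) {
            System.out.println("regular topic rejected: " + e.getMessage());
        }
    }
}
```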
   
   ### Additional Context
   
   This is related to the changes in #20304, which relaxed some of these 
checks.
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 





[PR] [improve][broker] Propagate cause exception in TopicBusyException when applicable [pulsar]

2024-04-26 Thread via GitHub


lhotari opened a new pull request, #22596:
URL: https://github.com/apache/pulsar/pull/22596

   ### Motivation
   
   When a topic is closing and the close throws an exception, the exception is 
swallowed without being logged.
   It's better to propagate the cause in TopicBusyException in these cases.
   
   ### Modifications
   
   - Add support for cause exception to TopicBusyException
   - Propagate the cause when closing throws an exception
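
A minimal sketch of the pattern, using a hypothetical `BusyException` rather than Pulsar's `TopicBusyException` (whose actual constructors are not shown in this description); `close` is likewise an invented stand-in for the closing path.

```java
final class BusyExceptionSketch {
    // Hypothetical exception type with an additional (message, cause) constructor.
    static class BusyException extends Exception {
        BusyException(String message) {
            super(message);
        }

        BusyException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for a close operation: when the underlying step fails, the
    // original exception is attached as the cause instead of being dropped.
    static void close(boolean fail) throws BusyException {
        try {
            if (fail) {
                throw new IllegalStateException("ledger close failed");
            }
        } catch (RuntimeException e) {
            throw new BusyException("Topic is closing", e);
        }
    }

    public static void main(String[] args) {
        try {
            close(true);
        } catch (BusyException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
        }
    }
}
```

With the cause attached, a single log statement at the catch site prints both stack traces, so the original failure is no longer lost.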
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 





Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]

2024-04-26 Thread via GitHub


thetumbled commented on code in PR #22595:
URL: https://github.com/apache/pulsar/pull/22595#discussion_r1580519788


##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java:
##
@@ -349,7 +349,8 @@ public void testSubscriberPermission() throws Exception {
 } catch (Exception e) {
 // my-sub1 has no msg backlog, so expire message won't be issued on that subscription
 assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic"));
-}sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
+}
+sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);

Review Comment:
   This just corrects the code formatting along the way.






(pulsar) branch branch-3.2 updated: [improve][admin] Check if the topic existed before the permission operations (#22547)

2024-04-26 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new adac20a72ef [improve][admin] Check if the topic existed before the 
permission operations (#22547)
adac20a72ef is described below

commit adac20a72efdc2b1d9b16464ebffb569c41014e9
Author: Jiwei Guo 
AuthorDate: Fri Apr 26 14:05:30 2024 +0800

[improve][admin] Check if the topic existed before the permission 
operations (#22547)
---
 .../pulsar/broker/admin/impl/PersistentTopicsBase.java |  9 ++---
 .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java|  1 +
 .../java/org/apache/pulsar/broker/admin/AdminApiTest.java  | 12 
 .../apache/pulsar/broker/admin/PersistentTopicsTest.java   | 10 --
 .../org/apache/pulsar/broker/auth/AuthorizationTest.java   | 14 +-
 .../client/api/AuthenticatedProducerConsumerTest.java  |  4 +++-
 .../client/api/AuthorizationProducerConsumerTest.java  |  2 ++
 .../pulsar/websocket/proxy/ProxyAuthorizationTest.java |  8 +---
 8 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b0968f494ee..4b29452f98c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -207,6 +207,7 @@ public class PersistentTopicsBase extends AdminResource {
 protected CompletableFuture>> internalGetPermissionsOnTopic() {
 // This operation should be reading from zookeeper and it should be allowed without having admin privileges
 return validateAdminAccessForTenantAsync(namespaceName.getTenant())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
 .thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName));
 }
 
@@ -258,9 +259,10 @@ public class PersistentTopicsBase extends AdminResource {
 Set actions) {
 // This operation should be reading from zookeeper and it should be allowed without having admin privileges
 validateAdminAccessForTenantAsync(namespaceName.getTenant())
-.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
-grantPermissionsAsync(topicName, role, actions)
-.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()
+.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
+.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions))
+.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
 Throwable realCause = FutureUtil.unwrapCompletionException(ex);
 log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause);
@@ -272,6 +274,7 @@ public class PersistentTopicsBase extends AdminResource {
 protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
 // This operation should be reading from zookeeper and it should be allowed without having admin privileges
 validateAdminAccessForTenantAsync(namespaceName.getTenant())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
 .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false)
 .thenCompose(metadata -> {
 int numPartitions = metadata.partitions;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
index e89b4ff5e83..2dcb930fbe7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -120,6 +120,7 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
 .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
 .authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN)
 .build();
+admin.topics().createNonPartitionedTopic(topicName);
 admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
 admin.topics().grantPermission(topicName, "producer", 

Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]

2024-04-26 Thread via GitHub


thetumbled commented on code in PR #22595:
URL: https://github.com/apache/pulsar/pull/22595#discussion_r1580519788


##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java:
##
@@ -349,7 +349,8 @@ public void testSubscriberPermission() throws Exception {
 } catch (Exception e) {
 // my-sub1 has no msg backlog, so expire message won't be issued on that subscription
 assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic"));
-}sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
+}
+sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);

Review Comment:
   Just to correct the code format.






(pulsar) branch master updated: [improve][admin] Check if the topic existed before the permission operations (#22547)

2024-04-26 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 69a600e86bb [improve][admin] Check if the topic existed before the 
permission operations (#22547)
69a600e86bb is described below

commit 69a600e86bb5110a118d836125411e941b83764d
Author: Jiwei Guo 
AuthorDate: Fri Apr 26 14:05:30 2024 +0800

[improve][admin] Check if the topic existed before the permission 
operations (#22547)
---
 .../pulsar/broker/admin/impl/PersistentTopicsBase.java  |  9 ++---
 .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java |  1 +
 .../java/org/apache/pulsar/broker/admin/AdminApiTest.java   | 12 
 .../apache/pulsar/broker/admin/PersistentTopicsTest.java| 10 --
 .../org/apache/pulsar/broker/auth/AuthorizationTest.java| 13 -
 .../client/api/AuthenticatedProducerConsumerTest.java   |  4 +++-
 .../client/api/AuthorizationProducerConsumerTest.java   |  2 ++
 .../pulsar/websocket/proxy/ProxyAuthorizationTest.java  |  8 +---
 8 files changed, 45 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 63ea987bb07..682f41dcdb6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -205,6 +205,7 @@ public class PersistentTopicsBase extends AdminResource {
 protected CompletableFuture<Map<String, Set<AuthAction>>> 
internalGetPermissionsOnTopic() {
 // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
 return validateAdminAccessForTenantAsync(namespaceName.getTenant())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
 .thenCompose(__ -> 
getAuthorizationService().getPermissionsAsync(topicName));
 }
 
@@ -256,9 +257,10 @@ public class PersistentTopicsBase extends AdminResource {
Set<AuthAction> actions) {
 // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
 validateAdminAccessForTenantAsync(namespaceName.getTenant())
-.thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
-grantPermissionsAsync(topicName, role, actions)
-.thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()
+.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
+.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, 
actions))
+.thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
 Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
 log.error("[{}] Failed to get permissions for topic {}", 
clientAppId(), topicName, realCause);
@@ -270,6 +272,7 @@ public class PersistentTopicsBase extends AdminResource {
 protected void internalRevokePermissionsOnTopic(AsyncResponse 
asyncResponse, String role) {
 // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
 validateAdminAccessForTenantAsync(namespaceName.getTenant())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
 .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
true, false)
 .thenCompose(metadata -> {
 int numPartitions = metadata.partitions;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
index e89b4ff5e83..2dcb930fbe7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -120,6 +120,7 @@ public class AdminApiSchemaWithAuthTest extends 
MockedPulsarServiceBaseTest {
 .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : 
brokerUrlTls.toString())
 .authentication(AuthenticationToken.class.getName(), 
PRODUCE_TOKEN)
 .build();
+admin.topics().createNonPartitionedTopic(topicName);
 admin.topics().grantPermission(topicName, "consumer", 
EnumSet.of(AuthAction.consume));
 admin.topics().grantPermission(topicName, "producer", 

Re: [PR] [improve][admin] Check if the topic existed before the permission operations [pulsar]

2024-04-26 Thread via GitHub


Technoboy- merged PR #22547:
URL: https://github.com/apache/pulsar/pull/22547
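
The change merged here places an existence check ahead of the permission operations in a `thenCompose` chain. The following is a minimal, self-contained sketch of that pattern, not the broker code: `PermissionChainSketch`, `createTopic`, `checkTopicExists`, `doGrant`, `grant`, and `hasGrant` are hypothetical names, and in-memory collections stand in for the metadata store. It only illustrates that a failed check short-circuits the later stages.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the broker-side chain; none of these names are Pulsar APIs.
final class PermissionChainSketch {
    private final Set<String> topics = ConcurrentHashMap.newKeySet();
    private final ConcurrentHashMap<String, String> grants = new ConcurrentHashMap<>();

    void createTopic(String topic) {
        topics.add(topic);
    }

    boolean hasGrant(String topic) {
        return grants.containsKey(topic);
    }

    // Completes exceptionally when the topic is unknown, so dependent stages never run.
    CompletableFuture<Void> checkTopicExists(String topic) {
        if (topics.contains(topic)) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.failedFuture(
                new IllegalStateException("Topic not found: " + topic));
    }

    // Records the grant; assumed to be reached only after the check succeeded.
    CompletableFuture<Void> doGrant(String topic, String role) {
        grants.put(topic, role);
        return CompletableFuture.completedFuture(null);
    }

    // The existence check is composed before the grant, mirroring the order in the diff.
    CompletableFuture<Void> grant(String topic, String role) {
        return checkTopicExists(topic)
                .thenCompose(__ -> doGrant(topic, role));
    }

    public static void main(String[] args) {
        PermissionChainSketch admin = new PermissionChainSketch();
        try {
            admin.grant("missing-topic", "consumer").join();
        } catch (CompletionException e) {
            // The grant stage was skipped because the check failed first.
            System.out.println(e.getCause().getMessage());
        }
        admin.createTopic("my-topic");
        admin.grant("my-topic", "consumer").join();
        System.out.println(admin.hasGrant("my-topic"));
    }
}
```

This ordering is presumably why the tests touched by the commit now call `createNonPartitionedTopic` before `grantPermission`.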





Re: [PR] [fix][broker] One topic can be closed multiple times concurrently [pulsar]

2024-04-26 Thread via GitHub


poorbarcode commented on PR #17524:
URL: https://github.com/apache/pulsar/pull/17524#issuecomment-2078692154

   > However, I think this Topic state management needs a serious refactoring.
   
   Agree with you





Re: [PR] [fix][broker] One topic can be closed multiple times concurrently [pulsar]

2024-04-26 Thread via GitHub


lhotari commented on PR #17524:
URL: https://github.com/apache/pulsar/pull/17524#issuecomment-2078692302

   > However, I think this Topic state management needs a serious refactoring.
   > 
   > I suggest defining TopicState and revisit topic state transitions in a 
state machine manner.
   
   @heesung-sn I agree that a state machine style would result in a more 
maintainable solution. We can handle that in a second step. 
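
To make the state-machine suggestion concrete, here is a rough sketch of what an explicit topic state with guarded transitions could look like. Everything in it is an assumption for illustration: `TopicStateSketch`, the `TopicState` values, and the transition table are hypothetical and do not reflect the current `PersistentTopic` fields or any agreed design.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; these names and transitions are not part of Pulsar.
final class TopicStateSketch {
    enum TopicState { ACTIVE, FENCED, CLOSING, DELETING, CLOSED }

    // Allowed target states, keyed by source state (an assumed table).
    private static final Map<TopicState, Set<TopicState>> ALLOWED = Map.of(
            TopicState.ACTIVE, EnumSet.of(TopicState.FENCED),
            TopicState.FENCED, EnumSet.of(TopicState.ACTIVE, TopicState.CLOSING, TopicState.DELETING),
            TopicState.CLOSING, EnumSet.of(TopicState.CLOSED, TopicState.ACTIVE),
            TopicState.DELETING, EnumSet.of(TopicState.CLOSED, TopicState.ACTIVE),
            TopicState.CLOSED, EnumSet.noneOf(TopicState.class));

    private final AtomicReference<TopicState> state = new AtomicReference<>(TopicState.ACTIVE);

    TopicState current() {
        return state.get();
    }

    // Moves from 'from' to 'to' atomically; returns false if the move is
    // not in the table or another thread changed the state first.
    boolean transition(TopicState from, TopicState to) {
        return ALLOWED.get(from).contains(to) && state.compareAndSet(from, to);
    }

    public static void main(String[] args) {
        TopicStateSketch topic = new TopicStateSketch();
        System.out.println(topic.transition(TopicState.ACTIVE, TopicState.FENCED));  // true
        System.out.println(topic.transition(TopicState.FENCED, TopicState.CLOSING)); // true
        // A second close attempt from FENCED loses: the state is already CLOSING.
        System.out.println(topic.transition(TopicState.FENCED, TopicState.CLOSING)); // false
        System.out.println(topic.current());                                         // CLOSING
    }
}
```

With a single compare-and-set on one state field, only one of several concurrent close attempts can win, which is the property the separate boolean flags make hard to reason about.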





Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-26 Thread via GitHub


poorbarcode commented on code in PR #21948:
URL: https://github.com/apache/pulsar/pull/21948#discussion_r1580517607


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() {
 }
 
 private void unfenceTopicToResume() {
-subscriptions.values().forEach(sub -> sub.resumeAfterFence());
 isFenced = false;
 isClosingOrDeleting = false;
+subscriptions.values().forEach(sub -> sub.resumeAfterFence());
+unfenceReplicatorsToResume();
+}
+
+private void unfenceReplicatorsToResume() {
+checkReplication();

Review Comment:
   Agreed. We need more than one separate PR to merge the states 
`fenced, close, deleting`, as we discussed 
[here](https://github.com/apache/pulsar/pull/17524#issuecomment-1886688685)





