Re: [PR] [fix] Fix concurrent containers concurrency issue [pulsar]
codecov-commenter commented on PR #22604:
URL: https://github.com/apache/pulsar/pull/22604#issuecomment-2080372023

## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22604?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report

Attention: Patch coverage is `93.3%` with `2 lines` in your changes are missing coverage. Please review.
> Project coverage is 73.96%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) to head [(`3a2c24a`)](https://app.codecov.io/gh/apache/pulsar/pull/22604?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
> Report is 201 commits behind head on master.

Additional details and impacted files

[![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22604/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22604?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)

```diff
@@            Coverage Diff             @@
##            master   #22604     +/-   ##
+ Coverage    73.57%   73.96%   +0.38%
- Complexity   32624    33148     +524
  Files         1877     1885       +8
  Lines       139502   140543    +1041
  Branches     15299    15431     +132
+ Hits        102638   103946    +1308
+ Misses       28908    28563     -345
- Partials      7956     8034      +78
```

| [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22604/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
|---|---|---|
| [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22604/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `27.27% <53.33%> (+2.68%)` | :arrow_up: |
| [systests](https://app.codecov.io/gh/apache/pulsar/pull/22604/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.61% <56.66%> (+0.28%)` | :arrow_up: |
| [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22604/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `73.24% <93.33%> (+0.39%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Files](https://app.codecov.io/gh/apache/pulsar/pull/22604?dropdown=coverage=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
|---|---|---|
| [...common/util/collections/ConcurrentLongHashMap.java](https://app.codecov.io/gh/apache/pulsar/pull/22604?src=pr=tree=pulsar-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fcommon%2Futil%2Fcollections%2FConcurrentLongHashMap.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi91dGlsL2NvbGxlY3Rpb25zL0NvbmN1cnJlbnRMb25nSGFzaE1hcC5qYXZh) | `90.40% <100.00%> (+1.51%)` | :arrow_up: |
| [...til/collections/ConcurrentLongLongPairHashMap.java](https://app.codecov.io/gh/apache/pulsar/pull/22604?src=pr=tree=pulsar-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fcommon%2Futil%2Fcollections%2FConcurrentLongLongPairHashMap.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi91dGlsL2NvbGxlY3Rpb25zL0NvbmN1cnJlbnRMb25nTG9uZ1BhaXJIYXNoTWFwLmphdmE=) | `89.23% <100.00%> (+0.71%)` | :arrow_up: |
| [...common/util/collections/ConcurrentOpenHashMap.java](https://app.codecov.io/gh/apache/pulsar/pull/22604?src=pr=tree=pulsar-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fcommon%2Futil%2Fcollections%2FConcurrentOpenHashMap.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi91dGlsL2NvbGxlY3Rpb25zL0NvbmN1cnJlbnRPcGVuSGFzaE1hcC5qYXZh) | `94.59% <100.00%> (-0.30%)` | :arrow_down: |
[PR] [fix] Fix concurrent containers concurrency issue [pulsar]
dao-jun opened a new pull request, #22604:
URL: https://github.com/apache/pulsar/pull/22604

Fixes https://github.com/apache/pulsar/issues/22603

### Motivation

Fix concurrent containers concurrency issue

### Modifications

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] [Bug] Pulsar concurrent containers has concurrency issues [pulsar]
dao-jun opened a new issue, #22603:
URL: https://github.com/apache/pulsar/issues/22603

### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### Version

master branch

### Minimal reproduce step

See: https://github.com/apache/bookkeeper/issues/4318

`ConcurrentLongHashMap`, `ConcurrentLongLongPairHashMap`, `ConcurrentLongPairSet`, `ConcurrentOpenHashMap`, and `ConcurrentOpenHashSet` under the `org.apache.pulsar.common.util.collections` package are involved.

### What did you expect to see?

NONE

### What did you see instead?

NONE

### Anything else?

_No response_

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!
(pulsar) branch master updated: [improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions (#22536)
This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new bf5d6aac1b6 [improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions (#22536)
bf5d6aac1b6 is described below

commit bf5d6aac1b62d195c544a486bcefec676948a3a4
Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com>
AuthorDate: Fri Apr 26 13:22:19 2024 -0700

    [improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions (#22536)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 21 ---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 30 +-
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |  8 +++---
 3 files changed, 45 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 6b2028095e2..b1c3687b3a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -286,16 +286,29 @@ public class Consumer {
                 totalChunkedMessages, redeliveryTracker, DEFAULT_CONSUMER_EPOCH);
     }
 
+    public Future sendMessages(final List entries, EntryBatchSizes batchSizes,
+                               EntryBatchIndexesAcks batchIndexesAcks,
+                               int totalMessages, long totalBytes, long totalChunkedMessages,
+                               RedeliveryTracker redeliveryTracker, long epoch) {
+        return sendMessages(entries, null, batchSizes, batchIndexesAcks, totalMessages, totalBytes,
+                totalChunkedMessages, redeliveryTracker, epoch);
+    }
+
     /**
      * Dispatch a list of entries to the consumer.
      * It is also responsible to release entries data and recycle entries object.
      *
      * @return a SendMessageInfo object that contains the detail of what was sent to consumer
      */
-    public Future sendMessages(final List entries, EntryBatchSizes batchSizes,
+    public Future sendMessages(final List entries,
+                               final List stickyKeyHashes,
+                               EntryBatchSizes batchSizes,
                                EntryBatchIndexesAcks batchIndexesAcks,
-                               int totalMessages, long totalBytes, long totalChunkedMessages,
-                               RedeliveryTracker redeliveryTracker, long epoch) {
+                               int totalMessages,
+                               long totalBytes,
+                               long totalChunkedMessages,
+                               RedeliveryTracker redeliveryTracker,
+                               long epoch) {
         this.lastConsumedTimestamp = System.currentTimeMillis();
 
         if (entries.isEmpty() || totalMessages == 0) {
@@ -323,7 +336,7 @@ public class Consumer {
                 // because this consumer is possible to disconnect at this time.
                 if (pendingAcks != null) {
                     int batchSize = batchSizes.getBatchSize(i);
-                    int stickyKeyHash = getStickyKeyHash(entry);
+                    int stickyKeyHash = stickyKeyHashes == null ? getStickyKeyHash(entry) : stickyKeyHashes.get(i);
                     long[] ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i);
                     if (ackSet != null) {
                         unackedMessages -= (batchSize - BitSet.valueOf(ackSet).cardinality());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 2cad253f96e..fb7bd22de94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -126,6 +126,14 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
         }
     };
 
+    private static final FastThreadLocal>> localGroupedStickyKeyHashes =
+            new FastThreadLocal>>() {
+                @Override
+                protected Map> initialValue() throws Exception {
+                    return new HashMap<>();
+                }
+            };
+
     @Override
     public void sendMessages(List entries) {
         if (entries.isEmpty()) {
@@
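The idea behind the change above can be sketched in isolation: the dispatcher already hashes each key to pick a consumer, so it can hand those hashes to the send path instead of having them recomputed there, with `null` meaning "compute them yourself". The following is a minimal sketch under stated assumptions, not the Pulsar implementation: `StickyHashReuseSketch`, `stickyKeyHash`, `send` and `dispatch` are hypothetical names, keys are plain strings, and `String.hashCode()` stands in for the broker's real hash function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the dispatcher/consumer pair; not Pulsar code. */
final class StickyHashReuseSketch {
    /** Counts hash computations so the saving is observable. */
    static final AtomicInteger hashCalls = new AtomicInteger();

    // Assumption: String.hashCode() replaces the broker's real sticky-key hash.
    static int stickyKeyHash(String key) {
        hashCalls.incrementAndGet();
        return key.hashCode() & Integer.MAX_VALUE;
    }

    // Mirrors the shape of the new overload: use the caller's precomputed
    // hashes when present, otherwise fall back to hashing each key here.
    static List<Integer> send(List<String> keys, List<Integer> precomputed) {
        List<Integer> used = new ArrayList<>(keys.size());
        for (int i = 0; i < keys.size(); i++) {
            int hash = precomputed == null ? stickyKeyHash(keys.get(i)) : precomputed.get(i);
            used.add(hash);
        }
        return used;
    }

    // Dispatcher side: hash each key once (to choose a consumer in the real
    // code) and pass the same hashes on to send().
    static List<Integer> dispatch(List<String> keys) {
        List<Integer> hashes = new ArrayList<>(keys.size());
        for (String key : keys) {
            hashes.add(stickyKeyHash(key));
        }
        return send(keys, hashes);
    }

    public static void main(String[] args) {
        List<String> keys = List.of("k1", "k2", "k3");
        dispatch(keys);
        System.out.println("hash computations with reuse: " + hashCalls.get());
    }
}
```

Keeping the `null` fallback means existing callers that do not precompute hashes keep working unchanged, which is what the delegating overload in the diff appears to rely on.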
Re: [PR] [improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions [pulsar]
dlg99 merged PR #22536:
URL: https://github.com/apache/pulsar/pull/22536
Re: [PR] [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics [pulsar]
lhotari merged PR #22598:
URL: https://github.com/apache/pulsar/pull/22598
(pulsar) branch master updated: [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 8323a3c4991 [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598)
8323a3c4991 is described below

commit 8323a3c49912976aee723787fa67bee4d7d8d846
Author: Lari Hotari
AuthorDate: Fri Apr 26 23:17:51 2024 +0300

    [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598)
---
 .../apache/pulsar/broker/resources/NamespaceResources.java | 14 +++---
 .../apache/pulsar/broker/resources/PulsarResources.java    | 12 ++--
 .../main/java/org/apache/pulsar/broker/PulsarService.java  |  2 +-
 3 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 1ba353dccaa..975b23192f9 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -54,10 +56,14 @@ public class NamespaceResources extends BaseResources {
     private static final String NAMESPACE_BASE_PATH = "/namespace";
 
     public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec) {
+        this(configurationStore, operationTimeoutSec, ForkJoinPool.commonPool());
+    }
+
+    public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) {
         super(configurationStore, Policies.class, operationTimeoutSec);
         this.configurationStore = configurationStore;
         isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec);
-        partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
+        partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec, executor);
     }
 
     public CompletableFuture> listNamespacesAsync(String tenant) {
@@ -234,9 +240,11 @@ public class NamespaceResources extends BaseResources {
 
     public static class PartitionedTopicResources extends BaseResources {
         private static final String PARTITIONED_TOPIC_PATH = "/admin/partitioned-topics";
+        private final Executor executor;
 
-        public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec) {
+        public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) {
             super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec);
+            this.executor = executor;
         }
 
         public CompletableFuture updatePartitionedTopicAsync(TopicName tn, Function
[...]
 { future.complete(deleteResult);
                     }
                 });
-            });
+            }, executor);
 
             return future;
         }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index fe7ffe0bc7b..cc64eeb52f6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.resources;
 
 import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import lombok.Getter;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -57,13 +59,19 @@ public class PulsarResources {
     public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore) {
         this(localMetadataStore, configurationMetadataStore, DEFAULT_OPERATION_TIMEOUT_SEC);
     }
+
+    public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
+                           int operationTimeoutSec) {
+        this(localMetadataStore, configurationMetadataStore, operationTimeoutSec, ForkJoinPool.commonPool());
+    }
+
     public PulsarResources(MetadataStore localMetadataStore, MetadataStore
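The pattern in the diff above is constructor overloading for executor injection: the old constructor keeps its signature and delegates with `ForkJoinPool.commonPool()`, while a new constructor lets the caller supply the executor used for the asynchronous callback. A minimal sketch of that pattern follows, assuming nothing about the real classes: `ExecutorInjectionSketch` and `runCallback` are hypothetical names, and `thenApplyAsync` is used here only as a stand-in for whichever async stage the elided part of the diff uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

/** Hypothetical resource class; shows the overload pattern only. */
final class ExecutorInjectionSketch {
    private final Executor executor;

    // Old signature stays source-compatible and keeps the previous default.
    ExecutorInjectionSketch() {
        this(ForkJoinPool.commonPool());
    }

    // New signature lets the owner pass its own executor.
    ExecutorInjectionSketch(Executor executor) {
        this.executor = executor;
    }

    // Runs the callback stage on the injected executor and reports which
    // thread executed it.
    CompletableFuture<String> runCallback() {
        return CompletableFuture.completedFuture("done")
                .thenApplyAsync(ignored -> Thread.currentThread().getName(), executor);
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + new ExecutorInjectionSketch().runCallback().join());
    }
}
```

A caller that owns a dedicated pool would construct the object with that pool, so the callbacks no longer compete with other work scheduled on the common pool.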
(pulsar) branch master updated: [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown (#22589)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 7a44c801f86 [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown (#22589)
7a44c801f86 is described below

commit 7a44c801f86c4276533b0f008e768fb8deba4abc
Author: Lari Hotari
AuthorDate: Fri Apr 26 23:17:18 2024 +0300

    [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown (#22589)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  5 ++
 .../SystemTopicBasedTopicPoliciesService.java      | 63 +++---
 .../SystemTopicTxnBufferSnapshotService.java       | 20 ++-
 .../broker/service/TopicPoliciesService.java       |  7 ++-
 4 files changed, 87 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c21c7dc771e..51dffc20d07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -565,6 +565,11 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 transactionBufferClient.close();
             }
 
+            if (topicPoliciesService != null) {
+                topicPoliciesService.close();
+                topicPoliciesService = null;
+            }
+
             if (client != null) {
                 client.close();
                 client = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 0449e5c885c..6d18d6d61b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
@@ -72,6 +73,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     private final PulsarService pulsarService;
     private final HashSet localCluster;
     private final String clusterName;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     private final ConcurrentInitializer namespaceEventsSystemTopicFactoryLazyInitializer =
             new LazyInitializer<>() {
@@ -110,12 +112,18 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         this.writerCaches = Caffeine.newBuilder()
                 .expireAfterAccess(5, TimeUnit.MINUTES)
                 .removalListener((namespaceName, writer, cause) -> {
-                    ((SystemTopicClient.Writer) writer).closeAsync().exceptionally(ex -> {
-                        log.error("[{}] Close writer error.", namespaceName, ex);
-                        return null;
-                    });
+                    try {
+                        ((SystemTopicClient.Writer) writer).close();
+                    } catch (Exception e) {
+                        log.error("[{}] Close writer error.", namespaceName, e);
+                    }
                 })
+                .executor(pulsarService.getExecutor())
                 .buildAsync((namespaceName, executor) -> {
+                    if (closed.get()) {
+                        return CompletableFuture.failedFuture(
+                                new BrokerServiceException(getClass().getName() + " is closed."));
+                    }
                     SystemTopicClient systemTopicClient = getNamespaceEventsSystemTopicFactory()
                             .createTopicPoliciesSystemTopicClient(namespaceName);
                     return systemTopicClient.newWriterAsync();
@@ -382,6 +390,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     protected CompletableFuture> createSystemTopicClient(
             NamespaceName namespace) {
+        if (closed.get()) {
+            return CompletableFuture.failedFuture(
+                    new BrokerServiceException(getClass().getName() + " is closed."));
+        }
         try {
             createSystemTopicFactoryIfNeeded();
         } catch (PulsarServerException ex) {
@@ -430,6 +442,11 @@ public class SystemTopicBasedTopicPoliciesService implements
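The guard added in the hunks above follows a common shape: an `AtomicBoolean closed` flag that asynchronous entry points check first, returning an already-failed future once the service is closed so that no new resources are created during shutdown. The sketch below illustrates that shape only. `ClosableServiceSketch` and `createClient` are hypothetical, `IllegalStateException` stands in for `BrokerServiceException`, and the `compareAndSet` in `close()` is an assumption, since the part of the diff that contains the real `close()` is cut off.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical service; illustrates the closed-flag guard only. */
final class ClosableServiceSketch implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Async entry point: fail fast with a failed future after close().
    CompletableFuture<String> createClient(String namespace) {
        if (closed.get()) {
            // Assumption: IllegalStateException replaces BrokerServiceException.
            return CompletableFuture.failedFuture(
                    new IllegalStateException(getClass().getName() + " is closed."));
        }
        return CompletableFuture.completedFuture("client-for-" + namespace);
    }

    @Override
    public void close() {
        // Assumption: compareAndSet makes close() idempotent, so cleanup
        // runs at most once even if close() is called concurrently.
        if (closed.compareAndSet(false, true)) {
            // Release writers, readers and caches here.
        }
    }

    public static void main(String[] args) {
        ClosableServiceSketch service = new ClosableServiceSketch();
        System.out.println(service.createClient("ns").join());
        service.close();
        System.out.println("failed after close: "
                + service.createClient("ns").isCompletedExceptionally());
    }
}
```

Returning a failed future rather than throwing keeps the method's contract uniform for callers that compose on the result.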
Re: [PR] [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown [pulsar]
lhotari merged PR #22589:
URL: https://github.com/apache/pulsar/pull/22589
(pulsar) branch master updated: [fix][broker] Continue closing even when executor is shut down (#22599)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new f8f256cfbdc [fix][broker] Continue closing even when executor is shut down (#22599)
f8f256cfbdc is described below

commit f8f256cfbdcd780c81442dc5566b6ed071141645
Author: Lari Hotari
AuthorDate: Fri Apr 26 23:16:54 2024 +0300

    [fix][broker] Continue closing even when executor is shut down (#22599)
---
 .../pulsar/broker/service/persistent/PersistentTopic.java | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c7d762d595c..155b6777882 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1429,7 +1430,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             }
             FutureUtil.waitForAll(futures).thenRunAsync(() -> {
                 closeClientFuture.complete(null);
-            }, getOrderedExecutor()).exceptionally(ex -> {
+            }, command -> {
+                try {
+                    getOrderedExecutor().execute(command);
+                } catch (RejectedExecutionException e) {
+                    // executor has been shut down, execute in current thread
+                    command.run();
+                }
+            }).exceptionally(ex -> {
                 log.error("[{}] Error closing clients", topic, ex);
                 unfenceTopicToResume();
                 closeClientFuture.completeExceptionally(ex);
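The fix above wraps the executor in a lambda that falls back to the calling thread when the task is rejected. The same technique can be tried standalone; the sketch below is a minimal illustration in which `RejectionFallbackSketch` and `withCallerFallback` are hypothetical names and a shut-down single-thread pool stands in for the broker's ordered executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical helper; demonstrates the caller-runs fallback only. */
final class RejectionFallbackSketch {
    // Wraps an executor so that a rejected task runs on the calling thread
    // instead of failing the dependent stage.
    static Executor withCallerFallback(Executor delegate) {
        return command -> {
            try {
                delegate.execute(command);
            } catch (RejectedExecutionException e) {
                // Delegate has been shut down: execute in the current thread.
                command.run();
            }
        };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown(); // from now on the pool rejects new tasks

        CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture((Void) null)
                .thenRunAsync(() -> System.out.println("close step ran"), withCallerFallback(pool));

        closeFuture.join();
        System.out.println("completed exceptionally: " + closeFuture.isCompletedExceptionally());
    }
}
```

The trade-off is that the fallback task runs on whichever thread happened to trigger it, so ordering guarantees of the original executor no longer apply to that task; for a final close step during shutdown that is usually acceptable.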
Re: [PR] [fix][broker] Continue closing even when executor is shut down [pulsar]
lhotari merged PR #22599:
URL: https://github.com/apache/pulsar/pull/22599
Re: [PR] [improve][misc] Upgrade slf4j to 2.0.13 [pulsar]
nodece commented on PR #22391:
URL: https://github.com/apache/pulsar/pull/22391#issuecomment-2079804762

Could you provide the log4j/log4j2 and slf4j versions? I can help you solve the issues you are facing.
Re: [D] What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}? [pulsar]
GitHub user visortelle edited a comment on the discussion: What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}?

@dameiss-tibco hi

You'll need to use these cluster names in further operations. For example, the "create tenant" operation requires at least one allowed cluster. The "create namespace" operation also requires clusters when you have at least two clusters in your Pulsar instance.

> with completely bogus values for brokerServiceUrl and serviceUrl, and the cluster is created. But it is obviously not useable since the URLs are invalid

https://github.com/apache/pulsar/assets/9302460/662547af-26c7-4061-98dc-0ed85e096da6

From the screenshot above you can see that there is some URL validation. At the same time, you're right that the cluster you register may be unavailable at the moment of its registration in another cluster.

In practice, it doesn't introduce any problems.

GitHub link: https://github.com/apache/pulsar/discussions/22602#discussioncomment-9239830

This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}? [pulsar]
GitHub user visortelle edited a comment on the discussion: What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}?

@dameiss-tibco hi

You'll need to use these cluster names in further operations. For example, the "create tenant" operation requires at least one allowed cluster. The "create namespace" operation also requires clusters when you have at least two clusters in your Pulsar instance.

> with completely bogus values for brokerServiceUrl and serviceUrl, and the cluster is created. But it is obviously not useable since the URLs are invalid

https://github.com/apache/pulsar/assets/9302460/662547af-26c7-4061-98dc-0ed85e096da6

From the screenshot above you can see that there is some URL validation. At the same time, you're right that the cluster you register may be unavailable at the moment of its registration in another cluster.

In practice, it doesn't introduce any problems. This seems like completely valid behavior to me for a distributed system, which should keep working when some component unexpectedly becomes unavailable, e.g. when one of the clusters is down.

Here is an example unrelated to Pulsar. Imagine two services, A and B, where each must know about the other to work properly. If you make the availability check on service start, you're in trouble. If service A checked on startup that service B is available, and service B checked that service A is available, neither would be able to start because of the cyclic dependency.

You can consider the cluster registration a similar case to the example above, but the configuration is dynamic.

GitHub link: https://github.com/apache/pulsar/discussions/22602#discussioncomment-9239830
Re: [D] What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}? [pulsar]
GitHub user visortelle edited a comment on the discussion: What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}?

@dameiss-tibco hi

You'll need to use these cluster names in further operations. For example, the "create tenant" operation requires at least one allowed cluster. The "create namespace" operation also requires clusters when you have at least two clusters in your Pulsar instance.

> with completely bogus values for brokerServiceUrl and serviceUrl, and the cluster is created. But it is obviously not useable since the URLs are invalid

https://github.com/apache/pulsar/assets/9302460/662547af-26c7-4061-98dc-0ed85e096da6

From the screenshot above you can see that there is some URL validation. At the same time, you're right that the cluster you register may be unavailable at the moment of its registration in another cluster.

In practice, it doesn't introduce any problems and seems like completely valid behavior to me.

Here is an example unrelated to Pulsar. Imagine two services, A and B, where each must know about the other to work properly. If you make the availability check on service start, you're in trouble. If service A checked on startup that service B is available, and service B checked that service A is available, neither would be able to start because of the cyclic dependency.

You can consider the cluster registration a similar case to the example above, but the configuration is dynamic.

GitHub link: https://github.com/apache/pulsar/discussions/22602#discussioncomment-9239830
Re: [D] What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}? [pulsar]
GitHub user visortelle edited a comment on the discussion: What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}?

@dameiss-tibco hi

You'll need to use these cluster names in further operations. For example, the "create tenant" operation requires at least one allowed cluster. The "create namespace" operation also requires clusters when you have at least two clusters in your Pulsar instance.

> with completely bogus values for brokerServiceUrl and serviceUrl, and the cluster is created. But it is obviously not useable since the URLs are invalid

https://github.com/apache/pulsar/assets/9302460/662547af-26c7-4061-98dc-0ed85e096da6

From the screenshot above you can see that there is some URL validation. At the same time, you're right that the cluster you register may be unavailable at the moment of its registration in another cluster.

In practice, it doesn't introduce any problems and seems like completely valid behavior to me.

Here is an example unrelated to Pulsar. Imagine two services, A and B, where each must know about the other to work properly. If you make the availability check on service start, you're in trouble. If service A checked on startup that service B is available, and service B checked that service A is available, neither would be able to start. This is a cyclic dependency.

You can consider the cluster registration a similar case to the example above, but the configuration is dynamic.

GitHub link: https://github.com/apache/pulsar/discussions/22602#discussioncomment-9239830
Re: [D] What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}? [pulsar]
GitHub user visortelle added a comment to the discussion: What is the intended result of creating a cluster via PUT /admin/v2/clusters/{cluster-name}? @dameiss-tibco hi You'll need to use these cluster names in further operations. For example, the "create tenant" operation requires at least one allowed cluster. The "create namespace" operation also requires clusters when you have at least two clusters in your Pulsar instance. > with completely bogus values for brokerServiceUrl and serviceUrl, and the > cluster is created. But it is obviously not useable since the URLs are invalid https://github.com/apache/pulsar/assets/9302460/662547af-26c7-4061-98dc-0ed85e096da6;> >From the screenshot above you can see that there is some URL validation. At >the same time, you're right that the cluster you register may be unavailable >at the moment of its registration in another cluster. In practice, it doesn't introduce any problems and seems a completely valid behavior to me. Example unrelated to Pulsar. Imagine two services A and B where each should know about another to work properly. Such cyclic dependency. If on starting service A you'd check that service B is available, and service B would check that service A is available, they both wouldn't be able to start. You can consider the cluster registration a similar case to the example above, but the configuration is dynamic. GitHub link: https://github.com/apache/pulsar/discussions/22602#discussioncomment-9239830 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [PR] [fix][broker] One topic can be closed multiple times concurrently [pulsar]
poorbarcode commented on PR #17524: URL: https://github.com/apache/pulsar/pull/17524#issuecomment-2079693146

> @poorbarcode looks like OneWayReplicatorTest.testUnFenceTopicToReuse fails

Sorry, I found a behavior change (before: the broker tries to unfence the topic for reuse when closing clients fails; after: this mechanism does not work), and it is difficult to fix **gracefully**. I will try to fix it tomorrow.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][meta] Fix invalid use of drain API and race condition in closing metadata store [pulsar]
merlimat commented on code in PR #22585: URL: https://github.com/apache/pulsar/pull/22585#discussion_r1581199791

##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java:
##########
@@ -99,7 +103,13 @@ public void close() throws Exception {
     private void flush() {
         while (!readOps.isEmpty()) {
             List ops = new ArrayList<>();
-            readOps.drain(ops::add, maxOperations);
+            for (int i = 0; i < maxOperations; i++) {

Review Comment:
This one should be ok, since it's already done in a loop: `while (!readOps.isEmpty()) {...}`

##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java:
##########
@@ -87,9 +87,13 @@ public void close() throws Exception {
         // Fail all the pending items
         MetadataStoreException ex =
                 new MetadataStoreException.AlreadyClosedException("Metadata store is getting closed");
-        readOps.drain(op -> op.getFuture().completeExceptionally(ex));
-        writeOps.drain(op -> op.getFuture().completeExceptionally(ex));
-
+        MetadataOp op;
+        while ((op = readOps.poll()) != null) {
+            op.getFuture().completeExceptionally(ex);
+        }
+        while ((op = writeOps.poll()) != null) {
+            op.getFuture().completeExceptionally(ex);
+        }

Review Comment:
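The two hunks above replace a bulk drain with `poll()`-based loops. The quoted hunk shows only the first line of the new `for` loop, so the following is a minimal sketch that assumes the body polls one item per iteration and stops on `null`. `PollDrainSketch`, `pollBatch`, and `flushAll` are hypothetical names, and `java.util.Queue` stands in for whatever queue type the metadata store actually uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical helper, not Pulsar code: bounded draining with poll().
class PollDrainSketch {
    /** Removes up to maxOperations items, stopping early once the queue is empty. */
    static <T> List<T> pollBatch(Queue<T> queue, int maxOperations) {
        List<T> batch = new ArrayList<>();
        for (int i = 0; i < maxOperations; i++) {
            T item = queue.poll();
            if (item == null) {
                break; // nothing left to take
            }
            batch.add(item);
        }
        return batch;
    }

    /** Takes bounded batches until the queue is empty, like the outer while loop in flush(). */
    static <T> List<List<T>> flushAll(Queue<T> queue, int maxOperations) {
        List<List<T>> batches = new ArrayList<>();
        while (!queue.isEmpty()) {
            List<T> batch = pollBatch(queue, maxOperations);
            if (!batch.isEmpty()) {
                batches.add(batch);
            }
        }
        return batches;
    }
}
```

Because the outer loop re-checks `isEmpty()`, a batch that comes back shorter than `maxOperations` is harmless, which matches the review remark that the bounded variant is already covered by the surrounding loop.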
[PR] [fix][doc] Fixes example code in Cluster Failover Java Client [pulsar-site]
sandeep-mst opened a new pull request, #891: URL: https://github.com/apache/pulsar-site/pull/891

Cluster failover Java client is available for 2.10.0 and later versions. This PR fixes the example code shown in [Automatic-failover](https://pulsar.apache.org/docs/3.2.x/client-libraries-cluster-level-failover/#automatic-failover) and [Controlled-failover](https://pulsar.apache.org/docs/3.2.x/client-libraries-cluster-level-failover/#controlled-failover).

It adds the required ServiceUrlProvider when building the PulsarClient: `.serviceUrlProvider(failover)`

### ✅ Contribution Checklist

- [x] I read the [contribution guide](https://pulsar.apache.org/contribute/document-contribution/)
- [x] I updated the [versioned docs](https://pulsar.apache.org/contribute/document-contribution/#update-versioned-docs)
Re: [I] [Bug] [ARM] Partially acknowledged batches are not redelivered [pulsar-client-cpp]
bph-sag commented on issue #424: URL: https://github.com/apache/pulsar-client-cpp/issues/424#issuecomment-2079446323

So, looking at the internal stats for the topic, it does appear that the mark-delete position indicates that the full batch is indeed acknowledged for our aarch32 runs.

### Unsuccessful run, with missed events

Log lines before termination of first consumer (last acknowledgement and the message associated with that acknowledgement, aarch32) (Ledger 17, entry 1, 190/309)

```log
2024-04-26 14:44:32.322 INFO [3944048800] - Got Message with batch index: 190, of batch size: 309, ledger: 17, entry: 1, full message was: [...]
2024-04-26 14:44:32.324 INFO [3944048800] - SUCCESS ACK - partition: -1, ledgerId: 17, entryId: 1, batchIndex: 190, topicName: persistent://public/default/ConnPlugincor24546ea9837f45e49629df60696e54502e4
```

Mark delete position from `persistent/:tenant/:namespace/:topic/internalStats?authoritative=false=false`

```json
"markDeletePosition": "17:1",
```

First message received on the 2nd consumer (Ledger 17, entry 2, 1/309)

```
2024-04-26 14:45:03.577 INFO [3948247200] - Got Message with batch index: 0, of batch size: 309, ledger: 17, entry: 2, full message was: [...]
```

### Successful run, no missing messages (x86_64)

Last messages prior to shutdown (Ledger 36, entry 2, 198/309)

```log
2024-04-25 14:18:49.657 INFO [3946145952] - SUCCESS ACK - partition: -1, ledgerId: 36, entryId: 2, batchIndex: 198, topicName: persistent://public/default/ConnPlugincor245ae0980739d4344e98f3f3b29f0f07d8b
2024-04-25 14:18:49.657 INFO [3946145952] - Got Message with batch index: 198, of batch size: 309, ledger: 36, entry: 2, full message was: [...]
```

Mark delete position from `persistent/:tenant/:namespace/:topic/internalStats?authoritative=false=false`

```json
"markDeletePosition": "36:1",
```

First message received on 2nd consumer (Ledger 36, entry 2, 1/309)

```
2024-04-25 14:19:11.523 INFO [3948247200] - Got Message with batch index: 0, of batch size: 309, ledger: 36, entry: 2, full message was: [...]
```
(pulsar) branch branch-3.1 updated: [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d00d715f487 [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)
d00d715f487 is described below

commit d00d715f487c1a12d9756c0460f8648e3920903d
Author: Lari Hotari
AuthorDate: Fri Apr 26 16:23:36 2024 +0300

    [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)

    (cherry picked from commit 3b9602c04db5a6577e2dc2fabddbf7a6e1d1a4a2)
---
 .../java/org/apache/pulsar/broker/service/BrokerServiceException.java | 4 ++++
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java       | 3 ++-
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 3 ++-
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 831d6068e20..6abe40f811d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -146,6 +146,10 @@ public class BrokerServiceException extends Exception {
         public TopicBusyException(String msg) {
             super(msg);
         }
+
+        public TopicBusyException(String msg, Throwable t) {
+            super(msg, t);
+        }
     }
 
     public static class TopicNotFoundException extends BrokerServiceException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 911ea7896b3..2bea208cc6a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -472,7 +472,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                     }
                 }).exceptionally(ex -> {
                     deleteFuture.completeExceptionally(
-                            new TopicBusyException("Failed to close clients before deleting topic."));
+                            new TopicBusyException("Failed to close clients before deleting topic.",
+                                    FutureUtil.unwrapCompletionException(ex)));
                     return null;
                 });
             } finally {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ee367750ad6..a473257e191 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1442,7 +1442,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 }).exceptionally(ex->{
                     unfenceTopicToResume();
                     deleteFuture.completeExceptionally(
-                            new TopicBusyException("Failed to close clients before deleting topic."));
+                            new TopicBusyException("Failed to close clients before deleting topic.",
+                                    FutureUtil.unwrapCompletionException(ex)));
                     return null;
                 });
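The effect of the change above can be tried out with plain `java.util.concurrent` types. This is a sketch under assumptions, not the broker code: `BusyException` is a hypothetical stand-in for `TopicBusyException`, and `unwrap` is a simplified stand-in that is only assumed to behave like `FutureUtil.unwrapCompletionException` for the common case of a single `CompletionException` wrapper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical illustration of keeping the original failure as the cause.
class CausePropagationSketch {
    /** Stand-in for TopicBusyException with the (message, cause) constructor. */
    static class BusyException extends Exception {
        BusyException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    /** Simplified stand-in helper: strips one CompletionException wrapper if present. */
    static Throwable unwrap(Throwable t) {
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    /** Completes the returned future once clients are closed, or fails it with the cause attached. */
    static CompletableFuture<Void> deleteAfterClose(CompletableFuture<Void> closeClients) {
        CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
        closeClients.thenRun(() -> deleteFuture.complete(null))
                .exceptionally(ex -> {
                    // ex arrives wrapped in CompletionException because it passed through thenRun
                    deleteFuture.completeExceptionally(new BusyException(
                            "Failed to close clients before deleting topic.", unwrap(ex)));
                    return null;
                });
        return deleteFuture;
    }
}
```

A caller that inspects the failure of the returned future can then reach the underlying problem through `getCause()` instead of seeing only the generic "topic busy" message.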
(pulsar) branch branch-2.11 updated: [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)
lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 82308b95f59 [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)
82308b95f59 is described below

commit 82308b95f5935a65f0098e77e143d48cf2ef2d8c
Author: Lari Hotari
AuthorDate: Fri Apr 26 16:23:36 2024 +0300

    [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)

    (cherry picked from commit 3b9602c04db5a6577e2dc2fabddbf7a6e1d1a4a2)
---
 .../java/org/apache/pulsar/broker/service/BrokerServiceException.java | 4 ++++
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java       | 3 ++-
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 3 ++-
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index c6d8ffabcec..04146383c1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -132,6 +132,10 @@ public class BrokerServiceException extends Exception {
         public TopicBusyException(String msg) {
             super(msg);
         }
+
+        public TopicBusyException(String msg, Throwable t) {
+            super(msg, t);
+        }
     }
 
     public static class TopicNotFoundException extends BrokerServiceException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index dd7815ef98b..5e3b3d8bd39 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -455,7 +455,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                     }
                 }).exceptionally(ex -> {
                     deleteFuture.completeExceptionally(
-                            new TopicBusyException("Failed to close clients before deleting topic."));
+                            new TopicBusyException("Failed to close clients before deleting topic.",
+                                    FutureUtil.unwrapCompletionException(ex)));
                     return null;
                 });
             } finally {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d8c1629003b..71696007049 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1311,7 +1311,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 }).exceptionally(ex->{
                     unfenceTopicToResume();
                     deleteFuture.completeExceptionally(
-                            new TopicBusyException("Failed to close clients before deleting topic."));
+                            new TopicBusyException("Failed to close clients before deleting topic.",
+                                    FutureUtil.unwrapCompletionException(ex)));
                     return null;
                 });
             } finally {
(pulsar) branch branch-3.2 updated: [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)
lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 7cc99dfff63 [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)
7cc99dfff63 is described below

commit 7cc99dfff6397108890e671df52404645e1cbdd3
Author: Lari Hotari
AuthorDate: Fri Apr 26 16:23:36 2024 +0300

    [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)

    (cherry picked from commit 3b9602c04db5a6577e2dc2fabddbf7a6e1d1a4a2)
---
 .../java/org/apache/pulsar/broker/service/BrokerServiceException.java | 4 ++++
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java       | 3 ++-
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 3 ++-
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 831d6068e20..6abe40f811d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -146,6 +146,10 @@ public class BrokerServiceException extends Exception {
         public TopicBusyException(String msg) {
             super(msg);
         }
+
+        public TopicBusyException(String msg, Throwable t) {
+            super(msg, t);
+        }
     }
 
     public static class TopicNotFoundException extends BrokerServiceException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 5c75ec28c50..b0c6443332b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -477,7 +477,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                     }
                 }).exceptionally(ex -> {
                     deleteFuture.completeExceptionally(
-                            new TopicBusyException("Failed to close clients before deleting topic."));
+                            new TopicBusyException("Failed to close clients before deleting topic.",
+                                    FutureUtil.unwrapCompletionException(ex)));
                     return null;
                 });
             } finally {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a76ac548e0c..3a1da8ccee7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1470,7 +1470,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 }).exceptionally(ex->{
                     unfenceTopicToResume();
                     deleteFuture.completeExceptionally(
-                            new TopicBusyException("Failed to close clients before deleting topic."));
+                            new TopicBusyException("Failed to close clients before deleting topic.",
+                                    FutureUtil.unwrapCompletionException(ex)));
                     return null;
                 });
(pulsar) branch branch-3.0 updated: [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6e0d0d98b29 [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)
6e0d0d98b29 is described below

commit 6e0d0d98b29f7cfff42464d104987bc5408c56a5
Author: Lari Hotari
AuthorDate: Fri Apr 26 16:23:36 2024 +0300

    [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)

    (cherry picked from commit 3b9602c04db5a6577e2dc2fabddbf7a6e1d1a4a2)
---
 .../java/org/apache/pulsar/broker/service/BrokerServiceException.java | 4 ++++
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java       | 3 ++-
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 3 ++-
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 831d6068e20..6abe40f811d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -146,6 +146,10 @@ public class BrokerServiceException extends Exception {
         public TopicBusyException(String msg) {
             super(msg);
         }
+
+        public TopicBusyException(String msg, Throwable t) {
+            super(msg, t);
+        }
     }
 
     public static class TopicNotFoundException extends BrokerServiceException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 4efaee2a1fa..a994c6336ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -475,7 +475,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                     }
                 }).exceptionally(ex -> {
                     deleteFuture.completeExceptionally(
-                            new TopicBusyException("Failed to close clients before deleting topic."));
+                            new TopicBusyException("Failed to close clients before deleting topic.",
+                                    FutureUtil.unwrapCompletionException(ex)));
                     return null;
                 });
             } finally {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 85fdc182ccc..6a97ac92625 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1439,7 +1439,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 }).exceptionally(ex->{
                     unfenceTopicToResume();
                     deleteFuture.completeExceptionally(
-                            new TopicBusyException("Failed to close clients before deleting topic."));
+                            new TopicBusyException("Failed to close clients before deleting topic.",
+                                    FutureUtil.unwrapCompletionException(ex)));
                     return null;
                 });
(pulsar) branch master updated (3b9602c04db -> f411e3c0f26)
lhotari pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

    from 3b9602c04db [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)
     add f411e3c0f26 [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java    | 3 +
 .../store/TableViewLoadDataStoreImpl.java          | 6 +-
 .../pulsar/broker/service/BrokerService.java       | 11 +++
 .../extensions/ExtensibleLoadManagerCloseTest.java | 107 +
 4 files changed, 122 insertions(+), 5 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
Re: [I] [Bug] Broker could take 30+ seconds to close with extensible load manager [pulsar]
lhotari closed issue #22569: [Bug] Broker could take 30+ seconds to close with extensible load manager URL: https://github.com/apache/pulsar/issues/22569
Re: [PR] [fix][broker] Avoid being stuck when closing the broker with extensible load manager [pulsar]
lhotari merged PR #22573: URL: https://github.com/apache/pulsar/pull/22573
Re: [PR] [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown [pulsar]
lhotari commented on code in PR #22589: URL: https://github.com/apache/pulsar/pull/22589#discussion_r1581034618

##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -110,11 +111,13 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
         this.writerCaches = Caffeine.newBuilder()
                 .expireAfterAccess(5, TimeUnit.MINUTES)
                 .removalListener((namespaceName, writer, cause) -> {
-                    ((SystemTopicClient.Writer) writer).closeAsync().exceptionally(ex -> {
-                        log.error("[{}] Close writer error.", namespaceName, ex);
-                        return null;
-                    });
+                    try {
+                        ((SystemTopicClient.Writer) writer).close();
+                    } catch (IOException e) {

Review Comment:
done

##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java:
##########
@@ -142,8 +143,26 @@ private SystemTopicClient getTransactionBufferSystemTopicClient(NamespaceName
     public void close() throws Exception {
         for (Map.Entry> entry : clients.entrySet()) {
-            entry.getValue().close();
+            try {
+                entry.getValue().close();
+            } catch (Exception e) {
+                log.error("Failed to close system topic client for namespace {}", entry.getKey(), e);
+            }
+        }
+        clients.clear();
+        for (Map.Entry> entry : refCountedWriterMap.entrySet()) {
+            CompletableFuture> future = entry.getValue().getFuture();
+            if (!future.isCompletedExceptionally()) {
+                future.thenAccept(writer -> {
+                    try {
+                        writer.close();
+                    } catch (IOException e) {

Review Comment:
done

##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -740,4 +743,21 @@ protected AsyncLoadingCache
     }

     private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
+
+    @Override
+    public void close() throws Exception {
+        writerCaches.synchronous().invalidateAll();
+        readerCaches.values().forEach(future -> {
+            if (future != null && !future.isCompletedExceptionally()) {
+                future.thenAccept(reader -> {
+                    try {
+                        reader.close();
+                    } catch (IOException e) {

Review Comment:
done
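The `close()` hunks above follow a best-effort pattern: each resource is closed inside its own `try`, a failure is logged rather than rethrown, and the map is cleared afterwards. A minimal sketch of that pattern with standard library types follows. `BestEffortCloseSketch` and `closeAll` are hypothetical names, `java.io.Closeable` stands in for the system topic clients, and the sketch collects failed keys in a list where the PR logs them.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of closing every resource even if some fail.
class BestEffortCloseSketch {
    /** Closes all values, remembers which keys failed, and clears the map. */
    static List<String> closeAll(Map<String, Closeable> clients) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Closeable> entry : clients.entrySet()) {
            try {
                entry.getValue().close();
            } catch (IOException e) {
                // One failing client must not stop the remaining ones from closing.
                failed.add(entry.getKey());
            }
        }
        clients.clear();
        return failed;
    }
}
```

Without the per-entry `try`, the first failing `close()` would abort the loop and leave the remaining resources open, which is the kind of behavior that can delay a graceful shutdown.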
(pulsar) branch master updated (69839c72f13 -> 3b9602c04db)
lhotari pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

    from 69839c72f13 [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
     add 3b9602c04db [improve][broker] Propagate cause exception in TopicBusyException when applicable (#22596)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/pulsar/broker/service/BrokerServiceException.java | 4 ++++
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java       | 3 ++-
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 3 ++-
 3 files changed, 8 insertions(+), 2 deletions(-)
Re: [PR] [improve][broker] Propagate cause exception in TopicBusyException when applicable [pulsar]
lhotari merged PR #22596: URL: https://github.com/apache/pulsar/pull/22596
(pulsar) branch branch-3.2 updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new b764afa2bb3 [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
b764afa2bb3 is described below

commit b764afa2bb367e20849c1297b282beb5d746b474
Author: Lari Hotari
AuthorDate: Fri Apr 26 15:16:47 2024 +0300

    [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)

    (cherry picked from commit 69839c72f1375d141b56734bc5e041c13e366c57)
---
 .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 079ae3e2ae5..2e88cb33324 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -192,7 +192,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                 Code code = Code.get(rc);
                 if (code == Code.CONNECTIONLOSS) {
                     // There is the chance that we caused a connection reset by sending or requesting a batch
-                    // that passed the max ZK limit. Retry with the individual operations
+                    // that passed the max ZK limit.
+
+                    // Build the log warning message
+                    // summarize the operations by type
+                    String countsByType = ops.stream().collect(
+                                    Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+                            .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+                            .collect(Collectors.joining(", "));
+                    Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+                    log.warn("Connection loss while executing batch operation of {} "
+                            + "of total data size of {}. "
+                            + "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+                    // Retry with the individual operations
                     executor.schedule(() -> {
                         ops.forEach(o -> batchOperation(Collections.singletonList(o)));
                     }, 100, TimeUnit.MILLISECONDS);
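The summary string built in the diff above can be reproduced with plain collectors. This is a minimal sketch, not the metadata store code: `Op` and `OpType` are hypothetical stand-ins for `MetadataOp` and its type enum, and the sketch groups into an `EnumMap` (an assumption made here, not something in the commit) so that the order of the summary is deterministic.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration of summarizing a batch by operation type and size.
class BatchSummarySketch {
    enum OpType { GET, PUT, DELETE }

    /** Stand-in for MetadataOp: only a type and a payload size. */
    static final class Op {
        private final OpType type;
        private final long size;

        Op(OpType type, long size) {
            this.type = type;
            this.size = size;
        }

        OpType getType() {
            return type;
        }

        long size() {
            return size;
        }
    }

    /** Counts operations per type and renders them as "N TYPE entries, ...". */
    static String countsByType(List<Op> ops) {
        Map<OpType, Integer> counts = ops.stream().collect(
                Collectors.groupingBy(Op::getType,
                        () -> new EnumMap<>(OpType.class),
                        Collectors.summingInt(op -> 1)));
        return counts.entrySet().stream()
                .map(e -> e.getValue() + " " + e.getKey().name() + " entries")
                .collect(Collectors.joining(", "));
    }

    /** Sums the payload sizes of all operations in the batch. */
    static long totalSize(List<Op> ops) {
        return ops.stream().collect(Collectors.summingLong(Op::size));
    }
}
```

With the default `groupingBy` overload used in the commit, the order of the types in the message depends on the underlying `HashMap`, which is fine for a log line; the `EnumMap` supplier is used here only to make the output predictable.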
(pulsar) branch branch-3.1 updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 701d4cfe21b [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
701d4cfe21b is described below

commit 701d4cfe21b8bc291d8530f7510ccdaa6c6280f0
Author: Lari Hotari
AuthorDate: Fri Apr 26 15:16:47 2024 +0300

    [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)

    (cherry picked from commit 69839c72f1375d141b56734bc5e041c13e366c57)
---
 .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 079ae3e2ae5..2e88cb33324 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -192,7 +192,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                 Code code = Code.get(rc);
                 if (code == Code.CONNECTIONLOSS) {
                     // There is the chance that we caused a connection reset by sending or requesting a batch
-                    // that passed the max ZK limit. Retry with the individual operations
+                    // that passed the max ZK limit.
+
+                    // Build the log warning message
+                    // summarize the operations by type
+                    String countsByType = ops.stream().collect(
+                                    Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+                            .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+                            .collect(Collectors.joining(", "));
+                    Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+                    log.warn("Connection loss while executing batch operation of {} "
+                            + "of total data size of {}. "
+                            + "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+                    // Retry with the individual operations
                     executor.schedule(() -> {
                         ops.forEach(o -> batchOperation(Collections.singletonList(o)));
                     }, 100, TimeUnit.MILLISECONDS);
(pulsar) branch branch-2.11 updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 66a1edb64b9 [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
66a1edb64b9 is described below

commit 66a1edb64b96ecac39d1191536520b11149da0dc
Author: Lari Hotari
AuthorDate: Fri Apr 26 15:16:47 2024 +0300

    [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)

    (cherry picked from commit 69839c72f1375d141b56734bc5e041c13e366c57)
---
 .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java | 15 ++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 2dbe0a7c3f3..64148d898fc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -188,7 +188,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
             Code code = Code.get(rc);
             if (code == Code.CONNECTIONLOSS) {
                 // There is the chance that we caused a connection reset by sending or requesting a batch
-                // that passed the max ZK limit. Retry with the individual operations
+                // that passed the max ZK limit.
+
+                // Build the log warning message
+                // summarize the operations by type
+                String countsByType = ops.stream().collect(
+                                Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+                        .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+                        .collect(Collectors.joining(", "));
+                Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+                log.warn("Connection loss while executing batch operation of {} "
+                        + "of total data size of {}. "
+                        + "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+                // Retry with the individual operations
                 executor.schedule(() -> {
                     ops.forEach(o -> batchOperation(Collections.singletonList(o)));
                 }, 100, TimeUnit.MILLISECONDS);
(pulsar) branch branch-3.0 updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ec646f90c48 [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
ec646f90c48 is described below

commit ec646f90c48d9b2664c5d3484d1beab726fdad20
Author: Lari Hotari
AuthorDate: Fri Apr 26 15:16:47 2024 +0300

    [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)

    (cherry picked from commit 69839c72f1375d141b56734bc5e041c13e366c57)
---
 .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java | 15 ++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 079ae3e2ae5..2e88cb33324 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -192,7 +192,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
             Code code = Code.get(rc);
             if (code == Code.CONNECTIONLOSS) {
                 // There is the chance that we caused a connection reset by sending or requesting a batch
-                // that passed the max ZK limit. Retry with the individual operations
+                // that passed the max ZK limit.
+
+                // Build the log warning message
+                // summarize the operations by type
+                String countsByType = ops.stream().collect(
+                                Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+                        .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+                        .collect(Collectors.joining(", "));
+                Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+                log.warn("Connection loss while executing batch operation of {} "
+                        + "of total data size of {}. "
+                        + "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+                // Retry with the individual operations
                 executor.schedule(() -> {
                     ops.forEach(o -> batchOperation(Collections.singletonList(o)));
                 }, 100, TimeUnit.MILLISECONDS);
(pulsar) branch master updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 69839c72f13 [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
69839c72f13 is described below

commit 69839c72f1375d141b56734bc5e041c13e366c57
Author: Lari Hotari
AuthorDate: Fri Apr 26 15:16:47 2024 +0300

    [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
---
 .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java | 15 ++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 079ae3e2ae5..2e88cb33324 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -192,7 +192,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
             Code code = Code.get(rc);
             if (code == Code.CONNECTIONLOSS) {
                 // There is the chance that we caused a connection reset by sending or requesting a batch
-                // that passed the max ZK limit. Retry with the individual operations
+                // that passed the max ZK limit.
+
+                // Build the log warning message
+                // summarize the operations by type
+                String countsByType = ops.stream().collect(
+                                Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+                        .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+                        .collect(Collectors.joining(", "));
+                Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+                log.warn("Connection loss while executing batch operation of {} "
+                        + "of total data size of {}. "
+                        + "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+                // Retry with the individual operations
                 executor.schedule(() -> {
                     ops.forEach(o -> batchOperation(Collections.singletonList(o)));
                 }, 100, TimeUnit.MILLISECONDS);
Re: [PR] [improve][meta] Log a warning when ZK batch fails with connectionloss [pulsar]
lhotari merged PR #22566: URL: https://github.com/apache/pulsar/pull/22566 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
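The warning message added by #22566 summarizes a failed batch as a count per operation type plus a total payload size. A minimal standalone sketch of that summary follows. The `Op` record and `OpType` enum are hypothetical stand-ins for Pulsar's `MetadataOp` and its type, and `Collectors.counting()` with a `TreeMap` is used here (instead of the patch's `summingInt(op -> 1)`) only to get a stable output order; it is not the committed code.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class BatchSummarySketch {
    // Hypothetical stand-ins for Pulsar's MetadataOp and its operation type.
    enum OpType { GET, PUT, DELETE }

    record Op(OpType type, int size) {}

    // Builds a summary such as "2 GET entries, 1 PUT entries".
    // A TreeMap keeps the types in enum declaration order.
    static String countsByType(List<Op> ops) {
        Map<OpType, Long> counts = ops.stream()
                .collect(Collectors.groupingBy(Op::type, TreeMap::new, Collectors.counting()));
        return counts.entrySet().stream()
                .map(e -> e.getValue() + " " + e.getKey().name() + " entries")
                .collect(Collectors.joining(", "));
    }

    // Sums the payload sizes of all operations in the batch.
    static long totalSize(List<Op> ops) {
        return ops.stream().mapToLong(Op::size).sum();
    }

    public static void main(String[] args) {
        List<Op> ops = List.of(
                new Op(OpType.PUT, 100), new Op(OpType.GET, 0), new Op(OpType.GET, 0));
        // prints "2 GET entries, 1 PUT entries, total size 100"
        System.out.println(countsByType(ops) + ", total size " + totalSize(ops));
    }
}
```

A summary like this makes it easier to tell from the broker log whether a connection loss coincided with an unusually large batch.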
(pulsar) branch branch-3.2 updated: [fix][test] Clear fields in test cleanup to reduce memory consumption (#22583)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1be23d00295 [fix][test] Clear fields in test cleanup to reduce memory consumption (#22583)
1be23d00295 is described below

commit 1be23d00295e9021d66bf38058a18cd2fcb38f31
Author: Lari Hotari
AuthorDate: Thu Apr 25 21:54:55 2024 +0300

    [fix][test] Clear fields in test cleanup to reduce memory consumption (#22583)

    (cherry picked from commit 3de14c55de138770ed61d1a14cd883048ea1915c)
---
 .../pulsar/broker/MultiBrokerTestZKBaseTest.java | 1 +
 .../apache/pulsar/broker/SLAMonitoringTest.java | 21 +++--
 .../loadbalance/LeaderElectionServiceTest.java | 5 +-
 .../broker/loadbalance/LoadBalancerTest.java | 15 --
 .../loadbalance/SimpleLoadManagerImplTest.java | 30 +---
 .../loadbalance/extensions/BrokerRegistryTest.java | 10 +++-
 .../ExtensibleLoadManagerImplBaseTest.java | 9 +++-
 .../loadbalance/impl/BundleSplitterTaskTest.java | 10 +++-
 .../impl/ModularLoadManagerImplTest.java | 35 ++
 .../broker/service/AdvertisedAddressTest.java | 10 +++-
 .../pulsar/broker/service/BkEnsemblesTestBase.java | 15 --
 .../broker/service/BrokerBookieIsolationTest.java | 6 ++-
 .../CanReconnectZKClientPulsarServiceBaseTest.java | 24 +++---
 .../pulsar/broker/service/MaxMessageSizeTest.java | 15 --
 .../broker/service/OneWayReplicatorTestBase.java | 44 ++
 .../pulsar/broker/service/ReplicatorTest.java | 8 ++--
 .../pulsar/broker/service/ReplicatorTestBase.java | 53 --
 .../pulsar/broker/service/TopicOwnerTest.java | 15 --
 .../coordinator/TransactionMetaStoreTestBase.java | 17 ---
 .../client/api/ClientDeduplicationFailureTest.java | 20 ++--
 .../worker/PulsarFunctionE2ESecurityTest.java | 25 --
 .../worker/PulsarFunctionPublishTest.java | 25 --
 .../functions/worker/PulsarFunctionTlsTest.java | 8 +++-
 .../worker/PulsarWorkerAssignmentTest.java | 25 --
 .../apache/pulsar/io/AbstractPulsarE2ETest.java | 42 +
 .../apache/pulsar/io/PulsarFunctionAdminTest.java | 25 --
 .../apache/pulsar/io/PulsarFunctionTlsTest.java | 20 ++--
 .../pulsar/zookeeper/ZookeeperServerTest.java | 13 --
 .../org/apache/pulsar/metadata/TestZKServer.java | 1 +
 29 files changed, 417 insertions(+), 130 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
index d6a39fadec4..f07c8f8398a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
@@ -59,6 +59,7 @@ public abstract class MultiBrokerTestZKBaseTest extends MultiBrokerBaseTest {
             } catch (Exception e) {
                 log.error("Error in stopping ZK server", e);
             }
+            testZKServer = null;
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 4a6524bf245..941229fc3d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -126,15 +126,26 @@ public class SLAMonitoringTest {
     @AfterClass(alwaysRun = true)
     public void shutdown() throws Exception {
         log.info("--- Shutting down ---");
-        executor.shutdownNow();
-        executor = null;
+        if (executor != null) {
+            executor.shutdownNow();
+            executor = null;
+        }
 
         for (int i = 0; i < BROKER_COUNT; i++) {
-            pulsarAdmins[i].close();
-            pulsarServices[i].close();
+            if (pulsarAdmins[i] != null) {
+                pulsarAdmins[i].close();
+                pulsarAdmins[i] = null;
+            }
+            if (pulsarServices[i] != null) {
+                pulsarServices[i].close();
+                pulsarServices[i] = null;
+            }
         }
 
-        bkEnsemble.stop();
+        if (bkEnsemble != null) {
+            bkEnsemble.stop();
+            bkEnsemble = null;
+        }
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index ded4ee8e58d..358410f1f28 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++
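The cleanup pattern in the SLAMonitoringTest hunk (check for null, close, then clear the field) can be illustrated outside of Pulsar. The sketch below is only an illustration under stated assumptions: `Resource`, `CleanupSketch`, and their fields are hypothetical names, not classes from the Pulsar test suite. It shows why the null checks matter when setup fails halfway, and why clearing the fields lets the test framework's retained test instance release the objects.

```java
public class CleanupSketch {
    // Hypothetical resource standing in for a broker, admin client, or ZK server.
    static class Resource implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    private Resource server;
    private final Resource[] clients = new Resource[2];

    // Simulates a setup method that may fail before all fields are assigned.
    void setup(boolean failEarly) {
        server = new Resource();
        if (failEarly) {
            throw new IllegalStateException("setup failed before clients were created");
        }
        for (int i = 0; i < clients.length; i++) {
            clients[i] = new Resource();
        }
    }

    // Safe after a partial setup and safe to call twice:
    // each field is closed only if set, then cleared.
    void cleanup() {
        for (int i = 0; i < clients.length; i++) {
            if (clients[i] != null) {
                clients[i].close();
                clients[i] = null;
            }
        }
        if (server != null) {
            server.close();
            server = null;
        }
    }

    // True while any field still references a resource.
    boolean holdsReferences() {
        if (server != null) {
            return true;
        }
        for (Resource client : clients) {
            if (client != null) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        CleanupSketch test = new CleanupSketch();
        try {
            test.setup(true);
        } catch (IllegalStateException expected) {
            // setup failed halfway; cleanup must still work
        }
        test.cleanup();
        test.cleanup(); // second call is a no-op
        System.out.println("holds references: " + test.holdsReferences());
    }
}
```

Without the null checks, a failed setup would turn the cleanup into a `NullPointerException` that hides the original failure.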
(pulsar) 02/02: [fix][test] Clear fields in test cleanup to reduce memory consumption (#22583)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 72833feba28740b7f02e1978edae7baf36a462bf
Author: Lari Hotari
AuthorDate: Thu Apr 25 21:54:55 2024 +0300

    [fix][test] Clear fields in test cleanup to reduce memory consumption (#22583)

    (cherry picked from commit 3de14c55de138770ed61d1a14cd883048ea1915c)
---
 .../pulsar/broker/MultiBrokerTestZKBaseTest.java | 1 +
 .../apache/pulsar/broker/SLAMonitoringTest.java | 21 +++--
 .../loadbalance/LeaderElectionServiceTest.java | 5 +-
 .../broker/loadbalance/LoadBalancerTest.java | 11 -
 .../loadbalance/SimpleLoadManagerImplTest.java | 30 +---
 .../loadbalance/extensions/BrokerRegistryTest.java | 10 +++-
 .../ExtensibleLoadManagerImplBaseTest.java | 8 +++-
 .../loadbalance/impl/BundleSplitterTaskTest.java | 10 +++-
 .../impl/ModularLoadManagerImplTest.java | 35 ++
 .../broker/service/AdvertisedAddressTest.java | 10 +++-
 .../pulsar/broker/service/BkEnsemblesTestBase.java | 15 --
 .../broker/service/BrokerBookieIsolationTest.java | 6 ++-
 .../CanReconnectZKClientPulsarServiceBaseTest.java | 24 +++---
 .../pulsar/broker/service/MaxMessageSizeTest.java | 14 +-
 .../broker/service/OneWayReplicatorTestBase.java | 44 ++
 .../pulsar/broker/service/ReplicatorTest.java | 8 ++--
 .../pulsar/broker/service/ReplicatorTestBase.java | 53 --
 .../pulsar/broker/service/TopicOwnerTest.java | 15 --
 .../coordinator/TransactionMetaStoreTestBase.java | 17 ---
 .../client/api/ClientDeduplicationFailureTest.java | 20 ++--
 .../worker/PulsarFunctionE2ESecurityTest.java | 25 --
 .../worker/PulsarFunctionPublishTest.java | 25 --
 .../functions/worker/PulsarFunctionTlsTest.java | 8 +++-
 .../worker/PulsarWorkerAssignmentTest.java | 25 --
 .../apache/pulsar/io/AbstractPulsarE2ETest.java | 42 +
 .../apache/pulsar/io/PulsarFunctionAdminTest.java | 25 --
 .../apache/pulsar/io/PulsarFunctionTlsTest.java | 20 ++--
 .../pulsar/zookeeper/ZookeeperServerTest.java | 15 --
 .../org/apache/pulsar/metadata/TestZKServer.java | 1 +
 29 files changed, 414 insertions(+), 129 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
index 0cd5bce5d51..b1e608b620a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java
@@ -48,6 +48,7 @@ public abstract class MultiBrokerTestZKBaseTest extends MultiBrokerBaseTest {
             } catch (Exception e) {
                 log.error("Error in stopping ZK server", e);
             }
+            testZKServer = null;
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 4a6524bf245..941229fc3d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -126,15 +126,26 @@ public class SLAMonitoringTest {
     @AfterClass(alwaysRun = true)
     public void shutdown() throws Exception {
         log.info("--- Shutting down ---");
-        executor.shutdownNow();
-        executor = null;
+        if (executor != null) {
+            executor.shutdownNow();
+            executor = null;
+        }
 
         for (int i = 0; i < BROKER_COUNT; i++) {
-            pulsarAdmins[i].close();
-            pulsarServices[i].close();
+            if (pulsarAdmins[i] != null) {
+                pulsarAdmins[i].close();
+                pulsarAdmins[i] = null;
+            }
+            if (pulsarServices[i] != null) {
+                pulsarServices[i].close();
+                pulsarServices[i] = null;
+            }
         }
 
-        bkEnsemble.stop();
+        if (bkEnsemble != null) {
+            bkEnsemble.stop();
+            bkEnsemble = null;
+        }
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index ded4ee8e58d..358410f1f28 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -59,7 +59,10 @@ public class LeaderElectionServiceTest {
     @AfterMethod(alwaysRun = true)
     void shutdown() throws Exception {
-
(pulsar) 01/02: [fix][test] Fix resource leak in TransactionCoordinatorClientTest (#21380)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8ecfd64a8993e93a2ff4a45b3ca32b74f7a04997
Author: Lari Hotari
AuthorDate: Tue Oct 17 21:50:16 2023 +0300

    [fix][test] Fix resource leak in TransactionCoordinatorClientTest (#21380)

    (cherry picked from commit cb7c98a7f56c3115a9e73bff96b8b4daac2c8180)
---
 .../transaction/coordinator/TransactionMetaStoreTestBase.java | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index eb714dd848a..7a0fb48f911 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -120,6 +120,9 @@ public abstract class TransactionMetaStoreTestBase extends TestRetrySupport {
     @Override
     protected void cleanup() throws Exception {
+        if (transactionCoordinatorClient != null) {
+            transactionCoordinatorClient.close();
+        }
         for (PulsarAdmin admin : pulsarAdmins) {
             if (admin != null) {
                 admin.close();
@@ -133,6 +136,9 @@ public abstract class TransactionMetaStoreTestBase extends TestRetrySupport {
                 service.close();
             }
         }
+        if (bkEnsemble != null) {
+            bkEnsemble.stop();
+        }
         Mockito.reset();
     }
 }
(pulsar) branch branch-3.0 updated (5ffec8a3cd8 -> 72833feba28)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

    from 5ffec8a3cd8 [fix][admin] Fix namespace admin api exception response (#22587)
     new 8ecfd64a899 [fix][test] Fix resource leak in TransactionCoordinatorClientTest (#21380)
     new 72833feba28 [fix][test] Clear fields in test cleanup to reduce memory consumption (#22583)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../pulsar/broker/MultiBrokerTestZKBaseTest.java | 1 +
 .../apache/pulsar/broker/SLAMonitoringTest.java | 21 +++--
 .../loadbalance/LeaderElectionServiceTest.java | 5 +-
 .../broker/loadbalance/LoadBalancerTest.java | 11 -
 .../loadbalance/SimpleLoadManagerImplTest.java | 30 +---
 .../loadbalance/extensions/BrokerRegistryTest.java | 10 +++-
 .../ExtensibleLoadManagerImplBaseTest.java | 8 +++-
 .../loadbalance/impl/BundleSplitterTaskTest.java | 10 +++-
 .../impl/ModularLoadManagerImplTest.java | 35 ++
 .../broker/service/AdvertisedAddressTest.java | 10 +++-
 .../pulsar/broker/service/BkEnsemblesTestBase.java | 15 --
 .../broker/service/BrokerBookieIsolationTest.java | 6 ++-
 .../CanReconnectZKClientPulsarServiceBaseTest.java | 24 +++---
 .../pulsar/broker/service/MaxMessageSizeTest.java | 14 +-
 .../broker/service/OneWayReplicatorTestBase.java | 44 ++
 .../pulsar/broker/service/ReplicatorTest.java | 8 ++--
 .../pulsar/broker/service/ReplicatorTestBase.java | 53 --
 .../pulsar/broker/service/TopicOwnerTest.java | 15 --
 .../coordinator/TransactionMetaStoreTestBase.java | 23 +++---
 .../client/api/ClientDeduplicationFailureTest.java | 20 ++--
 .../worker/PulsarFunctionE2ESecurityTest.java | 25 --
 .../worker/PulsarFunctionPublishTest.java | 25 --
 .../functions/worker/PulsarFunctionTlsTest.java | 8 +++-
 .../worker/PulsarWorkerAssignmentTest.java | 25 --
 .../apache/pulsar/io/AbstractPulsarE2ETest.java | 42 +
 .../apache/pulsar/io/PulsarFunctionAdminTest.java | 25 --
 .../apache/pulsar/io/PulsarFunctionTlsTest.java | 20 ++--
 .../pulsar/zookeeper/ZookeeperServerTest.java | 15 --
 .../org/apache/pulsar/metadata/TestZKServer.java | 1 +
 29 files changed, 420 insertions(+), 129 deletions(-)
Re: [PR] [improve][misc] Upgrade slf4j to 2.0.13 [pulsar]
BewareMyPower commented on PR #22391:
URL: https://github.com/apache/pulsar/pull/22391#issuecomment-2079249332

I tried adding the `reload4j` dependency and it didn't work.

```
[INFO] \- org.slf4j:slf4j-reload4j:jar:2.0.13:test
[INFO]    \- ch.qos.reload4j:reload4j:jar:1.2.22:test
```

```
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
```
Re: [PR] [improve][misc] Upgrade slf4j to 2.0.13 [pulsar]
BewareMyPower commented on PR #22391:
URL: https://github.com/apache/pulsar/pull/22391#issuecomment-2079238842

It's a breaking change that affects downstream projects.

In our KSN project (the private version of [KoP](https://github.com/streamnative/kop)), logs can no longer be printed after upgrading to include this commit.
Re: [PR] [fix][broker] One topic can be closed multiple times concurrently [pulsar]
lhotari commented on PR #17524:
URL: https://github.com/apache/pulsar/pull/17524#issuecomment-2079199183

@poorbarcode looks like OneWayReplicatorTest.testUnFenceTopicToReuse fails
(pulsar) branch master updated: [cleanup] [test] remove useless TestAuthorizationProvider2 (#22595)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new d19860c706e [cleanup] [test] remove useless TestAuthorizationProvider2 (#22595)
d19860c706e is described below

commit d19860c706e47b3f2525678da5edfad3f6adafd1
Author: thetumbled <52550727+thetumb...@users.noreply.github.com>
AuthorDate: Fri Apr 26 19:21:45 2024 +0800

    [cleanup] [test] remove useless TestAuthorizationProvider2 (#22595)
---
 .../api/AuthorizationProducerConsumerTest.java | 28 ++
 1 file changed, 2 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 3ead51ad7fc..2638709abc5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -350,7 +350,8 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         } catch (Exception e) {
             // my-sub1 has no msg backlog, so expire message won't be issued on that subscription
             assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic"));
-        }sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
+        }
+        sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
         sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
         sub1Admin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest);
@@ -992,31 +993,6 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         }
     }
 
-    /**
-     * This provider always fails authorization on consumer and passes on producer
-     *
-     */
-    public static class TestAuthorizationProvider2 extends TestAuthorizationProvider {
-
-        @Override
-        public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
-                AuthenticationDataSource authenticationData) {
-            return CompletableFuture.completedFuture(true);
-        }
-
-        @Override
-        public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
-                AuthenticationDataSource authenticationData, String subscription) {
-            return CompletableFuture.completedFuture(false);
-        }
-
-        @Override
-        public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
-                AuthenticationDataSource authenticationData) {
-            return CompletableFuture.completedFuture(true);
-        }
-    }
-
     public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
         @Override
         public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic,
Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]
lhotari merged PR #22595:
URL: https://github.com/apache/pulsar/pull/22595
Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]
thetumbled commented on PR #22595:
URL: https://github.com/apache/pulsar/pull/22595#issuecomment-2079048232

/pulsarbot rerun-failure-checks
[I] [Bug] parseMessageMetadata error when broker entry metadata enable with high loading [pulsar]
semistone opened a new issue, #22601:
URL: https://github.com/apache/pulsar/issues/22601

### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### Version

- 3.2.2
- 3.1.2

### Minimal reproduce step

Publish events at about 6k QPS and 100 Mbit/s with metaData in BatcherBuilder.KEY_BASED mode, sending messages from highly concurrent/parallel producer processes. The error happens only for near-real-time consumers (almost zero backlog).

### What did you expect to see?

No lost events.

### What did you see instead?

The broker logs show the error

Failed to peek sticky key from the message metadata

It looks like a thread-safety issue because it happens randomly: in 1M events, it happens only a few times.

### Anything else?

The error is similar to https://github.com/apache/pulsar/issues/10967 but I think it's a different issue.

The data in BookKeeper is correct. I can download the event from BookKeeper and parse it successfully, or consume the same event a few minutes later and it is consumed successfully. However, all subscriptions get the same error on the same event when consuming in real time (zero backlog).

I have traced the source code. It happens in
PersistentDispatcherMultipleConsumers.readEntriesComplete -> AbstractBaseDispatcher.filterEntriesForConsumer -> Commands.peekAndCopyMessageMetadata

I also printed the ByteBuf contents, and I could clearly see that the data isn't the same as in BookKeeper.

In a normal event, the hex dump usually starts with 010e (magicCrc32c):

000 010e95295fbc0a033a0a6e697267

In one of our error events, the ByteBuf has about 48 bytes of strange data and then continues with normal data:

000 a61002007239 <== from here
020 02001339ea17a8b08b8efa5e
040 2af02675f6451623d17edc34526def44 <=== until here is garbage
060 010e95295fbc0a033a0a6e697267 <== from here is normal data

This is just an example; sometimes the first few bytes are correct and the data goes wrong after them.

I am still trying to debug when and how the ByteBuf returns incorrect data, and why it only happens during stress testing. It is still not easy to reproduce using the perf tool, but we can reproduce it 100% of the time in our producer code.

Does anyone have any idea what could be causing this issue, and any suggestions on which library or class may have potential issues? Additionally, any suggestions on how to debug this issue, or on what specific information I should print to help identify the root cause, would be appreciated. Thank you.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
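One way to collect the kind of evidence shown in the dumps above is to log, for every entry that fails to parse, the offset at which the expected header bytes first appear. The following is a minimal sketch, not Pulsar code: `MagicOffsetSketch`, `indexOf`, and the sample bytes are all hypothetical, and it works on a plain `byte[]` copy rather than a Netty `ByteBuf`. The two-byte pattern is a parameter because the dump in the report may show byte-swapped 16-bit words; which byte order applies is an assumption to verify against the actual buffer.

```java
public class MagicOffsetSketch {
    // Returns the index of the first occurrence of pattern in data, or -1 if absent.
    static int indexOf(byte[] data, byte[] pattern) {
        if (pattern.length == 0) {
            return 0;
        }
        outer:
        for (int i = 0; i + pattern.length <= data.length; i++) {
            for (int j = 0; j < pattern.length; j++) {
                if (data[i + j] != pattern[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    // Formats bytes in [from, to) as lowercase hex for logging.
    static String toHex(byte[] data, int from, int to) {
        StringBuilder sb = new StringBuilder();
        for (int i = from; i < to && i < data.length; i++) {
            sb.append(String.format("%02x", data[i]));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Assumed header bytes; swap the order if the real buffer differs.
        byte[] magic = {0x0e, 0x01};
        // Synthetic sample: four unexpected bytes followed by the header.
        byte[] entry = {(byte) 0xa6, 0x10, 0x02, 0x00, 0x0e, 0x01, (byte) 0x95, 0x29};

        int offset = indexOf(entry, magic);
        // prints "header found at offset 4, unexpected prefix: a6100200"
        System.out.println("header found at offset " + offset
                + ", unexpected prefix: " + toHex(entry, 0, Math.max(offset, 0)));
    }
}
```

Logging the offset and the unexpected prefix for each failure would show whether the extra bytes always have the same length or content, which could help tell a buffer-reuse problem from a framing problem.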
Re: [PR] [fix][test] Flaky-test: ManagedLedgerTest.testTimestampOnWorkingLedger [pulsar]
Technoboy- commented on code in PR #22600:
URL: https://github.com/apache/pulsar/pull/22600#discussion_r1580764559

## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java: ##
@@ -2473,19 +2473,17 @@ public void testTimestampOnWorkingLedger() throws Exception {
         ml.addEntry("msg02".getBytes());
 
+        // reopen a new ml2
         ml.close();
-        // Thread.sleep(1000);
-        iter = ml.getLedgersInfoAsList().iterator();
-        ts = -1;
-        while (iter.hasNext()) {
-            LedgerInfo i = iter.next();
-            if (iter.hasNext()) {
-                assertTrue(ts <= i.getTimestamp(), i.toString());
-                ts = i.getTimestamp();
-            } else {
-                assertTrue(i.getTimestamp() > 0, "well closed LedgerInfo should set a timestamp > 0");
-            }
-        }
+        ManagedLedgerImpl ml2 = (ManagedLedgerImpl) factory.open("my_test_ledger", conf);

Review Comment:
Need to close the `ml2`
Re: [PR] [fix][test] Flaky-test: ManagedLedgerTest.testTimestampOnWorkingLedger [pulsar]
codecov-commenter commented on PR #22600:
URL: https://github.com/apache/pulsar/pull/22600#issuecomment-2079013686

## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22600?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report

All modified and coverable lines are covered by tests :white_check_mark:

> Project coverage is 73.92%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) to head [(`d8cdd6e`)](https://app.codecov.io/gh/apache/pulsar/pull/22600?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
> Report is 193 commits behind head on master.

Additional details and impacted files

[![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22600/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22600?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)

```diff
@@             Coverage Diff              @@
##             master   #22600      +/-   ##
+ Coverage     73.57%   73.92%   +0.34%
+ Complexity    32624    32606      -18
  Files          1877     1885       +8
  Lines        139502   140524    +1022
  Branches      15299    15452     +153
+ Hits         102638   103877    +1239
+ Misses        28908    28600     -308
- Partials       7956     8047      +91
```

| [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22600/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
|---|---|---|
| [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22600/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `26.99% <ø> (+2.41%)` | :arrow_up: |
| [systests](https://app.codecov.io/gh/apache/pulsar/pull/22600/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.39% <ø> (+0.07%)` | :arrow_up: |
| [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22600/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `73.21% <ø> (+0.37%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

[see 250 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22600/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
Re: [D] Apache Beam support for Pulsar [pulsar]
GitHub user hpvd added a comment to the discussion: Apache Beam support for Pulsar If you are interested in Beam support for Pulsar, please upvote this discussion topic to gain visibility, and take a look at the open issues in the Beam repository, summarized in https://github.com/apache/beam/issues/31078 GitHub link: https://github.com/apache/pulsar/discussions/18453#discussioncomment-9234229 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [PR] [fix][broker] Continue closing even when executor is shut down [pulsar]
lhotari commented on code in PR #22599: URL: https://github.com/apache/pulsar/pull/22599#discussion_r1580671136 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -1429,7 +1430,14 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, } FutureUtil.waitForAll(futures).thenRunAsync(() -> { closeClientFuture.complete(null); -}, getOrderedExecutor()).exceptionally(ex -> { +}, command -> { +try { +getOrderedExecutor().execute(command); Review Comment: Sure, but that would require more logic. This would address the current retry loop.
Re: [PR] [fix][broker] Continue closing even when executor is shut down [pulsar]
poorbarcode commented on code in PR #22599: URL: https://github.com/apache/pulsar/pull/22599#discussion_r1580626116 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -1429,7 +1430,14 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, } FutureUtil.waitForAll(futures).thenRunAsync(() -> { closeClientFuture.complete(null); -}, getOrderedExecutor()).exceptionally(ex -> { +}, command -> { +try { +getOrderedExecutor().execute(command); Review Comment: This executor is only closed when the broker is shut down, so we can skip closing this topic here.
Re: [PR] [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes [pulsar]
freeznet commented on PR #22501: URL: https://github.com/apache/pulsar/pull/22501#issuecomment-2078827890 @lhotari could you please help to review this PR when you have time, thanks.
Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]
thetumbled commented on code in PR #22595: URL: https://github.com/apache/pulsar/pull/22595#discussion_r1580519788 ## pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java: ## @@ -349,7 +349,8 @@ public void testSubscriberPermission() throws Exception { } catch (Exception e) { // my-sub1 has no msg backlog, so expire message won't be issued on that subscription assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic")); -}sub1Admin.topics().peekMessages(topicName, subscriptionName, 1); +} +sub1Admin.topics().peekMessages(topicName, subscriptionName, 1); Review Comment: Just correcting the code format along the way; this is not a logic change.
[PR] [fix][test] Flaky-test: ManagedLedgerTest.testTimestampOnWorkingLedger [pulsar]
shibd opened a new pull request, #22600: URL: https://github.com/apache/pulsar/pull/22600 ### Motivation #22430 After #22034, closing a ledger because it is full triggers the creation of a new ledger. However, since ledger creation is asynchronous, the new ledger is sometimes not yet created when the function returns. https://github.com/apache/pulsar/blob/57a616eaa79096af5b49db89c99cd39ccc94ec00/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1775-L1778 For this test, `ml.close();` does not guarantee that the `currentLedger` switch has completed, so there are two possible cases: - Case 1: **[Ledger01{entry1}], [Ledger02{entry1}], [Ledger03{}]** - Case 2: **[Ledger01{entry1}], [Ledger02{entry1}]** **Case 1** will cause the test to fail. ### Modifications After `ml.close`, reopen a new `ManagedLedger`, which makes sure the ledgers end up in Case 1, and then assert that the timestamp of `Ledger02` is greater than 0. ### Verifying this change - Ran this test locally 1000 times; it always succeeded. ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository:
[PR] [fix][broker] Continue closing even when executor is shut down [pulsar]
lhotari opened a new pull request, #22599: URL: https://github.com/apache/pulsar/pull/22599 ### Motivation ``` 2024-04-26T09:36:28,397 - ERROR - [ForkJoinPool.commonPool-worker-10:PersistentTopic] - [persistent://pulsar/global/removeClusterTest/__change_events] Error closing clients java.util.concurrent.CompletionException: java.util.concurrent.RejectedExecutionException: Executor is shutting down at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.uniRunNow(CompletableFuture.java:823) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:803) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.thenRunAsync(CompletableFuture.java:2204) ~[?:?] at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$44(PersistentTopic.java:1430) ~[classes/:?] at org.apache.pulsar.broker.resources.NamespaceResources$PartitionedTopicResources.lambda$runWithMarkDeleteAsync$9(NamespaceResources.java:359) ~[pulsar-broker-common-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) [?:?] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) [?:?] at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483) [?:?] at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?] at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) [?:?] at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) [?:?] at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?] at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) [?:?] 
Caused by: java.util.concurrent.RejectedExecutionException: Executor is shutting down at org.apache.bookkeeper.common.util.SingleThreadExecutor.execute(SingleThreadExecutor.java:208) ~[bookkeeper-common-4.17.0.jar:4.17.0] at java.base/java.util.concurrent.CompletableFuture.uniRunNow(CompletableFuture.java:817) ~[?:?] ... 12 more ``` ### Modifications - Provide a custom Executor that catches RejectedExecutionException and, in that case, runs the task in the current thread. ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete`
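The modification described in this PR can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from PR #22599: the class name `FallbackExecutorSketch` and both method names are hypothetical, and a plain JDK single-thread executor stands in for the broker's ordered executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch; names are illustrative and not taken from the Pulsar code base.
final class FallbackExecutorSketch {

    // Wraps a delegate so that a rejected task runs on the calling thread instead of failing.
    static Executor runInCallerWhenRejected(Executor delegate) {
        return command -> {
            try {
                delegate.execute(command);
            } catch (RejectedExecutionException e) {
                // The delegate is shutting down; run the task in the current thread.
                command.run();
            }
        };
    }

    // Returns true if a callback chained with thenRunAsync still runs after the executor is shut down.
    static boolean callbackRunsAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // from now on, execute() throws RejectedExecutionException
        CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
        CompletableFuture.completedFuture(null)
                .thenRunAsync(() -> closeClientFuture.complete(null), runInCallerWhenRejected(executor));
        return closeClientFuture.isDone();
    }

    public static void main(String[] args) {
        System.out.println("callback ran: " + callbackRunsAfterShutdown());
    }
}
```

Without the wrapper, the same `thenRunAsync` call would complete the returned stage exceptionally with a `RejectedExecutionException`, which matches the stack trace quoted in the motivation.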
Re: [PR] [fix][broker] Avoid being stuck when closing the broker with extensible load manager [pulsar]
BewareMyPower commented on code in PR #22573: URL: https://github.com/apache/pulsar/pull/22573#discussion_r1580574803 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ## @@ -926,9 +927,13 @@ public void unloadNamespaceBundlesGracefully() { } public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean closeWithoutWaitingClientDisconnect) { +if (unloaded) { +return; +} Review Comment: It's not in a critical path. Here I use a volatile field just because I realized shutdown could also be triggered via an admin API (see `BrokersBase#shutDownBrokerGracefully`). Therefore, to avoid thread-safety issues, I used volatile here.
Re: [PR] [fix][broker][WIP] Don't check replication clusters for ownership of system topics [pulsar]
heesung-sn commented on PR #22597: URL: https://github.com/apache/pulsar/pull/22597#issuecomment-2078763715 To answer your questions, I'm not sure. I assume all system topics should live in the local cluster only, but I could be wrong.
Re: [PR] [fix][broker][WIP] Don't check replication clusters for ownership of system topics [pulsar]
heesung-sn commented on PR #22597: URL: https://github.com/apache/pulsar/pull/22597#issuecomment-2078761854 I realized that Pulsar has some loopholes that unnecessarily try to replicate, migrate or do other operations on system topics. We need to check all system topics and define their behaviors for each feature. System namespace and system topics - load report - blue-green migration - geo-replication - cross-cluster ownership check - and others I am checking the system topics from the new load balancer.
Re: [PR] [fix][broker] Avoid being stuck when closing the broker with extensible load manager [pulsar]
lhotari commented on code in PR #22573: URL: https://github.com/apache/pulsar/pull/22573#discussion_r1580566781 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ## @@ -926,9 +927,13 @@ public void unloadNamespaceBundlesGracefully() { } public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean closeWithoutWaitingClientDisconnect) { +if (unloaded) { +return; +} Review Comment: Is there a need to prevent multiple concurrent calls to this method? This solution doesn't seem very effective.
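One way to read the concern above: a `volatile` flag makes the value visible across threads, but a separate check followed by a later write is not atomic, so two callers can both pass the check. The sketch below shows a common alternative using `AtomicBoolean.compareAndSet`. It is an illustration under assumptions, not the code in PR #22573; the class `UnloadOnceSketch`, its fields, and the counter standing in for the real unload work are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; it does not reproduce BrokerService.
final class UnloadOnceSketch {
    private final AtomicBoolean unloadStarted = new AtomicBoolean(false);
    private final AtomicInteger unloadRuns = new AtomicInteger();

    void unloadNamespaceBundlesGracefully() {
        // compareAndSet checks and sets in one atomic step, so only one caller proceeds.
        if (!unloadStarted.compareAndSet(false, true)) {
            return;
        }
        unloadRuns.incrementAndGet(); // placeholder for the real unload work
    }

    // Calls the method from several threads and reports how many times the work ran.
    static int runConcurrently(int threads) {
        UnloadOnceSketch broker = new UnloadOnceSketch();
        Thread[] callers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            callers[i] = new Thread(broker::unloadNamespaceBundlesGracefully);
            callers[i].start();
        }
        for (Thread caller : callers) {
            try {
                caller.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for callers", e);
            }
        }
        return broker.unloadRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("unload executed " + runConcurrently(8) + " time(s)");
    }
}
```

Note that with this guard a second caller returns immediately without waiting for the first unload to finish; whether that is acceptable depends on the shutdown path, which the thread does not settle.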
Re: [PR] [fix][broker][WIP] Don't check replication clusters for ownership of system topics [pulsar]
lhotari commented on PR #22597: URL: https://github.com/apache/pulsar/pull/22597#issuecomment-2078730928 @poorbarcode @heesung-sn Do you think that relaxing the checkLocalOrGetPeerReplicationCluster checks for system topics would be useful?
[PR] [fix][broker][WIP] Don't check replication clusters for ownership of system topics [pulsar]
lhotari opened a new pull request, #22597: URL: https://github.com/apache/pulsar/pull/22597 WIP, not ready for review. PR opened in draft mode for initial discussion and feedback. ### Motivation When investigating another issue and reproducing the problem using these instructions, https://github.com/apache/pulsar/pull/21948#issuecomment-2078264388, I noticed this WARN log message: ``` 2024-04-26T09:34:01,535 - WARN - [pulsar-io-35-3:ServerCnx] - Failed to get Partitioned Metadata [/127.0.0.1:56820] persistent://pulsar/global/removeClusterTest/__change_events: Namespace missing local cluster name in clusters list: local_cluster=r1 ns=pulsar/global/removeClusterTest clusters=[r2, r3] org.apache.pulsar.broker.web.RestException: Namespace missing local cluster name in clusters list: local_cluster=r1 ns=pulsar/global/removeClusterTest clusters=[r2, r3] at org.apache.pulsar.broker.web.PulsarWebResource.lambda$checkLocalOrGetPeerReplicationCluster$27(PulsarWebResource.java:922) ~[classes/:?] at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182) ~[?:?] at org.apache.pulsar.broker.web.PulsarWebResource.lambda$checkLocalOrGetPeerReplicationCluster$29(PulsarWebResource.java:912) ~[classes/:?] at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182) ~[?:?] at org.apache.pulsar.broker.web.PulsarWebResource.checkLocalOrGetPeerReplicationCluster(PulsarWebResource.java:896) ~[classes/:?] 
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync(PersistentTopicsBase.java:4304) ~[classes/:?] at org.apache.pulsar.broker.service.ServerCnx.lambda$handlePartitionMetadataRequest$7(ServerCnx.java:610) ~[classes/:?] at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684) [?:?] at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) [?:?] at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168) [?:?] at org.apache.pulsar.broker.service.ServerCnx.handlePartitionMetadataRequest(ServerCnx.java:607) [classes/:?] at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:134) [pulsar-common-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT] ``` ### Modifications - Don't limit system topics with the `checkLocalOrGetPeerReplicationCluster` checks. ### Additional Context This is related to the changes in #20304, which relaxed some of these checks. ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete`
[PR] [improve][broker] Propagate cause exception in TopicBusyException when applicable [pulsar]
lhotari opened a new pull request, #22596: URL: https://github.com/apache/pulsar/pull/22596 ### Motivation When a topic is closing and throws an exception, the exception is swallowed without being logged. It's better to propagate the cause in TopicBusyException in these cases. ### Modifications - Add support for cause exception to TopicBusyException - Propagate the cause when closing throws an exception ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete`
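The two modifications listed in this PR amount to standard Java exception chaining. A minimal sketch, assuming a standalone exception class: the nested `TopicBusyException` below is a stand-in written for illustration, not Pulsar's actual class, and `wrapCloseFailure` is a hypothetical helper.

```java
// Hypothetical sketch; the real TopicBusyException lives in the Pulsar broker and may differ.
final class TopicBusySketch {

    static class TopicBusyException extends Exception {
        TopicBusyException(String message) {
            super(message);
        }

        // Added constructor: keeps the original failure reachable through getCause().
        TopicBusyException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Wraps a failure raised while closing, instead of swallowing it.
    static TopicBusyException wrapCloseFailure(String topic, Throwable closeFailure) {
        return new TopicBusyException("Topic " + topic + " is busy: closing failed", closeFailure);
    }

    public static void main(String[] args) {
        Throwable original = new IllegalStateException("ledger close failed");
        TopicBusyException wrapped = wrapCloseFailure("persistent://tenant/ns/topic", original);
        // The stack trace of the wrapped exception now includes a "Caused by:" section.
        wrapped.printStackTrace();
    }
}
```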
Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]
thetumbled commented on code in PR #22595: URL: https://github.com/apache/pulsar/pull/22595#discussion_r1580519788 ## pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java: ## @@ -349,7 +349,8 @@ public void testSubscriberPermission() throws Exception { } catch (Exception e) { // my-sub1 has no msg backlog, so expire message won't be issued on that subscription assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic")); -}sub1Admin.topics().peekMessages(topicName, subscriptionName, 1); +} +sub1Admin.topics().peekMessages(topicName, subscriptionName, 1); Review Comment: Just to correct the code format by the way.
(pulsar) branch branch-3.2 updated: [improve][admin] Check if the topic existed before the permission operations (#22547)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new adac20a72ef [improve][admin] Check if the topic existed before the permission operations (#22547) adac20a72ef is described below commit adac20a72efdc2b1d9b16464ebffb569c41014e9 Author: Jiwei Guo AuthorDate: Fri Apr 26 14:05:30 2024 +0800 [improve][admin] Check if the topic existed before the permission operations (#22547) --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 9 ++--- .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java| 1 + .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 12 .../apache/pulsar/broker/admin/PersistentTopicsTest.java | 10 -- .../org/apache/pulsar/broker/auth/AuthorizationTest.java | 14 +- .../client/api/AuthenticatedProducerConsumerTest.java | 4 +++- .../client/api/AuthorizationProducerConsumerTest.java | 2 ++ .../pulsar/websocket/proxy/ProxyAuthorizationTest.java | 8 +--- 8 files changed, 46 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index b0968f494ee..4b29452f98c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -207,6 +207,7 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture>> internalGetPermissionsOnTopic() { // This operation should be reading from zookeeper and it should be allowed without having admin privileges return validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> 
getAuthorizationService().getPermissionsAsync(topicName)); } @@ -258,9 +259,10 @@ public class PersistentTopicsBase extends AdminResource { Set actions) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) -.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 -> -grantPermissionsAsync(topicName, role, actions) -.thenAccept(unused -> asyncResponse.resume(Response.noContent().build() +.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) +.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions)) +.thenAccept(unused -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause); @@ -272,6 +274,7 @@ public class PersistentTopicsBase extends AdminResource { protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java index e89b4ff5e83..2dcb930fbe7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java @@ -120,6 +120,7 @@ public class 
AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest { .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) .authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN) .build(); +admin.topics().createNonPartitionedTopic(topicName); admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume)); admin.topics().grantPermission(topicName, "producer",
Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]
thetumbled commented on code in PR #22595: URL: https://github.com/apache/pulsar/pull/22595#discussion_r1580519788 ## pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java: ## @@ -349,7 +349,8 @@ public void testSubscriberPermission() throws Exception { } catch (Exception e) { // my-sub1 has no msg backlog, so expire message won't be issued on that subscription assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic")); -}sub1Admin.topics().peekMessages(topicName, subscriptionName, 1); +} +sub1Admin.topics().peekMessages(topicName, subscriptionName, 1); Review Comment: Just to correct the code format.
(pulsar) branch master updated: [improve][admin] Check if the topic existed before the permission operations (#22547)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 69a600e86bb [improve][admin] Check if the topic existed before the permission operations (#22547) 69a600e86bb is described below commit 69a600e86bb5110a118d836125411e941b83764d Author: Jiwei Guo AuthorDate: Fri Apr 26 14:05:30 2024 +0800 [improve][admin] Check if the topic existed before the permission operations (#22547) --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 9 ++--- .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java | 1 + .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 12 .../apache/pulsar/broker/admin/PersistentTopicsTest.java| 10 -- .../org/apache/pulsar/broker/auth/AuthorizationTest.java| 13 - .../client/api/AuthenticatedProducerConsumerTest.java | 4 +++- .../client/api/AuthorizationProducerConsumerTest.java | 2 ++ .../pulsar/websocket/proxy/ProxyAuthorizationTest.java | 8 +--- 8 files changed, 45 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 63ea987bb07..682f41dcdb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -205,6 +205,7 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture>> internalGetPermissionsOnTopic() { // This operation should be reading from zookeeper and it should be allowed without having admin privileges return validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> 
getAuthorizationService().getPermissionsAsync(topicName)); } @@ -256,9 +257,10 @@ public class PersistentTopicsBase extends AdminResource { Set actions) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) -.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 -> -grantPermissionsAsync(topicName, role, actions) -.thenAccept(unused -> asyncResponse.resume(Response.noContent().build() +.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) +.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions)) +.thenAccept(unused -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause); @@ -270,6 +272,7 @@ public class PersistentTopicsBase extends AdminResource { protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java index e89b4ff5e83..2dcb930fbe7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java @@ -120,6 +120,7 @@ public class 
AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest { .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) .authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN) .build(); +admin.topics().createNonPartitionedTopic(topicName); admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume)); admin.topics().grantPermission(topicName, "producer",
Re: [PR] [improve][admin] Check if the topic existed before the permission operations [pulsar]
Technoboy- merged PR #22547: URL: https://github.com/apache/pulsar/pull/22547
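The commit for #22547 flattens a nested `thenCompose` chain and inserts a topic-existence check before the permission operation. The shape of that chain can be sketched as below. This is a simplified model, not the broker code: `PermissionChainSketch`, its in-memory topic set, and the three helper methods are hypothetical stand-ins for `validateAdminAccessForTenantAsync`, `internalCheckTopicExists`, and `grantPermissionsAsync`.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a flat thenCompose chain with an existence check in the middle.
final class PermissionChainSketch {
    private final Set<String> existingTopics;

    PermissionChainSketch(Set<String> existingTopics) {
        this.existingTopics = existingTopics;
    }

    // Stand-in for the admin access validation; always succeeds here.
    CompletableFuture<Void> validateAdminAccess() {
        return CompletableFuture.completedFuture(null);
    }

    // Stand-in for the existence check; fails the chain when the topic is unknown.
    CompletableFuture<Void> checkTopicExists(String topic) {
        if (existingTopics.contains(topic)) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("Topic not found: " + topic));
        return failed;
    }

    // Stand-in for the actual permission grant.
    CompletableFuture<String> grantPermission(String topic, String role) {
        return CompletableFuture.completedFuture(role + " granted on " + topic);
    }

    // Each step runs only if the previous one succeeded; a failure skips the remaining steps.
    CompletableFuture<String> grant(String topic, String role) {
        return validateAdminAccess()
                .thenCompose(__ -> checkTopicExists(topic))
                .thenCompose(__ -> grantPermission(topic, role));
    }

    public static void main(String[] args) {
        PermissionChainSketch admin = new PermissionChainSketch(java.util.Collections.singleton("t1"));
        System.out.println(admin.grant("t1", "consumer").join());
        System.out.println("missing topic failed: " + admin.grant("missing", "consumer").isCompletedExceptionally());
    }
}
```

Keeping the chain flat means a single trailing `exceptionally` handler sees failures from every step, which is easier to get right than balancing parentheses across nested stages.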
Re: [PR] [fix][broker] One topic can be closed multiple times concurrently [pulsar]
poorbarcode commented on PR #17524: URL: https://github.com/apache/pulsar/pull/17524#issuecomment-2078692154 > However, I think this Topic state management needs a serious refactoring. Agree with you
Re: [PR] [fix][broker] One topic can be closed multiple times concurrently [pulsar]
lhotari commented on PR #17524: URL: https://github.com/apache/pulsar/pull/17524#issuecomment-2078692302 > However, I think this Topic state management needs a serious refactoring. > > I suggest defining TopicState and revisit topic state transitions in a state machine manner. @heesung-sn I agree that a state machine style would result in a more maintainable solution. We can handle that in a second step.
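For readers unfamiliar with the suggestion, a state-machine style could look roughly like the sketch below. Everything here is an assumption made for illustration: the state names, the allowed transitions, and the class `TopicStateSketch` are hypothetical and do not describe how `PersistentTopic` is or will be implemented.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: one state field with explicit transitions instead of several boolean flags.
final class TopicStateSketch {

    enum TopicState {
        ACTIVE, FENCED, CLOSING, DELETING, CLOSED;

        // Allowed next states; this transition table is an assumption for the example.
        Set<TopicState> next() {
            switch (this) {
                case ACTIVE:
                    return EnumSet.of(FENCED);
                case FENCED:
                    return EnumSet.of(ACTIVE, CLOSING, DELETING);
                case CLOSING:
                case DELETING:
                    return EnumSet.of(CLOSED, ACTIVE);
                default:
                    return EnumSet.noneOf(TopicState.class);
            }
        }
    }

    private final AtomicReference<TopicState> state = new AtomicReference<>(TopicState.ACTIVE);

    // Succeeds only if the transition is allowed and the topic is still in the expected state.
    boolean transition(TopicState from, TopicState to) {
        return from.next().contains(to) && state.compareAndSet(from, to);
    }

    TopicState current() {
        return state.get();
    }

    public static void main(String[] args) {
        TopicStateSketch topic = new TopicStateSketch();
        System.out.println("fence: " + topic.transition(TopicState.ACTIVE, TopicState.FENCED));
        System.out.println("fence again: " + topic.transition(TopicState.ACTIVE, TopicState.FENCED));
        System.out.println("state: " + topic.current());
    }
}
```

Because every change goes through one compare-and-set, two concurrent close attempts cannot both succeed, which is the class of problem the PR title describes.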
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode commented on code in PR #21948: URL: https://github.com/apache/pulsar/pull/21948#discussion_r1580517607 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() { } private void unfenceTopicToResume() { -subscriptions.values().forEach(sub -> sub.resumeAfterFence()); isFenced = false; isClosingOrDeleting = false; +subscriptions.values().forEach(sub -> sub.resumeAfterFence()); +unfenceReplicatorsToResume(); +} + +private void unfenceReplicatorsToResume() { +checkReplication(); Review Comment: Agree with you, we need more than one separate PR to merge the states `fenced, close, deleting`, just like we discussed [here](https://github.com/apache/pulsar/pull/17524#issuecomment-1886688685)