Re: [PR] [fix] [test] Fix flaky test ReplicatorTest [pulsar]
poorbarcode commented on PR #22594: URL: https://github.com/apache/pulsar/pull/22594#issuecomment-2078685982

> @poorbarcode I disabled one problematic test causing OOME in https://github.com/apache/pulsar/pull/22586 . Would it be fine to include the revert in this PR or is it better to handle that separately?

I will review the test `testWriteMarkerTaskOfReplicateSubscriptions` that #22586 disabled at a later time, and push another PR to fix the flakiness.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [test] Fix flaky test ReplicatorTest [pulsar]
lhotari commented on PR #22594: URL: https://github.com/apache/pulsar/pull/22594#issuecomment-2078684396 @poorbarcode I disabled one problematic test causing OOME in https://github.com/apache/pulsar/pull/22586 . Would it be fine to include the revert in this PR or is it better to handle that separately?
Re: [PR] [fix] [test] Fix flaky test ReplicatorTest [pulsar]
poorbarcode commented on code in PR #22594: URL: https://github.com/apache/pulsar/pull/22594#discussion_r1580507648

## pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java: ##
@@ -1028,4 +1019,43 @@ public void testPackageAPI() throws Exception {
             superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
         }
     }
+
+    @Test
+    @SneakyThrows
+    public void testOffloadThresholdInSeconds() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getOffloadThresholdInSeconds(namespace));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().setOffloadThresholdInSeconds(namespace, 1));
+    }
+
+    @Test
+    @SneakyThrows
+    public void testMaxSubscriptionsPerTopic() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getMaxSubscriptionsPerTopic(namespace));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().setMaxSubscriptionsPerTopic(namespace, 100));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().removeMaxSubscriptionsPerTopic(namespace));
+    }

Review Comment:
It is caused by a mistake, removed

## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java: ##
@@ -2019,7 +2019,7 @@ protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessag
     }

     protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){
-        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);

Review Comment:
It is caused by a mistake, removed
Re: [PR] [fix][broker]One topic can be close multiple times concurrently [pulsar]
lhotari commented on PR #17524: URL: https://github.com/apache/pulsar/pull/17524#issuecomment-2078663063 @poorbarcode there are some test failures, do you have a chance to check?
Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]
lhotari commented on code in PR #22595: URL: https://github.com/apache/pulsar/pull/22595#discussion_r1580497818

## pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java: ##
@@ -349,7 +349,8 @@ public void testSubscriberPermission() throws Exception {
         } catch (Exception e) {
             // my-sub1 has no msg backlog, so expire message won't be issued on that subscription
             assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic"));
-        }sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
+        }
+        sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);

Review Comment:
Is this change related to this PR?
Re: [PR] [fix] [test] Fix flaky test ReplicatorTest [pulsar]
lhotari commented on code in PR #22594: URL: https://github.com/apache/pulsar/pull/22594#discussion_r1580497194

## pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java: ##
@@ -1028,4 +1019,43 @@ public void testPackageAPI() throws Exception {
             superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
         }
     }
+
+    @Test
+    @SneakyThrows
+    public void testOffloadThresholdInSeconds() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getOffloadThresholdInSeconds(namespace));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().setOffloadThresholdInSeconds(namespace, 1));
+    }
+
+    @Test
+    @SneakyThrows
+    public void testMaxSubscriptionsPerTopic() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getMaxSubscriptionsPerTopic(namespace));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().setMaxSubscriptionsPerTopic(namespace, 100));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().removeMaxSubscriptionsPerTopic(namespace));
+    }

Review Comment:
Are these part of some other change?
Re: [PR] [fix] [test] Fix flaky test ReplicatorTest [pulsar]
lhotari commented on code in PR #22594: URL: https://github.com/apache/pulsar/pull/22594#discussion_r1580496723

## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java: ##
@@ -2019,7 +2019,7 @@ protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessag
     }

     protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){
-        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);

Review Comment:
Is this change related to this PR?
Re: [PR] [fix] [test] Fix flaky test ReplicatorTest [pulsar]
lhotari commented on code in PR #22594: URL: https://github.com/apache/pulsar/pull/22594#discussion_r1580470740

## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java: ##
@@ -2019,7 +2019,7 @@ protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessag
     }

     protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){
-        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);

Review Comment:
This seems to be a production code fix. Maybe the PR title should be related?
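One possible reading of why this one-line change counts as a production fix, sketched under assumptions: if the `...Async` variant reports failure only through a returned future and the caller discards that future, an authorization failure never reaches the caller and the policy update proceeds. Every name below (`PolicyValidationSketch`, `validateAsync`, `validate`, and the two `setPolicy...` methods) is a hypothetical stand-in and not Pulsar's actual code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an admin resource; none of these names are Pulsar APIs.
final class PolicyValidationSketch {

    // Async check: a failure is reported only through the returned future.
    static CompletableFuture<Void> validateAsync(boolean authorized) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (authorized) {
            result.complete(null);
        } else {
            result.completeExceptionally(new SecurityException("not authorized"));
        }
        return result;
    }

    // Sync check: a failure is thrown directly to the caller.
    static void validate(boolean authorized) {
        if (!authorized) {
            throw new SecurityException("not authorized");
        }
    }

    // Discards the future, so the failure is lost and the update still happens.
    static boolean setPolicyDiscardingFuture(boolean authorized) {
        validateAsync(authorized);
        return true; // reached even when the caller is not authorized
    }

    // Fails fast: the update is skipped when validation throws.
    static boolean setPolicyBlocking(boolean authorized) {
        validate(authorized);
        return true;
    }
}
```

In this sketch the first setter returns normally for an unauthorized caller, while the second throws before any update would be applied.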
Re: [PR] [fix] [test] Fix flaky test ReplicatorTest [pulsar]
lhotari commented on PR #22594: URL: https://github.com/apache/pulsar/pull/22594#issuecomment-2078613762

> Move Replicatortest.testConfigChange into ReplicatorGlobalNSDangerousOperationTest
> Move the normal test ReplicatorGlobalNSTest. testForcefullyTopicDeletion into Replicatortest

Makes sense
Re: [PR] [fix] [test] Fix flaky test ReplicatorTest [pulsar]
lhotari commented on PR #22594: URL: https://github.com/apache/pulsar/pull/22594#issuecomment-207861

> Rename the class ReplicatorGlobalNSTest to ReplicatorGlobalNSDangerousOperationTest

I think this could be handled with a comment instead.
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
heesung-sn commented on code in PR #21948: URL: https://github.com/apache/pulsar/pull/21948#discussion_r1580453561

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ##
@@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() {
     }

     private void unfenceTopicToResume() {
-        subscriptions.values().forEach(sub -> sub.resumeAfterFence());
         isFenced = false;
         isClosingOrDeleting = false;
+        subscriptions.values().forEach(sub -> sub.resumeAfterFence());
+        unfenceReplicatorsToResume();
+    }
+
+    private void unfenceReplicatorsToResume() {
+        checkReplication();

Review Comment:
Can we define enum state for topic and reject/skip any invalid state transition (fail/skip fast)? For example, delete should skip if closed, and vice-versa.
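The suggestion above could look roughly like the following. This is only a sketch under assumptions: `TopicState`, `TopicLifecycle`, and the transition table are hypothetical and are not taken from `PersistentTopic`, which in the diff tracks its state with the boolean flags `isFenced` and `isClosingOrDeleting`.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical lifecycle states; not the states used by PersistentTopic.
enum TopicState { ACTIVE, FENCED, CLOSING, DELETING, CLOSED, DELETED }

final class TopicLifecycle {
    // Allowed transitions (an assumption for illustration only).
    private static final Map<TopicState, EnumSet<TopicState>> ALLOWED = new EnumMap<>(TopicState.class);

    static {
        ALLOWED.put(TopicState.ACTIVE, EnumSet.of(TopicState.FENCED, TopicState.CLOSING, TopicState.DELETING));
        ALLOWED.put(TopicState.FENCED, EnumSet.of(TopicState.ACTIVE, TopicState.CLOSING, TopicState.DELETING));
        // A failed close or delete may resume the topic; a running one cannot be overtaken.
        ALLOWED.put(TopicState.CLOSING, EnumSet.of(TopicState.CLOSED, TopicState.ACTIVE));
        ALLOWED.put(TopicState.DELETING, EnumSet.of(TopicState.DELETED, TopicState.ACTIVE));
        ALLOWED.put(TopicState.CLOSED, EnumSet.noneOf(TopicState.class));
        ALLOWED.put(TopicState.DELETED, EnumSet.noneOf(TopicState.class));
    }

    private final AtomicReference<TopicState> state = new AtomicReference<>(TopicState.ACTIVE);

    // Returns true if the transition was applied, false if it is invalid and was skipped.
    boolean tryTransition(TopicState target) {
        while (true) {
            TopicState current = state.get();
            if (!ALLOWED.get(current).contains(target)) {
                return false; // fail/skip fast
            }
            if (state.compareAndSet(current, target)) {
                return true;
            }
            // Another thread changed the state first; re-check against the new state.
        }
    }

    TopicState current() {
        return state.get();
    }
}
```

With this table, a delete requested while a close is in progress (or after it has finished) is rejected by `tryTransition` instead of running concurrently.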
Re: [PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]
thetumbled commented on PR #22595: URL: https://github.com/apache/pulsar/pull/22595#issuecomment-2078588760 PTAL, thanks. @Technoboy- @lhotari
[PR] [cleanup] [test] remove useless TestAuthorizationProvider2 [pulsar]
thetumbled opened a new pull request, #22595: URL: https://github.com/apache/pulsar/pull/22595

### Motivation

`TestAuthorizationProvider2` is useless, no need to retain.

### Modifications

Remove it.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
[PR] [fix] [test] Fix flaky test ReplicatorTest [pulsar]
poorbarcode opened a new pull request, #22594: URL: https://github.com/apache/pulsar/pull/22594

### Motivation

Two tests perform operations that are not normal when using Geo-Replication with a global metadata store and that should be denied in a production Pulsar cluster. These operations are very dangerous: they cause many topic deletions and leave namespace policies incorrect, which makes other tests in the same class flaky.

- `Replicatortest.testConfigChange`
- `ReplicatorGlobalNSTest.testRemoveLocalClusterOnGlobalNamespace`

### Modifications

- Rename the class `ReplicatorGlobalNSTest` to `ReplicatorGlobalNSDangerousOperationTest`
- Move `Replicatortest.testConfigChange` into `ReplicatorGlobalNSDangerousOperationTest`
- Move the normal test `ReplicatorGlobalNSTest.testForcefullyTopicDeletion` into `Replicatortest`

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository: x
Re: [I] [Bug] broker healthcheck ran into loop after decommissioned a cluster of bookies [pulsar]
wallacepeng closed issue #22559: [Bug] broker healthcheck ran into loop after decommissioned a cluster of bookies URL: https://github.com/apache/pulsar/issues/22559
Re: [I] [Bug] broker healthcheck ran into loop after decommissioned a cluster of bookies [pulsar]
wallacepeng commented on issue #22559: URL: https://github.com/apache/pulsar/issues/22559#issuecomment-2078544904 I finally fixed it. I had to set up another broker cluster, clean up the namespaces, managed-ledgers, and schemas, and then restore the old broker cluster. That fixed the issue.
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode commented on code in PR #21948: URL: https://github.com/apache/pulsar/pull/21948#discussion_r1580423072

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ##
@@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() {
     }

     private void unfenceTopicToResume() {
-        subscriptions.values().forEach(sub -> sub.resumeAfterFence());
         isFenced = false;
         isClosingOrDeleting = false;
+        subscriptions.values().forEach(sub -> sub.resumeAfterFence());
+        unfenceReplicatorsToResume();
+    }
+
+    private void unfenceReplicatorsToResume() {
+        checkReplication();

Review Comment:
This happens because `topic.close` is a reentrant method. The steps that trigger the issue are:

- `reset namespace level policies -> checkReplication -> delete topic`
- `unload namespace -> unload topic -> close topic`

`topic.close` can run even while the delete task is still in progress, which is not correct. PR https://github.com/apache/pulsar/pull/17524 will fix this; please review it again, thanks.
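One common way to keep a close operation from running more than once is to have every caller share the future created by the first call. The sketch below illustrates that general technique only; it is not the implementation in PR #17524, and `CloseOnceSketch` and its `closeAction` parameter are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper: runs the close action at most once and shares its result.
final class CloseOnceSketch {
    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
    private final Supplier<CompletableFuture<Void>> closeAction;

    CloseOnceSketch(Supplier<CompletableFuture<Void>> closeAction) {
        this.closeAction = closeAction;
    }

    CompletableFuture<Void> close() {
        CompletableFuture<Void> mine = new CompletableFuture<>();
        if (!closeFuture.compareAndSet(null, mine)) {
            // A close is already in progress or finished: reuse its future.
            return closeFuture.get();
        }
        try {
            closeAction.get().whenComplete((ignored, error) -> {
                if (error != null) {
                    mine.completeExceptionally(error);
                } else {
                    mine.complete(null);
                }
            });
        } catch (RuntimeException e) {
            // The action failed before returning a future.
            mine.completeExceptionally(e);
        }
        return mine;
    }
}
```

A second caller that arrives while the first close is pending receives the same future, so the underlying action is not started again.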
Re: [D] broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly [pulsar]
GitHub user wallacepeng closed a discussion: broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly

2024-04-21T03:14:20,474+ [pulsar-io-4-7] ERROR org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck] [null] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger caused by org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger","reqId":3493733038286036253, "remote":"pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650", "local":"/172.20.203.179:58124"}
2024-04-21 03:14:20.474
2024-04-21T03:14:20,474+ [pulsar-io-4-7] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xdce5010a, L:/172.20.203.179:58124 - R:pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650] Received error from server: org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger caused by org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger

This happens because the internal healthcheck cannot create a producer (it seems to have a null producerName) for the topic persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck. It reports "No such ledger exists on Metadata Server". Everything else works except the healthcheck. I have BookKeeper clusters; I decommissioned the default one and then got this error. Does it still try to create ledgers on the decommissioned bookies?

```
bin/pulsar-admin topics stats-internal persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck
No such ledger exists on Metadata Server
Reason: No such ledger exists on Metadata Server
```

https://github.com/apache/pulsar/assets/894641/9eb33a0f-48f4-4ff4-8f80-88ea5c7cc1d7

The correct log from another Pulsar cluster looks like:

https://github.com/apache/pulsar/assets/894641/90c104d9-b351-4212-b26b-41dc170a18e4

GitHub link: https://github.com/apache/pulsar/discussions/22545

This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly [pulsar]
GitHub user wallacepeng closed the discussion with a comment: broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly

I ended up setting up another broker cluster, then shut down the old broker cluster, cleaned up the namespaces and managed-ledgers, restored the old cluster, and decommissioned the new broker cluster. That magically fixed it.

GitHub link: https://github.com/apache/pulsar/discussions/22545#discussioncomment-9231670
Re: [PR] [improve][test] Add policy authentication test for namespace API [pulsar]
Technoboy- closed pull request #22593: [improve][test] Add policy authentication test for namespace API URL: https://github.com/apache/pulsar/pull/22593
Re: [PR] [fix][broker]One topic can be close multiple times concurrently [pulsar]
poorbarcode commented on code in PR #17524: URL: https://github.com/apache/pulsar/pull/17524#discussion_r1580420056

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ##
@@ -1595,41 +1662,50 @@ public CompletableFuture close(
             }
         }

-        CompletableFuture clientCloseFuture = closeWithoutWaitingClientDisconnect
-                ? CompletableFuture.completedFuture(null)
-                : FutureUtil.waitForAll(futures);
-
-        clientCloseFuture.thenRun(() -> {
-            // After having disconnected all producers/consumers, close the managed ledger
+        CompletableFuture disconnectClientsFuture = FutureUtil.waitForAll(futures);

Review Comment:
Since we need to initialize the variable `closeFutures.waitDisconnectClients`, the variable `disconnectClientsFuture` is needed
Re: [PR] [fix][broker]One topic can be close multiple times concurrently [pulsar]
poorbarcode commented on code in PR #17524: URL: https://github.com/apache/pulsar/pull/17524#discussion_r1580420056

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ##
@@ -1595,41 +1662,50 @@ public CompletableFuture close(
             }
         }

-        CompletableFuture clientCloseFuture = closeWithoutWaitingClientDisconnect
-                ? CompletableFuture.completedFuture(null)
-                : FutureUtil.waitForAll(futures);
-
-        clientCloseFuture.thenRun(() -> {
-            // After having disconnected all producers/consumers, close the managed ledger
+        CompletableFuture disconnectClientsFuture = FutureUtil.waitForAll(futures);

Review Comment:
Since we need to initialize the variable `closeFutures.waitDisconnectClients`, this feature is needed
Re: [PR] [improve][admin] Check if the topic existed before the permission operations [pulsar]
Technoboy- commented on PR #22547: URL: https://github.com/apache/pulsar/pull/22547#issuecomment-2078539225

> I think this should be a breaking change and we need a PIP for it. Users need to explicitly create a topic before granting the permission after this change. This doesn't seem to make sense in some cases. The admin would simply want to grant users permissions without requiring that the topic has already been created.

This is not a breaking change: only new topic permission grants require the topic to be created first. Existing grants are not affected.
Re: [PR] [fix][broker]One topic can be close multiple times concurrently [pulsar]
poorbarcode commented on PR #17524: URL: https://github.com/apache/pulsar/pull/17524#issuecomment-2078537404 Rebase master
[PR] [improve][test] Add policy authentication test for namespace API [pulsar]
coderzc opened a new pull request, #22593: URL: https://github.com/apache/pulsar/pull/22593

Fixes #xyz
Main Issue: #xyz
PIP: #xyz

### Motivation & Modifications

Add policy authentication test for namespace API.

### Verifying this change

- [x] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
[PR] [fix][test] Flaky-test: AdminApiMultiBrokersTest.testForceDeletePartitionedTopicWithSub [pulsar]
crossoverJie opened a new pull request, #22592: URL: https://github.com/apache/pulsar/pull/22592

Fixes #22428

### Motivation

```java
java.util.concurrent.ExecutionException: org.apache.pulsar.client.admin.PulsarAdminException$ConflictException: This topic already exists
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at org.apache.pulsar.broker.admin.AdminApiMultiBrokersTest.testForceDeletePartitionedTopicWithSub(AdminApiMultiBrokersTest.java:208)
```

`ConflictException` can be ignored.

### Modifications

- Check again whether the topic exists.
- Ignore ConflictException.

### Verifying this change

- [x] Make sure that the change passes the CI checks.

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository: https://github.com/crossoverJie/pulsar/pull/24
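A minimal sketch of the "ignore ConflictException" idea, assuming the create call is exposed as a `CompletableFuture` as the stack trace suggests. The nested `ConflictException` here is a dependency-free stand-in for `PulsarAdminException.ConflictException`, and `IgnoreConflictSketch.awaitCreate` is a hypothetical helper, not code from this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class IgnoreConflictSketch {

    // Stand-in for PulsarAdminException.ConflictException (assumption for illustration).
    static final class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    // Waits for a create call; returns true if it created the topic and
    // false if the topic already existed. Any other failure is rethrown.
    static boolean awaitCreate(CompletableFuture<Void> createFuture) {
        try {
            createFuture.join();
            return true;
        } catch (CompletionException e) {
            if (e.getCause() instanceof ConflictException) {
                return false; // the topic is already there, which is the desired end state
            }
            throw e;
        }
    }
}
```

The design choice is to treat "already exists" as success for an idempotent setup step while still surfacing every other error to the test.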
[I] Specify the client version for the node client [pulsar-client-node]
RobertIndie opened a new issue, #379: URL: https://github.com/apache/pulsar-client-node/issues/379

As discussed [here](https://lists.apache.org/thread/n59k537fhthjnzkfxtc2p4zk4l0cv3mp) and in [the PIP-254](https://github.com/apache/pulsar/issues/19705), the node client should specify the client version using the API `ClientBuilder.description` [exposed by the C++ client](https://github.com/apache/pulsar-client-cpp/pull/253). Otherwise, when we retrieve the client version from the topic stats, it will show up as `Pulsar-CPP-vx.x.x` instead of `Pulsar-Node-vx.x.x`.
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
heesung-sn commented on code in PR #21948: URL: https://github.com/apache/pulsar/pull/21948#discussion_r1580239552

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ##
@@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() {
     }

     private void unfenceTopicToResume() {
-        subscriptions.values().forEach(sub -> sub.resumeAfterFence());
         isFenced = false;
         isClosingOrDeleting = false;
+        subscriptions.values().forEach(sub -> sub.resumeAfterFence());
+        unfenceReplicatorsToResume();
+    }
+
+    private void unfenceReplicatorsToResume() {
+        checkReplication();

Review Comment:
I think this also means we need to think about how to skip deleting ledgers when the cursor is closed.
(pulsar) branch branch-3.0 updated: [fix][admin] Fix namespace admin api exception response (#22587)
This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5ffec8a3cd8 [fix][admin] Fix namespace admin api exception response (#22587) 5ffec8a3cd8 is described below commit 5ffec8a3cd89325a7040cb69c8370608092e2b33 Author: Cong Zhao AuthorDate: Fri Apr 26 09:18:27 2024 +0800 [fix][admin] Fix namespace admin api exception response (#22587) (cherry picked from commit f25776d7fe6812f11b17226995d989c5a2364920) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 5 ++- .../pulsar/broker/admin/NamespaceAuthZTest.java| 48 +++--- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d3c5f681b6d..a73ef0b4400 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2042,7 +2042,7 @@ public abstract class NamespacesBase extends AdminResource { } protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){ -validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); +validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) { throw new RestException(Status.PRECONDITION_FAILED, @@ -2148,9 +2148,10 @@ public abstract class NamespacesBase extends AdminResource { f.complete(null); }) .exceptionally(t -> { +Throwable cause = FutureUtil.unwrapCompletionException(t); log.error("[{}] Failed to update offloadThresholdInSeconds configuration for namespace {}", 
clientAppId(), namespaceName, t); -f.completeExceptionally(new RestException(t)); +f.completeExceptionally(new RestException(cause)); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index cc905608016..bfd0dfaaffe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -20,6 +20,10 @@ package org.apache.pulsar.broker.admin; import io.jsonwebtoken.Jwts; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -33,11 +37,6 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - @Test(groups = "broker-admin") public class NamespaceAuthZTest extends MockedPulsarStandalone { @@ -161,4 +160,43 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { } superUserAdmin.topics().delete(topic, true); } + +@Test +@SneakyThrows +public void testOffloadThresholdInSeconds() { +final String namespace = "public/default"; +final String subject = UUID.randomUUID().toString(); +final String token = Jwts.builder() +.claim("sub", subject).signWith(SECRET_KEY).compact(); +@Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() +.serviceHttpUrl(getPulsarService().getWebServiceAddress()) +.authentication(new AuthenticationToken(token)) +.build(); +Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, +() -> subAdmin.namespaces().getOffloadThresholdInSeconds(namespace)); + +Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, +() -> 
subAdmin.namespaces().setOffloadThresholdInSeconds(namespace, 1)); +} + +@Test +@SneakyThrows +public void testMaxSubscriptionsPerTopic() { +final String namespace = "public/default"; +final String subject = UUID.randomUUID().toString(); +final String token = Jwts.builder() +.claim("sub", subject).signWith(SECRET_KEY).compact(); +@Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() +
(pulsar) branch branch-3.2 updated: [fix][admin] Fix namespace admin api exception response (#22587)
This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new ea584d0b652 [fix][admin] Fix namespace admin api exception response (#22587) ea584d0b652 is described below commit ea584d0b6521d31fc5d3ac76e53d3034131a7428 Author: Cong Zhao AuthorDate: Fri Apr 26 09:18:27 2024 +0800 [fix][admin] Fix namespace admin api exception response (#22587) (cherry picked from commit f25776d7fe6812f11b17226995d989c5a2364920) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 5 ++- .../pulsar/broker/admin/NamespaceAuthZTest.java| 47 -- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 4eba341371b..53176567021 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2019,7 +2019,7 @@ public abstract class NamespacesBase extends AdminResource { } protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){ -validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); +validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) { throw new RestException(Status.PRECONDITION_FAILED, @@ -2125,9 +2125,10 @@ public abstract class NamespacesBase extends AdminResource { f.complete(null); }) .exceptionally(t -> { +Throwable cause = FutureUtil.unwrapCompletionException(t); log.error("[{}] Failed to update offloadThresholdInSeconds configuration for namespace {}", 
clientAppId(), namespaceName, t); -f.completeExceptionally(new RestException(t)); +f.completeExceptionally(new RestException(cause)); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index ce0b925614c..bfd0dfaaffe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -20,6 +20,10 @@ package org.apache.pulsar.broker.admin; import io.jsonwebtoken.Jwts; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -32,10 +36,6 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.UUID; @Test(groups = "broker-admin") public class NamespaceAuthZTest extends MockedPulsarStandalone { @@ -160,4 +160,43 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { } superUserAdmin.topics().delete(topic, true); } + +@Test +@SneakyThrows +public void testOffloadThresholdInSeconds() { +final String namespace = "public/default"; +final String subject = UUID.randomUUID().toString(); +final String token = Jwts.builder() +.claim("sub", subject).signWith(SECRET_KEY).compact(); +@Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() +.serviceHttpUrl(getPulsarService().getWebServiceAddress()) +.authentication(new AuthenticationToken(token)) +.build(); +Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, +() -> subAdmin.namespaces().getOffloadThresholdInSeconds(namespace)); + +Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, +() -> 
subAdmin.namespaces().setOffloadThresholdInSeconds(namespace, 1)); +} + +@Test +@SneakyThrows +public void testMaxSubscriptionsPerTopic() { +final String namespace = "public/default"; +final String subject = UUID.randomUUID().toString(); +final String token = Jwts.builder() +.claim("sub", subject).signWith(SECRET_KEY).compact(); +@Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() +
Re: [PR] [fix][admin] Fix namespace admin api exception response [pulsar]
Technoboy- merged PR #22587: URL: https://github.com/apache/pulsar/pull/22587 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
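For context on the `FutureUtil.unwrapCompletionException(t)` change in #22587: when a `CompletableFuture` stage fails, dependent stages see the failure wrapped in a `CompletionException`, so an `exceptionally` callback that passes `t` straight on hands over the wrapper rather than the original exception. The sketch below shows this using only JDK classes. The `unwrap` helper is a simplified, hypothetical stand-in and is not Pulsar's actual `FutureUtil` implementation; the class and method names are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Illustrative only: none of these names come from the Pulsar code base.
class UnwrapSketch {

    // Hypothetical, simplified stand-in for an "unwrap" helper:
    // returns the cause of a CompletionException, otherwise the throwable itself.
    static Throwable unwrap(Throwable t) {
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    // Fails a source future, chains a dependent stage, and reports the simple class
    // name of the throwable seen in exceptionally(), either raw or unwrapped.
    static String observedType(boolean unwrapFirst) {
        CompletableFuture<Void> source = new CompletableFuture<>();
        source.completeExceptionally(new IllegalStateException("not authorized"));
        return source
                .thenApply(v -> "ok")
                .exceptionally(t -> {
                    Throwable seen = unwrapFirst ? unwrap(t) : t;
                    return seen.getClass().getSimpleName();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(observedType(false)); // the wrapper type
        System.out.println(observedType(true));  // the original exception type
    }
}
```

Presumably this is why the commit passes `cause` instead of `t` to `RestException`: code that inspects the exception type then sees the original failure rather than the wrapper.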
(pulsar) branch master updated: [fix][admin] Fix namespace admin api exception response (#22587)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f25776d7fe6 [fix][admin] Fix namespace admin api exception response (#22587) f25776d7fe6 is described below commit f25776d7fe6812f11b17226995d989c5a2364920 Author: Cong Zhao AuthorDate: Fri Apr 26 09:18:27 2024 +0800 [fix][admin] Fix namespace admin api exception response (#22587) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 5 +- .../pulsar/broker/admin/NamespaceAuthZTest.java| 60 -- 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index bbadc7bb331..5f2dccc3e9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2019,7 +2019,7 @@ public abstract class NamespacesBase extends AdminResource { } protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){ -validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); +validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) { throw new RestException(Status.PRECONDITION_FAILED, @@ -2125,9 +2125,10 @@ public abstract class NamespacesBase extends AdminResource { f.complete(null); }) .exceptionally(t -> { +Throwable cause = FutureUtil.unwrapCompletionException(t); log.error("[{}] Failed to update offloadThresholdInSeconds configuration for namespace {}", clientAppId(), namespaceName, t); -f.completeExceptionally(new RestException(t)); 
+f.completeExceptionally(new RestException(cause)); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index d5a0468f340..5358295b785 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.admin; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.deleteNamespaceWithRetry; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -58,7 +59,6 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-admin") @@ -72,8 +72,6 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { private AuthorizationService authorizationService; -private AuthorizationService orignalAuthorizationService; - private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); private static final String TENANT_ADMIN_TOKEN = Jwts.builder() .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); @@ -100,6 +98,9 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) .build(); this.pulsarClient = super.getPulsarService().getClient(); +this.authorizationService = Mockito.spy(getPulsarService().getBrokerService().getAuthorizationService()); +FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", +authorizationService, true); } @@ -115,19 +116,9 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { close(); } -@BeforeMethod 
-public void before() throws IllegalAccessException { -orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); -authorizationService = Mockito.spy(orignalAuthorizationService); -FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", -authorizationService, true); -} - @AfterMethod -public void after() throws IllegalAccessException, PulsarAdminException { -
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
heesung-sn commented on code in PR #21948: URL: https://github.com/apache/pulsar/pull/21948#discussion_r1580224189 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() { } private void unfenceTopicToResume() { -subscriptions.values().forEach(sub -> sub.resumeAfterFence()); isFenced = false; isClosingOrDeleting = false; +subscriptions.values().forEach(sub -> sub.resumeAfterFence()); +unfenceReplicatorsToResume(); +} + +private void unfenceReplicatorsToResume() { +checkReplication(); Review Comment: I think this can lead to a loop checkReplication -> fail to delete replicated topic -> unfenceTopicToResume ->checkReplication -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
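The cycle described in the review comment above can be modelled with a toy class. This is a sketch under stated assumptions, not the broker code: the method names mirror the comment, but the bodies are invented, everything runs synchronously, `deleteTopic` always fails, and the `maxCalls` cap exists only so the sketch terminates.

```java
// Toy model only: method names mirror the review comment, the bodies are invented.
class FenceLoopSketch {
    private final int maxCalls;          // hypothetical cap so the sketch terminates
    private int checkReplicationCalls = 0;

    FenceLoopSketch(int maxCalls) {
        this.maxCalls = maxCalls;
    }

    // Models: local cluster is not in the replication list, so the topic tries to delete itself.
    void checkReplication() {
        if (checkReplicationCalls >= maxCalls) {
            return; // cap reached; this guard is part of the sketch only
        }
        checkReplicationCalls++;
        if (!deleteTopic()) {
            unfenceTopicToResume(); // deletion failed, so the topic is unfenced again
        }
    }

    // Always fails here, standing in for a deletion that cannot succeed.
    boolean deleteTopic() {
        return false;
    }

    // Models the call added in the diff: unfencing re-triggers the replication check.
    void unfenceTopicToResume() {
        checkReplication();
    }

    int calls() {
        return checkReplicationCalls;
    }

    public static void main(String[] args) {
        FenceLoopSketch sketch = new FenceLoopSketch(5);
        sketch.checkReplication();
        System.out.println("checkReplication ran " + sketch.calls() + " times");
    }
}
```

In this model a single `checkReplication()` call keeps re-entering itself until the cap is hit, because nothing changes between attempts; whether the real asynchronous code behaves the same way is exactly the question the comment raises.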
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
heesung-sn commented on code in PR #21948: URL: https://github.com/apache/pulsar/pull/21948#discussion_r1580239552 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() { } private void unfenceTopicToResume() { -subscriptions.values().forEach(sub -> sub.resumeAfterFence()); isFenced = false; isClosingOrDeleting = false; +subscriptions.values().forEach(sub -> sub.resumeAfterFence()); +unfenceReplicatorsToResume(); +} + +private void unfenceReplicatorsToResume() { +checkReplication(); Review Comment: I think this also means we need to think about how to delete ledgers when the cursor is closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
heesung-sn commented on code in PR #21948: URL: https://github.com/apache/pulsar/pull/21948#discussion_r1580239552 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() { } private void unfenceTopicToResume() { -subscriptions.values().forEach(sub -> sub.resumeAfterFence()); isFenced = false; isClosingOrDeleting = false; +subscriptions.values().forEach(sub -> sub.resumeAfterFence()); +unfenceReplicatorsToResume(); +} + +private void unfenceReplicatorsToResume() { +checkReplication(); Review Comment: I think this also means we need to think about how to delete closed ledgers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
heesung-sn commented on code in PR #21948: URL: https://github.com/apache/pulsar/pull/21948#discussion_r1580226632 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() { } private void unfenceTopicToResume() { -subscriptions.values().forEach(sub -> sub.resumeAfterFence()); isFenced = false; isClosingOrDeleting = false; +subscriptions.values().forEach(sub -> sub.resumeAfterFence()); +unfenceReplicatorsToResume(); +} + +private void unfenceReplicatorsToResume() { +checkReplication(); Review Comment: ``` 2024-04-25T14:57:46,573 - INFO - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - Deleting topic [persistent://pulsar/global/removeClusterTest/__change_events] because local cluster is not part of global namespace repl list [r2, r3] 2024-04-25T14:57:46,573 - INFO - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r2] Skip current termination since other thread is doing termination, state : Terminated 2024-04-25T14:57:46,573 - INFO - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r3] Skip current termination since other thread is doing termination, state : Terminated 2024-04-25T14:57:46,573 - ERROR - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - [persistent://pulsar/global/removeClusterTest/__change_events] Error deleting topic org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed at org.apache.pulsar.broker.service.persistent.PersistentTopic$6.deleteLedgerFailed(PersistentTopic.java:1496) ~[classes/:?] 
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncDelete$33(ManagedLedgerImpl.java:2950) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2947) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT] at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$40(PersistentTopic.java:1468) ~[classes/:?] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?] at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$41(PersistentTopic.java:1462) ~[classes/:?] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?] at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$42(PersistentTopic.java:1453) ~[classes/:?] at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) ~[?:?] 
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$34(PersistentTopic.java:1431) ~[classes/:?] at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:787) ~[?:?] at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?] at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.17.0.jar:4.17.0] at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.17.0.jar:4.17.0] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.108.Final.jar:4.1.108.Final] at
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
heesung-sn commented on code in PR #21948: URL: https://github.com/apache/pulsar/pull/21948#discussion_r1580224189 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() { } private void unfenceTopicToResume() { -subscriptions.values().forEach(sub -> sub.resumeAfterFence()); isFenced = false; isClosingOrDeleting = false; +subscriptions.values().forEach(sub -> sub.resumeAfterFence()); +unfenceReplicatorsToResume(); +} + +private void unfenceReplicatorsToResume() { +checkReplication(); Review Comment: I think this can lead to a loop checkReplication -> fail to close replicated topic -> unfenceTopicToResume ->checkReplication -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][test] Disable ExtensibleLoadManagerImpl in ReplicatorGlobalNSTest [pulsar]
lhotari closed pull request #22590: [fix][test] Disable ExtensibleLoadManagerImpl in ReplicatorGlobalNSTest URL: https://github.com/apache/pulsar/pull/22590 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
lhotari commented on PR #21948: URL: https://github.com/apache/pulsar/pull/21948#issuecomment-2078264388 @poorbarcode I can confirm that this PR is causing problems which most likely lead to OOMEs. Reported as #22588. To reproduce: ``` git clone https://github.com/apache/pulsar cd pulsar mvn -Pcore-modules,-main -T 1C clean install -DskipTests -Dspotbugs.skip=true -Dcheckstyle.skip=true mvn -DredirectTestOutputToFile=false -DtestRetryCount=0 test -pl pulsar-broker "-Dtest=org.apache.pulsar.broker.service.ReplicatorGlobalNSTest#testRemoveLocalClusterOnGlobalNamespace" ``` Then check out d475655, which is the previous commit. You can see that the behavior is very different: ``` git checkout d475655 mvn -Pcore-modules,-main -T 1C clean install -DskipTests -Dspotbugs.skip=true -Dcheckstyle.skip=true mvn -DredirectTestOutputToFile=false -DtestRetryCount=0 test -pl pulsar-broker "-Dtest=org.apache.pulsar.broker.service.ReplicatorGlobalNSTest#testRemoveLocalClusterOnGlobalNamespace" ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][test] Disable ExtensibleLoadManagerImpl in ReplicatorGlobalNSTest [pulsar]
lhotari commented on PR #22590: URL: https://github.com/apache/pulsar/pull/22590#issuecomment-2078201213 ``` 2024-04-26T00:24:03,544 - ERROR - [broker-topic-workers-OrderedExecutor-6-0:PersistentTopic] - [persistent://pulsar/global/removeClusterTest/__change_events] Error deleting topic org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed at org.apache.pulsar.broker.service.persistent.PersistentTopic$6.deleteLedgerFailed(PersistentTopic.java:1496) ~[classes/:?] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncDelete$33(ManagedLedgerImpl.java:2950) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325) ~[?:?] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2947) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT] at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$40(PersistentTopic.java:1468) ~[classes/:?] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325) ~[?:?] at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$41(PersistentTopic.java:1462) ~[classes/:?] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?] 
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325) ~[?:?] at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$42(PersistentTopic.java:1453) ~[classes/:?] at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?] at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$34(PersistentTopic.java:1431) ~[classes/:?] at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:787) [?:?] at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) [?:?] at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) [bookkeeper-common-4.17.0.jar:4.17.0] at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) [bookkeeper-common-4.17.0.jar:4.17.0] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.108.Final.jar:4.1.108.Final] at java.base/java.lang.Thread.run(Thread.java:840) [?:?] Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed Caused by: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1527) ~[?:?] 
at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2419) ~[?:?] at org.apache.pulsar.common.util.FutureUtil.waitForAll(FutureUtil.java:56) ~[pulsar-common-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncTruncate(ManagedLedgerImpl.java:4341) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2946) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT] ... 19 more Caused by:
Re: [PR] [fix][test] Disable ExtensibleLoadManagerImpl in ReplicatorGlobalNSTest [pulsar]
lhotari commented on PR #22590: URL: https://github.com/apache/pulsar/pull/22590#issuecomment-2078184654 The problem happens for both load balancer implementations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][test] Disable ExtensibleLoadManagerImpl in ReplicatorGlobalNSTest [pulsar]
lhotari commented on PR #22590: URL: https://github.com/apache/pulsar/pull/22590#issuecomment-2078180984 This reproduces a problem ``` mvn -DredirectTestOutputToFile=false -DtestRetryCount=0 test -pl pulsar-broker "-Dtest=org.apache.pulsar.broker.service.ReplicatorGlobalNSTest#testRemoveLocalClusterOnGlobalNamespace" ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][test] Disable ExtensibleLoadManagerImpl in ReplicatorGlobalNSTest [pulsar]
lhotari commented on PR #22590: URL: https://github.com/apache/pulsar/pull/22590#issuecomment-2078175501 > `persistent://pulsar/global/removeClusterTest/__change_events` > > I am a bit surprised that we have so many events on this topic. (Is there a loop that pushes too many events there?) Yes, one assumption is that readers/writers in TopicPoliciesService are causing the trouble. That's why I did #22589. I'll have to try running ReplicatorGlobalNSTest locally to see if the problem reproduces. I haven't tried that yet since I've only been analysing the heap dumps. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][test] Disable ExtensibleLoadManagerImpl in ReplicatorGlobalNSTest [pulsar]
heesung-sn commented on PR #22590: URL: https://github.com/apache/pulsar/pull/22590#issuecomment-2078171366

`persistent://pulsar/global/removeClusterTest/__change_events`

I am a bit surprised that we have so many events on this topic. (Is there a loop that pushes too many events there?)
Re: [PR] [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown [pulsar]
heesung-sn commented on code in PR #22589: URL: https://github.com/apache/pulsar/pull/22589#discussion_r1580122532

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java: ##
@@ -110,11 +111,13 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
this.writerCaches = Caffeine.newBuilder()
.expireAfterAccess(5, TimeUnit.MINUTES)
.removalListener((namespaceName, writer, cause) -> {
-((SystemTopicClient.Writer) writer).closeAsync().exceptionally(ex -> {
-log.error("[{}] Close writer error.", namespaceName, ex);
-return null;
-});
+try {
+((SystemTopicClient.Writer) writer).close();
+} catch (IOException e) {

Review Comment:
Maybe catching `Exception` might be safer?

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java: ##
@@ -142,8 +143,26 @@ private SystemTopicClient getTransactionBufferSystemTopicClient(NamespaceName
public void close() throws Exception {
for (Map.Entry> entry : clients.entrySet()) {
-entry.getValue().close();
+try {
+entry.getValue().close();
+} catch (Exception e) {
+log.error("Failed to close system topic client for namespace {}", entry.getKey(), e);
+}
+}
+clients.clear();
+for (Map.Entry> entry : refCountedWriterMap.entrySet()) {
+CompletableFuture> future = entry.getValue().getFuture();
+if (!future.isCompletedExceptionally()) {
+future.thenAccept(writer -> {
+try {
+writer.close();
+} catch (IOException e) {

Review Comment:
Maybe catching `Exception` might be safer?

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java: ##
@@ -740,4 +743,21 @@ protected AsyncLoadingCache
}
private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
+
+@Override
+public void close() throws Exception {
+writerCaches.synchronous().invalidateAll();
+readerCaches.values().forEach(future -> {
+if (future != null && !future.isCompletedExceptionally()) {
+future.thenAccept(reader -> {
+try {
+reader.close();
+} catch (IOException e) {

Review Comment:
Maybe catching `Exception` might be safer?
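The review suggestion above (catch `Exception` rather than only `IOException` when closing readers and writers) can be illustrated with a small standalone sketch. `QuietCloser` and `closeAll` are hypothetical names, not Pulsar classes, and unlike the PR's `thenAccept` approach this version only handles futures that have already completed. The point is that a `RuntimeException` from one `close()` should not stop the remaining resources from being closed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper (not part of Pulsar): closes resources held in futures. */
final class QuietCloser {
    private QuietCloser() {
    }

    /**
     * Closes every resource whose future completed normally and collects the
     * failures, so that one failing close() does not abort the whole loop.
     */
    static List<Exception> closeAll(
            Iterable<? extends CompletableFuture<? extends AutoCloseable>> futures) {
        List<Exception> failures = new ArrayList<>();
        for (CompletableFuture<? extends AutoCloseable> future : futures) {
            // Skip entries that never produced a resource.
            if (future == null || !future.isDone() || future.isCompletedExceptionally()) {
                continue;
            }
            try {
                AutoCloseable resource = future.getNow(null);
                if (resource != null) {
                    resource.close();
                }
            } catch (Exception e) {
                // Catching Exception also covers unchecked failures,
                // which a catch (IOException e) would let escape.
                failures.add(e);
            }
        }
        return failures;
    }
}
```

A caller would typically log each returned failure and then clear the map or cache that held the futures.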
Re: [PR] [fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled [pulsar]
heesung-sn commented on code in PR #22496: URL: https://github.com/apache/pulsar/pull/22496#discussion_r1580114938

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java: ##
@@ -41,6 +45,18 @@ public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
protected String methodName;
+@DataProvider(name = "loadManagerClassName")
+public static Object[][] loadManagerClassName() {
+return new Object[][]{
+{ModularLoadManagerImpl.class.getName()},
+{ExtensibleLoadManagerImpl.class.getName()}

Review Comment:
Sure. We can comment out this line if this is problematic.
Re: [PR] [fix][test] Disable ExtensibleLoadManagerImpl in ReplicatorGlobalNSTest [pulsar]
lhotari commented on PR #22590: URL: https://github.com/apache/pulsar/pull/22590#issuecomment-2078151573

Evidence in a heap dump:

```sql
select this['arg$2.completeTopicName'], count(*)
from "org.apache.pulsar.broker.resources.NamespaceResources$PartitionedTopicResources$$Lambda$2276+0x7f588cc63970"
group by 1
```

```
EXPR$0                                                       | EXPR$1
--------------------------------------------------------------------
persistent://pulsar/global/removeClusterTest/__change_events | 209,380
--------------------------------------------------------------------
```
Re: [PR] [fix][test] Disable ExtensibleLoadManagerImpl in ReplicatorGlobalNSTest [pulsar]
lhotari commented on PR #22590: URL: https://github.com/apache/pulsar/pull/22590#issuecomment-2078144575 @heesung-sn There's a chance that #22589 helps. It looks like cleanup for tests gets stuck in deleting the `__change_events` topic for namespaces.
Re: [PR] [fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled [pulsar]
lhotari commented on code in PR #22496: URL: https://github.com/apache/pulsar/pull/22496#discussion_r1580089862

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java: ##
@@ -41,6 +45,18 @@ public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
protected String methodName;
+@DataProvider(name = "loadManagerClassName")
+public static Object[][] loadManagerClassName() {
+return new Object[][]{
+{ModularLoadManagerImpl.class.getName()},
+{ExtensibleLoadManagerImpl.class.getName()}

Review Comment:
@heesung-sn I think that this is triggering a problem in tests.
[PR] [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown [pulsar]
lhotari opened a new pull request, #22589: URL: https://github.com/apache/pulsar/pull/22589

### Motivation
There doesn't seem to be proper handling for TopicPoliciesService shutdown. This is needed to allow the Pulsar broker to shut down gracefully. It also prevents memory leaks in tests. This problem was detected during the investigation of test OOMEs. See #22588 for details.

### Modifications
- Add close method to TopicPoliciesService and implement it
- Use Pulsar's executor for the AsyncLoader in TopicPoliciesService
- make shutdown synchronous
- Improve the close method of SystemTopicTxnBufferSnapshotService

### Documentation
- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
[I] [Tests][Bug] There are multiple memory leaks that cause Pulsar CI to fail with OOME [pulsar]
lhotari opened a new issue, #22588: URL: https://github.com/apache/pulsar/issues/22588

### Search before asking
- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy
- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### Version
master

### Minimal reproduce step
Run Pulsar CI master branch build manually. Usually Unit test group 1 and client impl tests fail. Example https://github.com/apache/pulsar/actions/runs/8835483969/attempts/1

### What did you expect to see?
Tests should not have memory leaks.

### What did you see instead?
There are major memory leaks in test execution.

### Anything else?
_No response_

### Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
(pulsar) branch master updated: [fix][test] Clear fields in test cleanup to reduce memory consumption (#22583)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3de14c55de1 [fix][test] Clear fields in test cleanup to reduce memory consumption (#22583) 3de14c55de1 is described below commit 3de14c55de138770ed61d1a14cd883048ea1915c Author: Lari Hotari AuthorDate: Thu Apr 25 21:54:55 2024 +0300 [fix][test] Clear fields in test cleanup to reduce memory consumption (#22583) --- .../pulsar/broker/MultiBrokerTestZKBaseTest.java | 1 + .../apache/pulsar/broker/SLAMonitoringTest.java| 21 +++-- .../loadbalance/LeaderElectionServiceTest.java | 5 +- .../broker/loadbalance/LoadBalancerTest.java | 15 -- .../loadbalance/SimpleLoadManagerImplTest.java | 30 +--- .../loadbalance/extensions/BrokerRegistryTest.java | 10 +++- .../ExtensibleLoadManagerImplBaseTest.java | 9 +++- .../loadbalance/impl/BundleSplitterTaskTest.java | 10 +++- .../impl/ModularLoadManagerImplTest.java | 35 ++ .../broker/service/AdvertisedAddressTest.java | 10 +++- .../pulsar/broker/service/BkEnsemblesTestBase.java | 15 -- .../broker/service/BrokerBookieIsolationTest.java | 6 ++- .../CanReconnectZKClientPulsarServiceBaseTest.java | 24 +++--- .../pulsar/broker/service/MaxMessageSizeTest.java | 15 -- .../broker/service/OneWayReplicatorTestBase.java | 44 ++ .../pulsar/broker/service/ReplicatorTest.java | 8 ++-- .../pulsar/broker/service/ReplicatorTestBase.java | 53 -- .../pulsar/broker/service/TopicOwnerTest.java | 15 -- .../coordinator/TransactionMetaStoreTestBase.java | 17 --- .../client/api/ClientDeduplicationFailureTest.java | 20 ++-- .../worker/PulsarFunctionE2ESecurityTest.java | 25 -- .../worker/PulsarFunctionPublishTest.java | 25 -- .../functions/worker/PulsarFunctionTlsTest.java| 8 +++- .../worker/PulsarWorkerAssignmentTest.java | 25 -- .../apache/pulsar/io/AbstractPulsarE2ETest.java| 42 + 
.../apache/pulsar/io/PulsarFunctionAdminTest.java | 25 -- .../apache/pulsar/io/PulsarFunctionTlsTest.java| 20 ++-- .../pulsar/zookeeper/ZookeeperServerTest.java | 13 -- .../org/apache/pulsar/metadata/TestZKServer.java | 1 + 29 files changed, 417 insertions(+), 130 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java index e5b80c0af33..a78254df4aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerTestZKBaseTest.java @@ -59,6 +59,7 @@ public abstract class MultiBrokerTestZKBaseTest extends MultiBrokerBaseTest { } catch (Exception e) { log.error("Error in stopping ZK server", e); } +testZKServer = null; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index 4a6524bf245..941229fc3d9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -126,15 +126,26 @@ public class SLAMonitoringTest { @AfterClass(alwaysRun = true) public void shutdown() throws Exception { log.info("--- Shutting down ---"); -executor.shutdownNow(); -executor = null; +if (executor != null) { +executor.shutdownNow(); +executor = null; +} for (int i = 0; i < BROKER_COUNT; i++) { -pulsarAdmins[i].close(); -pulsarServices[i].close(); +if (pulsarAdmins[i] != null) { +pulsarAdmins[i].close(); +pulsarAdmins[i] = null; +} +if (pulsarServices[i] != null) { +pulsarServices[i].close(); +pulsarServices[i] = null; +} } -bkEnsemble.stop(); +if (bkEnsemble != null) { +bkEnsemble.stop(); +bkEnsemble = null; +} } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java index ded4ee8e58d..358410f1f28 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java @@ -59,7 +59,10
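The cleanup pattern used throughout the commit above (null-check each field, release it, then set it to null so the test instance no longer retains it) can be sketched in isolation. `FixtureCleanupSketch` and its fields are hypothetical stand-ins for a test class, not code from the Pulsar repository. The assumption behind the pattern is that the test framework may keep test class instances alive after `@AfterClass` runs, so fields that are not cleared keep their object graphs reachable.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical test fixture illustrating the cleanup pattern; not a Pulsar class. */
final class FixtureCleanupSketch {
    ExecutorService executor;
    final AutoCloseable[] services = new AutoCloseable[2];

    void setup() {
        executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < services.length; i++) {
            services[i] = () -> { };
        }
    }

    /** Safe to call when setup() failed half-way, and safe to call twice. */
    void shutdown() {
        if (executor != null) {
            executor.shutdownNow();
            executor = null; // drop the reference so it can be collected
        }
        for (int i = 0; i < services.length; i++) {
            if (services[i] != null) {
                try {
                    services[i].close();
                } catch (Exception e) {
                    System.err.println("Failed to close service " + i + ": " + e);
                }
                services[i] = null;
            }
        }
    }
}
```

The null checks matter because a cleanup method annotated with `alwaysRun = true` also runs when setup threw before every field was assigned.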
Re: [PR] [fix][test] Clear fields in test cleanup to reduce memory consumption [pulsar]
lhotari merged PR #22583: URL: https://github.com/apache/pulsar/pull/22583
Re: [PR] [improve][ci] Disable test that causes OOME until the problem has been resolved [pulsar]
lhotari commented on PR #22586: URL: https://github.com/apache/pulsar/pull/22586#issuecomment-2077963851

```sql
select toString(this['result.ex.detailMessage']), count(*)
from java.util.concurrent.CompletableFuture
where this['result.ex.detailMessage'] is not null
group by 1
order by 2 desc
```

```
EXPR$0 | EXPR$1
--------------------------------------------------------------------
org.apache.pulsar.metadata.api.MetadataStoreException$NotFoundException: | 209,380
Lock was not in valid state: Releasing | 23
BookKeeper client is closed | 15
org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed | 2
Failed to close clients before deleting topic. | 1
Total: 5 entries | 209,421
--------------------------------------------------------------------
```
Re: [PR] [improve][ci] Disable test that causes OOME until the problem has been resolved [pulsar]
lhotari commented on PR #22586: URL: https://github.com/apache/pulsar/pull/22586#issuecomment-2077871792

There are more problems. Using heap dump from https://github.com/apache/pulsar/actions/runs/8835173621/attempts/1?pr=22583

```sql
select toString(this['stack.fn.arg$1']), count(*)
from java.util.concurrent.CompletableFuture
where this['stack.fn'] is not null
group by 1
order by 2 desc
```

```
EXPR$0 | EXPR$1
--------------------------------------------------------------------
org.apache.pulsar.broker.service.persistent.SystemTopic @ 0x100031c173f8 | 435,961
org.apache.pulsar.broker.resources.NamespaceResources$PartitionedTopicResources @ 0x10002b285920 | 56,647
 | 144
org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient @ 0x1a69cfe0 | 17
org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient @ 0x100011873a80 | 17
org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient @ 0x17eb5af8 | 13
org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient @ 0x100018f4fd48 | 13
org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient @ 0x10002b0e1900 | 12
org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient @ 0x10001c039270 | 12
org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient @ 0x10003be7e448 | 11
org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient @ 0x10003bfa8e60 | 11
org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient @ 0x1c4b8f78 | 9
org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient @ 0x1c439b80 | 9
org.apache.pulsar.client.impl.BinaryProtoLookupService @ 0x10002d78f820 | 7
org.apache.pulsar.broker.service.persistent.PersistentTopic @ 0x1000281e0380 | 6
org.apache.pulsar.client.impl.PulsarClientImpl @ 0x10002d78dd90 | 5
org.apache.pulsar.client.impl.ConnectionHandler @ 0x10002d60cfc8 | 4
org.apache.pulsar.client.impl.ConnectionHandler @ 0x10002d619f28 | 4
org.apache.pulsar.client.impl.ConnectionHandler @ 0x100024745fc8 | 4
ASSIGN | 4
org.apache.pulsar.client.impl.ConnectionHandler @ 0x10002a0f8390 | 4
org.apache.pulsar.client.impl.ConnectionHandler @ 0x10002d6132f8 | 4
org.apache.pulsar.client.impl.ConnectionHandler @ 0x10002d796090 | 4
org.apache.pulsar.broker.namespace.OwnershipCache$OwnedServiceUnitCacheLoader @ 0x10001c0314e8 | 3
org.apache.pulsar.broker.namespace.OwnershipCache$OwnedServiceUnitCacheLoader @ 0x10003be76ef8 | 2
org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator @ 0x1003f30fc0c8 | 2
org.apache.pulsar.metadata.impl.ZKMetadataStore @ 0x1003ef17fef0 | 2
org.apache.pulsar.client.impl.TableViewImpl @ 0x10002725bb30 | 2
org.apache.pulsar.client.impl.TableViewImpl @ 0x10002a1bf700 | 2
org.apache.pulsar.client.impl.TableViewImpl @ 0x10002d6684a0 | 2
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService @ 0x1000182bb060 | 2
org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient @ 0x1003f0a66de0 | 2
org.apache.pulsar.broker.namespace.OwnershipCache$OwnedServiceUnitCacheLoader @ 0x1c5040b0 | 2
org.apache.pulsar.broker.namespace.OwnershipCache$OwnedServiceUnitCacheLoader @ 0x100018f47fb0 | 2
java.util.concurrent.CompletableFuture @ 0x1003ed849b98
```
Re: [PR] [improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions [pulsar]
dlg99 commented on code in PR #22536: URL: https://github.com/apache/pulsar/pull/22536#discussion_r1579869175

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java: ##
@@ -139,28 +147,38 @@ public void sendMessages(List entries) {
final Map> groupedEntries = localGroupedEntries.get();
groupedEntries.clear();
+final Map> consumerStickyKeyHashesMap = localGroupedStickyKeyHashes.get();
+consumerStickyKeyHashesMap.clear();
for (Entry entry : entries) {
-Consumer consumer = selector.select(peekStickyKey(entry.getDataBuffer()));
+byte[] stickyKey = peekStickyKey(entry.getDataBuffer());
+int stickyKeyHash = StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
+
+Consumer consumer = selector.select(stickyKeyHash);
if (consumer != null) {
-groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
+int startingSize = Math.max(10, entries.size() / (2 * consumerSet.size()));

Review Comment:
This is a guesstimated initial size for the list, to reduce `ArrayList` resizing.
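To make the sizing heuristic in the review comment above concrete, here is a minimal standalone sketch. `GroupingCapacitySketch`, `startingSize` and `group` are hypothetical names, and consumers are modelled as integer slots picked by hash rather than by Pulsar's `StickyKeyConsumerSelector`. Only the formula `Math.max(10, entries.size() / (2 * consumerSet.size()))` comes from the diff. The sketch assumes `consumerCount` is greater than zero.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of pre-sizing per-consumer lists; not Pulsar code. */
final class GroupingCapacitySketch {
    private GroupingCapacitySketch() {
    }

    /** Same formula as in the diff: at least 10, else half of an even share. */
    static int startingSize(int entryCount, int consumerCount) {
        return Math.max(10, entryCount / (2 * consumerCount));
    }

    /** Groups entries into per-consumer lists created with the estimated capacity. */
    static Map<Integer, List<String>> group(List<String> entries, int consumerCount) {
        int capacity = startingSize(entries.size(), consumerCount);
        Map<Integer, List<String>> grouped = new HashMap<>();
        for (String entry : entries) {
            // Stand-in for consumer selection: pick a slot from the entry's hash.
            int consumer = Math.floorMod(entry.hashCode(), consumerCount);
            grouped.computeIfAbsent(consumer, k -> new ArrayList<>(capacity)).add(entry);
        }
        return grouped;
    }
}
```

With 1000 entries and 4 consumers the estimate is 125 per list, so a list that receives an even share of 250 entries needs far fewer growth steps than one starting from the default capacity.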
Re: [PR] [improve][sec] Align some namespace level policy authorisation check [pulsar]
Technoboy- commented on PR #21640: URL: https://github.com/apache/pulsar/pull/21640#issuecomment-2077614440

> @Technoboy- please tag the release in this PR to track the cherry-picking. Just wondering if this should go to 3.1.x and 3.0.x too if it's going to 2.11.x .

Oh, sorry, I missed this comment. I have cherry-picked to branch-3.2 and branch-3.0.
(pulsar) branch branch-3.0 updated: [improve][sec] Align some namespace level policy authorisation check (#21640)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7fdfbf46f9b [improve][sec] Align some namespace level policy authorisation check (#21640) 7fdfbf46f9b is described below commit 7fdfbf46f9b01e55cac270782b57008ec02eb5b2 Author: Qiang Zhao AuthorDate: Mon Dec 4 22:15:19 2023 +0800 [improve][sec] Align some namespace level policy authorisation check (#21640) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 30 +- .../apache/pulsar/broker/admin/v2/Namespaces.java | 3 ++- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 04ec944aab4..d3c5f681b6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1204,7 +1204,8 @@ public abstract class NamespacesBase extends AdminResource { protected CompletableFuture internalSetPublishRateAsync(PublishRate maxPublishMessageRate) { log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate); -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(), maxPublishMessageRate); log.info("[{}] Successfully updated the publish_max_message_rate for cluster on namespace {}", clientAppId(), namespaceName); @@ -1233,7 +1234,8 @@ public abstract class NamespacesBase extends 
AdminResource { protected CompletableFuture internalRemovePublishRateAsync() { log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), namespaceName, topicName); -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { if (policies.publishMaxMessageRate != null) { policies.publishMaxMessageRate.remove(pulsar().getConfiguration().getClusterName()); } @@ -1253,7 +1255,8 @@ public abstract class NamespacesBase extends AdminResource { @SuppressWarnings("deprecation") protected CompletableFuture internalSetTopicDispatchRateAsync(DispatchRateImpl dispatchRate) { log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate); -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); policies.clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); log.info("[{}] Successfully updated the dispatchRate for cluster on namespace {}", clientAppId(), @@ -1263,7 +1266,8 @@ public abstract class NamespacesBase extends AdminResource { } protected CompletableFuture internalDeleteTopicDispatchRateAsync() { -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.topicDispatchRate.remove(pulsar().getConfiguration().getClusterName()); 
policies.clusterDispatchRate.remove(pulsar().getConfiguration().getClusterName()); log.info("[{}] Successfully delete the dispatchRate for cluster on namespace {}", clientAppId(), @@ -1280,7 +1284,7 @@ public abstract class NamespacesBase extends AdminResource { } protected CompletableFuture internalSetSubscriptionDispatchRateAsync(DispatchRateImpl dispatchRate) { -return validateSuperUserAccessAsync() +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) .thenCompose(__ -> updatePoliciesAsync(namespaceName,
(pulsar) branch master updated: [improve][ci] Disable test that causes OOME until the problem has been resolved (#22586)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
new 6a9423156f3 [improve][ci] Disable test that causes OOME until the problem has been resolved (#22586)
6a9423156f3 is described below

commit 6a9423156f35fe1c2fedab94028f337276f5e7a3
Author: Lari Hotari
AuthorDate: Thu Apr 25 18:38:31 2024 +0300

[improve][ci] Disable test that causes OOME until the problem has been resolved (#22586)
---
.../org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 4cc3a9ada7d..8aeb902211d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -757,7 +757,8 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
* similar to step 1.
*
*/
-@Test(dataProvider = "isTopicPolicyEnabled")
+// TODO: this test causes OOME in the CI, need to investigate
+@Test(dataProvider = "isTopicPolicyEnabled", enabled = false)
public void testWriteMarkerTaskOfReplicateSubscriptions(boolean isTopicPolicyEnabled) throws Exception {
// 1. Prepare resource and use proper configuration.
String namespace = BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog");
Re: [PR] [improve][ci] Disable test that causes OOME until the problem has been resolved [pulsar]
lhotari merged PR #22586: URL: https://github.com/apache/pulsar/pull/22586
Re: [PR] [improve][ci] Disable test that causes OOME until the problem has been resolved [pulsar]
lhotari commented on PR #22586: URL: https://github.com/apache/pulsar/pull/22586#issuecomment-2077595847 The namespace deletion in the test might be the code that triggers the problem: https://github.com/apache/pulsar/blob/e81a20d667aef7c0f888e88dbcf972196012ebea/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java#L821-L824 @poorbarcode do you have a chance to debug this issue?
Re: [PR] [improve][ci] Disable test that causes OOME until the problem has been resolved [pulsar]
lhotari commented on PR #22586: URL: https://github.com/apache/pulsar/pull/22586#issuecomment-2077575391 Just wondering if the problem is somehow related to namespace deletion with replication enabled. The namespace deletion code will need to be refactored in any case to get the concurrency under control. https://github.com/apache/pulsar/blob/d7d54522933b63f6a74ec7139c6dedebe8ad9149/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L216-L344 The concurrency issue is explained in https://github.com/apache/pulsar/pull/22541#issuecomment-2071568113
Re: [PR] [fix][admin] Fix namespace admin api exception response [pulsar]
coderzc commented on code in PR #22587: URL: https://github.com/apache/pulsar/pull/22587#discussion_r1579703993

## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java: ##
@@ -2125,9 +2125,10 @@ protected CompletableFuture internalSetOffloadThresholdInSecondsAsync(long
f.complete(null);
})
.exceptionally(t -> {
+Throwable cause = t.getCause();

Review Comment:
Fixed
Re: [PR] [fix][admin] Fix namespace admin api exception response [pulsar]
Technoboy- commented on code in PR #22587: URL: https://github.com/apache/pulsar/pull/22587#discussion_r1579699553

## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java: ##
@@ -2125,9 +2125,10 @@ protected CompletableFuture internalSetOffloadThresholdInSecondsAsync(long
f.complete(null);
})
.exceptionally(t -> {
+Throwable cause = t.getCause();

Review Comment:
Also, we can make `internalSetMaxSubscriptionsPerTopic` async
Re: [PR] [fix][admin] Fix namespace admin api exception response [pulsar]
Technoboy- commented on code in PR #22587: URL: https://github.com/apache/pulsar/pull/22587#discussion_r1579699553

## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java: ##
@@ -2125,9 +2125,10 @@ protected CompletableFuture internalSetOffloadThresholdInSecondsAsync(long
f.complete(null);
})
.exceptionally(t -> {
+Throwable cause = t.getCause();

Review Comment:
Also, we can make this method async
Re: [PR] [fix][admin] Fix namespace admin api exception response [pulsar]
Technoboy- commented on code in PR #22587: URL: https://github.com/apache/pulsar/pull/22587#discussion_r1579694026 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java: ## @@ -2125,9 +2125,10 @@ protected CompletableFuture internalSetOffloadThresholdInSecondsAsync(long f.complete(null); }) .exceptionally(t -> { +Throwable cause = t.getCause(); Review Comment: FutureUtil.unwrapCompletionException -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
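The `FutureUtil.unwrapCompletionException` suggestion in this review comment points at a common pitfall. Inside `exceptionally`, the throwable is wrapped in a `CompletionException` only when it propagated through an earlier dependent stage, so a bare `t.getCause()` can return null when the future was failed directly. A minimal JDK-only sketch of the idea follows; `unwrap` and `rootName` are hypothetical names used for illustration and are not the Pulsar helper's actual source:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class UnwrapSketch {
    // Hypothetical stand-in for a helper such as FutureUtil.unwrapCompletionException:
    // strips CompletionException/ExecutionException wrappers, leaves other throwables alone.
    public static Throwable unwrap(Throwable t) {
        while ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Returns the simple class name of the root failure as seen by exceptionally().
    public static String rootName(CompletableFuture<String> future) {
        return future.exceptionally(t -> unwrap(t).getClass().getSimpleName()).join();
    }

    public static void main(String[] args) {
        // Failed directly: exceptionally() receives the IllegalStateException itself,
        // so calling getCause() on it would return null.
        CompletableFuture<String> direct =
                CompletableFuture.failedFuture(new IllegalStateException("denied"));
        // Failed through a dependent stage: exceptionally() receives a CompletionException.
        CompletableFuture<String> chained = direct.thenApply(s -> s);

        System.out.println(rootName(direct));  // IllegalStateException
        System.out.println(rootName(chained)); // IllegalStateException
    }
}
```

Unwrapping conditionally gives the same root exception on both paths, which is what an exception-to-HTTP-response mapping needs.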
[PR] [fix][admin] Fix namespace admin api exception response [pulsar]
coderzc opened a new pull request, #22587: URL: https://github.com/apache/pulsar/pull/22587

### Motivation

The namespace admin API should return `NotAuthorizedException` when permission verification fails, but `SetOffloadThresholdInSeconds` returns `ServerSideErrorException`.

### Modifications

* Return the correct exception response.
* Use `validateNamespacePolicyOperation` instead of `validateNamespacePolicyOperationAsync` to avoid exceptions being ignored.

### Verifying this change

- [x] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
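The second modification in this PR description (avoiding an async validation whose exception gets ignored) can be illustrated with plain `CompletableFuture`. This is only a sketch under assumptions: `validateAsync`, `updateIgnoringCheck` and `updateChained` are hypothetical names, and the broker's real validation methods are not reproduced here.

```java
import java.util.concurrent.CompletableFuture;

public class IgnoredFutureSketch {
    // Hypothetical stand-in for an asynchronous permission check.
    public static CompletableFuture<Void> validateAsync(boolean allowed) {
        return allowed
                ? CompletableFuture.completedFuture(null)
                : CompletableFuture.failedFuture(new SecurityException("not authorized"));
    }

    // Bug pattern: the returned future is discarded, so a failed check is never observed
    // and the update proceeds anyway.
    public static boolean updateIgnoringCheck(boolean allowed) {
        validateAsync(allowed); // failure is silently dropped here
        return true;            // "policy updated"
    }

    // Chained variant: the update only runs when the check succeeded,
    // and a failed check propagates to the caller.
    public static CompletableFuture<Boolean> updateChained(boolean allowed) {
        return validateAsync(allowed).thenApply(ignore -> true);
    }

    public static void main(String[] args) {
        System.out.println(updateIgnoringCheck(false));                     // true
        System.out.println(updateChained(false).isCompletedExceptionally()); // true
    }
}
```

Either a synchronous check that throws, or an async check that is chained into the returned future, keeps the authorization failure visible to the caller.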
Re: [PR] [improve][ci] Disable test that causes OOME until the problem has been resolved [pulsar]
lhotari commented on PR #22586: URL: https://github.com/apache/pulsar/pull/22586#issuecomment-2077437609 There are a few recent replicator related changes #21946, #21948 and #22537 . @poorbarcode please check if one of the changes is triggering the OOME issue possibly related to deletion. There are a lot of entries for `__change_events` topic for the replicated namespace. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
lhotari commented on PR #21948: URL: https://github.com/apache/pulsar/pull/21948#issuecomment-2077417174 @poorbarcode I wonder if these changes somehow cause OOME problems with org.apache.pulsar.broker.service.ReplicatorSubscriptionTest#testWriteMarkerTaskOfReplicateSubscriptions, please see #21948 . Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] PostgreSQL sink connector doesn't persist message to table [pulsar]
alexandrebrilhante commented on issue #22543: URL: https://github.com/apache/pulsar/issues/22543#issuecomment-2077396425 Looks like I was trying to persist `{"id": 1, "name": "abcdefg"}` as opposed to just `{"name": "abcdefg"}`, as expected by the sink connector.
Re: [I] PostgreSQL sink connector doesn't persist message to table [pulsar]
alexandrebrilhante closed issue #22543: PostgreSQL sink connector doesn't persist message to table URL: https://github.com/apache/pulsar/issues/22543 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][ci] Disable test that causes OOME until the problem has been resolved [pulsar]
lhotari commented on PR #22586: URL: https://github.com/apache/pulsar/pull/22586#issuecomment-2077363787

In another heap dump:

```sql
select this['arg$2.completeTopicName'], count(*)
from "org.apache.pulsar.broker.resources.NamespaceResources$PartitionedTopicResources$$Lambda$3405+0x7fae50f7b000"
group by 1
```

```
EXPR$0                                                                                                | EXPR$1
------------------------------------------------------------------------------------------------------+--------
persistent://pulsar/testReplicateSubBackLog-acf0e69a-2836-430c-bf1e-c9babd059179/replication-disable | 4
persistent://pulsar/testReplicateSubBackLog-acf0e69a-2836-430c-bf1e-c9babd059179/__change_events     | 277,899
Total: 2 entries                                                                                      | 277,903
------------------------------------------------------------------------------------------------------+--------
```
Re: [PR] [improve][ci] Disable test that causes OOME until the problem has been resolved [pulsar]
lhotari commented on PR #22586: URL: https://github.com/apache/pulsar/pull/22586#issuecomment-2077351106

In one of the heap dumps, there were 251,029 lambdas which all reference a `__change_events` topic.

```sql
select this['arg$2.completeTopicName'], count(*)
from "org.apache.pulsar.broker.resources.NamespaceResources$PartitionedTopicResources$$Lambda$1819+0x7f08a8b65ee8"
group by 1
```

```
EXPR$0                                                                                            | EXPR$1
--------------------------------------------------------------------------------------------------+--------
persistent://pulsar/testReplicateSubBackLog-88c7c05a-0f07-4ed2-b82a-9b911afad922/__change_events | 251,029
--------------------------------------------------------------------------------------------------+--------
```
Re: [PR] [improve][meta] Fix invalid use of drain API and race condition in closing metadata store [pulsar]
lhotari commented on PR #22585: URL: https://github.com/apache/pulsar/pull/22585#issuecomment-2077333078 It seems that the OOME is another issue. https://github.com/apache/pulsar/pull/22586 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [improve][ci] Disable test that causes OOME until the problem has been resolved [pulsar]
lhotari opened a new pull request, #22586: URL: https://github.com/apache/pulsar/pull/22586

### Motivation

Unit test group 1 often fails with OOME.

### Modifications

The issue is most likely related to https://github.com/apache/pulsar/pull/21495 and org.apache.pulsar.broker.service.ReplicatorSubscriptionTest#testWriteMarkerTaskOfReplicateSubscriptions. Disable the test until the problem has been resolved.

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
Re: [I] PostgreSQL sink connector doesn't persist message to table [pulsar]
alexandrebrilhante commented on issue #22543: URL: https://github.com/apache/pulsar/issues/22543#issuecomment-2077310495

This seems to be related to the schema Postgres is using, which is weird, since I'm using the same schema as the getting started example for Pulsar IO.

```log
org.postgresql.util.PSQLException: ERROR: null value in column "id" of relation "pulsar_postgres_jdbc_sink" violates not-null constraint
  Detail: Failing row contains (null, null).
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190) ~[postgresql-42.5.1.jar:42.5.1]
	at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:177) ~[postgresql-42.5.1.jar:42.5.1]
	at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:289) ~[pulsar-io-jdbc-core-3.2.2.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
```
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
shibd commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1579492679

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ##
@@ -3602,6 +3603,15 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture getLastCanDispatchPosition() {

Review Comment:
Thanks for sharing your idea. I would still prefer to let `TransactionBuffer` hold the complete logic for `getLastCanDispatchPosition`, as in this PR:

```java
class TopicTransactionBuffer {
    @Override
    public CompletableFuture getLastDispatchablePosition() {
        PositionImpl tnxMaxReadPosition = getMaxReadPosition();
        if (tnxMaxReadPosition.compareTo((PositionImpl) topic.getLastPosition()) == 0) {
            return topic.getLastDispatchablePosition();
        } else {
            return CompletableFuture.completedFuture(tnxMaxReadPosition);
        }
    }
}
```

Let's hear what others think.
(pulsar-dotpulsar) branch master updated: Updated NuGet package
This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git The following commit(s) were added to refs/heads/master by this push: new a3d6825 Updated NuGet package a3d6825 is described below commit a3d6825d4fc75a98e713422a87ff6e6673ba028b Author: Daniel Blankensteiner AuthorDate: Thu Apr 25 15:40:21 2024 +0200 Updated NuGet package --- benchmarks/Compression/Compression.csproj| 2 +- tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/Compression/Compression.csproj b/benchmarks/Compression/Compression.csproj index f7d9f27..b8b32bf 100644 --- a/benchmarks/Compression/Compression.csproj +++ b/benchmarks/Compression/Compression.csproj @@ -18,7 +18,7 @@ - + diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj index edc8c2b..c222a7c 100644 --- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj +++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj @@ -36,7 +36,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive - +
(pulsar) branch branch-3.2 updated: [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new c6e58b0f0eb [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
c6e58b0f0eb is described below

commit c6e58b0f0ebff6441c3c14a7250180a10d46dc23
Author: Lari Hotari
AuthorDate: Thu Apr 25 14:12:33 2024 +0300

    [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)

    (cherry picked from commit 997c8b95e1798cee08c56d92b77eb70056dfca8f)
---
 .../prometheus/PrometheusMetricsGenerator.java     | 74 +--
 .../prometheus/PrometheusMetricsGeneratorTest.java | 85 ++
 2 files changed, 138 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 8cd68caf1ee..6b4d08c359d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import io.prometheus.client.Collector;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
@@ -191,8 +192,8 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
             crc = new CRC32();
             this.bufferSize = Math.max(Math.min(resolveChunkSize(bufAllocator), readableBytes), 8192);
             this.bufAllocator = bufAllocator;
-            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 1);
-            allocateBuffer();
+            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 2);
+            allocateCompressBuffer();
         }
 
         /**
@@ -217,37 +218,66 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
                 // write gzip header
                 compressBuffer.put(GZIP_HEADER);
             }
+            // update the CRC32 checksum calculation
             nioBuffer.mark();
             crc.update(nioBuffer);
             nioBuffer.reset();
+            // pass the input buffer to the deflater
             deflater.setInput(nioBuffer);
+            // when the input buffer is the last one, set the flag to finish the deflater
             if (isLast) {
                 deflater.finish();
             }
-            while (!deflater.needsInput() && !deflater.finished()) {
-                int written = deflater.deflate(compressBuffer);
-                if (written == 0 && !compressBuffer.hasRemaining()) {
-                    backingCompressBuffer.setIndex(0, compressBuffer.position());
-                    resultBuffer.addComponent(true, backingCompressBuffer);
-                    allocateBuffer();
+            int written = -1;
+            // the deflater may need multiple calls to deflate the input buffer
+            // the completion is checked by the deflater.needsInput() method for buffers that aren't the last buffer
+            // for the last buffer, the completion is checked by the deflater.finished() method
+            while (!isLast && !deflater.needsInput() || isLast && !deflater.finished()) {
+                // when the previous deflater.deflate call returns 0 (and needsInput/finished returns false),
+                // it means that the output buffer is full.
+                // append the compressed buffer to the result buffer and allocate a new buffer.
+                if (written == 0) {
+                    if (compressBuffer.position() > 0) {
+                        appendCompressBufferToResultBuffer();
+                        allocateCompressBuffer();
+                    } else {
+                        // this is an unexpected case, throw an exception to prevent an infinite loop
+                        throw new IllegalStateException(
+                                "Deflater didn't write any bytes while the compress buffer is empty.");
+                    }
                 }
+                written = deflater.deflate(compressBuffer);
             }
             if (isLast) {
-                // write gzip footer, integer values are in little endian byte order
-                compressBuffer.order(ByteOrder.LITTLE_ENDIAN);
-                // write CRC32 checksum
-                compressBuffer.putInt((int) crc.getValue());
-                // write uncompressed size
-
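The loop introduced by this commit can be tried out in isolation with the JDK alone. The following is a minimal sketch under stated assumptions, not the broker code: `ChunkedGzipSketch`, `gzip`, `gunzip` and `flush` are hypothetical names, a `ByteArrayOutputStream` stands in for the Netty composite buffer, and a single reused heap `ByteBuffer` stands in for the pooled compress buffers. It keeps the two ideas from the patch: the loop ends on `needsInput()` for intermediate chunks and on `finished()` for the last chunk, and a `deflate` call that returns 0 is treated as "output buffer full".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.CRC32;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;

public class ChunkedGzipSketch {
    // Minimal 10-byte gzip header: magic bytes, method 8 (deflate), no flags, no mtime.
    private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 8, 0, 0, 0, 0, 0, 0, 0};

    public static byte[] gzip(byte[][] chunks, int outBufferSize) {
        if (chunks.length == 0) {
            throw new IllegalArgumentException("at least one chunk is required");
        }
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); // raw deflate
        CRC32 crc = new CRC32();
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        result.write(GZIP_HEADER, 0, GZIP_HEADER.length);
        ByteBuffer out = ByteBuffer.allocate(outBufferSize);
        long totalIn = 0;
        for (int i = 0; i < chunks.length; i++) {
            boolean isLast = i == chunks.length - 1;
            crc.update(chunks[i]);
            totalIn += chunks[i].length;
            deflater.setInput(chunks[i]);
            if (isLast) {
                deflater.finish();
            }
            int written = -1;
            // intermediate chunks are done when the deflater wants more input,
            // the last chunk is done when the deflater reports finished
            while ((!isLast && !deflater.needsInput()) || (isLast && !deflater.finished())) {
                if (written == 0) {
                    // previous call produced nothing although work remains: output is full
                    if (out.position() == 0) {
                        throw new IllegalStateException("deflater made no progress");
                    }
                    flush(out, result);
                }
                written = deflater.deflate(out);
            }
        }
        flush(out, result);
        deflater.end();
        // gzip footer: CRC32 and uncompressed size, both little endian
        ByteBuffer footer = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
        footer.putInt((int) crc.getValue()).putInt((int) totalIn);
        result.write(footer.array(), 0, 8);
        return result.toByteArray();
    }

    // Moves the filled part of the output buffer into the result and resets the buffer.
    private static void flush(ByteBuffer out, ByteArrayOutputStream result) {
        result.write(out.array(), 0, out.position());
        out.clear();
    }

    // Decompresses with the JDK's own gzip reader, used here only to verify the output.
    public static byte[] gunzip(byte[] gzipped) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped))) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[][] chunks = {"hello ".getBytes(), "metrics".getBytes()};
        System.out.println(new String(gunzip(gzip(chunks, 16))));
    }
}
```

A deliberately small output buffer (16 bytes in `main`) forces the "buffer full" branch to run, which is the path the original `written == 0 && !compressBuffer.hasRemaining()` check handled incorrectly according to the commit title.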
(pulsar) branch branch-2.11 updated: [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 0ef7bd2dcb3 [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
0ef7bd2dcb3 is described below

commit 0ef7bd2dcb31dd67bb07916457323592f4bd4a5b
Author: Lari Hotari
AuthorDate: Thu Apr 25 14:12:33 2024 +0300

    [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)

    (cherry picked from commit 997c8b95e1798cee08c56d92b77eb70056dfca8f)
---
 .../prometheus/PrometheusMetricsGenerator.java     | 74 +--
 .../prometheus/PrometheusMetricsGeneratorTest.java | 85 ++
 2 files changed, 138 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 146209d0035..fe1dfb32c4f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import io.prometheus.client.Collector;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
@@ -192,8 +193,8 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
             crc = new CRC32();
             this.bufferSize = Math.max(Math.min(resolveChunkSize(bufAllocator), readableBytes), 8192);
             this.bufAllocator = bufAllocator;
-            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 1);
-            allocateBuffer();
+            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 2);
+            allocateCompressBuffer();
         }
 
         /**
@@ -218,37 +219,66 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
                 // write gzip header
                 compressBuffer.put(GZIP_HEADER);
             }
+            // update the CRC32 checksum calculation
             nioBuffer.mark();
             crc.update(nioBuffer);
             nioBuffer.reset();
+            // pass the input buffer to the deflater
             deflater.setInput(nioBuffer);
+            // when the input buffer is the last one, set the flag to finish the deflater
             if (isLast) {
                 deflater.finish();
             }
-            while (!deflater.needsInput() && !deflater.finished()) {
-                int written = deflater.deflate(compressBuffer);
-                if (written == 0 && !compressBuffer.hasRemaining()) {
-                    backingCompressBuffer.setIndex(0, compressBuffer.position());
-                    resultBuffer.addComponent(true, backingCompressBuffer);
-                    allocateBuffer();
+            int written = -1;
+            // the deflater may need multiple calls to deflate the input buffer
+            // the completion is checked by the deflater.needsInput() method for buffers that aren't the last buffer
+            // for the last buffer, the completion is checked by the deflater.finished() method
+            while (!isLast && !deflater.needsInput() || isLast && !deflater.finished()) {
+                // when the previous deflater.deflate call returns 0 (and needsInput/finished returns false),
+                // it means that the output buffer is full.
+                // append the compressed buffer to the result buffer and allocate a new buffer.
+                if (written == 0) {
+                    if (compressBuffer.position() > 0) {
+                        appendCompressBufferToResultBuffer();
+                        allocateCompressBuffer();
+                    } else {
+                        // this is an unexpected case, throw an exception to prevent an infinite loop
+                        throw new IllegalStateException(
+                                "Deflater didn't write any bytes while the compress buffer is empty.");
+                    }
                 }
+                written = deflater.deflate(compressBuffer);
             }
             if (isLast) {
-                // write gzip footer, integer values are in little endian byte order
-                compressBuffer.order(ByteOrder.LITTLE_ENDIAN);
-                // write CRC32 checksum
-                compressBuffer.putInt((int) crc.getValue());
-                // write uncompressed size
-
(pulsar) branch branch-3.0 updated: [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d017030b232 [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
d017030b232 is described below

commit d017030b23203fddc80c7d5b0e14d7f39f842f42
Author: Lari Hotari
AuthorDate: Thu Apr 25 14:12:33 2024 +0300

    [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)

    (cherry picked from commit 997c8b95e1798cee08c56d92b77eb70056dfca8f)
---
 .../prometheus/PrometheusMetricsGenerator.java     | 74 +--
 .../prometheus/PrometheusMetricsGeneratorTest.java | 85 ++
 2 files changed, 138 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index bfcbb5ec89d..9dfa7673fe8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import io.prometheus.client.Collector;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
@@ -192,8 +193,8 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
             crc = new CRC32();
             this.bufferSize = Math.max(Math.min(resolveChunkSize(bufAllocator), readableBytes), 8192);
             this.bufAllocator = bufAllocator;
-            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 1);
-            allocateBuffer();
+            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 2);
+            allocateCompressBuffer();
         }
 
         /**
@@ -218,37 +219,66 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
                 // write gzip header
                 compressBuffer.put(GZIP_HEADER);
             }
+            // update the CRC32 checksum calculation
             nioBuffer.mark();
             crc.update(nioBuffer);
             nioBuffer.reset();
+            // pass the input buffer to the deflater
             deflater.setInput(nioBuffer);
+            // when the input buffer is the last one, set the flag to finish the deflater
             if (isLast) {
                 deflater.finish();
             }
-            while (!deflater.needsInput() && !deflater.finished()) {
-                int written = deflater.deflate(compressBuffer);
-                if (written == 0 && !compressBuffer.hasRemaining()) {
-                    backingCompressBuffer.setIndex(0, compressBuffer.position());
-                    resultBuffer.addComponent(true, backingCompressBuffer);
-                    allocateBuffer();
+            int written = -1;
+            // the deflater may need multiple calls to deflate the input buffer
+            // the completion is checked by the deflater.needsInput() method for buffers that aren't the last buffer
+            // for the last buffer, the completion is checked by the deflater.finished() method
+            while (!isLast && !deflater.needsInput() || isLast && !deflater.finished()) {
+                // when the previous deflater.deflate call returns 0 (and needsInput/finished returns false),
+                // it means that the output buffer is full.
+                // append the compressed buffer to the result buffer and allocate a new buffer.
+                if (written == 0) {
+                    if (compressBuffer.position() > 0) {
+                        appendCompressBufferToResultBuffer();
+                        allocateCompressBuffer();
+                    } else {
+                        // this is an unexpected case, throw an exception to prevent an infinite loop
+                        throw new IllegalStateException(
+                                "Deflater didn't write any bytes while the compress buffer is empty.");
+                    }
                 }
+                written = deflater.deflate(compressBuffer);
             }
             if (isLast) {
-                // write gzip footer, integer values are in little endian byte order
-                compressBuffer.order(ByteOrder.LITTLE_ENDIAN);
-                // write CRC32 checksum
-                compressBuffer.putInt((int) crc.getValue());
-                // write uncompressed size
-
(pulsar) branch branch-3.1 updated: [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new fe05e089c97 [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
fe05e089c97 is described below

commit fe05e089c97fb798b478b3ea0ad3877768c4fcee
Author: Lari Hotari
AuthorDate: Thu Apr 25 14:12:33 2024 +0300

    [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)

    (cherry picked from commit 997c8b95e1798cee08c56d92b77eb70056dfca8f)
---
 .../prometheus/PrometheusMetricsGenerator.java     | 74 +--
 .../prometheus/PrometheusMetricsGeneratorTest.java | 85 ++
 2 files changed, 138 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index bfcbb5ec89d..9dfa7673fe8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import io.prometheus.client.Collector;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
@@ -192,8 +193,8 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
             crc = new CRC32();
             this.bufferSize = Math.max(Math.min(resolveChunkSize(bufAllocator), readableBytes), 8192);
             this.bufAllocator = bufAllocator;
-            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 1);
-            allocateBuffer();
+            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 2);
+            allocateCompressBuffer();
         }
 
         /**
@@ -218,37 +219,66 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
                 // write gzip header
                 compressBuffer.put(GZIP_HEADER);
             }
+            // update the CRC32 checksum calculation
             nioBuffer.mark();
             crc.update(nioBuffer);
             nioBuffer.reset();
+            // pass the input buffer to the deflater
             deflater.setInput(nioBuffer);
+            // when the input buffer is the last one, set the flag to finish the deflater
             if (isLast) {
                 deflater.finish();
             }
-            while (!deflater.needsInput() && !deflater.finished()) {
-                int written = deflater.deflate(compressBuffer);
-                if (written == 0 && !compressBuffer.hasRemaining()) {
-                    backingCompressBuffer.setIndex(0, compressBuffer.position());
-                    resultBuffer.addComponent(true, backingCompressBuffer);
-                    allocateBuffer();
+            int written = -1;
+            // the deflater may need multiple calls to deflate the input buffer
+            // the completion is checked by the deflater.needsInput() method for buffers that aren't the last buffer
+            // for the last buffer, the completion is checked by the deflater.finished() method
+            while (!isLast && !deflater.needsInput() || isLast && !deflater.finished()) {
+                // when the previous deflater.deflate call returns 0 (and needsInput/finished returns false),
+                // it means that the output buffer is full.
+                // append the compressed buffer to the result buffer and allocate a new buffer.
+                if (written == 0) {
+                    if (compressBuffer.position() > 0) {
+                        appendCompressBufferToResultBuffer();
+                        allocateCompressBuffer();
+                    } else {
+                        // this is an unexpected case, throw an exception to prevent an infinite loop
+                        throw new IllegalStateException(
+                                "Deflater didn't write any bytes while the compress buffer is empty.");
+                    }
                 }
+                written = deflater.deflate(compressBuffer);
             }
             if (isLast) {
-                // write gzip footer, integer values are in little endian byte order
-                compressBuffer.order(ByteOrder.LITTLE_ENDIAN);
-                // write CRC32 checksum
-                compressBuffer.putInt((int) crc.getValue());
-                // write uncompressed size
-
[PR] [improve][meta] Fix invalid use of drain API and race condition in closing metadata store [pulsar]
lhotari opened a new pull request, #22585: URL: https://github.com/apache/pulsar/pull/22585

### Motivation

There are currently some memory leaks in tests. While investigating the issue, I found a large number of uncompleted CompletableFutures in the heap dump. Currently the metadata store doesn't complete all pending operations when it is closed. There are multiple problems:

- MpscUnboundedArrayQueue.drain will limit the number of entries to 4096 at a time
- MpscUnboundedArrayQueue.drain uses relaxedPoll which is eventually consistent
- There might be batches in flight, which need to be terminated

### Modifications

- replace drain with a while loop in close
- replace drain with a for loop in batch flushing
- add isClosed checks

### Additional Context

CompletableFutures in the heap dump: https://github.com/apache/pulsar/assets/66864/c373b65e-c639-4b1d-8ea5-d16a251552b2

The instances are related to `org.apache.pulsar.broker.resources.NamespaceResources$PartitionedTopicResources$$Lambda$1819+0x7f08a8b65ee8`: https://github.com/apache/pulsar/assets/66864/6b289fbd-17e2-4df9-a418-f78770532777

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
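The "replace drain with a while loop in close" and "add isClosed checks" items from this PR description can be sketched roughly as follows. This is an assumption-laden illustration, not the metadata store code: `CloseDrainSketch`, `enqueue` and `close` are hypothetical names, and a JDK `ConcurrentLinkedQueue` stands in for the JCTools `MpscUnboundedArrayQueue` mentioned in the description.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseDrainSketch {
    // Stand-in for the store's queue of pending operations (assumption: JDK queue, not JCTools).
    private final Queue<CompletableFuture<Void>> pending = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean closed = new AtomicBoolean();

    // Registers a pending operation; fails it immediately when the store is already closed.
    public CompletableFuture<Void> enqueue() {
        CompletableFuture<Void> op = new CompletableFuture<>();
        if (closed.get()) {
            op.completeExceptionally(new IllegalStateException("store is closed"));
            return op;
        }
        pending.add(op);
        // close() may have finished its loop between the check and the add,
        // so re-check and fail the operation ourselves if it is still queued
        if (closed.get() && pending.remove(op)) {
            op.completeExceptionally(new IllegalStateException("store is closed"));
        }
        return op;
    }

    // Fails every pending operation; polls until the queue is empty instead of
    // relying on a single bounded drain call. Returns the number of failed operations.
    public int close() {
        closed.set(true);
        int failed = 0;
        CompletableFuture<Void> op;
        while ((op = pending.poll()) != null) {
            op.completeExceptionally(new IllegalStateException("store is closed"));
            failed++;
        }
        return failed;
    }

    public static void main(String[] args) {
        CloseDrainSketch store = new CloseDrainSketch();
        for (int i = 0; i < 5000; i++) {
            store.enqueue();
        }
        System.out.println(store.close()); // 5000
    }
}
```

The count of 5000 is chosen to exceed the 4096-entry limit that the description attributes to a single `drain` call, so a loop that stops after one bounded pass would leave futures uncompleted.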
(pulsar) branch master updated: [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 997c8b95e17 [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
997c8b95e17 is described below

commit 997c8b95e1798cee08c56d92b77eb70056dfca8f
Author: Lari Hotari
AuthorDate: Thu Apr 25 14:12:33 2024 +0300

    [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression (#22576)
---
 .../prometheus/PrometheusMetricsGenerator.java     | 74 +--
 .../prometheus/PrometheusMetricsGeneratorTest.java | 85 ++
 2 files changed, 138 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 8cd68caf1ee..6b4d08c359d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import io.prometheus.client.Collector;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
@@ -191,8 +192,8 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
             crc = new CRC32();
             this.bufferSize = Math.max(Math.min(resolveChunkSize(bufAllocator), readableBytes), 8192);
             this.bufAllocator = bufAllocator;
-            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 1);
-            allocateBuffer();
+            this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 2);
+            allocateCompressBuffer();
         }

         /**
@@ -217,37 +218,66 @@ public class PrometheusMetricsGenerator implements AutoCloseable {
                 // write gzip header
                 compressBuffer.put(GZIP_HEADER);
             }
+            // update the CRC32 checksum calculation
             nioBuffer.mark();
             crc.update(nioBuffer);
             nioBuffer.reset();
+            // pass the input buffer to the deflater
             deflater.setInput(nioBuffer);
+            // when the input buffer is the last one, set the flag to finish the deflater
             if (isLast) {
                 deflater.finish();
             }
-            while (!deflater.needsInput() && !deflater.finished()) {
-                int written = deflater.deflate(compressBuffer);
-                if (written == 0 && !compressBuffer.hasRemaining()) {
-                    backingCompressBuffer.setIndex(0, compressBuffer.position());
-                    resultBuffer.addComponent(true, backingCompressBuffer);
-                    allocateBuffer();
+            int written = -1;
+            // the deflater may need multiple calls to deflate the input buffer
+            // the completion is checked by the deflater.needsInput() method for buffers that aren't the last buffer
+            // for the last buffer, the completion is checked by the deflater.finished() method
+            while (!isLast && !deflater.needsInput() || isLast && !deflater.finished()) {
+                // when the previous deflater.deflate call returns 0 (and needsInput/finished returns false),
+                // it means that the output buffer is full.
+                // append the compressed buffer to the result buffer and allocate a new buffer.
+                if (written == 0) {
+                    if (compressBuffer.position() > 0) {
+                        appendCompressBufferToResultBuffer();
+                        allocateCompressBuffer();
+                    } else {
+                        // this is an unexpected case, throw an exception to prevent an infinite loop
+                        throw new IllegalStateException(
+                                "Deflater didn't write any bytes while the compress buffer is empty.");
+                    }
                 }
+                written = deflater.deflate(compressBuffer);
             }
             if (isLast) {
-                // write gzip footer, integer values are in little endian byte order
-                compressBuffer.order(ByteOrder.LITTLE_ENDIAN);
-                // write CRC32 checksum
-                compressBuffer.putInt((int) crc.getValue());
-                // write uncompressed size
-                compressBuffer.putInt(deflater.getTotalIn());
-                // append the last compressed buffer
-
Re: [PR] [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression [pulsar]
lhotari merged PR #22576:
URL: https://github.com/apache/pulsar/pull/22576
Re: [I] [Bug] /metrics gzip compression sporadically fails with error 500 caused by java.nio.BufferOverflowException [pulsar]
lhotari closed issue #22575: [Bug] /metrics gzip compression sporadically fails with error 500 caused by java.nio.BufferOverflowException
URL: https://github.com/apache/pulsar/issues/22575
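For readers following the gzip fix, here is a minimal, self-contained sketch of the general technique: deflating into a fixed-size output buffer, flushing it whenever the deflater reports that nothing more could be written, and writing the gzip footer separately so that it can never overflow a nearly full chunk. It is not the Pulsar implementation; `GzipChunkSketch`, `gzip`, `gunzip`, and `flush` are hypothetical names, and a `ByteArrayOutputStream` replaces the Netty composite buffer. It assumes Java 11 or later for `Deflater.deflate(ByteBuffer)`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.CRC32;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;

final class GzipChunkSketch {
    // 10-byte gzip header: magic bytes, deflate method, no flags, no mtime, no extra flags, OS 0
    private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};

    // Compresses the input through a small fixed-size chunk buffer.
    static byte[] gzip(byte[] input, int chunkSize) {
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        ByteBuffer chunk = ByteBuffer.allocate(chunkSize);
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); // raw deflate, no zlib wrapper
        try {
            result.write(GZIP_HEADER, 0, GZIP_HEADER.length);
            deflater.setInput(input);
            deflater.finish();
            while (!deflater.finished()) {
                int written = deflater.deflate(chunk);
                if (written == 0) {
                    if (chunk.position() == 0) {
                        // nothing written into an empty buffer: fail instead of looping forever
                        throw new IllegalStateException("Deflater made no progress");
                    }
                    flush(chunk, result); // the chunk is full, hand it over and reuse it
                }
            }
            flush(chunk, result);
            // the footer goes into its own 8-byte buffer: CRC32, then input size, little endian
            CRC32 crc = new CRC32();
            crc.update(input);
            ByteBuffer footer = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
            footer.putInt((int) crc.getValue());
            footer.putInt(input.length);
            result.write(footer.array(), 0, 8);
        } finally {
            deflater.end();
        }
        return result.toByteArray();
    }

    // Decompresses with the JDK's own gzip reader, used here to check the output.
    static byte[] gunzip(byte[] gz) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gz))) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void flush(ByteBuffer chunk, ByteArrayOutputStream out) {
        out.write(chunk.array(), 0, chunk.position());
        chunk.clear();
    }
}
```

Round-tripping the output through `gunzip` with a deliberately tiny `chunkSize` exercises the buffer-full path many times over.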
[PR] [fix][broker] Fix geo-replication admin client url [pulsar]
Demogorgon314 opened a new pull request, #22584:
URL: https://github.com/apache/pulsar/pull/22584

### Motivation

When only the TLS URLs are set in `ClusterData` and `brokerClientTlsEnabled` is set to `true`, creating the admin client for geo-replication fails because the logic in the else-if block (`StringUtils.isEmpty(data.getServiceUrl())`) is wrong.

### Modifications

* Allow the cluster data to set only the TLS URL

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
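One way to picture the intended behaviour is the sketch below. It is an assumption about the shape of the check, not the code from the PR: `AdminUrlSketch` and `resolveAdminUrl` are hypothetical names, and the policy shown (consult only the TLS URL when TLS is enabled, only the plain URL otherwise) is a guess at what "allow the cluster data to set only the TLS URL" implies.

```java
// Hypothetical helper; not part of the Pulsar code base.
final class AdminUrlSketch {
    // Picks the admin URL that matches the TLS setting and validates only that one.
    static String resolveAdminUrl(boolean tlsEnabled, String serviceUrl, String serviceUrlTls) {
        String candidate = tlsEnabled ? serviceUrlTls : serviceUrl;
        if (candidate == null || candidate.isEmpty()) {
            throw new IllegalArgumentException(
                    "No " + (tlsEnabled ? "TLS " : "") + "admin URL configured for the cluster");
        }
        return candidate;
    }
}
```

Under this policy a cluster that defines only a TLS URL is accepted when TLS is enabled, which is the case the PR description says used to fail.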
Re: [PR] [fix][broker] Fix BufferOverflowException and EOFException bugs in /metrics gzip compression [pulsar]
codecov-commenter commented on PR #22576:
URL: https://github.com/apache/pulsar/pull/22576#issuecomment-2076880246

## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22576?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report

Attention: Patch coverage is `79.16667%` with `5 lines` in your changes are missing coverage. Please review.

> Project coverage is 73.95%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) to head [(`4bd5888`)](https://app.codecov.io/gh/apache/pulsar/pull/22576?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
> Report is 188 commits behind head on master.

Additional details and impacted files

[![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22576/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22576?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)

```diff
@@             Coverage Diff              @@
##             master   #22576      +/-   ##
+ Coverage     73.57%   73.95%   +0.37%
+ Complexity    32624    32595      -29
  Files          1877     1885       +8
  Lines        139502   140520    +1018
  Branches      15299    15452     +153
+ Hits         102638   103919    +1281
+ Misses        28908    28565     -343
- Partials       7956     8036      +80
```

| [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22576/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
|---|---|---|
| [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22576/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `26.71% <0.00%> (+2.13%)` | :arrow_up: |
| [systests](https://app.codecov.io/gh/apache/pulsar/pull/22576/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.46% <0.00%> (+0.14%)` | :arrow_up: |
| [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22576/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `73.22% <79.16%> (+0.37%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Files](https://app.codecov.io/gh/apache/pulsar/pull/22576?dropdown=coverage=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
|---|---|---|
| [...r/stats/prometheus/PrometheusMetricsGenerator.java](https://app.codecov.io/gh/apache/pulsar/pull/22576?src=pr=tree=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fstats%2Fprometheus%2FPrometheusMetricsGenerator.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zdGF0cy9wcm9tZXRoZXVzL1Byb21ldGhldXNNZXRyaWNzR2VuZXJhdG9yLmphdmE=) | `78.43% <79.16%> (-7.08%)` | :arrow_down: |

... and [255 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22576/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
[PR] [fix][test] Clear fields in test cleanup to reduce memory consumption [pulsar]
lhotari opened a new pull request, #22583:
URL: https://github.com/apache/pulsar/pull/22583

### Motivation

- TestNG keeps a reference to the test instance, and this causes an OOME in some cases (image): https://github.com/apache/pulsar/assets/66864/255bc5dd-65af-484d-9286-03d11c67523b

### Modifications

- Clear instances in the test classes that showed up as dominators in a heap dump

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
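The idea of clearing fields in test cleanup can be shown with a small sketch. To keep it runnable with the JDK alone, it uses plain methods rather than TestNG annotations; `CleanupSketch` and its members are hypothetical, and in a real TestNG class the `cleanup` method would typically carry an annotation such as `@AfterClass(alwaysRun = true)`.

```java
// Hypothetical test class; plain methods are used so the sketch needs no test framework.
final class CleanupSketch {
    // A large fixture that would stay reachable for as long as the framework keeps this instance.
    private byte[] largeFixture;

    void setup() {
        largeFixture = new byte[1024 * 1024];
    }

    // Clearing the field lets the fixture be garbage collected even though
    // the test instance itself is still referenced by the framework.
    void cleanup() {
        largeFixture = null;
    }

    boolean holdsFixture() {
        return largeFixture != null;
    }
}
```

The instance stays referenced either way; only the memory held through its fields is released.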
Re: [PR] [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]
nodece commented on code in PR #22577:
URL: https://github.com/apache/pulsar/pull/22577#discussion_r1579136424

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -652,4 +652,63 @@ public void testUnFenceTopicToReuse() throws Exception {
             admin2.topics().delete(topicName);
         });
     }
+
+    @Test
+    public void testNamespaceLevelReplicationRemoteConflictTopicExist() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
+        // Verify: will get a not found error when calling "getPartitionedTopicMetadata" on a topic not exists.
+        try {
+            admin1.topics().getPartitionedTopicMetadata(topicName);
+            fail("Expected a not found error");
+        } catch (Exception ex) {
+            Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
+            assertTrue(unWrapEx.getMessage().contains("not found"));
+        }
+        // Verify: will get a conflict error when there is a topic with different partitions on the remote side.
+        admin2.topics().createPartitionedTopic(topicName, 1);
+        try {
+            admin1.topics().createPartitionedTopic(topicName, 2);

Review Comment:
> > When you send the message to topic-1 from the local cluster, the replicator will be created, partition 1 and 2 topics will become non-partition topics on the remote cluster.
>
> No, the creation of partitions `1` and `2` will fail, and the replicator will get an error "Illegal topic partition name xxx with max allowed 1 partitions"

OK, this check appears to come from https://github.com/apache/pulsar/pull/19086. Thanks.
Re: [PR] [improve][misc] Upgrade slf4j to 2.0.13 [pulsar]
nodece merged PR #22391:
URL: https://github.com/apache/pulsar/pull/22391
(pulsar) branch master updated: [improve][misc] Upgrade slf4j to 2.0.13 (#22391)
This is an automated email from the ASF dual-hosted git repository. zixuan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 0c097ef2c6c [improve][misc] Upgrade slf4j to 2.0.13 (#22391) 0c097ef2c6c is described below commit 0c097ef2c6c85efbb91d388ffe839ec542e82278 Author: Zixuan Liu AuthorDate: Thu Apr 25 16:55:45 2024 +0800 [improve][misc] Upgrade slf4j to 2.0.13 (#22391) Signed-off-by: Zixuan Liu --- buildtools/pom.xml | 10 +-- distribution/server/pom.xml| 2 +- distribution/server/src/assemble/LICENSE.bin.txt | 6 ++-- distribution/shell/pom.xml | 2 +- distribution/shell/src/assemble/LICENSE.bin.txt| 4 +-- pom.xml| 22 ++- pulsar-client-all/pom.xml | 2 +- pulsar-functions/instance/pom.xml | 2 +- pulsar-functions/localrun/pom.xml | 2 +- pulsar-functions/runtime-all/pom.xml | 2 +- pulsar-io/alluxio/pom.xml | 10 +++ pulsar-io/hdfs2/pom.xml| 2 +- pulsar-io/hdfs3/pom.xml| 2 +- pulsar-io/rabbitmq/pom.xml | 4 +-- .../pulsar/io/rabbitmq/RabbitMQBrokerManager.java | 33 -- pulsar-io/solr/pom.xml | 4 +++ structured-event-log/pom.xml | 2 +- tiered-storage/file-system/pom.xml | 6 +++- 18 files changed, 82 insertions(+), 35 deletions(-) diff --git a/buildtools/pom.xml b/buildtools/pom.xml index cd4d02af3d7..58f99e9ea86 100644 --- a/buildtools/pom.xml +++ b/buildtools/pom.xml @@ -41,7 +41,7 @@ 1.8 3.1.0 2.23.1 -1.7.32 +2.0.13 7.7.1 3.11 4.1 @@ -100,6 +100,12 @@ org.testng testng ${testng.version} + + + org.slf4j + * + + org.apache.logging.log4j @@ -111,7 +117,7 @@ org.apache.logging.log4j - log4j-slf4j-impl + log4j-slf4j2-impl org.slf4j diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml index 599a9755f91..1c9ea686853 100644 --- a/distribution/server/pom.xml +++ b/distribution/server/pom.xml @@ -180,7 +180,7 @@ org.apache.logging.log4j - log4j-slf4j-impl + log4j-slf4j2-impl diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt index c5642503b25..c5c243796b6 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -350,7 +350,7 @@ The Apache Software License, Version 2.0 * Log4J - org.apache.logging.log4j-log4j-api-2.23.1.jar - org.apache.logging.log4j-log4j-core-2.23.1.jar -- org.apache.logging.log4j-log4j-slf4j-impl-2.23.1.jar +- org.apache.logging.log4j-log4j-slf4j2-impl-2.23.1.jar - org.apache.logging.log4j-log4j-web-2.23.1.jar * Java Native Access JNA - net.java.dev.jna-jna-jpms-5.12.1.jar @@ -563,8 +563,8 @@ BSD 2-Clause License MIT License * Java SemVer -- com.github.zafarkhaja-java-semver-0.9.0.jar -- ../licenses/LICENSE-SemVer.txt * SLF4J -- ../licenses/LICENSE-SLF4J.txt -- org.slf4j-slf4j-api-1.7.32.jar -- org.slf4j-jcl-over-slf4j-1.7.32.jar +- org.slf4j-slf4j-api-2.0.13.jar +- org.slf4j-jcl-over-slf4j-2.0.13.jar * The Checker Framework - org.checkerframework-checker-qual-3.33.0.jar * oshi diff --git a/distribution/shell/pom.xml b/distribution/shell/pom.xml index 5f4fc549ccc..144f7b1ff6d 100644 --- a/distribution/shell/pom.xml +++ b/distribution/shell/pom.xml @@ -51,7 +51,7 @@ org.apache.logging.log4j - log4j-slf4j-impl + log4j-slf4j2-impl diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index f76631dbbf2..41b38f17dce 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -386,7 +386,7 @@ The Apache Software License, Version 2.0 * Log4J - log4j-api-2.23.1.jar - log4j-core-2.23.1.jar -- log4j-slf4j-impl-2.23.1.jar +- log4j-slf4j2-impl-2.23.1.jar - log4j-web-2.23.1.jar * OpenTelemetry - opentelemetry-api-1.34.1.jar @@ -424,7 +424,7 @@ BSD 3-clause "New" or "Revised" License MIT License * SLF4J -- ../licenses/LICENSE-SLF4J.txt -- slf4j-api-1.7.32.jar +- slf4j-api-2.0.13.jar * The Checker Framework - checker-qual-3.33.0.jar diff --git a/pom.xml 
b/pom.xml index 90b6c8cb8ed..585347fb1f8 100644 --- a/pom.xml +++ b/pom.xml @@ -153,7 +153,7 @@ flexible messaging model and an intuitive client API. 0.16.0
Re: [PR] [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]
poorbarcode commented on code in PR #22577:
URL: https://github.com/apache/pulsar/pull/22577#discussion_r1579099569

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -652,4 +652,63 @@ public void testUnFenceTopicToReuse() throws Exception {
             admin2.topics().delete(topicName);
         });
     }
+
+    @Test
+    public void testNamespaceLevelReplicationRemoteConflictTopicExist() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
+        // Verify: will get a not found error when calling "getPartitionedTopicMetadata" on a topic not exists.
+        try {
+            admin1.topics().getPartitionedTopicMetadata(topicName);
+            fail("Expected a not found error");
+        } catch (Exception ex) {
+            Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
+            assertTrue(unWrapEx.getMessage().contains("not found"));
+        }
+        // Verify: will get a conflict error when there is a topic with different partitions on the remote side.
+        admin2.topics().createPartitionedTopic(topicName, 1);
+        try {
+            admin1.topics().createPartitionedTopic(topicName, 2);

Review Comment:
> When you send the message to topic-1 from the local cluster, the replicator will be created, partition 1 and 2 topics will become non-partition topics on the remote cluster.

No, the creation of partitions `1` and `2` will fail, and the replicator will get an error "Illegal topic partition name xxx with max allowed 1 partitions"
Re: [PR] [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]
nodece commented on code in PR #22577:
URL: https://github.com/apache/pulsar/pull/22577#discussion_r1579088423

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -652,4 +652,63 @@ public void testUnFenceTopicToReuse() throws Exception {
             admin2.topics().delete(topicName);
         });
     }
+
+    @Test
+    public void testNamespaceLevelReplicationRemoteConflictTopicExist() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
+        // Verify: will get a not found error when calling "getPartitionedTopicMetadata" on a topic not exists.
+        try {
+            admin1.topics().getPartitionedTopicMetadata(topicName);
+            fail("Expected a not found error");
+        } catch (Exception ex) {
+            Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
+            assertTrue(unWrapEx.getMessage().contains("not found"));
+        }
+        // Verify: will get a conflict error when there is a topic with different partitions on the remote side.
+        admin2.topics().createPartitionedTopic(topicName, 1);
+        try {
+            admin1.topics().createPartitionedTopic(topicName, 2);

Review Comment:
I want to explain the replicator behavior.

Geo-replication is enabled on the namespace, and `topic-1` has 1 partition on the remote cluster. If you create `topic-1` with 3 partitions on the local cluster, the creation succeeds.

When you send a message to `topic-1` from the local cluster, the replicator is created, and the partition 1 and 2 topics become non-partitioned topics on the remote cluster.

| local cluster | remote cluster |
| -- | -- |
| topic-1-partition-0 (partitioned) | topic-1-partition-0 (partitioned) |
| topic-1-partition-1 (partitioned) | topic-1-partition-1 (non-partitioned) |
| topic-1-partition-2 (partitioned) | topic-1-partition-2 (non-partitioned) |
Re: [D] Pulsar in Active , Active small scale Setup [pulsar]
GitHub user lhotari edited a comment on the discussion: Pulsar in Active , Active small scale Setup Assuming that this is a local redundancy setup and not about geographical redundancy where Pulsar's geo-replication would be relevant. Pulsar standalone shouldn't be considered a unit of deployment. As mentioned before, Pulsar standalone is not designed to be used for production use, but if it works for you, we won't stop you. Instead of thinking of redundancy with 2 Pulsar Standalone instances and having replication between them, a better approach would be to run a minimal setup where there's redundancy using the standard features of Bookkeeper and Zookeeper. For bookies, it's fine to have 2 instances, but for Zookeeper you would need 3 instances to reach quorum. GitHub link: https://github.com/apache/pulsar/discussions/22581#discussioncomment-9222309 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] Pulsar in Active , Active small scale Setup [pulsar]
GitHub user lhotari edited a comment on the discussion: Pulsar in Active , Active small scale Setup Assuming that this is a local redundancy setup and not about geographical redundancy where Pulsar's geo-replication would be relevant. Pulsar standalone shouldn't be considered a unit of deployment. As mentioned before, Pulsar standalone is not designed to be used for production use, but if it works for you, we won't stop you. Instead of thinking of redundancy with 2 Pulsar Standalone instances and having replication between them, a better approach would be to run a minimal setup where there's redundancy using the standard features of Bookkeeper and Zookeeper. For bookies, it's fine to have 2 instances for redundancy, but for Zookeeper you would need 3 instances to reach quorum in a redundant setup. GitHub link: https://github.com/apache/pulsar/discussions/22581#discussioncomment-9222309 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] Pulsar in Active , Active small scale Setup [pulsar]
GitHub user lhotari added a comment to the discussion: Pulsar in Active , Active small scale Setup Assuming that this is a local redundancy setup and not about geographical redundancy where Pulsar's geo-replication would be relevant. Pulsar standalone shouldn't be considered a unit of deployment. As mentioned before, Pulsar standalone is not designed to be used for production use, but if it works for you, we won't stop you. Instead of thinking of redundancy with 2 Pulsar Standalone instances and having replication between them, a better approach would be to run a minimal setup where there's redundancy using the standard features of Bookkeeper and Zookeeper. For bookies, it's fine to have 2 instances, but for Zookeeper you would need 3 instances at minimum to reach quorum. GitHub link: https://github.com/apache/pulsar/discussions/22581#discussioncomment-9222309 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
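The ZooKeeper sizing advice above follows from majority-quorum arithmetic, which this small sketch spells out. `QuorumSketch` and its methods are hypothetical names; the formula itself (a majority is more than half of the ensemble) is the standard one for majority quorums.

```java
// Hypothetical helper that spells out majority-quorum arithmetic.
final class QuorumSketch {
    // Smallest number of servers that forms a majority of the ensemble.
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // Number of servers that can fail while a majority is still available.
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - majority(ensembleSize);
    }
}
```

With 2 servers the majority is 2, so no failure is tolerated; with 3 servers the majority is still 2, so one server can be lost. That is why 3 is the smallest ensemble that adds redundancy.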
Re: [PR] [improve][ml] Optimize read entries with skipCondition [pulsar]
dao-jun closed pull request #22560: [improve][ml] Optimize read entries with skipCondition
URL: https://github.com/apache/pulsar/pull/22560
Re: [PR] [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]
poorbarcode commented on code in PR #22577:
URL: https://github.com/apache/pulsar/pull/22577#discussion_r1579059969

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -652,4 +652,63 @@ public void testUnFenceTopicToReuse() throws Exception {
             admin2.topics().delete(topicName);
         });
     }
+
+    @Test
+    public void testNamespaceLevelReplicationRemoteConflictTopicExist() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
+        // Verify: will get a not found error when calling "getPartitionedTopicMetadata" on a topic not exists.
+        try {
+            admin1.topics().getPartitionedTopicMetadata(topicName);
+            fail("Expected a not found error");
+        } catch (Exception ex) {
+            Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
+            assertTrue(unWrapEx.getMessage().contains("not found"));
+        }
+        // Verify: will get a conflict error when there is a topic with different partitions on the remote side.
+        admin2.topics().createPartitionedTopic(topicName, 1);
+        try {
+            admin1.topics().createPartitionedTopic(topicName, 2);

Review Comment:
@nodece

> If the remote cluster's topic equals the topic that will be created, how to handle it?

The broker treats the creation as successful when the existing remote topic has the same partition count as the topic being created. See https://github.com/apache/pulsar/pull/22577/files#diff-6902810fd24ff29ec34052bcc6a0de36d76107649148d945b80ec2be888e9042R683-R686

```java
if (topicMeta.partitions == numPartitions) {
    log.info("[{}] Skip created partitioned topic {} in cluster {}, because that {}",
            clientAppId(), topicName, cluster, unwrapEx2.getMessage());
    createRemoteTopicFuture.complete(null);
}
```

> How to roll back a remote cluster's topic if the remote cluster's topic created the cluster successfully, but the local cluster's creation failed.

The checks for creating the topic on the local cluster have already passed before the topic is created on the remote cluster, so the local creation is unlikely to fail. If creation on the local cluster does fail, users should handle the error manually, for example by retrying.

>> According to your description, when using the default broker configuration, and the geo-replication is enabled on the namespace level, the remote cluster will create two non-partitioned topics, tenant/namespace/topic-partition-1 and tenant/namespace/topic-partition-2 by the geo producer. Is it right?

> Sorry, I didn't consider your PR.
> If there isn't this PR, the local cluster will do that.

I don't fully understand this comment. Could you explain it in more detail?
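The decision described in this review thread can be summarised in a short sketch. It is an illustration of the reasoning only, not the broker code: `RemoteTopicCheckSketch`, `Outcome`, and `decide` are hypothetical names, and an `OptionalInt` models "the remote cluster has no such partitioned topic".

```java
import java.util.OptionalInt;

// Hypothetical model of the check discussed above; not the broker implementation.
final class RemoteTopicCheckSketch {
    enum Outcome { CREATE_ON_REMOTE, TREAT_AS_CREATED, REJECT_CONFLICT }

    // remotePartitions is empty when the remote cluster has no partitioned topic with this name.
    static Outcome decide(OptionalInt remotePartitions, int requestedPartitions) {
        if (!remotePartitions.isPresent()) {
            return Outcome.CREATE_ON_REMOTE;
        }
        // Same partition count: nothing to do on the remote side, so the create is treated as done.
        return remotePartitions.getAsInt() == requestedPartitions
                ? Outcome.TREAT_AS_CREATED
                : Outcome.REJECT_CONFLICT;
    }
}
```

In the test from the diff, the remote topic has 1 partition and the local request asks for 2, which corresponds to the `REJECT_CONFLICT` branch here.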
Re: [PR] [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]
poorbarcode commented on code in PR #22577:
URL: https://github.com/apache/pulsar/pull/22577#discussion_r1579059969

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -652,4 +652,63 @@ public void testUnFenceTopicToReuse() throws Exception {
             admin2.topics().delete(topicName);
         });
     }
+
+    @Test
+    public void testNamespaceLevelReplicationRemoteConflictTopicExist() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
+        // Verify: will get a not found error when calling "getPartitionedTopicMetadata" on a topic not exists.
+        try {
+            admin1.topics().getPartitionedTopicMetadata(topicName);
+            fail("Expected a not found error");
+        } catch (Exception ex) {
+            Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
+            assertTrue(unWrapEx.getMessage().contains("not found"));
+        }
+        // Verify: will get a conflict error when there is a topic with different partitions on the remote side.
+        admin2.topics().createPartitionedTopic(topicName, 1);
+        try {
+            admin1.topics().createPartitionedTopic(topicName, 2);

Review Comment:
> If the remote cluster's topic equals the topic that will be created, how to handle it?

The broker treats the creation as successful when the existing remote topic has the same partition count as the topic being created. See https://github.com/apache/pulsar/pull/22577/files#diff-6902810fd24ff29ec34052bcc6a0de36d76107649148d945b80ec2be888e9042R683-R686

```java
if (topicMeta.partitions == numPartitions) {
    log.info("[{}] Skip created partitioned topic {} in cluster {}, because that {}",
            clientAppId(), topicName, cluster, unwrapEx2.getMessage());
    createRemoteTopicFuture.complete(null);
}
```

> How to roll back a remote cluster's topic if the remote cluster's topic created the cluster successfully, but the local cluster's creation failed.

The checks for creating the topic on the local cluster have already passed before the topic is created on the remote cluster, so the local creation is unlikely to fail. If creation on the local cluster does fail, users should handle the error manually, for example by retrying.