[GitHub] [pulsar] mattisonchao opened a new pull request, #19761: [fix][meta] Fix close borrowed executor
mattisonchao opened a new pull request, #19761:
URL: https://github.com/apache/pulsar/pull/19761

### Motivation

The executor is owned by `CoordinationServiceImpl`, but it is currently closed by the borrower, `LeaderElectionImpl`.

### Modifications

- Don't close the borrowed executor in `LeaderElectionImpl`.

### Verifying this change

- [x] Make sure that the change passes the CI checks.

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
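The ownership rule behind this change can be illustrated with a minimal sketch. The classes `ExecutorOwner` and `ExecutorBorrower` below are hypothetical stand-ins, not the actual `CoordinationServiceImpl` or `LeaderElectionImpl` code; the sketch only assumes that one component creates the executor and hands it to another.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical owner: it creates the executor, so it alone shuts it down.
class ExecutorOwner implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    ExecutorBorrower newBorrower() {
        return new ExecutorBorrower(executor);
    }

    boolean isExecutorShutdown() {
        return executor.isShutdown();
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}

// Hypothetical borrower: it uses the executor but never shuts it down.
class ExecutorBorrower implements AutoCloseable {
    private final ExecutorService borrowed;
    private volatile boolean closed;

    ExecutorBorrower(ExecutorService borrowed) {
        this.borrowed = borrowed;
    }

    void run(Runnable task) {
        if (closed) {
            throw new IllegalStateException("borrower is closed");
        }
        borrowed.execute(task);
    }

    @Override
    public void close() {
        // Release only the borrower's own state; the executor stays usable
        // for the owner and for any other borrowers.
        closed = true;
    }
}
```

Closing a borrower leaves the executor running, so other users of the same executor are unaffected; only `ExecutorOwner.close()` shuts it down.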
[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19708: [improve][broker] PIP-192 Supports AntiAffinityGroupPolicy
gaoran10 commented on code in PR #19708:
URL: https://github.com/apache/pulsar/pull/19708#discussion_r1130605603

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java:
## @@ -424,7 +458,6 @@ public static CompletableFuture> getAntiAffinityNamespaceOw
 brokerToNamespaceToBundleRange) {
 CompletableFuture> antiAffinityNsBrokersResult = new CompletableFuture<>();
- pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesAsync(NamespaceName.get(namespaceName))

Review Comment:
Can we add a method to check the anti-affinity policy and reuse it?
[GitHub] [pulsar] momo-jun commented on pull request #19470: [improve][fn] Support configure compression type
momo-jun commented on PR #19470:
URL: https://github.com/apache/pulsar/pull/19470#issuecomment-1461479876

> the type is `String` and available values are: `None`, `LZ4`, `ZLIB`, `ZSTD` and `SNAPPY`, and default to `LZ4`

@jiangpengcheng thanks for the input. I've drafted https://github.com/apache/pulsar-site/pull/461 to add the docs.

Only one concern: the default compression type on the client side, e.g., for Java producers, is `None` (no compression), which is not aligned with functions. Is there any risk here?
[GitHub] [pulsar] lhotari commented on issue #11032: Flaky-test: ZKSessionTest.testReacquireLeadershipAfterSessionLost
lhotari commented on issue #11032: URL: https://github.com/apache/pulsar/issues/11032#issuecomment-1461479227 related to #19754 . @mattisonchao please remove ZKSessionTest from quarantine and fix this. Thanks
[GitHub] [pulsar-site] momo-jun opened a new pull request, #461: [feat][doc] Add a new config `compressionType` for functions
momo-jun opened a new pull request, #461:
URL: https://github.com/apache/pulsar-site/pull/461

### Modifications

Add a new config `compressionType` to function docs, following https://github.com/apache/pulsar/pull/19470.

https://user-images.githubusercontent.com/60642177/223951547-8a6ab553-f7be-4643-b828-dace7a576c2c.png

### Documentation

- [x] `doc`
- [ ] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`
[GitHub] [pulsar] lhotari opened a new issue, #11032: Flaky-test: ZKSessionTest.testReacquireLeadershipAfterSessionLost
lhotari opened a new issue, #11032:
URL: https://github.com/apache/pulsar/issues/11032

ZKSessionTest is flaky. The testReacquireLeadershipAfterSessionLost test method fails sporadically.

[example failure](https://github.com/apache/pulsar/pull/11026/checks?check_run_id=2889407530#step:9:4861)

```
Error: Tests run: 6, Failures: 1, Errors: 0, Skipped: 4, Time elapsed: 31.46 s <<< FAILURE! - in org.apache.pulsar.metadata.ZKSessionTest
Error: testReacquireLeadershipAfterSessionLost(org.apache.pulsar.metadata.ZKSessionTest) Time elapsed: 15.396 s <<< FAILURE!
org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a lambda expression in org.apache.pulsar.metadata.ZKSessionTest that uses org.apache.pulsar.metadata.api.coordination.LeaderElection expected [Leading] but found [NoLeader] within 10 seconds.
    at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:165)
    at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
    at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
    at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:895)
    at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:679)
    at org.apache.pulsar.metadata.ZKSessionTest.testReacquireLeadershipAfterSessionLost(ZKSessionTest.java:174)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
    at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
    at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
    at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.AssertionError: expected [Leading] but found [NoLeader]
    at org.testng.Assert.fail(Assert.java:99)
    at org.testng.Assert.failNotEquals(Assert.java:1037)
    at org.testng.Assert.assertEqualsImpl(Assert.java:140)
    at org.testng.Assert.assertEquals(Assert.java:122)
    at org.testng.Assert.assertEquals(Assert.java:617)
    at org.apache.pulsar.metadata.ZKSessionTest.lambda$testReacquireLeadershipAfterSessionLost$1(ZKSessionTest.java:175)
    at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
    at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:222)
    at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:209)
    ... 4 more
```
[GitHub] [pulsar] Demogorgon314 closed pull request #19592: [improve][broker] PIP-192: Support broker isolation policy
Demogorgon314 closed pull request #19592: [improve][broker] PIP-192: Support broker isolation policy URL: https://github.com/apache/pulsar/pull/19592
[GitHub] [pulsar-client-go] CrazyCollin commented on a diff in pull request #987: [feat] Expose the chunk config of consumer to the reader
CrazyCollin commented on code in PR #987:
URL: https://github.com/apache/pulsar-client-go/pull/987#discussion_r1130583553

## pulsar/reader.go:
## @@ -92,6 +92,16 @@ type ReaderOptions struct {
 // BackoffPolicy parameterize the following options in the reconnection logic to
 // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
 BackoffPolicy internal.BackoffPolicy
+
+ // MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
+ MaxPendingChunkedMessage int
+
+ // ExpireTimeOfIncompleteChunk sets the expiry time of discarding incomplete chunked message. (default: 60 seconds)
+ ExpireTimeOfIncompleteChunk time.Duration
+
+ // AutoAckIncompleteChunk sets whether consumer auto acknowledges incomplete chunked message when it should
+ // be removed (e.g.the chunked message pending queue is full). (default: false)
+ AutoAckIncompleteChunk bool

Review Comment:
Thank you so much!
[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #987: [feat] Expose the chunk config of consumer to the reader
RobertIndie commented on code in PR #987:
URL: https://github.com/apache/pulsar-client-go/pull/987#discussion_r1130582077

## pulsar/reader.go:
## @@ -92,6 +92,16 @@ type ReaderOptions struct {
 // BackoffPolicy parameterize the following options in the reconnection logic to
 // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
 BackoffPolicy internal.BackoffPolicy
+
+ // MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
+ MaxPendingChunkedMessage int
+
+ // ExpireTimeOfIncompleteChunk sets the expiry time of discarding incomplete chunked message. (default: 60 seconds)
+ ExpireTimeOfIncompleteChunk time.Duration
+
+ // AutoAckIncompleteChunk sets whether consumer auto acknowledges incomplete chunked message when it should
+ // be removed (e.g.the chunked message pending queue is full). (default: false)
+ AutoAckIncompleteChunk bool

Review Comment:
```suggestion
 // AutoAckIncompleteChunk sets whether reader auto acknowledges incomplete chunked message when it should
 // be removed (e.g.the chunked message pending queue is full). (default: false)
 AutoAckIncompleteChunk bool
```
[GitHub] [pulsar] ericsyh edited a comment on the discussion: Using Pulsar inside Istio Service Mesh?
GitHub user ericsyh edited a comment on the discussion: Using Pulsar inside Istio Service Mesh? I guess that for now it takes some extra effort to run `pulsar-helm-chart` with Istio, because several components have initContainers in the chart. I haven't tried disabling bk's initContainers before, but I think not running `pulsar-bookkeeper-verify-clusterid` should be okay. Bk nodes do not depend on much to start running. GitHub link: https://github.com/apache/pulsar/discussions/19703#discussioncomment-5250906 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
[GitHub] [pulsar] ericsyh added a comment to the discussion: Using Pulsar inside Istio Service Mesh?
GitHub user ericsyh added a comment to the discussion: Using Pulsar inside Istio Service Mesh? I guess that for now it takes some extra effort to run `pulsar-helm-chart` with Istio, because several components have initContainers. I haven't tried disabling bk's initContainers before, but I think not running `pulsar-bookkeeper-verify-clusterid` should be okay. Bk nodes do not depend on much to start running. GitHub link: https://github.com/apache/pulsar/discussions/19703#discussioncomment-5250906
[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19708: [improve][broker] PIP-192 Supports AntiAffinityGroupPolicy
gaoran10 commented on code in PR #19708:
URL: https://github.com/apache/pulsar/pull/19708#discussion_r1130573036

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
## @@ -161,6 +175,13 @@ public void start() throws PulsarServerException {
 } catch (LoadDataStoreException e) {
 throw new PulsarServerException(e);
 }
+LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap);
+// register listeners for domain changes
+ pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
+.registerListener(__ -> {
+pulsar.getLoadManagerExecutor().execute(() ->
+ LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap));
+});

Review Comment:
Can we change the field `antiAffinityGroupPolicyHelper` to a local variable?
[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19708: [improve][broker] PIP-192 Supports AntiAffinityGroupPolicy
gaoran10 commented on code in PR #19708:
URL: https://github.com/apache/pulsar/pull/19708#discussion_r1130573036

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
## @@ -161,6 +175,13 @@ public void start() throws PulsarServerException {
 } catch (LoadDataStoreException e) {
 throw new PulsarServerException(e);
 }
+LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap);
+// register listeners for domain changes
+ pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
+.registerListener(__ -> {
+pulsar.getLoadManagerExecutor().execute(() ->
+ LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap));
+});

Review Comment:
Is the field `antiAffinityGroupPolicyHelper` necessary? Can we change it to a local variable?
[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19730: [improve][broker] PIP-192 Made only the leader consume TopBundlesLoadDataStore
heesung-sn commented on code in PR #19730:
URL: https://github.com/apache/pulsar/pull/19730#discussion_r1130572901

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
## @@ -398,6 +425,50 @@ private boolean isInternalTopic(String topic) {
 || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
 }
+private void playLeader() {
+log.info("This broker:{} is the leader now.", pulsar.getLookupServiceAddress());
+serviceUnitStateChannel.scheduleOwnershipMonitor();
+this.pulsar.getLoadManagerExecutor().execute(() -> {
+try {
+loadStoreInitWaiter.await();
+} catch (InterruptedException e) {

Review Comment:
Optimistically, yes. If the load store has been initialized, then it will successfully start table view. If not, in the worst case, the table view start will throw an exception.
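The pattern visible in the quoted hunk, leader start-up work submitted to an executor and gated on an initialization latch, can be sketched roughly as follows. All names here (`LeaderStartSketch`, `markLoadStoreInitialized`, the `started` latch standing in for the table view start) are hypothetical, and the interrupt handling shown (restore the flag and skip the start) is one conservative choice, not necessarily what the PR does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LeaderStartSketch {
    private final CountDownLatch loadStoreInitWaiter = new CountDownLatch(1);
    // Stands in for "the table view has been started".
    private final CountDownLatch started = new CountDownLatch(1);
    // Daemon thread so a blocked task never keeps the JVM alive.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "leader-start-sketch");
        t.setDaemon(true);
        return t;
    });

    void markLoadStoreInitialized() {
        loadStoreInitWaiter.countDown();
    }

    void playLeader() {
        executor.execute(() -> {
            try {
                // Block until the load store reports that it is initialized.
                loadStoreInitWaiter.await();
            } catch (InterruptedException e) {
                // Assumption: restore the interrupt flag and skip the start.
                Thread.currentThread().interrupt();
                return;
            }
            started.countDown();
        });
    }

    boolean isStarted() {
        return started.getCount() == 0;
    }

    boolean awaitStarted(long timeoutMillis) {
        try {
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Calling `playLeader()` before `markLoadStoreInitialized()` simply parks the task until the latch opens, which keeps the caller's thread free.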
[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19708: [improve][broker] PIP-192 Supports AntiAffinityGroupPolicy
gaoran10 commented on code in PR #19708:
URL: https://github.com/apache/pulsar/pull/19708#discussion_r1130573036

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
## @@ -161,6 +175,13 @@ public void start() throws PulsarServerException {
 } catch (LoadDataStoreException e) {
 throw new PulsarServerException(e);
 }
+LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap);
+// register listeners for domain changes
+ pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
+.registerListener(__ -> {
+pulsar.getLoadManagerExecutor().execute(() ->
+ LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap));
+});

Review Comment:
Is the field `antiAffinityGroupPolicyHelper` necessary? It seems we can change it to a local variable.
[GitHub] [pulsar] poorbarcode commented on pull request #19751: [fix] [test] fix flaky test BucketDelayedDeliveryTrackerTest. testWithBkException
poorbarcode commented on PR #19751: URL: https://github.com/apache/pulsar/pull/19751#issuecomment-1461454829 /pulsarbot run-failure-checks
[pulsar] branch master updated: [fix] [broker] delete topic failed if disabled system topic (#19735)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 401fb055ed4 [fix] [broker] delete topic failed if disabled system topic (#19735)
401fb055ed4 is described below

commit 401fb055ed428b7f3e33a49cea78b25b04f7448e
Author: fengyubiao
AuthorDate: Thu Mar 9 15:13:50 2023 +0800

    [fix] [broker] delete topic failed if disabled system topic (#19735)

    Motivation: After PR #18823, The cmd delete topic will fail if disabled the feature system topic.

    Modifications: do not delete the system policy if disabled the feature system topic
---
 .../broker/service/persistent/PersistentTopic.java |  3 +-
 .../client/impl/DisabledSystemTopicTest.java       | 61 ++
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3b9fbbceb24..79717ed5730 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1246,7 +1246,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
                         .thenCompose(ignore -> {
-                            if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                            if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
+                                    && brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
                                 return deleteTopicPolicies();
                             } else {
                                 return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DisabledSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DisabledSystemTopicTest.java
new file mode 100644
index 000..7747d3a576c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DisabledSystemTopicTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.UUID;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class DisabledSystemTopicTest extends ProducerConsumerBase {
+
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setTransactionCoordinatorEnabled(false);
+        conf.setSystemTopicEnabled(false);
+    }
+
+    @Test
+    public void testDeleteTopic() throws Exception {
+        String topicName = "persistent://my-property/my-ns/tp_" + UUID.randomUUID().toString();
+
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().delete(topicName, false);
+
+        admin.topics().createPartitionedTopic(topicName, 3);
+        admin.topics().deletePartitionedTopic(topicName);
+    }
+}
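The guard added in `PersistentTopic` above can be reduced to a small standalone sketch. `DeleteTopicSketch` and its two methods are hypothetical stand-ins that only model the branch condition; the string results exist purely to make the outcome observable and are not part of Pulsar.

```java
import java.util.concurrent.CompletableFuture;

class DeleteTopicSketch {
    // Hypothetical stand-in for deleteTopicPolicies(); it only reports what it did.
    static CompletableFuture<String> deleteTopicPolicies() {
        return CompletableFuture.completedFuture("policies deleted");
    }

    // Mirrors the two-part condition from the diff: policies are deleted only
    // when the topic is not the policies system topic AND system topics are enabled.
    static CompletableFuture<String> deletePoliciesIfNeeded(boolean isPoliciesSystemTopic,
                                                            boolean systemTopicEnabled) {
        if (!isPoliciesSystemTopic && systemTopicEnabled) {
            return deleteTopicPolicies();
        }
        return CompletableFuture.completedFuture("skipped");
    }
}
```

With system topics disabled the policy step is skipped and the rest of the delete chain can continue, which is the case the new `DisabledSystemTopicTest` exercises.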
[GitHub] [pulsar] poorbarcode merged pull request #19735: [fix] [broker] delete topic failed if disabled system topic
poorbarcode merged PR #19735: URL: https://github.com/apache/pulsar/pull/19735
[pulsar-site] branch main updated: explain the schema format for REST API & Java Admin API (#460)
This is an automated email from the ASF dual-hosted git repository.

liuyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git

The following commit(s) were added to refs/heads/main by this push:
     new 91430cdcc7d explain the schema format for REST API & Java Admin API (#460)
91430cdcc7d is described below

commit 91430cdcc7d469cfe5ab84eedc5342ab121ac522
Author: Julien Jakubowski
AuthorDate: Thu Mar 9 08:07:37 2023 +0100

    explain the schema format for REST API & Java Admin API (#460)
---
 docs/admin-api-schemas.md | 43 +++
 1 file changed, 23 insertions(+), 20 deletions(-)

diff --git a/docs/admin-api-schemas.md b/docs/admin-api-schemas.md
index bc583a1288f..deb43cd2102 100644
--- a/docs/admin-api-schemas.md
+++ b/docs/admin-api-schemas.md
@@ -51,26 +51,6 @@ The `schema-definition-file` is in JSON format.
 }
 ```
-The `schema-definition-file` includes the following fields:
-
-| Field | Description |
-| --- | --- |
-| `type` | The schema type. |
-| `schema` | The schema definition data, which is encoded in UTF 8 charset. If the schema is a **primitive** schema, this field should be blank. If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. |
-| `properties` | The additional properties associated with the schema. |
-
-The following is an example of the `schema-definition-file` for a JSON schema.
-
-**Example**
-
-```json
-{
-    "type": "JSON",
-    "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file3\",\"type\":[\"string\",\"null\"],\"default\":\"dfdf\"}]}",
-    "properties": {}
-}
-```
-
@@ -105,10 +85,33 @@ payload.setSchema("");
 admin.createSchema("my-tenant/my-ns/my-topic", payload);
 ```
+If the schema is a **primitive** schema, the `schema` field must be blank.
+If the schema is a **struct** schema, this field must be a JSON string of the Avro schema definition.
+
+The payload includes the following fields:
+
+| Field | Description |
+| --- | --- |
+| `type` | The schema type. |
+| `schema` | The schema definition data, which is encoded in UTF 8 charset. If the schema is a **primitive** schema, this field should be blank. If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. |
+| `properties` | The additional properties associated with the schema. |
+
+The following is an example for a JSON schema.
+
+**Example**
+
+```json
+{
+    "type": "JSON",
+    "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file3\",\"type\":[\"string\",\"null\"],\"default\":\"dfdf\"}]}",
+    "properties": {}
+}
+```
+
 ### Get the latest schema
 To get the latest schema for a topic, you can use one of the following methods.
[GitHub] [pulsar-site] Anonymitaet merged pull request #460: explain the schema format for REST API & Java Admin API
Anonymitaet merged PR #460: URL: https://github.com/apache/pulsar-site/pull/460
[GitHub] [pulsar] massakam opened a new pull request, #19759: [fix][stats] Fix issue where msgRateExpired may not refresh forever
massakam opened a new pull request, #19759:
URL: https://github.com/apache/pulsar/pull/19759

### Motivation

Topic stats have a `msgRateExpired` field. Once this value is greater than 0, it may never return to 0, even though no messages have expired. This is because `PersistentMessageExpiryMonitor#updateRates()` only runs when `PersistentMessageExpiryMonitor#expireMessages()` is executed, not periodically.

For example, if the number of messages in the backlog of a subscription is 0, the execution of `PersistentMessageExpiryMonitor#expireMessages()` will be skipped and `msgRateExpired` will not be updated.

https://github.com/apache/pulsar/blob/cdeef00c5f6a5bd3197b4ca6de0a0505b18835d8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L1062-L1071

As a result, the value of `msgRateExpired` may remain non-zero even though message expiration has not occurred recently.

### Modifications

Call `PersistentMessageExpiryMonitor#updateRates()` in the `PersistentTopic#updateRates()` method that runs periodically to update the topic stats.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`
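Why the rate gets stuck can be seen in a minimal sketch of a counter-based rate. `ExpiryRateSketch` is a hypothetical, single-threaded simplification and not Pulsar's actual monitor or rate class; it only assumes that the published rate is recomputed from a counter whenever `updateRates` is called.

```java
class ExpiryRateSketch {
    private long expiredSinceLastUpdate;
    private double rate;

    // Called whenever messages are expired.
    void recordExpired(long count) {
        expiredSinceLastUpdate += count;
    }

    // Recomputes the rate over the elapsed interval and resets the counter.
    // If this is never called again, getRate() keeps returning the old value.
    void updateRates(double intervalSeconds) {
        rate = expiredSinceLastUpdate / intervalSeconds;
        expiredSinceLastUpdate = 0;
    }

    double getRate() {
        return rate;
    }
}
```

After one update with 60 expired messages over 60 seconds the rate is 1.0 and stays there until `updateRates` runs again; a periodic call with no new expirations brings it back to 0.0, which is the effect of invoking the update from the periodic stats refresh.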
[GitHub] [pulsar] codecov-commenter commented on pull request #19751: [fix] [test] fix flaky test BucketDelayedDeliveryTrackerTest. testWithBkException
codecov-commenter commented on PR #19751:
URL: https://github.com/apache/pulsar/pull/19751#issuecomment-1461397026

# [Codecov](https://codecov.io/gh/apache/pulsar/pull/19751?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) Report

> Merging [#19751](https://codecov.io/gh/apache/pulsar/pull/19751?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) (6575239) into [master](https://codecov.io/gh/apache/pulsar/commit/af1360fb167c1f9484fda5771df3ea9b21d1440b?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) (af1360f) will **increase** coverage by `5.68%`.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/19751/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/19751?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)

```diff
@@             Coverage Diff             @@
##             master   #19751     +/-   ##
+ Coverage     26.40%   32.09%   +5.68%
+ Complexity     6427     6389      -38
  Files          1597     1674      +77
  Lines        123682   126618    +2936
  Branches      13511    13819     +308
+ Hits          32658    40632    +7974
+ Misses        86336    79972    -6364
- Partials       4688     6014    +1326
```

| Flag | Coverage Δ | |
|---|---|---|
| inttests | `24.53% <ø> (-0.05%)` | :arrow_down: |
| systests | `25.37% <ø> (?)` | |
| unittests | `17.31% <ø> (-0.09%)` | :arrow_down: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/19751?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../main/java/org/apache/pulsar/client/api/Range.java](https://codecov.io/gh/apache/pulsar/pull/19751?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9jbGllbnQvYXBpL1JhbmdlLmphdmE=) | `0.00% <0.00%> (-27.28%)` | :arrow_down: |
| [...ce/ConsistentHashingStickyKeyConsumerSelector.java](https://codecov.io/gh/apache/pulsar/pull/19751?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0NvbnNpc3RlbnRIYXNoaW5nU3RpY2t5S2V5Q29uc3VtZXJTZWxlY3Rvci5qYXZh) | `63.46% <0.00%> (-17.31%)` | :arrow_down: |
| [...ker/loadbalance/impl/LeastLongTermMessageRate.java](https://codecov.io/gh/apache/pulsar/pull/19751?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9sb2FkYmFsYW5jZS9pbXBsL0xlYXN0TG9uZ1Rlcm1NZXNzYWdlUmF0ZS5qYXZh) | `64.44% <0.00%> (-8.89%)` | :arrow_down: |
| [...g/apache/bookkeeper/mledger/impl/PositionImpl.java](https://codecov.io/gh/apache/pulsar/pull/19751?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci9pbXBsL1Bvc2l0aW9uSW1wbC5qYXZh) | `67.79% <0.00%> (-6.78%)` | :arrow_down: |
| [...g/apache/pulsar/broker/lookup/TopicLookupBase.java](https://codecov.io/gh/apache/pulsar/pull/19751?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9sb29rdXAvVG9waWNMb29rdXBCYXNlLmphdmE=) | `40.22% <0.00%> (-4.03%)` | :arrow_down: |
| [...sar/broker/loadbalance/impl/LoadManagerShared.java](https://codecov.io/gh/apache/pulsar/pull/19751?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9sb2FkYmFsYW5jZS9pbXBsL0xvYWRNYW5hZ2VyU2hhcmVkLmphdmE=) | `37.28% <0.00%> (-2.64%)` | :arrow_down: |
[GitHub] [pulsar] Technoboy- commented on pull request #19746: [fix][broker] Fix potential exception cause the policy service init fail.
Technoboy- commented on PR #19746:
URL: https://github.com/apache/pulsar/pull/19746#issuecomment-1461357330

/pulsarbot run-failure-checks
[GitHub] [pulsar] codecov-commenter commented on pull request #19735: [fix] [broker] delete topic failed if disabled system topic
codecov-commenter commented on PR #19735:
URL: https://github.com/apache/pulsar/pull/19735#issuecomment-1461352203

# [Codecov](https://codecov.io/gh/apache/pulsar/pull/19735?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) Report
> Merging [#19735](https://codecov.io/gh/apache/pulsar/pull/19735?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) (5b969a4) into [master](https://codecov.io/gh/apache/pulsar/commit/4ed8a871bfa9a7a8e30d86d782e33a556646d7b1?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) (4ed8a87) will **increase** coverage by `36.79%`.
> The diff coverage is `100.00%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/19735/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/19735?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)

```diff
@@              Coverage Diff              @@
##             master   #19735       +/-   ##
=============================================
+ Coverage     25.22%   62.02%   +36.79%
- Complexity      205     3500     +3295
=============================================
  Files          1670     1848      +178
  Lines        126397   135871     +9474
  Branches      13797    14953     +1156
=============================================
+ Hits          31887    84274    +52387
+ Misses        89540    43851    -45689
- Partials       4970     7746     +2776
```

| Flag | Coverage Δ | |
|---|---|---|
| inttests | `24.44% <100.00%> (?)` | |
| systests | `25.26% <100.00%> (+0.03%)` | :arrow_up: |
| unittests | `59.25% <100.00%> (?)` | |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/19735?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) | Coverage Δ | | |---|---|---| | [...sar/broker/service/persistent/PersistentTopic.java](https://codecov.io/gh/apache/pulsar/pull/19735?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFRvcGljLmphdmE=) | `64.14% <100.00%> (+21.36%)` | :arrow_up: | | [...va/org/apache/pulsar/common/util/NumberFormat.java](https://codecov.io/gh/apache/pulsar/pull/19735?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi91dGlsL051bWJlckZvcm1hdC5qYXZh) | `66.66% <0.00%> (-12.50%)` | :arrow_down: | | [...dbalance/extensions/ExtensibleLoadManagerImpl.java](https://codecov.io/gh/apache/pulsar/pull/19735?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9sb2FkYmFsYW5jZS9leHRlbnNpb25zL0V4dGVuc2libGVMb2FkTWFuYWdlckltcGwuamF2YQ==) | `4.27% <0.00%> (-0.63%)` | :arrow_down: | | [...xtensions/channel/ServiceUnitStateChannelImpl.java](https://codecov.io/gh/apache/pulsar/pull/19735?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9sb2FkYmFsYW5jZS9leHRlbnNpb25zL2NoYW5uZWwvU2VydmljZVVuaXRTdGF0ZUNoYW5uZWxJbXBsLmphdmE=) | `0.53% <0.00%> (-0.01%)` | :arrow_down: | | 
[...in/java/org/apache/pulsar/common/api/AuthData.java](https://codecov.io/gh/apache/pulsar/pull/19735?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9jb21tb24vYXBpL0F1dGhEYXRhLmphdmE=) | `71.42% <0.00%> (ø)` | | | [...ava/org/apache/pulsar/client/api/schema/Field.java](https://codecov.io/gh/apache/pulsar/pull/19735?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9jbGllbnQvYXBpL3NjaGVtYS9GaWVsZC5qYXZh) | `80.00% <0.00%> (ø)` | | |
[GitHub] [pulsar] dao-jun opened a new pull request, #19758: [fix] Fix Authentication metric has infinity label values
dao-jun opened a new pull request, #19758:
URL: https://github.com/apache/pulsar/pull/19758

### Motivation

The `reason` label of the `pulsar_authentication_failures_count` metric can take an unbounded number of distinct values, which causes a cardinality explosion on Grafana instances. This problem degrades the performance and reliability of metrics.

### Modifications

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [x] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
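The Modifications section of this PR is empty, so the actual fix is not known from this message. As a minimal sketch of one common way to bound a label, assuming a small fixed set of categories (the class name, the category names, and the `other` fallback are all hypothetical and not taken from Pulsar), free-form failure reasons could be mapped onto known values before being used as a label:

```java
import java.util.Locale;
import java.util.Set;

// Hypothetical helper, not Pulsar's actual fix: it maps free-form failure
// reasons onto a fixed set so the label cannot grow without bound.
final class BoundedReasonLabel {
    // Assumed categories, for illustration only.
    private static final Set<String> KNOWN =
            Set.of("expired_token", "invalid_signature", "missing_credentials");
    static final String OTHER = "other";

    static String normalize(String rawReason) {
        if (rawReason == null) {
            return OTHER;
        }
        // Lower-case and replace spaces so "Expired Token" matches "expired_token".
        String key = rawReason.trim().toLowerCase(Locale.ROOT).replace(' ', '_');
        // Anything not in the fixed set collapses into a single bucket.
        return KNOWN.contains(key) ? key : OTHER;
    }
}
```

With this shape the number of time series per metric is capped by the size of the fixed set plus one, regardless of what text the authentication provider puts in its error message.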
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19730: [improve][broker] PIP-192 Made only the leader consume TopBundlesLoadDataStore
Demogorgon314 commented on code in PR #19730:
URL: https://github.com/apache/pulsar/pull/19730#discussion_r1130527811

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java: ##

@@ -398,6 +425,50 @@ private boolean isInternalTopic(String topic) {
                 || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
     }
 
+    private void playLeader() {
+        log.info("This broker:{} is the leader now.", pulsar.getLookupServiceAddress());
+        serviceUnitStateChannel.scheduleOwnershipMonitor();
+        this.pulsar.getLoadManagerExecutor().execute(() -> {
+            try {
+                loadStoreInitWaiter.await();
+            } catch (InterruptedException e) {

Review Comment:
   After being interrupted, do we still need to start the table view?
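The question in this review comment can be illustrated with a small, self-contained sketch. It assumes the answer is "no": on interruption the task restores the interrupt flag and returns without starting anything. The class and field names below are hypothetical stand-ins (`initWaiter` for `loadStoreInitWaiter`, a boolean for "table view started") and are not the PR's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the leader start-up task discussed above.
final class LeaderStartupSketch {
    final CountDownLatch initWaiter = new CountDownLatch(1);
    final AtomicBoolean tableViewStarted = new AtomicBoolean(false);

    void playLeader() {
        try {
            initWaiter.await();
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the interruption,
            // then stop: nothing after this point should run.
            Thread.currentThread().interrupt();
            return;
        }
        // Only reached when initialization finished normally.
        tableViewStarted.set(true);
    }
}
```

Returning early keeps the follow-up work from running against a store that may not be initialized; whether that is the right behavior for the broker is the reviewer's open question.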
[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19749: [improve][broker] PIP-192: Add metrics for unload operation
heesung-sn commented on code in PR #19749:
URL: https://github.com/apache/pulsar/pull/19749#discussion_r1130519751

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java: ##

@@ -45,36 +48,57 @@ public class UnloadCounter {
     long unloadBrokerCount = 0;
     long unloadBundleCount = 0;
 
-    final Map<UnloadDecision.Label, Map<UnloadDecision.Reason, MutableLong>> breakdownCounters;
+    @Getter
+    @VisibleForTesting
+    final Map<UnloadDecision.Label, Map<UnloadDecision.Reason, AtomicLong>> breakdownCounters;
 
+    @Getter
+    @VisibleForTesting
     double loadAvg;
+    @Getter
+    @VisibleForTesting
     double loadStd;
 
+    private volatile long updatedAt = 0;
+
     public UnloadCounter() {
         breakdownCounters = Map.of(
                 Success, Map.of(
-                        Overloaded, new MutableLong(),
-                        Underloaded, new MutableLong()),
+                        Overloaded, new AtomicLong(),
+                        Underloaded, new AtomicLong(),
+                        Admin, new AtomicLong()),
                 Skip, Map.of(
-                        Balanced, new MutableLong(),
-                        NoBundles, new MutableLong(),
-                        CoolDown, new MutableLong(),
-                        OutDatedData, new MutableLong(),
-                        NoLoadData, new MutableLong(),
-                        NoBrokers, new MutableLong(),
-                        Unknown, new MutableLong()),
+                        Balanced, new AtomicLong(),
+                        NoBundles, new AtomicLong(),
+                        CoolDown, new AtomicLong(),
+                        OutDatedData, new AtomicLong(),
+                        NoLoadData, new AtomicLong(),
+                        NoBrokers, new AtomicLong(),
+                        Unknown, new AtomicLong()),
                 Failure, Map.of(
-                        Unknown, new MutableLong())
+                        Unknown, new AtomicLong())
         );
     }
 
     public void update(UnloadDecision decision) {
-        var unloads = decision.getUnloads();
-        unloadBrokerCount += unloads.keySet().size();
-        unloadBundleCount += unloads.values().size();
-        breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment();
-        loadAvg = decision.loadAvg;
-        loadStd = decision.loadStd;
+        unloadBrokerCount++;
+        unloadBundleCount++;
+        breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet();
+        updatedAt = System.currentTimeMillis();
+    }
+
+    public void update(UnloadDecision.Label label, UnloadDecision.Reason reason) {
+        if (label == Success) {
+            unloadBundleCount++;
+            unloadBrokerCount++;

Review Comment:
   @codelipenghui do you have any thoughts about this `brk_lb_unload_broker_total` removal?
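The switch from `MutableLong` to `AtomicLong` in the diff above matters when counters are updated from several threads. The following is a reduced sketch of the same nested-map layout, with hypothetical enum and class names and far fewer reasons than the real `UnloadCounter`; it is meant only to show the pattern, not to mirror Pulsar's class.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Reduced, hypothetical version of a label -> reason -> counter table.
final class BreakdownCounterSketch {
    enum Label { Success, Skip, Failure }
    enum Reason { Overloaded, Underloaded, Balanced, Unknown }

    // The maps are immutable; only the AtomicLong values change, so
    // concurrent updates need no additional locking.
    private final Map<Label, Map<Reason, AtomicLong>> counters = Map.of(
            Label.Success, Map.of(
                    Reason.Overloaded, new AtomicLong(),
                    Reason.Underloaded, new AtomicLong()),
            Label.Skip, Map.of(
                    Reason.Balanced, new AtomicLong(),
                    Reason.Unknown, new AtomicLong()),
            Label.Failure, Map.of(
                    Reason.Unknown, new AtomicLong()));

    // Throws NullPointerException for a (label, reason) pair that was not registered.
    void update(Label label, Reason reason) {
        counters.get(label).get(reason).incrementAndGet();
    }

    long get(Label label, Reason reason) {
        return counters.get(label).get(reason).get();
    }
}
```

Note that plain `long` fields incremented with `++`, like `unloadBrokerCount` in the diff, do not get the same guarantee; they stay safe only if a single thread updates them.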
[GitHub] [pulsar-client-go] BewareMyPower commented on pull request #988: Optimize batch index ACK performance
BewareMyPower commented on PR #988:
URL: https://github.com/apache/pulsar-client-go/pull/988#issuecomment-1461328085

## Performance test setup

Run a Pulsar 2.11.0 standalone locally with `acknowledgmentAtBatchIndexLevelEnabled=true`.

Build the `perf` binary:

```bash
cd perf
go build
```

Run the consumer in terminal 1:

```bash
./perf consume --profile \
    -s sub \
    --enable-batch-index-ack \
    my-topic
```

Run the producer in terminal 2; the throughput is about 20 MB/s:

```bash
./perf produce \
    --batching-num-messages=10 \
    --size 105 \
    --rate 20 \
    my-topic
```

## Test result

### Before this patch

```
INFO[13:31:17.584] Stats - Publish rate: 150038.6 msg/s - 120.2 Mbps - Latency ms: 50% 3.1 -95% 13.0 - 99% 48.9 - 99.9% 48.9 - max 48.9
INFO[13:31:27.581] Stats - Publish rate: 179909.8 msg/s - 144.1 Mbps - Latency ms: 50% 2.4 -95% 8.7 - 99% 26.2 - 99.9% 26.2 - max 26.2
INFO[13:31:37.581] Stats - Publish rate: 188957.1 msg/s - 151.4 Mbps - Latency ms: 50% 2.3 -95% 7.8 - 99% 15.9 - 99.9% 15.9 - max 15.9
```

```
INFO[13:31:11.592] Stats - Consume rate: 22956.0 msg/s - 18.4 Mbps
INFO[13:31:21.592] Stats - Consume rate: 46805.3 msg/s - 37.5 Mbps
INFO[13:31:31.593] Stats - Consume rate: 47494.7 msg/s - 38.0 Mbps
INFO[13:31:41.594] Stats - Consume rate: 48100.3 msg/s - 38.5 Mbps
INFO[13:31:51.592] Stats - Consume rate: 59705.8 msg/s - 47.8 Mbps
INFO[13:32:01.597] Stats - Consume rate: 61694.0 msg/s - 49.4 Mbps
INFO[13:32:11.592] Stats - Consume rate: 58100.2 msg/s - 46.5 Mbps
INFO[13:32:21.592] Stats - Consume rate: 60818.2 msg/s - 48.7 Mbps
INFO[13:32:31.596] Stats - Consume rate: 60681.5 msg/s - 48.6 Mbps
```

The consumer cannot keep up with the producer even though the produce rate is only 20 MB/s, so it falls back to catch-up reads and the broker reads messages directly from BookKeeper (BK).

### After this patch

```
INFO[13:27:33.212] Stats - Publish rate: 165290.5 msg/s - 132.4 Mbps - Latency ms: 50% 2.9 -95% 11.3 - 99% 58.2 - 99.9% 58.2 - max 58.2
INFO[13:27:43.213] Stats - Publish rate: 197230.3 msg/s - 158.0 Mbps - Latency ms: 50% 2.4 -95% 5.5 - 99% 22.2 - 99.9% 22.2 - max 22.2
INFO[13:27:53.212] Stats - Publish rate: 193150.6 msg/s - 154.7 Mbps - Latency ms: 50% 2.5 -95% 5.9 - 99% 17.6 - 99.9% 17.6 - max 17.6
INFO[13:28:03.212] Stats - Publish rate: 170759.9 msg/s - 136.8 Mbps - Latency ms: 50% 2.6 -95% 5.7 - 99% 23.4 - 99.9% 23.4 - max 23.4
INFO[13:28:13.212] Stats - Publish rate: 149780.3 msg/s - 120.0 Mbps - Latency ms: 50% 2.9 -95% 6.4 - 99% 61.2 - 99.9% 61.2 - max 61.2
INFO[13:28:23.212] Stats - Publish rate: 157285.9 msg/s - 126.0 Mbps - Latency ms: 50% 2.7 -95% 6.2 - 99% 14.2 - 99.9% 14.2 - max 14.2
```

```
INFO[13:27:34.523] Stats - Consume rate: 180047.6 msg/s - 144.2 Mbps
INFO[13:27:44.524] Stats - Consume rate: 197120.2 msg/s - 157.9 Mbps
INFO[13:27:54.523] Stats - Consume rate: 193490.8 msg/s - 155.0 Mbps
INFO[13:28:04.523] Stats - Consume rate: 165149.2 msg/s - 132.3 Mbps
INFO[13:28:14.523] Stats - Consume rate: 150062.6 msg/s - 120.2 Mbps
INFO[13:28:24.523] Stats - Consume rate: 154697.5 msg/s - 123.9 Mbps
```

### Flame Graph

Before this patch:

![image](https://user-images.githubusercontent.com/18204803/223931561-3b71c01a-5305-4cef-93ce-7977b0021994.png)

![image](https://user-images.githubusercontent.com/18204803/223931801-71892bde-0a2a-4ef0-8b42-364783dd97d8.png)

We can see that `internalSendRequest` takes about 40% of the total, which in turn causes the `runEventLoop` method to take 8.87% of the total.

After this patch:

![image](https://user-images.githubusercontent.com/18204803/223932285-1e8adb5e-02b4-43a7-9ba9-beb854706fe3.png)
[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19749: [improve][broker] PIP-192: Add metrics for unload operation
heesung-sn commented on code in PR #19749:
URL: https://github.com/apache/pulsar/pull/19749#discussion_r1130026707

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java: ##

@@ -74,7 +81,13 @@ public CompletableFuture waitAsync(CompletableFuture eventPubFuture,
                     }
                 });
                 return future;
-            }));
+            })).whenComplete((__, ex) -> {
+                if (ex != null) {
+                    counter.update(Failure, Unknown);
+                    return;
+                }
+                counter.update(decision);

Review Comment:
   Let's remove `counter.update(decision)` here and instead update the success counter on completion in `this.complete(..)`. Also, update the failure counter if an exception is passed to `this.complete(..)`.
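The suggestion in this review comment, counting success and failure at the single place where an in-flight request is completed, might look roughly like the sketch below. It is an illustration under assumptions: the class, the `register` method, and the two plain counters are hypothetical, and the real `UnloadManager` uses its own counter type and decision objects.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: counters are updated only inside complete(..).
final class InFlightRequestsSketch {
    final AtomicLong successCount = new AtomicLong();
    final AtomicLong failureCount = new AtomicLong();
    private final Map<String, CompletableFuture<Void>> inFlight = new ConcurrentHashMap<>();

    // Registers a pending request for the service unit, or returns the existing one.
    CompletableFuture<Void> register(String serviceUnit) {
        return inFlight.computeIfAbsent(serviceUnit, k -> new CompletableFuture<>());
    }

    void complete(String serviceUnit, Throwable ex) {
        inFlight.computeIfPresent(serviceUnit, (k, future) -> {
            if (!future.isDone()) {
                if (ex != null) {
                    failureCount.incrementAndGet();
                    future.completeExceptionally(ex);
                } else {
                    successCount.incrementAndGet();
                    future.complete(null);
                }
            }
            return null; // returning null removes the entry from the map
        });
    }
}
```

Because the entry is removed when it is completed, a second `complete(..)` call for the same service unit finds nothing and does not count twice.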
[GitHub] [pulsar] Demogorgon314 closed pull request #19622: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl
Demogorgon314 closed pull request #19622: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl
URL: https://github.com/apache/pulsar/pull/19622
[GitHub] [pulsar] poorbarcode commented on pull request #19735: [fix] [broker] delete topic failed if disabled system topic
poorbarcode commented on PR #19735:
URL: https://github.com/apache/pulsar/pull/19735#issuecomment-1461326658

/pulsarbot run-failure-checks
[GitHub] [pulsar] poorbarcode commented on pull request #19751: [fix] [test] fix flaky test BucketDelayedDeliveryTrackerTest. testWithBkException
poorbarcode commented on PR #19751:
URL: https://github.com/apache/pulsar/pull/19751#issuecomment-1461326390

/pulsarbot run-failure-checks
[GitHub] [pulsar-client-go] BewareMyPower opened a new pull request, #988: Optimize batch index ACK performance
BewareMyPower opened a new pull request, #988:
URL: https://github.com/apache/pulsar-client-go/pull/988

### Motivation

Currently, when `EnableBatchIndexAck` is true, the ACK performance is very poor. There are two main reasons:
1. Acknowledgment by list is not supported. This means that even if N MessageIDs are grouped, N ACK requests are still sent.
2. The implementation of the ACK grouping tracker is wrong. Given a batch that has N messages, when batch index ACK is enabled, each MessageID is cached. However, after all N MessageIDs have arrived, the current implementation does not clear them.

### Modifications

- Add a `func(id []*pb.MessageIdData)` to the ACK grouping tracker. When flushing individual ACKs, construct the slice and wrap it in a `CommandAck` directly.
- Refactor the implementation of the ACK grouping tracker to have these two data structures:
  - `pendingAcks`: caches the non-batched MessageIDs
  - `pendingBatchAcks`: caches the batched MessageIDs; once all messages in a batch have been added, remove them from it and add the non-batched MessageID to `pendingAcks`
- Add `TestBatchIndexAckAllMessages` to verify the new behavior.

After this change, the ACK order cannot be guaranteed, so the acknowledged MessageIDs are sorted in `ack_grouping_tracker_test.go`.
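The two data structures described in the Modifications list can be sketched as follows. This is a Java illustration of the idea only, not the Go client's implementation: the class name, the `ledgerId:entryId` string keys, and the `flush` return type are assumptions, and the sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded sketch of an ACK grouping tracker.
final class AckGroupingSketch {
    // Entries that are fully acknowledged, keyed as "ledgerId:entryId".
    private final Set<String> pendingAcks = new LinkedHashSet<>();
    // Partially acknowledged batches: one bit per acknowledged batch index.
    private final Map<String, BitSet> pendingBatchAcks = new HashMap<>();

    void add(long ledgerId, long entryId, int batchIndex, int batchSize) {
        String key = ledgerId + ":" + entryId;
        if (batchSize <= 1) {
            pendingAcks.add(key); // non-batched message
            return;
        }
        BitSet acked = pendingBatchAcks.computeIfAbsent(key, k -> new BitSet(batchSize));
        acked.set(batchIndex);
        if (acked.cardinality() == batchSize) {
            // Whole batch acknowledged: drop the per-index state and
            // acknowledge the entry itself.
            pendingBatchAcks.remove(key);
            pendingAcks.add(key);
        }
    }

    int pendingBatchCount() {
        return pendingBatchAcks.size();
    }

    // Drains everything into one list, modelling a single ACK request
    // that carries many message IDs instead of one request per ID.
    List<String> flush() {
        List<String> request = new ArrayList<>(pendingAcks);
        pendingBatchAcks.forEach((key, acked) -> request.add(key + " partial " + acked));
        pendingAcks.clear();
        pendingBatchAcks.clear();
        return request;
    }
}
```

Collapsing a completed batch into one entry keeps the cache from growing by N items per batch, which is the second problem named in the Motivation; sending the flushed list in one request addresses the first.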
[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19622: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl
heesung-sn commented on code in PR #19622: URL: https://github.com/apache/pulsar/pull/19622#discussion_r1130492695 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java: ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; + +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; + +/** + * Split manager. 
+ */ +@Slf4j +public class SplitManager implements StateChangeListener { + +record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture future) { +} + +private final Map inFlightSplitRequests; + +private final SplitCounter counter; + +public SplitManager(SplitCounter splitCounter) { +this.inFlightSplitRequests = new ConcurrentHashMap<>(); +this.counter = splitCounter; +} + +private void complete(String serviceUnit, Throwable ex) { +inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> { +var future = inFlightSplitRequest.future; +if (!future.isDone()) { +if (ex != null) { +counter.update(Failure, Unknown); Review Comment: Nice catch. Updated. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; + +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; + +/** + * Split manager. + */ +@Slf4j +public class SplitManager implements StateChangeListener { + +record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture future) { +} + +private final Map inFlightSplitRequests; + +private final SplitCounter counter; + +public SplitManager(SplitCounter splitCounter) { +this.inFlightSplitRequests = new ConcurrentHashMap<>(); +this.counter = splitCounter; +} + +private void complete(String serviceUnit, Throwable ex) { +
[GitHub] [pulsar] Technoboy- commented on pull request #19751: [fix] [test] fix flaky test BucketDelayedDeliveryTrackerTest. testWithBkException
Technoboy- commented on PR #19751:
URL: https://github.com/apache/pulsar/pull/19751#issuecomment-1461295805

/pulsarbot run-failure-checks
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19622: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl
Demogorgon314 commented on code in PR #19622: URL: https://github.com/apache/pulsar/pull/19622#discussion_r1130449381 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; + +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; + +/** + * Split manager. 
+ */ +@Slf4j +public class SplitManager implements StateChangeListener { + +record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture future) { +} + +private final Map inFlightSplitRequests; + +private final SplitCounter counter; + +public SplitManager(SplitCounter splitCounter) { +this.inFlightSplitRequests = new ConcurrentHashMap<>(); +this.counter = splitCounter; +} + +private void complete(String serviceUnit, Throwable ex) { +inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> { +var future = inFlightSplitRequest.future; +if (!future.isDone()) { +if (ex != null) { +future.completeExceptionally(ex); +} else { +future.complete(null); +} +} +return null; +}); +} + +public CompletableFuture waitAsync(CompletableFuture eventPubFuture, + String bundle, + SplitDecision decision, + long timeout, + TimeUnit timeoutUnit) { +return eventPubFuture +.thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> { +log.info("Published the bundle split event for bundle:{}. " ++ "Waiting the split event to complete. Timeout: {} {}", +bundle, timeout, timeoutUnit); +CompletableFuture future = new CompletableFuture<>(); +future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> { +if (ex != null) { +inFlightSplitRequests.remove(bundle); +log.warn("Timed out while waiting for the bundle split event: {}", bundle, ex); +} +}); +return new InFlightSplitRequest(decision, future); Review Comment: Now we don't need `InFlightSplitRequest` obj right? Since we already pass the decision in `whenComplete` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] nodece commented on pull request #18114: [fix][broker] Fix RetentionPolicies types mismatch.
nodece commented on PR #18114:
URL: https://github.com/apache/pulsar/pull/18114#issuecomment-1461283902

> > Question: Why don't we choose `long`?
>
> `int` is enough.

I think we should still use `long`; `long` is compatible with `int`. @Technoboy- @mattisonchao
[GitHub] [pulsar] nodece commented on pull request #18114: [fix][broker] Fix RetentionPolicies types mismatch.
nodece commented on PR #18114:
URL: https://github.com/apache/pulsar/pull/18114#issuecomment-1461277947

Hi @Technoboy-, why not change the constructor of `RetentionPolicies`? Something like:

```
public RetentionPolicies(int retentionTimeInMinutes, long retentionSizeInMB) {
```

`long` is compatible with `int`.
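The compatibility argument in this comment rests on Java's widening primitive conversion: an `int` argument is accepted wherever a `long` parameter is declared. A minimal sketch with a hypothetical stand-in class (not the real `RetentionPolicies`) shows that existing `int` callers keep compiling while larger sizes become expressible:

```java
// Hypothetical stand-in with the constructor shape proposed above.
final class RetentionPoliciesSketch {
    final int retentionTimeInMinutes;
    final long retentionSizeInMB;

    RetentionPoliciesSketch(int retentionTimeInMinutes, long retentionSizeInMB) {
        this.retentionTimeInMinutes = retentionTimeInMinutes;
        this.retentionSizeInMB = retentionSizeInMB;
    }

    public static void main(String[] args) {
        int sizeFromOldCaller = 1024;
        // The int value is widened to long implicitly; no cast is needed.
        RetentionPoliciesSketch small = new RetentionPoliciesSketch(60, sizeFromOldCaller);
        // A value beyond Integer.MAX_VALUE only fits because the parameter is long.
        RetentionPoliciesSketch large = new RetentionPoliciesSketch(60, 5_000_000_000L);
        System.out.println(small.retentionSizeInMB + " " + large.retentionSizeInMB);
    }
}
```

The conversion only works in that direction: code that reads the size back into an `int` would need an explicit cast, which is where a type mismatch can surface.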
[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19622: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl
heesung-sn commented on code in PR #19622:
URL: https://github.com/apache/pulsar/pull/19622#discussion_r1130429714

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java: ##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+
+/**
+ * Split manager.
+ */
+@Slf4j
+public class SplitManager implements StateChangeListener {
+
+    record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture future) {
+    }
+
+    private final Map inFlightSplitRequests;
+
+    private final SplitCounter counter;
+
+    public SplitManager(SplitCounter splitCounter) {
+        this.inFlightSplitRequests = new ConcurrentHashMap<>();
+        this.counter = splitCounter;
+    }
+
+    private void complete(String serviceUnit, Throwable ex) {
+        inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> {
+            var future = inFlightSplitRequest.future;
+            if (!future.isDone()) {
+                if (ex != null) {
+                    counter.update(Failure, Unknown);

Review Comment:
Oh, I see. I think we need to clean up this part.
[GitHub] [pulsar] codecov-commenter commented on pull request #19756: [fix][io] KCA: 'desanitize' topic name for the pulsar's ctx calls
codecov-commenter commented on PR #19756:
URL: https://github.com/apache/pulsar/pull/19756#issuecomment-1461274188

# [Codecov](https://codecov.io/gh/apache/pulsar/pull/19756?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) Report
> Merging [#19756](https://codecov.io/gh/apache/pulsar/pull/19756?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) (dc57e44) into [master](https://codecov.io/gh/apache/pulsar/commit/af1360fb167c1f9484fda5771df3ea9b21d1440b?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) (af1360f) will **increase** coverage by `35.61%`.
> The diff coverage is `82.35%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/19756/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/19756?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)

```diff
@@              Coverage Diff              @@
##             master   #19756       +/-   ##
=============================================
+ Coverage     26.40%   62.02%   +35.61%
- Complexity     6427    25758    +19331
=============================================
  Files          1597     1848      +251
  Lines        123682   135882    +12200
  Branches      13511    14954     +1443
=============================================
+ Hits          32658    84279    +51621
+ Misses        86336    43810    -42526
- Partials       4688     7793     +3105
```

| Flag | Coverage Δ | |
|---|---|---|
| inttests | `24.53% <ø> (-0.04%)` | :arrow_down: |
| systests | `25.12% <ø> (?)` | |
| unittests | `59.37% <82.35%> (+41.97%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/19756?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...ache/pulsar/io/kafka/connect/KafkaConnectSink.java](https://codecov.io/gh/apache/pulsar/pull/19756?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWlvL2thZmthLWNvbm5lY3QtYWRhcHRvci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2lvL2thZmthL2Nvbm5lY3QvS2Fma2FDb25uZWN0U2luay5qYXZh) | `73.06% <70.00%> (ø)` | |
| [...r/io/kafka/connect/PulsarKafkaSinkTaskContext.java](https://codecov.io/gh/apache/pulsar/pull/19756?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWlvL2thZmthLWNvbm5lY3QtYWRhcHRvci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2lvL2thZmthL2Nvbm5lY3QvUHVsc2FyS2Fma2FTaW5rVGFza0NvbnRleHQuamF2YQ==) | `64.03% <100.00%> (ø)` | |
| [...ce/ConsistentHashingStickyKeyConsumerSelector.java](https://codecov.io/gh/apache/pulsar/pull/19756?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0NvbnNpc3RlbnRIYXNoaW5nU3RpY2t5S2V5Q29uc3VtZXJTZWxlY3Rvci5qYXZh) | `76.92% <0.00%> (-3.85%)` | :arrow_down: |
| [...in/java/org/apache/pulsar/common/api/AuthData.java](https://codecov.io/gh/apache/pulsar/pull/19756?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9jb21tb24vYXBpL0F1dGhEYXRhLmphdmE=) | `71.42% <0.00%> (ø)` | |
| [.../apache/pulsar/broker/namespace/LookupOptions.java](https://codecov.io/gh/apache/pulsar/pull/19756?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9uYW1lc3BhY2UvTG9va3VwT3B0aW9ucy5qYXZh) | `87.50% <0.00%> (ø)` | |
| [.../apache/pulsar/common/naming/SystemTopicNames.java](https://codecov.io/gh/apache/pulsar/pull/19756?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi9uYW1pbmcvU3lzdGVtVG9waWNOYW1lcy5qYXZh) | `55.55% <0.00%> (ø)` | |
[GitHub] [pulsar] coderzc closed pull request #19718: [cherry-pick][branch-2.9] Upgrade Netty to 4.1.87.Final (#19417)
coderzc closed pull request #19718: [cherry-pick][branch-2.9] Upgrade Netty to 4.1.87.Final (#19417)
URL: https://github.com/apache/pulsar/pull/19718
[GitHub] [pulsar-client-cpp] massakam commented on pull request #208: Support configure global level client version string
massakam commented on PR #208:
URL: https://github.com/apache/pulsar-client-cpp/pull/208#issuecomment-1461256213

Is it possible that users are including `Version.h` and using these macros? If so, removing these macros may be a breaking change.

- PULSAR_VERSION
- PULSAR_VERSION_STR

FYI, these macros were added in https://github.com/apache/pulsar/pull/12769. These were originally intended to be used by the Pulsar Node.js client, but ended up not being used.
https://github.com/apache/pulsar-client-node/pull/252#discussion_r1040405072
[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19622: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl
heesung-sn commented on code in PR #19622:
URL: https://github.com/apache/pulsar/pull/19622#discussion_r1130392966

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java: ##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+
+/**
+ * Split manager.
+ */
+@Slf4j
+public class SplitManager implements StateChangeListener {
+
+    record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture future) {
+    }
+
+    private final Map inFlightSplitRequests;
+
+    private final SplitCounter counter;
+
+    public SplitManager(SplitCounter splitCounter) {
+        this.inFlightSplitRequests = new ConcurrentHashMap<>();
+        this.counter = splitCounter;
+    }
+
+    private void complete(String serviceUnit, Throwable ex) {
+        inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> {
+            var future = inFlightSplitRequest.future;
+            if (!future.isDone()) {
+                if (ex != null) {
+                    counter.update(Failure, Unknown);

Review Comment:
Can you clarify how we are updating this failure counter twice?
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19622: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl
Demogorgon314 commented on code in PR #19622:
URL: https://github.com/apache/pulsar/pull/19622#discussion_r1130387921

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java: ##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+
+/**
+ * Split manager.
+ */
+@Slf4j
+public class SplitManager implements StateChangeListener {
+
+    record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture future) {
+    }
+
+    private final Map inFlightSplitRequests;
+
+    private final SplitCounter counter;
+
+    public SplitManager(SplitCounter splitCounter) {
+        this.inFlightSplitRequests = new ConcurrentHashMap<>();
+        this.counter = splitCounter;
+    }
+
+    private void complete(String serviceUnit, Throwable ex) {
+        inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> {
+            var future = inFlightSplitRequest.future;
+            if (!future.isDone()) {
+                if (ex != null) {
+                    counter.update(Failure, Unknown);

Review Comment:
I suggest moving the counter update to `waitAsync`.
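
One way to read this suggestion is that failures get counted in a single place, the handler attached to the future that `waitAsync` returns, so that an explicit failure and a timeout are each counted once. The following is a minimal sketch under that assumption, not the PR's code: `SingleCountSketch` and its method names are hypothetical, and an `AtomicInteger` stands in for `SplitCounter`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the SplitManager flow; only the counting logic is modelled.
class SingleCountSketch {

    // Mirrors the shape of waitAsync: the only place where failures are counted.
    static CompletableFuture<Void> waitAsync(CompletableFuture<Void> inFlight,
                                             AtomicInteger failureCounter,
                                             long timeout, TimeUnit unit) {
        return inFlight
                .orTimeout(timeout, unit)
                .exceptionally(e -> {
                    failureCounter.incrementAndGet();
                    return null;
                });
    }

    // Explicit failure: the completing side only fails the future and never touches the counter.
    static int countAfterExplicitFailure() {
        AtomicInteger failureCounter = new AtomicInteger();
        CompletableFuture<Void> inFlight = new CompletableFuture<>();
        CompletableFuture<Void> waited = waitAsync(inFlight, failureCounter, 5, TimeUnit.SECONDS);
        inFlight.completeExceptionally(new IllegalStateException("split failed"));
        waited.join();
        return failureCounter.get();
    }

    // Timeout: nobody completes the future, so orTimeout fails it and the same handler counts it.
    static int countAfterTimeout() {
        AtomicInteger failureCounter = new AtomicInteger();
        CompletableFuture<Void> inFlight = new CompletableFuture<>();
        CompletableFuture<Void> waited = waitAsync(inFlight, failureCounter, 50, TimeUnit.MILLISECONDS);
        waited.join(); // returns once the timeout has fired and been counted
        return failureCounter.get();
    }

    public static void main(String[] args) {
        System.out.println(countAfterExplicitFailure());
        System.out.println(countAfterTimeout());
    }
}
```

Both paths end in the same `exceptionally` handler, which is why a single update site is enough in this sketch.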
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19622: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl
Demogorgon314 commented on code in PR #19622:
URL: https://github.com/apache/pulsar/pull/19622#discussion_r1130379642

## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java: ##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+
+/**
+ * Split manager.
+ */
+@Slf4j
+public class SplitManager implements StateChangeListener {
+
+    record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture future) {
+    }
+
+    private final Map inFlightSplitRequests;
+
+    private final SplitCounter counter;
+
+    public SplitManager(SplitCounter splitCounter) {
+        this.inFlightSplitRequests = new ConcurrentHashMap<>();
+        this.counter = splitCounter;
+    }
+
+    private void complete(String serviceUnit, Throwable ex) {
+        inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> {
+            var future = inFlightSplitRequest.future;
+            if (!future.isDone()) {
+                if (ex != null) {
+                    counter.update(Failure, Unknown);

Review Comment:
If we update the counter here, it will be updated twice when the future completes exceptionally.
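
The double update described in this comment can be reproduced in isolation. The following is a minimal sketch, not the PR's code: `DoubleCountSketch` and `failuresCountedForOneError` are hypothetical names, an `AtomicInteger` stands in for `SplitCounter`, and the chain only mirrors the shape of `complete` and `waitAsync` as quoted in this review thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the SplitManager flow; only the counting logic is modelled.
class DoubleCountSketch {

    /** Returns how many times the failure counter is bumped for one failed split. */
    static int failuresCountedForOneError() {
        AtomicInteger failureCounter = new AtomicInteger(); // stand-in for SplitCounter
        CompletableFuture<Void> eventPubFuture = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> inFlight = new CompletableFuture<>();

        // Shape of waitAsync: chain onto the in-flight future and count in exceptionally.
        CompletableFuture<Void> waited = eventPubFuture
                .thenCompose(ignore -> inFlight)
                .exceptionally(e -> {
                    failureCounter.incrementAndGet(); // second update
                    return null;
                });

        // Shape of complete(serviceUnit, ex): count first, then fail the future.
        failureCounter.incrementAndGet();             // first update
        inFlight.completeExceptionally(new IllegalStateException("split failed"));

        waited.join(); // the handler above has run by the time this returns
        return failureCounter.get();
    }

    public static void main(String[] args) {
        System.out.println(failuresCountedForOneError()); // prints 2
    }
}
```

Failing the in-flight future propagates through `thenCompose` into the `exceptionally` handler, so one error reaches the counter from both call sites.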
[GitHub] [pulsar] yyj8 commented on pull request #19047: [improve][broker]PIP-214 Add broker level metrics statistics and expose to prometheus
yyj8 commented on PR #19047:
URL: https://github.com/apache/pulsar/pull/19047#issuecomment-1461219602

> Why delete `site2` folder?

That was a mistake I made while resolving conflicts. I am attempting to repair it.
[GitHub] [pulsar] yyj8 commented on pull request #19047: [improve][broker]PIP-214 Add broker level metrics statistics and expose to prometheus
yyj8 commented on PR #19047:
URL: https://github.com/apache/pulsar/pull/19047#issuecomment-1461214983

> > Why delete `site2` folder?
>
> I guess it's a git command mistake; e.g., a rebase that changed the commit hashes while keeping the changeset.
>
> @yyj8 I'd suggest you redo the change and force push to the `pip-214` branch.

Do you mean that I should first sync the master branch of my own fork with the community's master branch, then force-overwrite the pip-214 branch with my master, and then redo the changes on pip-214?
[GitHub] [pulsar-client-go] CrazyCollin commented on a diff in pull request #987: [feat] Expose the chunk config of consumer to the reader
CrazyCollin commented on code in PR #987:
URL: https://github.com/apache/pulsar-client-go/pull/987#discussion_r1130364401

## pulsar/reader_test.go: ##
@@ -70,6 +70,49 @@ func TestReaderConfigSubscribeName(t *testing.T) {
 	assert.NotNil(t, consumer)
 }
 
+func TestReaderConfigChunk(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	consumer1, err := client.CreateReader(ReaderOptions{
+		Topic:                       "my-topic1",
+		StartMessageID:              EarliestMessageID(),
+		MaxPendingChunkedMessage:    50,
+		ExpireTimeOfIncompleteChunk: 30 * time.Second,
+		AutoAckIncompleteChunk:      true,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}

Review Comment:
Thank you for the advice, but I'm a little confused: shouldn't `assert.Nil` be used here?
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19193: [fix] [doc] fix multiple apis in the automatically generated documentation use the same anchor point
poorbarcode commented on code in PR #19193:
URL: https://github.com/apache/pulsar/pull/19193#discussion_r1130364199

## pulsar-broker/pom.xml: ##
@@ -697,11 +697,13 @@
 com.github.kongchen
 swagger-maven-plugin
-3.1.7
+3.1.8
 false
+ {{className}}_{{methodName}}

Review Comment:
Once we keep only the latest version of each admin API (such as `v2`, `v3`), `{className}_{methodName}` will not conflict. If we expose multiple API versions in the future, we will need to change the rule to `{package_name}_{className}_{methodName}`.
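
The conflict this comment describes can be illustrated with a small sketch. It assumes, purely for illustration, that the same class and method exist in two versioned packages; `OperationIdSketch`, its helpers, and the method name `getActiveBrokers` are hypothetical choices for the example and are not taken from the plugin or the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the two naming rules discussed in the review comment.
class OperationIdSketch {

    // Rule currently proposed: {className}_{methodName}
    static String shortId(String className, String methodName) {
        return className + "_" + methodName;
    }

    // Possible future rule: {package_name}_{className}_{methodName}
    static String qualifiedId(String packageName, String className, String methodName) {
        return packageName.replace('.', '_') + "_" + className + "_" + methodName;
    }

    // Returns every id that appears more than once, in encounter order.
    static List<String> duplicates(List<String> ids) {
        Set<String> seen = new HashSet<>();
        List<String> dups = new ArrayList<>();
        for (String id : ids) {
            if (!seen.add(id)) {
                dups.add(id);
            }
        }
        return dups;
    }

    public static void main(String[] args) {
        String v1 = "org.apache.pulsar.broker.admin.v1";
        String v2 = "org.apache.pulsar.broker.admin.v2";

        List<String> shortIds = Arrays.asList(
                shortId("Brokers", "getActiveBrokers"),   // from the v1 package
                shortId("Brokers", "getActiveBrokers"));  // from the v2 package
        List<String> qualifiedIds = Arrays.asList(
                qualifiedId(v1, "Brokers", "getActiveBrokers"),
                qualifiedId(v2, "Brokers", "getActiveBrokers"));

        System.out.println(duplicates(shortIds));     // [Brokers_getActiveBrokers]
        System.out.println(duplicates(qualifiedIds)); // []
    }
}
```

With only one API version published, the short form stays unique; the package prefix matters only once two versions of the same class are documented together.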
[GitHub] [pulsar] codecov-commenter commented on pull request #19047: [improve][broker]PIP-214 Add broker level metrics statistics and expose to prometheus
codecov-commenter commented on PR #19047:
URL: https://github.com/apache/pulsar/pull/19047#issuecomment-1461211606

# [Codecov](https://codecov.io/gh/apache/pulsar/pull/19047?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) Report
> Merging [#19047](https://codecov.io/gh/apache/pulsar/pull/19047?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) (07365cd) into [master](https://codecov.io/gh/apache/pulsar/commit/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) (d8569cd) will **decrease** coverage by `23.47%`.
> The diff coverage is `0.00%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/19047/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/19047?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)

```diff
@@              Coverage Diff              @@
##             master   #19047       +/-   ##
=============================================
- Coverage     47.20%   23.73%   -23.47%
+ Complexity    10645      298    -10347
=============================================
  Files           709     1672      +963
  Lines         69421   128109    +58688
  Branches       7449    13967     +6518
=============================================
- Hits          32769    30406     -2363
- Misses        32984    93197    +60213
- Partials       3668     4506      +838
```

| Flag | Coverage Δ | |
|---|---|---|
| inttests | `23.73% <0.00%> (?)` | |
| unittests | `?` | |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/19047?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...broker/stats/prometheus/AggregatedBrokerStats.java](https://codecov.io/gh/apache/pulsar/pull/19047?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zdGF0cy9wcm9tZXRoZXVzL0FnZ3JlZ2F0ZWRCcm9rZXJTdGF0cy5qYXZh) | `0.00% <0.00%> (ø)` | |
| [...ker/stats/prometheus/NamespaceStatsAggregator.java](https://codecov.io/gh/apache/pulsar/pull/19047?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zdGF0cy9wcm9tZXRoZXVzL05hbWVzcGFjZVN0YXRzQWdncmVnYXRvci5qYXZh) | `0.00% <0.00%> (ø)` | |
| [...java/org/apache/pulsar/proxy/stats/TopicStats.java](https://codecov.io/gh/apache/pulsar/pull/19047?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLXByb3h5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9wdWxzYXIvcHJveHkvc3RhdHMvVG9waWNTdGF0cy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...ava/org/apache/pulsar/broker/admin/v1/Brokers.java](https://codecov.io/gh/apache/pulsar/pull/19047?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi92MS9Ccm9rZXJzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...va/org/apache/pulsar/broker/admin/v1/Clusters.java](https://codecov.io/gh/apache/pulsar/pull/19047?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi92MS9DbHVzdGVycy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [.../org/apache/pulsar/broker/admin/v1/Properties.java](https://codecov.io/gh/apache/pulsar/pull/19047?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi92MS9Qcm9wZXJ0aWVzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19193: [fix] [doc] fix multiple apis in the automatically generated documentation use the same anchor point
poorbarcode commented on code in PR #19193:
URL: https://github.com/apache/pulsar/pull/19193#discussion_r1130361324

## pulsar-broker/pom.xml: ##
@@ -697,11 +697,13 @@
 com.github.kongchen
 swagger-maven-plugin
-3.1.7
+3.1.8
 false
+ {{className}}_{{methodName}}
+ json

Review Comment:
> nit: Is this json outputFormats necessary?

It should be optional, but Maven or Swagger had issues, and I hit null pointer errors when testing locally. I tried troubleshooting yesterday but couldn't find the root cause quickly, so I thought adding this configuration would avoid unnecessary trouble.
[GitHub] [pulsar] tisonkun commented on pull request #19047: [improve][broker]PIP-214 Add broker level metrics statistics and expose to prometheus
tisonkun commented on PR #19047:
URL: https://github.com/apache/pulsar/pull/19047#issuecomment-1461209461

> Why delete `site2` folder?

I guess it's a git command mistake; e.g., a rebase that changed the commit hashes while keeping the changeset.

@yyj8 I'd suggest you redo the change and force push to the `pip-214` branch.
[GitHub] [pulsar] momo-jun commented on issue #18822: PIP-228: Refactor Information Architecture of Pulsar Client Documentation
momo-jun commented on issue #18822:
URL: https://github.com/apache/pulsar/issues/18822#issuecomment-1461207870

Hi bot, this PIP is not stale; it is on track and in good health. Sorry for not updating it :)

Attached are the links to the follow-up doc enhancements implemented by @RobertIndie:
* Added the missing code examples for using Java - https://github.com/apache/pulsar-site/pull/430
* Added how to initialize C++/Python client - https://github.com/apache/pulsar-site/pull/444/
* Will add the missing configs for C++/Python/C#/Go clients
* Will add the missing code examples for C++/Go/Python/Node.js/C# clients
[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19754: [fix][meta] Fix deadlock causes session notification not to work
mattisonchao commented on code in PR #19754:
URL: https://github.com/apache/pulsar/pull/19754#discussion_r1130352638

## pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java: ##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.coordination.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.BaseMetadataStoreTest;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+public class LeaderElectionImplTest extends BaseMetadataStoreTest {

Review Comment:
The test will run in the other group.

https://user-images.githubusercontent.com/74767115/223906841-d6275977-8dc1-41f6-b8d2-962723f780bd.png
[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19622: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl
heesung-sn commented on code in PR #19622:
URL: https://github.com/apache/pulsar/pull/19622#discussion_r1130351864

##
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+
+/**
+ * Split manager.
+ */
+@Slf4j
+public class SplitManager implements StateChangeListener {
+
+    record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture future) {
+    }
+
+    private final Map inFlightSplitRequests;
+
+    private final SplitCounter counter;
+
+    public SplitManager(SplitCounter splitCounter) {
+        this.inFlightSplitRequests = new ConcurrentHashMap<>();
+        this.counter = splitCounter;
+    }
+
+    private void complete(String serviceUnit, Throwable ex) {
+        inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> {
+            var future = inFlightSplitRequest.future;
+            if (!future.isDone()) {
+                if (ex != null) {
+                    counter.update(Failure, Unknown);
+                    future.completeExceptionally(ex);
+                    log.error("Failed the bundle split event: {}", serviceUnit, ex);
+                } else {
+                    counter.update(inFlightSplitRequest.splitDecision);
+                    future.complete(null);
+                    log.info("Completed the bundle split event: {}", serviceUnit);
+                }
+            }
+            return null;
+        });
+    }
+
+    public CompletableFuture waitAsync(CompletableFuture eventPubFuture,
+                                       String bundle,
+                                       SplitDecision decision,
+                                       long timeout,
+                                       TimeUnit timeoutUnit) {
+        return eventPubFuture
+                .thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> {
+                    log.info("Published the bundle split event for bundle:{}. "
+                                    + "Waiting the split event to complete. Timeout: {} {}",
+                            bundle, timeout, timeoutUnit);
+                    CompletableFuture future = new CompletableFuture<>();
+                    future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> {
+                        if (ex != null) {
+                            inFlightSplitRequests.remove(bundle);
+                            log.warn("Timed out while waiting for the bundle split event: {}", bundle, ex);
+                        }
+                    });
+                    return new InFlightSplitRequest(decision, future);
+                }).future)
+                .exceptionally(e -> {
+                    log.error("Failed to publish the bundle split event for bundle:{}. Skipping wait.", bundle);
+                    counter.update(Failure, Unknown);
+                    return null;

Review Comment:
Updated.
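The in-flight bookkeeping in the quoted `SplitManager` diff (one pending future per bundle, dropped again on timeout) can be sketched with the JDK alone. This is a minimal illustration and not the Pulsar class: `InFlightTracker`, `waitFor`, `complete`, and `pending` are hypothetical names, and it registers the future with `putIfAbsent` instead of `computeIfAbsent` so that the timeout callback never runs inside the map's own compute function. `orTimeout` requires Java 9 or later.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for per-key in-flight request tracking; not the Pulsar SplitManager. */
class InFlightTracker {
    private final Map<String, CompletableFuture<Void>> inFlight = new ConcurrentHashMap<>();

    /** Registers a wait for the key, or returns the wait that is already pending for it. */
    CompletableFuture<Void> waitFor(String key, long timeout, TimeUnit unit) {
        CompletableFuture<Void> fresh = new CompletableFuture<>();
        CompletableFuture<Void> existing = inFlight.putIfAbsent(key, fresh);
        if (existing != null) {
            return existing;
        }
        // orTimeout fails the same future with a TimeoutException once the deadline passes.
        fresh.orTimeout(timeout, unit).whenComplete((v, ex) -> {
            if (ex != null) {
                // Drop the entry only if it still maps to this exact future.
                inFlight.remove(key, fresh);
            }
        });
        return fresh;
    }

    /** Completes the pending wait for the key, successfully when error is null. */
    void complete(String key, Throwable error) {
        CompletableFuture<Void> future = inFlight.remove(key);
        if (future == null) {
            return;
        }
        if (error != null) {
            future.completeExceptionally(error);
        } else {
            future.complete(null);
        }
    }

    /** Number of waits that have neither completed nor timed out. */
    int pending() {
        return inFlight.size();
    }
}
```

A second `waitFor` call for the same key returns the same future, so concurrent callers share one wait, which mirrors the effect of the `computeIfAbsent` call in the diff.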
[GitHub] [pulsar] github-actions[bot] deleted a comment on the discussion: All instances of the production cluster are full gc, and topic fenced cannot be written
GitHub user github-actions[bot] deleted a comment on the discussion: All instances of the production cluster are full gc, and topic fenced cannot be written The issue had no activity for 30 days, mark with Stale label. GitHub link: https://github.com/apache/pulsar/discussions/19757#discussioncomment-5248790 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
[GitHub] [pulsar] keyboardbobo edited a discussion: All instances of the production cluster are full gc, and topic fenced cannot be written
GitHub user keyboardbobo edited a discussion: All instances of the production cluster are full gc, and topic fenced cannot be written

### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

There are 9 brokers and 9 bookies in our production environment. After running for half a year, the number of topic partitions increased from 160 to 280. After restarting the client application, all instances showed full GC after a few hours. Each instance carries only about 50M of traffic.

### Version

broker 2.9.2, bookkeeper 4.14

### Minimal reproduce step

Later we reproduced the problem in the test environment. The cluster has 4 bookies and 4 brokers, and persistent://qlm-test/qlm-ns/qlm-test has 500 partitions. The client is an independent machine, and the CPU load is relatively high. The traffic of each broker is only 40M. Full GC can be reproduced locally with the following script:

**Start 5 instances on the same linux server**: nohup bin/pulsar-perf produce -threads 20 -u pulsar://clusterIp:port -n 20 -s 200 -r 10 persistent://qlm-test/qlm-ns/qlm-test &

**Start 2 instances on the other same linux server**: nohup bin/pulsar-perf consume -u pulsar://clusterIp:port -q 5000 -ss qlm-sub -st Shared persistent://qlm-test/qlm-ns/qlm-test &

**The broker Full gc log:**

> 2022-11-10T15:40:55.685+0800: 4028.874: [Full GC (Allocation Failure) 15G->7299M(16G), 27.2368180 secs]
>    [Eden: 0.0B(8192.0M)->0.0B(8192.0M) Survivors: 0.0B->0.0B Heap: 16.0G(16.0G)->7299.4M(16.0G)], [Metaspace: 90357K->86677K(1140736K)]
>  [Times: user=47.76 sys=1.22, real=27.24 secs]
> 2022-11-10T15:41:22.924+0800: 4056.113: [GC concurrent-mark-abort]
> 2022-11-10T15:41:28.922+0800: 4062.110: [GC pause (GCLocker Initiated GC) (young) (to-space exhausted), 7.1966394 secs]
>    [Parallel Time: 6586.0 ms, GC Workers: 32]
>       [GC Worker Start (ms): Min: 4062112.3, Avg: 4062112.5, Max: 4062112.7, Diff: 0.4]
>       [Ext Root Scanning (ms): Min: 1.4, Avg: 2.2, Max: 7.8, Diff: 6.4, Sum: 71.9]
>       [Update RS (ms): Min: 0.0, Avg: 3.9, Max: 5.3, Diff: 5.3, Sum: 125.8]
>          [Processed Buffers: Min: 0, Avg: 10.1, Max: 23, Diff: 23, Sum: 323]
>       [Scan RS (ms): Min: 13.8, Avg: 15.4, Max: 15.8, Diff: 2.0, Sum: 493.0]
>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.1, Diff: 0.1, Sum: 0.4]
>       [Object Copy (ms): Min: 2558.9, Avg: 2684.2, Max: 6563.5, Diff: 4004.6, Sum: 85894.9]
>       [Termination (ms): Min: 0.0, Avg: 3879.3, Max: 4004.6, Diff: 4004.6, Sum: 124138.1]
>          [Termination Attempts: Min: 1, Avg: 10.9, Max: 18, Diff: 17, Sum: 348]
>       [GC Worker Other (ms): Min: 0.0, Avg: 0.3, Max: 0.6, Diff: 0.6, Sum: 9.7]
>       [GC Worker Total (ms): Min: 6584.9, Avg: 6585.4, Max: 6585.8, Diff: 0.9, Sum: 210733.7]
>       [GC Worker End (ms): Min: 4068697.6, Avg: 4068697.9, Max: 4068698.2, Diff: 0.5]
>    [Code Root Fixup: 0.2 ms]
>    [Code Root Purge: 0.2 ms]
>    [Clear CT: 3.5 ms]
>    [Other: 606.6 ms]
>       [Evacuation Failure: 573.9 ms]
>       [Choose CSet: 0.0 ms]
>       [Ref Proc: 6.1 ms]
>       [Ref Enq: 0.5 ms]
>       [Redirty Cards: 5.2 ms]
>       [Humongous Register: 0.1 ms]
>       [Humongous Reclaim: 1.0 ms]
>       [Free CSet: 11.8 ms]
>    [Eden: 8200.0M(8192.0M)->0.0B(7328.0M) Survivors: 0.0B->864.0M Heap: 15.1G(16.0G)->14.9G(16.0G)]

**The broker has more logs like this:**

> 2022-11-10 15:41:27.0581 [BookKeeperClientWorker-OrderedExecutor-2-0] WARN org.apache.bookkeeper.client.PendingAddOp - Fencing exception on write: L780738 E1371097 on 10.101.129.75:3181
> 2022-11-10 15:41:27.0581 [BookKeeperClientWorker-OrderedExecutor-39-0] WARN org.apache.bookkeeper.client.PendingAddOp - Fencing exception on write: L780711 E1370071 on 10.101.129.68:3181
> 2022-11-10 15:41:27.0581 [BookKeeperClientWorker-OrderedExecutor-18-0] ERROR org.apache.bookkeeper.client.PendingAddOp - Write of ledger entry to quorum failed: L780690 E1374427
> 2022-11-10 15:41:27.0581 [BookKeeperClientWorker-OrderedExecutor-40-0] WARN org.apache.bookkeeper.client.PendingAddOp - Failed to write entry (780712, 1354072): Bookie operation timeout
> 2022-11-10 15:41:27.0581 [BookKeeperClientWorker-OrderedExecutor-7-0] WARN org.apache.bookkeeper.client.PendingAddOp - Fencing exception on write: L780743 E1370335 on 10.101.129.68:3181
> 2022-11-10 15:41:27.0581 [BookKeeperClientWorker-OrderedExecutor-18-0] ERROR org.apache.bookkeeper.client.PendingAddOp - Write of ledger entry to quorum failed: L780690 E1374428

**The client has more logs like this:**

> 2022-11-10 16:48:50.0588 [pulsar-timer-78-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://qlm-test/qlm-ns/qlm-test-partition-279] [pulsar_dev2-118-13100] Reconnecting after timeout
> 2022-11-10 16:48:50.0588 [pulsar-client-io-16-1] WARN org.apache.pulsar.client.impl.ClientCnx -
[GitHub] [pulsar] shy-share commented on issue #19752: How can I get the list of topics?
shy-share commented on issue #19752: URL: https://github.com/apache/pulsar/issues/19752#issuecomment-1461201026 You can try the Java SDK: `List<String> allTopics = pulsarAdmin.topics().getList(tenant + "/" + namespace);`
[pulsar] branch branch-2.9 updated: [fix][meta] Fix deadlock causes session notification not to work (#19754)
This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new fbfa8e75cbb [fix][meta] Fix deadlock causes session notification not to work (#19754)
fbfa8e75cbb is described below

commit fbfa8e75cbbe4f3269f4f3f59555a22f845d022a
Author: Qiang Zhao
AuthorDate: Thu Mar 9 10:17:39 2023 +0800

    [fix][meta] Fix deadlock causes session notification not to work (#19754)

    ### Motivation

    This is a namespace bundle double-owner problem, which we found in the memory dumps. The dumps show that the notification thread has been blocked for a long time by the leader election deadlock, and that many notifications are stuck in the executor queue. As a result, the locks cannot be revalidated, yet they are still treated as working.

    For private reasons, I can't share the namespace bundle snapshot, but the blocked thread easily explains it.

    https://user-images.githubusercontent.com/74767115/223670419-c4319f44-f1e1-4361-8c79-04c7f0dabe3c.png
    ^^ blocked thread

    https://user-images.githubusercontent.com/74767115/223672000-fb4ce22c-6a45-4cb6-9d23-c36e3afbc93a.png
    ^^ executor queue

    ### Modifications

    - Avoid submitting the new task to the single-thread executor, which causes the deadlock.

    (cherry picked from commit cbd799f05eb26aeac5ffd5bbb9751ad9e5928dd3)
---
 .../coordination/impl/LeaderElectionImpl.java      | 21 +++
 .../coordination/impl/LeaderElectionImplTest.java  | 65 ++
 2 files changed, 74 insertions(+), 12 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index 7f50904eb07..bb445010690 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -19,30 +19,23 @@
 package org.apache.pulsar.metadata.coordination.impl;
 
 import com.fasterxml.jackson.databind.type.TypeFactory;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.util.ArrayList;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.EnumSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.SafeRunnable;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataCacheConfig;
+import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyClosedException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
@@ -54,7 +47,6 @@ import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
-import org.apache.pulsar.metadata.api.MetadataSerde;
 
 @Slf4j
 class LeaderElectionImpl implements LeaderElection {
@@ -120,13 +112,13 @@ class LeaderElectionImpl implements LeaderElection {
             } else {
                 return tryToBecomeLeader();
             }
-        }).thenComposeAsync(leaderElectionState -> {
+        }).thenCompose(leaderElectionState -> {
             // make sure that the cache contains the current leader
             // so that getLeaderValueIfPresent works on all brokers
             cache.refresh(path);
             return cache.get(path)
                     .thenApply(__ -> leaderElectionState);
-        }, executor);
+        });
     }
 
     private synchronized CompletableFuture handleExistingLeaderValue(GetResult res) {
@@ -345,4 +337,9 @@ class LeaderElectionImpl implements LeaderElection {
             }
         }
     }
+
+    @VisibleForTesting
+    protected ScheduledExecutorService getSchedulerExecutor() {
+        return executor;
+    }
 }
diff --git
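The commit above swaps `thenComposeAsync(..., executor)` for `thenCompose(...)`. One way such a deadlock can arise, shown here as a JDK-only sketch, is a task that occupies the executor's only thread while waiting on a stage that was queued behind it on the same executor. The email does not show the exact blocking call in Pulsar, so the blocking `get()` below is an assumption made for illustration, and `SingleThreadHopDemo` and `finishes` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical demo of a self-inflicted wait on a single-thread executor. */
class SingleThreadHopDemo {

    /** Returns true if a task that blocks on the executor's only thread finishes within 500 ms. */
    static boolean finishes(boolean hopBackToExecutor) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> elected = CompletableFuture.completedFuture("leader");
            Future<String> task = executor.submit(() -> {
                CompletableFuture<String> refreshed;
                if (hopBackToExecutor) {
                    // The next stage is queued on the executor whose only thread runs this task.
                    refreshed = elected.thenComposeAsync(s -> CompletableFuture.completedFuture(s), executor);
                } else {
                    // The next stage runs inline because the source future is already complete.
                    refreshed = elected.thenCompose(s -> CompletableFuture.completedFuture(s));
                }
                // Blocks the only thread; with the hop, the queued stage can never start.
                return refreshed.get();
            });
            try {
                task.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // Interrupts the stuck worker so the demo itself does not hang.
            executor.shutdownNow();
        }
    }
}
```

With `hopBackToExecutor` set to `false` the task returns immediately; with `true` it waits until the timeout, which is the shape of the problem the commit message describes as a blocked notification thread with tasks piling up in the executor queue.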
[pulsar] branch branch-2.11 updated: [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 9f58da745ea [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)
9f58da745ea is described below

commit 9f58da745ead56be6f61c2f2f656f5a1616bcd94
Author: fengyubiao
AuthorDate: Thu Mar 9 10:38:21 2023 +0800

    [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)

    Motivation: After a topic close failure or a delete failure, the fence state is reset to get the topic back to work, but the fence state of the subscription is not reset, so the consumer can never connect to the broker again.

    Modifications: Reset the fence state of subscriptions if the topic close operation fails.

    (cherry picked from commit cdeef00c5f6a5bd3197b4ca6de0a0505b18835d8)
---
 .../service/persistent/PersistentSubscription.java | 39 +++
 .../broker/service/persistent/PersistentTopic.java |  1 +
 .../service/persistent/PersistentTopicTest.java    | 55 ++
 3 files changed, 87 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 53701649289..645f7402c7e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -125,6 +125,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
     private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
     private final PendingAckHandle pendingAckHandle;
     private volatile Map subscriptionProperties;
+    private volatile CompletableFuture fenceFuture;
 
     static {
         REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
@@ -891,7 +892,10 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
      */
     @Override
     public synchronized CompletableFuture disconnect() {
-        CompletableFuture disconnectFuture = new CompletableFuture<>();
+        if (fenceFuture != null){
+            return fenceFuture;
+        }
+        fenceFuture = new CompletableFuture<>();
 
         // block any further consumers on this subscription
         IS_FENCED_UPDATER.set(this, TRUE);
@@ -899,19 +903,38 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
         (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null))
                 .thenCompose(v -> close()).thenRun(() -> {
                     log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
-                    disconnectFuture.complete(null);
+                    fenceFuture.complete(null);
                 }).exceptionally(exception -> {
-                    IS_FENCED_UPDATER.set(this, FALSE);
-                    if (dispatcher != null) {
-                        dispatcher.reset();
-                    }
                     log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName, exception);
-                    disconnectFuture.completeExceptionally(exception);
+                    fenceFuture.completeExceptionally(exception);
+                    resumeAfterFence();
                     return null;
                 });
+        return fenceFuture;
+    }
 
-        return disconnectFuture;
+    /**
+     * Resume subscription after topic deletion or close failure.
+     */
+    public synchronized void resumeAfterFence() {
+        // If "fenceFuture" is null, it means that "disconnect" has never been called.
+        if (fenceFuture != null) {
+            fenceFuture.whenComplete((ignore, ignoreEx) -> {
+                synchronized (PersistentSubscription.this) {
+                    try {
+                        if (IS_FENCED_UPDATER.compareAndSet(this, TRUE, FALSE)) {
+                            if (dispatcher != null) {
+                                dispatcher.reset();
+                            }
+                        }
+                        fenceFuture = null;
+                    } catch (Exception ex) {
+                        log.error("[{}] Resume subscription [{}] failure", topicName, subName, ex);
+                    }
+                }
+            });
+        }
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
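The idea behind the `fenceFuture` change (repeated `disconnect()` calls share one pending future, and a failed close lifts the fence so the resource can be used again) can be sketched with the JDK alone. This is an illustration under stated assumptions, not the Pulsar implementation: `FencedResource`, the `closer` supplier, and `isFenced` are hypothetical names, and the sketch returns a local reference to the new future so a failure that completes inline still hands the caller a non-null result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical stand-in for a resource that fences itself while closing. */
class FencedResource {
    private final AtomicBoolean fenced = new AtomicBoolean(false);
    private final Supplier<CompletableFuture<Void>> closer; // hypothetical async close action
    private CompletableFuture<Void> fenceFuture;

    FencedResource(Supplier<CompletableFuture<Void>> closer) {
        this.closer = closer;
    }

    /** Fences the resource and starts closing it; repeated calls share the same future. */
    synchronized CompletableFuture<Void> disconnect() {
        if (fenceFuture != null) {
            return fenceFuture;
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        fenceFuture = result;
        fenced.set(true);
        closer.get().whenComplete((v, ex) -> {
            if (ex == null) {
                result.complete(null);
            } else {
                result.completeExceptionally(ex);
                // A failed close must not leave the resource fenced forever.
                resumeAfterFence();
            }
        });
        return result;
    }

    /** Lifts the fence once the pending disconnect has finished; no-op if none was started. */
    synchronized void resumeAfterFence() {
        CompletableFuture<Void> pending = fenceFuture;
        if (pending == null) {
            return;
        }
        pending.whenComplete((v, ex) -> {
            synchronized (FencedResource.this) {
                fenced.compareAndSet(true, false);
                fenceFuture = null;
            }
        });
    }

    boolean isFenced() {
        return fenced.get();
    }
}
```

After a failed close, the next `disconnect()` call starts a fresh attempt instead of returning the stale failed future, because `resumeAfterFence()` clears the field.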
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19295: [feat][broker] OneStageAuth State: move authn out of constructor
BewareMyPower commented on code in PR #19295:
URL: https://github.com/apache/pulsar/pull/19295#discussion_r1130341227

##
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java:
##
@@ -20,39 +20,60 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 import java.net.SocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 import javax.servlet.http.HttpServletRequest;
 import org.apache.pulsar.common.api.AuthData;
 
 /**
- * Interface for authentication state.
- *
- * It tell broker whether the authentication is completed or not,
- * if completed, what is the AuthRole is.
+ * A class to track single stage authentication. This class assumes that:
+ * 1. {@link #authenticateAsync(AuthData)} is called once and when the {@link CompletableFuture} completes,
+ *    authentication is complete.
+ * 2. Authentication does not expire, so {@link #isExpired()} always returns false.
+ *
+ * See {@link AuthenticationState} for Pulsar's contract on how this interface is used by Pulsar.
  */
 public class OneStageAuthenticationState implements AuthenticationState {
 
-    private final AuthenticationDataSource authenticationDataSource;
-    private final String authRole;
+    private AuthenticationDataSource authenticationDataSource;
+    private final SocketAddress remoteAddress;
+    private final SSLSession sslSession;
+    private final AuthenticationProvider provider;
+    private volatile String authRole;
 
+
+    /**
+     * Constructor for a {@link OneStageAuthenticationState} where there is no authentication performed during
+     * initialization.
+     * @param remoteAddress - remoteAddress associated with the {@link AuthenticationState}
+     * @param sslSession - sslSession associated with the {@link AuthenticationState}
+     * @param provider - {@link AuthenticationProvider} to use to verify {@link AuthData}
+     */
     public OneStageAuthenticationState(AuthData authData,
                                        SocketAddress remoteAddress,
                                        SSLSession sslSession,
-                                       AuthenticationProvider provider) throws AuthenticationException {
-        this.authenticationDataSource = new AuthenticationDataCommand(
-            new String(authData.getBytes(), UTF_8), remoteAddress, sslSession);
-        this.authRole = provider.authenticate(authenticationDataSource);
+                                       AuthenticationProvider provider) {
+        this.provider = provider;
+        this.remoteAddress = remoteAddress;
+        this.sslSession = sslSession;
     }
 
-    public OneStageAuthenticationState(HttpServletRequest request, AuthenticationProvider provider)
-            throws AuthenticationException {
+    public OneStageAuthenticationState(HttpServletRequest request, AuthenticationProvider provider) {
+        // Must initialize this here for backwards compatibility with http authentication
         this.authenticationDataSource = new AuthenticationDataHttps(request);
-        this.authRole = provider.authenticate(authenticationDataSource);
+        this.provider = provider;
+        // These are not used when invoking this constructor.
+        this.remoteAddress = null;
+        this.sslSession = null;
     }
 
     @Override
-    public String getAuthRole() {
+    public String getAuthRole() throws AuthenticationException {
+        if (authRole == null) {
+            throw new AuthenticationException("Must authenticate before calling getAuthRole");
+        }

Review Comment:
> Out of curiosity, what is the consequence of breaking source compatibility by removing the exception from the method signature?

No. I just noticed this point when I reviewed again. After reading some documentation and testing ABI compatibility locally, it seems that removing the exception from the method signature does not affect ABI compatibility.
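The contract described in the quoted Javadoc (authentication happens once, asynchronously, and the role is readable only after that future completes) can be sketched without any Pulsar types. This is a simplified illustration, not the class from the PR: `OneShotAuthState` and its `verifier` function are hypothetical, credentials are plain strings, and an unchecked `IllegalStateException` stands in for the checked `AuthenticationException` used in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical single-stage authentication state; not the Pulsar class. */
class OneShotAuthState {
    // Hypothetical provider: maps credentials to a role asynchronously.
    private final Function<String, CompletableFuture<String>> verifier;
    private volatile String role;

    /** The constructor only stores its collaborators; no authentication happens here. */
    OneShotAuthState(Function<String, CompletableFuture<String>> verifier) {
        this.verifier = verifier;
    }

    /** Verifies the credentials; the role becomes visible once the returned future completes. */
    CompletableFuture<Void> authenticateAsync(String credentials) {
        return verifier.apply(credentials).thenAccept(resolved -> this.role = resolved);
    }

    /** Returns the role, or fails if authentication has not completed yet. */
    String getAuthRole() {
        String current = role;
        if (current == null) {
            throw new IllegalStateException("Must authenticate before calling getAuthRole");
        }
        return current;
    }
}
```

Keeping the constructor free of provider calls is what lets the caller decide when, and on which thread, the potentially slow verification runs.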
[pulsar] branch branch-2.10 updated: [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 751eeea172e [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)
751eeea172e is described below

commit 751eeea172e125a7e44baf66b3728db5c348d279
Author: fengyubiao
AuthorDate: Thu Mar 9 10:38:21 2023 +0800

    [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)

    Motivation: After a topic close failure or a delete failure, the fence state is reset to get the topic back to work, but the fence state of the subscription is not reset, so the consumer can never connect to the broker again.

    Modifications: Reset the fence state of subscriptions if the topic close operation fails.

    (cherry picked from commit cdeef00c5f6a5bd3197b4ca6de0a0505b18835d8)
---
 .../service/persistent/PersistentSubscription.java | 39 +++
 .../broker/service/persistent/PersistentTopic.java |  1 +
 .../service/persistent/PersistentTopicTest.java    | 55 ++
 3 files changed, 87 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index ff1c654f1d8..3352fb78d51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -114,6 +114,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
     private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
     private final PendingAckHandle pendingAckHandle;
     private volatile Map subscriptionProperties;
+    private volatile CompletableFuture fenceFuture;
 
     static {
         REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
@@ -777,7 +778,10 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
      */
     @Override
     public synchronized CompletableFuture disconnect() {
-        CompletableFuture disconnectFuture = new CompletableFuture<>();
+        if (fenceFuture != null){
+            return fenceFuture;
+        }
+        fenceFuture = new CompletableFuture<>();
 
         // block any further consumers on this subscription
         IS_FENCED_UPDATER.set(this, TRUE);
@@ -785,19 +789,38 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
         (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null))
                 .thenCompose(v -> close()).thenRun(() -> {
                     log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
-                    disconnectFuture.complete(null);
+                    fenceFuture.complete(null);
                 }).exceptionally(exception -> {
-                    IS_FENCED_UPDATER.set(this, FALSE);
-                    if (dispatcher != null) {
-                        dispatcher.reset();
-                    }
                     log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName, exception);
-                    disconnectFuture.completeExceptionally(exception);
+                    fenceFuture.completeExceptionally(exception);
+                    resumeAfterFence();
                     return null;
                 });
+        return fenceFuture;
+    }
 
-        return disconnectFuture;
+    /**
+     * Resume subscription after topic deletion or close failure.
+     */
+    public synchronized void resumeAfterFence() {
+        // If "fenceFuture" is null, it means that "disconnect" has never been called.
+        if (fenceFuture != null) {
+            fenceFuture.whenComplete((ignore, ignoreEx) -> {
+                synchronized (PersistentSubscription.this) {
+                    try {
+                        if (IS_FENCED_UPDATER.compareAndSet(this, TRUE, FALSE)) {
+                            if (dispatcher != null) {
+                                dispatcher.reset();
+                            }
+                        }
+                        fenceFuture = null;
+                    } catch (Exception ex) {
+                        log.error("[{}] Resume subscription [{}] failure", topicName, subName, ex);
+                    }
+                }
+            });
+        }
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
[GitHub] [pulsar] nodece commented on a diff in pull request #19432: [improve] Allow to build and push multi-arch Docker images
nodece commented on code in PR #19432: URL: https://github.com/apache/pulsar/pull/19432#discussion_r1102218809 ## docker/pulsar-all/pom.xml: ## @@ -139,6 +135,7 @@ build tag + push Review Comment: We don't need to push the image with Maven, so this goal should be removed.
[pulsar] branch branch-2.9 updated: [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 21d85fca559 [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)
21d85fca559 is described below

commit 21d85fca559c9088701455060d7f0613f454aa6f
Author: fengyubiao
AuthorDate: Thu Mar 9 10:38:21 2023 +0800

    [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)

    Motivation: After a topic close failure or a delete failure, the fence state is reset to get the topic back to work, but the fence state of the subscription is not reset, so the consumer can never connect to the broker again.

    Modifications: Reset the fence state of subscriptions if the topic close operation fails.

    (cherry picked from commit cdeef00c5f6a5bd3197b4ca6de0a0505b18835d8)
---
 .../service/persistent/PersistentSubscription.java | 39 +++---
 .../broker/service/persistent/PersistentTopic.java |  1 +
 .../service/persistent/PersistentTopicTest.java    | 61 --
 3 files changed, 89 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index b93410fe472..7aac21c8994 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -112,6 +112,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
     private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
     private final PendingAckHandle pendingAckHandle;
+    private volatile CompletableFuture fenceFuture;
 
     static {
         REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
@@ -769,7 +770,10 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
      */
     @Override
     public synchronized CompletableFuture disconnect() {
-        CompletableFuture disconnectFuture = new CompletableFuture<>();
+        if (fenceFuture != null){
+            return fenceFuture;
+        }
+        fenceFuture = new CompletableFuture<>();
 
         // block any further consumers on this subscription
         IS_FENCED_UPDATER.set(this, TRUE);
@@ -777,19 +781,38 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
         (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null))
                 .thenCompose(v -> close()).thenRun(() -> {
                     log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
-                    disconnectFuture.complete(null);
+                    fenceFuture.complete(null);
                 }).exceptionally(exception -> {
-                    IS_FENCED_UPDATER.set(this, FALSE);
-                    if (dispatcher != null) {
-                        dispatcher.reset();
-                    }
                     log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName, exception);
-                    disconnectFuture.completeExceptionally(exception);
+                    fenceFuture.completeExceptionally(exception);
+                    resumeAfterFence();
                     return null;
                 });
+        return fenceFuture;
+    }
 
-        return disconnectFuture;
+    /**
+     * Resume subscription after topic deletion or close failure.
+     */
+    public synchronized void resumeAfterFence() {
+        // If "fenceFuture" is null, it means that "disconnect" has never been called.
+        if (fenceFuture != null) {
+            fenceFuture.whenComplete((ignore, ignoreEx) -> {
+                synchronized (PersistentSubscription.this) {
+                    try {
+                        if (IS_FENCED_UPDATER.compareAndSet(this, TRUE, FALSE)) {
+                            if (dispatcher != null) {
+                                dispatcher.reset();
+                            }
+                        }
+                        fenceFuture = null;
+                    } catch (Exception ex) {
+                        log.error("[{}] Resume subscription [{}] failure", topicName, subName, ex);
+                    }
+                }
+            });
+        }
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
[GitHub] [pulsar-client-python] merlimat commented on issue #102: upgrade grpcio version
merlimat commented on issue #102: URL: https://github.com/apache/pulsar-client-python/issues/102#issuecomment-1461190789 @hangc0276 @zymap We should probably upgrade it first in BookKeeper because it's used by Python functions state.
[GitHub] [pulsar] tisonkun commented on issue #18408: [Bug] All instances of the production cluster are full gc, and topic fenced cannot be written
tisonkun commented on issue #18408: URL: https://github.com/apache/pulsar/issues/18408#issuecomment-1461189566 Moved to discussion. It's more a usage question than a bug.
[GitHub] [pulsar] codelipenghui added a comment to the discussion: [Bug] All instances of the production cluster are full gc, and topic fenced cannot be written
GitHub user codelipenghui added a comment to the discussion: [Bug] All instances of the production cluster are full gc, and topic fenced cannot be written > CPU load is relatively high Did you check which threads consumed most of the CPU? If you are able to reproduce the issue, it would be better to do some CPU profiling with https://github.com/jvm-profiling-tools/async-profiler or other tools, so that we can understand which threads consumed the CPU resources. GitHub link: https://github.com/apache/pulsar/discussions/19757#discussioncomment-5248789
[GitHub] [pulsar] tisonkun added a comment to the discussion: [Bug] All instances of the production cluster are full gc, and topic fenced cannot be written
GitHub user tisonkun added a comment to the discussion: [Bug] All instances of the production cluster are full gc, and topic fenced cannot be written Moved to discussion. It's more a usage question than a bug. GitHub link: https://github.com/apache/pulsar/discussions/19757#discussioncomment-5248791
[GitHub] [pulsar] github-actions[bot] added a comment to the discussion: [Bug] All instances of the production cluster are full gc, and topic fenced cannot be written
GitHub user github-actions[bot] added a comment to the discussion: [Bug] All instances of the production cluster are full gc, and topic fenced cannot be written The issue had no activity for 30 days; marking it with the Stale label. GitHub link: https://github.com/apache/pulsar/discussions/19757#discussioncomment-5248790
[GitHub] [pulsar] tisonkun closed issue #18408: [Bug] All instances of the production cluster are full gc, and topic fenced cannot be written
tisonkun closed issue #18408: [Bug] All instances of the production cluster are full gc, and topic fenced cannot be written URL: https://github.com/apache/pulsar/issues/18408
[GitHub] [pulsar] tisonkun closed issue #19032: Flaky-test: TestPulsarAuth.testPulsarSqlAuth hangs and makes the test run to get cancelled by a timeout
tisonkun closed issue #19032: Flaky-test: TestPulsarAuth.testPulsarSqlAuth hangs and makes the test run to get cancelled by a timeout URL: https://github.com/apache/pulsar/issues/19032
[GitHub] [pulsar] tisonkun commented on issue #19032: Flaky-test: TestPulsarAuth.testPulsarSqlAuth hangs and makes the test run to get cancelled by a timeout
tisonkun commented on issue #19032: URL: https://github.com/apache/pulsar/issues/19032#issuecomment-1461188807 https://github.com/apache/pulsar/issues/13964 has been fixed. Closing, as there have been no more reports of this case. Feel free to open a new ticket or reopen this one if it comes back.
[GitHub] [pulsar] tisonkun closed issue #19150: [Bug] pulsar-2.10.2 Please redo the lookup. Request is denied: namespace=public/default
tisonkun closed issue #19150: [Bug] pulsar-2.10.2 Please redo the lookup. Request is denied: namespace=public/default URL: https://github.com/apache/pulsar/issues/19150
[GitHub] [pulsar] tisonkun commented on issue #19150: [Bug] pulsar-2.10.2 Please redo the lookup. Request is denied: namespace=public/default
tisonkun commented on issue #19150: URL: https://github.com/apache/pulsar/issues/19150#issuecomment-1461187806 Please fill out the form correctly. Pasting a long log without a human-readable description does not help with debugging.
[GitHub] [pulsar] shibd commented on pull request #19047: [improve][broker]PIP-214 Add broker level metrics statistics and expose to prometheus
shibd commented on PR #19047: URL: https://github.com/apache/pulsar/pull/19047#issuecomment-1461184277 Why delete the `site2` folder?
[pulsar] branch branch-2.9 updated: [Branch-2.9] [build] fix build failed (#19743)
This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.9 by this push: new 8e97632ff2f [Branch-2.9] [build] fix build failed (#19743) 8e97632ff2f is described below commit 8e97632ff2f1fd024d4ff29240196c950b02e5c1 Author: Cong Zhao AuthorDate: Thu Mar 9 10:39:03 2023 +0800 [Branch-2.9] [build] fix build failed (#19743) --- .../pulsar/client/impl/BrokerClientIntegrationTest.java | 15 --- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index 26b9fa1c0d2..ed2454ab5f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -36,6 +36,9 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import java.lang.reflect.Field; import java.nio.ByteBuffer; @@ -43,7 +46,6 @@ import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.NavigableMap; import java.util.Optional; import java.util.Set; @@ -54,9 +56,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import 
com.fasterxml.jackson.databind.ObjectMapper; import lombok.Cleanup; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -96,15 +95,12 @@ import org.apache.pulsar.client.impl.schema.writer.JacksonJsonWriter; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; -import org.apache.pulsar.metadata.api.MetadataCache; -import org.apache.pulsar.zookeeper.ZooKeeperDataCache; +import org.awaitility.Awaitility; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,9 +110,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - @Test(groups = "broker-impl") public class BrokerClientIntegrationTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(BrokerClientIntegrationTest.class);
[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19749: [improve][broker] PIP-192: Add metrics for unload operation
heesung-sn commented on code in PR #19749: URL: https://github.com/apache/pulsar/pull/19749#discussion_r1130324249 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java: ## @@ -45,36 +48,57 @@ public class UnloadCounter { long unloadBrokerCount = 0; long unloadBundleCount = 0; -final Map> breakdownCounters; +@Getter +@VisibleForTesting +final Map> breakdownCounters; +@Getter +@VisibleForTesting double loadAvg; +@Getter +@VisibleForTesting double loadStd; +private volatile long updatedAt = 0; + public UnloadCounter() { breakdownCounters = Map.of( Success, Map.of( -Overloaded, new MutableLong(), -Underloaded, new MutableLong()), +Overloaded, new AtomicLong(), +Underloaded, new AtomicLong(), +Admin, new AtomicLong()), Skip, Map.of( -Balanced, new MutableLong(), -NoBundles, new MutableLong(), -CoolDown, new MutableLong(), -OutDatedData, new MutableLong(), -NoLoadData, new MutableLong(), -NoBrokers, new MutableLong(), -Unknown, new MutableLong()), +Balanced, new AtomicLong(), +NoBundles, new AtomicLong(), +CoolDown, new AtomicLong(), +OutDatedData, new AtomicLong(), +NoLoadData, new AtomicLong(), +NoBrokers, new AtomicLong(), +Unknown, new AtomicLong()), Failure, Map.of( -Unknown, new MutableLong()) +Unknown, new AtomicLong()) ); } public void update(UnloadDecision decision) { -var unloads = decision.getUnloads(); -unloadBrokerCount += unloads.keySet().size(); -unloadBundleCount += unloads.values().size(); - breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment(); -loadAvg = decision.loadAvg; -loadStd = decision.loadStd; +unloadBrokerCount++; +unloadBundleCount++; + breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet(); +updatedAt = System.currentTimeMillis(); +} + +public void update(UnloadDecision.Label label, UnloadDecision.Reason reason) { +if (label == Success) { +unloadBundleCount++; +unloadBrokerCount++; Review Comment: I am fine with this change if we can 
change this metric's behavior.
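To make the `MutableLong` to `AtomicLong` switch in the diff above easier to picture, here is a trimmed-down sketch of a breakdown counter whose map structure is immutable while the counter values are updated atomically. It is an assumption-laden illustration, not the actual `UnloadCounter`: the class name `BreakdownCounterSketch`, the reduced enum values, and the `get` accessor are all invented for the example.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the counter in the diff; enum values are trimmed.
final class BreakdownCounterSketch {
    enum Label { Success, Skip, Failure }
    enum Reason { Overloaded, Underloaded, Balanced, Unknown }

    // The maps never change after construction; only the AtomicLong values do,
    // so increments from several threads do not lose updates.
    private final Map<Label, Map<Reason, AtomicLong>> counters = Map.of(
            Label.Success, Map.of(
                    Reason.Overloaded, new AtomicLong(),
                    Reason.Underloaded, new AtomicLong()),
            Label.Skip, Map.of(
                    Reason.Balanced, new AtomicLong(),
                    Reason.Unknown, new AtomicLong()),
            Label.Failure, Map.of(
                    Reason.Unknown, new AtomicLong()));

    /** Increments the counter for a (label, reason) pair that was registered above. */
    void update(Label label, Reason reason) {
        lookup(label, reason).incrementAndGet();
    }

    /** Reads the current value for a registered (label, reason) pair. */
    long get(Label label, Reason reason) {
        return lookup(label, reason).get();
    }

    private AtomicLong lookup(Label label, Reason reason) {
        Map<Reason, AtomicLong> byReason = counters.get(label);
        AtomicLong counter = byReason == null ? null : byReason.get(reason);
        if (counter == null) {
            // Fail loudly instead of throwing a NullPointerException on an unregistered pair.
            throw new IllegalArgumentException("No counter for " + label + "/" + reason);
        }
        return counter;
    }
}
```

Plain `long` fields incremented with `++`, such as the two totals in the diff, would still need their own synchronization if several threads update them. The sketch covers only the per-reason breakdown.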
[GitHub] [pulsar] coderzc merged pull request #19743: [Branch-2.9] [build] fix build failed
coderzc merged PR #19743: URL: https://github.com/apache/pulsar/pull/19743
[pulsar] branch master updated: [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692)
This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new cdeef00c5f6 [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692) cdeef00c5f6 is described below commit cdeef00c5f6a5bd3197b4ca6de0a0505b18835d8 Author: fengyubiao AuthorDate: Thu Mar 9 10:38:21 2023 +0800 [fix] [broker] Topic close failure leaves subscription in a permanent fence state (#19692) Motivation : After a Topic close failure or a delete failure, the fence state will be reset to get the topic back to work,but it will not reset the fence state of the subscription, which will result in the consumer never being able to connect to the broker. Modifications: Reset the fence state of subscriptions if the operation of topic close is failed. --- .../service/persistent/PersistentSubscription.java | 39 +++ .../broker/service/persistent/PersistentTopic.java | 1 + .../service/persistent/PersistentTopicTest.java| 55 ++ 3 files changed, 87 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index dfeca267503..e07d7bee500 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -126,6 +126,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; private final PendingAckHandle pendingAckHandle; private volatile Map subscriptionProperties; +private volatile CompletableFuture fenceFuture; static { 
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); @@ -897,7 +898,10 @@ public class PersistentSubscription extends AbstractSubscription implements Subs */ @Override public synchronized CompletableFuture disconnect() { -CompletableFuture disconnectFuture = new CompletableFuture<>(); +if (fenceFuture != null){ +return fenceFuture; +} +fenceFuture = new CompletableFuture<>(); // block any further consumers on this subscription IS_FENCED_UPDATER.set(this, TRUE); @@ -905,19 +909,38 @@ public class PersistentSubscription extends AbstractSubscription implements Subs (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)) .thenCompose(v -> close()).thenRun(() -> { log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName); -disconnectFuture.complete(null); +fenceFuture.complete(null); }).exceptionally(exception -> { -IS_FENCED_UPDATER.set(this, FALSE); -if (dispatcher != null) { -dispatcher.reset(); -} log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName, exception); -disconnectFuture.completeExceptionally(exception); +fenceFuture.completeExceptionally(exception); +resumeAfterFence(); return null; }); +return fenceFuture; +} -return disconnectFuture; +/** + * Resume subscription after topic deletion or close failure. + */ +public synchronized void resumeAfterFence() { +// If "fenceFuture" is null, it means that "disconnect" has never been called. 
+if (fenceFuture != null) { +fenceFuture.whenComplete((ignore, ignoreEx) -> { +synchronized (PersistentSubscription.this) { +try { +if (IS_FENCED_UPDATER.compareAndSet(this, TRUE, FALSE)) { +if (dispatcher != null) { +dispatcher.reset(); +} +} +fenceFuture = null; +} catch (Exception ex) { +log.error("[{}] Resume subscription [{}] failure", topicName, subName, ex); +} +} +}); +} } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index fd0b0694212..3b9fbbceb24
[GitHub] [pulsar] poorbarcode merged pull request #19692: [fix] [broker] Topic close failure leaves subscription in a permanent fence state
poorbarcode merged PR #19692: URL: https://github.com/apache/pulsar/pull/19692
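The idea behind #19692, as described in its commit message, is that a failed close must lift the fence again so consumers can reconnect. A stripped-down model of that pattern might look like the sketch below. It is not the `PersistentSubscription` code: `FencedResourceSketch`, `closeOp` and `isFenced` are hypothetical names, and the dispatcher handling is left out entirely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical, simplified model of the fence/resume pattern; not the Pulsar class.
final class FencedResourceSketch {
    private final AtomicBoolean fenced = new AtomicBoolean(false);
    private final Supplier<CompletableFuture<Void>> closeOp; // stands in for the real close logic
    private CompletableFuture<Void> fenceFuture;              // guarded by "this"

    FencedResourceSketch(Supplier<CompletableFuture<Void>> closeOp) {
        this.closeOp = closeOp;
    }

    /** Fences the resource and closes it; repeated calls share the same in-flight attempt. */
    synchronized CompletableFuture<Void> disconnect() {
        if (fenceFuture != null) {
            return fenceFuture;
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        fenceFuture = result;
        fenced.set(true); // block new users while the close is in progress
        closeOp.get().whenComplete((ignore, ex) -> {
            if (ex == null) {
                result.complete(null);
            } else {
                result.completeExceptionally(ex);
                resumeAfterFence(); // a failed close must not leave the fence in place
            }
        });
        return result; // local reference: the field may already have been cleared
    }

    /** Lifts the fence once the pending disconnect attempt has finished. */
    synchronized void resumeAfterFence() {
        CompletableFuture<Void> pending = fenceFuture;
        if (pending == null) {
            return; // disconnect() was never called, nothing to undo
        }
        pending.whenComplete((ignore, ignoreEx) -> {
            synchronized (this) {
                fenced.compareAndSet(true, false);
                fenceFuture = null;
            }
        });
    }

    boolean isFenced() {
        return fenced.get();
    }
}
```

In this sketch `disconnect()` returns a local reference rather than re-reading the field, because the close operation may fail synchronously and clear the field before the method returns.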
[GitHub] [pulsar] lordcheng10 commented on pull request #19754: [fix][meta] Fix deadlock causes session notification not to work
lordcheng10 commented on PR #19754: URL: https://github.com/apache/pulsar/pull/19754#issuecomment-1461179278 nice catch!
[GitHub] [pulsar] coderzc closed pull request #19751: [fix] [test] fix flaky test BucketDelayedDeliveryTrackerTest. testWithBkException
coderzc closed pull request #19751: [fix] [test] fix flaky test BucketDelayedDeliveryTrackerTest. testWithBkException URL: https://github.com/apache/pulsar/pull/19751
[GitHub] [pulsar] poorbarcode closed pull request #14792: [fix] [package] Fix Auto delete the package name issue
poorbarcode closed pull request #14792: [fix] [package] Fix Auto delete the package name issue URL: https://github.com/apache/pulsar/pull/14792
[GitHub] [pulsar-client-go] RobertIndie closed issue #906: Include original messageId and topic as part of properties for DLQ message.
RobertIndie closed issue #906: Include original messageId and topic as part of properties for DLQ message. URL: https://github.com/apache/pulsar-client-go/issues/906
[GitHub] [pulsar-client-go] RobertIndie commented on issue #906: Include original messageId and topic as part of properties for DLQ message.
RobertIndie commented on issue #906: URL: https://github.com/apache/pulsar-client-go/issues/906#issuecomment-1461176933 Fixed by #907
[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #987: [feat] Expose the chunk config of consumer to the reader
RobertIndie commented on code in PR #987: URL: https://github.com/apache/pulsar-client-go/pull/987#discussion_r1130316355 ## pulsar/reader_test.go: ## @@ -70,6 +70,49 @@ func TestReaderConfigSubscribeName(t *testing.T) { assert.NotNil(t, consumer) } +func TestReaderConfigChunk(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + consumer1, err := client.CreateReader(ReaderOptions{ + Topic: "my-topic1", + StartMessageID: EarliestMessageID(), + MaxPendingChunkedMessage:50, + ExpireTimeOfIncompleteChunk: 30 * time.Second, + AutoAckIncompleteChunk: true, + }) + if err != nil { + t.Fatal(err) + } Review Comment: Please use `assert.NotNil(t, err)` ## pulsar/reader_test.go: ## @@ -70,6 +70,49 @@ func TestReaderConfigSubscribeName(t *testing.T) { assert.NotNil(t, consumer) } +func TestReaderConfigChunk(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + consumer1, err := client.CreateReader(ReaderOptions{ + Topic: "my-topic1", + StartMessageID: EarliestMessageID(), + MaxPendingChunkedMessage:50, + ExpireTimeOfIncompleteChunk: 30 * time.Second, + AutoAckIncompleteChunk: true, + }) + if err != nil { + t.Fatal(err) + } + defer consumer1.Close() Review Comment: ```suggestion r1, err := client.CreateReader(ReaderOptions{ Topic: "my-topic1", StartMessageID: EarliestMessageID(), MaxPendingChunkedMessage:50, ExpireTimeOfIncompleteChunk: 30 * time.Second, AutoAckIncompleteChunk: true, }) if err != nil { t.Fatal(err) } defer r1.Close() ``` Same to the `consumer2` below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19622: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl
Demogorgon314 commented on code in PR #19622: URL: https://github.com/apache/pulsar/pull/19622#discussion_r1130315799 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.manager; + +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; + +/** + * Split manager. 
+ */ +@Slf4j +public class SplitManager implements StateChangeListener { + +record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture future) { +} + +private final Map inFlightSplitRequests; + +private final SplitCounter counter; + +public SplitManager(SplitCounter splitCounter) { +this.inFlightSplitRequests = new ConcurrentHashMap<>(); +this.counter = splitCounter; +} + +private void complete(String serviceUnit, Throwable ex) { +inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> { +var future = inFlightSplitRequest.future; +if (!future.isDone()) { +if (ex != null) { +counter.update(Failure, Unknown); +future.completeExceptionally(ex); +log.error("Failed the bundle split event: {}", serviceUnit, ex); +} else { +counter.update(inFlightSplitRequest.splitDecision); +future.complete(null); +log.info("Completed the bundle split event: {}", serviceUnit); +} +} +return null; +}); +} + +public CompletableFuture waitAsync(CompletableFuture eventPubFuture, + String bundle, + SplitDecision decision, + long timeout, + TimeUnit timeoutUnit) { +return eventPubFuture +.thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> { +log.info("Published the bundle split event for bundle:{}. " ++ "Waiting the split event to complete. Timeout: {} {}", +bundle, timeout, timeoutUnit); +CompletableFuture future = new CompletableFuture<>(); +future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> { +if (ex != null) { +inFlightSplitRequests.remove(bundle); +log.warn("Timed out while waiting for the bundle split event: {}", bundle, ex); +} +}); +return new InFlightSplitRequest(decision, future); +}).future) +.exceptionally(e -> { +log.error("Failed to publish the bundle split event for bundle:{}. Skipping wait.", bundle); +counter.update(Failure, Unknown); +return null; Review Comment: Using exceptionally will cause the returned future to lose the exception info. We should use `whenComplete` here. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this
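The review point about `exceptionally` versus `whenComplete` can be seen in isolation with plain `CompletableFuture` calls. The sketch below assumes nothing about `SplitManager` itself; `ExceptionPropagationSketch`, `swallow` and `observe` are names made up for the illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of the review comment; not code from the PR.
final class ExceptionPropagationSketch {

    /**
     * exceptionally() replaces a failure with a fallback value, so the returned
     * future completes normally and callers can no longer see the error.
     */
    static CompletableFuture<Void> swallow(CompletableFuture<Void> source) {
        return source.exceptionally(e -> null);
    }

    /**
     * whenComplete() only observes the outcome; the returned future completes
     * with the same result or the same failure as the source.
     */
    static CompletableFuture<Void> observe(CompletableFuture<Void> source, Runnable onFailure) {
        return source.whenComplete((ignore, e) -> {
            if (e != null) {
                onFailure.run(); // e.g. log the error or bump a failure counter
            }
        });
    }
}
```

With a source future that has already failed, `swallow(source).isCompletedExceptionally()` is `false`, while `observe(source, ...)` stays failed and still runs the side effect. That is the behavior the comment asks for.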
[GitHub] [pulsar] thetumbled commented on pull request #19662: [fix][txn] The second commit of same transaction will fail
thetumbled commented on PR #19662: URL: https://github.com/apache/pulsar/pull/19662#issuecomment-1461173327 PTAL, thanks. @congbobo184
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19622: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl
Demogorgon314 commented on code in PR #19622: URL: https://github.com/apache/pulsar/pull/19622#discussion_r1130313762 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java: ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.loadbalance.extensions.scheduler; + +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; +import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.DefaultNamespaceBundleSplitStrategyImpl; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.NamespaceBundleSplitStrategy; +import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.FutureUtil; + +/** + * Service Unit(e.g. bundles) Split scheduler. 
+ */ +@Slf4j +public class SplitScheduler implements LoadManagerScheduler { + +private final PulsarService pulsar; + +private final ScheduledExecutorService loadManagerExecutor; + +private final LoadManagerContext context; + +private final ServiceConfiguration conf; + +private final ServiceUnitStateChannel serviceUnitStateChannel; + +private final NamespaceBundleSplitStrategy bundleSplitStrategy; + +private final SplitCounter counter; + +private final AtomicReference> splitMetrics; + +private volatile ScheduledFuture task; + +private long counterLastUpdatedAt = 0; + +public SplitScheduler(PulsarService pulsar, + ServiceUnitStateChannel serviceUnitStateChannel, + SplitCounter counter, + AtomicReference> splitMetrics, + LoadManagerContext context, + NamespaceBundleSplitStrategy bundleSplitStrategy) { +this.pulsar = pulsar; +this.loadManagerExecutor = pulsar.getLoadManagerExecutor(); +this.counter = counter; +this.splitMetrics = splitMetrics; +this.context = context; +this.conf = pulsar.getConfiguration(); +this.bundleSplitStrategy = bundleSplitStrategy; +this.serviceUnitStateChannel = serviceUnitStateChannel; +} + +public SplitScheduler(PulsarService pulsar, + ServiceUnitStateChannel serviceUnitStateChannel, + SplitCounter counter, + AtomicReference> splitMetrics, + LoadManagerContext context) { +this(pulsar, serviceUnitStateChannel, counter, splitMetrics, context, +new DefaultNamespaceBundleSplitStrategyImpl(counter)); +} + +@Override +public void execute() { +boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled(); +if (debugMode) { +log.info("Load balancer enabled: {}, Split enabled: {}.", +conf.isLoadBalancerEnabled(), conf.isLoadBalancerAutoBundleSplitEnabled()); +} + +if (!isLoadBalancerAutoBundleSplitEnabled()) { +if (debugMode) { +log.info("The load balancer or load balancer split already disabled. Skipping."); +} +return; +} + +
[GitHub] [pulsar-client-python] NiuBlibing opened a new issue, #102: upgrade grpcio version
NiuBlibing opened a new issue, #102: URL: https://github.com/apache/pulsar-client-python/issues/102 The pinned grpcio dependency, `"grpcio<1.28,>=1.8.2",`, was released two years ago and cannot be installed on Python 3.10. ``` OS: Linux (manjaro 5.15.94-1-MANJARO) GCC: gcc (GCC) 12.2.1 20230201 PYTHON: Python 3.10.9 ``` ```log cc1plus: warning: command-line option ‘-std=gnu99’ is valid for C/ObjC but not for C++ src/core/tsi/ssl_transport_security.cc: In function ‘tsi_result tsi_ssl_get_cert_chain_contents(stack_st_X509*, tsi_peer_property*)’: src/core/tsi/ssl_transport_security.cc:1125:21: warning: comparison of integer expressions of different signedness: ‘int’ and ‘size_t’ {aka ‘long unsigned int’} [-Wsign-compare] 1125 | for (int i = 0; i < sk_X509_num(peer_chain); i++) { | ~~^ gcc -pthread -B /home/test/.conda/envs/py310/compiler_compat -Wno-unused-result -Wsign-compare -DNDEBUG -fwrapv -O2 -Wall -fPIC -O2 -isystem /home/test/.conda/envs/py310/include -fPIC -O2 -isystem /home/test/.conda/envs/py310/include -fPIC -DOPENSSL_NO_ASM=1 -D_WIN32_WINNT=1536 -DGPR_BACKWARDS_COMPATIBILITY_MODE=1 -DHAVE_CONFIG_H=1 -DGRPC_ENABLE_FORK_SUPPORT=1 "-DPyMODINIT_FUNC=extern \"C\" __attribute__((visibility (\"default\"))) PyObject*" -DGRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK=1 -Isrc/python/grpcio -Iinclude -I. 
-Ithird_party/abseil-cpp -Ithird_party/address_sorting/include -Ithird_party/cares -Ithird_party/cares/cares -Ithird_party/cares/config_linux -Ithird_party/boringssl/include -Ithird_party/upb -Isrc/core/ext/upb-generated -Ithird_party/zlib -I/home/test/.conda/envs/py310/include/python3.10 -c src/core/tsi/transport_security.cc -o python_build/temp.linux-x86_64-cpython-310/src/core/tsi/transport_security.o -std=c++11 -std=gnu99 -fvisibility=hidden -fno-wrapv -fno-exceptions -pthread cc1plus: warning: command-line option ‘-std=gnu99’ is valid for C/ObjC but not for C++ gcc -pthread -B /home/test/.conda/envs/py310/compiler_compat -Wno-unused-result -Wsign-compare -DNDEBUG -fwrapv -O2 -Wall -fPIC -O2 -isystem /home/test/.conda/envs/py310/include -fPIC -O2 -isystem /home/test/.conda/envs/py310/include -fPIC -DOPENSSL_NO_ASM=1 -D_WIN32_WINNT=1536 -DGPR_BACKWARDS_COMPATIBILITY_MODE=1 -DHAVE_CONFIG_H=1 -DGRPC_ENABLE_FORK_SUPPORT=1 "-DPyMODINIT_FUNC=extern \"C\" __attribute__((visibility (\"default\"))) PyObject*" -DGRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK=1 -Isrc/python/grpcio -Iinclude -I. 
-Ithird_party/abseil-cpp -Ithird_party/address_sorting/include -Ithird_party/cares -Ithird_party/cares/cares -Ithird_party/cares/config_linux -Ithird_party/boringssl/include -Ithird_party/upb -Isrc/core/ext/upb-generated -Ithird_party/zlib -I/home/test/.conda/envs/py310/include/python3.10 -c src/core/tsi/transport_security_grpc.cc -o python_build/temp.linux-x86_64-cpython-310/src/core/tsi/transport_security_grpc.o -std=c++11 -std=gnu99 -fvisibility=hidden -fno-wrapv -fno-exceptions -pthread cc1plus: warning: command-line option ‘-std=gnu99’ is valid for C/ObjC but not for C++ gcc -pthread -B /home/test/.conda/envs/py310/compiler_compat -Wno-unused-result -Wsign-compare -DNDEBUG -fwrapv -O2 -Wall -fPIC -O2 -isystem /home/test/.conda/envs/py310/include -fPIC -O2 -isystem /home/test/.conda/envs/py310/include -fPIC -DOPENSSL_NO_ASM=1 -D_WIN32_WINNT=1536 -DGPR_BACKWARDS_COMPATIBILITY_MODE=1 -DHAVE_CONFIG_H=1 -DGRPC_ENABLE_FORK_SUPPORT=1 "-DPyMODINIT_FUNC=extern \"C\" __attribute__((visibility (\"default\"))) PyObject*" -DGRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK=1 -Isrc/python/grpcio -Iinclude -I. 
-Ithird_party/abseil-cpp -Ithird_party/address_sorting/include -Ithird_party/cares -Ithird_party/cares/cares -Ithird_party/cares/config_linux -Ithird_party/boringssl/include -Ithird_party/upb -Isrc/core/ext/upb-generated -Ithird_party/zlib -I/home/test/.conda/envs/py310/include/python3.10 -c src/python/grpcio/grpc/_cython/cygrpc.cpp -o python_build/temp.linux-x86_64-cpython-310/src/python/grpcio/grpc/_cython/cygrpc.o -std=c++11 -std=gnu99 -fvisibility=hidden -fno-wrapv -fno-exceptions -pthread cc1plus: warning: command-line option ‘-std=gnu99’ is valid for C/ObjC but not for C++ src/python/grpcio/grpc/_cython/cygrpc.cpp: In function ‘PyObject* __pyx_f_4grpc_7_cython_6cygrpc__initialize()’: src/python/grpcio/grpc/_cython/cygrpc.cpp:88388:28: warning: ‘void PyEval_InitThreads()’ is deprecated [-Wdeprecated-declarations] 88388 | (void)(PyEval_InitThreads()); | ~~^~ In file included from /home/test/.conda/envs/py310/include/python3.10/Python.h:130, from src/python/grpcio/grpc/_cython/cygrpc.cpp:877: /home/test/.conda/envs/py310/include/python3.10/ceval.h:122:37: note: declared here 122 | Py_DEPRECATED(3.9)
[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19749: [improve][broker] PIP-192: Add metrics for unload operation
heesung-sn commented on code in PR #19749: URL: https://github.com/apache/pulsar/pull/19749#discussion_r1130311710 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java: ## @@ -45,36 +48,57 @@ public class UnloadCounter { long unloadBrokerCount = 0; long unloadBundleCount = 0; -final Map> breakdownCounters; +@Getter +@VisibleForTesting +final Map> breakdownCounters; +@Getter +@VisibleForTesting double loadAvg; +@Getter +@VisibleForTesting double loadStd; +private volatile long updatedAt = 0; + public UnloadCounter() { breakdownCounters = Map.of( Success, Map.of( -Overloaded, new MutableLong(), -Underloaded, new MutableLong()), +Overloaded, new AtomicLong(), +Underloaded, new AtomicLong(), +Admin, new AtomicLong()), Skip, Map.of( -Balanced, new MutableLong(), -NoBundles, new MutableLong(), -CoolDown, new MutableLong(), -OutDatedData, new MutableLong(), -NoLoadData, new MutableLong(), -NoBrokers, new MutableLong(), -Unknown, new MutableLong()), +Balanced, new AtomicLong(), +NoBundles, new AtomicLong(), +CoolDown, new AtomicLong(), +OutDatedData, new AtomicLong(), +NoLoadData, new AtomicLong(), +NoBrokers, new AtomicLong(), +Unknown, new AtomicLong()), Failure, Map.of( -Unknown, new MutableLong()) +Unknown, new AtomicLong()) ); } public void update(UnloadDecision decision) { -var unloads = decision.getUnloads(); -unloadBrokerCount += unloads.keySet().size(); -unloadBundleCount += unloads.values().size(); - breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment(); -loadAvg = decision.loadAvg; -loadStd = decision.loadStd; +unloadBrokerCount++; +unloadBundleCount++; + breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet(); +updatedAt = System.currentTimeMillis(); +} + +public void update(UnloadDecision.Label label, UnloadDecision.Reason reason) { +if (label == Success) { +unloadBundleCount++; +unloadBrokerCount++; Review Comment: I dont think this is an important 
metric, but I was trying to keep the original behavior for backward compatibility (some users might be using it?)
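The UnloadCounter hunk above swaps `MutableLong` for `AtomicLong` inside an immutable `Map.of(...)` structure. A minimal sketch of that pattern follows, assuming the counters may be incremented from more than one thread. `BreakdownCounterSketch`, its reduced `Label`/`Reason` enums, and `concurrentDemo` are hypothetical stand-ins written for this illustration, not the classes from the PR.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the counter in the diff; all names are illustrative. */
final class BreakdownCounterSketch {
    enum Label { Success, Skip, Failure }
    enum Reason { Overloaded, Underloaded, Admin, Balanced, Unknown }

    // The map structure is immutable (Map.of); only the AtomicLong leaves change.
    private final Map<Label, Map<Reason, AtomicLong>> breakdownCounters = Map.of(
            Label.Success, Map.of(
                    Reason.Overloaded, new AtomicLong(),
                    Reason.Underloaded, new AtomicLong(),
                    Reason.Admin, new AtomicLong()),
            Label.Skip, Map.of(
                    Reason.Balanced, new AtomicLong(),
                    Reason.Unknown, new AtomicLong()),
            Label.Failure, Map.of(
                    Reason.Unknown, new AtomicLong()));

    // volatile so a reader thread sees the latest timestamp.
    private volatile long updatedAt = 0;

    void update(Label label, Reason reason) {
        // incrementAndGet is atomic, so concurrent callers do not lose updates.
        breakdownCounters.get(label).get(reason).incrementAndGet();
        updatedAt = System.currentTimeMillis();
    }

    long get(Label label, Reason reason) {
        return breakdownCounters.get(label).get(reason).get();
    }

    long updatedAt() {
        return updatedAt;
    }

    /** Increments one counter from several threads and returns its final value. */
    static long concurrentDemo(int threads, int incrementsPerThread) {
        BreakdownCounterSketch counter = new BreakdownCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.update(Label.Success, Reason.Admin);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get(Label.Success, Reason.Admin);
    }

    public static void main(String[] args) {
        System.out.println(concurrentDemo(4, 10_000));
    }
}
```

Note that plain `long` fields incremented with `++`, such as `unloadBrokerCount` in the diff, do not get the same guarantee unless access is otherwise synchronized.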
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19749: [improve][broker] PIP-192: Add metrics for unload operation
Demogorgon314 commented on code in PR #19749: URL: https://github.com/apache/pulsar/pull/19749#discussion_r1130309935 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java: ## @@ -122,33 +141,40 @@ public synchronized void execute() { log.info("Only 1 broker available: no load shedding will be performed. Skipping."); return CompletableFuture.completedFuture(null); } -final UnloadDecision unloadDecision = namespaceUnloadStrategy +final Set decisions = namespaceUnloadStrategy .findBundlesForUnloading(context, recentlyUnloadedBundles, recentlyUnloadedBrokers); if (debugMode) { log.info("[{}] Unload decision result: {}", - namespaceUnloadStrategy.getClass().getSimpleName(), unloadDecision.toString()); + namespaceUnloadStrategy.getClass().getSimpleName(), decisions); } -if (unloadDecision.getUnloads().isEmpty()) { +if (decisions.isEmpty()) { if (debugMode) { log.info("[{}] Unload decision unloads is empty. Skipping.", namespaceUnloadStrategy.getClass().getSimpleName()); } return CompletableFuture.completedFuture(null); } List> futures = new ArrayList<>(); -unloadDecision.getUnloads().forEach((broker, unload) -> { +long asyncOpTimeoutMs = conf.getNamespaceBundleUnloadingTimeoutMs(); +decisions.forEach(decision -> { +Unload unload = decision.getUnload(); log.info("[{}] Unloading bundle: {}", namespaceUnloadStrategy.getClass().getSimpleName(), unload); futures.add(unloadManager.waitAsync(channel.publishUnloadEventAsync(unload), unload.serviceUnit(), - conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS) +decision, asyncOpTimeoutMs, TimeUnit.MILLISECONDS) .thenAccept(__ -> { - recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis()); - recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis()); +recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis()); +recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis()); })); }); 
-return FutureUtil.waitForAll(futures).exceptionally(ex -> { -log.error("[{}] Namespace unload has exception.", - namespaceUnloadStrategy.getClass().getSimpleName(), ex); -return null; +return FutureUtil.waitForAll(futures).whenComplete((__, ex) -> { Review Comment: updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
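The last hunk above replaces `exceptionally` with `whenComplete` on the combined future. The callback body is cut off in this message, so what follows is only a sketch of the general difference between the two `CompletableFuture` methods, not the code in the PR. `CompletionHandlingSketch` and its method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration of exceptionally() vs whenComplete(); not code from the PR. */
final class CompletionHandlingSketch {

    /** exceptionally() substitutes a fallback value, so callers see a normal completion. */
    static boolean failureVisibleAfterExceptionally() {
        CompletableFuture<Void> failed =
                CompletableFuture.failedFuture(new IllegalStateException("unload failed"));
        CompletableFuture<Void> handled = failed.exceptionally(ex -> {
            System.err.println("Namespace unload has exception: " + ex.getMessage());
            return null; // the failure is replaced by a null result
        });
        return handled.isCompletedExceptionally();
    }

    /** whenComplete() only observes the outcome; the failure still reaches callers. */
    static boolean failureVisibleAfterWhenComplete() {
        CompletableFuture<Void> failed =
                CompletableFuture.failedFuture(new IllegalStateException("unload failed"));
        CompletableFuture<Void> observed = failed.whenComplete((ignored, ex) -> {
            if (ex != null) {
                System.err.println("Namespace unload has exception: " + ex.getMessage());
            }
        });
        return observed.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(failureVisibleAfterExceptionally()); // false
        System.out.println(failureVisibleAfterWhenComplete());  // true
    }
}
```

With `whenComplete`, whoever holds the returned future can still react to the failure, which is useful when something downstream needs to count or report it.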
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19749: [improve][broker] PIP-192: Add metrics for unload operation
Demogorgon314 commented on code in PR #19749: URL: https://github.com/apache/pulsar/pull/19749#discussion_r1130309591 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java: ## @@ -122,33 +141,40 @@ public synchronized void execute() { log.info("Only 1 broker available: no load shedding will be performed. Skipping."); return CompletableFuture.completedFuture(null); } -final UnloadDecision unloadDecision = namespaceUnloadStrategy +final Set decisions = namespaceUnloadStrategy .findBundlesForUnloading(context, recentlyUnloadedBundles, recentlyUnloadedBrokers); if (debugMode) { log.info("[{}] Unload decision result: {}", - namespaceUnloadStrategy.getClass().getSimpleName(), unloadDecision.toString()); + namespaceUnloadStrategy.getClass().getSimpleName(), decisions); } -if (unloadDecision.getUnloads().isEmpty()) { +if (decisions.isEmpty()) { if (debugMode) { log.info("[{}] Unload decision unloads is empty. Skipping.", namespaceUnloadStrategy.getClass().getSimpleName()); } return CompletableFuture.completedFuture(null); } List> futures = new ArrayList<>(); -unloadDecision.getUnloads().forEach((broker, unload) -> { +long asyncOpTimeoutMs = conf.getNamespaceBundleUnloadingTimeoutMs(); +decisions.forEach(decision -> { Review Comment: Updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
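The hunk above reads the timeout once into `asyncOpTimeoutMs` and then issues one asynchronous unload per decision. A rough sketch of that fan-out shape using only JDK classes is given below. `UnloadFanOutSketch`, the two-field `Unload` record, and `publish` are hypothetical placeholders (the real code goes through `unloadManager.waitAsync` and `FutureUtil.waitForAll`), and `orTimeout` is used here merely as a stand-in for the timeout handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical fan-out sketch; names and signatures are assumptions, not Pulsar APIs. */
final class UnloadFanOutSketch {

    /** Simplified placeholder for the Unload model in the diff. */
    record Unload(String sourceBroker, String serviceUnit) {}

    /** Placeholder for publishing an unload event; completes immediately here. */
    static CompletableFuture<Void> publish(Unload unload) {
        return CompletableFuture.completedFuture(null);
    }

    /** Starts one async operation per unload and records each success with a timestamp. */
    static Map<String, Long> unloadAll(Set<Unload> unloads, long asyncOpTimeoutMs) {
        Map<String, Long> recentlyUnloadedBundles = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Unload unload : unloads) {
            futures.add(publish(unload)
                    // Fail the future if it does not complete within the shared timeout.
                    .orTimeout(asyncOpTimeoutMs, TimeUnit.MILLISECONDS)
                    .thenAccept(ignored ->
                            recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis())));
        }
        // Block until every per-unload future has finished (sketch only; the diff stays async).
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return recentlyUnloadedBundles;
    }

    public static void main(String[] args) {
        Set<Unload> unloads = Set.of(
                new Unload("broker-1", "tenant/ns/0x00000000_0x80000000"),
                new Unload("broker-2", "tenant/ns/0x80000000_0xffffffff"));
        System.out.println(unloadAll(unloads, 1_000).keySet());
    }
}
```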
[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19749: [improve][broker] PIP-192: Add metrics for unload operation
Demogorgon314 commented on code in PR #19749: URL: https://github.com/apache/pulsar/pull/19749#discussion_r1130309317 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java: ## @@ -229,36 +245,31 @@ public String toString() { @Override -public UnloadDecision findBundlesForUnloading(LoadManagerContext context, +public Set findBundlesForUnloading(LoadManagerContext context, Map recentlyUnloadedBundles, Map recentlyUnloadedBrokers) { final var conf = context.brokerConfiguration(); -decision.clear(); +decisionCache.clear(); stats.clear(); -var selectedBundlesCache = decision.getUnloads(); try { final var loadStore = context.brokerLoadDataStore(); stats.setLoadDataStore(loadStore); boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled(); var skipReason = stats.update(context.brokerLoadDataStore(), recentlyUnloadedBrokers, conf); -if (!skipReason.isEmpty()) { -decision.skip(skipReason.get()); -log.warn("Failed to update load stat. Reason:{}. Stop unloading.", decision.getReason()); -return decision; +if (skipReason.isPresent()) { +log.warn("Failed to update load stat. Reason:{}. Stop unloading.", skipReason.get()); +counter.updateLoadData(stats.avg, stats.std); Review Comment: Updated. 
## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java: ## @@ -347,16 +350,21 @@ public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, log.warn(msg); throw new IllegalArgumentException(msg); } -return unloadAsync(new Unload(sourceBroker, bundle.toString(), destinationBroker), +Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker); +UnloadDecision unloadDecision = +new UnloadDecision(unload, Success, Admin); +return unloadAsync(unloadDecision, conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); }); } -private CompletableFuture unloadAsync(Unload unload, +private CompletableFuture unloadAsync(UnloadDecision unloadDecision, long timeout, TimeUnit timeoutUnit) { +Unload unload = unloadDecision.getUnload(); CompletableFuture future = serviceUnitStateChannel.publishUnloadEventAsync(unload); -return unloadManager.waitAsync(future, unload.serviceUnit(), timeout, timeoutUnit); +return unloadManager.waitAsync(future, unload.serviceUnit(), +new UnloadDecision(unload, Success, Admin), timeout, timeoutUnit); Review Comment: Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch master updated (e973388eb77 -> cbd799f05eb)
This is an automated email from the ASF dual-hosted git repository. xiangying pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from e973388eb77 [fix][broker] Fixed history load not releasing (#19726) add cbd799f05eb [fix][meta] Fix deadlock causes session notification not to work (#19754) No new revisions were added by this update. Summary of changes: .../coordination/impl/LeaderElectionImpl.java | 10 +++- .../coordination/impl/LeaderElectionImplTest.java | 65 ++ 2 files changed, 73 insertions(+), 2 deletions(-) create mode 100644 pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
[GitHub] [pulsar] liangyepianzhou merged pull request #19754: [fix][meta] Fix deadlock causes session notification not to work
liangyepianzhou merged PR #19754: URL: https://github.com/apache/pulsar/pull/19754
[GitHub] [pulsar] github-actions[bot] commented on issue #18822: PIP-228: Refactor Information Architecture of Pulsar Client Documentation
github-actions[bot] commented on issue #18822: URL: https://github.com/apache/pulsar/issues/18822#issuecomment-1461158757 The issue had no activity for 30 days, mark with Stale label.
[GitHub] [pulsar] github-actions[bot] commented on issue #19088: Flaky-test: SimpleProducerConsumerTestStreamingDispatcherTest.testBackoffAndReconnect
github-actions[bot] commented on issue #19088: URL: https://github.com/apache/pulsar/issues/19088#issuecomment-1461158443 The issue had no activity for 30 days, mark with Stale label.
[GitHub] [pulsar] github-actions[bot] commented on issue #19379: Flaky-test: InactiveTopicDeleteTest.testDeleteWhenNoSubscriptionsWithMultiConfig
github-actions[bot] commented on issue #19379: URL: https://github.com/apache/pulsar/issues/19379#issuecomment-1461158061 The issue had no activity for 30 days, mark with Stale label.
[GitHub] [pulsar] github-actions[bot] commented on issue #19375: [Bug][pulsar-functions][worker] --update-auth-data fails if Config is identical
github-actions[bot] commented on issue #19375: URL: https://github.com/apache/pulsar/issues/19375#issuecomment-1461158101 The issue had no activity for 30 days, mark with Stale label.