(pulsar) branch branch-3.0 updated: [improve][ci][branch-3.0] Upgrade actions in pulsar-ci and pulsar-ci-flaky, port owasp cache change
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new db40c8f31a8 [improve][ci][branch-3.0] Upgrade actions in pulsar-ci and pulsar-ci-flaky, port owasp cache change
db40c8f31a8 is described below

commit db40c8f31a8eeb8d8f5e7c4daca6234cfea116ac
Author: Lari Hotari
AuthorDate: Tue May 14 08:47:00 2024 +0300

    [improve][ci][branch-3.0] Upgrade actions in pulsar-ci and pulsar-ci-flaky, port owasp cache change
---
 .github/actions/upload-coverage/action.yml |   8 +-
 .github/changes-filter.yaml                |   5 +-
 .github/workflows/ci-go-functions.yaml     |   6 +-
 .github/workflows/pulsar-ci-flaky.yaml     |  14 ++--
 .github/workflows/pulsar-ci.yaml           | 126 +
 5 files changed, 91 insertions(+), 68 deletions(-)

diff --git a/.github/actions/upload-coverage/action.yml b/.github/actions/upload-coverage/action.yml
index a9706e77333..0ba73e94a83 100644
--- a/.github/actions/upload-coverage/action.yml
+++ b/.github/actions/upload-coverage/action.yml
@@ -51,7 +51,7 @@ runs:
     - name: "Upload to Codecov (attempt #1)"
       id: codecov-upload-1
       if: steps.repo-check.outputs.passed == 'true'
-      uses: codecov/codecov-action@v3
+      uses: codecov/codecov-action@v4
       continue-on-error: true
       with:
         flags: ${{ inputs.flags }}
@@ -64,7 +64,7 @@ runs:
     - name: "Upload to Codecov (attempt #2)"
       id: codecov-upload-2
       if: steps.codecov-upload-1.outcome == 'failure'
-      uses: codecov/codecov-action@v3
+      uses: codecov/codecov-action@v4
       continue-on-error: true
       with:
         flags: ${{ inputs.flags }}
@@ -77,7 +77,7 @@ runs:
     - name: "Upload to Codecov (attempt #3)"
       id: codecov-upload-3
       if: steps.codecov-upload-2.outcome == 'failure'
-      uses: codecov/codecov-action@v3
+      uses: codecov/codecov-action@v4
       # fail on last attempt
       continue-on-error: false
       with:
@@ -97,4 +97,4 @@ runs:
 
         [Code coverage report](https://app.codecov.io/github/$GITHUB_REPOSITORY/commit/${head_sha}/tree)
 
-        EOF
\ No newline at end of file
+        EOF
diff --git a/.github/changes-filter.yaml b/.github/changes-filter.yaml
index be6faa95788..66e5db32d4c 100644
--- a/.github/changes-filter.yaml
+++ b/.github/changes-filter.yaml
@@ -11,10 +11,13 @@ docs:
   - '.idea/**'
   - 'deployment/**'
   - 'wiki/**'
+  - 'pip/**'
+java_non_tests:
+  - '**/src/main/java/**/*.java'
 tests:
   - added|modified: '**/src/test/java/**/*.java'
 need_owasp:
   - 'pom.xml'
   - '**/pom.xml'
   - 'src/owasp-dependency-check-false-positives.xml'
-  - 'src/owasp-dependency-check-suppressions.xml'
\ No newline at end of file
+  - 'src/owasp-dependency-check-suppressions.xml'
diff --git a/.github/workflows/ci-go-functions.yaml b/.github/workflows/ci-go-functions.yaml
index 406aebd3474..563d43b84dc 100644
--- a/.github/workflows/ci-go-functions.yaml
+++ b/.github/workflows/ci-go-functions.yaml
@@ -43,7 +43,7 @@ jobs:
       docs_only: ${{ steps.check_changes.outputs.docs_only }}
     steps:
       - name: checkout
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
 
       - name: Detect changed files
         id: changes
@@ -80,13 +80,13 @@ jobs:
 
     steps:
       - name: Check out code into the Go module directory
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
 
       - name: Tune Runner VM
         uses: ./.github/actions/tune-runner-vm
 
       - name: Set up Go
-        uses: actions/setup-go@v2
+        uses: actions/setup-go@v5
         with:
           go-version: ${{ matrix.go-version }}
         id: go
diff --git a/.github/workflows/pulsar-ci-flaky.yaml b/.github/workflows/pulsar-ci-flaky.yaml
index b8a80550689..16688ebc3b5 100644
--- a/.github/workflows/pulsar-ci-flaky.yaml
+++ b/.github/workflows/pulsar-ci-flaky.yaml
@@ -54,7 +54,7 @@ jobs:
       collect_coverage: ${{ steps.check_coverage.outputs.collect_coverage }}
     steps:
       - name: checkout
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
 
       - name: Detect changed files
         id: changes
@@ -101,7 +101,7 @@ jobs:
     if: ${{ needs.preconditions.outputs.docs_only != 'true' }}
     steps:
       - name: checkout
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
 
       - name: Tune Runner VM
         uses: ./.github/actions/tune-runner-vm
@@ -115,7 +115,7 @@ jobs:
           limit-access-to-actor: true
 
       - name: Cache local Maven repository
-        uses: actions/cache@v3
+        uses: actions/cache@v4
         timeout-minutes: 5
         with:
           path: |
@@ -126,7 +126,7 @@ jobs:
             ${{ runner.os }}-m2-dependencies-core-modules-
 
       - name: Set up JDK 17
-        uses:
Re: [PR] [improve][build] Support custom image and label names [pulsar]
nodece commented on PR #22703: URL: https://github.com/apache/pulsar/pull/22703#issuecomment-2109338511 /pulsarbot rerun-failure-checks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] [break change] Do not create partitioned DLQ/Retry topic automatically [pulsar]
mattisonchao commented on PR #22705:
URL: https://github.com/apache/pulsar/pull/22705#issuecomment-2109325061

cc @shibd, I remember we discussed this place.
Re: [PR] [improve] [broker] Do not create partitioned DLQ/Retry topic automatically [pulsar]
mattisonchao commented on code in PR #22705:
URL: https://github.com/apache/pulsar/pull/22705#discussion_r1599400950

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -3493,6 +3495,10 @@ private CompletableFuture isAllowAutoTopicCreationAsync(final TopicName
     }
 
     public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional policies) {
+        if (topicName.getPartitionedTopicName().endsWith(DLQ_GROUP_TOPIC_SUFFIX)

Review Comment:
Okay, let us add a notice about this breaking change in the new release. Thanks! :)
Re: [PR] [improve] [broker] Do not create partitioned DLQ/Retry topic automatically [pulsar]
poorbarcode commented on code in PR #22705:
URL: https://github.com/apache/pulsar/pull/22705#discussion_r1599398918

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -3493,6 +3495,10 @@ private CompletableFuture isAllowAutoTopicCreationAsync(final TopicName
     }
 
     public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional policies) {
+        if (topicName.getPartitionedTopicName().endsWith(DLQ_GROUP_TOPIC_SUFFIX)

Review Comment:
> Yes, it's a case. What if the user uses the topic name with the -DLQ suffix as a regular topic?

- Topics that were created earlier can still be used as before.
- New topics must be created manually if the user wants a partitioned topic with the `-DLQ` suffix.
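For readers skimming this thread, the rule under discussion can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in `BrokerService`: the class `DlqSuffixRule`, its method name, and the `-RETRY` suffix value are assumptions made for the example. The thread itself only confirms the `-DLQ` suffix and the constant name `DLQ_GROUP_TOPIC_SUFFIX`.

```java
// Hypothetical sketch of the suffix rule discussed above; not the actual BrokerService code.
final class DlqSuffixRule {
    // Assumed values: the thread confirms only the "-DLQ" suffix.
    static final String DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";
    static final String RETRY_GROUP_TOPIC_SUFFIX = "-RETRY";

    private DlqSuffixRule() {
    }

    /**
     * Decides whether an auto-created topic may follow the partitioned default.
     * Names ending in a DLQ or retry suffix are never auto-created as partitioned.
     */
    static boolean mayAutoCreatePartitioned(String topicName, boolean defaultIsPartitioned) {
        if (topicName.endsWith(DLQ_GROUP_TOPIC_SUFFIX)
                || topicName.endsWith(RETRY_GROUP_TOPIC_SUFFIX)) {
            return false;
        }
        return defaultIsPartitioned;
    }
}
```

Under this sketch, a regular topic that merely happens to end in `-DLQ` is also excluded from partitioned auto-creation, which is the breaking-change concern raised by the reviewer.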
Re: [PR] [improve] [broker] Do not create partitioned DLQ/Retry topic automatically [pulsar]
mattisonchao commented on code in PR #22705:
URL: https://github.com/apache/pulsar/pull/22705#discussion_r1599396713

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -3493,6 +3495,10 @@ private CompletableFuture isAllowAutoTopicCreationAsync(final TopicName
     }
 
     public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional policies) {
+        if (topicName.getPartitionedTopicName().endsWith(DLQ_GROUP_TOPIC_SUFFIX)

Review Comment:
Yes, it's a case. What if the user uses the topic name with the `-DLQ` suffix as a regular topic?
Re: [PR] [improve] [broker] Do not create partitioned DLQ/Retry topic automatically [pulsar]
mattisonchao commented on code in PR #22705:
URL: https://github.com/apache/pulsar/pull/22705#discussion_r1599388238

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -3493,6 +3495,10 @@ private CompletableFuture isAllowAutoTopicCreationAsync(final TopicName
     }
 
     public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional policies) {
+        if (topicName.getPartitionedTopicName().endsWith(DLQ_GROUP_TOPIC_SUFFIX)

Review Comment:
Yes, but what is your compensation plan for breaking changes? I saw you also wanna cherry-pick it to branch 3.0
Re: [PR] [improve] [broker] Do not create partitioned DLQ/Retry topic automatically [pulsar]
poorbarcode commented on code in PR #22705:
URL: https://github.com/apache/pulsar/pull/22705#discussion_r1599394690

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -3493,6 +3495,10 @@ private CompletableFuture isAllowAutoTopicCreationAsync(final TopicName
     }
 
     public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional policies) {
+        if (topicName.getPartitionedTopicName().endsWith(DLQ_GROUP_TOPIC_SUFFIX)

Review Comment:
Because the issue described in the motivation generates many more dirty partitioned topics, and that should be fixed in the LTS version.
(pulsar) branch master updated (1a7ada88056 -> 9f1325ac64c)
daojun pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

    from 1a7ada88056 [improve][build] Bump version to 3.4.0-SNAPSHOT (#22700)
     add 9f1325ac64c [fix] [broker] fix typo and useless local variable. (#22704)

No new revisions were added by this update.

Summary of changes:
 .../broker/service/persistent/ReplicatedSubscriptionsController.java | 5 +----
 ...plicatorSubscriptionTest.java => ReplicatedSubscriptionTest.java} | 4 ++--
 ...ctionTest.java => ReplicatedSubscriptionWithTransactionTest.java} | 2 +-
 3 files changed, 4 insertions(+), 7 deletions(-)
 rename pulsar-broker/src/test/java/org/apache/pulsar/broker/service/{ReplicatorSubscriptionTest.java => ReplicatedSubscriptionTest.java} (99%)
 rename pulsar-broker/src/test/java/org/apache/pulsar/broker/service/{ReplicatorSubscriptionWithTransactionTest.java => ReplicatedSubscriptionWithTransactionTest.java} (95%)
Re: [PR] [fix] [broker] fix typo and useless local variable. [pulsar]
dao-jun merged PR #22704:
URL: https://github.com/apache/pulsar/pull/22704
(pulsar) branch branch-3.2 updated: [fix][test] Fix NPE in BookKeeperClusterTestCase tearDown (#22493)
lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new bb41dbde205 [fix][test] Fix NPE in BookKeeperClusterTestCase tearDown (#22493)
bb41dbde205 is described below

commit bb41dbde205ea6e4ffe23e577c9fd24377ebf22d
Author: Lari Hotari
AuthorDate: Fri Apr 12 10:09:54 2024 -0700

    [fix][test] Fix NPE in BookKeeperClusterTestCase tearDown (#22493)

    (cherry picked from commit d1748573f1cb294838b69b5d80af672c3ee9e453)
---
 .../org/apache/bookkeeper/test/BookKeeperClusterTestCase.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 0ddd04ebc48..a323ecfeb8e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -86,7 +86,7 @@ public abstract class BookKeeperClusterTestCase {
 
     protected String testName;
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     public void handleTestMethodName(Method method) {
         testName = method.getName();
     }
@@ -148,7 +148,7 @@ public abstract class BookKeeperClusterTestCase {
         }
     }
 
-    @BeforeTest
+    @BeforeTest(alwaysRun = true)
     public void setUp() throws Exception {
         setUp(getLedgersRootPath());
     }
@@ -222,7 +222,9 @@ public abstract class BookKeeperClusterTestCase {
             tearDownException = e;
         }
 
-        executor.shutdownNow();
+        if (executor != null) {
+            executor.shutdownNow();
+        }
 
         LOG.info("Tearing down test {} in {} ms.", testName, sw.elapsed(TimeUnit.MILLISECONDS));
         if (tearDownException != null) {
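The null guard in the last hunk can be illustrated in isolation. The sketch below is a hypothetical fixture, not the `BookKeeperClusterTestCase` code: the class `NullSafeTearDown` and its method names are invented for the example, and it assumes the failure mode is a teardown that runs although setup never created the executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical fixture illustrating the null guard; not the BookKeeperClusterTestCase code.
final class NullSafeTearDown {
    // Stays null if setUp() never ran or failed before creating the executor.
    private ExecutorService executor;

    void setUp() {
        executor = Executors.newSingleThreadExecutor();
    }

    /** Returns true if an executor existed and was shut down, false if there was nothing to clean up. */
    boolean tearDown() {
        if (executor == null) {
            // An unguarded executor.shutdownNow() would throw a NullPointerException here.
            return false;
        }
        executor.shutdownNow();
        return true;
    }
}
```

Calling `tearDown()` on a fresh instance returns `false` instead of throwing, while calling it after `setUp()` shuts the executor down and returns `true`.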
Re: [PR] [improve] [broker] Do not create partitioned DLQ/Retry topic automatically [pulsar]
poorbarcode commented on code in PR #22705:
URL: https://github.com/apache/pulsar/pull/22705#discussion_r1599386804

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -3493,6 +3495,10 @@ private CompletableFuture isAllowAutoTopicCreationAsync(final TopicName
     }
 
     public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional policies) {
+        if (topicName.getPartitionedTopicName().endsWith(DLQ_GROUP_TOPIC_SUFFIX)

Review Comment:
Good suggestion, we can do this improvement with a separate PIP, just like the discussion https://lists.apache.org/thread/qgbpzr6o3k5rqbs2jvpkdh8hr9jpnw39. Before that, we can fix the issue faster through this PR.
(pulsar) branch branch-3.0 updated: [fix][test] Fix NPE in BookKeeperClusterTestCase tearDown (#22493)
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4bb0d8135c1 [fix][test] Fix NPE in BookKeeperClusterTestCase tearDown (#22493)
4bb0d8135c1 is described below

commit 4bb0d8135c1194d469575764b1e96f9f9f56d8be
Author: Lari Hotari
AuthorDate: Fri Apr 12 10:09:54 2024 -0700

    [fix][test] Fix NPE in BookKeeperClusterTestCase tearDown (#22493)

    (cherry picked from commit d1748573f1cb294838b69b5d80af672c3ee9e453)
---
 .../org/apache/bookkeeper/test/BookKeeperClusterTestCase.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 80bb6256591..e316083e837 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -86,7 +86,7 @@ public abstract class BookKeeperClusterTestCase {
 
     protected String testName;
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     public void handleTestMethodName(Method method) {
         testName = method.getName();
     }
@@ -148,7 +148,7 @@ public abstract class BookKeeperClusterTestCase {
         }
     }
 
-    @BeforeTest
+    @BeforeTest(alwaysRun = true)
     public void setUp() throws Exception {
         setUp(getLedgersRootPath());
     }
@@ -222,7 +222,9 @@ public abstract class BookKeeperClusterTestCase {
             tearDownException = e;
         }
 
-        executor.shutdownNow();
+        if (executor != null) {
+            executor.shutdownNow();
+        }
 
         LOG.info("Tearing down test {} in {} ms.", testName, sw.elapsed(TimeUnit.MILLISECONDS));
         if (tearDownException != null) {
Re: [PR] [improve] [broker] Do not create partitioned DLQ/Retry topic automatically [pulsar]
mattisonchao commented on code in PR #22705:
URL: https://github.com/apache/pulsar/pull/22705#discussion_r1599384270

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -3493,6 +3495,10 @@ private CompletableFuture isAllowAutoTopicCreationAsync(final TopicName
     }
 
     public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional policies) {
+        if (topicName.getPartitionedTopicName().endsWith(DLQ_GROUP_TOPIC_SUFFIX)

Review Comment:
I suggest we collect all topic names like this one into a keyword collection in Pulsar.

Plus, it's a breaking change for existing users who use a topic name with the `-DLQ` suffix, and this suffix is kinda common. We should give a plan for this breaking change (a workaround or an important notice).
(pulsar) 06/06: [fix][broker] fix replicated subscriptions for transactional messages (#22452)
lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e99bf9a4a77a90f14956909a849f927fb1052a81
Author: Lari Hotari
AuthorDate: Tue May 14 07:27:41 2024 +0300

    [fix][broker] fix replicated subscriptions for transactional messages (#22452)

    (cherry picked from commit 9fd1b61fc45d06348af0241f002966087f1822a0)

    # Conflicts:
    #	pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
---
 .../broker/service/persistent/PersistentTopic.java |  26 +--
 .../ReplicatedSubscriptionsController.java         |   4 +-
 .../transaction/buffer/TransactionBuffer.java      |   3 +-
 .../buffer/impl/InMemTransactionBuffer.java        |  13 +-
 .../buffer/impl/TopicTransactionBuffer.java        |  70 ++--
 .../buffer/impl/TransactionBufferDisable.java      |  13 +-
 .../pulsar/broker/service/PersistentTopicTest.java |   4 +-
 .../broker/service/ReplicatorSubscriptionTest.java |  25 +++
 .../TransactionalReplicateSubscriptionTest.java    | 182 +
 .../broker/transaction/TransactionProduceTest.java |  36
 .../pulsar/broker/transaction/TransactionTest.java |   2 +-
 11 files changed, 343 insertions(+), 35 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 472387a0a9b..5eb0600a9fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -128,6 +128,7 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -266,9 +267,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     @Getter
     protected final TransactionBuffer transactionBuffer;
+    @Getter
+    private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack =
+            (oldPosition, newPosition) -> updateMaxReadPositionMovedForwardTimestamp();
 
-    // Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
-    private volatile long lastDataMessagePublishedTimestamp = 0;
+    // Record the last time max read position is moved forward, unless it's a marker message.
+    @Getter
+    private volatile long lastMaxReadPositionMovedForwardTimestamp = 0;
 
     @Getter
     private final ExecutorService orderedExecutor;
@@ -380,7 +385,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         } else {
             this.transactionBuffer = new TransactionBufferDisable(this);
         }
-        transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
+        transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), true);
         if (ledger instanceof ShadowManagedLedgerImpl) {
             shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource());
         } else {
@@ -681,6 +686,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     }
 
+    private void updateMaxReadPositionMovedForwardTimestamp() {
+        lastMaxReadPositionMovedForwardTimestamp = Clock.systemUTC().millis();
+    }
+
     @Override
     public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
         PublishContext publishContext = (PublishContext) ctx;
@@ -689,12 +698,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         // Message has been successfully persisted
         messageDeduplication.recordMessagePersisted(publishContext, position);
 
-        if (!publishContext.isMarkerMessage()) {
-            lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
-        }
-
         // in order to sync the max position when cursor read entries
-        transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
+        transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(),
+                publishContext.isMarkerMessage());
         publishContext.setMetadataFromEntryData(entryData);
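The change above replaces a "last data message published" timestamp with a timestamp that is refreshed through a callback when the max read position moves forward, except for marker messages. A rough, self-contained sketch of that idea follows. It is not the Pulsar implementation: positions are simplified to `long` values, the clock is injected for illustration, and the class `MaxReadPositionTracker` and the exact condition under which the callback fires are assumptions.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of "update a timestamp when the max read position moves forward".
final class MaxReadPositionTracker {
    /** Simplified stand-in for the callback type named in the diff. */
    interface MaxReadPositionCallBack {
        void maxReadPositionMovedForward(long oldPosition, long newPosition);
    }

    private final MaxReadPositionCallBack callBack;
    private long maxReadPosition = -1;
    private volatile long lastMovedForwardTimestamp = 0;

    MaxReadPositionTracker(LongSupplier clock) {
        // The callback only records the time; it ignores the positions, as in the diff.
        this.callBack = (oldPosition, newPosition) -> lastMovedForwardTimestamp = clock.getAsLong();
    }

    /** Advances the max read position; marker messages advance it without touching the timestamp. */
    void sync(long newPosition, boolean isMarkerMessage) {
        long oldPosition = maxReadPosition;
        if (newPosition > oldPosition) {
            maxReadPosition = newPosition;
            if (!isMarkerMessage) {
                callBack.maxReadPositionMovedForward(oldPosition, newPosition);
            }
        }
    }

    long getLastMovedForwardTimestamp() {
        return lastMovedForwardTimestamp;
    }
}
```

In this sketch, a marker message or a position that does not advance leaves the timestamp unchanged, which mirrors the intent stated in the new field comment.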
(pulsar) 03/06: [fix][broker] Fix ProducerBusy issue due to incorrect userCreatedProducerCount on non-persistent topic (#22685)
lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9ddf78d4224003c5a6f8bd3d18458d9bdd322924
Author: hr <64506104+hrz...@users.noreply.github.com>
AuthorDate: Thu May 9 21:49:27 2024 +0800

    [fix][broker] Fix ProducerBusy issue due to incorrect userCreatedProducerCount on non-persistent topic (#22685)

    Co-authored-by: ruihongzhou
    (cherry picked from commit 253e6506ea2c5ccc6afe1117e311cf24685ce4e9)
---
 .../service/nonpersistent/NonPersistentTopic.java  | 10 ----------
 .../nonpersistent/NonPersistentTopicTest.java      | 22 ++++++++++++++++++++++
 2 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index ecc6134ecda..8af654633b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl.create;
 import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
@@ -57,7 +56,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersio
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.GetStatsOptions;
-import org.apache.pulsar.broker.service.Producer;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.StreamingStats;
 import org.apache.pulsar.broker.service.Subscription;
@@ -248,14 +246,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
         return false;
     }
 
-    @Override
-    public void removeProducer(Producer producer) {
-        checkArgument(producer.getTopic() == this);
-        if (producers.remove(producer.getProducerName(), producer)) {
-            handleProducerRemoved(producer);
-        }
-    }
-
     @Override
     public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
         return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index b33381126e5..e2aec70fb11 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import java.lang.reflect.Field;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.SubscriptionOption;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -250,4 +252,24 @@ public class NonPersistentTopicTest extends BrokerTestBase {
         Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
                 .until(() -> subscriptionMap.get(keySharedSubName) == null);
     }
+
+
+    @Test
+    public void testRemoveProducerOnNonPersistentTopic() throws Exception {
+        final String topicName = "non-persistent://prop/ns-abc/topic_" + UUID.randomUUID();
+
+        Producer producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        Field field = AbstractTopic.class.getDeclaredField("userCreatedProducerCount");
+        field.setAccessible(true);
+        int userCreatedProducerCount = (int) field.get(topic);
+        assertEquals(userCreatedProducerCount, 1);
+
+        producer.close();
+        userCreatedProducerCount = (int) field.get(topic);
+        assertEquals(userCreatedProducerCount, 0);
+    }
 }
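The new test reads a private counter through reflection. A minimal standalone sketch of that technique is shown below, assuming nothing about Pulsar itself: the `Counter` class is a hypothetical stand-in for `AbstractTopic`, and `readInt` is an illustrative helper, not an existing utility.

```java
import java.lang.reflect.Field;

// Hypothetical sketch of reading a private int field via reflection, as the test above does.
final class PrivateFieldReader {
    /** Invented stand-in for a class with a private counter such as userCreatedProducerCount. */
    static final class Counter {
        private int userCreatedProducerCount;

        void producerAdded() {
            userCreatedProducerCount++;
        }

        void producerRemoved() {
            userCreatedProducerCount--;
        }
    }

    private PrivateFieldReader() {
    }

    /** Reads a private int field declared on declaringClass from the given target object. */
    static int readInt(Object target, Class<?> declaringClass, String fieldName) {
        try {
            Field field = declaringClass.getDeclaredField(fieldName);
            field.setAccessible(true); // bypass the private modifier for test inspection
            return field.getInt(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read field " + fieldName, e);
        }
    }
}
```

Passing the declaring class explicitly matters because `getDeclaredField` does not search superclasses, which is why the test above looks the field up on `AbstractTopic` rather than on `NonPersistentTopic`.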
(pulsar) 02/06: [fix] [broker] rename to changeMaxReadPositionCount (#22656)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 1e53635fb25de88c165a8b4f126759979a40dd4a Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Wed May 8 19:34:00 2024 +0800 [fix] [broker] rename to changeMaxReadPositionCount (#22656) (cherry picked from commit 5ab05129514c1e71a09ec3f28b2b2dda9ce3e47f) --- .../transaction/buffer/impl/TopicTransactionBuffer.java | 16 .../pulsar/broker/transaction/TransactionTest.java | 12 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index a36216bd625..81c9ecfc728 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -76,8 +76,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen */ private final LinkedMap ongoingTxns = new LinkedMap<>(); -// when add abort or change max read position, the count will +1. Take snapshot will set 0 into it. -private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong(); +// when change max read position, the count will +1. Take snapshot will reset the count. 
+private final AtomicLong changeMaxReadPositionCount = new AtomicLong(); private final LongAdder txnCommittedCounter = new LongAdder(); @@ -429,15 +429,15 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen } private void takeSnapshotByChangeTimes() { -if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) { -this.changeMaxReadPositionAndAddAbortTimes.set(0); +if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) { +this.changeMaxReadPositionCount.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } } private void takeSnapshotByTimeout() { -if (changeMaxReadPositionAndAddAbortTimes.get() > 0) { -this.changeMaxReadPositionAndAddAbortTimes.set(0); +if (changeMaxReadPositionCount.get() > 0) { +this.changeMaxReadPositionCount.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } this.timer.newTimeout(TopicTransactionBuffer.this, @@ -454,7 +454,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) { -this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement(); +this.changeMaxReadPositionCount.getAndIncrement(); } } @@ -489,7 +489,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen } else if (checkIfReady()) { if (ongoingTxns.isEmpty()) { maxReadPosition = position; -changeMaxReadPositionAndAddAbortTimes.incrementAndGet(); +changeMaxReadPositionCount.incrementAndGet(); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 28dc2f8972c..86def029186 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1062,10 +1062,10 @@ public class TransactionTest extends TransactionTestBase { } @Test -public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception { +public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot() throws Exception { PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0) .getBrokerService() -.getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true) +.getTopic(NAMESPACE1 + "/changeMaxReadPositionCount" + UUID.randomUUID(), true) .get().get(); TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); Field processorField = TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor"); @@ -1073,9 +1073,9 @@ public class TransactionTest extends
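The renamed counter in the diff above drives a count-based snapshot trigger: each change of the max read position increments it, and once it reaches `takeSnapshotIntervalNumber` it is reset and a snapshot is taken. A minimal standalone sketch of that pattern follows. The class name, the `onMaxReadPositionChanged` entry point and the `snapshotsTaken` field are hypothetical stand-ins for illustration; only the field name `changeMaxReadPositionCount`, the threshold name and the compare-then-reset shape come from the diff.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the count-based snapshot trigger.
final class SnapshotCounterSketch {
    // Name taken from the diff: counts max-read-position changes since the last snapshot.
    private final AtomicLong changeMaxReadPositionCount = new AtomicLong();
    private final long takeSnapshotIntervalNumber;
    // Stand-in for the real snapshot call (takeAbortedTxnsSnapshot in the diff).
    private int snapshotsTaken = 0;

    SnapshotCounterSketch(long takeSnapshotIntervalNumber) {
        this.takeSnapshotIntervalNumber = takeSnapshotIntervalNumber;
    }

    // Hypothetical entry point: called whenever the max read position changes.
    void onMaxReadPositionChanged() {
        changeMaxReadPositionCount.incrementAndGet();
        takeSnapshotByChangeTimes();
    }

    // Same shape as the method in the diff: compare, reset, snapshot.
    private void takeSnapshotByChangeTimes() {
        if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) {
            changeMaxReadPositionCount.set(0);
            snapshotsTaken++;
        }
    }

    int snapshotsTaken() {
        return snapshotsTaken;
    }

    long pendingChanges() {
        return changeMaxReadPositionCount.get();
    }
}
```

With an interval of 3, the third change triggers one snapshot and resets the counter; a fourth change leaves one pending change.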
(pulsar) 01/06: [fix][storage] ReadonlyManagedLedger initialization does not fill in the properties (#22630)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 4cd8a674b80c5795a1a9b0b74f16547b7a9fff79 Author: Gvan Yao <50432408+gvan...@users.noreply.github.com> AuthorDate: Sat May 4 18:35:25 2024 +0800 [fix][storage] ReadonlyManagedLedger initialization does not fill in the properties (#22630) (cherry picked from commit eee3694f00e269eef0f75d791521d0d35d8ff411) --- .../mledger/impl/ReadOnlyManagedLedgerImpl.java| 8 ++ .../impl/ReadOnlyManagedLedgerImplTest.java| 103 + 2 files changed, 111 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java index 1fdf6939506..707b71c9d9f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java @@ -32,6 +32,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException; import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.pulsar.metadata.api.Stat; @@ -58,6 +59,13 @@ public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl { ledgers.put(ls.getLedgerId(), ls); } +if (mlInfo.getPropertiesCount() > 0) { +for (int i = 0; i < mlInfo.getPropertiesCount(); i++) { +MLDataFormats.KeyValue property = mlInfo.getProperties(i); +propertiesMap.put(property.getKey(), property.getValue()); +} +} + 
// Last ledger stat may be zeroed, we must update it if (ledgers.size() > 0 && ledgers.lastEntry().getValue().getEntries() == 0) { long lastLedgerId = ledgers.lastKey(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImplTest.java new file mode 100644 index 000..028ecad4072 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImplTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.mledger.impl; + + +import static org.testng.Assert.assertEquals; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.testng.annotations.Test; + +public class ReadOnlyManagedLedgerImplTest extends MockedBookKeeperTestCase { +private static final String MANAGED_LEDGER_NAME_NON_PROPERTIES = "ml-non-properties"; +private static final String MANAGED_LEDGER_NAME_ATTACHED_PROPERTIES = "ml-attached-properties"; + + +@Test +public void testReadOnlyManagedLedgerImplAttachProperties() +throws ManagedLedgerException, InterruptedException, ExecutionException, TimeoutException { +final ManagedLedger ledger = factory.open(MANAGED_LEDGER_NAME_ATTACHED_PROPERTIES, +new ManagedLedgerConfig().setRetentionTime(1,
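The fix in the diff above copies every key/value property from the loaded `ManagedLedgerInfo` into `propertiesMap` during read-only initialization. The sketch below shows the same copy loop in isolation. The `KeyValue` class here is a hypothetical plain-Java stand-in for the protobuf `MLDataFormats.KeyValue` message, and `toPropertiesMap` is an illustrative helper, not a method of the real class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in types; the real code iterates a protobuf repeated field.
final class PropertiesFillSketch {
    static final class KeyValue {
        final String key;
        final String value;

        KeyValue(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Copies each stored property into a map, mirroring the loop added by the fix.
    static Map<String, String> toPropertiesMap(List<KeyValue> properties) {
        Map<String, String> propertiesMap = new HashMap<>();
        for (int i = 0; i < properties.size(); i++) {
            KeyValue property = properties.get(i);
            propertiesMap.put(property.key, property.value);
        }
        return propertiesMap;
    }

    // Small helper to build an example property list.
    static List<KeyValue> example() {
        List<KeyValue> list = new ArrayList<>();
        list.add(new KeyValue("owner", "team-a"));
        list.add(new KeyValue("tier", "gold"));
        return list;
    }
}
```

An empty property list simply yields an empty map, so the outer `getPropertiesCount() > 0` guard in the diff is a shortcut rather than a correctness requirement for the loop itself.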
(pulsar) 04/06: [fix][client] Fix ReaderBuilder doest not give illegalArgument on connection failure retry (#22639)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit a0b90bcafc307397018c5686fe5bc508c4ae271e Author: Rajan Dhabalia AuthorDate: Fri May 10 04:10:31 2024 -0700 [fix][client] Fix ReaderBuilder doest not give illegalArgument on connection failure retry (#22639) (cherry picked from commit b56f238f6aaffdc0b37b9f6e2185b219f8708570) --- .../org/apache/pulsar/client/impl/ReaderTest.java | 27 ++ .../pulsar/client/impl/ReaderBuilderImpl.java | 5 ++-- .../apache/pulsar/client/impl/BuildersTest.java| 2 +- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 2d3e8d4c6e9..12228220b18 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -36,6 +36,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.Reader; @@ -902,4 +905,28 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { assertTrue(reader.hasMessageAvailable()); } } + +@Test +public void 
testReaderBuilderStateOnRetryFailure() throws Exception { +String ns = "my-property/my-ns"; +String topic = "persistent://" + ns + "/testRetryReader"; +RetentionPolicies retention = new RetentionPolicies(-1, -1); +admin.namespaces().setRetention(ns, retention); +String badUrl = "pulsar://bad-host:8080"; + +PulsarClient client = PulsarClient.builder().serviceUrl(badUrl).build(); + +ReaderBuilder readerBuilder = client.newReader().topic(topic).startMessageFromRollbackDuration(100, +TimeUnit.SECONDS); + +for (int i = 0; i < 3; i++) { +try { +readerBuilder.createAsync().get(1, TimeUnit.SECONDS); +} catch (TimeoutException e) { +log.info("It should time out due to invalid url"); +} catch (IllegalArgumentException e) { +fail("It should not fail with corrupt reader state"); +} +} +} } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index 2860cda0cee..ef230475be5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -86,8 +86,9 @@ public class ReaderBuilderImpl implements ReaderBuilder { .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder")); } -if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0 -|| conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) { +boolean isStartMsgIdExist = conf.getStartMessageId() != null && conf.getStartMessageId() != MessageId.earliest; +if ((isStartMsgIdExist && conf.getStartMessageFromRollbackDurationInSec() > 0) +|| (conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0)) { return FutureUtil .failedFuture(new IllegalArgumentException( "Start message id or start message from roll back must be specified but they cannot be" diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java index 607689e0e2b..5f52f86d8b0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java @@ -106,7 +106,7 @@ public class BuildersTest { @Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*")
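The change to `ReaderBuilderImpl` above alters which combinations of start message id and rollback duration are rejected: `MessageId.earliest` is no longer counted as an explicitly set start id. The sketch below compares the two predicates using plain Java values. `EARLIEST` is a hypothetical sentinel standing in for `MessageId.earliest`, and the method names are illustrative; the boolean expressions follow the removed and added lines of the diff. The sketch assumes, without the source confirming it, that a retried `createAsync()` can observe the start id already set to the earliest sentinel.

```java
// Hypothetical standalone comparison of the old and new validation predicates.
final class ReaderStartValidationSketch {
    // Stand-in for MessageId.earliest; compared by reference, as in the diff.
    static final Object EARLIEST = new Object();

    // Old check: any non-null start id combined with a rollback duration is rejected.
    static boolean rejectedBefore(Object startMessageId, long rollbackDurationInSec) {
        return (startMessageId != null && rollbackDurationInSec > 0)
                || (startMessageId == null && rollbackDurationInSec <= 0);
    }

    // New check: the earliest sentinel does not count as an explicitly set start id.
    static boolean rejectedAfter(Object startMessageId, long rollbackDurationInSec) {
        boolean isStartMsgIdExist = startMessageId != null && startMessageId != EARLIEST;
        return (isStartMsgIdExist && rollbackDurationInSec > 0)
                || (startMessageId == null && rollbackDurationInSec <= 0);
    }
}
```

Under that assumption, the pair (earliest, 100 seconds) is rejected by the old predicate and accepted by the new one, while a genuinely conflicting pair (a concrete message id plus a rollback duration) and the empty pair (no id, no duration) are still rejected.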
(pulsar) 05/06: [fix][sec] Upgrade postgresql version to avoid CVE-2024-1597 (#22635)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 31a19973d679aca8a939000299e2b71f873bff88
Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com>
AuthorDate: Fri May 10 16:41:20 2024 +0530

    [fix][sec] Upgrade postgresql version to avoid CVE-2024-1597 (#22635)

    (cherry picked from commit 2cfd9597676828bae68c9dac74e41d65a1a29864)
---
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index c817c8e1858..1a24ec0b4d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -189,7 +189,7 @@ flexible messaging model and an intuitive client API.
 5.1.0
 3.42.0.0
 8.0.11
-42.5.1
+42.5.5
 0.4.6
 2.7.5
 0.4.4-hotfix1
@@ -198,7 +198,7 @@ flexible messaging model and an intuitive client API.
 1.2.4
 8.12.1
 1.9.7.Final
-42.5.0
+42.5.5
 8.0.30
 1.15.16.Final
(pulsar) branch branch-3.2 updated (1f0afdd6738 -> e99bf9a4a77)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

    from 1f0afdd6738 [fix][broker] usedLocallySinceLastReport should always be reset (#22672)
     new 4cd8a674b80 [fix][storage] ReadonlyManagedLedger initialization does not fill in the properties (#22630)
     new 1e53635fb25 [fix] [broker] rename to changeMaxReadPositionCount (#22656)
     new 9ddf78d4224 [fix][broker] Fix ProducerBusy issue due to incorrect userCreatedProducerCount on non-persistent topic (#22685)
     new a0b90bcafc3 [fix][client] Fix ReaderBuilder doest not give illegalArgument on connection failure retry (#22639)
     new 31a19973d67 [fix][sec] Upgrade postgresql version to avoid CVE-2024-1597 (#22635)
     new e99bf9a4a77 [fix][broker] fix replicated subscriptions for transactional messages (#22452)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../mledger/impl/ReadOnlyManagedLedgerImpl.java       |   8 +
 .../impl/ReadOnlyManagedLedgerImplTest.java           | 103
 pom.xml                                               |   4 +-
 .../service/nonpersistent/NonPersistentTopic.java     |  10 --
 .../broker/service/persistent/PersistentTopic.java    |  26 +--
 .../ReplicatedSubscriptionsController.java            |   4 +-
 .../transaction/buffer/TransactionBuffer.java         |   3 +-
 .../buffer/impl/InMemTransactionBuffer.java           |  13 +-
 .../buffer/impl/TopicTransactionBuffer.java           |  82 +++--
 .../buffer/impl/TransactionBufferDisable.java         |  13 +-
 .../pulsar/broker/service/PersistentTopicTest.java    |   4 +-
 .../broker/service/ReplicatorSubscriptionTest.java    |  25 +++
 .../TransactionalReplicateSubscriptionTest.java       | 182 +
 .../nonpersistent/NonPersistentTopicTest.java         |  22 +++
 .../broker/transaction/TransactionProduceTest.java    |  36
 .../pulsar/broker/transaction/TransactionTest.java    |  14 +-
 .../org/apache/pulsar/client/impl/ReaderTest.java     |  27 +++
 .../pulsar/client/impl/ReaderBuilderImpl.java         |   5 +-
 .../apache/pulsar/client/impl/BuildersTest.java       |   2 +-
 19 files changed, 521 insertions(+), 62 deletions(-)
 create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImplTest.java
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java
(pulsar) branch branch-3.0 updated (cea5409cde1 -> e300fbd7218)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

    from cea5409cde1 [fix][sec] Upgrade postgresql version to avoid CVE-2024-1597 (#22635)
     new 1cf6a72f970 [fix] [broker] rename to changeMaxReadPositionCount (#22656)
     new e300fbd7218 [fix][broker] fix replicated subscriptions for transactional messages (#22452)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../broker/service/persistent/PersistentTopic.java    |  26 +--
 .../ReplicatedSubscriptionsController.java            |   4 +-
 .../transaction/buffer/TransactionBuffer.java         |   3 +-
 .../buffer/impl/InMemTransactionBuffer.java           |  13 +-
 .../buffer/impl/TopicTransactionBuffer.java           |  82 +++--
 .../buffer/impl/TransactionBufferDisable.java         |  13 +-
 .../pulsar/broker/service/PersistentTopicTest.java    |   4 +-
 .../broker/service/ReplicatorSubscriptionTest.java    |  25 +++
 .../TransactionalReplicateSubscriptionTest.java       | 182 +
 .../broker/transaction/TransactionProduceTest.java    |  36
 .../pulsar/broker/transaction/TransactionTest.java    |  14 +-
 11 files changed, 355 insertions(+), 47 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java
(pulsar) 01/02: [fix] [broker] rename to changeMaxReadPositionCount (#22656)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 1cf6a72f970c1a6b937a8d441b13184b2d73e471 Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Wed May 8 19:34:00 2024 +0800 [fix] [broker] rename to changeMaxReadPositionCount (#22656) (cherry picked from commit 5ab05129514c1e71a09ec3f28b2b2dda9ce3e47f) --- .../transaction/buffer/impl/TopicTransactionBuffer.java | 16 .../pulsar/broker/transaction/TransactionTest.java | 12 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index e45b1c6a852..5dbd5f2f5ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -76,8 +76,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen */ private final LinkedMap ongoingTxns = new LinkedMap<>(); -// when add abort or change max read position, the count will +1. Take snapshot will set 0 into it. -private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong(); +// when change max read position, the count will +1. Take snapshot will reset the count. 
+private final AtomicLong changeMaxReadPositionCount = new AtomicLong(); private final LongAdder txnCommittedCounter = new LongAdder(); @@ -425,15 +425,15 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen } private void takeSnapshotByChangeTimes() { -if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) { -this.changeMaxReadPositionAndAddAbortTimes.set(0); +if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) { +this.changeMaxReadPositionCount.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } } private void takeSnapshotByTimeout() { -if (changeMaxReadPositionAndAddAbortTimes.get() > 0) { -this.changeMaxReadPositionAndAddAbortTimes.set(0); +if (changeMaxReadPositionCount.get() > 0) { +this.changeMaxReadPositionCount.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } this.timer.newTimeout(TopicTransactionBuffer.this, @@ -450,7 +450,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) { -this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement(); +this.changeMaxReadPositionCount.getAndIncrement(); } } @@ -485,7 +485,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen } else if (checkIfReady()) { if (ongoingTxns.isEmpty()) { maxReadPosition = position; -changeMaxReadPositionAndAddAbortTimes.incrementAndGet(); +changeMaxReadPositionCount.incrementAndGet(); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 4200828c071..47e9d16e7aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1050,10 +1050,10 @@ public class TransactionTest extends TransactionTestBase { } @Test -public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception { +public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot() throws Exception { PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0) .getBrokerService() -.getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true) +.getTopic(NAMESPACE1 + "/changeMaxReadPositionCount" + UUID.randomUUID(), true) .get().get(); TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); Field processorField = TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor"); @@ -1061,9 +1061,9 @@ public class TransactionTest extends
(pulsar) 02/02: [fix][broker] fix replicated subscriptions for transactional messages (#22452)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit e300fbd72180e50ae049362499ed06bdcb0342ff Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Mon May 13 20:04:55 2024 +0800 [fix][broker] fix replicated subscriptions for transactional messages (#22452) (cherry picked from commit 9fd1b61fc45d06348af0241f002966087f1822a0) --- .../broker/service/persistent/PersistentTopic.java | 26 +-- .../ReplicatedSubscriptionsController.java | 4 +- .../transaction/buffer/TransactionBuffer.java | 3 +- .../buffer/impl/InMemTransactionBuffer.java| 13 +- .../buffer/impl/TopicTransactionBuffer.java| 70 ++-- .../buffer/impl/TransactionBufferDisable.java | 13 +- .../pulsar/broker/service/PersistentTopicTest.java | 4 +- .../broker/service/ReplicatorSubscriptionTest.java | 25 +++ .../TransactionalReplicateSubscriptionTest.java| 182 + .../broker/transaction/TransactionProduceTest.java | 36 .../pulsar/broker/transaction/TransactionTest.java | 2 +- 11 files changed, 343 insertions(+), 35 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index dc1b204baa5..c19ab7a5192 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -124,6 +124,7 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.broker.stats.ReplicationMetrics; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -253,9 +254,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Getter protected final TransactionBuffer transactionBuffer; +@Getter +private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack = +(oldPosition, newPosition) -> updateMaxReadPositionMovedForwardTimestamp(); -// Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic -private volatile long lastDataMessagePublishedTimestamp = 0; +// Record the last time max read position is moved forward, unless it's a marker message. +@Getter +private volatile long lastMaxReadPositionMovedForwardTimestamp = 0; @Getter private final ExecutorService orderedExecutor; @@ -370,7 +375,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { this.transactionBuffer = new TransactionBufferDisable(this); } -transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); +transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), true); if (ledger instanceof ShadowManagedLedgerImpl) { shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource()); } else { @@ -646,6 +651,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } } +private void updateMaxReadPositionMovedForwardTimestamp() { +lastMaxReadPositionMovedForwardTimestamp = Clock.systemUTC().millis(); +} + @Override public void addComplete(Position pos, ByteBuf entryData, Object ctx) { PublishContext publishContext = (PublishContext) ctx; @@ -654,12 +663,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal // Message has been successfully persisted 
messageDeduplication.recordMessagePersisted(publishContext, position); -if (!publishContext.isMarkerMessage()) { -lastDataMessagePublishedTimestamp = Clock.systemUTC().millis(); -} - // in order to sync the max position when cursor read entries -transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); +transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), +publishContext.isMarkerMessage()); publishContext.setMetadataFromEntryData(entryData); publishContext.completed(null, position.getLedgerId(), position.getEntryId());
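The `PersistentTopic` hunks above replace a timestamp that was set on every non-marker publish with one that is set through a callback when the max read position moves forward. The sketch below is one possible reading of that flow, not the actual broker code: positions are modelled as `long` values, the interface method name `maxReadPositionMovedForward` is hypothetical, and the rule that marker messages advance the position without firing the callback is an assumption drawn from the comment "unless it's a marker message" in the diff.

```java
// Hypothetical, simplified model of a max-read-position callback.
final class MaxReadPositionCallbackSketch {
    // Stand-in for TopicTransactionBuffer.MaxReadPositionCallBack; method name is assumed.
    interface MaxReadPositionCallBack {
        void maxReadPositionMovedForward(long oldPosition, long newPosition);
    }

    private long maxReadPosition = -1;
    private final MaxReadPositionCallBack callback;

    MaxReadPositionCallbackSketch(MaxReadPositionCallBack callback) {
        this.callback = callback;
    }

    // Advances the position if it moved forward; notifies only for non-marker messages.
    void syncMaxReadPosition(long newPosition, boolean isMarkerMessage) {
        long oldPosition = maxReadPosition;
        if (newPosition > oldPosition) {
            maxReadPosition = newPosition;
            if (!isMarkerMessage) {
                callback.maxReadPositionMovedForward(oldPosition, newPosition);
            }
        }
    }

    long maxReadPosition() {
        return maxReadPosition;
    }
}
```

In this model a topic would pass a lambda that records the current time, much like `(oldPosition, newPosition) -> updateMaxReadPositionMovedForwardTimestamp()` in the diff, so the timestamp only changes when readable data actually advances.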
Re: [PR] [fix][client] Fix MessageIdUtils cannot handle TopicMessageId [pulsar]
BewareMyPower commented on code in PR #22698:
URL: https://github.com/apache/pulsar/pull/22698#discussion_r1599348776

## pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java:

@@ -19,11 +19,12 @@
 package org.apache.pulsar.client.util;

 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.impl.MessageIdImpl;

 public class MessageIdUtils {

Review Comment:
Yes. Before removing it, we can mark it with `@Deprecated`. In third-party applications, it's easy to write a custom implementation using `MessageIdAdv`. You can see an example in an early implementation of KoP: https://github.com/streamnative/kop/blob/branch-2.7.4.5/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageIdUtils.java, which also shows that there is no standard way to compute the offset.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
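To illustrate the reviewer's point that an application can compute its own offset from a message id, here is a small sketch of one possible packing scheme. It works on plain `long` ledger and entry ids so that it stays self-contained; in a real client those values would presumably come from the `getLedgerId()` and `getEntryId()` accessors of `MessageIdAdv`. The 28-bit split, the class name and the method names are assumptions chosen for illustration, and, as the comment notes, there is no standard layout.

```java
// Hypothetical offset packing: high bits hold the ledger id, low 28 bits the entry id.
final class MessageIdOffsetSketch {
    // Assumed width of the entry-id field; entries per ledger must stay below 2^28.
    private static final int ENTRY_BITS = 28;
    private static final long ENTRY_MASK = (1L << ENTRY_BITS) - 1;

    // Combines ledger id and entry id into a single long offset.
    static long toOffset(long ledgerId, long entryId) {
        return (ledgerId << ENTRY_BITS) | (entryId & ENTRY_MASK);
    }

    // Recovers the ledger id from a packed offset.
    static long ledgerIdOf(long offset) {
        return offset >>> ENTRY_BITS;
    }

    // Recovers the entry id from a packed offset.
    static long entryIdOf(long offset) {
        return offset & ENTRY_MASK;
    }
}
```

The trade-off of any such scheme is that the bit split caps both the number of entries per ledger and the largest representable ledger id, which is one reason different projects pick different layouts.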
Re: [PR] [fix] [broker] fix typo and useless local variable. [pulsar]
codecov-commenter commented on PR #22704:
URL: https://github.com/apache/pulsar/pull/22704#issuecomment-2109240149

## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22704?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report

All modified and coverable lines are covered by tests :white_check_mark:

> Project coverage is 73.20%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) to head [(`82f7eae`)](https://app.codecov.io/gh/apache/pulsar/pull/22704?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
> Report is 265 commits behind head on master.

Additional details and impacted files

[![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22704/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22704?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)

```diff
@@             Coverage Diff              @@
##             master   #22704      +/-   ##
- Coverage     73.57%   73.20%   -0.38%
- Complexity    32624    32858     +234
  Files          1877     1889      +12
  Lines        139502   141392    +1890
  Branches      15299    15516     +217
+ Hits         102638   103502     +864
- Misses        28908    29909    +1001
- Partials       7956     7981      +25
```

| [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22704/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
|---|---|---|
| [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22704/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `27.46% <ø> (+2.87%)` | :arrow_up: |
| [systests](https://app.codecov.io/gh/apache/pulsar/pull/22704/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.75% <ø> (+0.43%)` | :arrow_up: |
| [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22704/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `72.22% <ø> (-0.63%)` | :arrow_down: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Files](https://app.codecov.io/gh/apache/pulsar/pull/22704?dropdown=coverage=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
|---|---|---|
| [.../persistent/ReplicatedSubscriptionsController.java](https://app.codecov.io/gh/apache/pulsar/pull/22704?src=pr=tree=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fpersistent%2FReplicatedSubscriptionsController.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUmVwbGljYXRlZFN1YnNjcmlwdGlvbnNDb250cm9sbGVyLmphdmE=) | `70.14% <ø> (-2.45%)` | :arrow_down: |

... and [357 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22704/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
Re: [PR] [improve][build] Support custom image and label names [pulsar]
lhotari commented on code in PR #22703:
URL: https://github.com/apache/pulsar/pull/22703#discussion_r1599345168

## tests/docker-images/latest-version-image/pom.xml:

@@ -152,11 +152,15 @@
- ${docker.organization}/pulsar-test-latest-version
+ ${docker.organization}/${docker.tag}-test-latest-version

Review Comment:
```suggestion
${docker.organization}/${docker.image}-test-latest-version
```
(pulsar) branch master updated: [improve][build] Bump version to 3.4.0-SNAPSHOT (#22700)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 1a7ada88056 [improve][build] Bump version to 3.4.0-SNAPSHOT (#22700)
1a7ada88056 is described below

commit 1a7ada8805630f974cebc029e5ef12550c217ece
Author: Cong Zhao
AuthorDate: Tue May 14 11:58:48 2024 +0800

    [improve][build] Bump version to 3.4.0-SNAPSHOT (#22700)
---
 bouncy-castle/bc/pom.xml | 2 +-
 bouncy-castle/bcfips-include-test/pom.xml | 2 +-
 bouncy-castle/bcfips/pom.xml | 2 +-
 bouncy-castle/pom.xml | 2 +-
 buildtools/pom.xml | 4 ++--
 distribution/io/pom.xml | 2 +-
 distribution/offloaders/pom.xml | 2 +-
 distribution/pom.xml | 2 +-
 distribution/server/pom.xml | 2 +-
 distribution/shell/pom.xml | 2 +-
 docker/pom.xml | 2 +-
 docker/pulsar-all/pom.xml | 2 +-
 docker/pulsar/pom.xml | 2 +-
 jclouds-shaded/pom.xml | 2 +-
 managed-ledger/pom.xml | 2 +-
 microbench/pom.xml | 2 +-
 pom.xml | 4 ++--
 pulsar-bom/pom.xml | 4 ++--
 pulsar-broker-auth-athenz/pom.xml | 2 +-
 pulsar-broker-auth-oidc/pom.xml | 2 +-
 pulsar-broker-auth-sasl/pom.xml | 2 +-
 pulsar-broker-common/pom.xml | 2 +-
 pulsar-broker/pom.xml | 2 +-
 pulsar-cli-utils/pom.xml | 2 +-
 pulsar-client-1x-base/pom.xml | 2 +-
 pulsar-client-1x-base/pulsar-client-1x/pom.xml | 2 +-
 pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml | 2 +-
 pulsar-client-admin-api/pom.xml | 2 +-
 pulsar-client-admin-shaded/pom.xml | 2 +-
 pulsar-client-admin/pom.xml | 2 +-
 pulsar-client-all/pom.xml | 2 +-
 pulsar-client-api/pom.xml | 2 +-
 pulsar-client-auth-athenz/pom.xml | 2 +-
 pulsar-client-auth-sasl/pom.xml | 2 +-
 pulsar-client-messagecrypto-bc/pom.xml | 2 +-
 pulsar-client-shaded/pom.xml | 2 +-
 pulsar-client-tools-api/pom.xml | 2 +-
 pulsar-client-tools-customcommand-example/pom.xml | 2 +-
 pulsar-client-tools-test/pom.xml | 2 +-
 pulsar-client-tools/pom.xml | 2 +-
 pulsar-client/pom.xml | 2 +-
 pulsar-common/pom.xml | 2 +-
 pulsar-config-validation/pom.xml | 2 +-
 pulsar-docs-tools/pom.xml | 2 +-
 pulsar-functions/api-java/pom.xml | 2 +-
 pulsar-functions/instance/pom.xml | 2 +-
 pulsar-functions/java-examples-builtin/pom.xml | 2 +-
 pulsar-functions/java-examples/pom.xml | 2 +-
 pulsar-functions/localrun-shaded/pom.xml | 2 +-
 pulsar-functions/localrun/pom.xml | 2 +-
 pulsar-functions/pom.xml | 2 +-
 pulsar-functions/proto/pom.xml | 2 +-
 pulsar-functions/runtime-all/pom.xml | 2 +-
 pulsar-functions/runtime/pom.xml | 2 +-
 pulsar-functions/secrets/pom.xml | 2 +-
 pulsar-functions/utils/pom.xml | 2 +-
 pulsar-functions/worker/pom.xml | 2 +-
 pulsar-io/aerospike/pom.xml | 2 +-
 pulsar-io/alluxio/pom.xml | 2 +-
 pulsar-io/aws/pom.xml | 2 +-
 pulsar-io/azure-data-explorer/pom.xml | 2 +-
 pulsar-io/batch-data-generator/pom.xml | 2 +-
 pulsar-io/batch-discovery-triggerers/pom.xml | 2 +-
 pulsar-io/canal/pom.xml | 2 +-
 pulsar-io/cassandra/pom.xml | 2 +-
 pulsar-io/common/pom.xml | 2 +-
 pulsar-io/core/pom.xml | 2 +-
 pulsar-io/data-generator/pom.xml | 2 +-
 pulsar-io/debezium/core/pom.xml | 2 +-
 pulsar-io/debezium/mongodb/pom.xml | 2 +-
 pulsar-io/debezium/mssql/pom.xml | 2 +-
 pulsar-io/debezium/mysql/pom.xml | 2 +-
Re: [PR] [improve][build] Bump version to 3.4.0-SNAPSHOT [pulsar]
lhotari merged PR #22700: URL: https://github.com/apache/pulsar/pull/22700 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [feat][broker] Implementation of PIP-323: Complete Backlog Quota Telemetry [pulsar]
lhotari commented on PR #21816: URL: https://github.com/apache/pulsar/pull/21816#issuecomment-2109231706 @codelipenghui This is a large change and causes a lot of merge conflicts. Do we really want this in maintenance branches?
Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]
lhotari commented on PR #22452: URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2109230737 I'm giving up on cherry-picking #21816 since that's too large a change. I'll check what the conflicts are when applying only #22656 and #22452.
Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]
thetumbled commented on PR #22452: URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2109229522

> cherry-picking this to branch-3.0 . it looks like #21816 and #22656 need to be cherry-picked before this one to reduce merge conflicts.

Hi, Lari. You can cherry-pick https://github.com/apache/pulsar/pull/22656 into branch-3.0 without conflicts, and I can help cherry-pick this PR if there are any conflicts.
Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]
lhotari commented on PR #22452: URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2109219574 cherry-picking this to branch-3.0 . it looks like #21816 and #22656 need to be cherry-picked before this one to reduce merge conflicts.
(pulsar) 04/14: [fix] [client] Fix Consumer should return configured batch receive max messages (#22619)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 29ee145f53873a3a9b7f087c3c2c97749f3ca695 Author: Rajan Dhabalia AuthorDate: Thu May 2 16:57:49 2024 -0700 [fix] [client] Fix Consumer should return configured batch receive max messages (#22619) (cherry picked from commit 0219921b5b7cd157092ac8f2d86ab7e60787d36c) --- .../client/api/ConsumerBatchReceiveTest.java | 8 +++--- .../client/api/SimpleProducerConsumerTest.java | 29 ++ .../pulsar/client/impl/ConsumerBuilderImpl.java| 4 +++ 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java index d54b1c99e3e..974d25aad64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java @@ -112,7 +112,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), true, 50, false }, // Number of message limitation exceed receiverQueue size and timeout limitation @@ -147,7 +147,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), false, 50, false }, // Number of message limitation exceed receiverQueue size and timeout limitation @@ -248,7 +248,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), true, 50, true }, // Number of 
message limitation exceed receiverQueue size and timeout limitation @@ -283,7 +283,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), false, 50, true }, // Number of message limitation exceed receiverQueue size and timeout limitation diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 68158dd69a4..7877d5ab604 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4825,6 +4825,35 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { admin.topics().delete(topic, false); } +/** + * It verifies that consumer receives configured number of messages into the batch. + * @throws Exception + */ +@Test +public void testBatchReceiveWithMaxBatchSize() throws Exception { +int maxBatchSize = 100; +final int internalQueueSize = 10; +final int maxBytes = 200; +final int timeOutInSeconds = 900; +final String topic = "persistent://my-property/my-ns/testBatchReceive"; +BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(maxBytes) +.maxNumMessages(maxBatchSize).timeout(timeOutInSeconds, TimeUnit.SECONDS).build(); +@Cleanup +Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) +.subscriptionName("my-subscriber-name") +.receiverQueueSize(internalQueueSize) +.batchReceivePolicy(batchReceivePolicy).subscribe(); +@Cleanup +Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + +final int numMessages = 100; +for (int i = 0; i < numMessages; i++) { +producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) *
(pulsar) 08/14: [fix] Fix Reader can be stuck from transaction aborted messages. (#22610)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit f516a852f040ba112dcea2264653af530ff5002e Author: 道君 AuthorDate: Tue May 7 20:45:16 2024 +0800 [fix] Fix Reader can be stuck from transaction aborted messages. (#22610) (cherry picked from commit 7e88463d9a598f95725bee49fd7f713bce27cf28) --- .../mledger/util/ManagedLedgerImplUtils.java | 17 ++ .../broker/service/persistent/PersistentTopic.java | 24 .../pulsar/broker/transaction/TransactionTest.java | 69 ++ .../buffer/TopicTransactionBufferTest.java | 36 +++ 4 files changed, 111 insertions(+), 35 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java index cd8671b0e62..01de115290a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java @@ -38,11 +38,7 @@ public class ManagedLedgerImplUtils { final Predicate predicate, final PositionImpl startPosition) { CompletableFuture future = new CompletableFuture<>(); -if (!ledger.isValidPosition(startPosition)) { -future.complete(startPosition); -} else { -internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); -} +internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); return future; } @@ -50,6 +46,10 @@ public class ManagedLedgerImplUtils { final Predicate predicate, final PositionImpl position, final CompletableFuture future) { +if (!ledger.isValidPosition(position)) { +future.complete(position); +return; +} ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { @@ -60,12 +60,7 @@ public class 
ManagedLedgerImplUtils { return; } PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position); -if (!ledger.isValidPosition(previousPosition)) { -future.complete(previousPosition); -} else { -internalAsyncReverseFindPositionOneByOne(ledger, predicate, -ledger.getPreviousPosition((PositionImpl) position), future); -} +internalAsyncReverseFindPositionOneByOne(ledger, predicate, previousPosition, future); } catch (Exception e) { future.completeExceptionally(e); } finally { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 904e1ed670e..0f9c64565ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3300,18 +3300,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public CompletableFuture getLastDispatchablePosition() { -PositionImpl maxReadPosition = getMaxReadPosition(); -// If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions. -// so return `maxRedPosition` directly. -if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) { -return CompletableFuture.completedFuture(maxReadPosition); -} else { -return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { -MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); -// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer -return !Markers.isServerOnlyMarker(md); -}, maxReadPosition); -} +return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { +MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); +
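Note on the ManagedLedgerImplUtils change above: the position validity check moves from the callers into the top of the recursive helper, so it applies to the start position and to every previous position alike. The following is a minimal synchronous sketch of that shape, assuming an int array in place of the ledger and an index in place of `PositionImpl`; the class and method names are hypothetical and this is not the Pulsar implementation.

```java
import java.util.function.IntPredicate;

/** Hypothetical stand-in for the reverse search; not Pulsar code. */
final class ReverseFindSketch {

    /**
     * Walks backwards from start until the predicate matches an entry.
     * The validity check is the first thing done on each step, so an
     * invalid start position and an exhausted scan are handled the same way:
     * the invalid position itself is returned.
     */
    static int reverseFind(int[] entries, IntPredicate predicate, int start) {
        int position = start;
        while (true) {
            // Assumption: "valid" here simply means "inside the array".
            if (position < 0 || position >= entries.length) {
                return position;
            }
            if (predicate.test(entries[position])) {
                return position;
            }
            position--; // stand-in for ledger.getPreviousPosition(position)
        }
    }
}
```

With a single check at the top, callers no longer need their own validity branches, which is what the removed lines in the diff reflect.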
(pulsar) 10/14: [improve][ws] Add memory limit configuration for Pulsar client used in Websocket proxy (#22666)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit e5515c5d17366087025e2a243a988a3290728515 Author: Lari Hotari AuthorDate: Wed May 8 06:56:35 2024 +0300 [improve][ws] Add memory limit configuration for Pulsar client used in Websocket proxy (#22666) (cherry picked from commit 80d46758e89b088688d521aa8ae401bfb00c98b2) --- conf/broker.conf | 3 +++ conf/standalone.conf | 3 +++ conf/websocket.conf| 3 +++ .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++ .../main/java/org/apache/pulsar/websocket/WebSocketService.java| 3 ++- .../pulsar/websocket/service/WebSocketProxyConfiguration.java | 3 +++ 6 files changed, 21 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index 34f7ab017e9..d5f10a487f2 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1535,6 +1535,9 @@ webSocketNumServiceThreads= # Number of connections per Broker in Pulsar Client used in WebSocket proxy webSocketConnectionsPerBroker= +# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy +webSocketPulsarClientMemoryLimitInMB=0 + # Time in milliseconds that idle WebSocket session times out webSocketSessionIdleTimeoutMillis=30 diff --git a/conf/standalone.conf b/conf/standalone.conf index a7d9990b7cc..ab0f2cebf66 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -966,6 +966,9 @@ webSocketNumIoThreads=8 # Number of connections per Broker in Pulsar Client used in WebSocket proxy webSocketConnectionsPerBroker=8 +# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy +webSocketPulsarClientMemoryLimitInMB=0 + # Time in milliseconds that idle WebSocket session times out webSocketSessionIdleTimeoutMillis=30 diff --git a/conf/websocket.conf b/conf/websocket.conf index 9051f3b590c..91f7f7d4c23 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -71,6 +71,9 @@ 
numHttpServerThreads= # Number of connections per Broker in Pulsar Client used in WebSocket proxy webSocketConnectionsPerBroker= +# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy +webSocketPulsarClientMemoryLimitInMB=0 + # Time in milliseconds that idle WebSocket session times out webSocketSessionIdleTimeoutMillis=30 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 55c9771b901..ff33d9103ab 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2836,6 +2836,13 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Number of connections per Broker in Pulsar Client used in WebSocket proxy" ) private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors(); + +@FieldContext( +category = CATEGORY_WEBSOCKET, +doc = "Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy" +) +private int webSocketPulsarClientMemoryLimitInMB = 0; + @FieldContext( category = CATEGORY_WEBSOCKET, doc = "Time in milliseconds that idle WebSocket session times out" diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 66b2a0075ec..889f4431cc3 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -195,7 +195,8 @@ public class WebSocketService implements Closeable { private PulsarClient createClientInstance(ClusterData clusterData) throws IOException { ClientBuilder clientBuilder = PulsarClient.builder() // -.memoryLimit(0, SizeUnit.BYTES) + 
.memoryLimit(SizeUnit.MEGA_BYTES.toBytes(config.getWebSocketPulsarClientMemoryLimitInMB()), +SizeUnit.BYTES) .statsInterval(0, TimeUnit.SECONDS) // .enableTls(config.isTlsEnabled()) // .allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) // diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java index 3fcbcf4b215..31a1adc2915 100644 ---
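Note on the WebSocketService change above: the client memory limit is now read from `webSocketPulsarClientMemoryLimitInMB` and converted from megabytes to bytes before being passed to the client builder, and the default of 0 reproduces the value that was previously hard-coded. The sketch below shows only the unit conversion, assuming binary megabytes (1024 * 1024 bytes); `MemoryLimitSketch` and `memoryLimitBytes` are hypothetical names, not part of Pulsar.

```java
/** Hypothetical helper illustrating the MB-to-bytes conversion; not Pulsar code. */
final class MemoryLimitSketch {

    // Assumption: one megabyte is 1024 * 1024 bytes.
    private static final long BYTES_PER_MB = 1024L * 1024L;

    /**
     * Converts a configured limit in megabytes to bytes.
     * The multiplication is done in long arithmetic so that large
     * int settings do not overflow.
     */
    static long memoryLimitBytes(int limitInMB) {
        return limitInMB * BYTES_PER_MB;
    }
}
```

Doing the arithmetic in `long` matters for settings of 2048 MB and above, which would overflow an `int` byte count.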
(pulsar) 14/14: [fix][sec] Upgrade postgresql version to avoid CVE-2024-1597 (#22635)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cea5409cde1c33f085fe28fd335574852913e45b
Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com>
AuthorDate: Fri May 10 16:41:20 2024 +0530

    [fix][sec] Upgrade postgresql version to avoid CVE-2024-1597 (#22635)

    (cherry picked from commit 2cfd9597676828bae68c9dac74e41d65a1a29864)
---
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 4cfb70a620f..35378e72cda 100644
--- a/pom.xml
+++ b/pom.xml
@@ -188,7 +188,7 @@ flexible messaging model and an intuitive client API.
 5.1.0
 3.42.0.0
 8.0.11
-42.5.1
+42.5.5
 0.4.6
 2.7.5
 0.4.4-hotfix1
@@ -198,7 +198,7 @@ flexible messaging model and an intuitive client API.
 8.12.1
 368
 1.9.7.Final
-42.5.0
+42.5.5
 8.0.30
 1.15.16.Final
(pulsar) 12/14: [fix][broker] Fix ProducerBusy issue due to incorrect userCreatedProducerCount on non-persistent topic (#22685)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 89b545eaa8c25a5be5d5e92981474ff752f015db Author: hr <64506104+hrz...@users.noreply.github.com> AuthorDate: Thu May 9 21:49:27 2024 +0800 [fix][broker] Fix ProducerBusy issue due to incorrect userCreatedProducerCount on non-persistent topic (#22685) Co-authored-by: ruihongzhou (cherry picked from commit 253e6506ea2c5ccc6afe1117e311cf24685ce4e9) --- .../service/nonpersistent/NonPersistentTopic.java | 10 -- .../nonpersistent/NonPersistentTopicTest.java | 22 ++ 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 08d21f6591f..6e6d944c59b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.service.nonpersistent; -import static com.google.common.base.Preconditions.checkArgument; import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl.create; import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; @@ -55,7 +54,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedExcept import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; -import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Replicator; import 
org.apache.pulsar.broker.service.StreamingStats; import org.apache.pulsar.broker.service.Subscription; @@ -244,14 +242,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol return false; } -@Override -public void removeProducer(Producer producer) { -checkArgument(producer.getTopic() == this); -if (producers.remove(producer.getProducerName(), producer)) { -handleProducerRemoved(producer); -} -} - @Override public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) { return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java index 73a1084f30f..766cc2353d4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java @@ -18,7 +18,10 @@ */ package org.apache.pulsar.broker.service.nonpersistent; +import java.lang.reflect.Field; +import java.util.UUID; import lombok.Cleanup; +import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -119,4 +122,23 @@ public class NonPersistentTopicTest extends BrokerTestBase { } Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); } + +@Test +public void testRemoveProducerOnNonPersistentTopic() throws Exception { +final String topicName = "non-persistent://prop/ns-abc/topic_" + UUID.randomUUID(); + +Producer producer = pulsarClient.newProducer() +.topic(topicName) +.create(); + +NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); +Field field = 
AbstractTopic.class.getDeclaredField("userCreatedProducerCount"); +field.setAccessible(true); +int userCreatedProducerCount = (int) field.get(topic); +assertEquals(userCreatedProducerCount, 1); + +producer.close(); +userCreatedProducerCount = (int) field.get(topic); +assertEquals(userCreatedProducerCount, 0); +} }
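Note on the test above: it reads the private `userCreatedProducerCount` field through reflection before and after closing the producer. The following self-contained sketch shows the same technique on a stand-in class; `TopicLike`, `addProducer`, `removeProducer` and `readCount` are hypothetical names used only for illustration.

```java
import java.lang.reflect.Field;

/** Hypothetical illustration of reading a private counter via reflection. */
final class ReflectionCounterSketch {

    /** Stand-in for a topic class that keeps a private producer counter. */
    static class TopicLike {
        private int userCreatedProducerCount = 0;

        void addProducer() {
            userCreatedProducerCount++;
        }

        void removeProducer() {
            userCreatedProducerCount--;
        }
    }

    /** Reads the private counter without adding a getter to the class under test. */
    static int readCount(TopicLike topic) {
        try {
            Field field = TopicLike.class.getDeclaredField("userCreatedProducerCount");
            field.setAccessible(true); // bypass the private modifier
            return field.getInt(topic);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("counter field not accessible", e);
        }
    }
}
```

Reflection keeps the production class free of test-only accessors, at the cost of a test that breaks if the field is renamed.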
(pulsar) 09/14: [fix][broker] Disable system topic message deduplication (#22582)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 34ce38efc6037866ccf9e02518399daeee732596 Author: Qiang Zhao AuthorDate: Wed May 8 10:53:53 2024 +0800 [fix][broker] Disable system topic message deduplication (#22582) (cherry picked from commit 5ff0fb9604e5e74eddaf35bb072541923c03f373) --- .../org/apache/pulsar/broker/service/Topic.java| 10 +++ .../service/persistent/MessageDeduplication.java | 6 +--- .../broker/service/persistent/PersistentTopic.java | 9 +++--- .../broker/service/persistent/SystemTopic.java | 16 +++ .../service/persistent/MessageDuplicationTest.java | 32 ++ 5 files changed, 64 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 188c7d8f2c7..e06d9b2f81e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -210,6 +210,16 @@ public interface Topic { void checkCursorsToCacheEntries(); +/** + * Indicate if the current topic enabled server side deduplication. + * This is a dynamic configuration, user may update it by namespace/topic policies. 
+ * + * @return whether enabled server side deduplication + */ +default boolean isDeduplicationEnabled() { +return false; +} + void checkDeduplicationSnapshot(); void checkMessageExpiry(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e508661364d..ab3b799093b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -217,7 +217,7 @@ public class MessageDeduplication { * returning a future to track the completion of the task */ public CompletableFuture checkStatus() { -boolean shouldBeEnabled = isDeduplicationEnabled(); +boolean shouldBeEnabled = topic.isDeduplicationEnabled(); synchronized (this) { if (status == Status.Recovering || status == Status.Removing) { // If there's already a transition happening, check later for status @@ -472,10 +472,6 @@ public class MessageDeduplication { }, null); } -private boolean isDeduplicationEnabled() { -return topic.getHierarchyTopicPolicies().getDeduplicationEnabled().get(); -} - /** * Topic will call this method whenever a producer connects. 
*/ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0f9c64565ce..dc1b204baa5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2039,10 +2039,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return future; } -public boolean isDeduplicationEnabled() { -return messageDeduplication.isEnabled(); -} - @Override public int getNumberOfConsumers() { int count = 0; @@ -3799,6 +3795,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return ledger.isMigrated(); } +public boolean isDeduplicationEnabled() { +return getHierarchyTopicPolicies().getDeduplicationEnabled().get(); +} + public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) { return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID); } @@ -3823,4 +3823,5 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal public Optional getShadowSourceTopic() { return Optional.ofNullable(shadowSourceTopic); } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 720ae3c5189..f2cec2138a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -80,6 +80,22 @@ public class SystemTopic extends PersistentTopic { return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic)); }
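Note on the deduplication change above: `Topic` gains a default `isDeduplicationEnabled()` that returns false, `PersistentTopic` answers from its topic policies, and `MessageDeduplication` now asks the topic instead of reading the policies itself. The sketch below shows that dispatch pattern with stand-in classes. The `SystemTopic` part of the diff is truncated in this archive, so the override returning false is an assumption based on the commit title, and all class names here are hypothetical.

```java
/** Hypothetical illustration of the default-method override pattern; not Pulsar code. */
final class DeduplicationSketch {

    /** Stand-in for the Topic interface with its new default method. */
    interface Topic {
        default boolean isDeduplicationEnabled() {
            return false;
        }
    }

    /** Stand-in for a persistent topic that answers from its policy value. */
    static class PersistentLike implements Topic {
        private final boolean policyValue;

        PersistentLike(boolean policyValue) {
            this.policyValue = policyValue;
        }

        @Override
        public boolean isDeduplicationEnabled() {
            return policyValue;
        }
    }

    /** Assumed behavior: a system topic ignores the policy and never deduplicates. */
    static class SystemLike extends PersistentLike {
        SystemLike(boolean policyValue) {
            super(policyValue);
        }

        @Override
        public boolean isDeduplicationEnabled() {
            return false;
        }
    }

    /** Stand-in for a topic type that simply inherits the default. */
    static class NonPersistentLike implements Topic {
    }
}
```

Routing the check through the topic lets each topic type decide, so a caller such as the deduplication status check needs no special case for system topics.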
(pulsar) 07/14: [fix][test] Clear MockedPulsarServiceBaseTest fields to prevent test runtime memory leak (#22659)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 13f0aae8a93f91c8444d9f538f3cde4172a5788d
Author: Lari Hotari
AuthorDate: Mon May 6 21:48:47 2024 +0300

    [fix][test] Clear MockedPulsarServiceBaseTest fields to prevent test runtime memory leak (#22659)

    (cherry picked from commit 025354ef4e733d62eee0d332edacb0b33b787da2)
---
 .../org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java | 7 +++
 1 file changed, 7 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 7380912e424..2cdb8e11da7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -258,13 +258,20 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         }
         if (brokerGateway != null) {
             brokerGateway.close();
+            brokerGateway = null;
         }
         if (pulsarTestContext != null) {
             pulsarTestContext.close();
             pulsarTestContext = null;
         }
+
         resetConfig();
         onCleanup();
+
+        // clear fields to avoid test runtime memory leak, pulsarTestContext already handles closing of these instances
+        pulsar = null;
+        mockZooKeeper = null;
+        mockZooKeeperGlobal = null;
     }
 
     protected void onCleanup() {
(pulsar) 11/14: [fix][broker] avoid offload system topic (#22497)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 20483f54efff842e84e6fe32f33fe7ebfc352e38 Author: Qiang Zhao AuthorDate: Wed May 8 13:10:49 2024 +0800 [fix][broker] avoid offload system topic (#22497) Co-authored-by: 道君 (cherry picked from commit 3114199c185cb03a7fdb1b8af2bbc356162cf42d) --- .../pulsar/broker/service/BrokerService.java | 8 +- .../pulsar/broker/service/BrokerServiceTest.java | 93 ++ 2 files changed, 100 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8a12178cef5..b74e6ee2c4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1991,7 +1991,13 @@ public class BrokerService implements Closeable { topicLevelOffloadPolicies, OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), getPulsar().getConfig().getProperties()); -if (NamespaceService.isSystemServiceNamespace(namespace.toString())) { +if (NamespaceService.isSystemServiceNamespace(namespace.toString()) +|| SystemTopicNames.isSystemTopic(topicName)) { +/* + Avoid setting broker internal system topics using off-loader because some of them are the + preconditions of other topics. 
The slow replying log speed will cause a delay in all the topic + loading.(timeout) + */ managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE); } else { if (topicLevelOffloadPolicies != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 1bfbe85c826..daa61070dbf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -67,11 +67,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; @@ -111,6 +114,8 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; @@ -1772,4 +1777,92 @@ public class 
BrokerServiceTest extends BrokerTestBase { admin.topics().delete(topic); admin.namespaces().deleteNamespace(namespace); } + + +@Test +public void testOffloadConfShouldNotAppliedForSystemTopic() throws PulsarAdminException { +final String driver = "aws-s3"; +final String region = "test-region"; +final String bucket = "test-bucket"; +final String role = "test-role"; +final String roleSessionName = "test-role-session-name"; +final String credentialId = "test-credential-id"; +final String credentialSecret = "test-credential-secret"; +final String endPoint = "test-endpoint"; +final Integer maxBlockSizeInBytes = 5; +final Integer readBufferSizeInBytes = 2; +final Long offloadThresholdInBytes = 10L; +final Long offloadThresholdInSeconds = 1000L; +final Long offloadDeletionLagInMillis = 5L; + +final OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create( +driver, +region, +bucket, +endPoint, +role, +
(pulsar) 13/14: [fix][client] Fix ReaderBuilder doest not give illegalArgument on connection failure retry (#22639)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 8b2d5e9ce9d5dc2c529ac7d885441c966dafa100 Author: Rajan Dhabalia AuthorDate: Fri May 10 04:10:31 2024 -0700 [fix][client] Fix ReaderBuilder doest not give illegalArgument on connection failure retry (#22639) (cherry picked from commit b56f238f6aaffdc0b37b9f6e2185b219f8708570) --- .../org/apache/pulsar/client/impl/ReaderTest.java | 27 ++ .../pulsar/client/impl/ReaderBuilderImpl.java | 5 ++-- .../apache/pulsar/client/impl/BuildersTest.java| 2 +- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 00c3eadb06a..df9a97c29eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -36,6 +36,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.Reader; @@ -851,4 +854,28 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { assertTrue(reader.hasMessageAvailable()); } } + +@Test +public void 
testReaderBuilderStateOnRetryFailure() throws Exception { +String ns = "my-property/my-ns"; +String topic = "persistent://" + ns + "/testRetryReader"; +RetentionPolicies retention = new RetentionPolicies(-1, -1); +admin.namespaces().setRetention(ns, retention); +String badUrl = "pulsar://bad-host:8080"; + +PulsarClient client = PulsarClient.builder().serviceUrl(badUrl).build(); + +ReaderBuilder readerBuilder = client.newReader().topic(topic).startMessageFromRollbackDuration(100, +TimeUnit.SECONDS); + +for (int i = 0; i < 3; i++) { +try { +readerBuilder.createAsync().get(1, TimeUnit.SECONDS); +} catch (TimeoutException e) { +log.info("It should time out due to invalid url"); +} catch (IllegalArgumentException e) { +fail("It should not fail with corrupt reader state"); +} +} +} } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index ca2011cf18a..7e74a8e9c9b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -85,8 +85,9 @@ public class ReaderBuilderImpl implements ReaderBuilder { .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder")); } -if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0 -|| conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) { +boolean isStartMsgIdExist = conf.getStartMessageId() != null && conf.getStartMessageId() != MessageId.earliest; +if ((isStartMsgIdExist && conf.getStartMessageFromRollbackDurationInSec() > 0) +|| (conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0)) { return FutureUtil .failedFuture(new IllegalArgumentException( "Start message id or start message from roll back must be specified but they cannot be" diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java index 607689e0e2b..5f52f86d8b0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java @@ -106,7 +106,7 @@ public class BuildersTest { @Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*")
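The relaxed check in `ReaderBuilderImpl` above can be read as a small predicate over two settings. The sketch below is only an illustration of that predicate: `ReaderStartValidation`, `EARLIEST` and `isInvalid` are hypothetical stand-ins, and `EARLIEST` plays the role of `MessageId.earliest`. The new test suggests that the builder configuration may already hold `MessageId.earliest` when `createAsync()` is retried after a failed attempt; that reading is inferred from the diff and is not stated in the commit message.

```java
// Sketch only: ReaderStartValidation, EARLIEST and isInvalid are hypothetical
// stand-ins for illustration, not part of the Pulsar client API.
class ReaderStartValidation {
    // Stand-in for the MessageId.earliest sentinel, compared by identity as in the patch.
    static final Object EARLIEST = new Object();

    // Returns true when the combination of settings should be rejected.
    static boolean isInvalid(Object startMessageId, long rollbackDurationSec) {
        // A start id only counts as "explicitly set" if it is not the earliest sentinel.
        boolean explicitStartId = startMessageId != null && startMessageId != EARLIEST;
        return (explicitStartId && rollbackDurationSec > 0)
                || (startMessageId == null && rollbackDurationSec <= 0);
    }

    public static void main(String[] args) {
        Object someId = new Object();
        System.out.println(isInvalid(null, 0));       // true: neither option is set
        System.out.println(isInvalid(someId, 100));   // true: both options are set
        System.out.println(isInvalid(EARLIEST, 100)); // false: sentinel plus rollback is accepted
        System.out.println(isInvalid(someId, 0));     // false: only a start id
        System.out.println(isInvalid(null, 100));     // false: only a rollback duration
    }
}
```

Under the old condition the third case would have been rejected, which matches the `IllegalArgumentException` that the new test guards against.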
(pulsar) 06/14: [fix][storage] ReadonlyManagedLedger initialization does not fill in the properties (#22630)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 91f42873cad98a1792581fba65b53ee560fa7e96 Author: Gvan Yao <50432408+gvan...@users.noreply.github.com> AuthorDate: Sat May 4 18:35:25 2024 +0800 [fix][storage] ReadonlyManagedLedger initialization does not fill in the properties (#22630) (cherry picked from commit eee3694f00e269eef0f75d791521d0d35d8ff411) --- .../mledger/impl/ReadOnlyManagedLedgerImpl.java| 8 ++ .../impl/ReadOnlyManagedLedgerImplTest.java| 103 + 2 files changed, 111 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java index 1fdf6939506..707b71c9d9f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java @@ -32,6 +32,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException; import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.pulsar.metadata.api.Stat; @@ -58,6 +59,13 @@ public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl { ledgers.put(ls.getLedgerId(), ls); } +if (mlInfo.getPropertiesCount() > 0) { +for (int i = 0; i < mlInfo.getPropertiesCount(); i++) { +MLDataFormats.KeyValue property = mlInfo.getProperties(i); +propertiesMap.put(property.getKey(), property.getValue()); +} +} + 
// Last ledger stat may be zeroed, we must update it if (ledgers.size() > 0 && ledgers.lastEntry().getValue().getEntries() == 0) { long lastLedgerId = ledgers.lastKey(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImplTest.java new file mode 100644 index 000..028ecad4072 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImplTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.mledger.impl; + + +import static org.testng.Assert.assertEquals; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.testng.annotations.Test; + +public class ReadOnlyManagedLedgerImplTest extends MockedBookKeeperTestCase { +private static final String MANAGED_LEDGER_NAME_NON_PROPERTIES = "ml-non-properties"; +private static final String MANAGED_LEDGER_NAME_ATTACHED_PROPERTIES = "ml-attached-properties"; + + +@Test +public void testReadOnlyManagedLedgerImplAttachProperties() +throws ManagedLedgerException, InterruptedException, ExecutionException, TimeoutException { +final ManagedLedger ledger = factory.open(MANAGED_LEDGER_NAME_ATTACHED_PROPERTIES, +new ManagedLedgerConfig().setRetentionTime(1,
(pulsar) 05/14: [fix][sec] Upgrade elasticsearch-java version to avoid CVE-2023-4043 (#22640)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 46d802232b9f5cb479b9dd92aafa18a54de3b438 Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com> AuthorDate: Sat May 4 02:51:48 2024 +0530 [fix][sec] Upgrade elasticsearch-java version to avoid CVE-2023-4043 (#22640) (cherry picked from commit 3ca4ddfc8fecab633b473e48e1a6f78adcfbd4df) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b3fabfbfa0c..4cfb70a620f 100644 --- a/pom.xml +++ b/pom.xml @@ -195,7 +195,7 @@ flexible messaging model and an intuitive client API. 3.3.5 2.4.10 1.2.4 -8.5.2 +8.12.1 368 1.9.7.Final 42.5.0
(pulsar) 01/14: [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit db43414efb934481b795f9b2a45311758a1fc384 Author: Yunze Xu AuthorDate: Fri Apr 26 21:30:15 2024 +0800 [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573) (cherry picked from commit f411e3c0f26eef98382c7d06ea1676781247149b) --- .../org/apache/pulsar/broker/PulsarService.java| 3 + .../store/TableViewLoadDataStoreImpl.java | 6 +- .../pulsar/broker/service/BrokerService.java | 11 +++ .../extensions/ExtensibleLoadManagerCloseTest.java | 107 + 4 files changed, 122 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 2a17607376e..47509e9bc49 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -424,6 +424,9 @@ public class PulsarService implements AutoCloseable, ShutdownService { return closeFuture; } LOG.info("Closing PulsarService"); +if (brokerService != null) { +brokerService.unloadNamespaceBundlesGracefully(); +} state = State.Closing; // close the service in reverse order v.s. 
in which they are started diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index d916e917162..81cf33b4a55 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -161,12 +161,8 @@ public class TableViewLoadDataStoreImpl implements LoadDataStore { } private void validateProducer() { -if (producer == null || !producer.isConnected()) { +if (producer == null) { try { -if (producer != null) { -producer.close(); -} -producer = null; startProducer(); log.info("Restarted producer on {}", topic); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2f3f2560e1d..8a12178cef5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -288,6 +288,7 @@ public class BrokerService implements Closeable { private Set brokerEntryPayloadProcessors; private final TopicEventsDispatcher topicEventsDispatcher = new TopicEventsDispatcher(); +private volatile boolean unloaded = false; public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception { this.pulsar = pulsar; @@ -956,9 +957,13 @@ public class BrokerService implements Closeable { } public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean closeWithoutWaitingClientDisconnect) { +if (unloaded) { +return; +} try { log.info("Unloading namespace-bundles..."); // make broker-node unavailable from the cluster +long disableBrokerStartTime = 
System.nanoTime(); if (pulsar.getLoadManager() != null && pulsar.getLoadManager().get() != null) { try { pulsar.getLoadManager().get().disableBroker(); @@ -967,6 +972,10 @@ public class BrokerService implements Closeable { // still continue and release bundle ownership as broker's registration node doesn't exist. } } +double disableBrokerTimeSeconds = +TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - disableBrokerStartTime)) +/ 1000.0; +log.info("Disable broker in load manager completed in {} seconds", disableBrokerTimeSeconds); // unload all namespace-bundles gracefully long closeTopicsStartTime = System.nanoTime(); @@ -1001,6 +1010,8 @@ public class BrokerService implements Closeable { } } catch (Exception e) { log.error("Failed to disable broker from loadbalancer list {}", e.getMessage(), e); +} finally { +unloaded = true; } } diff --git
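The `unloaded` flag added to `BrokerService` in this commit makes the graceful unload effectively run once: `PulsarService` now calls it early during close, and later calls return immediately. A minimal sketch of that guard follows; `OneShotUnloader` and its members are hypothetical names, and the real unload work is replaced by a counter.

```java
import java.util.concurrent.TimeUnit;

// Sketch only: OneShotUnloader is a hypothetical class that mirrors the
// "volatile flag + finally" guard from the diff, not Pulsar code.
class OneShotUnloader {
    private volatile boolean unloaded = false;
    private int runs = 0;

    void unloadGracefully() {
        if (unloaded) {
            return; // a previous call already went through the unload path
        }
        try {
            long start = System.nanoTime();
            runs++; // placeholder for disabling the broker and unloading bundles
            double seconds = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) / 1000.0;
            System.out.println("Unload completed in " + seconds + " seconds");
        } finally {
            // Set even when the work above throws, so close() is not retried forever.
            unloaded = true;
        }
    }

    int runs() {
        return runs;
    }

    public static void main(String[] args) {
        OneShotUnloader unloader = new OneShotUnloader();
        unloader.unloadGracefully();
        unloader.unloadGracefully(); // no-op
        System.out.println(unloader.runs()); // 1
    }
}
```

Note that a plain `volatile` check followed by a later write is not an atomic test-and-set: two threads calling at the same moment could both pass the check. The sketch, like the diff, only guards against repeated sequential calls.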
(pulsar) 03/14: [fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 (#22633)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 04c429c7cf7949d4c503b006e04c8858392508d3 Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com> AuthorDate: Thu May 2 18:58:02 2024 +0530 [fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 (#22633) (cherry picked from commit bc44280e88e98fdf0a815fa384a0f52508ca4b8e) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index fbae234e855..b3fabfbfa0c 100644 --- a/pom.xml +++ b/pom.xml @@ -181,7 +181,7 @@ flexible messaging model and an intuitive client API. 4.4.20 3.4.0 5.18.0 -1.12.262 +1.12.638 1.11.3 2.10.10 2.6.0
(pulsar) 02/14: [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes (#22501)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d23c77bc61a307d3e2b900584d8e98e4037d92b5
Author: Rui Fu
AuthorDate: Wed May 1 13:18:05 2024 +0800

    [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes (#22501)

    (cherry picked from commit d067efcc67f761babd056e1db2b9c7c1dc419a1b)
---
 .../apache/pulsar/functions/instance/JavaInstanceRunnable.java | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e2ad9e4c989..28f3dbe3cfb 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -272,9 +272,15 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     ContextImpl setupContext() throws PulsarClientException {
         Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
-        return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
+        ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(functionClassLoader);
+            return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
                 collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
                 pulsarAdmin, clientBuilder);
+        } finally {
+            Thread.currentThread().setContextClassLoader(clsLoader);
+        }
     }
 
     public interface AsyncResultConsumer {
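The change to `setupContext()` follows a common pattern: remember the thread's context class loader, swap in another one for the duration of a call, and restore the original in `finally`. A self-contained sketch of that pattern is shown below. `ContextClassLoaderSwap` and `callWith` are hypothetical names, and an empty `URLClassLoader` stands in for `functionClassLoader`.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

// Sketch only: ContextClassLoaderSwap and callWith are hypothetical helpers
// illustrating the swap-and-restore pattern used in the diff.
class ContextClassLoaderSwap {
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        try {
            current.setContextClassLoader(loader);
            return action.get();
        } finally {
            // Always restore, even if the action throws.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        // Empty loader used purely as a distinguishable stand-in for functionClassLoader.
        ClassLoader custom = new URLClassLoader(new URL[0]);

        ClassLoader seenInside =
                callWith(custom, () -> Thread.currentThread().getContextClassLoader());

        System.out.println(seenInside == custom);                                    // true
        System.out.println(Thread.currentThread().getContextClassLoader() == before); // true
    }
}
```

Code that looks up classes or resources through the context class loader while the action runs will see the swapped-in loader, which appears to be the point of wrapping the `ContextImpl` construction.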
(pulsar) branch branch-3.0 updated (f1daa75525c -> cea5409cde1)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

    from f1daa75525c [fix][io] Fix es index creation (#22654) (#22701)
     new db43414efb9 [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573)
     new d23c77bc61a [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes (#22501)
     new 04c429c7cf7 [fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 (#22633)
     new 29ee145f538 [fix] [client] Fix Consumer should return configured batch receive max messages (#22619)
     new 46d802232b9 [fix][sec] Upgrade elasticsearch-java version to avoid CVE-2023-4043 (#22640)
     new 91f42873cad [fix][storage] ReadonlyManagedLedger initialization does not fill in the properties (#22630)
     new 13f0aae8a93 [fix][test] Clear MockedPulsarServiceBaseTest fields to prevent test runtime memory leak (#22659)
     new f516a852f04 [fix] Fix Reader can be stuck from transaction aborted messages. (#22610)
     new 34ce38efc60 [fix][broker] Disable system topic message deduplication (#22582)
     new e5515c5d173 [improve][ws] Add memory limit configuration for Pulsar client used in Websocket proxy (#22666)
     new 20483f54eff [fix][broker] avoid offload system topic (#22497)
     new 89b545eaa8c [fix][broker] Fix ProducerBusy issue due to incorrect userCreatedProducerCount on non-persistent topic (#22685)
     new 8b2d5e9ce9d [fix][client] Fix ReaderBuilder doest not give illegalArgument on connection failure retry (#22639)
     new cea5409cde1 [fix][sec] Upgrade postgresql version to avoid CVE-2024-1597 (#22635)

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 conf/broker.conf | 3 +
 conf/standalone.conf | 3 +
 conf/websocket.conf | 3 +
 .../mledger/impl/ReadOnlyManagedLedgerImpl.java | 8 ++
 .../mledger/util/ManagedLedgerImplUtils.java | 17 ++--
 .../impl/ReadOnlyManagedLedgerImplTest.java | 103
 pom.xml | 8 +-
 .../apache/pulsar/broker/ServiceConfiguration.java | 7 ++
 .../org/apache/pulsar/broker/PulsarService.java | 3 +
 .../store/TableViewLoadDataStoreImpl.java | 6 +-
 .../pulsar/broker/service/BrokerService.java | 19 +++-
 .../org/apache/pulsar/broker/service/Topic.java | 10 ++
 .../service/nonpersistent/NonPersistentTopic.java | 10 --
 .../service/persistent/MessageDeduplication.java | 6 +-
 .../broker/service/persistent/PersistentTopic.java | 33 ---
 .../broker/service/persistent/SystemTopic.java | 16 +++
 .../broker/auth/MockedPulsarServiceBaseTest.java | 7 ++
 .../extensions/ExtensibleLoadManagerCloseTest.java | 107 +
 .../pulsar/broker/service/BrokerServiceTest.java | 93 ++
 .../nonpersistent/NonPersistentTopicTest.java | 22 +
 .../service/persistent/MessageDuplicationTest.java | 32 ++
 .../pulsar/broker/transaction/TransactionTest.java | 69 +
 .../buffer/TopicTransactionBufferTest.java | 36 ---
 .../client/api/ConsumerBatchReceiveTest.java | 8 +-
 .../client/api/SimpleProducerConsumerTest.java | 29 ++
 .../org/apache/pulsar/client/impl/ReaderTest.java | 27 ++
 .../pulsar/client/impl/ConsumerBuilderImpl.java | 4 +
 .../pulsar/client/impl/ReaderBuilderImpl.java | 5 +-
 .../apache/pulsar/client/impl/BuildersTest.java | 2 +-
 .../functions/instance/JavaInstanceRunnable.java | 8 +-
 .../apache/pulsar/websocket/WebSocketService.java | 3 +-
 .../service/WebSocketProxyConfiguration.java | 3 +
 32 files changed, 637 insertions(+), 73 deletions(-)
 create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImplTest.java
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
Re: [PR] [fix] [broker] fix typo and useless local variable. [pulsar]
thetumbled commented on PR #22704: URL: https://github.com/apache/pulsar/pull/22704#issuecomment-2109200288 /pulsarbot rerun-failure-checks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [fix] Return an error when AckCumulative on a Shared/KeyShared subscription [pulsar-client-go]
RobertIndie opened a new pull request, #1217:
URL: https://github.com/apache/pulsar-client-go/pull/1217

### Motivation

The consumer should return an error when AckCumulative is called on a Shared/KeyShared subscription.

### Verifying this change

This change added tests.

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API: (yes / no)
- The schema: (yes / no / don't know)
- The default values of configurations: (yes / no)
- The wire protocol: (yes / no)

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
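The rule described in this pull request (cumulative acknowledgement is rejected on Shared and KeyShared subscriptions) amounts to a guard on the subscription type. The pull request itself targets the Go client and its diff is not part of this message, so the sketch below is a language-neutral illustration written in Java. `CumulativeAckCheck`, its `SubscriptionType` enum and `checkCumulativeAckAllowed` are hypothetical names, not the API of either client.

```java
// Sketch only: all names here are hypothetical and do not come from
// pulsar-client-go or the Pulsar Java client.
class CumulativeAckCheck {
    enum SubscriptionType { EXCLUSIVE, FAILOVER, SHARED, KEY_SHARED }

    // Throws when a cumulative acknowledgement is requested on a subscription
    // type where messages are spread across several consumers.
    static void checkCumulativeAckAllowed(SubscriptionType type) {
        if (type == SubscriptionType.SHARED || type == SubscriptionType.KEY_SHARED) {
            throw new IllegalStateException(
                    "cumulative acknowledgement is not supported on a " + type + " subscription");
        }
    }

    public static void main(String[] args) {
        checkCumulativeAckAllowed(SubscriptionType.EXCLUSIVE); // passes
        checkCumulativeAckAllowed(SubscriptionType.FAILOVER);  // passes
        try {
            checkCumulativeAckAllowed(SubscriptionType.SHARED);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast on the client side gives the caller an immediate error instead of an acknowledgement whose "everything up to this message" meaning is ambiguous when several consumers share the subscription.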
Re: [PR] [fix][sec] Upgrade Debezium oracle connector version to avoid CVE-2023-4586 [pulsar]
lhotari commented on PR #22641: URL: https://github.com/apache/pulsar/pull/22641#issuecomment-2109172075 Note: This PR shouldn't be cherry-picked since it breaks the connector. Reverted in #22668.
Re: [PR] [Fix][broker] Limit replication rate based on bytes [pulsar]
codecov-commenter commented on PR #22674:
URL: https://github.com/apache/pulsar/pull/22674#issuecomment-2108724412

## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22674) Report

Attention: Patch coverage is `65.38462%`, with `9 lines` in your changes missing coverage. Please review.

> Project coverage is 73.18%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277) to head [(`a3277c3`)](https://app.codecov.io/gh/apache/pulsar/pull/22674).
> Report is 264 commits behind head on master.

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master   #22674      +/-   ##
- Coverage     73.57%   73.18%   -0.40%
- Complexity    32624    32849     +225
  Files          1877     1889      +12
  Lines        139502   141370    +1868
  Branches      15299    15515     +216
+ Hits         102638   103457     +819
- Misses        28908    29911    +1003
- Partials       7956     8002      +46
```

| [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22674/flags) | Coverage Δ | |
|---|---|---|
| [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22674/flags) | `27.52% <42.30%> (+2.93%)` | :arrow_up: |
| [systests](https://app.codecov.io/gh/apache/pulsar/pull/22674/flags) | `24.70% <0.00%> (+0.38%)` | :arrow_up: |
| [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22674/flags) | `72.20% <65.38%> (-0.65%)` | :arrow_down: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Files](https://app.codecov.io/gh/apache/pulsar/pull/22674) | Coverage Δ | |
|---|---|---|
| [...roker/service/persistent/PersistentReplicator.java](https://app.codecov.io/gh/apache/pulsar/pull/22674) | `67.54% <65.38%> (-1.34%)` | :arrow_down: |

... and [353 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22674/indirect-changes)
[PR] [improve] [broker] Do not create partitioned DLQ/Retry topic automatically [pulsar]
poorbarcode opened a new pull request, #22705:
URL: https://github.com/apache/pulsar/pull/22705

### Motivation

After you set `defaultNumPartitions` to `16`, you get `16` partitions per topic and `16*16` DLQ partitions per subscription, so the number of DLQs becomes huge as soon as a topic has more than one subscription.

For example:
- create topic `t1` with `16` partitions
- you will get `t1-partition-0`, `t1-partition-1`...`t1-partition-15`.
- after you enable DLQ, you will get the following `16*16` DLQs per subscription
  - `t1-partition-0-{subscription}-DLQ-partition-0`
  - `t1-partition-0-{subscription}-DLQ-partition-1`
  - ...
  - `t1-partition-0-{subscription}-DLQ-partition-15`
  - `t1-partition-1-{subscription}-DLQ-partition-0`
  - `t1-partition-1-{subscription}-DLQ-partition-1`
  - ...
  - ...
  - ...
  - `t1-partition-15-{subscription}-DLQ-partition-15`

### Modifications

- Do not create partitioned DLQ/Retry topics automatically.
- Users can still create a partitioned DLQ manually if they need one.

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository: x
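The `16*16` figure in the motivation above can be checked by enumerating the names. The sketch below only follows the naming pattern quoted in the pull request description; `DlqPartitionCount` and `dlqPartitionNames` are hypothetical helpers, and `sub` is a placeholder subscription name.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: DlqPartitionCount is a hypothetical helper that reproduces the
// naming pattern from the PR description, not broker code.
class DlqPartitionCount {
    // Lists every DLQ partition for one subscription when each topic partition
    // gets its own DLQ and each DLQ is itself auto-created with numPartitions partitions.
    static List<String> dlqPartitionNames(String topic, String subscription, int numPartitions) {
        List<String> names = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            String dlqTopic = topic + "-partition-" + p + "-" + subscription + "-DLQ";
            for (int d = 0; d < numPartitions; d++) {
                names.add(dlqTopic + "-partition-" + d);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> names = dlqPartitionNames("t1", "sub", 16);
        System.out.println(names.size());                // 256
        System.out.println(names.get(0));                // t1-partition-0-sub-DLQ-partition-0
        System.out.println(names.get(names.size() - 1)); // t1-partition-15-sub-DLQ-partition-15
    }
}
```

The count is per subscription, so under the same assumptions a topic with `s` subscriptions would end up with `256 * s` DLQ partitions.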
Re: [PR] [Fix][broker] Limit replication rate based on bytes [pulsar]
nodece commented on PR #22674: URL: https://github.com/apache/pulsar/pull/22674#issuecomment-2108568614 /pulsarbot rerun-failure-checks
Re: [PR] [Fix][broker] Limit replication rate based on bytes [pulsar]
nodece commented on PR #22674: URL: https://github.com/apache/pulsar/pull/22674#issuecomment-2108567733 /pulsarbot rerun-failure
(pulsar-site) branch main updated: Fix wrong Java API call to delete partitioned topics (#897)
This is an automated email from the ASF dual-hosted git repository.

visortelle pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git

The following commit(s) were added to refs/heads/main by this push:
     new 5dca18a58fa2 Fix wrong Java API call to delete partitioned topics (#897)
5dca18a58fa2 is described below

commit 5dca18a58fa2b92d22c66b690f52cdbda9255233
Author: Dragos Misca
AuthorDate: Mon May 13 09:48:35 2024 -0700

    Fix wrong Java API call to delete partitioned topics (#897)

    * Fix wrong Java API call to delete partitioned topics
    * Update versions 2.10,2.11,3.0,3.1,3.2
---
 docs/admin-api-topics.md                          | 2 +-
 versioned_docs/version-2.10.x/admin-api-topics.md | 2 +-
 versioned_docs/version-2.11.x/admin-api-topics.md | 2 +-
 versioned_docs/version-3.0.x/admin-api-topics.md  | 2 +-
 versioned_docs/version-3.1.x/admin-api-topics.md  | 2 +-
 versioned_docs/version-3.2.x/admin-api-topics.md  | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/admin-api-topics.md b/docs/admin-api-topics.md
index 2b9e1c6fdba1..0508e50c9cf7 100644
--- a/docs/admin-api-topics.md
+++ b/docs/admin-api-topics.md
@@ -1734,7 +1734,7 @@ pulsar-admin topics delete-partitioned-topic \
 ```java
-admin.topics().delete(topic);
+admin.topics().deletePartitionedTopic(topic);
 ```
diff --git a/versioned_docs/version-2.10.x/admin-api-topics.md b/versioned_docs/version-2.10.x/admin-api-topics.md
index d22c922742b3..25599f0a1103 100644
--- a/versioned_docs/version-2.10.x/admin-api-topics.md
+++ b/versioned_docs/version-2.10.x/admin-api-topics.md
@@ -2114,7 +2114,7 @@ $ bin/pulsar-admin topics delete-partitioned-topic \
 ```java
-admin.topics().delete(topic);
+admin.topics().deletePartitionedTopic(topic);
 ```
diff --git a/versioned_docs/version-2.11.x/admin-api-topics.md b/versioned_docs/version-2.11.x/admin-api-topics.md
index 0ecd500c40d8..cdaabbf26f8e 100644
--- a/versioned_docs/version-2.11.x/admin-api-topics.md
+++ b/versioned_docs/version-2.11.x/admin-api-topics.md
@@ -1734,7 +1734,7 @@ pulsar-admin topics delete-partitioned-topic \
 ```java
-admin.topics().delete(topic);
+admin.topics().deletePartitionedTopic(topic);
 ```
diff --git a/versioned_docs/version-3.0.x/admin-api-topics.md b/versioned_docs/version-3.0.x/admin-api-topics.md
index b4640562720c..37fbffa1c44c 100644
--- a/versioned_docs/version-3.0.x/admin-api-topics.md
+++ b/versioned_docs/version-3.0.x/admin-api-topics.md
@@ -1733,7 +1733,7 @@ pulsar-admin topics delete-partitioned-topic \
 ```java
-admin.topics().delete(topic);
+admin.topics().deletePartitionedTopic(topic);
 ```
diff --git a/versioned_docs/version-3.1.x/admin-api-topics.md b/versioned_docs/version-3.1.x/admin-api-topics.md
index b4640562720c..37fbffa1c44c 100644
--- a/versioned_docs/version-3.1.x/admin-api-topics.md
+++ b/versioned_docs/version-3.1.x/admin-api-topics.md
@@ -1733,7 +1733,7 @@ pulsar-admin topics delete-partitioned-topic \
 ```java
-admin.topics().delete(topic);
+admin.topics().deletePartitionedTopic(topic);
 ```
diff --git a/versioned_docs/version-3.2.x/admin-api-topics.md b/versioned_docs/version-3.2.x/admin-api-topics.md
index 2b9e1c6fdba1..0508e50c9cf7 100644
--- a/versioned_docs/version-3.2.x/admin-api-topics.md
+++ b/versioned_docs/version-3.2.x/admin-api-topics.md
@@ -1734,7 +1734,7 @@ pulsar-admin topics delete-partitioned-topic \
 ```java
-admin.topics().delete(topic);
+admin.topics().deletePartitionedTopic(topic);
 ```
Re: [PR] Fix wrong Java API call to delete partitioned topics [pulsar-site]
visortelle merged PR #897: URL: https://github.com/apache/pulsar-site/pull/897 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [improve][build] Improve docker-push (#22702)
This is an automated email from the ASF dual-hosted git repository. zixuan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9668674b361 [improve][build] Improve docker-push (#22702) 9668674b361 is described below commit 9668674b361aa1b7b5e72c457fc0fa9d7b324f05 Author: Zixuan Liu AuthorDate: Tue May 14 00:22:55 2024 +0800 [improve][build] Improve docker-push (#22702) Signed-off-by: Zixuan Liu --- docker/pulsar-all/pom.xml | 23 --- docker/pulsar/pom.xml | 23 --- pom.xml | 1 + 3 files changed, 9 insertions(+), 38 deletions(-) diff --git a/docker/pulsar-all/pom.xml b/docker/pulsar-all/pom.xml index 3da14ea84bc..6aa783ee9c8 100644 --- a/docker/pulsar-all/pom.xml +++ b/docker/pulsar-all/pom.xml @@ -146,6 +146,7 @@ build tag + push @@ -180,25 +181,9 @@ docker-push - - - -io.fabric8 -docker-maven-plugin - - -default -package - - build - tag - push - - - - - - + +false + diff --git a/docker/pulsar/pom.xml b/docker/pulsar/pom.xml index 79ff4bd33b1..5d83c8b5477 100644 --- a/docker/pulsar/pom.xml +++ b/docker/pulsar/pom.xml @@ -73,6 +73,7 @@ build tag + push @@ -124,25 +125,9 @@ docker-push - - - -io.fabric8 -docker-maven-plugin - - -default -package - - build - tag - push - - - - - - + +false + diff --git a/pom.xml b/pom.xml index 63b44788f14..2254d6a1875 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ flexible messaging model and an intuitive client API. To create multi-arch image, pass -Ddocker.platforms=linux/arm64,linux/amd64 --> +true
Re: [PR] [improve][build] Improve docker-push [pulsar]
nodece merged PR #22702: URL: https://github.com/apache/pulsar/pull/22702
[PR] Fix wrong Java API call to delete partitioned topics [pulsar-site]
dragosvictor opened a new pull request, #897:
URL: https://github.com/apache/pulsar-site/pull/897

### ✅ Contribution Checklist

- [x] I read the [contribution guide](https://pulsar.apache.org/contribute/document-contribution/)
- [ ] I updated the [versioned docs](https://pulsar.apache.org/contribute/document-contribution/#update-versioned-docs)
[PR] [cleanup] Remove AvroCodec from JSONSchema [pulsar-client-go]
crossoverJie opened a new pull request, #1216:
URL: https://github.com/apache/pulsar-client-go/pull/1216

### Modifications

`AvroCodec` is not used. Remove `AvroCodec` from `JSONSchema`.

### Verifying this change

- [x] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)

### Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (GoDocs)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
Re: [I] [Bug] Deadlock in broker service while initializing bkClient [pulsar]
Meet0861 commented on issue #22699: URL: https://github.com/apache/pulsar/issues/22699#issuecomment-2107538542 @lhotari [#21096](https://github.com/apache/pulsar/pull/21096) seems to have already been cherry-picked into 2.10.
Re: [PR] [fix] [broker] fix typo and useless local variable. [pulsar]
thetumbled commented on PR #22704: URL: https://github.com/apache/pulsar/pull/22704#issuecomment-2107507829 PTAL, thanks. @lhotari @Technoboy- @dao-jun
[PR] [fix] [broker] typo [pulsar]
thetumbled opened a new pull request, #22704:
URL: https://github.com/apache/pulsar/pull/22704

### Motivation

fix typo

### Modifications

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
Re: [PR] [improve][build] Support custom image and label names [pulsar]
lhotari commented on code in PR #22703:
URL: https://github.com/apache/pulsar/pull/22703#discussion_r1598408361

## tests/docker-images/latest-version-image/Dockerfile: ##
@@ -27,12 +27,14 @@ RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go/pf && go install
 RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go/examples && go install ./...
 # Reference pulsar-all to copy connectors from there
-FROM apachepulsar/pulsar-all:latest as pulsar-all
+ARG PULSAR_ALL_IMAGE
+FROM $PULSAR_ALL_IMAGE as pulsar-all
 ## Main image build
-FROM apachepulsar/pulsar:latest
+ARG PULSAR_IMAGE
+FROM $PULSAR_IMAGE

Review Comment:
   It seems that `PULSAR_ALL_IMAGE` and `PULSAR_IMAGE` aren't passed as args for building the docker image
Re: [PR] [fix][io] Fix es index creation [pulsar]
nodece commented on PR #22654: URL: https://github.com/apache/pulsar/pull/22654#issuecomment-2107421025 Opened a PR to cherry-pick this to the branch-3.0: #22701
Re: [PR] [fix][io] Fix es index creation (#22654) [pulsar]
nodece merged PR #22701: URL: https://github.com/apache/pulsar/pull/22701
(pulsar) branch branch-3.0 updated: [fix][io] Fix es index creation (#22654) (#22701)
This is an automated email from the ASF dual-hosted git repository. zixuan pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new f1daa75525c [fix][io] Fix es index creation (#22654) (#22701) f1daa75525c is described below commit f1daa75525c99bd6093d8ac28b6e635bdb9b8011 Author: Zixuan Liu AuthorDate: Mon May 13 20:13:50 2024 +0800 [fix][io] Fix es index creation (#22654) (#22701) Signed-off-by: Zixuan Liu --- .../elastic/ElasticSearchJavaRestClient.java | 4 +- .../io/elasticsearch/ElasticSearchSinkTests.java | 53 +- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index afda5ba0e74..133daa8cd6a 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -144,7 +144,7 @@ public class ElasticSearchJavaRestClient extends RestClient { public boolean deleteDocument(String index, String documentId) throws IOException { final DeleteRequest req = new DeleteRequest.Builder() -.index(config.getIndexName()) +.index(index) .id(documentId) .build(); @@ -156,7 +156,7 @@ public class ElasticSearchJavaRestClient extends RestClient { public boolean indexDocument(String index, String documentId, String documentSource) throws IOException { final Map mapped = objectMapper.readValue(documentSource, Map.class); final IndexRequest indexRequest = new IndexRequest.Builder<>() -.index(config.getIndexName()) +.index(index) .document(mapped) .id(documentId) .build(); diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java index 62592f5f09b..4a16caf3ede 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java @@ -18,39 +18,41 @@ */ package org.apache.pulsar.io.elasticsearch; -import co.elastic.clients.transport.ElasticsearchTransport; -import com.fasterxml.jackson.core.JsonParseException; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.schema.GenericObject; -import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.api.schema.GenericSchema; -import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; -import org.apache.pulsar.client.api.schema.SchemaBuilder; -import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.common.schema.KeyValue; -import org.apache.pulsar.common.schema.KeyValueEncodingType; -import org.apache.pulsar.common.schema.SchemaType; - import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; - +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import co.elastic.clients.transport.ElasticsearchTransport; +import com.fasterxml.jackson.core.JsonParseException; +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.io.IOException; -import 
java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericObject; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; +import org.apache.pulsar.client.api.schema.SchemaBuilder; +import org.apache.pulsar.client.impl.MessageImpl;
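The two one-line changes in `ElasticSearchJavaRestClient` above replace `config.getIndexName()` with the `index` argument. The following is a minimal sketch of why that matters, assuming a hypothetical in-memory stand-in (`IndexRoutingSketch`, `indexDocumentBuggy`, and `contains` are illustrative names, not part of the connector):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory stand-in for an Elasticsearch client. It only
 * illustrates the index-routing bug and is not the connector's code.
 */
final class IndexRoutingSketch {
    // Plays the role of config.getIndexName() in the diff above.
    private final String configuredIndex;
    // index name -> (document id -> document source)
    private final Map<String, Map<String, String>> indices = new HashMap<>();

    IndexRoutingSketch(String configuredIndex) {
        this.configuredIndex = configuredIndex;
    }

    // Before the fix: the index argument is ignored and the configured index is used.
    void indexDocumentBuggy(String index, String documentId, String source) {
        indices.computeIfAbsent(configuredIndex, k -> new HashMap<>()).put(documentId, source);
    }

    // After the fix: the document goes to the index the caller asked for.
    void indexDocument(String index, String documentId, String source) {
        indices.computeIfAbsent(index, k -> new HashMap<>()).put(documentId, source);
    }

    boolean contains(String index, String documentId) {
        Map<String, String> docs = indices.get(index);
        return docs != null && docs.containsKey(documentId);
    }

    public static void main(String[] args) {
        IndexRoutingSketch client = new IndexRoutingSketch("default-index");
        client.indexDocumentBuggy("orders", "1", "{}");
        client.indexDocument("orders", "2", "{}");
        System.out.println(client.contains("orders", "1"));        // prints false
        System.out.println(client.contains("default-index", "1")); // prints true
        System.out.println(client.contains("orders", "2"));        // prints true
    }
}
```

With the pre-fix behaviour, any caller that computes a per-record index name would still see every document land in the single configured index.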
Re: [I] [Bug] [branch-3.0] The test SimpleProducerConsumerTest.testAccessAvroSchemaMetadata always fails [pulsar]
NesaraK commented on issue #22470: URL: https://github.com/apache/pulsar/issues/22470#issuecomment-2107411865 The issue was related to the unit testing framework's handling of schema changes. Previously, tests would fail whenever there was a schema change due to lack of proper error handling. The solution was to restart the server for every test case. In the latest branch (3.3), the issue has been resolved by introducing try-catch and finally blocks for better error handling within the unit tests. While the fix in branch 3.3 addresses the immediate issue, consider implementing the following improvements: Use separate topics for each test case to avoid interference due to schema changes. If two different schemas (e.g., JSON and Avro) were uploaded on different topics, the tests would have passed.
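The suggestion above to use a separate topic per test case can be sketched as follows. This is only an illustration: `TestTopics`, `uniqueTopic`, and the `public/default` tenant and namespace are assumptions, not names taken from the Pulsar test suite.

```java
import java.util.UUID;

/** Hypothetical helper; not part of the Pulsar test suite. */
final class TestTopics {
    private TestTopics() {
    }

    // Builds a topic name that is unique per call, so a schema uploaded by
    // one test case cannot clash with the schema used by another.
    static String uniqueTopic(String testName) {
        return "persistent://public/default/" + testName + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Each test gets its own topic, even when the test name is reused.
        System.out.println(uniqueTopic("testAccessAvroSchemaMetadata"));
        System.out.println(uniqueTopic("testAccessAvroSchemaMetadata"));
    }
}
```

A random suffix is used here rather than a counter so that repeated runs against a long-lived broker do not reuse a topic that still carries an older schema.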
(pulsar) branch master updated: [fix][broker] fix replicated subscriptions for transactional messages (#22452)
This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9fd1b61fc45 [fix][broker] fix replicated subscriptions for transactional messages (#22452) 9fd1b61fc45 is described below commit 9fd1b61fc45d06348af0241f002966087f1822a0 Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Mon May 13 20:04:55 2024 +0800 [fix][broker] fix replicated subscriptions for transactional messages (#22452) --- .../broker/service/persistent/PersistentTopic.java | 21 ++- .../ReplicatedSubscriptionsController.java | 4 +- .../transaction/buffer/TransactionBuffer.java | 3 +- .../buffer/impl/InMemTransactionBuffer.java| 13 +- .../buffer/impl/TopicTransactionBuffer.java| 70 ++-- .../buffer/impl/TransactionBufferDisable.java | 13 +- .../pulsar/broker/service/PersistentTopicTest.java | 4 +- .../broker/service/ReplicatorSubscriptionTest.java | 25 +++ .../TransactionalReplicateSubscriptionTest.java| 182 + .../broker/transaction/TransactionProduceTest.java | 36 .../pulsar/broker/transaction/TransactionTest.java | 2 +- 11 files changed, 342 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 28bc27f7961..69c7f404fdd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -134,6 +134,7 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.broker.stats.ReplicationMetrics; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -272,10 +273,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Getter protected final TransactionBuffer transactionBuffer; +@Getter +private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack = +(oldPosition, newPosition) -> updateMaxReadPositionMovedForwardTimestamp(); -// Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic +// Record the last time max read position is moved forward, unless it's a marker message. @Getter -private volatile long lastDataMessagePublishedTimestamp = 0; +private volatile long lastMaxReadPositionMovedForwardTimestamp = 0; @Getter private final ExecutorService orderedExecutor; @@ -410,7 +414,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { this.transactionBuffer = new TransactionBufferDisable(this); } -transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); +transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), true); if (ledger instanceof ShadowManagedLedgerImpl) { shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource()); } else { @@ -719,6 +723,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } } +private void updateMaxReadPositionMovedForwardTimestamp() { +lastMaxReadPositionMovedForwardTimestamp = Clock.systemUTC().millis(); +} + @Override public void addComplete(Position pos, ByteBuf entryData, Object ctx) { PublishContext publishContext = (PublishContext) ctx; @@ -727,12 +735,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal // Message has been successfully persisted messageDeduplication.recordMessagePersisted(publishContext, position); -if (!publishContext.isMarkerMessage()) { -lastDataMessagePublishedTimestamp = Clock.systemUTC().millis(); -} - // in order to sync the max position when cursor read entries -transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); +transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), +publishContext.isMarkerMessage());
Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]
poorbarcode merged PR #22452: URL: https://github.com/apache/pulsar/pull/22452
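The commit for #22452 replaces the "last data message published" timestamp with a callback that fires when the max read position moves forward, unless the entry is a marker message. One plausible reading of that idea is sketched below. It is an assumption-laden illustration: positions are simplified to `long` values, and `MaxReadPositionSketch`, `MovedForwardCallback`, and `sync` are hypothetical names rather than the broker's `TopicTransactionBuffer` API.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch; positions are plain longs instead of ledger positions. */
final class MaxReadPositionSketch {
    /** Callback shape modelled on the (oldPosition, newPosition) lambda in the diff. */
    interface MovedForwardCallback {
        void movedForward(long oldPosition, long newPosition);
    }

    private final MovedForwardCallback callback;
    private long maxReadPosition = -1;

    MaxReadPositionSketch(MovedForwardCallback callback) {
        this.callback = callback;
    }

    long maxReadPosition() {
        return maxReadPosition;
    }

    // Assumed behaviour: the position advances for every publish, but the
    // callback is only notified when the entry is not a marker message.
    void sync(long position, boolean isMarkerMessage) {
        long old = maxReadPosition;
        if (position <= old) {
            return; // nothing moved forward
        }
        maxReadPosition = position;
        if (!isMarkerMessage) {
            callback.movedForward(old, position);
        }
    }

    public static void main(String[] args) {
        AtomicInteger notifications = new AtomicInteger();
        MaxReadPositionSketch buffer =
                new MaxReadPositionSketch((oldPos, newPos) -> notifications.incrementAndGet());
        buffer.sync(1, false); // data message: notified
        buffer.sync(2, true);  // marker message: position moves, no notification
        buffer.sync(2, false); // position did not move: no notification
        buffer.sync(3, false); // data message: notified
        System.out.println(notifications.get()); // prints 2
    }
}
```

In the actual change the callback body updates a timestamp on the topic; the counter here only makes the notification rule observable.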
Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]
thetumbled commented on code in PR #22564:
URL: https://github.com/apache/pulsar/pull/22564#discussion_r1598211480

## pip/pip-347.md: ##
@@ -0,0 +1,63 @@
+
+# PIP-347: add role field in consumer's stat
+
+# Background knowledge
+
+During the operation and maintenance process, there are many users asking administrator for help to find out the consumers of a topic and notify them about the business change.
+Administrators can call `bin/pulsar-admin topics partitioned-stats` to find out the `ip:port` of the consumers, but no role info. So administrators need to take a lot of time to
+communicate with users to find out the owner based on the `ip:port`. It's a troublesome work and low efficiency.
+
+# Motivation
+
+This pip can help to solve such kind of problem. By adding a field `role` in the consumer's stat, the administrator can find out the owner of the consumer directly.
+It can save a lot of time and improve the efficiency of the operation and maintenance process.

Review Comment:
   In our case, we can't trace a consumer back by subscription name, consumer name, or properties. For a subscription that bypasses our management platform, there is currently no way for us to find the owner.
   The field name `role` is a little confusing, since Pulsar's access control is not role-based. We can rename the exposed field to `appId`, as `appId` is one of the properties of the consumer object. For JWT-based authentication users, we can use `appId` to retrieve the role info of the consumer.
   https://github.com/apache/pulsar/blob/854a0560eb77174d7207dafc344b242ae6f92b9f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L203-L203
[PR] [fix][io] Fix es index creation (#22654) [pulsar]
nodece opened a new pull request, #22701:
URL: https://github.com/apache/pulsar/pull/22701

(cherry picked from commit efcedf6b0d4217db7e47efef3420eb61da282c50)

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]
thetumbled commented on code in PR #22564:
URL: https://github.com/apache/pulsar/pull/22564#discussion_r1598211480

## pip/pip-347.md: ##
@@ -0,0 +1,63 @@
+
+# PIP-347: add role field in consumer's stat
+
+# Background knowledge
+
+During the operation and maintenance process, there are many users asking administrator for help to find out the consumers of a topic and notify them about the business change.
+Administrators can call `bin/pulsar-admin topics partitioned-stats` to find out the `ip:port` of the consumers, but no role info. So administrators need to take a lot of time to
+communicate with users to find out the owner based on the `ip:port`. It's a troublesome work and low efficiency.
+
+# Motivation
+
+This pip can help to solve such kind of problem. By adding a field `role` in the consumer's stat, the administrator can find out the owner of the consumer directly.
+It can save a lot of time and improve the efficiency of the operation and maintenance process.

Review Comment:
   In our case, we can't trace a consumer back by subscription name, consumer name, or properties. For a subscription that bypasses our management platform, there is currently no way for us to find the owner.
   The field name `role` is a little confusing, since Pulsar's access control is not role-based. We can rename the exposed field to `appId`, as `appId` is one of the properties of the consumer object. For token-based authentication users, we can use `appId` to retrieve the role info of the consumer.
   https://github.com/apache/pulsar/blob/854a0560eb77174d7207dafc344b242ae6f92b9f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L203-L203
Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]
codelipenghui commented on PR #22564:
URL: https://github.com/apache/pulsar/pull/22564#issuecomment-2107157309

> There are times when the role is more sensitive information and I would recommend adding a configuration to expose the role.

It should be fine, since we already expose the subscription name, consumer name, and more information to users who have LOOKUP permission with the default Pulsar authorization. Pulsar can also support fine-grained access control through an authorization plugin.
Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]
thetumbled commented on code in PR #22564:
URL: https://github.com/apache/pulsar/pull/22564#discussion_r1598211480

## pip/pip-347.md: ##
@@ -0,0 +1,63 @@
+
+# PIP-347: add role field in consumer's stat
+
+# Background knowledge
+
+During the operation and maintenance process, there are many users asking administrator for help to find out the consumers of a topic and notify them about the business change.
+Administrators can call `bin/pulsar-admin topics partitioned-stats` to find out the `ip:port` of the consumers, but no role info. So administrators need to take a lot of time to
+communicate with users to find out the owner based on the `ip:port`. It's a troublesome work and low efficiency.
+
+# Motivation
+
+This pip can help to solve such kind of problem. By adding a field `role` in the consumer's stat, the administrator can find out the owner of the consumer directly.
+It can save a lot of time and improve the efficiency of the operation and maintenance process.

Review Comment:
   In our case, we can't trace a consumer back by subscription name, consumer name, or properties. For a subscription that bypasses our management platform, there is currently no way for us to find the owner.
   The field name `role` is a little confusing, since Pulsar's access control is not role-based. We can rename the exposed field to `appId`, as `appId` is one of the properties of the consumer object. For token-based authentication users, we can use `appId` to retrieve the role info of the consumer.
[PR] [improve][build] Bump version to 3.4.0-SNAPSHOT [pulsar]
coderzc opened a new pull request, #22700: URL: https://github.com/apache/pulsar/pull/22700

Fixes #xyz
Main Issue: #xyz
PIP: #xyz

### Motivation

branch-3.3 has been split from master branch and now master branch development is for 3.4.0-SNAPSHOT.

### Modifications

set project version to 3.4.0-SNAPSHOT

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
(pulsar) branch master updated: [improve][broker]Ensure namespace deletion doesn't fail (#22627)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 936afecede8 [improve][broker]Ensure namespace deletion doesn't fail (#22627) 936afecede8 is described below commit 936afecede8374b14d13e9d48e9372fec1c27447 Author: Enrico Olivelli AuthorDate: Mon May 13 11:50:39 2024 +0200 [improve][broker]Ensure namespace deletion doesn't fail (#22627) --- .../pulsar/broker/resources/BaseResources.java | 27 - .../broker/resources/LocalPoliciesResources.java | 2 +- .../broker/resources/NamespaceResources.java | 17 +-- .../pulsar/broker/resources/TopicResources.java| 35 -- .../pulsar/broker/admin/impl/NamespacesBase.java | 16 -- .../SystemTopicBasedTopicPoliciesService.java | 3 +- .../apache/pulsar/metadata/api/MetadataStore.java | 22 ++ .../metadata/impl/AbstractMetadataStore.java | 13 8 files changed, 78 insertions(+), 57 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index 4011a482075..00e381e0729 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -197,22 +197,21 @@ public class BaseResources { } protected CompletableFuture deleteIfExistsAsync(String path) { -return cache.exists(path).thenCompose(exists -> { -if (!exists) { -return CompletableFuture.completedFuture(null); +log.info("Deleting path: {}", path); +CompletableFuture future = new CompletableFuture<>(); +cache.delete(path).whenComplete((ignore, ex) -> { +if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) { +log.info("Path {} did not exist in metadata store", path); +future.complete(null); +} 
else if (ex != null) { +log.info("Failed to delete path from metadata store: {}", path, ex); +future.completeExceptionally(ex); +} else { +log.info("Deleted path from metadata store: {}", path); +future.complete(null); } -CompletableFuture future = new CompletableFuture<>(); -cache.delete(path).whenComplete((ignore, ex) -> { -if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) { -future.complete(null); -} else if (ex != null) { -future.completeExceptionally(ex); -} else { -future.complete(null); -} -}); -return future; }); +return future; } protected boolean exists(String path) throws MetadataStoreException { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java index c6b658c3bd0..ae3479fde59 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java @@ -79,7 +79,7 @@ public class LocalPoliciesResources extends BaseResources { } public CompletableFuture deleteLocalPoliciesAsync(NamespaceName ns) { -return deleteAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString())); +return deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString())); } public CompletableFuture deleteLocalPoliciesTenantAsync(String tenant) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 975b23192f9..9d7c60cd344 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -115,7 +115,7 @@ public class NamespaceResources extends BaseResources { } public 
CompletableFuture deletePoliciesAsync(NamespaceName ns){ -return deleteAsync(joinPath(BASE_POLICIES_PATH, ns.toString())); +return deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH, ns.toString())); } public Optional getPolicies(NamespaceName ns) throws MetadataStoreException{ @@ -155,10 +155,18 @@ public class NamespaceResources extends
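
The reworked `deleteIfExistsAsync` in the diff above drops the separate `exists` check and instead treats a not-found failure of the delete itself as success. A minimal sketch of that pattern follows. The in-memory `store`, the `delete` helper and the local `NotFoundException` are hypothetical stand-ins for the metadata cache and `MetadataStoreException.NotFoundException`, not Pulsar code. One plausible motivation, which the commit message does not state, is that a check followed by a delete can race with another deleter.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

class DeleteIfExistsSketch {
    // Hypothetical stand-in for MetadataStoreException.NotFoundException.
    static class NotFoundException extends RuntimeException {
        NotFoundException(String path) {
            super("not found: " + path);
        }
    }

    // Hypothetical in-memory store used only for this illustration.
    static final Map<String, String> store = new ConcurrentHashMap<>();

    // Assumed behaviour: deleting a missing path fails with a wrapped NotFoundException.
    static CompletableFuture<Void> delete(String path) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (store.remove(path) == null) {
            future.completeExceptionally(new CompletionException(new NotFoundException(path)));
        } else {
            future.complete(null);
        }
        return future;
    }

    // Delete without a prior existence check; "not found" counts as success.
    static CompletableFuture<Void> deleteIfExists(String path) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        delete(path).whenComplete((ignore, ex) -> {
            if (ex != null && ex.getCause() instanceof NotFoundException) {
                result.complete(null);            // path was already gone
            } else if (ex != null) {
                result.completeExceptionally(ex); // any other failure is propagated
            } else {
                result.complete(null);            // deleted normally
            }
        });
        return result;
    }
}
```

Calling `deleteIfExists` twice on the same path completes normally both times in this sketch, which is the property a namespace deletion that must not fail would rely on.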
Re: [PR] [improve][broker]Ensure namespace deletion doesn't fail [pulsar]
eolivelli merged PR #22627: URL: https://github.com/apache/pulsar/pull/22627
Re: [PR] fix: fix print message id without batch index [pulsar-client-go]
geniusjoe commented on PR #1211: URL: https://github.com/apache/pulsar-client-go/pull/1211#issuecomment-2107100692

I think the unit test fails because the `TestGetMessagesByID` function in `pulsaradmin/pkg/admin/subscription_test.go` uses the message's `id.String()` as a map key to check whether the current messages are in the same batch. When we change `id.String()` from `fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx)` to `fmt.Sprintf("%d:%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx, id.batchIdx)`, the map key changes, so the test can no longer tell that messages belong to the same batch.

```
for i := 0; i <= numberMessages; i++ {
	producer.SendAsync(ctx, &pulsar.ProducerMessage{
		Payload: []byte(fmt.Sprintf("hello-%d", i)),
	}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
		assert.Nil(t, err)
		messageIDMap[id.String()]++
		wg.Done()
	})
}
```

If we need to fix the test case, we can replace `messageIDMap[id.String()]` with `messageIDMap[fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx)]`. However, I think this PR is an incompatible change, so it would be good to add a changelog entry to the documentation.
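
The effect described in this comment can be illustrated with a small sketch, written here in Java rather than Go. The class, the method names and the literal ledger and entry values are hypothetical; only the two key formats come from the comment. Messages in one batch share ledger ID, entry ID and partition index, so the three-part key groups them together, while the four-part key gives each message its own entry.

```java
import java.util.HashMap;
import java.util.Map;

class MessageIdKeySketch {
    // Old format from the comment: ledgerID:entryID:partitionIdx
    static String keyWithoutBatch(long ledgerId, long entryId, int partitionIdx) {
        return String.format("%d:%d:%d", ledgerId, entryId, partitionIdx);
    }

    // New format from the comment: ledgerID:entryID:partitionIdx:batchIdx
    static String keyWithBatch(long ledgerId, long entryId, int partitionIdx, int batchIdx) {
        return String.format("%d:%d:%d:%d", ledgerId, entryId, partitionIdx, batchIdx);
    }

    // Counts distinct map keys for batchSize messages that all sit in the same entry.
    // The ledger ID 10, entry ID 3 and partition 0 are arbitrary illustration values.
    static int distinctKeys(int batchSize, boolean includeBatchIdx) {
        Map<String, Integer> counts = new HashMap<>();
        for (int batchIdx = 0; batchIdx < batchSize; batchIdx++) {
            String key = includeBatchIdx
                    ? keyWithBatch(10L, 3L, 0, batchIdx)
                    : keyWithoutBatch(10L, 3L, 0);
            counts.merge(key, 1, Integer::sum);
        }
        return counts.size();
    }
}
```

With five messages in one batch, the old key yields a single map entry with a count of five, and the new key yields five entries with a count of one each, which is why a test that counts messages per key stops recognising batches.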
Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]
codelipenghui commented on code in PR #22564: URL: https://github.com/apache/pulsar/pull/22564#discussion_r1598150499

## pip/pip-347.md: ## @@ -0,0 +1,63 @@
+
+# PIP-347: add role field in consumer's stat
+
+# Background knowledge
+
+During the operation and maintenance process, there are many users asking administrator for help to find out the consumers of a topic and notify them about the business change.
+Administrators can call `bin/pulsar-admin topics partitioned-stats` to find out the `ip:port` of the consumers, but no role info. So administrators need to take a lot of time to
+communicate with users to find out the owner based on the `ip:port`. It's a troublesome work and low efficiency.
+
+# Motivation
+
+This pip can help to solve such kind of problem. By adding a field `role` in the consumer's stat, the administrator can find out the owner of the consumer directly.
+It can save a lot of time and improve the efficiency of the operation and maintenance process.

Review Comment: From the RBAC perspective, you might not have many roles defined in Pulsar, so many clients will use the same role, and a client could also be associated with multiple roles. I don't think that will help much in troubleshooting where a consumer comes from. In Pulsar, `role_name` is a little confusing, since Pulsar's access control is not role based; the role name is actually the principal or subject. Exposing it as a role name will confuse users who have diverse access control plugins. Pulsar already has the ability to locate where a consumer comes from, through the subscription name, the consumer name, and the properties, all of which are already exposed in the topic stats. Users can define the subscription name, the consumer name, and specific properties for locating the consumer if needed.
(pulsar) branch branch-2.10 updated: [fix][build] Fix code style
This is an automated email from the ASF dual-hosted git repository. zixuan pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new f89fa720a78 [fix][build] Fix code style f89fa720a78 is described below commit f89fa720a782c8071cce6fc02fdacb0d47e758db Author: Zixuan Liu AuthorDate: Mon May 13 16:58:53 2024 +0800 [fix][build] Fix code style Signed-off-by: Zixuan Liu --- .../main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 1 - .../main/java/org/apache/pulsar/broker/service/BrokerService.java | 3 ++- .../src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 7 ++- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 57fdf6f2849..c040d2c268c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -49,7 +49,6 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.web.RestException; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f6103dbae76..11840f3c89a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1193,7 
+1193,8 @@ public class BrokerService implements Closeable { if (log.isDebugEnabled()) { log.debug("Broker is unable to load non-persistent topic {}", topic); } -topicFuture.completeExceptionally(new NotAllowedException("Broker is not unable to load non-persistent topic")); +topicFuture.completeExceptionally( +new NotAllowedException("Broker is not unable to load non-persistent topic")); return topicFuture; } final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index b1afe95465d..cb6164b9888 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -322,8 +322,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne interceptorMessage.getProperties(); } -int msgSize = interceptorMessage.getDataBuffer().readableBytes(); -sendAsync(interceptorMessage, new DefaultSendMessageCallback(future, interceptorMessage, msgSize)); +sendAsync(interceptorMessage, new DefaultSendMessageCallback(future, interceptorMessage)); return future; } @@ -331,15 +330,13 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne CompletableFuture sendFuture; MessageImpl currentMsg; -int msgSize; long createdAt = System.nanoTime(); SendCallback nextCallback = null; MessageImpl nextMsg = null; -DefaultSendMessageCallback(CompletableFuture sendFuture, MessageImpl currentMsg, int msgSize) { +DefaultSendMessageCallback(CompletableFuture sendFuture, MessageImpl currentMsg) { this.sendFuture = sendFuture; this.currentMsg = currentMsg; -this.msgSize = msgSize; } @Override
Re: [I] [Bug] Ctrl-C does not terminating the current shell operation in pulsar shell [pulsar]
mukesh154 commented on issue #22671: URL: https://github.com/apache/pulsar/issues/22671#issuecomment-2106996608

@nodece I tried with the latest version (3.3.0-SNAPSHOT) and I see the same issue; it is reproducible on this version as well. You may be misunderstanding the issue at hand. The problem is that when a user presses Ctrl-C during an ongoing operation, say `client consume -s s1 my-topic -n 1000`, the user wants to terminate the client consume command, but with the current behaviour the shell keeps running the command until it finishes. Pressing Ctrl-C only prints an exception, `org.apache.pulsar.shell.PulsarShell$InterruptShellException`; the stack trace is included in the issue description.
Re: [PR] [fix][client] Fix client side memory leak when call MessageImpl.create and fix imprecise client-side metrics: pendingMessagesUpDownCounter, pendingBytesUpDownCounter, latencyHistogram [pulsar
nodece commented on PR #22393: URL: https://github.com/apache/pulsar/pull/22393#issuecomment-2106957945

This PR broke the branch-2.10 CI.

```
Error: Medium: Unread field: org.apache.pulsar.client.impl.ProducerImpl$DefaultSendMessageCallback.msgSize [org.apache.pulsar.client.impl.ProducerImpl$DefaultSendMessageCallback] At ProducerImpl.java:[line 342] URF_UNREAD_FIELD
Error: Failed to execute goal com.github.spotbugs:spotbugs-maven-plugin:4.2.2:check (spotbugs) on project pulsar-client-original: failed with 1 bugs and 0 errors -> [Help 1]
```
Re: [D] intermittent Messages loss while consuming the messages from consumer [pulsar]
GitHub user ragaur-tibco edited a comment on the discussion: intermittent Messages loss while consuming the messages from consumer

Hi @lhotari, we have two classes: one is the producer and the other is the consumer.

PulsarProducerExample.java

```
import org.apache.pulsar.client.api.*;

public class PulsarProducerExample {
    public static void main(String[] args) throws PulsarClientException {
        String serviceUrl = "pulsar://localhost:6650";
        String topicName = "topic-Name";

        // Create a Pulsar client instance
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();

        // Create a producer
        Producer<byte[]> producer = client.newProducer()
                .topic(topicName)
                .create();

        // Send messages
        for (int i = 0; i < 100; i++) {
            String message = "Message " + i;
            producer.send(message.getBytes());
            System.out.println("Sent message: " + message);
        }

        // Close the producer and Pulsar client
        producer.close();
        client.close();
    }
}
```

Consumer.java

```
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

// This class is itself named Consumer, so the Pulsar Consumer type is
// referenced by its fully qualified name instead of being imported.
public class Consumer {
    public static void main(String[] args) throws Exception {
        String serviceUrl = "pulsar://localhost:6650";
        String topicName = "topic-Name";
        String subscriptionName = "subscriptionName";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();

        org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = client.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared) // Use Shared subscription type
                .subscribe();

        while (true) {
            Message<byte[]> msg = consumer1.receive();
            // Note: this prints the result of a second receive() call, not msg
            System.out.println("from consumer 1==" + consumer1.receive());
            consumer1.acknowledgeAsync(msg);
        }
    }
}
```

Steps to reproduce:
1. Run the consumer; the topic is created automatically.
2. Check the topic stats: one consumer has been created under the subscription name.
3. Stop the consumer, then run the producer and send 100 messages.
4. Run the consumer again and check the topic stats. You will see that a different consumer has been created, with a different name or sometimes with the same name (if the `consumerName` parameter is set to a consumer name that is already present), and that the 100 messages are available in the previous consumer but not in the new one. As a result, we are not able to receive the messages.

Queries:
1. Is there any API with which we can receive the messages from a specific consumer name?
2. Is there any way to fetch all the messages that are available in the subscription from all the consumers?

GitHub link: https://github.com/apache/pulsar/discussions/22681#discussioncomment-9415732

This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
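
One detail of the consumer loop in this version of the comment is that it calls `receive()` twice per iteration and acknowledges only the first result. The sketch below simulates that with a hypothetical in-memory `FakeConsumer`; it does not use the Pulsar client API and is not a diagnosis of the reported problem, only an illustration of how the two loop shapes differ in what they print and acknowledge.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ReceiveLoopSketch {
    // Hypothetical stand-in for a consumer: receive() hands out the next pending message.
    static class FakeConsumer {
        final Deque<String> pending = new ArrayDeque<>();
        final List<String> acked = new ArrayList<>();

        FakeConsumer(int count) {
            for (int i = 0; i < count; i++) {
                pending.add("Message " + i);
            }
        }

        String receive() {
            return pending.poll(); // null when nothing is left (the real receive() would block)
        }

        void acknowledge(String message) {
            acked.add(message);
        }
    }

    // Loop shape from the comment: receive, receive again for printing, ack the first.
    static List<String> doubleReceive(FakeConsumer consumer) {
        List<String> printed = new ArrayList<>();
        while (!consumer.pending.isEmpty()) {
            String msg = consumer.receive();
            String other = consumer.receive(); // a different message than msg
            if (other != null) {
                printed.add(other);
            }
            consumer.acknowledge(msg);
        }
        return printed;
    }

    // Alternative shape: one receive per iteration, print and ack the same message.
    static List<String> singleReceive(FakeConsumer consumer) {
        List<String> printed = new ArrayList<>();
        while (!consumer.pending.isEmpty()) {
            String msg = consumer.receive();
            printed.add(msg);
            consumer.acknowledge(msg);
        }
        return printed;
    }
}
```

In this simulation with 100 messages, the first loop prints 50 messages and acknowledges the other 50, so every printed message stays unacknowledged; the second loop prints and acknowledges all 100.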
Re: [D] intermittent Messages loss while consuming the messages from consumer [pulsar]
GitHub user ragaur-tibco edited a comment on the discussion: intermittent Messages loss while consuming the messages from consumer

Hi @lhotari, we have two classes: one is the producer and the other is the consumer.

PulsarProducerExample.java

```
import org.apache.pulsar.client.api.*;

public class PulsarProducerExample {
    public static void main(String[] args) throws PulsarClientException {
        String serviceUrl = "pulsar://localhost:6650";
        String topicName = "topic-Name";

        // Create a Pulsar client instance
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();

        // Create a producer
        Producer<byte[]> producer = client.newProducer()
                .topic(topicName)
                .create();

        // Send messages
        for (int i = 0; i < 100; i++) {
            String message = "Message " + i;
            producer.send(message.getBytes());
            System.out.println("Sent message: " + message);
        }

        // Close the producer and Pulsar client
        producer.close();
        client.close();
    }
}
```

Consumer.java

```
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

// This class is itself named Consumer, so the Pulsar Consumer type is
// referenced by its fully qualified name instead of being imported.
public class Consumer {
    public static void main(String[] args) throws Exception {
        String serviceUrl = "pulsar://localhost:6650";
        String topicName = "topic-Name";
        String subscriptionName = "subscriptionName";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();

        org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = client.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared) // Use Shared subscription type
                .subscribe();

        while (true) {
            System.out.println("from consumer 1==" + consumer1.receive());
        }
    }
}
```

Steps to reproduce:
1. Run the consumer; the topic is created automatically.
2. Check the topic stats: one consumer has been created under the subscription name.
3. Stop the consumer, then run the producer and send 100 messages.
4. Run the consumer again and check the topic stats. You will see that a different consumer has been created, with a different name or sometimes with the same name (if the `consumerName` parameter is set to a consumer name that is already present), and that the 100 messages are available in the previous consumer but not in the new one. As a result, we are not able to receive the messages.

Queries:
1. Is there any API with which we can receive the messages from a specific consumer name?
2. Is there any way to fetch all the messages that are available in the subscription from all the consumers?

GitHub link: https://github.com/apache/pulsar/discussions/22681#discussioncomment-9415732
Re: [D] intermittent Messages loss while consuming the messages from consumer [pulsar]
GitHub user ragaur-tibco edited a comment on the discussion: intermittent Messages loss while consuming the messages from consumer

Hi @lhotari, we have two classes: one is the producer and the other is the consumer.

PulsarProducerExample.java

```
import org.apache.pulsar.client.api.*;

public class PulsarProducerExample {
    public static void main(String[] args) throws PulsarClientException {
        String serviceUrl = "pulsar://localhost:6650";
        String topicName = "persistent://public/default/my-topic";

        // Create a Pulsar client instance
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();

        // Create a producer
        Producer<byte[]> producer = client.newProducer()
                .topic(topicName)
                .create();

        // Send messages
        for (int i = 0; i < 100; i++) {
            String message = "Message " + i;
            producer.send(message.getBytes());
            System.out.println("Sent message: " + message);
        }

        // Close the producer and Pulsar client
        producer.close();
        client.close();
    }
}
```

Consumer.java

```
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

// This class is itself named Consumer, so the Pulsar Consumer type is
// referenced by its fully qualified name instead of being imported.
public class Consumer {
    public static void main(String[] args) throws Exception {
        String serviceUrl = "pulsar://localhost:6650";
        String topicName = "topic-Name";
        String subscriptionName = "subscriptionName";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();

        org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = client.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared) // Use Shared subscription type
                .subscribe();

        while (true) {
            System.out.println("from consumer 1==" + consumer1.receive());
        }
    }
}
```

Steps to reproduce:
1. Run the consumer; the topic is created automatically.
2. Check the topic stats: one consumer has been created under the subscription name.
3. Stop the consumer, then run the producer and send 100 messages.
4. Run the consumer again and check the topic stats. You will see that a different consumer has been created, with a different name or sometimes with the same name (if the `consumerName` parameter is set to a consumer name that is already present), and that the 100 messages are available in the previous consumer but not in the new one. As a result, we are not able to receive the messages.

Queries:
1. Is there any API with which we can receive the messages from a specific consumer name?
2. Is there any way to fetch all the messages that are available in the subscription from all the consumers?

GitHub link: https://github.com/apache/pulsar/discussions/22681#discussioncomment-9415732
Re: [D] intermittent Messages loss while consuming the messages from consumer [pulsar]
GitHub user ragaur-tibco added a comment to the discussion: intermittent Messages loss while consuming the messages from consumer

Hi @lhotari, we have two classes: one is the producer and the other is the consumer.

PulsarProducerExample.java

```
import org.apache.pulsar.client.api.*;

public class PulsarProducerExample {
    public static void main(String[] args) throws PulsarClientException {
        String serviceUrl = "pulsar://localhost:6650";
        String topicName = "persistent://public/default/my-topic";

        // Create a Pulsar client instance
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();

        // Create a producer
        Producer<byte[]> producer = client.newProducer()
                .topic(topicName)
                .create();

        // Send messages
        for (int i = 0; i < 100; i++) {
            String message = "Message " + i;
            producer.send(message.getBytes());
            System.out.println("Sent message: " + message);
        }

        // Close the producer and Pulsar client
        producer.close();
        client.close();
    }
}
```

Consumer.java

```
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

// This class is itself named Consumer, so the Pulsar Consumer type is
// referenced by its fully qualified name instead of being imported.
public class Consumer {
    public static void main(String[] args) throws Exception {
        String serviceUrl = "pulsar://localhost:6650";
        String topicName = "topic-Name";
        String subscriptionName = "subscriptionName";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();

        org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = client.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared) // Use Shared subscription type
                .subscribe();

        while (true) {
            System.out.println("from consumer 1==" + consumer1.receive());
        }
    }
}
```

Steps to reproduce:
1. Run the consumer; the topic is created automatically.
2. Check the topic stats: one consumer has been created under the subscription name.
3. Stop the consumer, then run the producer and send 100 messages.
4. Run the consumer again and check the topic stats. You will see that a different consumer has been created, with a different name or sometimes with the same name (if the `consumerName` parameter is set to a consumer name that is already present), and that the 100 messages are available in the previous consumer but not in the new one. As a result, we are not able to receive the messages.

Queries:
1. Is there any API with which we can receive the messages from a specific consumer name?
2. Is there any way to fetch all the messages that are available in the subscription from all the consumers?

GitHub link: https://github.com/apache/pulsar/discussions/22681#discussioncomment-9415732
Re: [PR] [improve][broker]Ensure namespace deletion doesn't fail [pulsar]
codecov-commenter commented on PR #22627: URL: https://github.com/apache/pulsar/pull/22627#issuecomment-2106767000

## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22627?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report

Attention: Patch coverage is `43.13725%` with `29 lines` in your changes are missing coverage. Please review.

> Project coverage is 32.47%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) to head [(`23c39c1`)](https://app.codecov.io/gh/apache/pulsar/pull/22627?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
> Report is 261 commits behind head on master.

Additional details and impacted files

[![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22627/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22627?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)

```diff
@@              Coverage Diff              @@
##             master   #22627       +/-   ##
=============================================
- Coverage     73.57%   32.47%   -41.10%
+ Complexity    32624       87    -32537
=============================================
  Files          1877     1745      -132
  Lines        139502   133886     -5616
  Branches      15299    14673      -626
=============================================
- Hits         102638    43482    -59156
- Misses        28908    84033    +55125
+ Partials       7956     6371     -1585
```

| [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22627/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
|---|---|---|
| [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22627/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `27.52% <43.13%> (+2.94%)` | :arrow_up: |
| [systests](https://app.codecov.io/gh/apache/pulsar/pull/22627/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.80% <5.88%> (+0.47%)` | :arrow_up: |
| [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22627/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `?` | |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Files](https://app.codecov.io/gh/apache/pulsar/pull/22627?dropdown=coverage=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
|---|---|---|
| [.../service/SystemTopicBasedTopicPoliciesService.java](https://app.codecov.io/gh/apache/pulsar/pull/22627?src=pr=tree=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2FSystemTopicBasedTopicPoliciesService.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1N5c3RlbVRvcGljQmFzZWRUb3BpY1BvbGljaWVzU2VydmljZS5qYXZh) | `52.26% <100.00%> (-21.93%)` | :arrow_down: |
| [...he/pulsar/metadata/impl/AbstractMetadataStore.java](https://app.codecov.io/gh/apache/pulsar/pull/22627?src=pr=tree=pulsar-metadata%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fmetadata%2Fimpl%2FAbstractMetadataStore.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLW1ldGFkYXRhL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9wdWxzYXIvbWV0YWRhdGEvaW1wbC9BYnN0cmFjdE1ldGFkYXRhU3RvcmUuamF2YQ==) | `55.31% <100.00%> (-29.25%)` | :arrow_down: |
| [...ulsar/broker/resources/LocalPoliciesResources.java](https://app.codecov.io/gh/apache/pulsar/pull/22627?src=pr=tree=pulsar-broker-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fresources%2FLocalPoliciesResources.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9icm9rZXIvcmVzb3VyY2VzL0xvY2FsUG9saWNpZXNSZXNvdXJjZXMuamF2YQ==) | `44.82% <0.00%> (-31.04%)` | :arrow_down: |
Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]
thetumbled commented on PR #22452: URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2106762113 PTAL, thanks. @poorbarcode -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Bug] Authentication using OpenID Connect assumes alg element under the OpenID Keys URL to be required. It is optional per specs. [pulsar]
lhotari commented on issue #22696: URL: https://github.com/apache/pulsar/issues/22696#issuecomment-2106749631 Duplicate of #22419 . The fix #22421 will be part of 3.0.5 and 3.2.3 releases.
Re: [I] [Bug] Authentication using OpenID Connect assumes alg element under the OpenID Keys URL to be required. It is optional per specs. [pulsar]
lhotari closed issue #22696: [Bug] Authentication using OpenID Connect assumes alg element under the OpenID Keys URL to be required. It is optional per specs. URL: https://github.com/apache/pulsar/issues/22696
Re: [I] [Bug] Deadlock in broker service while initializing bkClient [pulsar]
lhotari commented on issue #22699: URL: https://github.com/apache/pulsar/issues/22699#issuecomment-2106746698

Thanks for the issue report. Is this similar to #20148 which is fixed by #21096 ?

[In a Slack thread](https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1714221812908679?thread_ts=1714066287.810589=C5Z4T36F7) I made these comments some time ago:

> The deadlock issue might be caused by https://github.com/apache/pulsar/pull/18672 .

> It must be a different problem. The thread dump was very useful. The line numbers seemed to match 2.10.6 . One possible way to solve the problem would be to change thenAccept on this line [https://github.com/apache/pulsar/blob/c1d8630b13e782935def3c4b12b59ae9aa8e5541/pul[…]c/main/java/org/apache/pulsar/broker/service/BrokerService.java](https://github.com/apache/pulsar/blob/c1d8630b13e782935def3c4b12b59ae9aa8e5541/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L1511) to thenAcceptAsync . That would prevent the metadata store from getting blocked and essentially deadlocked.

> It seems that the same bug is also in the master branch, so it will be useful to report it.
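The thenAccept versus thenAcceptAsync distinction mentioned in the comment above can be illustrated with a minimal sketch using only `java.util.concurrent`. This is not the code from `BrokerService.java`: the class name, thread names, and executors below are hypothetical stand-ins, and the single-threaded "metadata-store" executor is only an assumption made for the illustration. The sketch registers both kinds of callback before the future completes, then completes it from the "metadata-store" thread and records where each callback ran.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class ThenAcceptVsAsync {

    /** Returns {thread that ran the thenAccept callback, thread that ran the thenAcceptAsync callback}. */
    static String[] callbackThreads() throws Exception {
        // Hypothetical stand-in for a single-threaded metadata store executor.
        ExecutorService metadataThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "metadata-store"));
        // Hypothetical separate executor for callbacks that may block.
        ExecutorService callbackPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        try {
            CompletableFuture<String> lookup = new CompletableFuture<>();
            AtomicReference<String> syncThread = new AtomicReference<>();
            AtomicReference<String> asyncThread = new AtomicReference<>();

            // Registered before completion: runs on whichever thread completes the future.
            CompletableFuture<Void> sync =
                    lookup.thenAccept(v -> syncThread.set(Thread.currentThread().getName()));
            // Always handed off to the supplied executor.
            CompletableFuture<Void> async =
                    lookup.thenAcceptAsync(v -> asyncThread.set(Thread.currentThread().getName()),
                            callbackPool);

            // Complete the future from the "metadata-store" thread.
            metadataThread.execute(() -> lookup.complete("value"));

            sync.get(5, TimeUnit.SECONDS);
            async.get(5, TimeUnit.SECONDS);
            return new String[] {syncThread.get(), asyncThread.get()};
        } finally {
            metadataThread.shutdown();
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] threads = callbackThreads();
        System.out.println("thenAccept ran on:      " + threads[0]);
        System.out.println("thenAcceptAsync ran on: " + threads[1]);
    }
}
```

In this sketch the thenAccept callback runs on the "metadata-store" thread, so a callback that blocks (for example, by waiting on another result that needs the same thread) would stall it. The thenAcceptAsync callback runs on "callback-pool" instead, which leaves the completing thread free. Whether this matches the actual cause of the reported deadlock is not confirmed here.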