Re: [PR] [fix][client] the nullValue in msgMetadata should be true by default [pulsar]
Technoboy- commented on code in PR #22372: URL: https://github.com/apache/pulsar/pull/22372#discussion_r1542337533 ## pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java: ## @@ -144,6 +145,7 @@ public TypedMessageBuilder value(T value) { msgMetadata.setNullValue(true); return this; } +msgMetadata.setNullValue(false); Review Comment: Also we can do this in the `getMessage` before send. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [Improve] Add admin api HealthCheckWithTopicVersion [pulsar-client-go]
crossoverJie opened a new pull request, #1200: URL: https://github.com/apache/pulsar-client-go/pull/1200 ### Motivation To keep consistent with the Java client. Releted PR: https://github.com/apache/pulsar/pull/11268 ### Modifications Add `HealthCheckWithTopicVersion` interface. ### Verifying this change - [x] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): (no) - The public API: (yes) - The schema: (no) - The default values of configurations: (no) - The wire protocol: (no) ### Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (GoDocs) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription (#22359)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6f9c8e7f70e [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription (#22359) 6f9c8e7f70e is described below commit 6f9c8e7f70ec201d65c7fc270480bed3aa3b5aba Author: sherlock-lin <1193179...@qq.com> AuthorDate: Thu Mar 28 11:58:47 2024 +0800 [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription (#22359) --- .../pulsar/broker/service/nonpersistent/NonPersistentSubscription.java | 3 +-- .../pulsar/broker/service/persistent/PersistentSubscription.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 92aba6221da..cfe05cc32b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -40,7 +40,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFence import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.GetStatsOptions; -import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; @@ -53,7 +52,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NonPersistentSubscription extends AbstractSubscription implements Subscription { +public class NonPersistentSubscription extends AbstractSubscription { private final NonPersistentTopic topic; private volatile NonPersistentDispatcher dispatcher; private final String topicName; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 50e84310ac1..6e8e94baeae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -97,7 +97,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PersistentSubscription extends AbstractSubscription implements Subscription { +public class PersistentSubscription extends AbstractSubscription { protected final PersistentTopic topic; protected final ManagedCursor cursor; protected volatile Dispatcher dispatcher;
Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]
Technoboy- merged PR #22359: URL: https://github.com/apache/pulsar/pull/22359 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Cleanup] PersistentSubscription duplicate implementation interface Subscription [pulsar]
Technoboy- closed issue #22354: [Cleanup] PersistentSubscription duplicate implementation interface Subscription URL: https://github.com/apache/pulsar/issues/22354 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]
codecov-commenter commented on PR #22359: URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2024335935 ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22359?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report All modified and coverable lines are covered by tests :white_check_mark: > Project coverage is 73.63%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) to head [(`fcde506`)](https://app.codecov.io/gh/apache/pulsar/pull/22359?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache). > Report is 99 commits behind head on master. Additional details and impacted files [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22359/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22359?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) ```diff @@ Coverage Diff @@ ## master #22359 +/- ## + Coverage 73.57% 73.63% +0.06% + Complexity3262432125 -499 Files 1877 1879 +2 Lines139502 140277 +775 Branches 1529915571 +272 + Hits 102638 103299 +661 - Misses2890828967 +59 - Partials 7956 8011 +55 ``` | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22359/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | | |---|---|---| | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22359/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `27.28% <ø> (+2.69%)` | :arrow_up: | | [systests](https://app.codecov.io/gh/apache/pulsar/pull/22359/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.62% <ø> (+0.30%)` | :arrow_up: | | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22359/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `72.88% <ø> (+0.03%)` | :arrow_up: | Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more. | [Files](https://app.codecov.io/gh/apache/pulsar/pull/22359?dropdown=coverage=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | | |---|---|---| | [...rvice/nonpersistent/NonPersistentSubscription.java](https://app.codecov.io/gh/apache/pulsar/pull/22359?src=pr=tree=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fnonpersistent%2FNonPersistentSubscription.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL25vbnBlcnNpc3RlbnQvTm9uUGVyc2lzdGVudFN1YnNjcmlwdGlvbi5qYXZh) | `53.27% <ø> (ø)` | | | [...ker/service/persistent/PersistentSubscription.java](https://app.codecov.io/gh/apache/pulsar/pull/22359?src=pr=tree=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fpersistent%2FPersistentSubscription.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFN1YnNjcmlwdGlvbi5qYXZh) | `76.72% <ø> (+0.03%)` | :arrow_up: | ... and [150 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22359/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]
Technoboy- commented on PR #22359: URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2024318822 > This is my first pr, if the merge in can give me more confidence, I'll try to avoid mentioning low value pr taking up resources until the technical debt clears up. Yes, we know. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-client-cpp) annotated tag v3.5.1-candidate-1 updated (0ba3d7d -> ea2729a)
This is an automated email from the ASF dual-hosted git repository. xyz pushed a change to annotated tag v3.5.1-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git *** WARNING: tag v3.5.1-candidate-1 was modified! *** from 0ba3d7d (commit) to ea2729a (tag) tagging 0ba3d7d8e9f7d3fb2f2cc3cdf918bb63c4791afc (commit) replaces v3.5.0 by Yunze Xu on Thu Mar 28 10:50:13 2024 +0800 - Log - Release v3.5.1-candidate-1 -BEGIN PGP SIGNATURE- iQJDBAABCgAtFiEEn+m0+KLf1EiRy6J0Qrtq+2zSb6YFAmYE2uYPHHh5ekBhcGFj aGUub3JnAAoJEEK7avts0m+m36gP/1AjcqllVVeCx8cFtCSqRrOB5Uw+MZOk6UDY 1McU/ri08XdMEpv770ALUz8oWB1C0MTCuJzktsvFP+ju/QtMGvBilXLeMGxNsz6v vPl4OV/a3O1axYDy8pRtC88E4xFB4wqXNfBdZ+lgIOfyrNvS9aYYhxBTrvyplDWb DYfnQpBsA38wbMdA+S3OnQhxKjSzcvO1u6avhNmtZUF1poR85ddewxfNGAsCY1cR qG/QW0hImIWP0fhpwMgalT5+3BfJKDDFG+m2xk2N5MA2Wc9Kb4/UEURLWzyL0lGH scLZ9HujYPsbsXv89St0qB0ZNI+J9lFlxnzWd/0gWnK1KcX2mKr5evXAQ0lBYRgS I0LIw3H4cXoWRGOxbrd/jxW1p5tKYvrsZsXhDHgwct6W0YNwFDSrfiWLM0v6kwKA x8vJUlQnUF0/CXg5Shf8m0BbpL2TQA6AwburHYTshB0jKEtSbmMRJAdrlD1gRc8Y qIE5V+tQnFTEvL948WxugcyMOhXpOKGu3Z1EnDOedAohDR9Zhds34+M322TGZgdt G+N97geORkkBdS5pTEvdtg/4lEgasBddphCS2EFPvK1q5kNO/6Oc+oLKmzpoGe9B 9Z1i6ll6cYApsdkHRzrSzuspdtc/gC7NuA+zqb977jJkeDjn21d8Hvm+XtFdXX7t M2wbz/mq =oWoD -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]
sherlock-lin commented on PR #22359: URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2024304577 This is my first pr, if the merge in can give me more confidence, I'll try to avoid mentioning low value pr taking up resources until the technical debt clears up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-client-cpp) branch branch-3.5 updated (916af95 -> 0ba3d7d)
This is an automated email from the ASF dual-hosted git repository. xyz pushed a change to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git from 916af95 Merge branch 'main' into branch-3.5 new e7793d6 Support customize vcpkg directory when INTEGRATE_VCPKG is ON (#417) new 0e0ca8d Fix broken wireshark build workflow on macOS (#414) new 7cfd775 Fix minor issues reported by CodeQL (#421) new b2ad352 Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#422) new 0ba3d7d Release 3.5.1 The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .github/workflows/ci-pr-validation.yaml | 10 CMakeLists.txt | 4 +- README.md | 13 + lib/ConsumerImpl.cc | 41 +--- lib/ConsumerImpl.h | 15 +- lib/lz4/lz4.cc | 6 +-- perf/PerfProducer.cc| 2 +- tests/ReaderTest.cc | 84 ++--- version.txt | 2 +- 9 files changed, 135 insertions(+), 42 deletions(-)
(pulsar-client-cpp) 04/05: Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#422)
This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git commit b2ad3525eb762de5b92cc2af92cbe2e02c3e90cd Author: Yunze Xu AuthorDate: Thu Mar 28 10:36:12 2024 +0800 Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#422) Fixes https://github.com/apache/pulsar-client-cpp/issues/420 It's a catch-up for https://github.com/apache/pulsar/pull/22363 (cherry picked from commit 27d8cc01d5b7cfce94a7d71259dca5cea83ae01e) --- lib/ConsumerImpl.cc | 41 ++ lib/ConsumerImpl.h | 15 +- tests/ReaderTest.cc | 84 ++--- 3 files changed, 104 insertions(+), 36 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index ebc8518..1a0b0cb 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1050,7 +1050,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { */ void ConsumerImpl::clearReceiveQueue() { if (duringSeek()) { -startMessageId_ = seekMessageId_.get(); +if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) { +startMessageId_ = seekMessageId_.get(); +} SeekStatus expected = SeekStatus::COMPLETED; if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) { auto seekCallback = seekCallback_.release(); @@ -1476,7 +1478,7 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { return; } const auto requestId = client->newRequestId(); -seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), msgId, 0L, callback); +seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback); } void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { @@ -1495,8 +1497,8 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { return; } const auto requestId = client->newRequestId(); -seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), MessageId::earliest(), - timestamp, callback); +seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp}, + callback); } bool ConsumerImpl::isReadCompacted() { return readCompacted_; } @@ -1509,7 +1511,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback (lastDequedMessageId_ == MessageId::earliest()) && (startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest()); } -if (compareMarkDeletePosition) { +if (compareMarkDeletePosition || hasSoughtByTimestamp_.load(std::memory_order_acquire)) { auto self = get_shared_this_ptr(); getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) { if (result != ResultOk) { @@ -1518,8 +1520,8 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback } auto handleResponse = [self, response, callback] { if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) { -// We only care about comparing ledger ids and entry ids as mark delete position doesn't -// have other ids such as batch index +// We only care about comparing ledger ids and entry ids as mark delete position +// doesn't have other ids such as batch index auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(), response.getLastMessageId()); callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0 @@ -1528,7 +1530,8 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback callback(ResultOk, false); } }; -if (self->config_.isStartMessageIdInclusive()) { +if (self->config_.isStartMessageIdInclusive() && +!self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) { self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) { if (result != ResultOk) { callback(result, {}); @@ -1644,8 +1647,8 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } -void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, - long
(pulsar-client-cpp) 03/05: Fix minor issues reported by CodeQL (#421)
This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git commit 7cfd77506c83f259c69b440082d3b558b60695e4 Author: Matteo Merli AuthorDate: Wed Mar 27 02:47:42 2024 -0700 Fix minor issues reported by CodeQL (#421) (cherry picked from commit 763b85c6c4b9bb648b9f7cf62f9ed09f04f3decb) --- lib/lz4/lz4.cc | 6 +++--- perf/PerfProducer.cc | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/lz4/lz4.cc b/lib/lz4/lz4.cc index d63b977..2f98fb3 100644 --- a/lib/lz4/lz4.cc +++ b/lib/lz4/lz4.cc @@ -1175,9 +1175,9 @@ FORCE_INLINE int LZ4_decompress_generic( s = *ip++; length += s; } while (likely((endOnInput) ? ip < iend - RUN_MASK : 1) && (s == 255)); -if ((safeDecode) && unlikely((size_t)(op + length) < (size_t)(op))) +if ((safeDecode) && unlikely(length >= (size_t)(oend - op))) goto _output_error; /* overflow detection */ -if ((safeDecode) && unlikely((size_t)(ip + length) < (size_t)(ip))) +if ((safeDecode) && unlikely(length >= (size_t)(iend - ip))) goto _output_error; /* overflow detection */ } @@ -1220,7 +1220,7 @@ FORCE_INLINE int LZ4_decompress_generic( s = *ip++; length += s; } while (s == 255); -if ((safeDecode) && unlikely((size_t)(op + length) < (size_t)op)) +if ((safeDecode) && unlikely(length >= (size_t)(oend - op))) goto _output_error; /* overflow detection */ } length += MINMATCH; diff --git a/perf/PerfProducer.cc b/perf/PerfProducer.cc index aeda8e8..cbfef68 100644 --- a/perf/PerfProducer.cc +++ b/perf/PerfProducer.cc @@ -160,7 +160,7 @@ void startPerfProducer(const Arguments& args, pulsar::ProducerConfiguration& pro limiter = std::make_shared(args.rate); } -producerList.resize(args.numTopics * args.numProducers); +producerList.resize((size_t)args.numTopics * args.numProducers); for (int i = 0; i < args.numTopics; i++) { std::string topic = (args.numTopics == 1) ? args.topic : args.topic + "-" + std::to_string(i); LOG_INFO("Adding " << args.numProducers << " producers on topic " << topic);
(pulsar-client-cpp) 01/05: Support customize vcpkg directory when INTEGRATE_VCPKG is ON (#417)
This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git commit e7793d6752a6bbfd5f526ecf207bd3738179f29e Author: Yunze Xu AuthorDate: Thu Mar 14 20:11:26 2024 +0800 Support customize vcpkg directory when INTEGRATE_VCPKG is ON (#417) ### Motivation Currently when INTEGRATE_VCPKG is ON, the CMAKE_TOOLCHAIN_FILE variable is always a subdirectory of `${CMAKE_SOURCE_DIR}/vcpkg`. We can only customize the vcpkg directory when INTEGRATE_VCPKG is OFF, while the legacy CMake logic is incompatible with this way. ### Modifications When INTEGRATE_VCPKG is ON, only set CMAKE_TOOLCHAIN_FILE if it's not defined. The workflow and README are updated for it. (cherry picked from commit 821871777e247e1ccbfa323ea0d5136cf0e18711) --- .github/workflows/ci-pr-validation.yaml | 5 + CMakeLists.txt | 4 +++- README.md | 13 + 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index d9dc2ca..5010411 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -114,6 +114,11 @@ jobs: cmake . -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DBUILD_PERF_TOOLS=ON cmake --build . -j8 + - name: Verify custom vcpkg installation +run: | + mv vcpkg /tmp/ + cmake -B build -DINTEGRATE_VCPKG=ON -DCMAKE_TOOLCHAIN_FILE="/tmp/vcpkg/scripts/buildsystems/vcpkg.cmake" + cpp20-build: name: Build with the C++20 standard needs: formatting-check diff --git a/CMakeLists.txt b/CMakeLists.txt index 662e84a..b004653 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,7 +24,9 @@ option(USE_ASIO "Use Asio instead of Boost.Asio" OFF) option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF) if (INTEGRATE_VCPKG) set(USE_ASIO ON) -set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake") +if (NOT CMAKE_TOOLCHAIN_FILE) +set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake") +endif () endif () option(BUILD_TESTS "Build tests" ON) diff --git a/README.md b/README.md index f0c13f0..4c86b63 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,8 @@ Since it's integrated with vcpkg, see [vcpkg#README](https://github.com/microsof ### How to build from source +The simplest way is to clone this project with the vcpkg submodule. + ```bash git clone https://github.com/apache/pulsar-client-cpp.git cd pulsar-client-cpp @@ -57,6 +59,17 @@ cmake --build build -j8 The 1st step will download vcpkg and then install all dependencies according to the version description in [vcpkg.json](./vcpkg.json). The 2nd step will build the Pulsar C++ libraries under `./build/lib/`, where `./build` is the CMake build directory. +> You can also add the CMAKE_TOOLCHAIN_FILE option if your system already have vcpkg installed. +> +> ```bash +> git clone https://github.com/apache/pulsar-client-cpp.git +> cd pulsar-client-cpp +> # For example, you can install vcpkg in /tmp/vcpkg +> cd /tmp && git clone https://github.com/microsoft/vcpkg.git && cd - +> cmake -B build -DINTEGRATE_VCPKG=ON -DCMAKE_TOOLCHAIN_FILE="/tmp/vcpkg/scripts/buildsystems/vcpkg.cmake" +> cmake --build build -j8 +> ``` + After the build, the hierarchy of the `build` directory will be: ```
(pulsar-client-cpp) 05/05: Release 3.5.1
This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git commit 0ba3d7d8e9f7d3fb2f2cc3cdf918bb63c4791afc Author: Yunze Xu AuthorDate: Thu Mar 28 10:45:07 2024 +0800 Release 3.5.1 --- version.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.txt b/version.txt index 1545d96..d5c0c99 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -3.5.0 +3.5.1
(pulsar-client-cpp) 02/05: Fix broken wireshark build workflow on macOS (#414)
This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git commit 0e0ca8da3e7cabf76adb5d5ae30283f909e0aaa3 Author: Yunze Xu AuthorDate: Sun Mar 17 23:10:15 2024 +0800 Fix broken wireshark build workflow on macOS (#414) ### Motivation See https://github.com/apache/pulsar-client-cpp/actions/runs/8276076995/job/22644077705 ``` Error: The `brew link` step did not complete successfully The formula built, but is not symlinked into /usr/local Could not symlink bin/2to3 ``` `brew install` failed because the wireshark dependency depends on python@3.12 and it failed at `brew link`. ### Modifications Remove the existing binaries that might conflict. (cherry picked from commit c513f29fadce86bacaf4f878d8d25064f7560083) --- .github/workflows/ci-pr-validation.yaml | 5 + 1 file changed, 5 insertions(+) diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index 5010411..9f113b0 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -64,6 +64,11 @@ jobs: run: | # See https://github.com/Homebrew/homebrew-core/issues/157142 export HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK=1 + cd /usr/local/bin + rm -f 2to3* idle3* pydoc* python3* + rm -f /usr/local/share/man/man1/python3.1 /usr/local/lib/pkgconfig/python3* + cd /usr/local/Frameworks/Python.framework + rm -rf Headers Python Resources Versions/Current brew update brew install pkg-config wireshark protobuf - name: Build wireshark plugin
Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]
Technoboy- commented on PR #22359: URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2024299728 > > I don't agree with you for this patch. This isn't a problem, but from the Java interface definition, it's better to do like this. > > @Technoboy- what is it that you don't agree? I agree that it is "better". When something is "better", this implies that it produces some benefit. In this case, what is it that it makes better? The question is then whether this benefit is relevant when we are considering how Pulsar is currently maintained. I've provided my opinion about formatting changes in this mailing list post: https://lists.apache.org/thread/lo15cdzsl740dwgcqwpsl9oy9qb13onv . After we settle on a maintenance strategy that reduces merge conflicts for maintaining the LTS branch, we have more freedom to do refactorings which we are currently avoiding because of the merge conflicts. Merge conflicts caused by unnecessary changes are the main reason for my resistance. That could be resolved with an improved maintenance strategy. #22295 don't have any better, but just no `anything harmful.` And you merge it. From my side, you merge the pr you create, but others give some suggestion and leave the pr there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-client-cpp) branch main updated: Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#422)
This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git The following commit(s) were added to refs/heads/main by this push: new 27d8cc0 Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#422) 27d8cc0 is described below commit 27d8cc01d5b7cfce94a7d71259dca5cea83ae01e Author: Yunze Xu AuthorDate: Thu Mar 28 10:36:12 2024 +0800 Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#422) Fixes https://github.com/apache/pulsar-client-cpp/issues/420 It's a catch-up for https://github.com/apache/pulsar/pull/22363 --- lib/ConsumerImpl.cc | 41 ++ lib/ConsumerImpl.h | 15 +- tests/ReaderTest.cc | 84 ++--- 3 files changed, 104 insertions(+), 36 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 7d5250e..e5df421 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1050,7 +1050,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { */ void ConsumerImpl::clearReceiveQueue() { if (duringSeek()) { -startMessageId_ = seekMessageId_.get(); +if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) { +startMessageId_ = seekMessageId_.get(); +} SeekStatus expected = SeekStatus::COMPLETED; if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) { auto seekCallback = seekCallback_.release(); @@ -1476,7 +1478,7 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { return; } const auto requestId = client->newRequestId(); -seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), msgId, 0L, callback); +seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback); } void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { @@ -1495,8 +1497,8 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { return; } const auto requestId = client->newRequestId(); -seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), MessageId::earliest(), - timestamp, callback); +seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp}, + callback); } bool ConsumerImpl::isReadCompacted() { return readCompacted_; } @@ -1509,7 +1511,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback (lastDequedMessageId_ == MessageId::earliest()) && (startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest()); } -if (compareMarkDeletePosition) { +if (compareMarkDeletePosition || hasSoughtByTimestamp_.load(std::memory_order_acquire)) { auto self = get_shared_this_ptr(); getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) { if (result != ResultOk) { @@ -1518,8 +1520,8 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback } auto handleResponse = [self, response, callback] { if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) { -// We only care about comparing ledger ids and entry ids as mark delete position doesn't -// have other ids such as batch index +// We only care about comparing ledger ids and entry ids as mark delete position +// doesn't have other ids such as batch index auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(), response.getLastMessageId()); callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0 @@ -1528,7 +1530,8 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback callback(ResultOk, false); } }; -if (self->config_.isStartMessageIdInclusive()) { +if (self->config_.isStartMessageIdInclusive() && +!self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) { self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) { if (result != ResultOk) { callback(result, {}); @@ -1644,8 +1647,8 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } -void
Re: [I] [Bug] Regression of seeking by timestamp in 3.5.0 [pulsar-client-cpp]
BewareMyPower closed issue #420: [Bug] Regression of seeking by timestamp in 3.5.0 URL: https://github.com/apache/pulsar-client-cpp/issues/420 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp [pulsar-client-cpp]
BewareMyPower merged PR #422: URL: https://github.com/apache/pulsar-client-cpp/pull/422 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [fix][broker] Fix typos in PersistentTopic class (#22364)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 32037c3b098 [fix][broker] Fix typos in PersistentTopic class (#22364) 32037c3b098 is described below commit 32037c3b0982aa00a7cb5ee7e17a6b235a8c2d7f Author: hanmz AuthorDate: Thu Mar 28 10:31:09 2024 +0800 [fix][broker] Fix typos in PersistentTopic class (#22364) --- .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 6179e73169f..1650e449a3f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2753,7 +2753,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal ledger.asyncMigrate(); } if (log.isDebugEnabled()) { -log.debug("{} has replication backlog and applied migraiton", topic); +log.debug("{} has replication backlog and applied migration", topic); } return CompletableFuture.completedFuture(null); }
Re: [PR] [fix][broker] Fix typos in PersistentTopic class [pulsar]
Technoboy- merged PR #22364: URL: https://github.com/apache/pulsar/pull/22364 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] There is a memory leak,I need help [pulsar-client-python]
kansnow closed issue #208: There is a memory leak,I need help URL: https://github.com/apache/pulsar-client-python/issues/208 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] There is a memory leak,I need help [pulsar-client-python]
kansnow commented on issue #208: URL: https://github.com/apache/pulsar-client-python/issues/208#issuecomment-2024274127 > Can you try with latest version 3.4.0? https://pypi.org/project/pulsar-client/ Thank you very much. I used the top command to check the memory usage of the process. After upgrading the version, the memory did not grow anymore -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] There is a memory leak,I need help [pulsar-client-python]
kansnow commented on issue #208: URL: https://github.com/apache/pulsar-client-python/issues/208#issuecomment-2024238645 > Can you try with latest version 3.4.0? https://pypi.org/project/pulsar-client/ ok, I'll try upgrading the version -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) 01/02: [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 622c87d1e82163c8b42281c0c7a40e3ad79dc0df Author: Jiwei Guo AuthorDate: Thu Mar 28 06:53:21 2024 +0800 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191) (cherry picked from commit b702d440dc5e5a4cfd845bf60d5e310efe665ff5) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 + .../service/persistent/PersistentTopicTest.java| 46 ++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index da013c07313..0da71e89018 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -987,7 +987,7 @@ public class ManagedCursorImpl implements ManagedCursor { name); } // Let the managed ledger know we want to be notified whenever a new entry is published -ledger.waitingCursors.add(this); +ledger.addWaitingCursor(this); } else { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Skip notification registering since we do have entries available", diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 75ac4dd4c0a..ee8e7c430ef 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3803,6 +3803,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { this.waitingCursors.remove(cursor); } +public void addWaitingCursor(ManagedCursorImpl cursor) { +if (cursor instanceof NonDurableCursorImpl) { +if (cursor.isActive()) { +this.waitingCursors.add(cursor); +} +} else { +this.waitingCursors.add(cursor); +} +} + public boolean isCursorActive(ManagedCursor cursor) { return activeCursors.get(cursor.getName()) != null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 717dfc28ac8..618fe8006a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -57,10 +57,15 @@ import java.util.function.Supplier; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; @@ -74,6 +79,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; @@ -661,4 +667,44 @@ public class PersistentTopicTest extends BrokerTestBase { subscribe.close(); admin.topics().delete(topicName); } + +@Test +public void testAddWaitingCursorsForNonDurable() throws Exception { +final String ns = "prop/ns-test"; +admin.namespaces().createNamespace(ns, 2); +final String topicName = "persistent://prop/ns-test/testAddWaitingCursors"; +admin.topics().createNonPartitionedTopic(topicName); +final Optional topic = pulsar.getBrokerService().getTopic(topicName, false).join(); +
(pulsar) branch branch-3.2 updated (f56c383a531 -> d9068698da4)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git from f56c383a531 [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335) new 622c87d1e82 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191) new d9068698da4 [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 + .../service/persistent/PersistentTopicTest.java| 46 +++ .../pulsar/common/util/netty/DnsResolverUtil.java | 52 -- .../pulsar/common/util/netty/DnsResolverTest.java | 44 ++ 5 files changed, 139 insertions(+), 15 deletions(-)
(pulsar) 02/02: [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit d9068698da48f826cea2c8d5075ce95497cecc94 Author: Jian Yun AuthorDate: Thu Mar 28 06:59:28 2024 +0800 [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329) Co-authored-by: Lari Hotari (cherry picked from commit cce0b058efd55e2d5ac42c4ecaceddacee648a7c) --- .../pulsar/common/util/netty/DnsResolverUtil.java | 52 -- .../pulsar/common/util/netty/DnsResolverTest.java | 44 ++ 2 files changed, 82 insertions(+), 14 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java index f49a6453c72..bcff83acd94 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java @@ -19,12 +19,20 @@ package org.apache.pulsar.common.util.netty; import io.netty.resolver.dns.DnsNameResolverBuilder; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; +import java.security.Security; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; @Slf4j public class DnsResolverUtil { + +private static final String CACHE_POLICY_PROP = "networkaddress.cache.ttl"; +private static final String CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.ttl"; +private static final String NEGATIVE_CACHE_POLICY_PROP = "networkaddress.cache.negative.ttl"; +private static final String NEGATIVE_CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.negative.ttl"; +/* default ttl value from sun.net.InetAddressCachePolicy.DEFAULT_POSITIVE, which is used when no security manager + is used */ +private static final int JDK_DEFAULT_TTL = 30; private static final int MIN_TTL = 0; private static final int TTL; private static final int NEGATIVE_TTL; @@ -39,19 +47,35 @@ public class DnsResolverUtil { int ttl = DEFAULT_TTL; int negativeTtl = DEFAULT_NEGATIVE_TTL; try { -// use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting -// effective JDK settings for DNS caching -Class inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy"); -Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get"); -ttl = (Integer) getTTLMethod.invoke(null); -Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative"); -negativeTtl = (Integer) getNegativeTTLMethod.invoke(null); -} catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException - | IllegalAccessException e) { -log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e); +String ttlStr = Security.getProperty(CACHE_POLICY_PROP); +if (ttlStr == null) { +// Compatible with sun.net.inetaddr.ttl settings +ttlStr = System.getProperty(CACHE_POLICY_PROP_FALLBACK); +} +String negativeTtlStr = Security.getProperty(NEGATIVE_CACHE_POLICY_PROP); +if (negativeTtlStr == null) { +// Compatible with sun.net.inetaddr.negative.ttl settings +negativeTtlStr = System.getProperty(NEGATIVE_CACHE_POLICY_PROP_FALLBACK); +} +ttl = Optional.ofNullable(ttlStr) +.map(Integer::decode) +.filter(i -> i > 0) +.orElseGet(() -> { +if (System.getSecurityManager() == null) { +return JDK_DEFAULT_TTL; +} +return DEFAULT_TTL; +}); + +negativeTtl = Optional.ofNullable(negativeTtlStr) +.map(Integer::decode) +.filter(i -> i >= 0) +.orElse(DEFAULT_NEGATIVE_TTL); +} catch (NumberFormatException e) { +log.warn("Cannot get DNS TTL settings", e); } -TTL = ttl <= 0 ? DEFAULT_TTL : ttl; -NEGATIVE_TTL = negativeTtl < 0 ? DEFAULT_NEGATIVE_TTL : negativeTtl; +TTL = ttl; +NEGATIVE_TTL = negativeTtl; } private DnsResolverUtil() { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java index 0ccb960e798..46599cc45a0 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
(pulsar) 02/02: [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 1b9ae2e8cb90478d03c581250e28b633aa960957 Author: Jian Yun AuthorDate: Thu Mar 28 06:59:28 2024 +0800 [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329) Co-authored-by: Lari Hotari (cherry picked from commit cce0b058efd55e2d5ac42c4ecaceddacee648a7c) --- .../pulsar/common/util/netty/DnsResolverUtil.java | 52 -- .../pulsar/common/util/netty/DnsResolverTest.java | 44 ++ 2 files changed, 82 insertions(+), 14 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java index f49a6453c72..bcff83acd94 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java @@ -19,12 +19,20 @@ package org.apache.pulsar.common.util.netty; import io.netty.resolver.dns.DnsNameResolverBuilder; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; +import java.security.Security; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; @Slf4j public class DnsResolverUtil { + +private static final String CACHE_POLICY_PROP = "networkaddress.cache.ttl"; +private static final String CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.ttl"; +private static final String NEGATIVE_CACHE_POLICY_PROP = "networkaddress.cache.negative.ttl"; +private static final String NEGATIVE_CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.negative.ttl"; +/* default ttl value from sun.net.InetAddressCachePolicy.DEFAULT_POSITIVE, which is used when no security manager + is used */ +private static final int JDK_DEFAULT_TTL = 30; private static final int MIN_TTL = 0; private static final int TTL; private static final int NEGATIVE_TTL; @@ -39,19 +47,35 @@ public class DnsResolverUtil { int ttl = DEFAULT_TTL; int negativeTtl = DEFAULT_NEGATIVE_TTL; try { -// use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting -// effective JDK settings for DNS caching -Class inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy"); -Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get"); -ttl = (Integer) getTTLMethod.invoke(null); -Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative"); -negativeTtl = (Integer) getNegativeTTLMethod.invoke(null); -} catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException - | IllegalAccessException e) { -log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e); +String ttlStr = Security.getProperty(CACHE_POLICY_PROP); +if (ttlStr == null) { +// Compatible with sun.net.inetaddr.ttl settings +ttlStr = System.getProperty(CACHE_POLICY_PROP_FALLBACK); +} +String negativeTtlStr = Security.getProperty(NEGATIVE_CACHE_POLICY_PROP); +if (negativeTtlStr == null) { +// Compatible with sun.net.inetaddr.negative.ttl settings +negativeTtlStr = System.getProperty(NEGATIVE_CACHE_POLICY_PROP_FALLBACK); +} +ttl = Optional.ofNullable(ttlStr) +.map(Integer::decode) +.filter(i -> i > 0) +.orElseGet(() -> { +if (System.getSecurityManager() == null) { +return JDK_DEFAULT_TTL; +} +return DEFAULT_TTL; +}); + +negativeTtl = Optional.ofNullable(negativeTtlStr) +.map(Integer::decode) +.filter(i -> i >= 0) +.orElse(DEFAULT_NEGATIVE_TTL); +} catch (NumberFormatException e) { +log.warn("Cannot get DNS TTL settings", e); } -TTL = ttl <= 0 ? DEFAULT_TTL : ttl; -NEGATIVE_TTL = negativeTtl < 0 ? DEFAULT_NEGATIVE_TTL : negativeTtl; +TTL = ttl; +NEGATIVE_TTL = negativeTtl; } private DnsResolverUtil() { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java index 0ccb960e798..46599cc45a0 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
(pulsar) 01/02: [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit ba8ff27d25200b371904161e9162d7c112848066 Author: Jiwei Guo AuthorDate: Thu Mar 28 06:53:21 2024 +0800 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191) (cherry picked from commit b702d440dc5e5a4cfd845bf60d5e310efe665ff5) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 + .../service/persistent/PersistentTopicTest.java| 46 ++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 7fd93dacf49..b5c16317f2b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -987,7 +987,7 @@ public class ManagedCursorImpl implements ManagedCursor { name); } // Let the managed ledger know we want to be notified whenever a new entry is published -ledger.waitingCursors.add(this); +ledger.addWaitingCursor(this); } else { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Skip notification registering since we do have entries available", diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 6727fc63479..73dfc86e1ad 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3804,6 +3804,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { this.waitingCursors.remove(cursor); } +public void addWaitingCursor(ManagedCursorImpl cursor) { +if (cursor instanceof NonDurableCursorImpl) { +if (cursor.isActive()) { +this.waitingCursors.add(cursor); +} +} else { +this.waitingCursors.add(cursor); +} +} + public boolean isCursorActive(ManagedCursor cursor) { return activeCursors.get(cursor.getName()) != null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 4b4aa5b45d3..55024ca3d7d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -57,10 +57,15 @@ import java.util.function.Supplier; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; @@ -74,6 +79,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; @@ -661,4 +667,44 @@ public class PersistentTopicTest extends BrokerTestBase { subscribe.close(); admin.topics().delete(topicName); } + +@Test +public void testAddWaitingCursorsForNonDurable() throws Exception { +final String ns = "prop/ns-test"; +admin.namespaces().createNamespace(ns, 2); +final String topicName = "persistent://prop/ns-test/testAddWaitingCursors"; +admin.topics().createNonPartitionedTopic(topicName); +final Optional topic = pulsar.getBrokerService().getTopic(topicName, false).join(); +
(pulsar) branch branch-3.0 updated (1045f8be626 -> 1b9ae2e8cb9)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git from 1045f8be626 [fix][client] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#22363) new ba8ff27d252 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191) new 1b9ae2e8cb9 [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 + .../service/persistent/PersistentTopicTest.java| 46 +++ .../pulsar/common/util/netty/DnsResolverUtil.java | 52 -- .../pulsar/common/util/netty/DnsResolverTest.java | 44 ++ 5 files changed, 139 insertions(+), 15 deletions(-)
(pulsar) branch master updated: [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new cce0b058efd [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329) cce0b058efd is described below commit cce0b058efd55e2d5ac42c4ecaceddacee648a7c Author: Jian Yun AuthorDate: Thu Mar 28 06:59:28 2024 +0800 [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329) Co-authored-by: Lari Hotari --- .../pulsar/common/util/netty/DnsResolverUtil.java | 52 -- .../pulsar/common/util/netty/DnsResolverTest.java | 44 ++ 2 files changed, 82 insertions(+), 14 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java index f49a6453c72..bcff83acd94 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java @@ -19,12 +19,20 @@ package org.apache.pulsar.common.util.netty; import io.netty.resolver.dns.DnsNameResolverBuilder; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; +import java.security.Security; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; @Slf4j public class DnsResolverUtil { + +private static final String CACHE_POLICY_PROP = "networkaddress.cache.ttl"; +private static final String CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.ttl"; +private static final String NEGATIVE_CACHE_POLICY_PROP = "networkaddress.cache.negative.ttl"; +private static final String NEGATIVE_CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.negative.ttl"; +/* default ttl value from sun.net.InetAddressCachePolicy.DEFAULT_POSITIVE, which is used when no security manager + is used */ +private static final int JDK_DEFAULT_TTL = 30; private static final int MIN_TTL = 0; private static final int TTL; private static final int NEGATIVE_TTL; @@ -39,19 +47,35 @@ public class DnsResolverUtil { int ttl = DEFAULT_TTL; int negativeTtl = DEFAULT_NEGATIVE_TTL; try { -// use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting -// effective JDK settings for DNS caching -Class inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy"); -Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get"); -ttl = (Integer) getTTLMethod.invoke(null); -Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative"); -negativeTtl = (Integer) getNegativeTTLMethod.invoke(null); -} catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException - | IllegalAccessException e) { -log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e); +String ttlStr = Security.getProperty(CACHE_POLICY_PROP); +if (ttlStr == null) { +// Compatible with sun.net.inetaddr.ttl settings +ttlStr = System.getProperty(CACHE_POLICY_PROP_FALLBACK); +} +String negativeTtlStr = Security.getProperty(NEGATIVE_CACHE_POLICY_PROP); +if (negativeTtlStr == null) { +// Compatible with sun.net.inetaddr.negative.ttl settings +negativeTtlStr = System.getProperty(NEGATIVE_CACHE_POLICY_PROP_FALLBACK); +} +ttl = Optional.ofNullable(ttlStr) +.map(Integer::decode) +.filter(i -> i > 0) +.orElseGet(() -> { +if (System.getSecurityManager() == null) { +return JDK_DEFAULT_TTL; +} +return DEFAULT_TTL; +}); + +negativeTtl = Optional.ofNullable(negativeTtlStr) +.map(Integer::decode) +.filter(i -> i >= 0) +.orElse(DEFAULT_NEGATIVE_TTL); +} catch (NumberFormatException e) { +log.warn("Cannot get DNS TTL settings", e); } -TTL = ttl <= 0 ? DEFAULT_TTL : ttl; -NEGATIVE_TTL = negativeTtl < 0 ? DEFAULT_NEGATIVE_TTL : negativeTtl; +TTL = ttl; +NEGATIVE_TTL = negativeTtl; } private DnsResolverUtil() { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java index 0ccb960e798..46599cc45a0 100644 ---
Re: [PR] [improve][misc] Remove the call to sun InetAddressCachePolicy [pulsar]
lhotari merged PR #22329: URL: https://github.com/apache/pulsar/pull/22329 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new b702d440dc5 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191) b702d440dc5 is described below commit b702d440dc5e5a4cfd845bf60d5e310efe665ff5 Author: Jiwei Guo AuthorDate: Thu Mar 28 06:53:21 2024 +0800 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 + .../service/persistent/PersistentTopicTest.java| 46 ++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8b13fc0f342..b253da72fa9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -992,7 +992,7 @@ public class ManagedCursorImpl implements ManagedCursor { name); } // Let the managed ledger know we want to be notified whenever a new entry is published -ledger.waitingCursors.add(this); +ledger.addWaitingCursor(this); } else { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Skip notification registering since we do have entries available", diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 1c0a0465507..0f089ef4a85 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3813,6 +3813,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { this.waitingCursors.remove(cursor); } +public void addWaitingCursor(ManagedCursorImpl cursor) { +if (cursor instanceof NonDurableCursorImpl) { +if (cursor.isActive()) { +this.waitingCursors.add(cursor); +} +} else { +this.waitingCursors.add(cursor); +} +} + public boolean isCursorActive(ManagedCursor cursor) { return activeCursors.get(cursor.getName()) != null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index ea1a68bb0c2..d42b1d92007 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -59,10 +59,15 @@ import java.util.function.Supplier; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -75,6 +80,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; @@ -662,4 +668,44 @@ public class PersistentTopicTest extends BrokerTestBase { subscribe.close(); admin.topics().delete(topicName); } + +@Test +public void testAddWaitingCursorsForNonDurable() throws Exception { +final String ns = "prop/ns-test"; +admin.namespaces().createNamespace(ns, 2); +final String topicName = "persistent://prop/ns-test/testAddWaitingCursors"; +
Re: [PR] [fix][broker] Check cursor state before adding it to the `waitingCursors` [pulsar]
lhotari merged PR #22191: URL: https://github.com/apache/pulsar/pull/22191 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Bug] Broker memory leak [pulsar]
lhotari closed issue #22157: [Bug] Broker memory leak URL: https://github.com/apache/pulsar/issues/22157 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] [Bug] weird packet len issues on broker <-> metadatastore [pulsar]
KannarFr opened a new issue, #22373: URL: https://github.com/apache/pulsar/issues/22373 ### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Read release policy - [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker. ### Version 3.2.1 ### Minimal reproduce step N/A ### What did you expect to see? N/A ### What did you see instead? Random `Packet len` exceptions for some days after months of running. A part of the logs https://gist.githubusercontent.com/KannarFr/8aec4e4100c422563aacb3b3b404cd8c/raw/c5844217d386f2267b402099829a893c10f41d50/gistfile1.txt. I'm pretty sure it can't be namespace policies/topics metadata as we limited the number of topics per namespace to 3000. Can it be a ledgers listing over packet len from ZK to broker? ### Anything else? _No response_ ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) 04/05: [fix][broker] Avoid execute prepareInitPoliciesCacheAsync if namespace is deleted (#22268)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit d4610a0d60139f8660c9d58bf2d8a7981f03d19a Author: hanmz AuthorDate: Mon Mar 18 06:45:02 2024 +0800 [fix][broker] Avoid execute prepareInitPoliciesCacheAsync if namespace is deleted (#22268) (cherry picked from commit 96d77f7e1d5b9c56070eaed5c31213a8144871d3) --- .../SystemTopicBasedTopicPoliciesService.java | 66 +- .../SystemTopicBasedTopicPoliciesServiceTest.java | 19 +++ 2 files changed, 58 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 71f78e21f93..4e9e875bcf4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -324,34 +324,46 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } } -private @Nonnull CompletableFuture prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { +@VisibleForTesting +@Nonnull CompletableFuture prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { requireNonNull(namespace); -return policyCacheInitMap.computeIfAbsent(namespace, (k) -> { -final CompletableFuture> readerCompletableFuture = -createSystemTopicClient(namespace); -readerCaches.put(namespace, readerCompletableFuture); -ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); -final CompletableFuture initFuture = readerCompletableFuture -.thenCompose(reader -> { -final CompletableFuture stageFuture = new CompletableFuture<>(); -initPolicesCache(reader, stageFuture); -return stageFuture -// Read policies in background -.thenAccept(__ -> readMorePoliciesAsync(reader)); -}); -initFuture.exceptionally(ex -> { -try { -log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); -cleanCacheAndCloseReader(namespace, false); -} catch (Throwable cleanupEx) { -// Adding this catch to avoid break callback chain -log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx); -} -return null; -}); -// let caller know we've got an exception. -return initFuture; -}); +return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace) +.thenCompose(namespacePolicies -> { +if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) { +log.info("[{}] skip prepare init policies cache since the namespace is deleted", +namespace); +return CompletableFuture.completedFuture(null); +} + +return policyCacheInitMap.computeIfAbsent(namespace, (k) -> { +final CompletableFuture> readerCompletableFuture = +createSystemTopicClient(namespace); +readerCaches.put(namespace, readerCompletableFuture); + ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); +final CompletableFuture initFuture = readerCompletableFuture +.thenCompose(reader -> { +final CompletableFuture stageFuture = new CompletableFuture<>(); +initPolicesCache(reader, stageFuture); +return stageFuture +// Read policies in background +.thenAccept(__ -> readMorePoliciesAsync(reader)); +}); +initFuture.exceptionally(ex -> { +try { +log.error("[{}] Failed to create reader on __change_events topic", +namespace, ex); +
(pulsar) 01/05: [improve][client] Add backoff for `seek` (#20963)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 9ea7f6014b51635fc337acd0e264cf444cdd3a2a Author: Jiwei Guo AuthorDate: Mon Aug 21 15:10:49 2023 +0800 [improve][client] Add backoff for `seek` (#20963) (cherry picked from commit ee91edc3e08c87db1e5662a553ff62af0c1886e5) --- .../apache/pulsar/client/impl/ConsumerImpl.java| 154 +++-- .../pulsar/client/impl/ConsumerImplTest.java | 13 +- 2 files changed, 93 insertions(+), 74 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index cfb5f309684..717355abbef 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2225,100 +2225,108 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle new PulsarClientException("Only support seek by messageId or timestamp")); } -private Optional> seekAsyncCheckState(String seekBy) { -if (getState() == State.Closing || getState() == State.Closed) { -return Optional.of(FutureUtil -.failedFuture(new PulsarClientException.AlreadyClosedException( -String.format("The consumer %s was already closed when seeking the subscription %s of the" -+ " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy; -} - -if (!isConnected()) { -return Optional.of(FutureUtil.failedFuture(new PulsarClientException( -String.format("The client is not connected to the broker when seeking the subscription %s of the " -+ "topic %s to %s", subscription, topicName.toString(), seekBy; -} +private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) { +AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); +Backoff backoff = new BackoffBuilder() +.setInitialTime(100, TimeUnit.MILLISECONDS) +.setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) +.setMandatoryStop(0, TimeUnit.MILLISECONDS) +.create(); -return Optional.empty(); +CompletableFuture seekFuture = new CompletableFuture<>(); +seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture); +return seekFuture; } -private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) { -final CompletableFuture seekFuture = new CompletableFuture<>(); +private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy, + final Backoff backoff, final AtomicLong remainingTime, + CompletableFuture seekFuture) { ClientCnx cnx = cnx(); +if (isConnected() && cnx != null) { +if (!duringSeek.compareAndSet(false, true)) { +final String message = String.format( +"[%s][%s] attempting to seek operation that is already in progress (seek by %s)", +topic, subscription, seekBy); +log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", +topic, subscription, seekBy); +seekFuture.completeExceptionally(new IllegalStateException(message)); +return; +} +MessageIdAdv originSeekMessageId = seekMessageId; +seekMessageId = (MessageIdAdv) seekId; +log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); -if (!duringSeek.compareAndSet(false, true)) { -final String message = String.format( -"[%s][%s] attempting to seek operation that is already in progress (seek by %s)", -topic, subscription, seekBy); -log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", -topic, subscription, seekBy); -seekFuture.completeExceptionally(new IllegalStateException(message)); -return seekFuture; -} - -MessageIdAdv originSeekMessageId = seekMessageId; -seekMessageId = (MessageIdAdv) seekId; -log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); - -cnx.sendRequestWithId(seek, requestId).thenRun(() -> { -log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy); -
(pulsar) 03/05: [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit e2070a87d556816a831a60c8056fb6cbddfb61c1 Author: Yunze Xu AuthorDate: Sat Mar 16 14:56:34 2024 +0800 [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279) (cherry picked from commit 4e0c145c89a35ec9b41fa22862edac59e28d892d) --- .../PersistentDispatcherSingleActiveConsumer.java | 26 +- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index bf6482bda01..cc7b6841e5c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -71,6 +71,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher protected volatile int readBatchSize; protected final Backoff readFailureBackoff; private volatile ScheduledFuture readOnActiveConsumerTask = null; +private final Object lockForReadOnActiveConsumerTask = new Object(); private final RedeliveryTracker redeliveryTracker; @@ -120,18 +121,23 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher return; } -readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> { -if (log.isDebugEnabled()) { -log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name, - serviceConfig.getActiveConsumerFailoverDelayTimeMillis()); +synchronized (lockForReadOnActiveConsumerTask) { +if (readOnActiveConsumerTask != null) { +return; } -Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); -cursor.rewind(activeConsumer != null && activeConsumer.readCompacted()); +readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> { +if (log.isDebugEnabled()) { +log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name, + serviceConfig.getActiveConsumerFailoverDelayTimeMillis()); +} +Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); +cursor.rewind(activeConsumer != null && activeConsumer.readCompacted()); -notifyActiveConsumerChanged(activeConsumer); -readMoreEntries(activeConsumer); -readOnActiveConsumerTask = null; -}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS); +notifyActiveConsumerChanged(activeConsumer); +readMoreEntries(activeConsumer); +readOnActiveConsumerTask = null; +}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS); +} } @Override
(pulsar) 05/05: [fix][client] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#22363)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 1045f8be626981c818c6eddf30f5732d25dbac66 Author: Yunze Xu AuthorDate: Wed Mar 27 19:49:27 2024 +0800 [fix][client] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#22363) Co-authored-by: Lari Hotari (cherry picked from commit 149deaa5a79ed8570489bead4215ae213a4e9206) --- .../org/apache/pulsar/client/impl/ReaderTest.java | 85 +++--- .../apache/pulsar/client/impl/ConsumerImpl.java| 25 --- 2 files changed, 90 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index d511c6dc37f..00c3eadb06a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; @@ -64,8 +65,8 @@ import org.apache.pulsar.common.util.Murmur3_32Hash; import org.apache.pulsar.schema.Schemas; import org.awaitility.Awaitility; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -75,7 +76,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { private static final String subscription = "reader-sub"; -@BeforeMethod +@BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { super.internalSetup(); @@ -87,7 +88,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test")); } -@AfterMethod(alwaysRun = true) +@AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -146,21 +147,41 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { testReadMessages(topic, true); } -@Test -public void testReadMessageWithBatchingWithMessageInclusive() throws Exception { +@DataProvider +public static Object[][] seekBeforeHasMessageAvailable() { +return new Object[][] { { true }, { false } }; +} + +@Test(timeOut = 2, dataProvider = "seekBeforeHasMessageAvailable") +public void testReadMessageWithBatchingWithMessageInclusive(boolean seekBeforeHasMessageAvailable) +throws Exception { String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive"; Set keys = publishMessages(topic, 10, true); Reader reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) .startMessageIdInclusive().readerName(subscription).create(); -while (reader.hasMessageAvailable()) { -Assert.assertTrue(keys.remove(reader.readNext().getKey())); +if (seekBeforeHasMessageAvailable) { +reader.seek(0L); // it should seek to the earliest } + +assertTrue(reader.hasMessageAvailable()); +final Message msg = reader.readNext(); +assertTrue(keys.remove(msg.getKey())); // start from latest with start message inclusive should only read the last message in batch assertEquals(keys.size(), 9); -Assert.assertFalse(keys.contains("key9")); -Assert.assertFalse(reader.hasMessageAvailable()); + +final MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId(); +if (seekBeforeHasMessageAvailable) { +assertEquals(msgId.getBatchIndex(), 0); +assertFalse(keys.contains("key0")); +assertTrue(reader.hasMessageAvailable()); +} else { +assertEquals(msgId.getBatchIndex(), 9); +assertFalse(reader.hasMessageAvailable()); +assertFalse(keys.contains("key9")); +assertFalse(reader.hasMessageAvailable()); +} } private void testReadMessages(String topic, boolean enableBatch) throws Exception { @@ -258,7 +279,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { @Test public void testReaderWithTimeLong() throws Exception { String ns = "my-property/my-ns"; -
(pulsar) branch branch-3.0 updated (31c9f44cb53 -> 1045f8be626)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git from 31c9f44cb53 [fix][misc] Make ConcurrentBitSet thread safe (#22361) new 9ea7f6014b5 [improve][client] Add backoff for `seek` (#20963) new 318ff3398d3 [fix][client] fix Reader.hasMessageAvailable might return true after seeking to latest (#22201) new e2070a87d55 [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279) new d4610a0d601 [fix][broker] Avoid execute prepareInitPoliciesCacheAsync if namespace is deleted (#22268) new 1045f8be626 [fix][client] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#22363) The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../SystemTopicBasedTopicPoliciesService.java | 66 +++--- .../PersistentDispatcherSingleActiveConsumer.java | 26 ++- .../SystemTopicBasedTopicPoliciesServiceTest.java | 19 ++ .../org/apache/pulsar/client/impl/ReaderTest.java | 112 +- .../apache/pulsar/client/impl/ConsumerImpl.java| 227 + .../pulsar/client/impl/ConsumerImplTest.java | 15 +- 6 files changed, 325 insertions(+), 140 deletions(-)
(pulsar) 02/05: [fix][client] fix Reader.hasMessageAvailable might return true after seeking to latest (#22201)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 318ff3398d39fa16010a664c747adbe77fd44d20 Author: Yunze Xu AuthorDate: Thu Mar 7 11:32:18 2024 +0800 [fix][client] fix Reader.hasMessageAvailable might return true after seeking to latest (#22201) (cherry picked from commit 95a53f3a033c4e57db2ddb2d1f0e9a4bc8b9f441) --- .../org/apache/pulsar/client/impl/ReaderTest.java | 27 ++ .../apache/pulsar/client/impl/ConsumerImpl.java| 104 ++--- .../pulsar/client/impl/ConsumerImplTest.java | 2 +- 3 files changed, 96 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 2f91d792581..d511c6dc37f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -66,6 +66,7 @@ import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -761,4 +762,30 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { producer.close(); admin.topics().delete(topic, false); } + +@DataProvider +public static Object[][] initializeLastMessageIdInBroker() { +return new Object[][] { { true }, { false } }; +} + +@Test(dataProvider = "initializeLastMessageIdInBroker") +public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBroker) throws Exception { +final String topic = "persistent://my-property/my-ns/test-has-message-available-after-seek"; +@Cleanup Reader reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1) +.startMessageId(MessageId.earliest).create(); + +@Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); +producer.send("msg"); + +if (initializeLastMessageIdInBroker) { +assertTrue(reader.hasMessageAvailable()); +} // else: lastMessageIdInBroker is earliest + +reader.seek(MessageId.latest); +// lastMessageIdInBroker is the last message ID, while startMessageId is still earliest +assertFalse(reader.hasMessageAvailable()); + +producer.send("msg"); +assertTrue(reader.hasMessageAvailable()); +} } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 717355abbef..dae53c59d6e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -166,7 +166,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private volatile MessageIdAdv startMessageId; private volatile MessageIdAdv seekMessageId; -private final AtomicBoolean duringSeek; +@VisibleForTesting +final AtomicReference seekStatus; +private volatile CompletableFuture seekFuture; private final MessageIdAdv initialStartMessageId; @@ -296,7 +298,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle stats = ConsumerStatsDisabled.INSTANCE; } -duringSeek = new AtomicBoolean(false); +seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED); // Create msgCrypto if not created already if (conf.getCryptoKeyReader() != null) { @@ -771,7 +773,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle closeConsumerTasks(); deregisterFromClientCnx(); client.cleanupConsumer(this); -clearReceiverQueue(); +clearReceiverQueue(false); return CompletableFuture.completedFuture(null); } @@ -779,7 +781,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle topic, subscription, cnx.ctx().channel(), consumerId); long requestId = client.newRequestId(); -if (duringSeek.get()) { +if (seekStatus.get() != SeekStatus.NOT_STARTED) { acknowledgmentsGroupingTracker.flushAndClean(); } @@ -790,7 +792,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle int currentSize; synchronized (this) { currentSize = incomingMessages.size(); -startMessageId = clearReceiverQueue(); +setClientCnx(cnx); +clearReceiverQueue(true); if (possibleSendToDeadLetterTopicMessages !=
Re: [PR] [fix][client] Fix Reader.hasMessageAvailable might return true after seeking to latest [pulsar]
lhotari commented on PR #22201: URL: https://github.com/apache/pulsar/pull/22201#issuecomment-2024096913 It's necessary to cherry-pick #20963 so that this fix could be applied cleanly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch branch-3.0 updated (6e0ebcbaad6 -> 31c9f44cb53)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git from 6e0ebcbaad6 [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252) new 20be6ca977e [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335) new 31c9f44cb53 [fix][misc] Make ConcurrentBitSet thread safe (#22361) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../persistent/PersistentMessageExpiryMonitor.java | 4 +- .../service/PersistentMessageFinderTest.java | 51 ++- .../common/util/collections/ConcurrentBitSet.java | 363 +++-- 3 files changed, 377 insertions(+), 41 deletions(-)
(pulsar) 02/02: [fix][misc] Make ConcurrentBitSet thread safe (#22361)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 31c9f44cb53f4bd115c2e471a817014827e49ebc Author: Lari Hotari AuthorDate: Wed Mar 27 09:16:22 2024 -0700 [fix][misc] Make ConcurrentBitSet thread safe (#22361) (cherry picked from commit edd0076bd83f01a5fcbe81c8396667014f0fc36e) --- .../common/util/collections/ConcurrentBitSet.java | 363 +++-- 1 file changed, 331 insertions(+), 32 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java index 23842fe5b55..a37628cb300 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java @@ -20,12 +20,13 @@ package org.apache.pulsar.common.util.collections; import java.util.BitSet; import java.util.concurrent.locks.StampedLock; -import lombok.EqualsAndHashCode; +import java.util.stream.IntStream; /** - * Safe multithreaded version of {@code BitSet}. + * A {@code BitSet} that is protected by a {@code StampedLock} to provide thread-safe access. + * The {@link #length()} method is not thread safe and is not overridden because StampedLock is not reentrant. + * Use the {@link #safeLength()} method to get the length of the bit set in a thread-safe manner. */ -@EqualsAndHashCode(callSuper = true) public class ConcurrentBitSet extends BitSet { private static final long serialVersionUID = 1L; @@ -39,10 +40,8 @@ public class ConcurrentBitSet extends BitSet { * Creates a bit set whose initial size is large enough to explicitly represent bits with indices in the range * {@code 0} through {@code nbits-1}. All bits are initially {@code false}. * - * @param nbits - *the initial size of the bit set - * @throws NegativeArraySizeException - * if the specified initial size is negative + * @param nbits the initial size of the bit set + * @throws NegativeArraySizeException if the specified initial size is negative */ public ConcurrentBitSet(int nbits) { super(nbits); @@ -65,105 +64,405 @@ public class ConcurrentBitSet extends BitSet { @Override public void set(int bitIndex) { +long stamp = rwLock.writeLock(); +try { +super.set(bitIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void clear(int bitIndex) { +long stamp = rwLock.writeLock(); +try { +super.clear(bitIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void set(int fromIndex, int toIndex) { +long stamp = rwLock.writeLock(); +try { +super.set(fromIndex, toIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void clear(int fromIndex, int toIndex) { +long stamp = rwLock.writeLock(); +try { +super.clear(fromIndex, toIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void clear() { +long stamp = rwLock.writeLock(); +try { +super.clear(); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public int nextSetBit(int fromIndex) { long stamp = rwLock.tryOptimisticRead(); -super.set(bitIndex); +int nextSetBit = super.nextSetBit(fromIndex); if (!rwLock.validate(stamp)) { +// Fallback to read lock stamp = rwLock.readLock(); try { -super.set(bitIndex); +nextSetBit = super.nextSetBit(fromIndex); } finally { rwLock.unlockRead(stamp); } } +return nextSetBit; } @Override -public void set(int fromIndex, int toIndex) { +public int nextClearBit(int fromIndex) { long stamp = rwLock.tryOptimisticRead(); -super.set(fromIndex, toIndex); +int nextClearBit = super.nextClearBit(fromIndex); if (!rwLock.validate(stamp)) { +// Fallback to read lock stamp = rwLock.readLock(); try { -super.set(fromIndex, toIndex); +nextClearBit = super.nextClearBit(fromIndex); } finally { rwLock.unlockRead(stamp); } } +return nextClearBit; } @Override -public int nextSetBit(int fromIndex) { +public int previousSetBit(int fromIndex) { long stamp = rwLock.tryOptimisticRead(); -int bit
(pulsar) 01/02: [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 20be6ca977e78c7defa14883becb84397003659d Author: Cong Zhao AuthorDate: Thu Mar 28 03:42:15 2024 +0800 [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335) (cherry picked from commit f77fe5f099f7ecc334509db07bba477c4226cf19) --- .../persistent/PersistentMessageExpiryMonitor.java | 4 +- .../service/PersistentMessageFinderTest.java | 51 +++--- 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index d82f3d93f8f..51ce2c1b8ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -116,8 +116,8 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { managedLedger.getLedgersInfo().lastKey(), true); MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null; for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) { -if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds, -ledgerInfo.getTimestamp())) { +if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0L +|| !MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) { break; } info = ledgerInfo; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index d56968c8f8e..194747f59fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -33,10 +33,8 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; - import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; @@ -46,7 +44,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - +import java.util.concurrent.atomic.AtomicReference; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -71,11 +72,10 @@ import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.Test; -import javax.ws.rs.client.Entity; -import javax.ws.rs.core.MediaType; - @Test(groups = "broker") public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { @@ -261,7 +261,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { ledger.addEntry(createMessageWrittenToLedger("msg2")); ledger.addEntry(createMessageWrittenToLedger("msg3")); Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message")); - + long endTimestamp = System.currentTimeMillis() + 1000; Result result = new Result(); @@ -451,6 +451,43 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); } +@Test +public void
(pulsar) 01/02: [fix][misc] Make ConcurrentBitSet thread safe (#22361)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit cc79803cb7e4caf732e37e1879ae49f3dfbbd290 Author: Lari Hotari AuthorDate: Wed Mar 27 09:16:22 2024 -0700 [fix][misc] Make ConcurrentBitSet thread safe (#22361) (cherry picked from commit edd0076bd83f01a5fcbe81c8396667014f0fc36e) --- .../common/util/collections/ConcurrentBitSet.java | 363 +++-- 1 file changed, 331 insertions(+), 32 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java index 23842fe5b55..a37628cb300 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java @@ -20,12 +20,13 @@ package org.apache.pulsar.common.util.collections; import java.util.BitSet; import java.util.concurrent.locks.StampedLock; -import lombok.EqualsAndHashCode; +import java.util.stream.IntStream; /** - * Safe multithreaded version of {@code BitSet}. + * A {@code BitSet} that is protected by a {@code StampedLock} to provide thread-safe access. + * The {@link #length()} method is not thread safe and is not overridden because StampedLock is not reentrant. + * Use the {@link #safeLength()} method to get the length of the bit set in a thread-safe manner. */ -@EqualsAndHashCode(callSuper = true) public class ConcurrentBitSet extends BitSet { private static final long serialVersionUID = 1L; @@ -39,10 +40,8 @@ public class ConcurrentBitSet extends BitSet { * Creates a bit set whose initial size is large enough to explicitly represent bits with indices in the range * {@code 0} through {@code nbits-1}. All bits are initially {@code false}. * - * @param nbits - *the initial size of the bit set - * @throws NegativeArraySizeException - * if the specified initial size is negative + * @param nbits the initial size of the bit set + * @throws NegativeArraySizeException if the specified initial size is negative */ public ConcurrentBitSet(int nbits) { super(nbits); @@ -65,105 +64,405 @@ public class ConcurrentBitSet extends BitSet { @Override public void set(int bitIndex) { +long stamp = rwLock.writeLock(); +try { +super.set(bitIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void clear(int bitIndex) { +long stamp = rwLock.writeLock(); +try { +super.clear(bitIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void set(int fromIndex, int toIndex) { +long stamp = rwLock.writeLock(); +try { +super.set(fromIndex, toIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void clear(int fromIndex, int toIndex) { +long stamp = rwLock.writeLock(); +try { +super.clear(fromIndex, toIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void clear() { +long stamp = rwLock.writeLock(); +try { +super.clear(); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public int nextSetBit(int fromIndex) { long stamp = rwLock.tryOptimisticRead(); -super.set(bitIndex); +int nextSetBit = super.nextSetBit(fromIndex); if (!rwLock.validate(stamp)) { +// Fallback to read lock stamp = rwLock.readLock(); try { -super.set(bitIndex); +nextSetBit = super.nextSetBit(fromIndex); } finally { rwLock.unlockRead(stamp); } } +return nextSetBit; } @Override -public void set(int fromIndex, int toIndex) { +public int nextClearBit(int fromIndex) { long stamp = rwLock.tryOptimisticRead(); -super.set(fromIndex, toIndex); +int nextClearBit = super.nextClearBit(fromIndex); if (!rwLock.validate(stamp)) { +// Fallback to read lock stamp = rwLock.readLock(); try { -super.set(fromIndex, toIndex); +nextClearBit = super.nextClearBit(fromIndex); } finally { rwLock.unlockRead(stamp); } } +return nextClearBit; } @Override -public int nextSetBit(int fromIndex) { +public int previousSetBit(int fromIndex) { long stamp = rwLock.tryOptimisticRead(); -int bit
(pulsar) 02/02: [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit f56c383a5312ef9186880662860b8d2f38b3c0c8 Author: Cong Zhao AuthorDate: Thu Mar 28 03:42:15 2024 +0800 [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335) (cherry picked from commit f77fe5f099f7ecc334509db07bba477c4226cf19) --- .../persistent/PersistentMessageExpiryMonitor.java | 4 +- .../service/PersistentMessageFinderTest.java | 51 +++--- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index ac391c10503..2478a7a2538 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -121,8 +121,8 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag managedLedger.getLedgersInfo().lastKey(), true); MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null; for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) { -if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds, -ledgerInfo.getTimestamp())) { +if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0L +|| !MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) { break; } info = ledgerInfo; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index ace552a55a7..6883c0467e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -33,10 +33,8 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; - import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; @@ -46,7 +44,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - +import java.util.concurrent.atomic.AtomicReference; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -72,11 +73,10 @@ import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.Test; -import javax.ws.rs.client.Entity; -import javax.ws.rs.core.MediaType; - @Test(groups = "broker") public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { @@ -463,6 +463,45 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); } +@Test +public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable { +final String ledgerAndCursorName = "testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger"; +int maxTTLSeconds = 1; +ManagedLedgerConfig config = new ManagedLedgerConfig(); +config.setMaxEntriesPerLedger(5); +ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); +ManagedCursorImpl c1 =
(pulsar) branch branch-3.2 updated (21e56955d5e -> f56c383a531)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git from 21e56955d5e [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252) new cc79803cb7e [fix][misc] Make ConcurrentBitSet thread safe (#22361) new f56c383a531 [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../persistent/PersistentMessageExpiryMonitor.java | 4 +- .../service/PersistentMessageFinderTest.java | 51 ++- .../common/util/collections/ConcurrentBitSet.java | 363 +++-- 3 files changed, 378 insertions(+), 40 deletions(-)
(pulsar) branch branch-2.11 updated: [fix][broker][branch-2.11] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName (#22346)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 2cee88c8f08 [fix][broker][branch-2.11] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName (#22346) 2cee88c8f08 is described below commit 2cee88c8f08ebd08e6447f18aa44c3af00026974 Author: fengyubiao AuthorDate: Thu Mar 28 03:43:08 2024 +0800 [fix][broker][branch-2.11] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName (#22346) --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java| 2 +- .../api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java| 9 ++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 8f6191938ed..2cb8495034b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2234,7 +2234,7 @@ public class PersistentTopicsBase extends AdminResource { getPartitionedTopicMetadataAsync(topicName, authoritative, allowAutoTopicCreation)) .thenAccept(partitionMetadata -> { final int numPartitions = partitionMetadata.partitions; -if (numPartitions > 0) { +if (partitionMetadata.partitions > 0 && !isUnexpectedTopicName(partitionMetadata)) { final CompletableFuture future = new CompletableFuture<>(); final AtomicInteger count = new AtomicInteger(numPartitions); final AtomicInteger failureCount = new AtomicInteger(0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java index 3b1222f5b55..a295a48b4fd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java @@ -53,7 +53,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo } @Test -public void testInfiniteHttpCallGetSubscriptions() throws Exception { +public void testInfiniteHttpCallGetOrCreateSubscriptions() throws Exception { final String randomStr = UUID.randomUUID().toString().replaceAll("-", ""); final String partitionedTopicName = "persistent://my-property/my-ns/tp1_" + randomStr; final String topic_p0 = partitionedTopicName + TopicName.PARTITIONED_TOPIC_SUFFIX + "0"; @@ -65,6 +65,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo // Do test. ProducerAndConsumerEntry pcEntry = triggerDLQCreated(topic_p0, topicDLQ, subscriptionName); admin.topics().getSubscriptions(topicDLQ); +admin.topics().createSubscription(topicDLQ, "s1", MessageId.earliest); // cleanup. pcEntry.consumer.close(); @@ -73,7 +74,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo } @Test -public void testInfiniteHttpCallGetSubscriptions2() throws Exception { +public void testInfiniteHttpCallGetOrCreateSubscriptions2() throws Exception { final String randomStr = UUID.randomUUID().toString().replaceAll("-", ""); final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0-abc"; Producer producer = pulsarClient.newProducer(Schema.STRING) @@ -82,13 +83,14 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo // Do test. admin.topics().getSubscriptions(topicName); +admin.topics().createSubscription(topicName, "s1", MessageId.earliest); // cleanup. producer.close(); } @Test -public void testInfiniteHttpCallGetSubscriptions3() throws Exception { +public void testInfiniteHttpCallGetOrCreateSubscriptions3() throws Exception { final String randomStr = UUID.randomUUID().toString().replaceAll("-", ""); final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0"; Producer producer = pulsarClient.newProducer(Schema.STRING) @@ -97,6 +99,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo // Do test. admin.topics().getSubscriptions(topicName); +
Re: [PR] [fix][broker][branch-2.11] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName [pulsar]
lhotari merged PR #22346: URL: https://github.com/apache/pulsar/pull/22346 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f77fe5f099f [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335) f77fe5f099f is described below commit f77fe5f099f7ecc334509db07bba477c4226cf19 Author: Cong Zhao AuthorDate: Thu Mar 28 03:42:15 2024 +0800 [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335) --- .../persistent/PersistentMessageExpiryMonitor.java | 4 +- .../service/PersistentMessageFinderTest.java | 51 +++--- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index ac391c10503..2478a7a2538 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -121,8 +121,8 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag managedLedger.getLedgersInfo().lastKey(), true); MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null; for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) { -if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds, -ledgerInfo.getTimestamp())) { +if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0L +|| !MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) { break; } info = ledgerInfo; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index ace552a55a7..6883c0467e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -33,10 +33,8 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; - import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; @@ -46,7 +44,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - +import java.util.concurrent.atomic.AtomicReference; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -72,11 +73,10 @@ import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.Test; -import javax.ws.rs.client.Entity; -import javax.ws.rs.core.MediaType; - @Test(groups = "broker") public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { @@ -463,6 +463,45 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); } +@Test +public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable { +final String ledgerAndCursorName = "testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger"; +int maxTTLSeconds = 1; +ManagedLedgerConfig config = new ManagedLedgerConfig(); +
Re: [PR] [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time [pulsar]
lhotari merged PR #22335: URL: https://github.com/apache/pulsar/pull/22335 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][client] the nullValue in msgMetadata should be true by default [pulsar]
lhotari commented on code in PR #22372: URL: https://github.com/apache/pulsar/pull/22372#discussion_r1541925716 ## pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java: ## @@ -141,9 +142,9 @@ public TypedMessageBuilder orderingKey(byte[] orderingKey) { @Override public TypedMessageBuilder value(T value) { if (value == null) { -msgMetadata.setNullValue(true); Review Comment: this shouldn't be removed since `.value` might be called multiple times. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [improve][misc] Upgrade to Netty 4.1.108 and tcnative 2.0.65 (#22369)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new be0a9d9d9bb [improve][misc] Upgrade to Netty 4.1.108 and tcnative 2.0.65 (#22369) be0a9d9d9bb is described below commit be0a9d9d9bb23dabc065f091b853f27c0ebcaa16 Author: Lari Hotari AuthorDate: Wed Mar 27 12:34:14 2024 -0700 [improve][misc] Upgrade to Netty 4.1.108 and tcnative 2.0.65 (#22369) --- distribution/server/src/assemble/LICENSE.bin.txt | 56 distribution/shell/src/assemble/LICENSE.bin.txt | 54 +++ pom.xml | 2 +- 3 files changed, 56 insertions(+), 56 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index cb99d62edfe..cab23db279a 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -292,34 +292,34 @@ The Apache Software License, Version 2.0 - org.apache.commons-commons-lang3-3.11.jar - org.apache.commons-commons-text-1.10.0.jar * Netty -- io.netty-netty-buffer-4.1.105.Final.jar -- io.netty-netty-codec-4.1.105.Final.jar -- io.netty-netty-codec-dns-4.1.105.Final.jar -- io.netty-netty-codec-http-4.1.105.Final.jar -- io.netty-netty-codec-http2-4.1.105.Final.jar -- io.netty-netty-codec-socks-4.1.105.Final.jar -- io.netty-netty-codec-haproxy-4.1.105.Final.jar -- io.netty-netty-common-4.1.105.Final.jar -- io.netty-netty-handler-4.1.105.Final.jar -- io.netty-netty-handler-proxy-4.1.105.Final.jar -- io.netty-netty-resolver-4.1.105.Final.jar -- io.netty-netty-resolver-dns-4.1.105.Final.jar -- io.netty-netty-resolver-dns-classes-macos-4.1.105.Final.jar -- io.netty-netty-resolver-dns-native-macos-4.1.105.Final-osx-aarch_64.jar -- io.netty-netty-resolver-dns-native-macos-4.1.105.Final-osx-x86_64.jar -- io.netty-netty-transport-4.1.105.Final.jar -- io.netty-netty-transport-classes-epoll-4.1.105.Final.jar -- io.netty-netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar -- io.netty-netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar -- io.netty-netty-transport-native-unix-common-4.1.105.Final.jar -- io.netty-netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar -- io.netty-netty-tcnative-boringssl-static-2.0.61.Final.jar -- io.netty-netty-tcnative-boringssl-static-2.0.61.Final-linux-aarch_64.jar -- io.netty-netty-tcnative-boringssl-static-2.0.61.Final-linux-x86_64.jar -- io.netty-netty-tcnative-boringssl-static-2.0.61.Final-osx-aarch_64.jar -- io.netty-netty-tcnative-boringssl-static-2.0.61.Final-osx-x86_64.jar -- io.netty-netty-tcnative-boringssl-static-2.0.61.Final-windows-x86_64.jar -- io.netty-netty-tcnative-classes-2.0.61.Final.jar +- io.netty-netty-buffer-4.1.108.Final.jar +- io.netty-netty-codec-4.1.108.Final.jar +- io.netty-netty-codec-dns-4.1.108.Final.jar +- io.netty-netty-codec-http-4.1.108.Final.jar +- io.netty-netty-codec-http2-4.1.108.Final.jar +- io.netty-netty-codec-socks-4.1.108.Final.jar +- io.netty-netty-codec-haproxy-4.1.108.Final.jar +- io.netty-netty-common-4.1.108.Final.jar +- io.netty-netty-handler-4.1.108.Final.jar +- io.netty-netty-handler-proxy-4.1.108.Final.jar +- io.netty-netty-resolver-4.1.108.Final.jar +- io.netty-netty-resolver-dns-4.1.108.Final.jar +- io.netty-netty-resolver-dns-classes-macos-4.1.108.Final.jar +- io.netty-netty-resolver-dns-native-macos-4.1.108.Final-osx-aarch_64.jar +- io.netty-netty-resolver-dns-native-macos-4.1.108.Final-osx-x86_64.jar +- io.netty-netty-transport-4.1.108.Final.jar +- io.netty-netty-transport-classes-epoll-4.1.108.Final.jar +- io.netty-netty-transport-native-epoll-4.1.108.Final-linux-aarch_64.jar +- io.netty-netty-transport-native-epoll-4.1.108.Final-linux-x86_64.jar +- io.netty-netty-transport-native-unix-common-4.1.108.Final.jar +- io.netty-netty-transport-native-unix-common-4.1.108.Final-linux-x86_64.jar +- io.netty-netty-tcnative-boringssl-static-2.0.65.Final.jar +- io.netty-netty-tcnative-boringssl-static-2.0.65.Final-linux-aarch_64.jar +- io.netty-netty-tcnative-boringssl-static-2.0.65.Final-linux-x86_64.jar +- io.netty-netty-tcnative-boringssl-static-2.0.65.Final-osx-aarch_64.jar +- io.netty-netty-tcnative-boringssl-static-2.0.65.Final-osx-x86_64.jar +- io.netty-netty-tcnative-boringssl-static-2.0.65.Final-windows-x86_64.jar +- io.netty-netty-tcnative-classes-2.0.65.Final.jar - io.netty.incubator-netty-incubator-transport-classes-io_uring-0.0.24.Final.jar - io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.24.Final-linux-x86_64.jar -
Re: [PR] [improve][misc] Upgrade to Netty 4.1.108 and tcnative 2.0.65 [pulsar]
lhotari merged PR #22369: URL: https://github.com/apache/pulsar/pull/22369 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [fix][client] the nullValue in msgMetadata should be true by default [pulsar]
liangyepianzhou opened a new pull request, #22372: URL: https://github.com/apache/pulsar/pull/22372 ### Motivation When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set. Otherwise, the message data will be set as a [] when the value is not set, that would cause the message data to be encoded and throw a `SchemaSerializationException` when calling `reconsumerLater`. ``` org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4 at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131) at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467) at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452) at org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139) at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677) at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221) at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50) at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969) at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194) at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148) at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) at org.testng.TestRunner.privateRun(TestRunner.java:829) at org.testng.TestRunner.run(TestRunner.java:602) at org.testng.SuiteRunner.runTest(SuiteRunner.java:437) at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431) at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391) at org.testng.SuiteRunner.run(SuiteRunner.java:330) at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95) at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256) at org.testng.TestNG.runSuitesLocally(TestNG.java:1176) at org.testng.TestNG.runSuites(TestNG.java:1099) at org.testng.TestNG.run(TestNG.java:1067) at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65) at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4 at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462) ... 29 more Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4 at org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49) at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80) at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157) at java.base/java.util.Optional.orElseGet(Optional.java:364) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156) at org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689) at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550) at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574) ``` ### Modifications When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set. ### Verifying this change - [ ] Make sure that the change passes the CI
Re: [I] Build with Bazel fails on MacOS as of 0.1.1. [pulsar-client-go]
merlimat closed issue #302: Build with Bazel fails on MacOS as of 0.1.1. URL: https://github.com/apache/pulsar-client-go/issues/302 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Enable accessing Message in TableView [pulsar-client-go]
merlimat commented on code in PR #1174: URL: https://github.com/apache/pulsar-client-go/pull/1174#discussion_r1541581408 ## pulsar/table_view_impl.go: ## @@ -186,10 +203,23 @@ func (tv *TableViewImpl) Get(key string) interface{} { func (tv *TableViewImpl) Entries() map[string]interface{} { tv.dataMu.Lock() defer tv.dataMu.Unlock() + data := make(map[string]interface{}, len(tv.data)) - for k, v := range tv.data { + for k, msg := range tv.data { + v, err := tv.schemaValueFromMessage(msg) + if err != nil { + tv.logger.Errorf("getting value for message, %w; msg is %v", len(tv.listeners), err, msg) + continue + } data[k] = v } + + return data +} + +func (tv *TableViewImpl) Messages() map[string]Message { + tv.dataMu.Lock() + defer tv.dataMu.Unlock() Review Comment: This is returning the same underlying map, allowing for race-conditions and modifications from the application. Note: the existing code is also wrong because it was returning `tv.data` instead of `data` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [cleanup][ml] ManagedCursor clean up. [pulsar]
dao-jun commented on PR #22246: URL: https://github.com/apache/pulsar/pull/22246#issuecomment-2023414058 @lhotari I've replied your comment, https://github.com/apache/pulsar/commit/532b0d9063474bd1c7ae8ac7cf5bd2d56b002164#r140303950 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Incorrect usage of context.Context in the producer Send and SendAsync methods [pulsar-client-go]
merlimat commented on issue #293: URL: https://github.com/apache/pulsar-client-go/issues/293#issuecomment-2023398868 Closing issue as this was fixed long time ago. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Incorrect usage of context.Context in the producer Send and SendAsync methods [pulsar-client-go]
merlimat closed issue #293: Incorrect usage of context.Context in the producer Send and SendAsync methods URL: https://github.com/apache/pulsar-client-go/issues/293 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] ReaderOptions.MessageChannel is ignored [pulsar-client-go]
merlimat closed issue #211: ReaderOptions.MessageChannel is ignored URL: https://github.com/apache/pulsar-client-go/issues/211 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] SDK crashes on ARM [pulsar-client-go]
merlimat closed issue #250: SDK crashes on ARM URL: https://github.com/apache/pulsar-client-go/issues/250 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] ReaderOptions.MessageChannel is ignored [pulsar-client-go]
merlimat commented on issue #211: URL: https://github.com/apache/pulsar-client-go/issues/211#issuecomment-2023395939 Closing issue as this was fixed long time ago. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] SDK crashes on ARM [pulsar-client-go]
merlimat commented on issue #250: URL: https://github.com/apache/pulsar-client-go/issues/250#issuecomment-2023393701 Closing issue as this was fixed long time ago. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [cleanup][ml] ManagedCursor clean up. [pulsar]
lhotari commented on PR #22246: URL: https://github.com/apache/pulsar/pull/22246#issuecomment-2023375730 > > It seems that this was merged without more reviews > > Yes, my bad @dao-jun I'd recommend reverting the change that I have pinpointed in the comment https://github.com/apache/pulsar/commit/532b0d9063474bd1c7ae8ac7cf5bd2d56b002164#r140287388 , assuming that it creates new PositionImpl instances which were avoided before this change was made. /cc @codelipenghui -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [cleanup][ml] ManagedCursor clean up. [pulsar]
dao-jun commented on PR #22246: URL: https://github.com/apache/pulsar/pull/22246#issuecomment-2023350157 @lhotari > It seems that this was merged without more reviews Yes, my bad -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Enable Pulsar authentication for Client authentication using private_key_jwt method [pulsar]
lhotari commented on issue #22371: URL: https://github.com/apache/pulsar/issues/22371#issuecomment-2023341990 How is this request different from the current JWT token support in Pulsar? https://pulsar.apache.org/docs/3.2.x/security-jwt/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]
lhotari commented on PR #22359: URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2023306788 > I don't agree with you for this patch. This isn't a problem, but from the Java interface definition, it's better to do like this. @Technoboy- what is it that you don't agree? I agree that it is "better". When something is "better", this implies that it produces some benefit. In this case, what is it that it makes better? The question is then whether this benefit is relevant when we are considering how Pulsar is currently maintained. I've provided my opinion about formatting changes in this mailing list post: https://lists.apache.org/thread/lo15cdzsl740dwgcqwpsl9oy9qb13onv . After we settle on a maintenance strategy that reduces merge conflicts for maintaining the LTS branch, we have more freedom to do refactorings which we are currently avoiding because of the merge conflicts. Merge conflicts caused by unnecessary changes are the main reason for my resistance. That could be resolved with an improved maintenance strategy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [D] Topic data between clusters is out of sync in puslar GEO mode [pulsar]
GitHub user david-streamlio added a comment to the discussion: Topic data between clusters is out of sync in puslar GEO mode You should use the `--clusters` switch when you [create](https://pulsar.apache.org/reference/#/3.2.x/pulsar-admin/namespaces?id=create) the namespace. GitHub link: https://github.com/apache/pulsar/discussions/22341#discussioncomment-8931135 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]
Technoboy- closed pull request #22359: [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription URL: https://github.com/apache/pulsar/pull/22359 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]
Technoboy- commented on PR #22359: URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2023215652 > This isn't a real problem at all. In the Pulsar code base, we currently try to minimize unnecessary changes. This might change after we move to a different type of maintenance strategy, the dev mailing list discussion is https://lists.apache.org/thread/j6qvt45rndnvjypcmqxsfmddqt41bxjv . Refactorings aren't currently mentioned there explicitly. I don't agree with you for this patch. This isn't a problem, but from the Java interface definition, it's better to do like this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] Enable Pulsar authentication for Client authentication using private_key_jwt method [pulsar]
WZHMIJJ opened a new issue, #22371: URL: https://github.com/apache/pulsar/issues/22371 ### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Motivation The motivation for this enhancement stems from the need to bolster security and efficiency in Pulsar’s client authentication. The private_key_jwt method offers a secure means for client authentication, utilizing a private key and JSON Web Token (JWT). By implementing this method, Pulsar can enhance its security framework. ### Solution The proposed solution involves enabling Pulsar authentication for Client authentication using the private_key_jwt method. Related materials: For further details on the private_key_jwt method, refer to the specification outlined in https://kb.authlete.com/en/s/oauth-and-openid-connect/a/client-auth-private-key-jwt ### Alternatives While evaluating alternatives, the current OAuth2 flow with client credentials (client_id and client_secret) was noted. This in our case is not an option, since we use the flow with client_assertion_type and client_assertion. Implementing the private_key_jwt method offers a more secure and efficient alternative, reducing dependency on client_secret and providing enhanced security through private key and JWT-based authentication. ### Anything else? _No response_ ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]
Technoboy- commented on PR #22359: URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2023217622 > Just to say that it's better to not create a lot of similar PRs since that would be a lot of changes which don't provide real value. We have a lot of technical debt with more important, but low value areas such as compiler warnings. There's a lot of compiler warnings when compiling Pulsar. We have chosen to ignore them for now. I hope this could change after we have a better maintenance strategy in place. we can close other patch that not for master branch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [fix][misc] Make ConcurrentBitSet thread safe (#22361)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new edd0076bd83 [fix][misc] Make ConcurrentBitSet thread safe (#22361) edd0076bd83 is described below commit edd0076bd83f01a5fcbe81c8396667014f0fc36e Author: Lari Hotari AuthorDate: Wed Mar 27 09:16:22 2024 -0700 [fix][misc] Make ConcurrentBitSet thread safe (#22361) --- .../common/util/collections/ConcurrentBitSet.java | 363 +++-- 1 file changed, 331 insertions(+), 32 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java index 23842fe5b55..a37628cb300 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java @@ -20,12 +20,13 @@ package org.apache.pulsar.common.util.collections; import java.util.BitSet; import java.util.concurrent.locks.StampedLock; -import lombok.EqualsAndHashCode; +import java.util.stream.IntStream; /** - * Safe multithreaded version of {@code BitSet}. + * A {@code BitSet} that is protected by a {@code StampedLock} to provide thread-safe access. + * The {@link #length()} method is not thread safe and is not overridden because StampedLock is not reentrant. + * Use the {@link #safeLength()} method to get the length of the bit set in a thread-safe manner. */ -@EqualsAndHashCode(callSuper = true) public class ConcurrentBitSet extends BitSet { private static final long serialVersionUID = 1L; @@ -39,10 +40,8 @@ public class ConcurrentBitSet extends BitSet { * Creates a bit set whose initial size is large enough to explicitly represent bits with indices in the range * {@code 0} through {@code nbits-1}. All bits are initially {@code false}. * - * @param nbits - *the initial size of the bit set - * @throws NegativeArraySizeException - * if the specified initial size is negative + * @param nbits the initial size of the bit set + * @throws NegativeArraySizeException if the specified initial size is negative */ public ConcurrentBitSet(int nbits) { super(nbits); @@ -65,105 +64,405 @@ public class ConcurrentBitSet extends BitSet { @Override public void set(int bitIndex) { +long stamp = rwLock.writeLock(); +try { +super.set(bitIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void clear(int bitIndex) { +long stamp = rwLock.writeLock(); +try { +super.clear(bitIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void set(int fromIndex, int toIndex) { +long stamp = rwLock.writeLock(); +try { +super.set(fromIndex, toIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void clear(int fromIndex, int toIndex) { +long stamp = rwLock.writeLock(); +try { +super.clear(fromIndex, toIndex); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public void clear() { +long stamp = rwLock.writeLock(); +try { +super.clear(); +} finally { +rwLock.unlockWrite(stamp); +} +} + +@Override +public int nextSetBit(int fromIndex) { long stamp = rwLock.tryOptimisticRead(); -super.set(bitIndex); +int nextSetBit = super.nextSetBit(fromIndex); if (!rwLock.validate(stamp)) { +// Fallback to read lock stamp = rwLock.readLock(); try { -super.set(bitIndex); +nextSetBit = super.nextSetBit(fromIndex); } finally { rwLock.unlockRead(stamp); } } +return nextSetBit; } @Override -public void set(int fromIndex, int toIndex) { +public int nextClearBit(int fromIndex) { long stamp = rwLock.tryOptimisticRead(); -super.set(fromIndex, toIndex); +int nextClearBit = super.nextClearBit(fromIndex); if (!rwLock.validate(stamp)) { +// Fallback to read lock stamp = rwLock.readLock(); try { -super.set(fromIndex, toIndex); +nextClearBit = super.nextClearBit(fromIndex); } finally { rwLock.unlockRead(stamp); } } +return nextClearBit; } @Override -public int nextSetBit(int fromIndex) { +public int
Re: [I] [Bug] ConcurrentBitSet is not thread safe [pulsar]
lhotari closed issue #22360: [Bug] ConcurrentBitSet is not thread safe URL: https://github.com/apache/pulsar/issues/22360 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][misc] Make ConcurrentBitSet thread safe [pulsar]
lhotari merged PR #22361: URL: https://github.com/apache/pulsar/pull/22361 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [improve][broker] Optimize web interface deleteDynamicConfiguration return error message (#22356)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new e4553391f96 [improve][broker] Optimize web interface deleteDynamicConfiguration return error message (#22356) e4553391f96 is described below commit e4553391f96af3bda3d8252b97cac3de1f39a1b5 Author: hanmz AuthorDate: Thu Mar 28 00:07:54 2024 +0800 [improve][broker] Optimize web interface deleteDynamicConfiguration return error message (#22356) --- .../src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 61b354610ac..83067e9f296 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -526,7 +526,7 @@ public class BrokersBase extends AdminResource { private CompletableFuture internalDeleteDynamicConfigurationOnMetadataAsync(String configName) { if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) { -throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration"); +throw new RestException(Status.PRECONDITION_FAILED, "Can't delete non-dynamic configuration"); } else { return dynamicConfigurationResources().setDynamicConfigurationAsync(old -> { if (old != null) {
Re: [PR] [improve][broker] Optimize web interface deleteDynamicConfiguration return error message [pulsar]
Technoboy- merged PR #22356: URL: https://github.com/apache/pulsar/pull/22356 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Flaky-test: ExtensibleLoadManagerImplTest.initializeState [pulsar]
heesung-sn commented on issue #20157: URL: https://github.com/apache/pulsar/issues/20157#issuecomment-2023122179 hi, let me check this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] There is a memory leak,I need help [pulsar-client-python]
merlimat commented on issue #208: URL: https://github.com/apache/pulsar-client-python/issues/208#issuecomment-2023069648 Can you try with latest version 3.4.0? https://pypi.org/project/pulsar-client/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Servlet support response compression [pulsar]
lhotari commented on PR #21667: URL: https://github.com/apache/pulsar/pull/21667#issuecomment-2023012333 Early WIP PR for improvement: #22370 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Servlet support response compression [pulsar]
lhotari commented on PR #21667: URL: https://github.com/apache/pulsar/pull/21667#issuecomment-2022902169 Btw. It looks like this change could be improved. Instead of adding multiple GzipHandler instances, it would be better to add a single instance and provide a configuration option for configuring excluded paths, just in case the compression causes problems for some cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Servlet support response compression [pulsar]
lhotari commented on PR #21667: URL: https://github.com/apache/pulsar/pull/21667#issuecomment-2022876842 Cherry-picking to branch-3.0 and branch-3.2 since the lack of compression is causing issues when metrics are enabled and there are a large amount of active topics. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] HTTP 504 Gateway timeout [pulsar]
lhotari commented on issue #22324: URL: https://github.com/apache/pulsar/issues/22324#issuecomment-2022865755 Closing this issue since there's no way to reproduce this issue. The issue has been reported on an unsupported version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] HTTP 504 Gateway timeout [pulsar]
lhotari closed issue #22324: HTTP 504 Gateway timeout URL: https://github.com/apache/pulsar/issues/22324 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Cleanup] PersistentSubscription duplicate implementation interface Subscription [pulsar]
lhotari commented on issue #22354: URL: https://github.com/apache/pulsar/issues/22354#issuecomment-2022863264 ... and this is not a "Code design issue" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Bug] PersistentSubscription duplicate implementation interface Subscription [pulsar]
lhotari commented on issue #22354: URL: https://github.com/apache/pulsar/issues/22354#issuecomment-2022861168 This is not a bug. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] No data shown in Grafana Dashboards [pulsar-helm-chart]
lhotari commented on issue #468: URL: https://github.com/apache/pulsar-helm-chart/issues/468#issuecomment-2022813523 for the broker `authenticateMetricsEndpoint` defaults to false, so it might be something else. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] No data shown in Grafana Dashboards [pulsar-helm-chart]
lhotari commented on issue #468: URL: https://github.com/apache/pulsar-helm-chart/issues/468#issuecomment-2022810871 This might be caused by the configured authentication. I guess metrics requires a token currently. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch branch-3.2 updated: [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 21e56955d5e [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252) 21e56955d5e is described below commit 21e56955d5e9e804b88fbc0d599b6fb172c4f5ff Author: Lari Hotari AuthorDate: Tue Mar 12 22:26:34 2024 +0200 [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252) (cherry picked from commit 43f9d2abb9d5cd788fe18da6af7ad6fbfb3bc07b) --- .../RGUsageMTAggrWaitForAllMsgsTest.java | 30 +++--- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java index 9bf7e3c5325..54c23cacc0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java @@ -20,6 +20,10 @@ package org.apache.pulsar.broker.resourcegroup; import com.google.common.collect.Sets; import io.prometheus.client.Summary; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; @@ -45,11 +49,6 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.TimeUnit; - // The tests implement a set of producer/consumer operations on a set of topics. // [A thread is started for each producer, and each consumer in the test.] @@ -57,6 +56,7 @@ import java.util.concurrent.TimeUnit; // After sending/receiving all the messages, traffic usage statistics, and Prometheus-metrics // are verified on the RGs. @Slf4j +@Test(groups = "flaky") public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { @BeforeClass @Override @@ -119,9 +119,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { private final int numMesgsToProduce; private final String myProduceTopic; -private int sentNumBytes = 0; -private int sentNumMsgs = 0; -private int numExceptions = 0; +private volatile int sentNumBytes = 0; +private volatile int sentNumMsgs = 0; +private volatile int numExceptions = 0; ProduceMessages(int prodId, int nMesgs, String[] topics) { producerId = prodId; @@ -202,9 +202,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { private final int recvTimeoutMilliSecs = 1000; private final int ackTimeoutMilliSecs = 1100; // has to be more than 1 second -private int recvdNumBytes = 0; -private int recvdNumMsgs = 0; -private int numExceptions = 0; +private volatile int recvdNumBytes = 0; +private volatile int recvdNumMsgs = 0; +private volatile int numExceptions = 0; private volatile boolean allMessagesReceived = false; private volatile boolean consumerIsReady = false; @@ -494,15 +494,15 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { while (numConsumersDone < NUM_CONSUMERS) { for (int ix = 0; ix < NUM_CONSUMERS; ix++) { if (!joinedConsumers[ix]) { +consThr[ix].thread.join(); +joinedConsumers[ix] = true; +log.debug("Joined consumer={}", ix); + recvdBytes = consThr[ix].consumer.getNumBytesRecvd(); recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd(); numConsumerExceptions += consThr[ix].consumer.getNumExceptions(); log.debug("Consumer={} received {} mesgs and {} bytes", ix, recvdMsgs, recvdBytes); -consThr[ix].thread.join(); -joinedConsumers[ix] = true; -log.debug("Joined consumer={}", ix); - recvdNumBytes += recvdBytes; recvdNumMsgs += recvdMsgs; numConsumersDone++;
(pulsar) branch branch-3.0 updated: [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 6e0ebcbaad6 [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252) 6e0ebcbaad6 is described below commit 6e0ebcbaad69c3643b7fa10d3adf7b59a7f268dd Author: Lari Hotari AuthorDate: Tue Mar 12 22:26:34 2024 +0200 [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252) (cherry picked from commit 43f9d2abb9d5cd788fe18da6af7ad6fbfb3bc07b) --- .../RGUsageMTAggrWaitForAllMsgsTest.java | 30 +++--- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java index 9bf7e3c5325..54c23cacc0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java @@ -20,6 +20,10 @@ package org.apache.pulsar.broker.resourcegroup; import com.google.common.collect.Sets; import io.prometheus.client.Summary; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; @@ -45,11 +49,6 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.TimeUnit; - // The tests implement a set of producer/consumer operations on a set of topics. // [A thread is started for each producer, and each consumer in the test.] @@ -57,6 +56,7 @@ import java.util.concurrent.TimeUnit; // After sending/receiving all the messages, traffic usage statistics, and Prometheus-metrics // are verified on the RGs. @Slf4j +@Test(groups = "flaky") public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { @BeforeClass @Override @@ -119,9 +119,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { private final int numMesgsToProduce; private final String myProduceTopic; -private int sentNumBytes = 0; -private int sentNumMsgs = 0; -private int numExceptions = 0; +private volatile int sentNumBytes = 0; +private volatile int sentNumMsgs = 0; +private volatile int numExceptions = 0; ProduceMessages(int prodId, int nMesgs, String[] topics) { producerId = prodId; @@ -202,9 +202,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { private final int recvTimeoutMilliSecs = 1000; private final int ackTimeoutMilliSecs = 1100; // has to be more than 1 second -private int recvdNumBytes = 0; -private int recvdNumMsgs = 0; -private int numExceptions = 0; +private volatile int recvdNumBytes = 0; +private volatile int recvdNumMsgs = 0; +private volatile int numExceptions = 0; private volatile boolean allMessagesReceived = false; private volatile boolean consumerIsReady = false; @@ -494,15 +494,15 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { while (numConsumersDone < NUM_CONSUMERS) { for (int ix = 0; ix < NUM_CONSUMERS; ix++) { if (!joinedConsumers[ix]) { +consThr[ix].thread.join(); +joinedConsumers[ix] = true; +log.debug("Joined consumer={}", ix); + recvdBytes = consThr[ix].consumer.getNumBytesRecvd(); recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd(); numConsumerExceptions += consThr[ix].consumer.getNumExceptions(); log.debug("Consumer={} received {} mesgs and {} bytes", ix, recvdMsgs, recvdBytes); -consThr[ix].thread.join(); -joinedConsumers[ix] = true; -log.debug("Joined consumer={}", ix); - recvdNumBytes += recvdBytes; recvdNumMsgs += recvdMsgs; numConsumersDone++;
(pulsar) 04/07: [fix] [test] Fix flaky test ManagedLedgerTest.testGetNumberOfEntriesInStorage (#22344)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 100a53b39582cb528d5dd5a99ffdfb4ae97683e9 Author: fengyubiao AuthorDate: Wed Mar 27 16:45:02 2024 +0800 [fix] [test] Fix flaky test ManagedLedgerTest.testGetNumberOfEntriesInStorage (#22344) (cherry picked from commit fc066d727b52f7e412476297995c2eb2f5ab61bf) --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 34f65dfd00e..10bfb699780 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2636,10 +2636,10 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { managedLedger.addEntry(("entry-" + i).getBytes(Encoding)); } -//trigger ledger rollover and wait for the new ledger created -Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state"); -stateUpdater.setAccessible(true); -stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened); +// trigger ledger rollover and wait for the new ledger created +Awaitility.await().untilAsserted(() -> { + assertEquals("LedgerOpened", WhiteboxImpl.getInternalState(managedLedger, "state").toString()); +}); managedLedger.rollCurrentLedgerIfFull(); Awaitility.await().untilAsserted(() -> { assertEquals(managedLedger.getLedgersInfo().size(), 3);
(pulsar) branch branch-3.0 updated (d4c05431def -> 9c50d1801a7)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git from d4c05431def [improve][admin] Fix the `createMissingPartitions` doesn't response correctly (#22311) new bff6ea28131 [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306) new 22b724fd1c3 [improve][misc] Include native epoll library for Netty for arm64 (#22319) new 5e958280656 [fix][ml]Expose ledger timestamp (#22338) new 100a53b3958 [fix] [test] Fix flaky test ManagedLedgerTest.testGetNumberOfEntriesInStorage (#22344) new fe09a69fbe5 [fix][test] Fix flaky ManagedLedgerErrorsTest.recoverAfterZnodeVersionError (#22368) new bab60b1e983 [fix][broker] Fix OpReadEntry.skipCondition NPE issue (#22367) new 9c50d1801a7 [fix][client] Consumer lost message ack due to race condition in acknowledge with batch message (#22353) The 7 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: distribution/server/src/assemble/LICENSE.bin.txt | 1 + distribution/shell/src/assemble/LICENSE.bin.txt| 1 + .../mledger/impl/ManagedLedgerFactoryImpl.java | 1 + .../bookkeeper/mledger/impl/OpReadEntry.java | 2 +- .../mledger/impl/ManagedLedgerErrorsTest.java | 1 - .../mledger/impl/ManagedLedgerFactoryTest.java | 5 ++ .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 8 +-- .../client/api/SimpleProducerConsumerTest.java | 6 +- .../client/impl/ProducerMemoryLimitTest.java | 12 ++-- .../pulsar/client/impl/ProducerSemaphoreTest.java | 18 +++-- .../client/impl/AbstractBatchMessageContainer.java | 9 ++- .../client/impl/BatchMessageContainerImpl.java | 10 +-- .../org/apache/pulsar/client/impl/ClientCnx.java | 3 +- .../pulsar/client/impl/ConnectionHandler.java | 7 ++ .../apache/pulsar/client/impl/ConsumerImpl.java| 3 +- .../pulsar/client/impl/MessageIdAdvUtils.java | 19 -- .../PersistentAcknowledgmentsGroupingTracker.java | 18 +++-- .../apache/pulsar/client/impl/ProducerImpl.java| 32 + .../pulsar/client/impl/MessageIdAdvUtilsTest.java | 76 ++ pulsar-common/pom.xml | 6 ++ .../org/apache/pulsar/client/api/MessageIdAdv.java | 2 + pulsar-sql/presto-distribution/LICENSE | 1 + 22 files changed, 186 insertions(+), 55 deletions(-) create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java
(pulsar) 01/07: [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit bff6ea2813156255a06ede1f8a41cbb627486d06 Author: atomchen <492672...@qq.com> AuthorDate: Thu Mar 21 17:30:40 2024 +0800 [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306) Co-authored-by: atomchchen (cherry picked from commit 71598c1163730defb9fdea85e813fe863c3fe4d2) --- .../client/api/SimpleProducerConsumerTest.java | 6 ++-- .../client/impl/ProducerMemoryLimitTest.java | 12 .../pulsar/client/impl/ProducerSemaphoreTest.java | 18 ++-- .../client/impl/AbstractBatchMessageContainer.java | 9 -- .../client/impl/BatchMessageContainerImpl.java | 10 +++ .../org/apache/pulsar/client/impl/ClientCnx.java | 3 +- .../pulsar/client/impl/ConnectionHandler.java | 7 + .../apache/pulsar/client/impl/ConsumerImpl.java| 3 +- .../apache/pulsar/client/impl/ProducerImpl.java| 32 ++ 9 files changed, 60 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 0c0e61fe33f..eb4a322f3c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -93,13 +93,13 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.ClientBuilderImpl; -import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PartitionedProducerImpl; +import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; @@ -3906,11 +3906,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { .topic("persistent://my-property/my-ns/my-topic2"); @Cleanup -Producer producer = producerBuilder.create(); +ProducerImpl producer = (ProducerImpl)producerBuilder.create(); List> futures = new ArrayList<>(); // Asynchronously produce messages -byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1]; +byte[] message = new byte[producer.getConnectionHandler().getMaxMessageSize() + 1]; for (int i = 0; i < maxPendingMessages + 10; i++) { Future future = producer.sendAsync(message); try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java index d776fdb0ed9..55a67ae644d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.netty.buffer.ByteBufAllocator; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -33,7 +34,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SizeUnit; -import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -69,10 +69,12 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase { .create(); this.stopBroker(); try { -try (MockedStatic mockedStatic = Mockito.mockStatic(ClientCnx.class)) { -mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8); -producer.send("memory-test".getBytes(StandardCharsets.UTF_8)); -} +ConnectionHandler connectionHandler = Mockito.spy(producer.getConnectionHandler()); +Field field = producer.getClass().getDeclaredField("connectionHandler"); +field.setAccessible(true); +
(pulsar) 03/07: [fix][ml]Expose ledger timestamp (#22338)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 5e9582806565412d1eb5ad6ef783da9e3b9444bc Author: 道君 AuthorDate: Wed Mar 27 14:08:39 2024 +0800 [fix][ml]Expose ledger timestamp (#22338) (cherry picked from commit cd49defc1383175ef32e18c7f0905567f734318c) --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java | 1 + .../org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java | 5 + 2 files changed, 6 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index ce76717942d..31896769e8e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -736,6 +736,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { ledgerInfo.ledgerId = pbLedgerInfo.getLedgerId(); ledgerInfo.entries = pbLedgerInfo.hasEntries() ? pbLedgerInfo.getEntries() : null; ledgerInfo.size = pbLedgerInfo.hasSize() ? pbLedgerInfo.getSize() : null; +ledgerInfo.timestamp = pbLedgerInfo.hasTimestamp() ? pbLedgerInfo.getTimestamp() : null; ledgerInfo.isOffloaded = pbLedgerInfo.hasOffloadContext(); if (pbLedgerInfo.hasOffloadContext()) { MLDataFormats.OffloadContext offloadContext = pbLedgerInfo.getOffloadContext(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java index 8695759c99f..4f2c3e17877 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -60,6 +61,10 @@ public class ManagedLedgerFactoryTest extends MockedBookKeeperTestCase { assertEquals(info.ledgers.get(2).ledgerId, 5); assertEquals(info.ledgers.get(3).ledgerId, 6); +for (ManagedLedgerInfo.LedgerInfo linfo : info.ledgers) { +assertNotNull(linfo.timestamp); +} + assertEquals(info.cursors.size(), 1); CursorInfo cursorInfo = info.cursors.get("c1");
(pulsar) 02/07: [improve][misc] Include native epoll library for Netty for arm64 (#22319)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 22b724fd1c3eac463834a58102d667617451d453 Author: Lari Hotari AuthorDate: Thu Mar 21 13:23:21 2024 -0700 [improve][misc] Include native epoll library for Netty for arm64 (#22319) (cherry picked from commit 24e9437ce065613fd924a74f61b620d9fdc0058b) --- distribution/server/src/assemble/LICENSE.bin.txt | 1 + distribution/shell/src/assemble/LICENSE.bin.txt | 1 + pulsar-common/pom.xml| 6 ++ pulsar-sql/presto-distribution/LICENSE | 1 + 4 files changed, 9 insertions(+) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 4fa4c9ea821..b3e3d096bda 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -306,6 +306,7 @@ The Apache Software License, Version 2.0 - io.netty-netty-resolver-dns-native-macos-4.1.105.Final-osx-x86_64.jar - io.netty-netty-transport-4.1.105.Final.jar - io.netty-netty-transport-classes-epoll-4.1.105.Final.jar +- io.netty-netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar - io.netty-netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar - io.netty-netty-transport-native-unix-common-4.1.105.Final.jar - io.netty-netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index a8b27a1fbfd..4f804ffa7c4 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -361,6 +361,7 @@ The Apache Software License, Version 2.0 - netty-resolver-dns-4.1.105.Final.jar - netty-transport-4.1.105.Final.jar - netty-transport-classes-epoll-4.1.105.Final.jar +- netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar - netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar - netty-transport-native-unix-common-4.1.105.Final.jar - netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index b3f4f5cc8ec..9bde13520e9 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -98,6 +98,12 @@ linux-x86_64 + + io.netty + netty-transport-native-epoll + linux-aarch_64 + + io.netty netty-transport-native-unix-common diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index 937225f2e15..42cb170a2d1 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -255,6 +255,7 @@ The Apache Software License, Version 2.0 - netty-tcnative-classes-2.0.61.Final.jar - netty-transport-4.1.105.Final.jar - netty-transport-classes-epoll-4.1.105.Final.jar +- netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar - netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar - netty-transport-native-unix-common-4.1.105.Final.jar - netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar
(pulsar) 07/07: [fix][client] Consumer lost message ack due to race condition in acknowledge with batch message (#22353)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 9c50d1801a7c273bbf7416aec823ec7b2da59f50 Author: 萧易客 AuthorDate: Wed Mar 27 20:12:39 2024 +0800 [fix][client] Consumer lost message ack due to race condition in acknowledge with batch message (#22353) Co-authored-by: Yunze Xu Co-authored-by: 汪苏诚 (cherry picked from commit 3fa2ae83312ead38a81fe82bc06c1784e6061d6f) --- .../pulsar/client/impl/MessageIdAdvUtils.java | 19 -- .../PersistentAcknowledgmentsGroupingTracker.java | 18 +++-- .../pulsar/client/impl/MessageIdAdvUtilsTest.java | 76 ++ .../org/apache/pulsar/client/api/MessageIdAdv.java | 2 + 4 files changed, 106 insertions(+), 9 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java index a0d1446ba3d..f66bb642021 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java @@ -40,6 +40,13 @@ public class MessageIdAdvUtils { && lhs.getBatchIndex() == rhs.getBatchIndex(); } +/** + * Acknowledge batch message. + * + * @param msgId the message id + * @param individual whether to acknowledge the batch message individually + * @return true if the batch message is fully acknowledged + */ static boolean acknowledge(MessageIdAdv msgId, boolean individual) { if (!isBatch(msgId)) { return true; @@ -51,12 +58,14 @@ public class MessageIdAdvUtils { return false; } int batchIndex = msgId.getBatchIndex(); -if (individual) { -ackSet.clear(batchIndex); -} else { -ackSet.clear(0, batchIndex + 1); +synchronized (ackSet) { +if (individual) { +ackSet.clear(batchIndex); +} else { +ackSet.clear(0, batchIndex + 1); +} +return ackSet.isEmpty(); } -return ackSet.isEmpty(); } static boolean isBatch(MessageIdAdv msgId) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 0cf776aea59..c0ee13b346a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -324,8 +324,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments MessageIdAdvUtils.discardBatch(msgId), __ -> { final BitSet ackSet = msgId.getAckSet(); final ConcurrentBitSetRecyclable value; -if (ackSet != null && !ackSet.isEmpty()) { -value = ConcurrentBitSetRecyclable.create(ackSet); +if (ackSet != null) { +synchronized (ackSet) { +if (!ackSet.isEmpty()) { +value = ConcurrentBitSetRecyclable.create(ackSet); +} else { +value = ConcurrentBitSetRecyclable.create(); +value.set(0, msgId.getBatchSize()); +} +} } else { value = ConcurrentBitSetRecyclable.create(); value.set(0, msgId.getBatchSize()); @@ -374,8 +381,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments .ConnectException("Consumer connect fail! consumer state:" + consumer.getState())); } BitSetRecyclable bitSet; -if (msgId.getAckSet() != null) { -bitSet = BitSetRecyclable.valueOf(msgId.getAckSet().toLongArray()); +BitSet ackSetFromMsgId = msgId.getAckSet(); +if (ackSetFromMsgId != null) { +synchronized (ackSetFromMsgId) { +bitSet = BitSetRecyclable.valueOf(ackSetFromMsgId.toLongArray()); +} } else { bitSet = BitSetRecyclable.create(); bitSet.set(0, batchSize); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java new file mode 100644 index 000..704dfc9cbd7 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java @@ -0,0 +1,76 @@ +/* + *
(pulsar) 05/07: [fix][test] Fix flaky ManagedLedgerErrorsTest.recoverAfterZnodeVersionError (#22368)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit fe09a69fbe596b91d92cf62960cc480fabaa828c Author: Lari Hotari AuthorDate: Wed Mar 27 04:10:47 2024 -0700 [fix][test] Fix flaky ManagedLedgerErrorsTest.recoverAfterZnodeVersionError (#22368) (cherry picked from commit c184209bfc5a61f143abfa701e5f1b0be2109d77) --- .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java index 512e90d17f5..7b2f8228ad7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java @@ -381,7 +381,6 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase { ledger.addEntry("entry".getBytes()); fail("should fail"); } catch (ManagedLedgerFencedException e) { -assertEquals(e.getCause().getClass(), ManagedLedgerException.BadVersionException.class); // ok }