Re: [PR] [fix][client] the nullValue in msgMetadata should be true by default [pulsar]

2024-03-27 Thread via GitHub


Technoboy- commented on code in PR #22372:
URL: https://github.com/apache/pulsar/pull/22372#discussion_r1542337533


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java:
##
@@ -144,6 +145,7 @@ public TypedMessageBuilder value(T value) {
 msgMetadata.setNullValue(true);
 return this;
 }
+msgMetadata.setNullValue(false);

Review Comment:
   We could also do this in `getMessage`, before the message is sent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [Improve] Add admin api HealthCheckWithTopicVersion [pulsar-client-go]

2024-03-27 Thread via GitHub


crossoverJie opened a new pull request, #1200:
URL: https://github.com/apache/pulsar-client-go/pull/1200

   
   ### Motivation
   
   To keep consistent with the Java client.
   
   Related PR: https://github.com/apache/pulsar/pull/11268
   
   
   ### Modifications
   
   Add `HealthCheckWithTopicVersion` interface.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API: (yes)
 - The schema: (no)
 - The default values of configurations: (no)
 - The wire protocol: (no)
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (GoDocs)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a followup 
issue for adding the documentation
   





(pulsar) branch master updated: [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription (#22359)

2024-03-27 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 6f9c8e7f70e [fix][broker] Fix PersistentSubscription duplicate 
implementation interface Subscription (#22359)
6f9c8e7f70e is described below

commit 6f9c8e7f70ec201d65c7fc270480bed3aa3b5aba
Author: sherlock-lin <1193179...@qq.com>
AuthorDate: Thu Mar 28 11:58:47 2024 +0800

[fix][broker] Fix PersistentSubscription duplicate implementation interface 
Subscription (#22359)
---
 .../pulsar/broker/service/nonpersistent/NonPersistentSubscription.java | 3 +--
 .../pulsar/broker/service/persistent/PersistentSubscription.java   | 2 +-
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 92aba6221da..cfe05cc32b7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -40,7 +40,6 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFence
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.GetStatsOptions;
-import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -53,7 +52,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NonPersistentSubscription extends AbstractSubscription implements 
Subscription {
+public class NonPersistentSubscription extends AbstractSubscription {
 private final NonPersistentTopic topic;
 private volatile NonPersistentDispatcher dispatcher;
 private final String topicName;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 50e84310ac1..6e8e94baeae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -97,7 +97,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PersistentSubscription extends AbstractSubscription implements 
Subscription {
+public class PersistentSubscription extends AbstractSubscription {
 protected final PersistentTopic topic;
 protected final ManagedCursor cursor;
 protected volatile Dispatcher dispatcher;



Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


Technoboy- merged PR #22359:
URL: https://github.com/apache/pulsar/pull/22359





Re: [I] [Cleanup] PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


Technoboy- closed issue #22354: [Cleanup] PersistentSubscription duplicate 
implementation interface Subscription
URL: https://github.com/apache/pulsar/issues/22354





Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


codecov-commenter commented on PR #22359:
URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2024335935

   ## 
[Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22359?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Project coverage is 73.63%. Comparing base 
[(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 to head 
[(`fcde506`)](https://app.codecov.io/gh/apache/pulsar/pull/22359?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   > Report is 99 commits behind head on master.
   
   Additional details and impacted files
   
   
   [![Impacted file tree 
graph](https://app.codecov.io/gh/apache/pulsar/pull/22359/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22359?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
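   ```diff
   @@             Coverage Diff              @@
   ##             master   #22359      +/-   ##
   ============================================
   + Coverage     73.57%   73.63%   +0.06%
   + Complexity    32624    32125     -499
   ============================================
     Files          1877     1879       +2
     Lines        139502   140277     +775
     Branches      15299    15571     +272
   ============================================
   + Hits         102638   103299     +661
   - Misses        28908    28967      +59
   - Partials       7956     8011      +55
   ```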
   
   | 
[Flag](https://app.codecov.io/gh/apache/pulsar/pull/22359/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[inttests](https://app.codecov.io/gh/apache/pulsar/pull/22359/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `27.28% <ø> (+2.69%)` | :arrow_up: |
   | 
[systests](https://app.codecov.io/gh/apache/pulsar/pull/22359/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `24.62% <ø> (+0.30%)` | :arrow_up: |
   | 
[unittests](https://app.codecov.io/gh/apache/pulsar/pull/22359/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `72.88% <ø> (+0.03%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   | 
[Files](https://app.codecov.io/gh/apache/pulsar/pull/22359?dropdown=coverage=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[...rvice/nonpersistent/NonPersistentSubscription.java](https://app.codecov.io/gh/apache/pulsar/pull/22359?src=pr=tree=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fnonpersistent%2FNonPersistentSubscription.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL25vbnBlcnNpc3RlbnQvTm9uUGVyc2lzdGVudFN1YnNjcmlwdGlvbi5qYXZh)
 | `53.27% <ø> (ø)` | |
   | 
[...ker/service/persistent/PersistentSubscription.java](https://app.codecov.io/gh/apache/pulsar/pull/22359?src=pr=tree=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fpersistent%2FPersistentSubscription.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFN1YnNjcmlwdGlvbi5qYXZh)
 | `76.72% <ø> (+0.03%)` | :arrow_up: |
   
   ... and [150 files with indirect coverage 
changes](https://app.codecov.io/gh/apache/pulsar/pull/22359/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   





Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


Technoboy- commented on PR #22359:
URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2024318822

   > This is my first PR; having it merged would give me more confidence. I'll 
try to avoid submitting low-value PRs that take up resources until the 
technical debt clears up.
   
   Yes, we know. Thanks





(pulsar-client-cpp) annotated tag v3.5.1-candidate-1 updated (0ba3d7d -> ea2729a)

2024-03-27 Thread xyz

xyz pushed a change to annotated tag v3.5.1-candidate-1
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


*** WARNING: tag v3.5.1-candidate-1 was modified! ***

from 0ba3d7d  (commit)
  to ea2729a  (tag)
 tagging 0ba3d7d8e9f7d3fb2f2cc3cdf918bb63c4791afc (commit)
 replaces v3.5.0
  by Yunze Xu
  on Thu Mar 28 10:50:13 2024 +0800

- Log -
Release v3.5.1-candidate-1
-BEGIN PGP SIGNATURE-

iQJDBAABCgAtFiEEn+m0+KLf1EiRy6J0Qrtq+2zSb6YFAmYE2uYPHHh5ekBhcGFj
aGUub3JnAAoJEEK7avts0m+m36gP/1AjcqllVVeCx8cFtCSqRrOB5Uw+MZOk6UDY
1McU/ri08XdMEpv770ALUz8oWB1C0MTCuJzktsvFP+ju/QtMGvBilXLeMGxNsz6v
vPl4OV/a3O1axYDy8pRtC88E4xFB4wqXNfBdZ+lgIOfyrNvS9aYYhxBTrvyplDWb
DYfnQpBsA38wbMdA+S3OnQhxKjSzcvO1u6avhNmtZUF1poR85ddewxfNGAsCY1cR
qG/QW0hImIWP0fhpwMgalT5+3BfJKDDFG+m2xk2N5MA2Wc9Kb4/UEURLWzyL0lGH
scLZ9HujYPsbsXv89St0qB0ZNI+J9lFlxnzWd/0gWnK1KcX2mKr5evXAQ0lBYRgS
I0LIw3H4cXoWRGOxbrd/jxW1p5tKYvrsZsXhDHgwct6W0YNwFDSrfiWLM0v6kwKA
x8vJUlQnUF0/CXg5Shf8m0BbpL2TQA6AwburHYTshB0jKEtSbmMRJAdrlD1gRc8Y
qIE5V+tQnFTEvL948WxugcyMOhXpOKGu3Z1EnDOedAohDR9Zhds34+M322TGZgdt
G+N97geORkkBdS5pTEvdtg/4lEgasBddphCS2EFPvK1q5kNO/6Oc+oLKmzpoGe9B
9Z1i6ll6cYApsdkHRzrSzuspdtc/gC7NuA+zqb977jJkeDjn21d8Hvm+XtFdXX7t
M2wbz/mq
=oWoD
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


sherlock-lin commented on PR #22359:
URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2024304577

   This is my first PR; having it merged would give me more confidence. I'll 
try to avoid submitting low-value PRs that take up resources until the 
technical debt clears up.





(pulsar-client-cpp) branch branch-3.5 updated (916af95 -> 0ba3d7d)

2024-03-27 Thread xyz

xyz pushed a change to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


from 916af95  Merge branch 'main' into branch-3.5
 new e7793d6  Support customize vcpkg directory when INTEGRATE_VCPKG is ON 
(#417)
 new 0e0ca8d  Fix broken wireshark build workflow on macOS (#414)
 new 7cfd775  Fix minor issues reported by CodeQL (#421)
 new b2ad352  Fix wrong results of hasMessageAvailable and readNext after 
seeking by timestamp (#422)
 new 0ba3d7d  Release 3.5.1

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci-pr-validation.yaml | 10 
 CMakeLists.txt  |  4 +-
 README.md   | 13 +
 lib/ConsumerImpl.cc | 41 +---
 lib/ConsumerImpl.h  | 15 +-
 lib/lz4/lz4.cc  |  6 +--
 perf/PerfProducer.cc|  2 +-
 tests/ReaderTest.cc | 84 ++---
 version.txt |  2 +-
 9 files changed, 135 insertions(+), 42 deletions(-)



(pulsar-client-cpp) 04/05: Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#422)

2024-03-27 Thread xyz

xyz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit b2ad3525eb762de5b92cc2af92cbe2e02c3e90cd
Author: Yunze Xu 
AuthorDate: Thu Mar 28 10:36:12 2024 +0800

Fix wrong results of hasMessageAvailable and readNext after seeking by 
timestamp (#422)

Fixes https://github.com/apache/pulsar-client-cpp/issues/420

It's a catch-up for https://github.com/apache/pulsar/pull/22363

(cherry picked from commit 27d8cc01d5b7cfce94a7d71259dca5cea83ae01e)
---
 lib/ConsumerImpl.cc | 41 ++
 lib/ConsumerImpl.h  | 15 +-
 tests/ReaderTest.cc | 84 ++---
 3 files changed, 104 insertions(+), 36 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index ebc8518..1a0b0cb 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1050,7 +1050,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool 
track) {
  */
 void ConsumerImpl::clearReceiveQueue() {
 if (duringSeek()) {
-startMessageId_ = seekMessageId_.get();
+if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+startMessageId_ = seekMessageId_.get();
+}
 SeekStatus expected = SeekStatus::COMPLETED;
 if (seekStatus_.compare_exchange_strong(expected, 
SeekStatus::NOT_STARTED)) {
 auto seekCallback = seekCallback_.release();
@@ -1476,7 +1478,7 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, 
ResultCallback callback) {
 return;
 }
 const auto requestId = client->newRequestId();
-seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, 
msgId), msgId, 0L, callback);
+seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, 
msgId), SeekArg{msgId}, callback);
 }
 
 void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
@@ -1495,8 +1497,8 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, 
ResultCallback callback) {
 return;
 }
 const auto requestId = client->newRequestId();
-seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, 
timestamp), MessageId::earliest(),
-  timestamp, callback);
+seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, 
timestamp), SeekArg{timestamp},
+  callback);
 }
 
 bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
@@ -1509,7 +1511,7 @@ void 
ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 (lastDequedMessageId_ == MessageId::earliest()) &&
 (startMessageId_.get().value_or(MessageId::earliest()) == 
MessageId::latest());
 }
-if (compareMarkDeletePosition) {
+if (compareMarkDeletePosition || 
hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
 auto self = get_shared_this_ptr();
 getLastMessageIdAsync([self, callback](Result result, const 
GetLastMessageIdResponse& response) {
 if (result != ResultOk) {
@@ -1518,8 +1520,8 @@ void 
ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 }
 auto handleResponse = [self, response, callback] {
 if (response.hasMarkDeletePosition() && 
response.getLastMessageId().entryId() >= 0) {
-// We only care about comparing ledger ids and entry ids 
as mark delete position doesn't
-// have other ids such as batch index
+// We only care about comparing ledger ids and entry ids 
as mark delete position
+// doesn't have other ids such as batch index
 auto compareResult = 
compareLedgerAndEntryId(response.getMarkDeletePosition(),
  
response.getLastMessageId());
 callback(ResultOk, 
self->config_.isStartMessageIdInclusive() ? compareResult <= 0
@@ -1528,7 +1530,8 @@ void 
ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 callback(ResultOk, false);
 }
 };
-if (self->config_.isStartMessageIdInclusive()) {
+if (self->config_.isStartMessageIdInclusive() &&
+!self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
 self->seekAsync(response.getLastMessageId(), [callback, 
handleResponse](Result result) {
 if (result != ResultOk) {
 callback(result, {});
@@ -1644,8 +1647,8 @@ bool ConsumerImpl::isConnected() const { return 
!getCnx().expired() && state_ ==
 
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 
1 : 0; }
 
-void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const 
MessageId& seekId,
- long 
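
[Editor's sketch] The hunks above replace the separate `msgId` / `timestamp` 
arguments of seekAsyncInternal with a single `SeekArg{...}` value. The 
definition of SeekArg is not visible in this excerpt, so the following is only 
a minimal illustration of the idea, assuming a C++17 std::variant that holds 
either a timestamp or a message id. The names MessageIdSketch, SeekArgSketch, 
isSeekByTimestamp and describeSeek are hypothetical and do not come from the 
commit.

```cpp
#include <cstdint>
#include <string>
#include <variant>

// Hypothetical stand-in for pulsar::MessageId (assumption, not the real class).
struct MessageIdSketch {
    std::int64_t ledgerId;
    std::int64_t entryId;
};

// Assumed shape of the seek argument: exactly one of a publish timestamp
// or a message id, so callers no longer pass a dummy value for the other.
using SeekArgSketch = std::variant<std::uint64_t, MessageIdSketch>;

// True when the seek was requested by timestamp rather than by message id.
inline bool isSeekByTimestamp(const SeekArgSketch& arg) {
    return std::holds_alternative<std::uint64_t>(arg);
}

// Renders the argument for logging; branches on the active alternative.
inline std::string describeSeek(const SeekArgSketch& arg) {
    if (const auto* ts = std::get_if<std::uint64_t>(&arg)) {
        return "timestamp:" + std::to_string(*ts);
    }
    const auto& id = std::get<MessageIdSketch>(arg);
    return "message:" + std::to_string(id.ledgerId) + ":" + std::to_string(id.entryId);
}
```

With a type like this, a caller would write 
`SeekArgSketch{std::uint64_t{ts}}` or `SeekArgSketch{MessageIdSketch{l, e}}`, 
and the receiving code can tell the two kinds of seek apart without relying on 
sentinel values such as `0L` or `MessageId::earliest()`.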

(pulsar-client-cpp) 03/05: Fix minor issues reported by CodeQL (#421)

2024-03-27 Thread xyz

xyz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit 7cfd77506c83f259c69b440082d3b558b60695e4
Author: Matteo Merli 
AuthorDate: Wed Mar 27 02:47:42 2024 -0700

Fix minor issues reported by CodeQL (#421)

(cherry picked from commit 763b85c6c4b9bb648b9f7cf62f9ed09f04f3decb)
---
 lib/lz4/lz4.cc   | 6 +++---
 perf/PerfProducer.cc | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/lib/lz4/lz4.cc b/lib/lz4/lz4.cc
index d63b977..2f98fb3 100644
--- a/lib/lz4/lz4.cc
+++ b/lib/lz4/lz4.cc
@@ -1175,9 +1175,9 @@ FORCE_INLINE int LZ4_decompress_generic(
 s = *ip++;
 length += s;
 } while (likely((endOnInput) ? ip < iend - RUN_MASK : 1) && (s == 
255));
-if ((safeDecode) && unlikely((size_t)(op + length) < (size_t)(op)))
+if ((safeDecode) && unlikely(length >= (size_t)(oend - op)))
 goto _output_error; /* overflow detection */
-if ((safeDecode) && unlikely((size_t)(ip + length) < (size_t)(ip)))
+if ((safeDecode) && unlikely(length >= (size_t)(iend - ip)))
 goto _output_error; /* overflow detection */
 }
 
@@ -1220,7 +1220,7 @@ FORCE_INLINE int LZ4_decompress_generic(
 s = *ip++;
 length += s;
 } while (s == 255);
-if ((safeDecode) && unlikely((size_t)(op + length) < (size_t)op))
+if ((safeDecode) && unlikely(length >= (size_t)(oend - op)))
 goto _output_error; /* overflow detection */
 }
 length += MINMATCH;
diff --git a/perf/PerfProducer.cc b/perf/PerfProducer.cc
index aeda8e8..cbfef68 100644
--- a/perf/PerfProducer.cc
+++ b/perf/PerfProducer.cc
@@ -160,7 +160,7 @@ void startPerfProducer(const Arguments& args, 
pulsar::ProducerConfiguration& pro
 limiter = std::make_shared(args.rate);
 }
 
-producerList.resize(args.numTopics * args.numProducers);
+producerList.resize((size_t)args.numTopics * args.numProducers);
 for (int i = 0; i < args.numTopics; i++) {
 std::string topic = (args.numTopics == 1) ? args.topic : args.topic + 
"-" + std::to_string(i);
 LOG_INFO("Adding " << args.numProducers << " producers on topic " << 
topic);
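
[Editor's sketch] The two kinds of change in this commit can be illustrated in 
isolation. The old lz4 checks added `length` to a pointer and tested whether 
the result wrapped around, which depends on out-of-range pointer arithmetic; 
the new checks compare `length` against the space remaining before the end 
pointer. The PerfProducer change widens one operand to size_t before the 
multiplication so the product is not computed in int. The helper names 
exceedsRemaining and producerSlots below are hypothetical and exist only for 
this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper mirroring the patched form
// `length >= (size_t)(oend - op)`: returns true when `length` bytes
// do not fit strictly inside the remaining range [cur, end).
inline bool exceedsRemaining(const std::uint8_t* cur, const std::uint8_t* end,
                             std::size_t length) {
    assert(cur <= end);  // precondition: cur never passes end
    return length >= static_cast<std::size_t>(end - cur);
}

// Hypothetical helper mirroring `(size_t)args.numTopics * args.numProducers`:
// the operands are widened first, so the multiplication happens in size_t.
inline std::size_t producerSlots(int numTopics, int numProducers) {
    assert(numTopics >= 0 && numProducers >= 0);
    return static_cast<std::size_t>(numTopics) * static_cast<std::size_t>(numProducers);
}
```

Neither helper forms a pointer outside the buffer, and the comparison stays 
meaningful even when `length` is close to SIZE_MAX.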



(pulsar-client-cpp) 01/05: Support customize vcpkg directory when INTEGRATE_VCPKG is ON (#417)

2024-03-27 Thread xyz

xyz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit e7793d6752a6bbfd5f526ecf207bd3738179f29e
Author: Yunze Xu 
AuthorDate: Thu Mar 14 20:11:26 2024 +0800

Support customize vcpkg directory when INTEGRATE_VCPKG is ON (#417)

### Motivation

Currently when INTEGRATE_VCPKG is ON, the CMAKE_TOOLCHAIN_FILE variable
is always a subdirectory of `${CMAKE_SOURCE_DIR}/vcpkg`. We can only
customize the vcpkg directory when INTEGRATE_VCPKG is OFF, while the
legacy CMake logic is incompatible with this way.

### Modifications

When INTEGRATE_VCPKG is ON, only set CMAKE_TOOLCHAIN_FILE if it's not
defined. The workflow and README are updated for it.

(cherry picked from commit 821871777e247e1ccbfa323ea0d5136cf0e18711)
---
 .github/workflows/ci-pr-validation.yaml |  5 +
 CMakeLists.txt  |  4 +++-
 README.md   | 13 +
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci-pr-validation.yaml 
b/.github/workflows/ci-pr-validation.yaml
index d9dc2ca..5010411 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -114,6 +114,11 @@ jobs:
   cmake . -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DBUILD_PERF_TOOLS=ON
   cmake --build . -j8
 
+  - name: Verify custom vcpkg installation
+run: |
+  mv vcpkg /tmp/
+  cmake -B build -DINTEGRATE_VCPKG=ON 
-DCMAKE_TOOLCHAIN_FILE="/tmp/vcpkg/scripts/buildsystems/vcpkg.cmake"
+
   cpp20-build:
 name: Build with the C++20 standard
 needs: formatting-check
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 662e84a..b004653 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -24,7 +24,9 @@ option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
 option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
 if (INTEGRATE_VCPKG)
 set(USE_ASIO ON)
-set(CMAKE_TOOLCHAIN_FILE 
"${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
+if (NOT CMAKE_TOOLCHAIN_FILE)
+set(CMAKE_TOOLCHAIN_FILE 
"${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
+endif ()
 endif ()
 
 option(BUILD_TESTS "Build tests" ON)
diff --git a/README.md b/README.md
index f0c13f0..4c86b63 100644
--- a/README.md
+++ b/README.md
@@ -47,6 +47,8 @@ Since it's integrated with vcpkg, see 
[vcpkg#README](https://github.com/microsof
 
 ### How to build from source
 
+The simplest way is to clone this project with the vcpkg submodule.
+
 ```bash
 git clone https://github.com/apache/pulsar-client-cpp.git
 cd pulsar-client-cpp
@@ -57,6 +59,17 @@ cmake --build build -j8
 
 The 1st step will download vcpkg and then install all dependencies according 
to the version description in [vcpkg.json](./vcpkg.json). The 2nd step will 
build the Pulsar C++ libraries under `./build/lib/`, where `./build` is the 
CMake build directory.
 
+> You can also add the CMAKE_TOOLCHAIN_FILE option if your system already have 
vcpkg installed.
+>
+> ```bash
+> git clone https://github.com/apache/pulsar-client-cpp.git
+> cd pulsar-client-cpp
+> # For example, you can install vcpkg in /tmp/vcpkg
+> cd /tmp && git clone https://github.com/microsoft/vcpkg.git && cd -
+> cmake -B build -DINTEGRATE_VCPKG=ON 
-DCMAKE_TOOLCHAIN_FILE="/tmp/vcpkg/scripts/buildsystems/vcpkg.cmake"
+> cmake --build build -j8
+> ```
+
 After the build, the hierarchy of the `build` directory will be:
 
 ```



(pulsar-client-cpp) 05/05: Release 3.5.1

2024-03-27 Thread xyz

xyz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit 0ba3d7d8e9f7d3fb2f2cc3cdf918bb63c4791afc
Author: Yunze Xu 
AuthorDate: Thu Mar 28 10:45:07 2024 +0800

Release 3.5.1
---
 version.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/version.txt b/version.txt
index 1545d96..d5c0c99 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-3.5.0
+3.5.1



(pulsar-client-cpp) 02/05: Fix broken wireshark build workflow on macOS (#414)

2024-03-27 Thread xyz

xyz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit 0e0ca8da3e7cabf76adb5d5ae30283f909e0aaa3
Author: Yunze Xu 
AuthorDate: Sun Mar 17 23:10:15 2024 +0800

Fix broken wireshark build workflow on macOS (#414)

### Motivation

See 
https://github.com/apache/pulsar-client-cpp/actions/runs/8276076995/job/22644077705

```
Error: The `brew link` step did not complete successfully
The formula built, but is not symlinked into /usr/local
Could not symlink bin/2to3
```

`brew install` failed because the wireshark dependency depends on
python@3.12 and it failed at `brew link`.

### Modifications

Remove the existing binaries that might conflict.

(cherry picked from commit c513f29fadce86bacaf4f878d8d25064f7560083)
---
 .github/workflows/ci-pr-validation.yaml | 5 +
 1 file changed, 5 insertions(+)

diff --git a/.github/workflows/ci-pr-validation.yaml 
b/.github/workflows/ci-pr-validation.yaml
index 5010411..9f113b0 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -64,6 +64,11 @@ jobs:
 run: |
   # See https://github.com/Homebrew/homebrew-core/issues/157142
   export HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK=1
+  cd /usr/local/bin
+  rm -f 2to3* idle3* pydoc* python3*
+  rm -f /usr/local/share/man/man1/python3.1 
/usr/local/lib/pkgconfig/python3*
+  cd /usr/local/Frameworks/Python.framework
+  rm -rf Headers Python Resources Versions/Current
   brew update
   brew install pkg-config wireshark protobuf
   - name: Build wireshark plugin



Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


Technoboy- commented on PR #22359:
URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2024299728

   > > I don't agree with you for this patch. This isn't a problem, but from 
the Java interface definition, it's better to do like this.
   > 
   > @Technoboy- what is it that you don't agree? I agree that it is "better". 
When something is "better", this implies that it produces some benefit. In this 
case, what is it that it makes better? The question is then whether this 
benefit is relevant when we are considering how Pulsar is currently maintained. 
I've provided my opinion about formatting changes in this mailing list post: 
https://lists.apache.org/thread/lo15cdzsl740dwgcqwpsl9oy9qb13onv . After we 
settle on a maintenance strategy that reduces merge conflicts for maintaining 
the LTS branch, we have more freedom to do refactorings which we are currently 
avoiding because of the merge conflicts. Merge conflicts caused by unnecessary 
changes are the main reason for my resistance. That could be resolved with an 
improved maintenance strategy.
   
   #22295 doesn't make anything better either; it just does no `anything 
harmful.` And you merged it. From my side, it looks like you merge the PRs you 
create, while on others' PRs you give some suggestions and leave the PR there.





(pulsar-client-cpp) branch main updated: Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#422)

2024-03-27 Thread xyz

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
 new 27d8cc0  Fix wrong results of hasMessageAvailable and readNext after 
seeking by timestamp (#422)
27d8cc0 is described below

commit 27d8cc01d5b7cfce94a7d71259dca5cea83ae01e
Author: Yunze Xu 
AuthorDate: Thu Mar 28 10:36:12 2024 +0800

Fix wrong results of hasMessageAvailable and readNext after seeking by 
timestamp (#422)

Fixes https://github.com/apache/pulsar-client-cpp/issues/420

It's a catch-up for https://github.com/apache/pulsar/pull/22363
---
 lib/ConsumerImpl.cc | 41 ++
 lib/ConsumerImpl.h  | 15 +-
 tests/ReaderTest.cc | 84 ++---
 3 files changed, 104 insertions(+), 36 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 7d5250e..e5df421 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1050,7 +1050,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
  */
 void ConsumerImpl::clearReceiveQueue() {
 if (duringSeek()) {
-startMessageId_ = seekMessageId_.get();
+if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+startMessageId_ = seekMessageId_.get();
+}
 SeekStatus expected = SeekStatus::COMPLETED;
 if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) {
 auto seekCallback = seekCallback_.release();
@@ -1476,7 +1478,7 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
 return;
 }
 const auto requestId = client->newRequestId();
-seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), msgId, 0L, callback);
+seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback);
 }
 
 void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
@@ -1495,8 +1497,8 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
 return;
 }
 const auto requestId = client->newRequestId();
-seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), MessageId::earliest(),
-  timestamp, callback);
+seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp},
+  callback);
 }
 
 bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
@@ -1509,7 +1511,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 (lastDequedMessageId_ == MessageId::earliest()) &&
 (startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
 }
-if (compareMarkDeletePosition) {
+if (compareMarkDeletePosition || hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
 auto self = get_shared_this_ptr();
 getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
 if (result != ResultOk) {
@@ -1518,8 +1520,8 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 }
 auto handleResponse = [self, response, callback] {
 if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
-// We only care about comparing ledger ids and entry ids as mark delete position doesn't
-// have other ids such as batch index
+// We only care about comparing ledger ids and entry ids as mark delete position
+// doesn't have other ids such as batch index
 auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(),
   response.getLastMessageId());
 callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0
@@ -1528,7 +1530,8 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 callback(ResultOk, false);
 }
 };
-if (self->config_.isStartMessageIdInclusive()) {
+if (self->config_.isStartMessageIdInclusive() &&
+!self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
 self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
 if (result != ResultOk) {
 callback(result, {});
@@ -1644,8 +1647,8 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ ==
 
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
 
-void 

Re: [I] [Bug] Regression of seeking by timestamp in 3.5.0 [pulsar-client-cpp]

2024-03-27 Thread via GitHub


BewareMyPower closed issue #420: [Bug] Regression of seeking by timestamp in 3.5.0
URL: https://github.com/apache/pulsar-client-cpp/issues/420





Re: [PR] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp [pulsar-client-cpp]

2024-03-27 Thread via GitHub


BewareMyPower merged PR #422:
URL: https://github.com/apache/pulsar-client-cpp/pull/422





(pulsar) branch master updated: [fix][broker] Fix typos in PersistentTopic class (#22364)

2024-03-27 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 32037c3b098 [fix][broker] Fix typos in PersistentTopic class (#22364)
32037c3b098 is described below

commit 32037c3b0982aa00a7cb5ee7e17a6b235a8c2d7f
Author: hanmz 
AuthorDate: Thu Mar 28 10:31:09 2024 +0800

[fix][broker] Fix typos in PersistentTopic class (#22364)
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6179e73169f..1650e449a3f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2753,7 +2753,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 ledger.asyncMigrate();
 }
 if (log.isDebugEnabled()) {
-log.debug("{} has replication backlog and applied migraiton", topic);
+log.debug("{} has replication backlog and applied migration", topic);
 }
 return CompletableFuture.completedFuture(null);
 }



Re: [PR] [fix][broker] Fix typos in PersistentTopic class [pulsar]

2024-03-27 Thread via GitHub


Technoboy- merged PR #22364:
URL: https://github.com/apache/pulsar/pull/22364





Re: [I] There is a memory leak,I need help [pulsar-client-python]

2024-03-27 Thread via GitHub


kansnow closed issue #208: There is a memory leak,I need help
URL: https://github.com/apache/pulsar-client-python/issues/208





Re: [I] There is a memory leak,I need help [pulsar-client-python]

2024-03-27 Thread via GitHub


kansnow commented on issue #208:
URL: https://github.com/apache/pulsar-client-python/issues/208#issuecomment-2024274127

   > Can you try with latest version 3.4.0? https://pypi.org/project/pulsar-client/
   
   Thank you very much. I used the `top` command to check the memory usage of the process. After upgrading, the memory no longer grows.





Re: [I] There is a memory leak,I need help [pulsar-client-python]

2024-03-27 Thread via GitHub


kansnow commented on issue #208:
URL: https://github.com/apache/pulsar-client-python/issues/208#issuecomment-2024238645

   > Can you try with latest version 3.4.0? https://pypi.org/project/pulsar-client/
   
   OK, I'll try upgrading the version.





(pulsar) 01/02: [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 622c87d1e82163c8b42281c0c7a40e3ad79dc0df
Author: Jiwei Guo 
AuthorDate: Thu Mar 28 06:53:21 2024 +0800

[fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)

(cherry picked from commit b702d440dc5e5a4cfd845bf60d5e310efe665ff5)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +
 .../service/persistent/PersistentTopicTest.java| 46 ++
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index da013c07313..0da71e89018 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -987,7 +987,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 name);
 }
 // Let the managed ledger know we want to be notified whenever a new entry is published
-ledger.waitingCursors.add(this);
+ledger.addWaitingCursor(this);
 } else {
 if (log.isDebugEnabled()) {
 log.debug("[{}] [{}] Skip notification registering since we do have entries available",
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 75ac4dd4c0a..ee8e7c430ef 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3803,6 +3803,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 this.waitingCursors.remove(cursor);
 }
 
+public void addWaitingCursor(ManagedCursorImpl cursor) {
+if (cursor instanceof NonDurableCursorImpl) {
+if (cursor.isActive()) {
+this.waitingCursors.add(cursor);
+}
+} else {
+this.waitingCursors.add(cursor);
+}
+}
+
 public boolean isCursorActive(ManagedCursor cursor) {
 return activeCursors.get(cursor.getName()) != null;
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 717dfc28ac8..618fe8006a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -57,10 +57,15 @@ import java.util.function.Supplier;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
@@ -74,6 +79,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
@@ -661,4 +667,44 @@ public class PersistentTopicTest extends BrokerTestBase {
 subscribe.close();
 admin.topics().delete(topicName);
 }
+
+@Test
+public void testAddWaitingCursorsForNonDurable() throws Exception {
+final String ns = "prop/ns-test";
+admin.namespaces().createNamespace(ns, 2);
+final String topicName = "persistent://prop/ns-test/testAddWaitingCursors";
+admin.topics().createNonPartitionedTopic(topicName);
+final Optional topic = pulsar.getBrokerService().getTopic(topicName, false).join();
+  

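The guard introduced by `addWaitingCursor` in the diff above can be sketched in isolation. This is only a minimal illustration under stated assumptions, not the Pulsar implementation: `Cursor`, `Ledger`, and the `durable`/`active` fields are hypothetical stand-ins for `ManagedCursorImpl`, `ManagedLedgerImpl`, the `NonDurableCursorImpl` type check, and `isActive()`.

```java
import java.util.HashSet;
import java.util.Set;

public class WaitingCursorSketch {
    /** Hypothetical stand-in for a cursor; not the real ManagedCursorImpl. */
    static final class Cursor {
        final String name;
        final boolean durable;
        final boolean active;

        Cursor(String name, boolean durable, boolean active) {
            this.name = name;
            this.durable = durable;
            this.active = active;
        }
    }

    /** Hypothetical stand-in for the ledger that tracks waiting cursors. */
    static final class Ledger {
        final Set<Cursor> waitingCursors = new HashSet<>();

        // Mirrors the shape of the patched method: a non-durable cursor is
        // registered only while it is active; durable cursors are always registered.
        void addWaitingCursor(Cursor cursor) {
            if (!cursor.durable && !cursor.active) {
                return;
            }
            waitingCursors.add(cursor);
        }
    }

    public static void main(String[] args) {
        Ledger ledger = new Ledger();
        ledger.addWaitingCursor(new Cursor("durable-inactive", true, false));
        ledger.addWaitingCursor(new Cursor("non-durable-active", false, true));
        ledger.addWaitingCursor(new Cursor("non-durable-inactive", false, false));
        System.out.println("waiting cursors: " + ledger.waitingCursors.size());
    }
}
```

In this sketch the inactive non-durable cursor is the only one that is skipped, which appears to be the case the commit title refers to.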
(pulsar) branch branch-3.2 updated (f56c383a531 -> d9068698da4)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from f56c383a531 [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)
 new 622c87d1e82 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)
 new d9068698da4 [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +
 .../service/persistent/PersistentTopicTest.java| 46 +++
 .../pulsar/common/util/netty/DnsResolverUtil.java  | 52 --
 .../pulsar/common/util/netty/DnsResolverTest.java  | 44 ++
 5 files changed, 139 insertions(+), 15 deletions(-)



(pulsar) 02/02: [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d9068698da48f826cea2c8d5075ce95497cecc94
Author: Jian Yun 
AuthorDate: Thu Mar 28 06:59:28 2024 +0800

[improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)

Co-authored-by: Lari Hotari 
(cherry picked from commit cce0b058efd55e2d5ac42c4ecaceddacee648a7c)
---
 .../pulsar/common/util/netty/DnsResolverUtil.java  | 52 --
 .../pulsar/common/util/netty/DnsResolverTest.java  | 44 ++
 2 files changed, 82 insertions(+), 14 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
index f49a6453c72..bcff83acd94 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -19,12 +19,20 @@
 package org.apache.pulsar.common.util.netty;
 
 import io.netty.resolver.dns.DnsNameResolverBuilder;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import java.security.Security;
+import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class DnsResolverUtil {
+
+private static final String CACHE_POLICY_PROP = "networkaddress.cache.ttl";
+private static final String CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.ttl";
+private static final String NEGATIVE_CACHE_POLICY_PROP = "networkaddress.cache.negative.ttl";
+private static final String NEGATIVE_CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.negative.ttl";
+/* default ttl value from sun.net.InetAddressCachePolicy.DEFAULT_POSITIVE, which is used when no security manager
+ is used */
+private static final int JDK_DEFAULT_TTL = 30;
 private static final int MIN_TTL = 0;
 private static final int TTL;
 private static final int NEGATIVE_TTL;
@@ -39,19 +47,35 @@ public class DnsResolverUtil {
 int ttl = DEFAULT_TTL;
 int negativeTtl = DEFAULT_NEGATIVE_TTL;
 try {
-// use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting
-// effective JDK settings for DNS caching
-Class inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy");
-Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get");
-ttl = (Integer) getTTLMethod.invoke(null);
-Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative");
-negativeTtl = (Integer) getNegativeTTLMethod.invoke(null);
-} catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException
- | IllegalAccessException e) {
-log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e);
+String ttlStr = Security.getProperty(CACHE_POLICY_PROP);
+if (ttlStr == null) {
+// Compatible with sun.net.inetaddr.ttl settings
+ttlStr = System.getProperty(CACHE_POLICY_PROP_FALLBACK);
+}
+String negativeTtlStr = Security.getProperty(NEGATIVE_CACHE_POLICY_PROP);
+if (negativeTtlStr == null) {
+// Compatible with sun.net.inetaddr.negative.ttl settings
+negativeTtlStr = System.getProperty(NEGATIVE_CACHE_POLICY_PROP_FALLBACK);
+}
+ttl = Optional.ofNullable(ttlStr)
+.map(Integer::decode)
+.filter(i -> i > 0)
+.orElseGet(() -> {
+if (System.getSecurityManager() == null) {
+return JDK_DEFAULT_TTL;
+}
+return DEFAULT_TTL;
+});
+
+negativeTtl = Optional.ofNullable(negativeTtlStr)
+.map(Integer::decode)
+.filter(i -> i >= 0)
+.orElse(DEFAULT_NEGATIVE_TTL);
+} catch (NumberFormatException e) {
+log.warn("Cannot get DNS TTL settings", e);
 }
-TTL = ttl <= 0 ? DEFAULT_TTL : ttl;
-NEGATIVE_TTL = negativeTtl < 0 ? DEFAULT_NEGATIVE_TTL : negativeTtl;
+TTL = ttl;
+NEGATIVE_TTL = negativeTtl;
 }
 
 private DnsResolverUtil() {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
index 0ccb960e798..46599cc45a0 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java

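The property lookup that replaces the reflective call in the diff above can be sketched as a standalone program. This is a rough illustration under stated assumptions, not the patched `DnsResolverUtil`: the class name `DnsTtlSketch` and the helpers `lookup` and `resolvePositiveTtl` are hypothetical, and, as a simplification, a malformed value here falls back per setting instead of being handled by one shared `catch` block. Only `Security.getProperty`, `System.getProperty`, `Integer.decode`, and `Optional` are real JDK APIs.

```java
import java.security.Security;
import java.util.Optional;

public class DnsTtlSketch {
    // Same value the patch uses for JDK_DEFAULT_TTL.
    static final int JDK_DEFAULT_TTL = 30;

    // Hypothetical helper: prefer the security property, then fall back to
    // the legacy system property, as the patch does for both TTL settings.
    static String lookup(String securityKey, String systemKey) {
        String value = Security.getProperty(securityKey);
        return value != null ? value : System.getProperty(systemKey);
    }

    // Hypothetical helper: decode the raw setting and keep it only when it is
    // strictly positive; otherwise return the supplied fallback.
    static int resolvePositiveTtl(String raw, int fallback) {
        try {
            return Optional.ofNullable(raw)
                    .map(Integer::decode)
                    .filter(i -> i > 0)
                    .orElse(fallback);
        } catch (NumberFormatException e) {
            // Malformed setting: fall back instead of failing start-up.
            return fallback;
        }
    }

    public static void main(String[] args) {
        String raw = lookup("networkaddress.cache.ttl", "sun.net.inetaddr.ttl");
        System.out.println("effective ttl: " + resolvePositiveTtl(raw, JDK_DEFAULT_TTL));
    }
}
```

Reading the settings as plain properties avoids touching the internal `sun.net.InetAddressCachePolicy` class, which seems to be the motivation stated in the commit title.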
(pulsar) 02/02: [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1b9ae2e8cb90478d03c581250e28b633aa960957
Author: Jian Yun 
AuthorDate: Thu Mar 28 06:59:28 2024 +0800

[improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)

Co-authored-by: Lari Hotari 
(cherry picked from commit cce0b058efd55e2d5ac42c4ecaceddacee648a7c)
---
 .../pulsar/common/util/netty/DnsResolverUtil.java  | 52 --
 .../pulsar/common/util/netty/DnsResolverTest.java  | 44 ++
 2 files changed, 82 insertions(+), 14 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
index f49a6453c72..bcff83acd94 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -19,12 +19,20 @@
 package org.apache.pulsar.common.util.netty;
 
 import io.netty.resolver.dns.DnsNameResolverBuilder;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import java.security.Security;
+import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class DnsResolverUtil {
+
+private static final String CACHE_POLICY_PROP = "networkaddress.cache.ttl";
+private static final String CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.ttl";
+private static final String NEGATIVE_CACHE_POLICY_PROP = "networkaddress.cache.negative.ttl";
+private static final String NEGATIVE_CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.negative.ttl";
+/* default ttl value from sun.net.InetAddressCachePolicy.DEFAULT_POSITIVE, which is used when no security manager
+ is used */
+private static final int JDK_DEFAULT_TTL = 30;
 private static final int MIN_TTL = 0;
 private static final int TTL;
 private static final int NEGATIVE_TTL;
@@ -39,19 +47,35 @@ public class DnsResolverUtil {
 int ttl = DEFAULT_TTL;
 int negativeTtl = DEFAULT_NEGATIVE_TTL;
 try {
-// use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting
-// effective JDK settings for DNS caching
-Class inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy");
-Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get");
-ttl = (Integer) getTTLMethod.invoke(null);
-Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative");
-negativeTtl = (Integer) getNegativeTTLMethod.invoke(null);
-} catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException
- | IllegalAccessException e) {
-log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e);
+String ttlStr = Security.getProperty(CACHE_POLICY_PROP);
+if (ttlStr == null) {
+// Compatible with sun.net.inetaddr.ttl settings
+ttlStr = System.getProperty(CACHE_POLICY_PROP_FALLBACK);
+}
+String negativeTtlStr = Security.getProperty(NEGATIVE_CACHE_POLICY_PROP);
+if (negativeTtlStr == null) {
+// Compatible with sun.net.inetaddr.negative.ttl settings
+negativeTtlStr = System.getProperty(NEGATIVE_CACHE_POLICY_PROP_FALLBACK);
+}
+ttl = Optional.ofNullable(ttlStr)
+.map(Integer::decode)
+.filter(i -> i > 0)
+.orElseGet(() -> {
+if (System.getSecurityManager() == null) {
+return JDK_DEFAULT_TTL;
+}
+return DEFAULT_TTL;
+});
+
+negativeTtl = Optional.ofNullable(negativeTtlStr)
+.map(Integer::decode)
+.filter(i -> i >= 0)
+.orElse(DEFAULT_NEGATIVE_TTL);
+} catch (NumberFormatException e) {
+log.warn("Cannot get DNS TTL settings", e);
 }
-TTL = ttl <= 0 ? DEFAULT_TTL : ttl;
-NEGATIVE_TTL = negativeTtl < 0 ? DEFAULT_NEGATIVE_TTL : negativeTtl;
+TTL = ttl;
+NEGATIVE_TTL = negativeTtl;
 }
 
 private DnsResolverUtil() {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
index 0ccb960e798..46599cc45a0 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java

(pulsar) 01/02: [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ba8ff27d25200b371904161e9162d7c112848066
Author: Jiwei Guo 
AuthorDate: Thu Mar 28 06:53:21 2024 +0800

[fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)

(cherry picked from commit b702d440dc5e5a4cfd845bf60d5e310efe665ff5)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +
 .../service/persistent/PersistentTopicTest.java| 46 ++
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 7fd93dacf49..b5c16317f2b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -987,7 +987,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 name);
 }
 // Let the managed ledger know we want to be notified whenever a new entry is published
-ledger.waitingCursors.add(this);
+ledger.addWaitingCursor(this);
 } else {
 if (log.isDebugEnabled()) {
 log.debug("[{}] [{}] Skip notification registering since we do have entries available",
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6727fc63479..73dfc86e1ad 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3804,6 +3804,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 this.waitingCursors.remove(cursor);
 }
 
+public void addWaitingCursor(ManagedCursorImpl cursor) {
+if (cursor instanceof NonDurableCursorImpl) {
+if (cursor.isActive()) {
+this.waitingCursors.add(cursor);
+}
+} else {
+this.waitingCursors.add(cursor);
+}
+}
+
 public boolean isCursorActive(ManagedCursor cursor) {
 return activeCursors.get(cursor.getName()) != null;
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 4b4aa5b45d3..55024ca3d7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -57,10 +57,15 @@ import java.util.function.Supplier;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
@@ -74,6 +79,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
@@ -661,4 +667,44 @@ public class PersistentTopicTest extends BrokerTestBase {
 subscribe.close();
 admin.topics().delete(topicName);
 }
+
+@Test
+public void testAddWaitingCursorsForNonDurable() throws Exception {
+final String ns = "prop/ns-test";
+admin.namespaces().createNamespace(ns, 2);
+final String topicName = "persistent://prop/ns-test/testAddWaitingCursors";
+admin.topics().createNonPartitionedTopic(topicName);
+final Optional topic = pulsar.getBrokerService().getTopic(topicName, false).join();
+  

(pulsar) branch branch-3.0 updated (1045f8be626 -> 1b9ae2e8cb9)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 1045f8be626 [fix][client] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#22363)
 new ba8ff27d252 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)
 new 1b9ae2e8cb9 [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +
 .../service/persistent/PersistentTopicTest.java| 46 +++
 .../pulsar/common/util/netty/DnsResolverUtil.java  | 52 --
 .../pulsar/common/util/netty/DnsResolverTest.java  | 44 ++
 5 files changed, 139 insertions(+), 15 deletions(-)



(pulsar) branch master updated: [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new cce0b058efd [improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)
cce0b058efd is described below

commit cce0b058efd55e2d5ac42c4ecaceddacee648a7c
Author: Jian Yun 
AuthorDate: Thu Mar 28 06:59:28 2024 +0800

[improve][misc] Remove the call to sun InetAddressCachePolicy (#22329)

Co-authored-by: Lari Hotari 
---
 .../pulsar/common/util/netty/DnsResolverUtil.java  | 52 --
 .../pulsar/common/util/netty/DnsResolverTest.java  | 44 ++
 2 files changed, 82 insertions(+), 14 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
index f49a6453c72..bcff83acd94 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -19,12 +19,20 @@
 package org.apache.pulsar.common.util.netty;
 
 import io.netty.resolver.dns.DnsNameResolverBuilder;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import java.security.Security;
+import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class DnsResolverUtil {
+
+private static final String CACHE_POLICY_PROP = "networkaddress.cache.ttl";
+private static final String CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.ttl";
+private static final String NEGATIVE_CACHE_POLICY_PROP = "networkaddress.cache.negative.ttl";
+private static final String NEGATIVE_CACHE_POLICY_PROP_FALLBACK = "sun.net.inetaddr.negative.ttl";
+/* default ttl value from sun.net.InetAddressCachePolicy.DEFAULT_POSITIVE, which is used when no security manager
+ is used */
+private static final int JDK_DEFAULT_TTL = 30;
 private static final int MIN_TTL = 0;
 private static final int TTL;
 private static final int NEGATIVE_TTL;
@@ -39,19 +47,35 @@ public class DnsResolverUtil {
 int ttl = DEFAULT_TTL;
 int negativeTtl = DEFAULT_NEGATIVE_TTL;
 try {
-// use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting
-// effective JDK settings for DNS caching
-Class inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy");
-Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get");
-ttl = (Integer) getTTLMethod.invoke(null);
-Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative");
-negativeTtl = (Integer) getNegativeTTLMethod.invoke(null);
-} catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException
- | IllegalAccessException e) {
-log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e);
+String ttlStr = Security.getProperty(CACHE_POLICY_PROP);
+if (ttlStr == null) {
+// Compatible with sun.net.inetaddr.ttl settings
+ttlStr = System.getProperty(CACHE_POLICY_PROP_FALLBACK);
+}
+String negativeTtlStr = Security.getProperty(NEGATIVE_CACHE_POLICY_PROP);
+if (negativeTtlStr == null) {
+// Compatible with sun.net.inetaddr.negative.ttl settings
+negativeTtlStr = System.getProperty(NEGATIVE_CACHE_POLICY_PROP_FALLBACK);
+}
+ttl = Optional.ofNullable(ttlStr)
+.map(Integer::decode)
+.filter(i -> i > 0)
+.orElseGet(() -> {
+if (System.getSecurityManager() == null) {
+return JDK_DEFAULT_TTL;
+}
+return DEFAULT_TTL;
+});
+
+negativeTtl = Optional.ofNullable(negativeTtlStr)
+.map(Integer::decode)
+.filter(i -> i >= 0)
+.orElse(DEFAULT_NEGATIVE_TTL);
+} catch (NumberFormatException e) {
+log.warn("Cannot get DNS TTL settings", e);
 }
-TTL = ttl <= 0 ? DEFAULT_TTL : ttl;
-NEGATIVE_TTL = negativeTtl < 0 ? DEFAULT_NEGATIVE_TTL : negativeTtl;
+TTL = ttl;
+NEGATIVE_TTL = negativeTtl;
 }
 
 private DnsResolverUtil() {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
index 0ccb960e798..46599cc45a0 100644
--- 
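
The lookup order in the patch above can be sketched in isolation. This is a minimal illustration, not the Pulsar implementation: the class name `DnsTtlSketch`, the helper names, and the `ASSUMED_NEGATIVE_TTL` value are assumptions made for the example, and unlike the patch it parses each value independently so that one malformed property does not affect the other.

```java
import java.security.Security;
import java.util.Optional;

public class DnsTtlSketch {
    // 30 seconds mirrors the JDK default quoted in the patch comment.
    static final int JDK_DEFAULT_TTL = 30;
    // Hypothetical fallback; the real DEFAULT_NEGATIVE_TTL is defined outside the shown diff.
    static final int ASSUMED_NEGATIVE_TTL = 10;

    /** Reads the security property first, then the legacy system property. */
    static String lookup(String securityProp, String legacySystemProp) {
        String value = Security.getProperty(securityProp);
        return value != null ? value : System.getProperty(legacySystemProp);
    }

    /** Returns the parsed value if it is at least {@code min}, otherwise the fallback. */
    static int parseTtl(String raw, int min, int fallback) {
        try {
            return Optional.ofNullable(raw)
                    .map(Integer::decode)
                    .filter(i -> i >= min)
                    .orElse(fallback);
        } catch (NumberFormatException e) {
            // Malformed setting: ignore it and keep the fallback.
            return fallback;
        }
    }

    public static void main(String[] args) {
        int ttl = parseTtl(
                lookup("networkaddress.cache.ttl", "sun.net.inetaddr.ttl"), 1, JDK_DEFAULT_TTL);
        int negativeTtl = parseTtl(
                lookup("networkaddress.cache.negative.ttl", "sun.net.inetaddr.negative.ttl"),
                0, ASSUMED_NEGATIVE_TTL);
        System.out.println("ttl=" + ttl + "s, negativeTtl=" + negativeTtl + "s");
    }
}
```

The positive TTL requires a value greater than zero while the negative TTL accepts zero, matching the two `filter` predicates in the diff.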

Re: [PR] [improve][misc] Remove the call to sun InetAddressCachePolicy [pulsar]

2024-03-27 Thread via GitHub


lhotari merged PR #22329:
URL: https://github.com/apache/pulsar/pull/22329





(pulsar) branch master updated: [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new b702d440dc5 [fix][broker] Check cursor state before adding it to the 
`waitingCursors` (#22191)
b702d440dc5 is described below

commit b702d440dc5e5a4cfd845bf60d5e310efe665ff5
Author: Jiwei Guo 
AuthorDate: Thu Mar 28 06:53:21 2024 +0800

[fix][broker] Check cursor state before adding it to the `waitingCursors` 
(#22191)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +
 .../service/persistent/PersistentTopicTest.java| 46 ++
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8b13fc0f342..b253da72fa9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -992,7 +992,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 name);
 }
 // Let the managed ledger know we want to be notified whenever 
a new entry is published
-ledger.waitingCursors.add(this);
+ledger.addWaitingCursor(this);
 } else {
 if (log.isDebugEnabled()) {
 log.debug("[{}] [{}] Skip notification registering since 
we do have entries available",
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1c0a0465507..0f089ef4a85 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3813,6 +3813,16 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 this.waitingCursors.remove(cursor);
 }
 
+public void addWaitingCursor(ManagedCursorImpl cursor) {
+if (cursor instanceof NonDurableCursorImpl) {
+if (cursor.isActive()) {
+this.waitingCursors.add(cursor);
+}
+} else {
+this.waitingCursors.add(cursor);
+}
+}
+
 public boolean isCursorActive(ManagedCursor cursor) {
 return activeCursors.get(cursor.getName()) != null;
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index ea1a68bb0c2..d42b1d92007 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -59,10 +59,15 @@ import java.util.function.Supplier;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -75,6 +80,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
@@ -662,4 +668,44 @@ public class PersistentTopicTest extends BrokerTestBase {
 subscribe.close();
 admin.topics().delete(topicName);
 }
+
+@Test
+public void testAddWaitingCursorsForNonDurable() throws Exception {
+final String ns = "prop/ns-test";
+admin.namespaces().createNamespace(ns, 2);
+final String topicName = 
"persistent://prop/ns-test/testAddWaitingCursors";
+

Re: [PR] [fix][broker] Check cursor state before adding it to the `waitingCursors` [pulsar]

2024-03-27 Thread via GitHub


lhotari merged PR #22191:
URL: https://github.com/apache/pulsar/pull/22191





Re: [I] [Bug] Broker memory leak [pulsar]

2024-03-27 Thread via GitHub


lhotari closed issue #22157: [Bug] Broker memory leak 
URL: https://github.com/apache/pulsar/issues/22157





[I] [Bug] weird packet len issues on broker <-> metadatastore [pulsar]

2024-03-27 Thread via GitHub


KannarFr opened a new issue, #22373:
URL: https://github.com/apache/pulsar/issues/22373

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [X] I understand that unsupported versions don't get bug fixes. I will 
attempt to reproduce the issue on a supported version of Pulsar client and 
Pulsar broker.
   
   
   ### Version
   
   3.2.1
   
   ### Minimal reproduce step
   
   N/A
   
   ### What did you expect to see?
   
   N/A
   
   ### What did you see instead?
   
   Random `Packet len` exceptions have been occurring for several days, after 
months of running. Part of the logs: 
https://gist.githubusercontent.com/KannarFr/8aec4e4100c422563aacb3b3b404cd8c/raw/c5844217d386f2267b402099829a893c10f41d50/gistfile1.txt
   
   I'm pretty sure it can't be the namespace policies or topic metadata, as we 
limited the number of topics per namespace to 3000. Could it be a ledger 
listing from ZK to the broker that exceeds the packet length limit? 
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!





(pulsar) 04/05: [fix][broker] Avoid execute prepareInitPoliciesCacheAsync if namespace is deleted (#22268)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d4610a0d60139f8660c9d58bf2d8a7981f03d19a
Author: hanmz 
AuthorDate: Mon Mar 18 06:45:02 2024 +0800

[fix][broker] Avoid execute prepareInitPoliciesCacheAsync if namespace is 
deleted (#22268)

(cherry picked from commit 96d77f7e1d5b9c56070eaed5c31213a8144871d3)
---
 .../SystemTopicBasedTopicPoliciesService.java  | 66 +-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 19 +++
 2 files changed, 58 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 71f78e21f93..4e9e875bcf4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -324,34 +324,46 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 }
 }
 
-private @Nonnull CompletableFuture<Void> 
prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
+@VisibleForTesting
+@Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull 
NamespaceName namespace) {
 requireNonNull(namespace);
-return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
-final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
-createSystemTopicClient(namespace);
-readerCaches.put(namespace, readerCompletableFuture);
-ownedBundlesCountPerNamespace.putIfAbsent(namespace, new 
AtomicInteger(1));
-final CompletableFuture<Void> initFuture = readerCompletableFuture
-.thenCompose(reader -> {
-final CompletableFuture<Void> stageFuture = new 
CompletableFuture<>();
-initPolicesCache(reader, stageFuture);
-return stageFuture
-// Read policies in background
-.thenAccept(__ -> 
readMorePoliciesAsync(reader));
-});
-initFuture.exceptionally(ex -> {
-try {
-log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
-cleanCacheAndCloseReader(namespace, false);
-} catch (Throwable cleanupEx) {
-// Adding this catch to avoid break callback chain
-log.error("[{}] Failed to cleanup reader on 
__change_events topic", namespace, cleanupEx);
-}
-return null;
-});
-// let caller know we've got an exception.
-return initFuture;
-});
+return 
pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
+.thenCompose(namespacePolicies -> {
+if (namespacePolicies.isEmpty() || 
namespacePolicies.get().deleted) {
+log.info("[{}] skip prepare init policies 
cache since the namespace is deleted",
+namespace);
+return CompletableFuture.completedFuture(null);
+}
+
+return 
policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
+final 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
+createSystemTopicClient(namespace);
+readerCaches.put(namespace, 
readerCompletableFuture);
+
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
+final CompletableFuture<Void> initFuture = 
readerCompletableFuture
+.thenCompose(reader -> {
+final CompletableFuture<Void> 
stageFuture = new CompletableFuture<>();
+initPolicesCache(reader, 
stageFuture);
+return stageFuture
+// Read policies in 
background
+.thenAccept(__ -> 
readMorePoliciesAsync(reader));
+});
+initFuture.exceptionally(ex -> {
+try {
+log.error("[{}] Failed to create 
reader on __change_events topic",
+namespace, ex);
+

(pulsar) 01/05: [improve][client] Add backoff for `seek` (#20963)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9ea7f6014b51635fc337acd0e264cf444cdd3a2a
Author: Jiwei Guo 
AuthorDate: Mon Aug 21 15:10:49 2023 +0800

[improve][client] Add backoff for `seek` (#20963)

(cherry picked from commit ee91edc3e08c87db1e5662a553ff62af0c1886e5)
---
 .../apache/pulsar/client/impl/ConsumerImpl.java| 154 +++--
 .../pulsar/client/impl/ConsumerImplTest.java   |  13 +-
 2 files changed, 93 insertions(+), 74 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index cfb5f309684..717355abbef 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2225,100 +2225,108 @@ public class ConsumerImpl extends ConsumerBase 
implements ConnectionHandle
 new PulsarClientException("Only support seek by messageId or 
timestamp"));
 }
 
-private Optional<CompletableFuture<Void>> seekAsyncCheckState(String 
seekBy) {
-if (getState() == State.Closing || getState() == State.Closed) {
-return Optional.of(FutureUtil
-.failedFuture(new 
PulsarClientException.AlreadyClosedException(
-String.format("The consumer %s was already closed 
when seeking the subscription %s of the"
-+ " topic %s to %s", consumerName, 
subscription, topicName.toString(), seekBy))));
-}
-
-if (!isConnected()) {
-return Optional.of(FutureUtil.failedFuture(new 
PulsarClientException(
-String.format("The client is not connected to the broker 
when seeking the subscription %s of the "
-+ "topic %s to %s", subscription, 
topicName.toString(), seekBy))));
-}
+private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf 
seek, MessageId seekId, String seekBy) {
+AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+Backoff backoff = new BackoffBuilder()
+.setInitialTime(100, TimeUnit.MILLISECONDS)
+.setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+.setMandatoryStop(0, TimeUnit.MILLISECONDS)
+.create();
 
-return Optional.empty();
+CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, 
opTimeoutMs, seekFuture);
+return seekFuture;
 }
 
-private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf 
seek, MessageId seekId, String seekBy) {
-final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId 
seekId, String seekBy,
+   final Backoff backoff, final AtomicLong 
remainingTime,
+   CompletableFuture<Void> seekFuture) {
 ClientCnx cnx = cnx();
+if (isConnected() && cnx != null) {
+if (!duringSeek.compareAndSet(false, true)) {
+final String message = String.format(
+"[%s][%s] attempting to seek operation that is already 
in progress (seek by %s)",
+topic, subscription, seekBy);
+log.warn("[{}][{}] Attempting to seek operation that is 
already in progress, cancelling {}",
+topic, subscription, seekBy);
+seekFuture.completeExceptionally(new 
IllegalStateException(message));
+return;
+}
+MessageIdAdv originSeekMessageId = seekMessageId;
+seekMessageId = (MessageIdAdv) seekId;
+log.info("[{}][{}] Seeking subscription to {}", topic, 
subscription, seekBy);
 
-if (!duringSeek.compareAndSet(false, true)) {
-final String message = String.format(
-"[%s][%s] attempting to seek operation that is already in 
progress (seek by %s)",
-topic, subscription, seekBy);
-log.warn("[{}][{}] Attempting to seek operation that is already in 
progress, cancelling {}",
-topic, subscription, seekBy);
-seekFuture.completeExceptionally(new 
IllegalStateException(message));
-return seekFuture;
-}
-
-MessageIdAdv originSeekMessageId = seekMessageId;
-seekMessageId = (MessageIdAdv) seekId;
-log.info("[{}][{}] Seeking subscription to {}", topic, subscription, 
seekBy);
-
-cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
-log.info("[{}][{}] Successfully reset subscription to {}", topic, 
subscription, seekBy);
- 
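
The diff above is cut off before the retry branch, so the following is only a sketch of the general technique the commit title names: retrying a seek with exponential backoff while the client is disconnected, within a time budget. The class `SeekBackoffSketch`, its nested `Backoff`, and the retry loop are assumptions written for illustration; they are not Pulsar's `Backoff`/`BackoffBuilder`, and the real client schedules retries asynchronously rather than looping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

public class SeekBackoffSketch {
    /** Hypothetical backoff: doubles the delay on each call, capped at maxMs. */
    static final class Backoff {
        private final long maxMs;
        private long nextMs;

        Backoff(long initialMs, long maxMs) {
            this.nextMs = initialMs;
            this.maxMs = maxMs;
        }

        long next() {
            long current = Math.min(nextMs, maxMs);
            nextMs = Math.min(nextMs * 2, maxMs);
            return current;
        }
    }

    /**
     * Runs {@code seek} once the connection check passes. While disconnected, it
     * records the delay a real client would wait and deducts it from the budget.
     * Returns the recorded delays; throws if the budget is used up first.
     */
    static List<Long> seekWithBackoff(BooleanSupplier connected, Runnable seek,
                                      long initialMs, long budgetMs) {
        Backoff backoff = new Backoff(initialMs, budgetMs * 2);
        long remainingMs = budgetMs;
        List<Long> delays = new ArrayList<>();
        while (!connected.getAsBoolean()) {
            if (remainingMs <= 0) {
                throw new IllegalStateException("seek timed out while disconnected");
            }
            long delay = backoff.next();
            delays.add(delay);
            remainingMs -= delay; // a real client would schedule the retry after this delay
        }
        seek.run();
        return delays;
    }

    public static void main(String[] args) {
        int[] checks = {0};
        // Pretend the connection comes back on the fourth check.
        List<Long> delays = seekWithBackoff(() -> ++checks[0] > 3,
                () -> System.out.println("seek sent"), 100, 30_000);
        System.out.println("delays before success: " + delays);
    }
}
```

The initial delay of 100 ms and the cap of twice the operation timeout mirror the `setInitialTime` and `setMax` arguments visible in the patch.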

(pulsar) 03/05: [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e2070a87d556816a831a60c8056fb6cbddfb61c1
Author: Yunze Xu 
AuthorDate: Sat Mar 16 14:56:34 2024 +0800

[fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask 
in dispatcher (#22279)

(cherry picked from commit 4e0c145c89a35ec9b41fa22862edac59e28d892d)
---
 .../PersistentDispatcherSingleActiveConsumer.java  | 26 +-
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index bf6482bda01..cc7b6841e5c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -71,6 +71,7 @@ public class PersistentDispatcherSingleActiveConsumer extends 
AbstractDispatcher
 protected volatile int readBatchSize;
 protected final Backoff readFailureBackoff;
 private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
+private final Object lockForReadOnActiveConsumerTask = new Object();
 
 private final RedeliveryTracker redeliveryTracker;
 
@@ -120,18 +121,23 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 return;
 }
 
-readOnActiveConsumerTask = 
topic.getBrokerService().executor().schedule(() -> {
-if (log.isDebugEnabled()) {
-log.debug("[{}] Rewind cursor and read more entries after {} 
ms delay", name,
-
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+synchronized (lockForReadOnActiveConsumerTask) {
+if (readOnActiveConsumerTask != null) {
+return;
 }
-Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-cursor.rewind(activeConsumer != null && 
activeConsumer.readCompacted());
+readOnActiveConsumerTask = 
topic.getBrokerService().executor().schedule(() -> {
+if (log.isDebugEnabled()) {
+log.debug("[{}] Rewind cursor and read more entries after 
{} ms delay", name,
+
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+}
+Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+cursor.rewind(activeConsumer != null && 
activeConsumer.readCompacted());
 
-notifyActiveConsumerChanged(activeConsumer);
-readMoreEntries(activeConsumer);
-readOnActiveConsumerTask = null;
-}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
+notifyActiveConsumerChanged(activeConsumer);
+readMoreEntries(activeConsumer);
+readOnActiveConsumerTask = null;
+}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
+}
 }
 
 @Override
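
The locking pattern in this fix can be sketched on its own: check the field, take the lock, check again, and only then schedule. The class `ScheduleOnceSketch` and its members are hypothetical names for illustration. One deliberate difference from the diff: the task clears the field under the same lock, an assumption added here so that a task that runs very quickly cannot clear the field before the scheduling thread has assigned it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduleOnceSketch {
    private final ScheduledExecutorService executor;
    private final Object lock = new Object();
    private volatile ScheduledFuture<?> pendingTask;
    final AtomicInteger runs = new AtomicInteger();

    ScheduleOnceSketch(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    /** Returns true if this call scheduled the task, false if one was already pending. */
    boolean scheduleIfAbsent(long delayMs) {
        if (pendingTask != null) {
            return false; // fast path without locking
        }
        synchronized (lock) {
            if (pendingTask != null) {
                return false; // another thread scheduled it while we waited for the lock
            }
            pendingTask = executor.schedule(() -> {
                runs.incrementAndGet();
                synchronized (lock) {
                    pendingTask = null; // allow the next scheduling round
                }
            }, delayMs, TimeUnit.MILLISECONDS);
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        ScheduleOnceSketch sketch = new ScheduleOnceSketch(executor);
        System.out.println("first call scheduled: " + sketch.scheduleIfAbsent(100));
        System.out.println("second call scheduled: " + sketch.scheduleIfAbsent(100));
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("task runs: " + sketch.runs.get());
    }
}
```

Without the second check inside the `synchronized` block, two threads that both pass the first check could each schedule a task, which is the situation the commit title describes as wrong double-checked locking.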



(pulsar) 05/05: [fix][client] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#22363)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1045f8be626981c818c6eddf30f5732d25dbac66
Author: Yunze Xu 
AuthorDate: Wed Mar 27 19:49:27 2024 +0800

[fix][client] Fix wrong results of hasMessageAvailable and readNext after 
seeking by timestamp (#22363)

Co-authored-by: Lari Hotari 
(cherry picked from commit 149deaa5a79ed8570489bead4215ae213a4e9206)
---
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 85 +++---
 .../apache/pulsar/client/impl/ConsumerImpl.java| 25 ---
 2 files changed, 90 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index d511c6dc37f..00c3eadb06a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -64,8 +65,8 @@ import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.apache.pulsar.schema.Schemas;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -75,7 +76,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
 
 private static final String subscription = "reader-sub";
 
-@BeforeMethod
+@BeforeClass(alwaysRun = true)
 @Override
 protected void setup() throws Exception {
 super.internalSetup();
@@ -87,7 +88,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
 admin.namespaces().createNamespace("my-property/my-ns", 
Sets.newHashSet("test"));
 }
 
-@AfterMethod(alwaysRun = true)
+@AfterClass(alwaysRun = true)
 @Override
 protected void cleanup() throws Exception {
 super.internalCleanup();
@@ -146,21 +147,41 @@ public class ReaderTest extends 
MockedPulsarServiceBaseTest {
 testReadMessages(topic, true);
 }
 
-@Test
-public void testReadMessageWithBatchingWithMessageInclusive() throws 
Exception {
+@DataProvider
+public static Object[][] seekBeforeHasMessageAvailable() {
+return new Object[][] { { true }, { false } };
+}
+
+@Test(timeOut = 2, dataProvider = "seekBeforeHasMessageAvailable")
+public void testReadMessageWithBatchingWithMessageInclusive(boolean 
seekBeforeHasMessageAvailable)
+throws Exception {
 String topic = 
"persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive";
 Set<String> keys = publishMessages(topic, 10, true);
 
 Reader<byte[]> reader = 
pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
 
.startMessageIdInclusive().readerName(subscription).create();
 
-while (reader.hasMessageAvailable()) {
-Assert.assertTrue(keys.remove(reader.readNext().getKey()));
+if (seekBeforeHasMessageAvailable) {
+reader.seek(0L); // it should seek to the earliest
 }
+
+assertTrue(reader.hasMessageAvailable());
+final Message<byte[]> msg = reader.readNext();
+assertTrue(keys.remove(msg.getKey()));
 // start from latest with start message inclusive should only read the 
last message in batch
 assertEquals(keys.size(), 9);
-Assert.assertFalse(keys.contains("key9"));
-Assert.assertFalse(reader.hasMessageAvailable());
+
+final MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
+if (seekBeforeHasMessageAvailable) {
+assertEquals(msgId.getBatchIndex(), 0);
+assertFalse(keys.contains("key0"));
+assertTrue(reader.hasMessageAvailable());
+} else {
+assertEquals(msgId.getBatchIndex(), 9);
+assertFalse(reader.hasMessageAvailable());
+assertFalse(keys.contains("key9"));
+assertFalse(reader.hasMessageAvailable());
+}
 }
 
 private void testReadMessages(String topic, boolean enableBatch) throws 
Exception {
@@ -258,7 +279,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest 
{
 @Test
 public void testReaderWithTimeLong() throws Exception {
 String ns = "my-property/my-ns";
- 

(pulsar) branch branch-3.0 updated (31c9f44cb53 -> 1045f8be626)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 31c9f44cb53 [fix][misc] Make ConcurrentBitSet thread safe (#22361)
 new 9ea7f6014b5 [improve][client] Add backoff for `seek` (#20963)
 new 318ff3398d3 [fix][client] fix Reader.hasMessageAvailable might return 
true after seeking to latest (#22201)
 new e2070a87d55 [fix][broker] Fix wrong double-checked locking for 
readOnActiveConsumerTask in dispatcher (#22279)
 new d4610a0d601 [fix][broker] Avoid execute prepareInitPoliciesCacheAsync 
if namespace is deleted (#22268)
 new 1045f8be626 [fix][client] Fix wrong results of hasMessageAvailable and 
readNext after seeking by timestamp (#22363)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../SystemTopicBasedTopicPoliciesService.java  |  66 +++---
 .../PersistentDispatcherSingleActiveConsumer.java  |  26 ++-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  19 ++
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 112 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java| 227 +
 .../pulsar/client/impl/ConsumerImplTest.java   |  15 +-
 6 files changed, 325 insertions(+), 140 deletions(-)



(pulsar) 02/05: [fix][client] fix Reader.hasMessageAvailable might return true after seeking to latest (#22201)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 318ff3398d39fa16010a664c747adbe77fd44d20
Author: Yunze Xu 
AuthorDate: Thu Mar 7 11:32:18 2024 +0800

[fix][client] fix Reader.hasMessageAvailable might return true after 
seeking to latest (#22201)

(cherry picked from commit 95a53f3a033c4e57db2ddb2d1f0e9a4bc8b9f441)
---
 .../org/apache/pulsar/client/impl/ReaderTest.java  |  27 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java| 104 ++---
 .../pulsar/client/impl/ConsumerImplTest.java   |   2 +-
 3 files changed, 96 insertions(+), 37 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 2f91d792581..d511c6dc37f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -66,6 +66,7 @@ import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -761,4 +762,30 @@ public class ReaderTest extends 
MockedPulsarServiceBaseTest {
 producer.close();
 admin.topics().delete(topic, false);
 }
+
+@DataProvider
+public static Object[][] initializeLastMessageIdInBroker() {
+return new Object[][] { { true }, { false } };
+}
+
+@Test(dataProvider = "initializeLastMessageIdInBroker")
+public void testHasMessageAvailableAfterSeek(boolean 
initializeLastMessageIdInBroker) throws Exception {
+final String topic = 
"persistent://my-property/my-ns/test-has-message-available-after-seek";
+@Cleanup Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+.startMessageId(MessageId.earliest).create();
+
+@Cleanup Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+producer.send("msg");
+
+if (initializeLastMessageIdInBroker) {
+assertTrue(reader.hasMessageAvailable());
+} // else: lastMessageIdInBroker is earliest
+
+reader.seek(MessageId.latest);
+// lastMessageIdInBroker is the last message ID, while startMessageId 
is still earliest
+assertFalse(reader.hasMessageAvailable());
+
+producer.send("msg");
+assertTrue(reader.hasMessageAvailable());
+}
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 717355abbef..dae53c59d6e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -166,7 +166,9 @@ public class ConsumerImpl extends ConsumerBase 
implements ConnectionHandle
 private volatile MessageIdAdv startMessageId;
 
 private volatile MessageIdAdv seekMessageId;
-private final AtomicBoolean duringSeek;
+@VisibleForTesting
+final AtomicReference<SeekStatus> seekStatus;
+private volatile CompletableFuture<Void> seekFuture;
 
 private final MessageIdAdv initialStartMessageId;
 
@@ -296,7 +298,7 @@ public class ConsumerImpl extends ConsumerBase 
implements ConnectionHandle
 stats = ConsumerStatsDisabled.INSTANCE;
 }
 
-duringSeek = new AtomicBoolean(false);
+seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED);
 
 // Create msgCrypto if not created already
 if (conf.getCryptoKeyReader() != null) {
@@ -771,7 +773,7 @@ public class ConsumerImpl extends ConsumerBase 
implements ConnectionHandle
 closeConsumerTasks();
 deregisterFromClientCnx();
 client.cleanupConsumer(this);
-clearReceiverQueue();
+clearReceiverQueue(false);
 return CompletableFuture.completedFuture(null);
 }
 
@@ -779,7 +781,7 @@ public class ConsumerImpl extends ConsumerBase 
implements ConnectionHandle
 topic, subscription, cnx.ctx().channel(), consumerId);
 
 long requestId = client.newRequestId();
-if (duringSeek.get()) {
+if (seekStatus.get() != SeekStatus.NOT_STARTED) {
 acknowledgmentsGroupingTracker.flushAndClean();
 }
 
@@ -790,7 +792,8 @@ public class ConsumerImpl extends ConsumerBase 
implements ConnectionHandle
 int currentSize;
 synchronized (this) {
 currentSize = incomingMessages.size();
-startMessageId = clearReceiverQueue();
+setClientCnx(cnx);
+clearReceiverQueue(true);
 if (possibleSendToDeadLetterTopicMessages != 

Re: [PR] [fix][client] Fix Reader.hasMessageAvailable might return true after seeking to latest [pulsar]

2024-03-27 Thread via GitHub


lhotari commented on PR #22201:
URL: https://github.com/apache/pulsar/pull/22201#issuecomment-2024096913

   It's necessary to cherry-pick #20963 first so that this fix can be applied 
cleanly.





(pulsar) branch branch-3.0 updated (6e0ebcbaad6 -> 31c9f44cb53)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 6e0ebcbaad6 [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest 
(#22252)
 new 20be6ca977e [fix][broker] Avoid expired unclosed ledgers when checking 
expired messages by ledger closure time (#22335)
 new 31c9f44cb53 [fix][misc] Make ConcurrentBitSet thread safe (#22361)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../persistent/PersistentMessageExpiryMonitor.java |   4 +-
 .../service/PersistentMessageFinderTest.java   |  51 ++-
 .../common/util/collections/ConcurrentBitSet.java  | 363 +++--
 3 files changed, 377 insertions(+), 41 deletions(-)



(pulsar) 02/02: [fix][misc] Make ConcurrentBitSet thread safe (#22361)

2024-03-27 Thread lhotari

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 31c9f44cb53f4bd115c2e471a817014827e49ebc
Author: Lari Hotari 
AuthorDate: Wed Mar 27 09:16:22 2024 -0700

[fix][misc] Make ConcurrentBitSet thread safe (#22361)

(cherry picked from commit edd0076bd83f01a5fcbe81c8396667014f0fc36e)
---
 .../common/util/collections/ConcurrentBitSet.java  | 363 +++--
 1 file changed, 331 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
index 23842fe5b55..a37628cb300 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
@@ -20,12 +20,13 @@ package org.apache.pulsar.common.util.collections;
 
 import java.util.BitSet;
 import java.util.concurrent.locks.StampedLock;
-import lombok.EqualsAndHashCode;
+import java.util.stream.IntStream;
 
 /**
- * Safe multithreaded version of {@code BitSet}.
+ * A {@code BitSet} that is protected by a {@code StampedLock} to provide thread-safe access.
+ * The {@link #length()} method is not thread safe and is not overridden because StampedLock is not reentrant.
+ * Use the {@link #safeLength()} method to get the length of the bit set in a thread-safe manner.
  */
-@EqualsAndHashCode(callSuper = true)
 public class ConcurrentBitSet extends BitSet {
 
 private static final long serialVersionUID = 1L;
@@ -39,10 +40,8 @@ public class ConcurrentBitSet extends BitSet {
  * Creates a bit set whose initial size is large enough to explicitly 
represent bits with indices in the range
  * {@code 0} through {@code nbits-1}. All bits are initially {@code false}.
  *
- * @param nbits
- *the initial size of the bit set
- * @throws NegativeArraySizeException
- * if the specified initial size is negative
+ * @param nbits the initial size of the bit set
+ * @throws NegativeArraySizeException if the specified initial size is 
negative
  */
 public ConcurrentBitSet(int nbits) {
 super(nbits);
@@ -65,105 +64,405 @@ public class ConcurrentBitSet extends BitSet {
 
 @Override
 public void set(int bitIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.set(bitIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void clear(int bitIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.clear(bitIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void set(int fromIndex, int toIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.set(fromIndex, toIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void clear(int fromIndex, int toIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.clear(fromIndex, toIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void clear() {
+long stamp = rwLock.writeLock();
+try {
+super.clear();
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public int nextSetBit(int fromIndex) {
 long stamp = rwLock.tryOptimisticRead();
-super.set(bitIndex);
+int nextSetBit = super.nextSetBit(fromIndex);
 if (!rwLock.validate(stamp)) {
+// Fallback to read lock
 stamp = rwLock.readLock();
 try {
-super.set(bitIndex);
+nextSetBit = super.nextSetBit(fromIndex);
 } finally {
 rwLock.unlockRead(stamp);
 }
 }
+return nextSetBit;
 }
 
 @Override
-public void set(int fromIndex, int toIndex) {
+public int nextClearBit(int fromIndex) {
 long stamp = rwLock.tryOptimisticRead();
-super.set(fromIndex, toIndex);
+int nextClearBit = super.nextClearBit(fromIndex);
 if (!rwLock.validate(stamp)) {
+// Fallback to read lock
 stamp = rwLock.readLock();
 try {
-super.set(fromIndex, toIndex);
+nextClearBit = super.nextClearBit(fromIndex);
 } finally {
 rwLock.unlockRead(stamp);
 }
 }
+return nextClearBit;
 }
 
 @Override
-public int nextSetBit(int fromIndex) {
+public int previousSetBit(int fromIndex) {
 long stamp = rwLock.tryOptimisticRead();
-int bit 
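
The locking pattern visible in the diff above can be sketched in isolation. This is a minimal illustration, not the `ConcurrentBitSet` implementation: `GuardedBits` is a hypothetical wrapper that uses composition instead of subclassing `BitSet`, and it covers only one write method and one read method. Writes take the exclusive write lock, because an optimistic read stamp provides no exclusion. Reads first try an optimistic read and fall back to a full read lock when validation fails.

```java
import java.util.BitSet;
import java.util.concurrent.locks.StampedLock;

// Hypothetical wrapper for illustration only; not part of Pulsar.
final class GuardedBits {
    private final BitSet bits = new BitSet();
    private final StampedLock lock = new StampedLock();

    // Mutations need the exclusive write lock.
    void set(int index) {
        long stamp = lock.writeLock();
        try {
            bits.set(index);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Reads try an optimistic (non-blocking) read first.
    int nextSetBit(int fromIndex) {
        long stamp = lock.tryOptimisticRead();
        int result = bits.nextSetBit(fromIndex);
        if (!lock.validate(stamp)) {
            // A write happened in between: retry under a real read lock.
            stamp = lock.readLock();
            try {
                result = bits.nextSetBit(fromIndex);
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return result;
    }
}

final class GuardedBitsDemo {
    public static void main(String[] args) {
        GuardedBits g = new GuardedBits();
        g.set(3);
        g.set(10);
        System.out.println(g.nextSetBit(0));  // prints 3
        System.out.println(g.nextSetBit(4));  // prints 10
        System.out.println(g.nextSetBit(11)); // prints -1
    }
}
```

Because `StampedLock` is not reentrant, a locked method in such a class must not call another locked method on the same instance while holding the lock; that is the reason the Javadoc gives for leaving `length()` alone and adding `safeLength()`.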

(pulsar) 01/02: [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)

2024-03-27 Thread lhotari

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 20be6ca977e78c7defa14883becb84397003659d
Author: Cong Zhao 
AuthorDate: Thu Mar 28 03:42:15 2024 +0800

[fix][broker] Avoid expired unclosed ledgers when checking expired messages 
by ledger closure time (#22335)

(cherry picked from commit f77fe5f099f7ecc334509db07bba477c4226cf19)
---
 .../persistent/PersistentMessageExpiryMonitor.java |  4 +-
 .../service/PersistentMessageFinderTest.java   | 51 +++---
 2 files changed, 46 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index d82f3d93f8f..51ce2c1b8ab 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -116,8 +116,8 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
 managedLedger.getLedgersInfo().lastKey(), true);
 MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null;
 for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : 
ledgerInfoSortedMap.values()) {
-if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds,
-ledgerInfo.getTimestamp())) {
+if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0L
+|| !MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) {
 break;
 }
 info = ledgerInfo;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index d56968c8f8e..194747f59fc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -33,10 +33,8 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
-
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -46,7 +44,9 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.concurrent.atomic.AtomicReference;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -71,11 +72,10 @@ import 
org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.core.MediaType;
-
 @Test(groups = "broker")
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
 
@@ -261,7 +261,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 ledger.addEntry(createMessageWrittenToLedger("msg2"));
 ledger.addEntry(createMessageWrittenToLedger("msg3"));
 Position lastPosition = 
ledger.addEntry(createMessageWrittenToLedger("last-message"));
-
+
 long endTimestamp = System.currentTimeMillis() + 1000;
 
 Result result = new Result();
@@ -451,6 +451,43 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
 }
 
+@Test
+public void 

(pulsar) 01/02: [fix][misc] Make ConcurrentBitSet thread safe (#22361)

2024-03-27 Thread lhotari

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cc79803cb7e4caf732e37e1879ae49f3dfbbd290
Author: Lari Hotari 
AuthorDate: Wed Mar 27 09:16:22 2024 -0700

[fix][misc] Make ConcurrentBitSet thread safe (#22361)

(cherry picked from commit edd0076bd83f01a5fcbe81c8396667014f0fc36e)
---
 .../common/util/collections/ConcurrentBitSet.java  | 363 +++--
 1 file changed, 331 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
index 23842fe5b55..a37628cb300 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
@@ -20,12 +20,13 @@ package org.apache.pulsar.common.util.collections;
 
 import java.util.BitSet;
 import java.util.concurrent.locks.StampedLock;
-import lombok.EqualsAndHashCode;
+import java.util.stream.IntStream;
 
 /**
- * Safe multithreaded version of {@code BitSet}.
+ * A {@code BitSet} that is protected by a {@code StampedLock} to provide thread-safe access.
+ * The {@link #length()} method is not thread safe and is not overridden because StampedLock is not reentrant.
+ * Use the {@link #safeLength()} method to get the length of the bit set in a thread-safe manner.
  */
-@EqualsAndHashCode(callSuper = true)
 public class ConcurrentBitSet extends BitSet {
 
 private static final long serialVersionUID = 1L;
@@ -39,10 +40,8 @@ public class ConcurrentBitSet extends BitSet {
  * Creates a bit set whose initial size is large enough to explicitly 
represent bits with indices in the range
  * {@code 0} through {@code nbits-1}. All bits are initially {@code false}.
  *
- * @param nbits
- *the initial size of the bit set
- * @throws NegativeArraySizeException
- * if the specified initial size is negative
+ * @param nbits the initial size of the bit set
+ * @throws NegativeArraySizeException if the specified initial size is 
negative
  */
 public ConcurrentBitSet(int nbits) {
 super(nbits);
@@ -65,105 +64,405 @@ public class ConcurrentBitSet extends BitSet {
 
 @Override
 public void set(int bitIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.set(bitIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void clear(int bitIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.clear(bitIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void set(int fromIndex, int toIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.set(fromIndex, toIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void clear(int fromIndex, int toIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.clear(fromIndex, toIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void clear() {
+long stamp = rwLock.writeLock();
+try {
+super.clear();
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public int nextSetBit(int fromIndex) {
 long stamp = rwLock.tryOptimisticRead();
-super.set(bitIndex);
+int nextSetBit = super.nextSetBit(fromIndex);
 if (!rwLock.validate(stamp)) {
+// Fallback to read lock
 stamp = rwLock.readLock();
 try {
-super.set(bitIndex);
+nextSetBit = super.nextSetBit(fromIndex);
 } finally {
 rwLock.unlockRead(stamp);
 }
 }
+return nextSetBit;
 }
 
 @Override
-public void set(int fromIndex, int toIndex) {
+public int nextClearBit(int fromIndex) {
 long stamp = rwLock.tryOptimisticRead();
-super.set(fromIndex, toIndex);
+int nextClearBit = super.nextClearBit(fromIndex);
 if (!rwLock.validate(stamp)) {
+// Fallback to read lock
 stamp = rwLock.readLock();
 try {
-super.set(fromIndex, toIndex);
+nextClearBit = super.nextClearBit(fromIndex);
 } finally {
 rwLock.unlockRead(stamp);
 }
 }
+return nextClearBit;
 }
 
 @Override
-public int nextSetBit(int fromIndex) {
+public int previousSetBit(int fromIndex) {
 long stamp = rwLock.tryOptimisticRead();
-int bit 

(pulsar) 02/02: [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f56c383a5312ef9186880662860b8d2f38b3c0c8
Author: Cong Zhao 
AuthorDate: Thu Mar 28 03:42:15 2024 +0800

[fix][broker] Avoid expired unclosed ledgers when checking expired messages 
by ledger closure time (#22335)

(cherry picked from commit f77fe5f099f7ecc334509db07bba477c4226cf19)
---
 .../persistent/PersistentMessageExpiryMonitor.java |  4 +-
 .../service/PersistentMessageFinderTest.java   | 51 +++---
 2 files changed, 47 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index ac391c10503..2478a7a2538 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -121,8 +121,8 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
 managedLedger.getLedgersInfo().lastKey(), true);
 MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null;
 for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : 
ledgerInfoSortedMap.values()) {
-if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds,
-ledgerInfo.getTimestamp())) {
+if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0L
+|| !MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) {
 break;
 }
 info = ledgerInfo;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index ace552a55a7..6883c0467e4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -33,10 +33,8 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
-
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -46,7 +44,9 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.concurrent.atomic.AtomicReference;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -72,11 +73,10 @@ import 
org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.core.MediaType;
-
 @Test(groups = "broker")
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
 
@@ -463,6 +463,45 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
 }
 
+@Test
+public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() 
throws Throwable {
+final String ledgerAndCursorName = 
"testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger";
+int maxTTLSeconds = 1;
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+config.setMaxEntriesPerLedger(5);
+ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerAndCursorName, config);
+ManagedCursorImpl c1 = 

(pulsar) branch branch-3.2 updated (21e56955d5e -> f56c383a531)

2024-03-27 Thread lhotari

lhotari pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 21e56955d5e [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
 new cc79803cb7e [fix][misc] Make ConcurrentBitSet thread safe (#22361)
 new f56c383a531 [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../persistent/PersistentMessageExpiryMonitor.java |   4 +-
 .../service/PersistentMessageFinderTest.java   |  51 ++-
 .../common/util/collections/ConcurrentBitSet.java  | 363 +++--
 3 files changed, 378 insertions(+), 40 deletions(-)



(pulsar) branch branch-2.11 updated: [fix][broker][branch-2.11] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName (#22346)

2024-03-27 Thread lhotari

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
 new 2cee88c8f08 [fix][broker][branch-2.11] Fast fix infinite HTTP call 
createSubscriptions caused by wrong topicName (#22346)
2cee88c8f08 is described below

commit 2cee88c8f08ebd08e6447f18aa44c3af00026974
Author: fengyubiao 
AuthorDate: Thu Mar 28 03:43:08 2024 +0800

[fix][broker][branch-2.11] Fast fix infinite HTTP call createSubscriptions 
caused by wrong topicName (#22346)
---
 .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java| 2 +-
 .../api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java| 9 ++---
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8f6191938ed..2cb8495034b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2234,7 +2234,7 @@ public class PersistentTopicsBase extends AdminResource {
 getPartitionedTopicMetadataAsync(topicName, 
authoritative, allowAutoTopicCreation))
 .thenAccept(partitionMetadata -> {
 final int numPartitions = partitionMetadata.partitions;
-if (numPartitions > 0) {
+if (partitionMetadata.partitions > 0 && !isUnexpectedTopicName(partitionMetadata)) {
 final CompletableFuture future = new 
CompletableFuture<>();
 final AtomicInteger count = new 
AtomicInteger(numPartitions);
 final AtomicInteger failureCount = new 
AtomicInteger(0);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
index 3b1222f5b55..a295a48b4fd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
@@ -53,7 +53,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest 
extends ProducerCo
 }
 
 @Test
-public void testInfiniteHttpCallGetSubscriptions() throws Exception {
+public void testInfiniteHttpCallGetOrCreateSubscriptions() throws 
Exception {
 final String randomStr = UUID.randomUUID().toString().replaceAll("-", 
"");
 final String partitionedTopicName = 
"persistent://my-property/my-ns/tp1_" + randomStr;
 final String topic_p0 = partitionedTopicName + 
TopicName.PARTITIONED_TOPIC_SUFFIX + "0";
@@ -65,6 +65,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest 
extends ProducerCo
 // Do test.
 ProducerAndConsumerEntry pcEntry = triggerDLQCreated(topic_p0, 
topicDLQ, subscriptionName);
 admin.topics().getSubscriptions(topicDLQ);
+admin.topics().createSubscription(topicDLQ, "s1", MessageId.earliest);
 
 // cleanup.
 pcEntry.consumer.close();
@@ -73,7 +74,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest 
extends ProducerCo
 }
 
 @Test
-public void testInfiniteHttpCallGetSubscriptions2() throws Exception {
+public void testInfiniteHttpCallGetOrCreateSubscriptions2() throws 
Exception {
 final String randomStr = UUID.randomUUID().toString().replaceAll("-", 
"");
 final String topicName = "persistent://my-property/my-ns/tp1_" + 
randomStr + "-partition-0-abc";
 Producer producer = pulsarClient.newProducer(Schema.STRING)
@@ -82,13 +83,14 @@ public class 
TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
 
 // Do test.
 admin.topics().getSubscriptions(topicName);
+admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
 
 // cleanup.
 producer.close();
 }
 
 @Test
-public void testInfiniteHttpCallGetSubscriptions3() throws Exception {
+public void testInfiniteHttpCallGetOrCreateSubscriptions3() throws 
Exception {
 final String randomStr = UUID.randomUUID().toString().replaceAll("-", 
"");
 final String topicName = "persistent://my-property/my-ns/tp1_" + 
randomStr + "-partition-0";
 Producer producer = pulsarClient.newProducer(Schema.STRING)
@@ -97,6 +99,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest 
extends ProducerCo
 
 // Do test.
 admin.topics().getSubscriptions(topicName);
+

Re: [PR] [fix][broker][branch-2.11] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName [pulsar]

2024-03-27 Thread via GitHub


lhotari merged PR #22346:
URL: https://github.com/apache/pulsar/pull/22346





(pulsar) branch master updated: [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)

2024-03-27 Thread lhotari

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new f77fe5f099f [fix][broker] Avoid expired unclosed ledgers when checking 
expired messages by ledger closure time (#22335)
f77fe5f099f is described below

commit f77fe5f099f7ecc334509db07bba477c4226cf19
Author: Cong Zhao 
AuthorDate: Thu Mar 28 03:42:15 2024 +0800

[fix][broker] Avoid expired unclosed ledgers when checking expired messages 
by ledger closure time (#22335)
---
 .../persistent/PersistentMessageExpiryMonitor.java |  4 +-
 .../service/PersistentMessageFinderTest.java   | 51 +++---
 2 files changed, 47 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index ac391c10503..2478a7a2538 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -121,8 +121,8 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
 managedLedger.getLedgersInfo().lastKey(), true);
 MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null;
 for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : 
ledgerInfoSortedMap.values()) {
-if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds,
-ledgerInfo.getTimestamp())) {
+if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0L
+|| !MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) {
 break;
 }
 info = ledgerInfo;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index ace552a55a7..6883c0467e4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -33,10 +33,8 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
-
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -46,7 +44,9 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.concurrent.atomic.AtomicReference;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -72,11 +73,10 @@ import 
org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.core.MediaType;
-
 @Test(groups = "broker")
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
 
@@ -463,6 +463,45 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
 }
 
+@Test
+public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() 
throws Throwable {
+final String ledgerAndCursorName = 
"testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger";
+int maxTTLSeconds = 1;
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+

Re: [PR] [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time [pulsar]

2024-03-27 Thread via GitHub


lhotari merged PR #22335:
URL: https://github.com/apache/pulsar/pull/22335
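
The effect of the added `getTimestamp() == 0L` guard can be illustrated with a small sketch. Everything here is a hypothetical stand-in: `isEntryExpired` only approximates what `MessageImpl.isEntryExpired` is assumed to do (compare the current time against timestamp plus TTL), and the sketch assumes that a ledger which is still open can report a timestamp of `0`. Under those assumptions, the old condition treats such a ledger as long expired, while the new condition stops the scan.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins for illustration; not the Pulsar broker code.
final class LedgerExpirySketch {

    // Assumed semantics: expired when "now" is past timestamp + TTL.
    static boolean isEntryExpired(int ttlSeconds, long timestampMillis, long nowMillis) {
        return ttlSeconds != 0
                && nowMillis > timestampMillis + TimeUnit.SECONDS.toMillis(ttlSeconds);
    }

    // Condition before the fix: returns true when the scan should stop (break).
    static boolean stopsScanOld(boolean hasTimestamp, long ts, int ttl, long now) {
        return !hasTimestamp || !isEntryExpired(ttl, ts, now);
    }

    // Condition after the fix: a zero timestamp also stops the scan.
    static boolean stopsScanNew(boolean hasTimestamp, long ts, int ttl, long now) {
        return !hasTimestamp || ts == 0L || !isEntryExpired(ttl, ts, now);
    }

    public static void main(String[] args) {
        long now = 1_711_500_000_000L; // fixed clock value for a deterministic demo
        // Ledger that reports a timestamp field, but with value 0.
        System.out.println(stopsScanOld(true, 0L, 1, now)); // prints false
        System.out.println(stopsScanNew(true, 0L, 1, now)); // prints true
        // Ledger closed 5 seconds ago with a 1-second TTL: still counted as expired.
        System.out.println(stopsScanNew(true, now - 5_000L, 1, now)); // prints false
    }
}
```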





Re: [PR] [fix][client] the nullValue in msgMetadata should be true by default [pulsar]

2024-03-27 Thread via GitHub


lhotari commented on code in PR #22372:
URL: https://github.com/apache/pulsar/pull/22372#discussion_r1541925716


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java:
##
@@ -141,9 +142,9 @@ public TypedMessageBuilder orderingKey(byte[] 
orderingKey) {
 @Override
 public TypedMessageBuilder value(T value) {
 if (value == null) {
-msgMetadata.setNullValue(true);

Review Comment:
   This shouldn't be removed, since `.value` might be called multiple times.
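
   A minimal sketch of the concern, using hypothetical stand-in classes (`SketchMetadata`, `SketchBuilder`) rather than the real `TypedMessageBuilderImpl`: when `value` can be called more than once on the same builder, each call has to set the flag explicitly, otherwise the flag left by an earlier call survives.

```java
// Hypothetical stand-ins for illustration; not the Pulsar client classes.
final class SketchMetadata {
    private boolean nullValue;

    void setNullValue(boolean nullValue) {
        this.nullValue = nullValue;
    }

    boolean isNullValue() {
        return nullValue;
    }
}

final class SketchBuilder<T> {
    private final SketchMetadata metadata = new SketchMetadata();
    private T content;

    SketchBuilder<T> value(T value) {
        if (value == null) {
            // Needed when an earlier call stored a non-null value.
            metadata.setNullValue(true);
            content = null;
            return this;
        }
        // Needed when an earlier call stored null.
        metadata.setNullValue(false);
        content = value;
        return this;
    }

    boolean isNullValue() {
        return metadata.isNullValue();
    }
}

final class NullValueSketch {
    public static void main(String[] args) {
        SketchBuilder<String> builder = new SketchBuilder<>();
        builder.value("first").value(null);
        System.out.println(builder.isNullValue()); // prints true
        builder.value("second");
        System.out.println(builder.isNullValue()); // prints false
    }
}
```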






(pulsar) branch master updated: [improve][misc] Upgrade to Netty 4.1.108 and tcnative 2.0.65 (#22369)

2024-03-27 Thread lhotari

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new be0a9d9d9bb [improve][misc] Upgrade to Netty 4.1.108 and tcnative 
2.0.65 (#22369)
be0a9d9d9bb is described below

commit be0a9d9d9bb23dabc065f091b853f27c0ebcaa16
Author: Lari Hotari 
AuthorDate: Wed Mar 27 12:34:14 2024 -0700

[improve][misc] Upgrade to Netty 4.1.108 and tcnative 2.0.65 (#22369)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 56 
 distribution/shell/src/assemble/LICENSE.bin.txt  | 54 +++
 pom.xml  |  2 +-
 3 files changed, 56 insertions(+), 56 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index cb99d62edfe..cab23db279a 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -292,34 +292,34 @@ The Apache Software License, Version 2.0
 - org.apache.commons-commons-lang3-3.11.jar
 - org.apache.commons-commons-text-1.10.0.jar
  * Netty
-- io.netty-netty-buffer-4.1.105.Final.jar
-- io.netty-netty-codec-4.1.105.Final.jar
-- io.netty-netty-codec-dns-4.1.105.Final.jar
-- io.netty-netty-codec-http-4.1.105.Final.jar
-- io.netty-netty-codec-http2-4.1.105.Final.jar
-- io.netty-netty-codec-socks-4.1.105.Final.jar
-- io.netty-netty-codec-haproxy-4.1.105.Final.jar
-- io.netty-netty-common-4.1.105.Final.jar
-- io.netty-netty-handler-4.1.105.Final.jar
-- io.netty-netty-handler-proxy-4.1.105.Final.jar
-- io.netty-netty-resolver-4.1.105.Final.jar
-- io.netty-netty-resolver-dns-4.1.105.Final.jar
-- io.netty-netty-resolver-dns-classes-macos-4.1.105.Final.jar
-- io.netty-netty-resolver-dns-native-macos-4.1.105.Final-osx-aarch_64.jar
-- io.netty-netty-resolver-dns-native-macos-4.1.105.Final-osx-x86_64.jar
-- io.netty-netty-transport-4.1.105.Final.jar
-- io.netty-netty-transport-classes-epoll-4.1.105.Final.jar
-- io.netty-netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar
-- io.netty-netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar
-- io.netty-netty-transport-native-unix-common-4.1.105.Final.jar
-- io.netty-netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar
-- io.netty-netty-tcnative-boringssl-static-2.0.61.Final.jar
-- io.netty-netty-tcnative-boringssl-static-2.0.61.Final-linux-aarch_64.jar
-- io.netty-netty-tcnative-boringssl-static-2.0.61.Final-linux-x86_64.jar
-- io.netty-netty-tcnative-boringssl-static-2.0.61.Final-osx-aarch_64.jar
-- io.netty-netty-tcnative-boringssl-static-2.0.61.Final-osx-x86_64.jar
-- io.netty-netty-tcnative-boringssl-static-2.0.61.Final-windows-x86_64.jar
-- io.netty-netty-tcnative-classes-2.0.61.Final.jar
+- io.netty-netty-buffer-4.1.108.Final.jar
+- io.netty-netty-codec-4.1.108.Final.jar
+- io.netty-netty-codec-dns-4.1.108.Final.jar
+- io.netty-netty-codec-http-4.1.108.Final.jar
+- io.netty-netty-codec-http2-4.1.108.Final.jar
+- io.netty-netty-codec-socks-4.1.108.Final.jar
+- io.netty-netty-codec-haproxy-4.1.108.Final.jar
+- io.netty-netty-common-4.1.108.Final.jar
+- io.netty-netty-handler-4.1.108.Final.jar
+- io.netty-netty-handler-proxy-4.1.108.Final.jar
+- io.netty-netty-resolver-4.1.108.Final.jar
+- io.netty-netty-resolver-dns-4.1.108.Final.jar
+- io.netty-netty-resolver-dns-classes-macos-4.1.108.Final.jar
+- io.netty-netty-resolver-dns-native-macos-4.1.108.Final-osx-aarch_64.jar
+- io.netty-netty-resolver-dns-native-macos-4.1.108.Final-osx-x86_64.jar
+- io.netty-netty-transport-4.1.108.Final.jar
+- io.netty-netty-transport-classes-epoll-4.1.108.Final.jar
+- io.netty-netty-transport-native-epoll-4.1.108.Final-linux-aarch_64.jar
+- io.netty-netty-transport-native-epoll-4.1.108.Final-linux-x86_64.jar
+- io.netty-netty-transport-native-unix-common-4.1.108.Final.jar
+- io.netty-netty-transport-native-unix-common-4.1.108.Final-linux-x86_64.jar
+- io.netty-netty-tcnative-boringssl-static-2.0.65.Final.jar
+- io.netty-netty-tcnative-boringssl-static-2.0.65.Final-linux-aarch_64.jar
+- io.netty-netty-tcnative-boringssl-static-2.0.65.Final-linux-x86_64.jar
+- io.netty-netty-tcnative-boringssl-static-2.0.65.Final-osx-aarch_64.jar
+- io.netty-netty-tcnative-boringssl-static-2.0.65.Final-osx-x86_64.jar
+- io.netty-netty-tcnative-boringssl-static-2.0.65.Final-windows-x86_64.jar
+- io.netty-netty-tcnative-classes-2.0.65.Final.jar
 - io.netty.incubator-netty-incubator-transport-classes-io_uring-0.0.24.Final.jar
 - io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.24.Final-linux-x86_64.jar
 - 

Re: [PR] [improve][misc] Upgrade to Netty 4.1.108 and tcnative 2.0.65 [pulsar]

2024-03-27 Thread via GitHub


lhotari merged PR #22369:
URL: https://github.com/apache/pulsar/pull/22369


[PR] [fix][client] the nullValue in msgMetadata should be true by default [pulsar]

2024-03-27 Thread via GitHub


liangyepianzhou opened a new pull request, #22372:
URL: https://github.com/apache/pulsar/pull/22372

   ### Motivation
   When no value has been set on a message, the `nullValue` flag in the message
   metadata should be true, and it should change to false once a value is set.
   Otherwise, a message without a value carries an empty payload (`[]`), which
   is then passed to the schema encoder and causes a
   `SchemaSerializationException` when `reconsumeLater` is called.
   ```
   
   org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
       at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131)
       at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467)
       at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452)
       at org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.base/java.lang.reflect.Method.invoke(Method.java:568)
       at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
       at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
       at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
       at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
       at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
       at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
       at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
       at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
       at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
       at org.testng.TestRunner.privateRun(TestRunner.java:829)
       at org.testng.TestRunner.run(TestRunner.java:602)
       at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
       at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
       at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
       at org.testng.SuiteRunner.run(SuiteRunner.java:330)
       at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
       at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
       at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
       at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
       at org.testng.TestNG.runSuites(TestNG.java:1099)
       at org.testng.TestNG.run(TestNG.java:1067)
       at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65)
       at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105)
   Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
       at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
       at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
       at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462)
       ... 29 more
   Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
       at org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49)
       at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
       at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
       at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157)
       at java.base/java.util.Optional.orElseGet(Optional.java:364)
       at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156)
       at org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689)
       at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550)
       at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574)
   ```
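
The failure in the trace above can be reproduced in miniature: an empty payload cannot pass a 4-byte integer size check, whereas a message flagged as null-valued need not be validated at all. The sketch below is only an illustration under that assumption; `Int32PayloadSketch` and both of its methods are hypothetical and are not Pulsar APIs. Only the error text is taken from the trace.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of the size check; these helpers are not Pulsar APIs. */
final class Int32PayloadSketch {
    /** Rejects any payload that is not exactly 4 bytes, as the trace reports for IntSchema. */
    static void validate(byte[] data) {
        if (data.length != 4) {
            throw new IllegalArgumentException("Size of data received by IntSchema is not 4");
        }
    }

    /** Decodes a payload for re-publishing; a null-valued message skips validation. */
    static Integer decodeForResend(boolean nullValue, byte[] data) {
        if (nullValue) {
            return null; // nothing to validate or decode
        }
        validate(data);
        return ByteBuffer.wrap(data).getInt(); // big-endian by default
    }
}
```

With the flag set, `decodeForResend(true, new byte[0])` returns `null`; with the flag cleared, the same empty array fails the size check, which mirrors the situation the PR describes.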
   ### Modifications
   Make the `nullValue` flag in the message metadata true while no value has
   been set, and change it to false once a value is set.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI 

Re: [I] Build with Bazel fails on MacOS as of 0.1.1. [pulsar-client-go]

2024-03-27 Thread via GitHub


merlimat closed issue #302: Build with Bazel fails on MacOS as of 0.1.1.
URL: https://github.com/apache/pulsar-client-go/issues/302


Re: [PR] Enable accessing Message in TableView [pulsar-client-go]

2024-03-27 Thread via GitHub


merlimat commented on code in PR #1174:
URL: https://github.com/apache/pulsar-client-go/pull/1174#discussion_r1541581408


##
pulsar/table_view_impl.go:
##
@@ -186,10 +203,23 @@ func (tv *TableViewImpl) Get(key string) interface{} {
 func (tv *TableViewImpl) Entries() map[string]interface{} {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
+
data := make(map[string]interface{}, len(tv.data))
-   for k, v := range tv.data {
+   for k, msg := range tv.data {
+   v, err := tv.schemaValueFromMessage(msg)
+   if err != nil {
+   tv.logger.Errorf("getting value for message, %w; msg is 
%v", len(tv.listeners), err, msg)
+   continue
+   }
data[k] = v
}
+
+   return data
+}
+
+func (tv *TableViewImpl) Messages() map[string]Message {
+   tv.dataMu.Lock()
+   defer tv.dataMu.Unlock()

Review Comment:
   This is returning the same underlying map, allowing for race-conditions and 
modifications from the application. 
   
   Note: the existing code is also wrong because it was returning `tv.data` 
instead of `data`



Re: [PR] [cleanup][ml] ManagedCursor clean up. [pulsar]

2024-03-27 Thread via GitHub


dao-jun commented on PR #22246:
URL: https://github.com/apache/pulsar/pull/22246#issuecomment-2023414058

   @lhotari I've replied to your comment: 
https://github.com/apache/pulsar/commit/532b0d9063474bd1c7ae8ac7cf5bd2d56b002164#r140303950


Re: [I] Incorrect usage of context.Context in the producer Send and SendAsync methods [pulsar-client-go]

2024-03-27 Thread via GitHub


merlimat commented on issue #293:
URL: 
https://github.com/apache/pulsar-client-go/issues/293#issuecomment-2023398868

   Closing this issue, as it was fixed a long time ago.


Re: [I] Incorrect usage of context.Context in the producer Send and SendAsync methods [pulsar-client-go]

2024-03-27 Thread via GitHub


merlimat closed issue #293: Incorrect usage of context.Context in the producer 
Send and SendAsync methods
URL: https://github.com/apache/pulsar-client-go/issues/293


Re: [I] ReaderOptions.MessageChannel is ignored [pulsar-client-go]

2024-03-27 Thread via GitHub


merlimat closed issue #211: ReaderOptions.MessageChannel is ignored
URL: https://github.com/apache/pulsar-client-go/issues/211


Re: [I] SDK crashes on ARM [pulsar-client-go]

2024-03-27 Thread via GitHub


merlimat closed issue #250: SDK crashes on ARM
URL: https://github.com/apache/pulsar-client-go/issues/250


Re: [I] ReaderOptions.MessageChannel is ignored [pulsar-client-go]

2024-03-27 Thread via GitHub


merlimat commented on issue #211:
URL: 
https://github.com/apache/pulsar-client-go/issues/211#issuecomment-2023395939

   Closing this issue, as it was fixed a long time ago.


Re: [I] SDK crashes on ARM [pulsar-client-go]

2024-03-27 Thread via GitHub


merlimat commented on issue #250:
URL: 
https://github.com/apache/pulsar-client-go/issues/250#issuecomment-2023393701

   Closing this issue, as it was fixed a long time ago.


Re: [PR] [cleanup][ml] ManagedCursor clean up. [pulsar]

2024-03-27 Thread via GitHub


lhotari commented on PR #22246:
URL: https://github.com/apache/pulsar/pull/22246#issuecomment-2023375730

   > > It seems that this was merged without more reviews
   > 
   > Yes, my bad
   
   @dao-jun I'd recommend reverting the change that I have pinpointed in the 
comment 
https://github.com/apache/pulsar/commit/532b0d9063474bd1c7ae8ac7cf5bd2d56b002164#r140287388
 , assuming that it creates new PositionImpl instances which were avoided 
before this change was made. /cc @codelipenghui 
   


Re: [PR] [cleanup][ml] ManagedCursor clean up. [pulsar]

2024-03-27 Thread via GitHub


dao-jun commented on PR #22246:
URL: https://github.com/apache/pulsar/pull/22246#issuecomment-2023350157

   @lhotari 
   > It seems that this was merged without more reviews
   
   Yes, my bad


Re: [I] Enable Pulsar authentication for Client authentication using private_key_jwt method [pulsar]

2024-03-27 Thread via GitHub


lhotari commented on issue #22371:
URL: https://github.com/apache/pulsar/issues/22371#issuecomment-2023341990

   How is this request different from the current JWT token support in Pulsar?
   https://pulsar.apache.org/docs/3.2.x/security-jwt/
   


Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


lhotari commented on PR #22359:
URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2023306788

   > I don't agree with you for this patch. This isn't a problem, but from the 
Java interface definition, it's better to do like this.
   
   @Technoboy- what is it that you don't agree with? I agree that it is "better". 
When something is "better", this implies that it produces some benefit. In this 
case, what is it that it makes better? The question is then whether this 
benefit is relevant when we are considering how Pulsar is currently maintained. 
I've provided my opinion about formatting changes in this mailing list post: 
https://lists.apache.org/thread/lo15cdzsl740dwgcqwpsl9oy9qb13onv . After we 
settle on a maintenance strategy that reduces merge conflicts for maintaining 
the LTS branch, we have more freedom to do refactorings which we are currently 
avoiding because of the merge conflicts. Merge conflicts caused by unnecessary 
changes are the main reason for my resistance. That could be resolved with an 
improved maintenance strategy.


Re: [D]  Topic data between clusters is out of sync in puslar GEO mode [pulsar]

2024-03-27 Thread via GitHub


GitHub user david-streamlio added a comment to the discussion:   Topic data 
between clusters is out of sync in puslar GEO mode

You should use the `--clusters` switch when you 
[create](https://pulsar.apache.org/reference/#/3.2.x/pulsar-admin/namespaces?id=create)
 the namespace. 

GitHub link: 
https://github.com/apache/pulsar/discussions/22341#discussioncomment-8931135


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


Technoboy- closed pull request #22359: [fix][broker] Fix PersistentSubscription 
duplicate implementation interface Subscription
URL: https://github.com/apache/pulsar/pull/22359


Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


Technoboy- commented on PR #22359:
URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2023215652

   > This isn't a real problem at all. In the Pulsar code base, we currently 
try to minimize unnecessary changes. This might change after we move to a 
different type of maintenance strategy, the dev mailing list discussion is 
https://lists.apache.org/thread/j6qvt45rndnvjypcmqxsfmddqt41bxjv . Refactorings 
aren't currently mentioned there explicitly.
   
   I don't agree with you for this patch. This isn't a problem, but from the 
Java interface definition, it's better to do like this.


[I] Enable Pulsar authentication for Client authentication using private_key_jwt method [pulsar]

2024-03-27 Thread via GitHub


WZHMIJJ opened a new issue, #22371:
URL: https://github.com/apache/pulsar/issues/22371

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   The motivation for this enhancement stems from the need to bolster security 
and efficiency in Pulsar’s client authentication. The private_key_jwt method 
offers a secure means for client authentication, utilizing a private key and 
JSON Web Token (JWT). By implementing this method, Pulsar can enhance its 
security framework.
   
   ### Solution
   
   The proposed solution involves enabling Pulsar authentication for Client 
authentication using the private_key_jwt method. 
   
   Related materials:
   
   For further details on the private_key_jwt method, refer to the 
specification outlined in 
https://kb.authlete.com/en/s/oauth-and-openid-connect/a/client-auth-private-key-jwt
   
   ### Alternatives
   
   While evaluating alternatives, the current OAuth2 flow with client 
credentials (client_id and client_secret) was noted. This in our case is not an 
option, since we use the flow with client_assertion_type and client_assertion. 
   
   Implementing the private_key_jwt method offers a more secure and efficient 
alternative, reducing dependency on client_secret and providing enhanced 
security through private key and JWT-based authentication.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


Technoboy- commented on PR #22359:
URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2023217622

   > Just to say that it's better to not create a lot of similar PRs since that 
would be a lot of changes which don't provide real value. We have a lot of 
technical debt with more important, but low value areas such as compiler 
warnings. There's a lot of compiler warnings when compiling Pulsar. We have 
chosen to ignore them for now. I hope this could change after we have a better 
maintenance strategy in place.
   
   We can close the other patches that are not for the master branch.


(pulsar) branch master updated: [fix][misc] Make ConcurrentBitSet thread safe (#22361)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new edd0076bd83 [fix][misc] Make ConcurrentBitSet thread safe (#22361)
edd0076bd83 is described below

commit edd0076bd83f01a5fcbe81c8396667014f0fc36e
Author: Lari Hotari 
AuthorDate: Wed Mar 27 09:16:22 2024 -0700

[fix][misc] Make ConcurrentBitSet thread safe (#22361)
---
 .../common/util/collections/ConcurrentBitSet.java  | 363 +++--
 1 file changed, 331 insertions(+), 32 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
index 23842fe5b55..a37628cb300 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
@@ -20,12 +20,13 @@ package org.apache.pulsar.common.util.collections;
 
 import java.util.BitSet;
 import java.util.concurrent.locks.StampedLock;
-import lombok.EqualsAndHashCode;
+import java.util.stream.IntStream;
 
 /**
- * Safe multithreaded version of {@code BitSet}.
+ * A {@code BitSet} that is protected by a {@code StampedLock} to provide thread-safe access.
+ * The {@link #length()} method is not thread safe and is not overridden because StampedLock is not reentrant.
+ * Use the {@link #safeLength()} method to get the length of the bit set in a thread-safe manner.
  */
-@EqualsAndHashCode(callSuper = true)
 public class ConcurrentBitSet extends BitSet {
 
 private static final long serialVersionUID = 1L;
@@ -39,10 +40,8 @@ public class ConcurrentBitSet extends BitSet {
  * Creates a bit set whose initial size is large enough to explicitly represent bits with indices in the range
  * {@code 0} through {@code nbits-1}. All bits are initially {@code false}.
  *
- * @param nbits
- *the initial size of the bit set
- * @throws NegativeArraySizeException
- * if the specified initial size is negative
+ * @param nbits the initial size of the bit set
+ * @throws NegativeArraySizeException if the specified initial size is negative
  */
 public ConcurrentBitSet(int nbits) {
 super(nbits);
@@ -65,105 +64,405 @@ public class ConcurrentBitSet extends BitSet {
 
 @Override
 public void set(int bitIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.set(bitIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void clear(int bitIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.clear(bitIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void set(int fromIndex, int toIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.set(fromIndex, toIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void clear(int fromIndex, int toIndex) {
+long stamp = rwLock.writeLock();
+try {
+super.clear(fromIndex, toIndex);
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public void clear() {
+long stamp = rwLock.writeLock();
+try {
+super.clear();
+} finally {
+rwLock.unlockWrite(stamp);
+}
+}
+
+@Override
+public int nextSetBit(int fromIndex) {
 long stamp = rwLock.tryOptimisticRead();
-super.set(bitIndex);
+int nextSetBit = super.nextSetBit(fromIndex);
 if (!rwLock.validate(stamp)) {
+// Fallback to read lock
 stamp = rwLock.readLock();
 try {
-super.set(bitIndex);
+nextSetBit = super.nextSetBit(fromIndex);
 } finally {
 rwLock.unlockRead(stamp);
 }
 }
+return nextSetBit;
 }
 
 @Override
-public void set(int fromIndex, int toIndex) {
+public int nextClearBit(int fromIndex) {
 long stamp = rwLock.tryOptimisticRead();
-super.set(fromIndex, toIndex);
+int nextClearBit = super.nextClearBit(fromIndex);
 if (!rwLock.validate(stamp)) {
+// Fallback to read lock
 stamp = rwLock.readLock();
 try {
-super.set(fromIndex, toIndex);
+nextClearBit = super.nextClearBit(fromIndex);
 } finally {
 rwLock.unlockRead(stamp);
 }
 }
+return nextClearBit;
 }
 
 @Override
-public int nextSetBit(int fromIndex) {
+public int 
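
The locking pattern visible in the diff above (a write lock around every mutation, and an optimistic read that falls back to a read lock for queries) can be sketched on a much smaller structure. `GuardedFlags` is a hypothetical 64-flag holder used only for illustration; it is not Pulsar's `ConcurrentBitSet`, and it assumes indices in the range 0 to 63.

```java
import java.util.concurrent.locks.StampedLock;

/** Hypothetical 64-flag holder; a simplified stand-in, not Pulsar's ConcurrentBitSet. */
final class GuardedFlags {
    private final StampedLock lock = new StampedLock();
    private long bits; // guarded by lock; index is assumed to be in 0..63

    void set(int index) {
        long stamp = lock.writeLock(); // mutations always take the exclusive lock
        try {
            bits |= 1L << index;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    void clear(int index) {
        long stamp = lock.writeLock();
        try {
            bits &= ~(1L << index);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    boolean get(int index) {
        long stamp = lock.tryOptimisticRead(); // no blocking on the fast path
        long snapshot = bits;
        if (!lock.validate(stamp)) {
            // A writer intervened: fall back to a real read lock and re-read.
            stamp = lock.readLock();
            try {
                snapshot = bits;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return (snapshot & (1L << index)) != 0;
    }
}
```

The removed lines in the diff performed `super.set(...)` under an optimistic read, which does not exclude other writers; the sketch keeps optimistic reads for queries only.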

Re: [I] [Bug] ConcurrentBitSet is not thread safe [pulsar]

2024-03-27 Thread via GitHub


lhotari closed issue #22360: [Bug] ConcurrentBitSet is not thread safe
URL: https://github.com/apache/pulsar/issues/22360


Re: [PR] [fix][misc] Make ConcurrentBitSet thread safe [pulsar]

2024-03-27 Thread via GitHub


lhotari merged PR #22361:
URL: https://github.com/apache/pulsar/pull/22361


(pulsar) branch master updated: [improve][broker] Optimize web interface deleteDynamicConfiguration return error message (#22356)

2024-03-27 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new e4553391f96 [improve][broker] Optimize web interface 
deleteDynamicConfiguration return error message (#22356)
e4553391f96 is described below

commit e4553391f96af3bda3d8252b97cac3de1f39a1b5
Author: hanmz 
AuthorDate: Thu Mar 28 00:07:54 2024 +0800

[improve][broker] Optimize web interface deleteDynamicConfiguration return 
error message (#22356)
---
 .../src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 61b354610ac..83067e9f296 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -526,7 +526,7 @@ public class BrokersBase extends AdminResource {
 
 private CompletableFuture internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {
 if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) {
-throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
+throw new RestException(Status.PRECONDITION_FAILED, "Can't delete non-dynamic configuration");
 } else {
 return dynamicConfigurationResources().setDynamicConfigurationAsync(old -> {
 if (old != null) {



Re: [PR] [improve][broker] Optimize web interface deleteDynamicConfiguration return error message [pulsar]

2024-03-27 Thread via GitHub


Technoboy- merged PR #22356:
URL: https://github.com/apache/pulsar/pull/22356


Re: [I] Flaky-test: ExtensibleLoadManagerImplTest.initializeState [pulsar]

2024-03-27 Thread via GitHub


heesung-sn commented on issue #20157:
URL: https://github.com/apache/pulsar/issues/20157#issuecomment-2023122179

   Hi, let me check this.





Re: [I] There is a memory leak,I need help [pulsar-client-python]

2024-03-27 Thread via GitHub


merlimat commented on issue #208:
URL: https://github.com/apache/pulsar-client-python/issues/208#issuecomment-2023069648

   Can you try with the latest version, 3.4.0? 
https://pypi.org/project/pulsar-client/





Re: [PR] [improve] [broker] Servlet support response compression [pulsar]

2024-03-27 Thread via GitHub


lhotari commented on PR #21667:
URL: https://github.com/apache/pulsar/pull/21667#issuecomment-2023012333

   Early WIP PR for improvement: #22370





Re: [PR] [improve] [broker] Servlet support response compression [pulsar]

2024-03-27 Thread via GitHub


lhotari commented on PR #21667:
URL: https://github.com/apache/pulsar/pull/21667#issuecomment-2022902169

   By the way, this change could be improved. Instead of adding multiple 
GzipHandler instances, it would be better to add a single instance and provide 
a configuration option for excluded paths, in case compression causes 
problems in some cases.





Re: [PR] [improve] [broker] Servlet support response compression [pulsar]

2024-03-27 Thread via GitHub


lhotari commented on PR #21667:
URL: https://github.com/apache/pulsar/pull/21667#issuecomment-2022876842

   Cherry-picking to branch-3.0 and branch-3.2 since the lack of compression is 
causing issues when metrics are enabled and there is a large number of active 
topics.





Re: [I] HTTP 504 Gateway timeout [pulsar]

2024-03-27 Thread via GitHub


lhotari commented on issue #22324:
URL: https://github.com/apache/pulsar/issues/22324#issuecomment-2022865755

   Closing this issue since there's no way to reproduce it. The issue was 
reported on an unsupported version.





Re: [I] HTTP 504 Gateway timeout [pulsar]

2024-03-27 Thread via GitHub


lhotari closed issue #22324: HTTP 504 Gateway timeout
URL: https://github.com/apache/pulsar/issues/22324





Re: [I] [Cleanup] PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


lhotari commented on issue #22354:
URL: https://github.com/apache/pulsar/issues/22354#issuecomment-2022863264

   ... and this is not a "Code design issue"





Re: [I] [Bug] PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-27 Thread via GitHub


lhotari commented on issue #22354:
URL: https://github.com/apache/pulsar/issues/22354#issuecomment-2022861168

   This is not a bug. 





Re: [I] No data shown in Grafana Dashboards [pulsar-helm-chart]

2024-03-27 Thread via GitHub


lhotari commented on issue #468:
URL: https://github.com/apache/pulsar-helm-chart/issues/468#issuecomment-2022813523

   For the broker, `authenticateMetricsEndpoint` defaults to false, so the 
cause might be something else.





Re: [I] No data shown in Grafana Dashboards [pulsar-helm-chart]

2024-03-27 Thread via GitHub


lhotari commented on issue #468:
URL: https://github.com/apache/pulsar-helm-chart/issues/468#issuecomment-2022810871

   This might be caused by the configured authentication. I guess the metrics 
endpoint currently requires a token.





(pulsar) branch branch-3.2 updated: [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 21e56955d5e [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest 
(#22252)
21e56955d5e is described below

commit 21e56955d5e9e804b88fbc0d599b6fb172c4f5ff
Author: Lari Hotari 
AuthorDate: Tue Mar 12 22:26:34 2024 +0200

[fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)

(cherry picked from commit 43f9d2abb9d5cd788fe18da6af7ad6fbfb3bc07b)
---
 .../RGUsageMTAggrWaitForAllMsgsTest.java   | 30 +++---
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 9bf7e3c5325..54c23cacc0d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.broker.resourcegroup;
 
 import com.google.common.collect.Sets;
 import io.prometheus.client.Summary;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
@@ -45,11 +49,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 
 // The tests implement a set of producer/consumer operations on a set of 
topics.
 // [A thread is started for each producer, and each consumer in the test.]
@@ -57,6 +56,7 @@ import java.util.concurrent.TimeUnit;
 // After sending/receiving all the messages, traffic usage statistics, and 
Prometheus-metrics
 // are verified on the RGs.
 @Slf4j
+@Test(groups = "flaky")
 public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
 @BeforeClass
 @Override
@@ -119,9 +119,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
 private final int numMesgsToProduce;
 private final String myProduceTopic;
 
-private int sentNumBytes = 0;
-private int sentNumMsgs = 0;
-private int numExceptions = 0;
+private volatile int sentNumBytes = 0;
+private volatile int sentNumMsgs = 0;
+private volatile int numExceptions = 0;
 
 ProduceMessages(int prodId, int nMesgs, String[] topics) {
 producerId = prodId;
@@ -202,9 +202,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
 
 private final int recvTimeoutMilliSecs = 1000;
 private final int ackTimeoutMilliSecs = 1100; // has to be more than 1 
second
-private int recvdNumBytes = 0;
-private int recvdNumMsgs = 0;
-private int numExceptions = 0;
+private volatile int recvdNumBytes = 0;
+private volatile int recvdNumMsgs = 0;
+private volatile int numExceptions = 0;
 private volatile boolean allMessagesReceived = false;
 private volatile boolean consumerIsReady = false;
 
@@ -494,15 +494,15 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
 while (numConsumersDone < NUM_CONSUMERS) {
 for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
 if (!joinedConsumers[ix]) {
+consThr[ix].thread.join();
+joinedConsumers[ix] = true;
+log.debug("Joined consumer={}", ix);
+
 recvdBytes = consThr[ix].consumer.getNumBytesRecvd();
 recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd();
 numConsumerExceptions += consThr[ix].consumer.getNumExceptions();
 log.debug("Consumer={} received {} mesgs and {} bytes", ix, recvdMsgs, recvdBytes);
 
-consThr[ix].thread.join();
-joinedConsumers[ix] = true;
-log.debug("Joined consumer={}", ix);
-
 recvdNumBytes += recvdBytes;
 recvdNumMsgs += recvdMsgs;
 numConsumersDone++;
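
Editorial note: the last hunk above moves `thread.join()` ahead of the counter reads, and the earlier hunks mark the counters `volatile`. A minimal Java sketch of why that ordering matters follows. The class and method names (`JoinBeforeRead`, `Counter`, `runAndCollect`) are hypothetical and are not taken from the Pulsar test; the loop count of 1000 is an arbitrary assumption.

```java
// Hypothetical illustration only; this is not the Pulsar test code.
final class JoinBeforeRead {

    // A worker that counts on its own thread, similar in spirit to a consumer thread.
    static final class Counter implements Runnable {
        // volatile so a reader that polls before join() still sees recent writes
        private volatile int received = 0;

        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                received++; // only this thread writes, so ++ on a volatile is safe here
            }
        }

        int getReceived() {
            return received;
        }
    }

    // Starts the worker, waits for it to finish, and only then reads the counter.
    static int runAndCollect() {
        Counter counter = new Counter();
        Thread thread = new Thread(counter);
        thread.start();
        try {
            thread.join(); // join() first: everything the worker wrote is visible afterwards
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return counter.getReceived(); // the final count, not a mid-run snapshot
    }

    public static void main(String[] args) {
        System.out.println(runAndCollect());
    }
}
```

Reading the counter before `join()` would return whatever the worker had counted so far, which is one plausible source of the flakiness the commit title refers to.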



(pulsar) branch branch-3.0 updated: [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 6e0ebcbaad6 [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest 
(#22252)
6e0ebcbaad6 is described below

commit 6e0ebcbaad69c3643b7fa10d3adf7b59a7f268dd
Author: Lari Hotari 
AuthorDate: Tue Mar 12 22:26:34 2024 +0200

[fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)

(cherry picked from commit 43f9d2abb9d5cd788fe18da6af7ad6fbfb3bc07b)
---
 .../RGUsageMTAggrWaitForAllMsgsTest.java   | 30 +++---
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 9bf7e3c5325..54c23cacc0d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.broker.resourcegroup;
 
 import com.google.common.collect.Sets;
 import io.prometheus.client.Summary;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
@@ -45,11 +49,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 
 // The tests implement a set of producer/consumer operations on a set of 
topics.
 // [A thread is started for each producer, and each consumer in the test.]
@@ -57,6 +56,7 @@ import java.util.concurrent.TimeUnit;
 // After sending/receiving all the messages, traffic usage statistics, and 
Prometheus-metrics
 // are verified on the RGs.
 @Slf4j
+@Test(groups = "flaky")
 public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
 @BeforeClass
 @Override
@@ -119,9 +119,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
 private final int numMesgsToProduce;
 private final String myProduceTopic;
 
-private int sentNumBytes = 0;
-private int sentNumMsgs = 0;
-private int numExceptions = 0;
+private volatile int sentNumBytes = 0;
+private volatile int sentNumMsgs = 0;
+private volatile int numExceptions = 0;
 
 ProduceMessages(int prodId, int nMesgs, String[] topics) {
 producerId = prodId;
@@ -202,9 +202,9 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
 
 private final int recvTimeoutMilliSecs = 1000;
 private final int ackTimeoutMilliSecs = 1100; // has to be more than 1 
second
-private int recvdNumBytes = 0;
-private int recvdNumMsgs = 0;
-private int numExceptions = 0;
+private volatile int recvdNumBytes = 0;
+private volatile int recvdNumMsgs = 0;
+private volatile int numExceptions = 0;
 private volatile boolean allMessagesReceived = false;
 private volatile boolean consumerIsReady = false;
 
@@ -494,15 +494,15 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
 while (numConsumersDone < NUM_CONSUMERS) {
 for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
 if (!joinedConsumers[ix]) {
+consThr[ix].thread.join();
+joinedConsumers[ix] = true;
+log.debug("Joined consumer={}", ix);
+
 recvdBytes = consThr[ix].consumer.getNumBytesRecvd();
 recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd();
 numConsumerExceptions += consThr[ix].consumer.getNumExceptions();
 log.debug("Consumer={} received {} mesgs and {} bytes", ix, recvdMsgs, recvdBytes);
 
-consThr[ix].thread.join();
-joinedConsumers[ix] = true;
-log.debug("Joined consumer={}", ix);
-
 recvdNumBytes += recvdBytes;
 recvdNumMsgs += recvdMsgs;
 numConsumersDone++;



(pulsar) 04/07: [fix] [test] Fix flaky test ManagedLedgerTest.testGetNumberOfEntriesInStorage (#22344)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 100a53b39582cb528d5dd5a99ffdfb4ae97683e9
Author: fengyubiao 
AuthorDate: Wed Mar 27 16:45:02 2024 +0800

[fix] [test] Fix flaky test 
ManagedLedgerTest.testGetNumberOfEntriesInStorage (#22344)

(cherry picked from commit fc066d727b52f7e412476297995c2eb2f5ab61bf)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 34f65dfd00e..10bfb699780 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2636,10 +2636,10 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 managedLedger.addEntry(("entry-" + i).getBytes(Encoding));
 }
 
-//trigger ledger rollover and wait for the new ledger created
-Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
-stateUpdater.setAccessible(true);
-stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
+// trigger ledger rollover and wait for the new ledger created
+Awaitility.await().untilAsserted(() -> {
+   assertEquals("LedgerOpened", WhiteboxImpl.getInternalState(managedLedger, "state").toString());
+});
 managedLedger.rollCurrentLedgerIfFull();
 Awaitility.await().untilAsserted(() -> {
 assertEquals(managedLedger.getLedgersInfo().size(), 3);
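
Editorial note: the hunk above stops forcing the `state` field through reflection and instead waits until the ledger reports `LedgerOpened` on its own. The sketch below shows the same wait-for-state idea using only the JDK. It is a stand-in for the Awaitility call in the diff, not its implementation; `PollUntil`, `awaitUntil`, and `demo` are hypothetical names, and the state strings are used purely for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

// Hypothetical JDK-only stand-in for polling helpers such as Awaitility.
final class PollUntil {

    // Polls the condition until it holds or the timeout elapses; returns whether it held.
    static boolean awaitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Another thread flips the state after a short delay; the caller waits instead of forcing it.
    static boolean demo() {
        AtomicReference<String> state = new AtomicReference<>("CreatingLedger");
        Thread opener = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            state.set("LedgerOpened");
        });
        opener.start();
        return awaitUntil(() -> "LedgerOpened".equals(state.get()), 5_000, 10);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Waiting for the state keeps the test aligned with what the component actually did, whereas writing the field directly can leave the object in a state its own logic never reached.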



(pulsar) branch branch-3.0 updated (d4c05431def -> 9c50d1801a7)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from d4c05431def [improve][admin] Fix the `createMissingPartitions` doesn't response correctly (#22311)
 new bff6ea28131 [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306)
 new 22b724fd1c3 [improve][misc] Include native epoll library for Netty for arm64 (#22319)
 new 5e958280656 [fix][ml]Expose ledger timestamp  (#22338)
 new 100a53b3958 [fix] [test] Fix flaky test ManagedLedgerTest.testGetNumberOfEntriesInStorage (#22344)
 new fe09a69fbe5 [fix][test] Fix flaky ManagedLedgerErrorsTest.recoverAfterZnodeVersionError (#22368)
 new bab60b1e983 [fix][broker] Fix OpReadEntry.skipCondition NPE issue (#22367)
 new 9c50d1801a7 [fix][client] Consumer lost message ack due to race condition in acknowledge with batch message (#22353)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 distribution/server/src/assemble/LICENSE.bin.txt   |  1 +
 distribution/shell/src/assemble/LICENSE.bin.txt|  1 +
 .../mledger/impl/ManagedLedgerFactoryImpl.java |  1 +
 .../bookkeeper/mledger/impl/OpReadEntry.java   |  2 +-
 .../mledger/impl/ManagedLedgerErrorsTest.java  |  1 -
 .../mledger/impl/ManagedLedgerFactoryTest.java |  5 ++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  8 +--
 .../client/api/SimpleProducerConsumerTest.java |  6 +-
 .../client/impl/ProducerMemoryLimitTest.java   | 12 ++--
 .../pulsar/client/impl/ProducerSemaphoreTest.java  | 18 +++--
 .../client/impl/AbstractBatchMessageContainer.java |  9 ++-
 .../client/impl/BatchMessageContainerImpl.java | 10 +--
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  3 +-
 .../pulsar/client/impl/ConnectionHandler.java  |  7 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java|  3 +-
 .../pulsar/client/impl/MessageIdAdvUtils.java  | 19 --
 .../PersistentAcknowledgmentsGroupingTracker.java  | 18 +++--
 .../apache/pulsar/client/impl/ProducerImpl.java| 32 +
 .../pulsar/client/impl/MessageIdAdvUtilsTest.java  | 76 ++
 pulsar-common/pom.xml  |  6 ++
 .../org/apache/pulsar/client/api/MessageIdAdv.java |  2 +
 pulsar-sql/presto-distribution/LICENSE |  1 +
 22 files changed, 186 insertions(+), 55 deletions(-)
 create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java



(pulsar) 01/07: [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bff6ea2813156255a06ede1f8a41cbb627486d06
Author: atomchen <492672...@qq.com>
AuthorDate: Thu Mar 21 17:30:40 2024 +0800

[fix][client]Fixed getting an incorrect `maxMessageSize` value when 
accessing multiple clusters in the same process (#22306)

Co-authored-by: atomchchen 
(cherry picked from commit 71598c1163730defb9fdea85e813fe863c3fe4d2)
---
 .../client/api/SimpleProducerConsumerTest.java |  6 ++--
 .../client/impl/ProducerMemoryLimitTest.java   | 12 
 .../pulsar/client/impl/ProducerSemaphoreTest.java  | 18 ++--
 .../client/impl/AbstractBatchMessageContainer.java |  9 --
 .../client/impl/BatchMessageContainerImpl.java | 10 +++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  3 +-
 .../pulsar/client/impl/ConnectionHandler.java  |  7 +
 .../apache/pulsar/client/impl/ConsumerImpl.java|  3 +-
 .../apache/pulsar/client/impl/ProducerImpl.java| 32 ++
 9 files changed, 60 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 0c0e61fe33f..eb4a322f3c3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -93,13 +93,13 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
@@ -3906,11 +3906,11 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 .topic("persistent://my-property/my-ns/my-topic2");
 
 @Cleanup
-Producer producer = producerBuilder.create();
+ProducerImpl producer = 
(ProducerImpl)producerBuilder.create();
 List> futures = new ArrayList<>();
 
 // Asynchronously produce messages
-byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
+byte[] message = new 
byte[producer.getConnectionHandler().getMaxMessageSize() + 1];
 for (int i = 0; i < maxPendingMessages + 10; i++) {
 Future future = producer.sendAsync(message);
 try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index d776fdb0ed9..55a67ae644d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import io.netty.buffer.ByteBufAllocator;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -33,7 +34,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SizeUnit;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -69,10 +69,12 @@ public class ProducerMemoryLimitTest extends 
ProducerConsumerBase {
 .create();
 this.stopBroker();
 try {
-try (MockedStatic mockedStatic = 
Mockito.mockStatic(ClientCnx.class)) {
-mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8);
-producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
-}
+ConnectionHandler connectionHandler = 
Mockito.spy(producer.getConnectionHandler());
+Field field = 
producer.getClass().getDeclaredField("connectionHandler");
+field.setAccessible(true);
+  

(pulsar) 03/07: [fix][ml]Expose ledger timestamp (#22338)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5e9582806565412d1eb5ad6ef783da9e3b9444bc
Author: 道君 
AuthorDate: Wed Mar 27 14:08:39 2024 +0800

[fix][ml]Expose ledger timestamp  (#22338)

(cherry picked from commit cd49defc1383175ef32e18c7f0905567f734318c)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java | 1 +
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java | 5 +
 2 files changed, 6 insertions(+)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index ce76717942d..31896769e8e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -736,6 +736,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
 ledgerInfo.ledgerId = pbLedgerInfo.getLedgerId();
 ledgerInfo.entries = pbLedgerInfo.hasEntries() ? 
pbLedgerInfo.getEntries() : null;
 ledgerInfo.size = pbLedgerInfo.hasSize() ? 
pbLedgerInfo.getSize() : null;
+ledgerInfo.timestamp = pbLedgerInfo.hasTimestamp() ? 
pbLedgerInfo.getTimestamp() : null;
 ledgerInfo.isOffloaded = pbLedgerInfo.hasOffloadContext();
 if (pbLedgerInfo.hasOffloadContext()) {
 MLDataFormats.OffloadContext offloadContext = 
pbLedgerInfo.getOffloadContext();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
index 8695759c99f..4f2c3e17877 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -60,6 +61,10 @@ public class ManagedLedgerFactoryTest extends 
MockedBookKeeperTestCase {
 assertEquals(info.ledgers.get(2).ledgerId, 5);
 assertEquals(info.ledgers.get(3).ledgerId, 6);
 
+for (ManagedLedgerInfo.LedgerInfo linfo : info.ledgers) {
+assertNotNull(linfo.timestamp);
+}
+
 assertEquals(info.cursors.size(), 1);
 
 CursorInfo cursorInfo = info.cursors.get("c1");



(pulsar) 02/07: [improve][misc] Include native epoll library for Netty for arm64 (#22319)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 22b724fd1c3eac463834a58102d667617451d453
Author: Lari Hotari 
AuthorDate: Thu Mar 21 13:23:21 2024 -0700

[improve][misc] Include native epoll library for Netty for arm64 (#22319)

(cherry picked from commit 24e9437ce065613fd924a74f61b620d9fdc0058b)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 1 +
 distribution/shell/src/assemble/LICENSE.bin.txt  | 1 +
 pulsar-common/pom.xml| 6 ++
 pulsar-sql/presto-distribution/LICENSE   | 1 +
 4 files changed, 9 insertions(+)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 4fa4c9ea821..b3e3d096bda 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -306,6 +306,7 @@ The Apache Software License, Version 2.0
 - io.netty-netty-resolver-dns-native-macos-4.1.105.Final-osx-x86_64.jar
 - io.netty-netty-transport-4.1.105.Final.jar
 - io.netty-netty-transport-classes-epoll-4.1.105.Final.jar
+- io.netty-netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar
 - io.netty-netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar
 - io.netty-netty-transport-native-unix-common-4.1.105.Final.jar
 - 
io.netty-netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index a8b27a1fbfd..4f804ffa7c4 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -361,6 +361,7 @@ The Apache Software License, Version 2.0
 - netty-resolver-dns-4.1.105.Final.jar
 - netty-transport-4.1.105.Final.jar
 - netty-transport-classes-epoll-4.1.105.Final.jar
+- netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar
 - netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar
 - netty-transport-native-unix-common-4.1.105.Final.jar
 - netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index b3f4f5cc8ec..9bde13520e9 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -98,6 +98,12 @@
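       <classifier>linux-x86_64</classifier>
     </dependency>
 
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport-native-epoll</artifactId>
+      <classifier>linux-aarch_64</classifier>
+    </dependency>
+
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-transport-native-unix-common</artifactId>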
diff --git a/pulsar-sql/presto-distribution/LICENSE 
b/pulsar-sql/presto-distribution/LICENSE
index 937225f2e15..42cb170a2d1 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -255,6 +255,7 @@ The Apache Software License, Version 2.0
 - netty-tcnative-classes-2.0.61.Final.jar
 - netty-transport-4.1.105.Final.jar
 - netty-transport-classes-epoll-4.1.105.Final.jar
+- netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar
 - netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar
 - netty-transport-native-unix-common-4.1.105.Final.jar
 - netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar



(pulsar) 07/07: [fix][client] Consumer lost message ack due to race condition in acknowledge with batch message (#22353)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9c50d1801a7c273bbf7416aec823ec7b2da59f50
Author: 萧易客 
AuthorDate: Wed Mar 27 20:12:39 2024 +0800

[fix][client] Consumer lost message ack due to race condition in 
acknowledge with batch message (#22353)

Co-authored-by: Yunze Xu 
Co-authored-by: 汪苏诚 
(cherry picked from commit 3fa2ae83312ead38a81fe82bc06c1784e6061d6f)
---
 .../pulsar/client/impl/MessageIdAdvUtils.java  | 19 --
 .../PersistentAcknowledgmentsGroupingTracker.java  | 18 +++--
 .../pulsar/client/impl/MessageIdAdvUtilsTest.java  | 76 ++
 .../org/apache/pulsar/client/api/MessageIdAdv.java |  2 +
 4 files changed, 106 insertions(+), 9 deletions(-)
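
Note on the change: `java.util.BitSet` is not thread-safe, so a `clear` followed by `isEmpty` can interleave with another thread's `clear` on the same ack set, and the last acknowledgment of a batch can go unnoticed. The following is a minimal, self-contained sketch of the locking pattern the diff below applies. It is not the Pulsar code: the class `BatchAckSketch` and the helpers `snapshot` and `runOnce` are hypothetical names introduced for illustration, and `acknowledge` here takes the `BitSet` directly rather than a `MessageIdAdv`.

```java
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchAckSketch {

    // Hypothetical stand-in for the patched acknowledge logic: the bit update
    // and the emptiness check happen under one lock on the shared BitSet.
    static boolean acknowledge(BitSet ackSet, int batchIndex, boolean individual) {
        synchronized (ackSet) {
            if (individual) {
                ackSet.clear(batchIndex);        // ack a single message of the batch
            } else {
                ackSet.clear(0, batchIndex + 1); // cumulative ack up to batchIndex
            }
            return ackSet.isEmpty();             // true once the whole batch is acked
        }
    }

    // Readers that copy the ack set take the same lock, so the copy never
    // observes a half-applied update (assumed helper, mirrors the tracker change).
    static long[] snapshot(BitSet ackSet) {
        synchronized (ackSet) {
            return ackSet.toLongArray();
        }
    }

    // Two threads individually ack the two messages of one batch and report
    // how many of them observed "batch fully acknowledged".
    static int runOnce() {
        BitSet ackSet = new BitSet(2);
        ackSet.set(0, 2); // both messages pending
        AtomicInteger fullyAcked = new AtomicInteger();
        Thread t0 = new Thread(() -> {
            if (acknowledge(ackSet, 0, true)) {
                fullyAcked.incrementAndGet();
            }
        });
        Thread t1 = new Thread(() -> {
            if (acknowledge(ackSet, 1, true)) {
                fullyAcked.incrementAndGet();
            }
        });
        t0.start();
        t1.start();
        try {
            t0.join();
            t1.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ack threads", e);
        }
        return fullyAcked.get();
    }

    public static void main(String[] args) {
        for (int round = 0; round < 1000; round++) {
            int observed = runOnce();
            if (observed != 1) {
                throw new IllegalStateException("expected exactly one full ack, got " + observed);
            }
        }
        System.out.println("every round reported full acknowledgment exactly once");
    }
}
```

With the lock in place, whichever thread enters the critical section second clears the last bit and is the only one to see an empty set. Without it, both threads could return false and the batch would never be acknowledged to the broker, which matches the symptom named in the commit title.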

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
index a0d1446ba3d..f66bb642021 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
@@ -40,6 +40,13 @@ public class MessageIdAdvUtils {
 && lhs.getBatchIndex() == rhs.getBatchIndex();
 }
 
+/**
+ * Acknowledge batch message.
+ *
+ * @param msgId the message id
+ * @param individual whether to acknowledge the batch message individually
+ * @return true if the batch message is fully acknowledged
+ */
 static boolean acknowledge(MessageIdAdv msgId, boolean individual) {
 if (!isBatch(msgId)) {
 return true;
@@ -51,12 +58,14 @@ public class MessageIdAdvUtils {
 return false;
 }
 int batchIndex = msgId.getBatchIndex();
-if (individual) {
-ackSet.clear(batchIndex);
-} else {
-ackSet.clear(0, batchIndex + 1);
+synchronized (ackSet) {
+if (individual) {
+ackSet.clear(batchIndex);
+} else {
+ackSet.clear(0, batchIndex + 1);
+}
+return ackSet.isEmpty();
 }
-return ackSet.isEmpty();
 }
 
 static boolean isBatch(MessageIdAdv msgId) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 0cf776aea59..c0ee13b346a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -324,8 +324,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
 MessageIdAdvUtils.discardBatch(msgId), __ -> {
 final BitSet ackSet = msgId.getAckSet();
 final ConcurrentBitSetRecyclable value;
-if (ackSet != null && !ackSet.isEmpty()) {
-value = ConcurrentBitSetRecyclable.create(ackSet);
+if (ackSet != null) {
+synchronized (ackSet) {
+if (!ackSet.isEmpty()) {
+value = ConcurrentBitSetRecyclable.create(ackSet);
+} else {
+value = ConcurrentBitSetRecyclable.create();
+value.set(0, msgId.getBatchSize());
+}
+}
 } else {
 value = ConcurrentBitSetRecyclable.create();
 value.set(0, msgId.getBatchSize());
@@ -374,8 +381,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
 .ConnectException("Consumer connect fail! consumer state:" + consumer.getState()));
 }
 BitSetRecyclable bitSet;
-if (msgId.getAckSet() != null) {
-bitSet = BitSetRecyclable.valueOf(msgId.getAckSet().toLongArray());
+BitSet ackSetFromMsgId = msgId.getAckSet();
+if (ackSetFromMsgId != null) {
+synchronized (ackSetFromMsgId) {
+bitSet = BitSetRecyclable.valueOf(ackSetFromMsgId.toLongArray());
+}
 } else {
 bitSet = BitSetRecyclable.create();
 bitSet.set(0, batchSize);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java
new file mode 100644
index 000..704dfc9cbd7
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * 

(pulsar) 05/07: [fix][test] Fix flaky ManagedLedgerErrorsTest.recoverAfterZnodeVersionError (#22368)

2024-03-27 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fe09a69fbe596b91d92cf62960cc480fabaa828c
Author: Lari Hotari 
AuthorDate: Wed Mar 27 04:10:47 2024 -0700

[fix][test] Fix flaky ManagedLedgerErrorsTest.recoverAfterZnodeVersionError (#22368)

(cherry picked from commit c184209bfc5a61f143abfa701e5f1b0be2109d77)
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 512e90d17f5..7b2f8228ad7 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -381,7 +381,6 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
 ledger.addEntry("entry".getBytes());
 fail("should fail");
 } catch (ManagedLedgerFencedException e) {
-assertEquals(e.getCause().getClass(), ManagedLedgerException.BadVersionException.class);
 // ok
 }
 


