[GitHub] [pulsar] Technoboy- commented on a diff in pull request #20804: [improve][offload] Extend the offload police to allow specifying more conf

2023-07-13 Thread via GitHub


Technoboy- commented on code in PR #20804:
URL: https://github.com/apache/pulsar/pull/20804#discussion_r1263325619


##
pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java:
##
@@ -432,4 +433,16 @@ private byte[] loadClassData(String name) throws IOException {
 }
 }
 
+@Test
+public void testCreateOffloadPoliciesWithExtraConfiguration() {
+Properties properties = new Properties();
+properties.put("managedLedgerOffloadExtraConfigKey1", "value1");
+properties.put("managedLedgerOffloadExtraConfigKey2", "value2");
+OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);

Review Comment:
   ```
   Properties properties = new Properties();
   Properties extraProps = new Properties();
   extraProps.put("configKey1", "value1");
   extraProps.put("configKey2", "value2");
   properties.put("managedLedgerOffloadExtraConfig", extraProps);
   OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);
   ```
   Seems better than `prefix` keys
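
The trade-off between the two layouts can be sketched outside of Pulsar. The following Go snippet is a minimal illustration, not Pulsar code: `extractByPrefix` and `nestedConfig` are hypothetical helpers, and plain maps stand in for `java.util.Properties`. With prefixed keys the reader has to scan every entry and strip the prefix; with a nested value the extra configuration is already grouped under one key.

```go
package main

import (
	"fmt"
	"strings"
)

// extraConfigPrefix mirrors the key prefix used in the test under review.
const extraConfigPrefix = "managedLedgerOffloadExtraConfig"

// extractByPrefix scans a flat key/value map and returns the entries whose
// key starts with prefix, with the prefix stripped. Hypothetical helper.
func extractByPrefix(flat map[string]string, prefix string) map[string]string {
	extra := make(map[string]string)
	for key, value := range flat {
		if strings.HasPrefix(key, prefix) && len(key) > len(prefix) {
			extra[strings.TrimPrefix(key, prefix)] = value
		}
	}
	return extra
}

// nestedConfig returns the map stored under key, or an empty map when the
// key is absent or holds a different type. Hypothetical helper.
func nestedConfig(props map[string]interface{}, key string) map[string]string {
	if extra, ok := props[key].(map[string]string); ok {
		return extra
	}
	return map[string]string{}
}

func main() {
	// Flat layout: extra settings share a key prefix with nothing else.
	flat := map[string]string{
		"managedLedgerOffloadExtraConfigKey1": "value1",
		"managedLedgerOffloadExtraConfigKey2": "value2",
		"managedLedgerOffloadDriver":          "aws-s3", // unrelated setting, illustrative value
	}
	fmt.Println(len(extractByPrefix(flat, extraConfigPrefix)))

	// Nested layout: extra settings live in their own map under one key.
	nested := map[string]interface{}{
		extraConfigPrefix: map[string]string{"configKey1": "value1", "configKey2": "value2"},
	}
	fmt.Println(len(nestedConfig(nested, extraConfigPrefix)))
}
```

Both calls yield two entries here; the difference is that the flat variant depends on a naming convention, while the nested variant depends on the value type stored under a single key.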



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] lifepuzzlefun commented on pull request #20800: [improve][broker] Add the MessageExpirer interface to make code clear

2023-07-13 Thread via GitHub


lifepuzzlefun commented on PR #20800:
URL: https://github.com/apache/pulsar/pull/20800#issuecomment-1635278642

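   @BewareMyPower Hello, let me clarify the code issue you mentioned. PR https://github.com/apache/pulsar/pull/20597 is hard to read because there was a bug here: the code before that patch did not allow users to call the admin API to reset the offset. It is not "the unrelated changes", and the code style here was carried over from the "first init comment" commit.
   https://github.com/apache/pulsar/assets/13600283/54890ae6-2858-45ac-bab9-691ed8d700b7
   This PR is great, and I think it improves the readability of the code, but I think it can easily give code reviewers the wrong impression about my own follow-up patches. Maybe you can just tell people this is a refactoring PR, without the insulting words, OK?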





[GitHub] [pulsar] poorbarcode closed pull request #20805: [improve] [pip] PIP-288: Add a internal API waitForAllTopicsCreated under persistent

2023-07-13 Thread via GitHub


poorbarcode closed pull request #20805: [improve] [pip] PIP-288: Add a internal 
API waitForAllTopicsCreated under persistent
URL: https://github.com/apache/pulsar/pull/20805





[pulsar] branch master updated: [fix][doc]Correcting spelling mistakes in the pulsar-broker module (#20798)

2023-07-13 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 2152870702b [fix][doc]Correcting spelling mistakes in the pulsar-broker module (#20798)
2152870702b is described below

commit 2152870702bec5b91a3d7125aa539c4f8e71c943
Author: wangda <38549158+da...@users.noreply.github.com>
AuthorDate: Fri Jul 14 11:47:54 2023 +0800

[fix][doc]Correcting spelling mistakes in the pulsar-broker module (#20798)

Signed-off-by: zhangwd3 
---
 .../broker/authentication/AuthenticationProviderAthenzTest.java | 2 +-
 .../broker/loadbalance/extensions/scheduler/TransferShedder.java | 2 +-
 .../java/org/apache/pulsar/broker/service/plugin/EntryFilter.java | 2 +-
 .../apache/pulsar/broker/transaction/buffer/TransactionBuffer.java | 2 +-
 .../buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java | 2 +-
 .../src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java | 2 +-
 .../java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java | 2 +-
 .../java/org/apache/pulsar/broker/service/BrokerServiceTest.java | 2 +-
 .../org/apache/pulsar/broker/service/PersistentTopicE2ETest.java | 2 +-
 .../java/org/apache/pulsar/broker/service/ResendRequestTest.java | 2 +-
 .../pulsar/broker/service/persistent/DelayedDeliveryTest.java | 2 +-
 .../org/apache/pulsar/broker/testcontext/PulsarTestContext.java | 2 +-
 .../org/apache/pulsar/client/api/SimpleProducerConsumerTest.java | 4 ++--
 .../org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java | 6 +++---
 .../org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java | 2 +-
 .../apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java | 2 +-
 .../test/java/org/apache/pulsar/client/impl/PulsarTestClient.java | 2 +-
 .../pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java | 2 +-
 .../org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java | 2 +-
 .../test/java/org/apache/pulsar/zookeeper/ZookeeperServerTest.java | 2 +-
 20 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java b/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java
index 89ee5ca0830..a5211c2f814 100644
--- a/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java
+++ b/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java
@@ -81,7 +81,7 @@ public class AuthenticationProviderAthenzTest {
 sysPropProvider2.initialize(config);
 assertEquals(sysPropProvider2.getAllowedOffset(), 0);
 } catch (Exception e) {
-fail("Failed to get allowd offset from system property");
+fail("Failed to get allowed offset from system property");
 }
 
 System.setProperty("pulsar.athenz.role.token_allowed_offset", "invalid");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 7bb16bac124..cec052b65ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Load shedding strategy that unloads bundles from the highest loaded brokers.
- * This strategy is only configurable in the broker load balancer extenstions introduced by
+ * This strategy is only configurable in the broker load balancer extensions introduced by
  * PIP-192[https://github.com/apache/pulsar/issues/16691].
  *
  * This load shedding strategy has the following goals:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
index 2e5a590fa19..686c72df8c2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
@@ -49,7 +49,7 @@ public interface EntryFilter {
  */
 REJECT,
 /**
- * postpone message, it should not go to this conmumer.
+ * postpone message, it should not go to this consumer.
  */
 RESCHEDULE
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
 

[GitHub] [pulsar] Technoboy- merged pull request #20798: [fix][doc]Correcting spelling mistakes in the pulsar-broker module

2023-07-13 Thread via GitHub


Technoboy- merged PR #20798:
URL: https://github.com/apache/pulsar/pull/20798





[GitHub] [pulsar] codecov-commenter commented on pull request #20798: [fix][doc]Correcting spelling mistakes in the pulsar-broker module

2023-07-13 Thread via GitHub


codecov-commenter commented on PR #20798:
URL: https://github.com/apache/pulsar/pull/20798#issuecomment-1635220217

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/20798) Report
   > Merging [#20798](https://app.codecov.io/gh/apache/pulsar/pull/20798) (ff51d5e) into [master](https://app.codecov.io/gh/apache/pulsar/commit/4e2ba4b9d2aff24add3016c48fe72457518444aa) (4e2ba4b) will **increase** coverage by `0.01%`.
   > The diff coverage is `50.00%`.
   
   [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/20798/graphs/tree.svg)](https://app.codecov.io/gh/apache/pulsar/pull/20798)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #20798    +/-   ##
   
   + Coverage   73.10%    73.12%   +0.01%
   - Complexity  32108     32131      +23
   
     Files        1866      1867       +1
     Lines      139028    139058      +30
     Branches    15294     15299       +5
   
   + Hits       101634    101683      +49
   + Misses      29333     29306      -27
   - Partials     8061      8069       +8
   ```
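
The totals in the table are consistent with coverage being computed as hits divided by tracked lines, where lines equals hits plus misses plus partials. This is an assumption inferred from the numbers in the report, not a statement of Codecov's exact method. The Go snippet below is a minimal check of that arithmetic; `percent` is a hypothetical helper.

```go
package main

import "fmt"

// percent formats hits/total as a percentage with two decimals.
func percent(hits, total int) string {
	return fmt.Sprintf("%.2f%%", 100*float64(hits)/float64(total))
}

func main() {
	// Values copied from the report, in the order: hits, misses, partials.
	base := [3]int{101634, 29333, 8061}
	head := [3]int{101683, 29306, 8069}

	// The sums reproduce the Lines row of the report (139028 and 139058).
	baseLines := base[0] + base[1] + base[2]
	headLines := head[0] + head[1] + head[2]

	fmt.Println(baseLines, percent(base[0], baseLines)) // 139028 73.10%
	fmt.Println(headLines, percent(head[0], headLines)) // 139058 73.12%
}
```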
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | inttests | `24.08% <30.00%> (-0.10%)` | :arrow_down: |
   | systests | `25.05% <30.00%> (+0.03%)` | :arrow_up: |
   | unittests | `72.41% <50.00%> (+0.03%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pulsar/pull/20798) | Coverage Δ | |
   |---|---|---|
   | [...dbalance/extensions/scheduler/TransferShedder.java](https://app.codecov.io/gh/apache/pulsar/pull/20798#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9sb2FkYmFsYW5jZS9leHRlbnNpb25zL3NjaGVkdWxlci9UcmFuc2ZlclNoZWRkZXIuamF2YQ==) | `82.92% <ø> (ø)` | |
   | [...ache/pulsar/broker/service/plugin/EntryFilter.java](https://app.codecov.io/gh/apache/pulsar/pull/20798#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BsdWdpbi9FbnRyeUZpbHRlci5qYXZh) | `100.00% <ø> (ø)` | |
   | [...r/impl/SnapshotSegmentAbortedTxnProcessorImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/20798#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvaW1wbC9TbmFwc2hvdFNlZ21lbnRBYm9ydGVkVHhuUHJvY2Vzc29ySW1wbC5qYXZh) | `77.20% <ø> (ø)` | |
   | [...lsar/common/policies/data/OffloadPoliciesImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/20798#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi9wb2xpY2llcy9kYXRhL09mZmxvYWRQb2xpY2llc0ltcGwuamF2YQ==) | `83.43% <37.50%> (-1.28%)` | :arrow_down: |
   | [...va/org/apache/pulsar/broker/service/ServerCnx.java](https://app.codecov.io/gh/apache/pulsar/pull/20798#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1NlcnZlckNueC5qYXZh) | `71.27% <100.00%> (-0.12%)` | :arrow_down: |
   | [.../persistent/ReplicatedSubscriptionsController.java](https://app.codecov.io/gh/apache/pulsar/pull/20798#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUmVwbGljYXRlZFN1YnNjcmlwdGlvbnNDb250cm9sbGVyLmphdmE=) | `69.62% <100.00%> (-1.91%)` | :arrow_down: |
   
   ... and [86 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/20798/indirect-changes)
   



[GitHub] [pulsar] yyj8 commented on pull request #20048: [improve][broker]PIP-255 Add topic metrics for the number of production data requests to add a topic and the average number of messages per req

2023-07-13 Thread via GitHub


yyj8 commented on PR #20048:
URL: https://github.com/apache/pulsar/pull/20048#issuecomment-1635207236

   > > > Did this PIP vote pass? Where is the issue?
   > > > PIP-255 is `Make the partition assignment strategy pluggable` #19806
   > > 
   > > 
   > > PIP must have an issue, right?
   > 
   > pip needs to be discussed in the dev mail list, and finally initiate a 
vote, at least 2 PMC +1 can pass
   
   Do I still need to send an email to the development mailing list?





[GitHub] [pulsar-client-go] Gleiphir2769 commented on pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema

2023-07-13 Thread via GitHub


Gleiphir2769 commented on PR #1055:
URL: https://github.com/apache/pulsar-client-go/pull/1055#issuecomment-1635174466

   Please rerun the workflow for this PR. cc @RobertIndie





[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema

2023-07-13 Thread via GitHub


Gleiphir2769 commented on code in PR #1055:
URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1263219202


##
pulsar/producer_partition.go:
##
@@ -1407,31 +1410,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
 	p.client.memLimit.ReleaseMemory(size)
 }
 
-func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
 	if p.options.DisableBlockIfQueueFull {
 		if !p.publishSemaphore.TryAcquire() {
 			runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
 			return false
 		}
-		if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
+	} else {
+		if !p.publishSemaphore.Acquire(sr.ctx) {
+			runCallback(sr.callback, nil, sr.msg, errContextExpired)
+			return false
+		}
+	}
+	p.metrics.MessagesPending.Inc()
+	p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))

Review Comment:
   Thanks for your careful review. L1425-L1426 are the original code, and +1 to moving them to `canReserveMem`.
   
   
https://github.com/apache/pulsar-client-go/blob/e45122c2defc5efd4efc493d0acef278a7ccfc01/pulsar/producer_partition.go#L1436-L1441
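
The ordering discussed here can be sketched in isolation: take a queue slot first, then reserve memory, and only update the pending counters once both steps have succeeded. The Go snippet below is a simplified model, not the pulsar-client-go implementation. `memLimiter`, `producerModel`, and `tryEnqueue` are hypothetical names, and releasing the slot when the memory reservation fails is an assumption about the intended behavior.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// memLimiter is a hypothetical stand-in for a client-wide memory budget.
type memLimiter struct {
	mu    sync.Mutex
	limit int64
	used  int64
}

// tryReserve reserves n bytes if they fit in the budget and reports success.
func (m *memLimiter) tryReserve(n int64) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.used+n > m.limit {
		return false
	}
	m.used += n
	return true
}

// producerModel models a producer with a bounded queue and a memory budget.
type producerModel struct {
	slots   chan struct{} // counting semaphore: one token per queued message
	mem     *memLimiter
	pending int64 // stands in for the MessagesPending metric
}

func newProducerModel(queueSize int, memLimit int64) *producerModel {
	return &producerModel{
		slots: make(chan struct{}, queueSize),
		mem:   &memLimiter{limit: memLimit},
	}
}

// tryEnqueue is the non-blocking path: it fails fast when the queue or the
// memory budget is exhausted, and counts the message only after both
// reservations have succeeded.
func (p *producerModel) tryEnqueue(size int64) bool {
	select {
	case p.slots <- struct{}{}:
	default:
		return false // queue is full
	}
	if !p.mem.tryReserve(size) {
		<-p.slots // give the slot back so it is not leaked
		return false
	}
	atomic.AddInt64(&p.pending, 1)
	return true
}

func main() {
	p := newProducerModel(2, 10)
	fmt.Println(p.tryEnqueue(6))              // true
	fmt.Println(p.tryEnqueue(6))              // false: memory budget exceeded
	fmt.Println(p.tryEnqueue(4))              // true
	fmt.Println(p.tryEnqueue(0))              // false: queue is full
	fmt.Println(atomic.LoadInt64(&p.pending)) // 2
}
```

Incrementing the counter inside the first step instead would count the second call above as pending even though it was rejected, which is the kind of metric drift the comment is about.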






[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema

2023-07-13 Thread via GitHub


Gleiphir2769 commented on code in PR #1055:
URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1263219202


##
pulsar/producer_partition.go:
##
@@ -1407,31 +1410,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
 	p.client.memLimit.ReleaseMemory(size)
 }
 
-func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
 	if p.options.DisableBlockIfQueueFull {
 		if !p.publishSemaphore.TryAcquire() {
 			runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
 			return false
 		}
-		if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
+	} else {
+		if !p.publishSemaphore.Acquire(sr.ctx) {
+			runCallback(sr.callback, nil, sr.msg, errContextExpired)
+			return false
+		}
+	}
+	p.metrics.MessagesPending.Inc()
+	p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))

Review Comment:
   Thanks for your careful review. L1425-L1426 are the original code, and +1 from me to move them to `canReserveMem`.
   
   
https://github.com/apache/pulsar-client-go/blob/e45122c2defc5efd4efc493d0acef278a7ccfc01/pulsar/producer_partition.go#L1436-L1441






[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema

2023-07-13 Thread via GitHub


Gleiphir2769 commented on code in PR #1055:
URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1263219202


##
pulsar/producer_partition.go:
##
@@ -1407,31 +1410,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
 	p.client.memLimit.ReleaseMemory(size)
 }
 
-func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
 	if p.options.DisableBlockIfQueueFull {
 		if !p.publishSemaphore.TryAcquire() {
 			runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
 			return false
 		}
-		if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
+	} else {
+		if !p.publishSemaphore.Acquire(sr.ctx) {
+			runCallback(sr.callback, nil, sr.msg, errContextExpired)
+			return false
+		}
+	}
+	p.metrics.MessagesPending.Inc()
+	p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))

Review Comment:
   Thanks for your careful review. L1425-L1426 are the original code, and I think we can move them to `canReserveMem` to fix this metric.
   
   
https://github.com/apache/pulsar-client-go/blob/e45122c2defc5efd4efc493d0acef278a7ccfc01/pulsar/producer_partition.go#L1436-L1441






[GitHub] [pulsar-client-go] Gleiphir2769 commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema

2023-07-13 Thread via GitHub


Gleiphir2769 commented on code in PR #1055:
URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1263217545


##
pulsar/producer_test.go:
##
@@ -1924,6 +1924,159 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
 	assert.NoError(t, err)
 }
 
+func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {
+
+	c, err := NewClient(ClientOptions{
+		URL:              serviceURL,
+		MemoryLimitBytes: 100 * 6,
+	})
+	assert.NoError(t, err)
+	defer c.Close()
+
+	schema := NewAvroSchema(`{"fields":
+		[
+			{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
+		],
+		"name":"MyAvro","namespace":"schemaNotFoundTestCase","type":"record"}`, nil)
+
+	topicName := newTopicName()
+	producer1, _ := c.CreateProducer(ProducerOptions{
+		Topic:                   topicName,
+		DisableBlockIfQueueFull: true,
+		DisableBatching:         false,
+		BatchingMaxPublishDelay: 100 * time.Second,
+		SendTimeout:             2 * time.Second,
+	})
+
+	producer2, _ := c.CreateProducer(ProducerOptions{
+		Topic:                   topicName,
+		DisableBlockIfQueueFull: true,
+		DisableBatching:         false,
+		BatchingMaxPublishDelay: 100 * time.Second,
+		SendTimeout:             2 * time.Second,
+	})
+
+	// the size of encoded value is 6 bytes
+	value := map[string]interface{}{
+		"id": 0,
+		"name": map[string]interface{}{
+			"string": "abc",
+		},
+	}
+
+	n := 101
+	for i := 0; i < n/2; i++ {
+		producer1.SendAsync(context.Background(), &ProducerMessage{
+			Value:  value,
+			Schema: schema,
+		}, func(id MessageID, message *ProducerMessage, e error) {})
+
+		producer2.SendAsync(context.Background(), &ProducerMessage{
+			Value:  value,
+			Schema: schema,
+		}, func(id MessageID, message *ProducerMessage, e error) {})
+	}
+	// Last message in order to reach the limit
+	producer1.SendAsync(context.Background(), &ProducerMessage{
+		Value:  value,
+		Schema: schema,
+	}, func(id MessageID, message *ProducerMessage, e error) {})
+	time.Sleep(100 * time.Millisecond)
+	assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage())
+
+	_, err = producer1.Send(context.Background(), &ProducerMessage{
+		Value:  value,
+		Schema: schema,
+	})
+	assert.Error(t, err)
+	assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+	_, err = producer2.Send(context.Background(), &ProducerMessage{
+		Value:  value,
+		Schema: schema,
+	})
+	assert.Error(t, err)
+	assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+	// flush pending msg
+	err = producer1.Flush()
+	assert.NoError(t, err)
+	err = producer2.Flush()
+	assert.NoError(t, err)
+	assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
+
+	_, err = producer1.Send(context.Background(), &ProducerMessage{
+		Value:  value,
+		Schema: schema,
+	})
+	assert.NoError(t, err)
+	_, err = producer2.Send(context.Background(), &ProducerMessage{
+		Value:  value,
+		Schema: schema,
+	})
+	assert.NoError(t, err)
+}
+
+func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
+
+	c, err := NewClient(ClientOptions{
+		URL:              serviceURL,
+		MemoryLimitBytes: 10 * 1024,
+	})
+	assert.NoError(t, err)
+	defer c.Close()
+
+	topicName := newTopicName()
+	producer1, _ := c.CreateProducer(ProducerOptions{
+		Topic:                   topicName,
+		DisableBlockIfQueueFull: true,
+		DisableBatching:         true,
+		EnableChunking:          true,
+		ChunkMaxMessageSize:     1024,
+		SendTimeout:             2 * time.Second,
+	})
+
+	producer1.SendAsync(context.Background(), &ProducerMessage{
+		Payload: make([]byte, 10*1024+1),
+	}, func(id MessageID, message *ProducerMessage, e error) {
+		if e != nil {
+			t.Fatal(e)
+		}
+	})
+
+	_, err = producer1.Send(context.Background(), &ProducerMessage{
+		Payload: make([]byte, 1),
+	})
+	assert.Error(t, err)
+	assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+	// wait all the chunks have been released
Review Comment:
   Because `DisableBatching=true`, `producer.flush` here is useless and causes a panic.




[GitHub] [pulsar] github-actions[bot] commented on issue #19239: PIP-242: Topic name restrictions

2023-07-13 Thread via GitHub


github-actions[bot] commented on issue #19239:
URL: https://github.com/apache/pulsar/issues/19239#issuecomment-1635164603

   The issue had no activity for 30 days, mark with Stale label.





[GitHub] [pulsar] github-actions[bot] commented on issue #19403: Add a new admin API getAllLoadReport()

2023-07-13 Thread via GitHub


github-actions[bot] commented on issue #19403:
URL: https://github.com/apache/pulsar/issues/19403#issuecomment-1635164423

   The issue had no activity for 30 days, mark with Stale label.





[GitHub] [pulsar] github-actions[bot] commented on issue #19504: [Doc] Clarification of documentation regarding multi-tenancy

2023-07-13 Thread via GitHub


github-actions[bot] commented on issue #19504:
URL: https://github.com/apache/pulsar/issues/19504#issuecomment-1635164273

   The issue had no activity for 30 days, mark with Stale label.





[GitHub] [pulsar] github-actions[bot] commented on pull request #19515: [feat][sql] Support querying compacted data in Pulsar SQL

2023-07-13 Thread via GitHub


github-actions[bot] commented on PR #19515:
URL: https://github.com/apache/pulsar/pull/19515#issuecomment-1635164239

   The pr had no activity for 30 days, mark with Stale label.





[GitHub] [pulsar] github-actions[bot] commented on issue #19542: [Bug] Swagger definitions for creating sinks/sources are incorrect

2023-07-13 Thread via GitHub


github-actions[bot] commented on issue #19542:
URL: https://github.com/apache/pulsar/issues/19542#issuecomment-1635164194

   The issue had no activity for 30 days, mark with Stale label.





[GitHub] [pulsar] github-actions[bot] commented on issue #19554: Add pulsar_tenant and pulsar_namespace labels to prometheus metrics

2023-07-13 Thread via GitHub


github-actions[bot] commented on issue #19554:
URL: https://github.com/apache/pulsar/issues/19554#issuecomment-1635164136

   The issue had no activity for 30 days, mark with Stale label.





[GitHub] [pulsar] github-actions[bot] commented on pull request #19821: [wip][feat][io] Debezium DB2 source connector for Pulsar

2023-07-13 Thread via GitHub


github-actions[bot] commented on PR #19821:
URL: https://github.com/apache/pulsar/pull/19821#issuecomment-1635163767

   The pr had no activity for 30 days, mark with Stale label.





svn commit: r62967 [1/3] - in /dev/pulsar/pulsar-2.10.5-candidate-1: ./ connectors/ connectors/apache-pulsar-io-connectors-2.10.5-bin/

2023-07-13 Thread xiangying
Author: xiangying
Date: Fri Jul 14 02:01:04 2023
New Revision: 62967

Log:
Staging artifacts and signature for Pulsar release 2.10.5

Added:
dev/pulsar/pulsar-2.10.5-candidate-1/
dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-bin.tar.gz   (with props)
dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-bin.tar.gz.asc
dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-bin.tar.gz.sha512
dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-src.tar.gz   (with props)
dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-src.tar.gz.asc
dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-2.10.5-src.tar.gz.sha512
dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-offloaders-2.10.5-bin.tar.gz   (with props)
dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-offloaders-2.10.5-bin.tar.gz.asc
dev/pulsar/pulsar-2.10.5-candidate-1/apache-pulsar-offloaders-2.10.5-bin.tar.gz.sha512
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/LICENSE
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/README
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/LICENSE
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/README
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-aerospike-2.10.5.nar   (with props)
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-aerospike-2.10.5.nar.asc
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-aerospike-2.10.5.nar.sha512
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-batch-data-generator-2.10.5.nar   (with props)
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-batch-data-generator-2.10.5.nar.asc
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-batch-data-generator-2.10.5.nar.sha512
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-canal-2.10.5.nar   (with props)
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-canal-2.10.5.nar.asc
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-canal-2.10.5.nar.sha512
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-cassandra-2.10.5.nar   (with props)
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-cassandra-2.10.5.nar.asc
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-cassandra-2.10.5.nar.sha512
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-data-generator-2.10.5.nar   (with props)
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-data-generator-2.10.5.nar.asc
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-data-generator-2.10.5.nar.sha512
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mongodb-2.10.5.nar   (with props)
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mongodb-2.10.5.nar.asc
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mongodb-2.10.5.nar.sha512
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mssql-2.10.5.nar   (with props)
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mssql-2.10.5.nar.asc
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mssql-2.10.5.nar.sha512
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mysql-2.10.5.nar   (with props)
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mysql-2.10.5.nar.asc
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-mysql-2.10.5.nar.sha512
dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-oracle-2.10.5.nar   (with props)

dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-oracle-2.10.5.nar.asc


svn commit: r62967 [2/3] - in /dev/pulsar/pulsar-2.10.5-candidate-1: ./ connectors/ connectors/apache-pulsar-io-connectors-2.10.5-bin/

2023-07-13 Thread xiangying
Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-postgres-2.10.5.nar.sha512
==============================================================================
--- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-postgres-2.10.5.nar.sha512 (added)
+++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-postgres-2.10.5.nar.sha512 Fri Jul 14 02:01:04 2023
@@ -0,0 +1 @@
+5647705ad1d9db834f1348fe911615f320e47e78228eba2de7d7f6cc6bd452b6b5782633a3680d689f1ab207f52aa6c5f8fe9b76f393905e1879156035da6725  ./connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-debezium-postgres-2.10.5.nar

Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar
==============================================================================
Binary file - no diff available.

Propchange: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream

Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.asc
==============================================================================
--- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.asc (added)
+++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.asc Fri Jul 14 02:01:04 2023
@@ -0,0 +1,16 @@
+-----BEGIN PGP SIGNATURE-----
+
+iQIzBAABCgAdFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSwo7oACgkQAvkjT8Mx
+ZvcmKhAAiNiX1N9FmxECKmByJg7bmrYWszIoLQWYp9x2aTO8LPTYXhISjEYdPIVg
+ZVuibKk3fEUT9ewHTN1oH8zZq+kHJHftLxbct4GvX1rcKcJnTnk0XABsVlSbYZUW
+Q6cdez0WEP82bNgxkA0kBdIX5s55JG6+kwq44n+++z9m6IDz6Oixp2R3MVuW95LN
+q+fy6uTf6PQhXZQe3StV4A0t3Sormabfyd3xDgoTv348CptVooFtGMtfOYmNZoC9
+AiCDZEjoNgzeXPEx7kS6t9Bt4FgYJyXbtfV2gIkgn+jzODDK/uGld+8at6stueuR
+fgkHTXl605HgeL4+ET/S4fw1IA0VUDVtYgrdDkE3nq2ExdTOPhuSRntm4JLfxToi
+lkI0rTICsYzxs6j4v1nsQ8+9EJKyTqKmQ5Sr0R74fb85EMvlzbqzIc8MYlK50pFk
+coC6ERL1aosBMwIR6jnEXPRC+ekBARqnox4fyF2HDuRKUlyp8fWUSXR+G6oJRa5T
+kx6qt1ae5nD55jBNjzTic8IcRCmhJ2cdHYtwQcOxWUuYdqQCroU3wTiVMGybe8ml
+GUMNnlG51MKXbN+NFtsQsUz9CwrTVnfB8qYe33A6MTbt6UI7+Yw1Q0fEN98fhIV4
+PvVuPFhFzHu+u+A57OnLTtkN+fAhSHFCIZY9iy3YFxgpWpU4RIg=
+=s6oI
+-----END PGP SIGNATURE-----

Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.sha512
==============================================================================
--- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.sha512 (added)
+++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar.sha512 Fri Jul 14 02:01:04 2023
@@ -0,0 +1 @@
+5f2f8a1105be12a1cdef76fcd7532ab43d39d38808c51d40fc4beaa7074d91b4054b2d4b211d53594d9eb242d83083288ba7b1061e35a05a2367cad1a4f5b581  ./connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-dynamodb-2.10.5.nar

Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-elastic-search-2.10.5.nar
==============================================================================
Binary file - no diff available.

Propchange: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-elastic-search-2.10.5.nar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream

Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-elastic-search-2.10.5.nar.asc
==============================================================================
--- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-elastic-search-2.10.5.nar.asc (added)
+++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/apache-pulsar-io-connectors-2.10.5-bin/pulsar-io-elastic-search-2.10.5.nar.asc Fri Jul 14 02:01:04 2023
@@ -0,0 +1,16 @@
+-----BEGIN PGP SIGNATURE-----
+
+iQIzBAABCgAdFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSwo70ACgkQAvkjT8Mx
+ZvdFpw//fpHiNH7swkjpKVOwIN9JQwJzP+792m9nExHkTPYKf6N3y3KFaXRBVqRg
+oIaxu4GnKPPJzr6Vl/oIjiZ0Lv2NhseMkTZ9vph4jQlkMvypzkDCtC8Ktyz+paQ6
+lAKvsMnldQdm0eHXYv9zrxFBUMXCb09lnXufR0KG+NJtVJEKZHq99/TZTTyznHhO
+PodVXL+gNUXufoVZ2whkIILHynBNoaWlzMD58vllhzc9goJYm92n44N5HoEiG9m0
+xbbB1ElINXiSBDmYLPNlNqY44PESZ6dKqMvXjgVO5hw6kxE8TlqhrqWKJrH2ipUh
+yboomiXFTs7kDgrNccXELx6E2226UFYV89IDgpkm8AAEbqgrQ0sgTgK5Tyqt2RFb
+8vxwgpdV2AyoYGCjh7OEJytJIBvmeRb+UVjstCsOPAUrUMxNTEpttahRMLyW6/uY

svn commit: r62967 [3/3] - in /dev/pulsar/pulsar-2.10.5-candidate-1: ./ connectors/ connectors/apache-pulsar-io-connectors-2.10.5-bin/

2023-07-13 Thread xiangying
Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.asc
==============================================================================
--- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.asc (added)
+++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.asc Fri Jul 14 02:01:04 2023
@@ -0,0 +1,16 @@
+-----BEGIN PGP SIGNATURE-----
+
+iQIzBAABCgAdFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSwo9MACgkQAvkjT8Mx
+ZveqyBAAoIlkpdDJbizCLNZccrHTM+2t1O2vkirDMmdZ0eTEUrUMeFoWU5q3AokG
+G+YllJmtBJGkI7e/lGUvq82pBF+r4qWa0j4nk46MmkmZNTGOLv1EySuDe9s8h2Ak
+AznK53mPuFCUSB9XOZmxYOQ/0e6/h0QP7NIZdIB8Wd0h4ACUZL1hAzzvji8pFDhD
++BQmgDAmjWK/lwYSZk4hQ0jRRfSlb64IP6lRmWSINvrCBBO9x90EyK3jrwcnPsxu
+5bTlNtgiyDgLPi6NqXec8L4yj5pGsYoQ+zPZXV8NPkwYXuu4y5nbhIHIcP71w7rz
+zpO1vEUFN0mQgIhg/IOh6rqfV376q4lPkscWRaKbxShbRMAWRO54j0vCCUBpirVo
+O9rGUNDEOJHXjLyMLa1gDHio0t0DtGO9Zp1qBVAbjaClOpbIgaBg+M0RN4dM+JEB
+7VOuiTsh00ZpDfQEuaqjfvr4Li4A2E0bcNX8G85gaiWoFA14x886cESbtPgti3lp
+wU1TIq3AWmgd/q/3hoOdRTKCdPberaqAEePwQIXhXAjf6phPesoszngqgVvVNKKS
+qGj+IAMB4PJOsmzZvBcOgLwHnxE2szEGNIhRah7LMFnSYOZVl3iO2uHZIy6ttrAB
+5+TqWWKcU2bYAGA6p+aRDj8ZuyB1Tej7hal80TGKZ50czrZ63SI=
+=aBrJ
+-----END PGP SIGNATURE-----

Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.sha512
==============================================================================
--- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.sha512 (added)
+++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hbase-2.10.5.nar.sha512 Fri Jul 14 02:01:04 2023
@@ -0,0 +1 @@
+2ea871f6ac5e12ebee09cee88ea157b1be3584144f5b4c784bf41e88acbc1094f839f34265328a3813cd31b341cdaff3bc5d529ace41af0482ec96661a599e7e  ./connectors/pulsar-io-hbase-2.10.5.nar

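Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar
==============================================================================
Binary file - no diff available.

Propchange: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream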

Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.asc
==============================================================================
--- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.asc (added)
+++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.asc Fri Jul 14 02:01:04 2023
@@ -0,0 +1,16 @@
+-----BEGIN PGP SIGNATURE-----
+
+iQIzBAABCgAdFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSwo9cACgkQAvkjT8Mx
+ZvcoYg/+LlcicvhiwivNqOtMeWb9ltXibi3nJUVRhL5BM5tzPIrp5RrlMCmlw1Mv
+a2tEu+rWLaZcqT4sRVQBKn2AzMgtpTKZUduON8sdVy/5CHMedk97Z/yAV8HLgAFr
+xEanAVRlpkRDbMqIf/cgtQsMqnAcqocvai5KcXmJB99mqLdXUlu/t7UDytRcmr4E
+oBHUfcTYTZ1NQ8+ILPDYFHdpnqarLkfshkQxdkLS0Iz0CoWXo44a8QHo7V5wuKWf
+3SfaYpGw1BXADvXuJHnxPIMbbOYCzgPecZFP4o5Vq+P0ULjbHz58vP9QyX/wMA0l
+AqcVlFOTkX8abshk9mLY1bsl85wMa0cFqKjaTSfiTI6YuTs5hzRNhoOITxiv0yFq
+N+aBRmbtAe1nelAV2DukpEHAoaP2oJvsubrU+KjuMVQAbWJh+rEXaLcBy71uNaPF
+rceYhVMlT38iu99+/TYd6zqmSomxxhs7wtfRsE7nf7LjBjF88NexFN5GVwbmnIdP
+SdeFJRyT8/FvWgAUpQlqkwXkNWMBVtyn4YyKdpAp3aroIhF6EJr9HOxClddDjJss
+xW/R/Kw7qileIUx0zU3NWi7o9ix3nn25mjgl61kaKbPYTIxhgV6pKUu+CzxyJDc5
+BYsI4LO5VWhf20kd+gr/xQj4DcCmeyIpI+9QWhBP4IH1Lnc79UY=
+=LRUA
+-----END PGP SIGNATURE-----

Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.sha512
==============================================================================
--- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.sha512 (added)
+++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs2-2.10.5.nar.sha512 Fri Jul 14 02:01:04 2023
@@ -0,0 +1 @@
+3b3f4cfaf14433fc3004f091ff3c772739c80e1499d4b8c2f40eccd0fe05073bc63a1fbe006e95df33d9180d84da1dec865e88303100b1018093693b222eb7d5  ./connectors/pulsar-io-hdfs2-2.10.5.nar

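Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs3-2.10.5.nar
==============================================================================
Binary file - no diff available.

Propchange: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs3-2.10.5.nar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream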

Added: dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs3-2.10.5.nar.asc
==============================================================================
--- dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs3-2.10.5.nar.asc (added)
+++ dev/pulsar/pulsar-2.10.5-candidate-1/connectors/pulsar-io-hdfs3-2.10.5.nar.asc Fri Jul 14 02:01:04 2023
@@ -0,0 +1,16 @@
+-----BEGIN PGP SIGNATURE-----
+
+iQIzBAABCgAdFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSwo84ACgkQAvkjT8Mx
+ZveyJhAAhYSMMYcczrxhh2PGtYXMi42WEMUA671Xge7+t6uKxyRuYDxMXUWJnORU

[GitHub] [pulsar-site] shy-share opened a new pull request, #640: fix setReplicationClusters to replicationClusters

2023-07-13 Thread via GitHub


shy-share opened a new pull request, #640:
URL: https://github.com/apache/pulsar-site/pull/640

   
   
   
   
   
   
   This PR fixes apache/pulsar#20742
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] poorbarcode opened a new pull request, #20805: [improve][pip] Add a internal API waitForAllTopicsCreated under persistent

2023-07-13 Thread via GitHub


poorbarcode opened a new pull request, #20805:
URL: https://github.com/apache/pulsar/pull/20805

   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: x
   





[GitHub] [pulsar] zymap opened a new pull request, #20804: [improve][offload] Extend the offload police to allow specifying more conf

2023-07-13 Thread via GitHub


zymap opened a new pull request, #20804:
URL: https://github.com/apache/pulsar/pull/20804

   
   
   
   
   
   
   
   ### Motivation
   
   The offload policies expose only a fixed set of configuration options to the
offloaders. Whenever an offloader needs an additional option, we have to add
another field to OffloadPoliciesImpl, which does not scale. The policies should
be easy to extend, so this change adds support for a configuration map that
lets an offloader receive extra settings.
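
A minimal sketch of how such a configuration map could be derived from flat properties. It assumes a hypothetical key prefix `managedLedgerOffloadExtraConfig` (modelled on the keys used in this PR's test) and a hypothetical helper `collectExtraConfig`; neither is claimed to match the actual OffloadPoliciesImpl code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class OffloadExtraConfigSketch {
    // Assumption: extra offloader settings share this prefix (hypothetical).
    static final String EXTRA_CONFIG_PREFIX = "managedLedgerOffloadExtraConfig";

    // Collects every property whose name starts with the prefix into a map,
    // keyed by the remainder of the name after the prefix.
    static Map<String, String> collectExtraConfig(Properties properties) {
        Map<String, String> extra = new HashMap<>();
        for (String name : properties.stringPropertyNames()) {
            if (name.startsWith(EXTRA_CONFIG_PREFIX)
                    && name.length() > EXTRA_CONFIG_PREFIX.length()) {
                String key = name.substring(EXTRA_CONFIG_PREFIX.length());
                extra.put(key, properties.getProperty(name));
            }
        }
        return extra;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("managedLedgerOffloadDriver", "aws-s3");
        properties.setProperty("managedLedgerOffloadExtraConfigKey1", "value1");
        properties.setProperty("managedLedgerOffloadExtraConfigKey2", "value2");
        // Sort the result so the printed order is stable.
        System.out.println(new TreeMap<>(collectExtraConfig(properties)));
    }
}
```

With this shape, an offloader that needs a new setting only has to read another key from the map, and no new field is required on the policies class.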
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   





[pulsar] 02/02: [fix][test] Fix resource leak in PulsarTestContext (#20799)

2023-07-13 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fbc92a281c36b88d97a82cd7a6a4a57a71a040f6
Author: Lari Hotari 
AuthorDate: Fri Jul 14 00:01:22 2023 +0300

[fix][test] Fix resource leak in PulsarTestContext (#20799)

(cherry picked from commit ae0fc5bdcae0220da4936d0fdaff71eead219cdd)
---
 .../org/apache/pulsar/broker/PulsarService.java|  6 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  5 +-
 .../testcontext/AbstractTestPulsarService.java | 53 ---
 .../testcontext/NonStartableTestPulsarService.java | 79 --
 4 files changed, 49 insertions(+), 94 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index cf8ab33fd7e..6bb477cf037 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -787,7 +787,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 exposeTopicMetrics, offloaderScheduler, interval);
 this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies);
 
-this.brokerInterceptor = BrokerInterceptors.load(config);
+setBrokerInterceptor(newBrokerInterceptor());
 // use getter to support mocking getBrokerInterceptor method in tests
 BrokerInterceptor interceptor = getBrokerInterceptor();
 if (interceptor != null) {
@@ -930,6 +930,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 }
 }
 
+protected BrokerInterceptor newBrokerInterceptor() throws IOException {
+return BrokerInterceptors.load(config);
+}
+
 @VisibleForTesting
 protected OrderedExecutor newOrderedExecutor() {
 return OrderedExecutor.newBuilder()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 45ef58bb703..02b10dd09a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -131,6 +131,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.awaitility.Awaitility;
@@ -172,11 +173,13 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
 svcConfig.setClusterName("pulsar-cluster");
 svcConfig.setTopicLevelPoliciesEnabled(false);
 svcConfig.setSystemTopicEnabled(false);
+Compactor compactor = mock(Compactor.class);
+when(compactor.getStats()).thenReturn(mock(CompactorMXBean.class));
 pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
 .config(svcConfig)
 .spyByDefault()
 .useTestPulsarResources(metadataStore)
-.compactor(mock(Compactor.class))
+.compactor(compactor)
 .build();
 brokerService = pulsarTestContext.getBrokerService();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
index a6861268b94..517d57d0042 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.broker.testcontext;
 
+import java.io.IOException;
 import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -37,11 +38,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
  */
 abstract class AbstractTestPulsarService extends PulsarService {
 protected final SpyConfig spyConfig;
-protected final MetadataStoreExtended localMetadataStore;
-protected final MetadataStoreExtended configurationMetadataStore;
-protected final Compactor compactor;
-protected final BrokerInterceptor brokerInterceptor;
-protected final BookKeeperClientFactory bookKeeperClientFactory;
+private boolean compactorExists;
 
 public 

[pulsar] 01/02: [fix][test] Fix flaky PersistentSubscriptionTest (#20434)

2023-07-13 Thread lhotari

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7b259131c3ed41a6b1a28f85171d908f4120535a
Author: Lari Hotari 
AuthorDate: Wed May 31 22:57:16 2023 +0300

[fix][test] Fix flaky PersistentSubscriptionTest (#20434)

(cherry picked from commit 242758d5770de46e506855ff881472cbc274cedb)
---
 .../persistent/PersistentSubscriptionTest.java | 106 +++--
 .../testcontext/NonStartableTestPulsarService.java |  17 
 2 files changed, 72 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 401f52daa62..87408598889 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -92,7 +92,12 @@ public class PersistentSubscriptionTest {
 public void setup() throws Exception {
 pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
 .spyByDefault()
-.configCustomizer(config -> config.setTransactionCoordinatorEnabled(true))
+.configCustomizer(config -> {
+config.setTransactionCoordinatorEnabled(true);
+config.setTransactionPendingAckStoreProviderClassName(
+CustomTransactionPendingAckStoreProvider.class.getName());
+config.setTransactionBufferProviderClassName(InMemTransactionBufferProvider.class.getName());
+})
 .useTestPulsarResources()
 .build();
 
@@ -100,56 +105,6 @@ public class PersistentSubscriptionTest {
 doReturn(Optional.of(new Policies())).when(namespaceResources)
 .getPoliciesIfCached(any());
 
-doReturn(new InMemTransactionBufferProvider()).when(pulsarTestContext.getPulsarService())
-.getTransactionBufferProvider();
-doReturn(new TransactionPendingAckStoreProvider() {
-@Override
-public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
-return CompletableFuture.completedFuture(new PendingAckStore() {
-@Override
-public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService) {
-try {
-Field field = PendingAckHandleState.class.getDeclaredField("state");
-field.setAccessible(true);
-field.set(pendingAckHandle, PendingAckHandleState.State.Ready);
-} catch (NoSuchFieldException | IllegalAccessException e) {
-fail();
-}
-}
-
-@Override
-public CompletableFuture<Void> closeAsync() {
-return CompletableFuture.completedFuture(null);
-}
-
-@Override
-public CompletableFuture<Void> appendIndividualAck(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions) {
-return CompletableFuture.completedFuture(null);
-}
-
-@Override
-public CompletableFuture<Void> appendCumulativeAck(TxnID txnID, PositionImpl position) {
-return CompletableFuture.completedFuture(null);
-}
-
-@Override
-public CompletableFuture<Void> appendCommitMark(TxnID txnID, AckType ackType) {
-return CompletableFuture.completedFuture(null);
-}
-
-@Override
-public CompletableFuture<Void> appendAbortMark(TxnID txnID, AckType ackType) {
-return CompletableFuture.completedFuture(null);
-}
-});
-}
-
-@Override
-public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription) {
-return CompletableFuture.completedFuture(true);
-}
-}).when(pulsarTestContext.getPulsarService()).getTransactionPendingAckStoreProvider();
-
 ledgerMock = mock(ManagedLedgerImpl.class);
 cursorMock = mock(ManagedCursorImpl.class);
 managedLedgerConfigMock = mock(ManagedLedgerConfig.class);
@@ -279,4 +234,53 @@ public class PersistentSubscriptionTest {
 // `acknowledgeMessage` should update cursor last active
 assertTrue(persistentSubscription.cursor.getLastActive() > beforeAcknowledgeTimestamp);
 }
+
+public static class 

[pulsar] branch branch-3.0 updated (d7e863748ab -> fbc92a281c3)

2023-07-13 Thread lhotari

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from d7e863748ab Release 3.0.1
 new 7b259131c3e [fix][test] Fix flaky PersistentSubscriptionTest (#20434)
 new fbc92a281c3 [fix][test] Fix resource leak in PulsarTestContext (#20799)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java|   6 +-
 .../pulsar/broker/service/PersistentTopicTest.java |   5 +-
 .../persistent/PersistentSubscriptionTest.java | 106 +++--
 .../testcontext/AbstractTestPulsarService.java |  53 ++-
 .../testcontext/NonStartableTestPulsarService.java |  96 ++-
 5 files changed, 121 insertions(+), 145 deletions(-)



[pulsar] branch master updated: [fix][test] Fix resource leak in PulsarTestContext (#20799)

2023-07-13 Thread lhotari

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new ae0fc5bdcae [fix][test] Fix resource leak in PulsarTestContext (#20799)
ae0fc5bdcae is described below

commit ae0fc5bdcae0220da4936d0fdaff71eead219cdd
Author: Lari Hotari 
AuthorDate: Fri Jul 14 00:01:22 2023 +0300

[fix][test] Fix resource leak in PulsarTestContext (#20799)
---
 .../org/apache/pulsar/broker/PulsarService.java|  6 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  5 +-
 .../testcontext/AbstractTestPulsarService.java | 53 ---
 .../testcontext/NonStartableTestPulsarService.java | 79 --
 4 files changed, 49 insertions(+), 94 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5cf3c47bcb0..40c5a2d6528 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -784,7 +784,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 exposeTopicMetrics, offloaderScheduler, interval);
 this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies);
 
-this.brokerInterceptor = BrokerInterceptors.load(config);
+setBrokerInterceptor(newBrokerInterceptor());
 // use getter to support mocking getBrokerInterceptor method in tests
 BrokerInterceptor interceptor = getBrokerInterceptor();
 if (interceptor != null) {
@@ -927,6 +927,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 }
 }
 
+protected BrokerInterceptor newBrokerInterceptor() throws IOException {
+return BrokerInterceptors.load(config);
+}
+
 @VisibleForTesting
 protected OrderedExecutor newOrderedExecutor() {
 return OrderedExecutor.newBuilder()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index fefed1aaa0a..c49df3e85ce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -131,6 +131,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.awaitility.Awaitility;
@@ -172,11 +173,13 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
 svcConfig.setClusterName("pulsar-cluster");
 svcConfig.setTopicLevelPoliciesEnabled(false);
 svcConfig.setSystemTopicEnabled(false);
+Compactor compactor = mock(Compactor.class);
+when(compactor.getStats()).thenReturn(mock(CompactorMXBean.class));
 pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
 .config(svcConfig)
 .spyByDefault()
 .useTestPulsarResources(metadataStore)
-.compactor(mock(Compactor.class))
+.compactor(compactor)
 .build();
 brokerService = pulsarTestContext.getBrokerService();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
index a6861268b94..517d57d0042 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.broker.testcontext;
 
+import java.io.IOException;
 import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -37,11 +38,7 @@ import 
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
  */
 abstract class AbstractTestPulsarService extends PulsarService {
 protected final SpyConfig spyConfig;
-protected final MetadataStoreExtended localMetadataStore;
-protected final MetadataStoreExtended configurationMetadataStore;
-protected final Compactor compactor;
-protected final BrokerInterceptor brokerInterceptor;
-protected final 

[GitHub] [pulsar] lhotari closed issue #20797: [Tests] Thread resource leaks in unit tests

2023-07-13 Thread via GitHub


lhotari closed issue #20797: [Tests] Thread resource leaks in unit tests 
URL: https://github.com/apache/pulsar/issues/20797





[GitHub] [pulsar] lhotari merged pull request #20799: [fix][test] Fix resource leak in PulsarTestContext

2023-07-13 Thread via GitHub


lhotari merged PR #20799:
URL: https://github.com/apache/pulsar/pull/20799





[GitHub] [pulsar-helm-chart] vfauth commented on pull request #341: Respect rbac.limit_to_namespace for Functions RBAC

2023-07-13 Thread via GitHub


vfauth commented on PR #341:
URL: 
https://github.com/apache/pulsar-helm-chart/pull/341#issuecomment-1634612612

   @lhotari They fix the same issue, so you can close mine, thank you. 





[GitHub] [pulsar] github-actions[bot] commented on pull request #20803: Bump okio from 1.17.2 to 3.4.0

2023-07-13 Thread via GitHub


github-actions[bot] commented on PR #20803:
URL: https://github.com/apache/pulsar/pull/20803#issuecomment-1634602123

   @dependabot[bot] Please add the following content to your PR description and 
select a checkbox:
   ```
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   ```





[pulsar] branch dependabot/maven/com.squareup.okio-okio-3.4.0 created (now bf8bd627df5)

2023-07-13 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/com.squareup.okio-okio-3.4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


  at bf8bd627df5 Bump okio from 1.17.2 to 3.4.0

No new revisions were added by this update.



[GitHub] [pulsar] dependabot[bot] opened a new pull request, #20803: Bump okio from 1.17.2 to 3.4.0

2023-07-13 Thread via GitHub


dependabot[bot] opened a new pull request, #20803:
URL: https://github.com/apache/pulsar/pull/20803

   Bumps [okio](https://github.com/square/okio) from 1.17.2 to 3.4.0.
   
   Changelog
   Sourced from [okio's changelog](https://github.com/square/okio/blob/master/CHANGELOG.md).
   
   Version 3.4.0
   2023-07-07
   
   - New: Adapt a Java NIO FileSystem (`java.nio.file.FileSystem`) as an Okio
     FileSystem using `fileSystem.asOkioFileSystem()`.
   - New: Adapt Android’s `AssetManager` as an Okio FileSystem using
     `AssetFileSystem`. This is in the new `okio-assetfilesystem` module.
     Android applications should prefer this over `FileSystem.RESOURCES` as
     it’s faster to load.
   - Fix: Don't crash decoding GZIP files when the optional extra data (XLEN)
     is 32 KiB or larger.
   - Fix: Resolve symlinks in `FakeFileSystem.canonicalize()`.
   - Fix: Report the correct `createdAtMillis` in `NodeJsFileSystem` file
     metadata. We were incorrectly using `ctimeMs`, where `c` means changed,
     not created.
   - Fix: `UnsafeCursor` is now `Closeable`.
   
   Version 3.3.0
   2023-01-07
   
   - Fix: Don't leak resources when `use {}` is used with a non-local return.
     We introduced this performance and stability bug by not considering that
     non-local returns execute neither the `return` nor `catch` control flows.
   - Fix: Use a sealed interface for `BufferedSink` and `BufferedSource`. These
     were never intended for end-users to implement, and we're happy that
     Kotlin now allows us to express that in our API.
   - New: Change internal locks from `synchronized` to `ReentrantLock` and
     `Condition`. We expect this to help when using Okio with Java virtual
     threads ([Project Loom][loom]).
   - Upgrade: [Kotlin 1.8.0][kotlin_1_8_0].
   
   Version 3.2.0
   2022-06-26
   
   - Fix: Configure the multiplatform artifact (`com.squareup.okio:okio:3.x.x`)
     to depend on the JVM artifact (`com.squareup.okio:okio-jvm:3.x.x`) for
     Maven builds. This should work around an issue where Maven doesn't
     interpret Gradle metadata.
   - Fix: Change `CipherSource` and `CipherSink` to recover if the cipher
     doesn't support streaming. This should work around a crash with AES/GCM
     ciphers on Android.
   - New: Enable compatibility with non-hierarchical projects.
   
   Version 3.1.0
   2022-04-19
   
   - Upgrade: [Kotlin 1.6.20][kotlin_1_6_20].
   - New: Support [Hierarchical project structure][hierarchical_projects]. If
     you're using Okio in a multiplatform project please upgrade your project
     to Kotlin 1.6.20 (or newer) to take advantage of this. With hierarchical
     projects it's easier to use properties like `FileSystem.SYSTEM` that
   
   
   
   ... (truncated)
   
   
   Commits
   
   - [a161b07](https://github.com/square/okio/commit/a161b07fb1b459371458ae6d9508ec31df280428) Prepare for release 3.4.0.
   - [c5f462b](https://github.com/square/okio/commit/c5f462b0b51979f0b23b08bff123011bb01045ea) Copyright to files in build-support ([#1285](https://redirect.github.com/square/okio/issues/1285))
   - [f21714d](https://github.com/square/okio/commit/f21714d492f054ae689b455284816721498775eb) Upgrade Gradle and JMH ([#1283](https://redirect.github.com/square/okio/issues/1283))
   - [5f5db4a](https://github.com/square/okio/commit/5f5db4a0d2b1a3a0147c6bc18aeaba5a4ffa4037) Merge pull request [#1284](https://redirect.github.com/square/okio/issues/1284) from square/renovate/com.google.jimfs
   - [8af8d2a](https://github.com/square/okio/commit/8af8d2a87b0c71ced5d16c44daef20ab0c5d48c8) Update dependency com.google.jimfs:jimfs to v1.3.0
   - [b64c198](https://github.com/square/okio/commit/b64c198b790804eea26a05f5409bffb1a4a2d8eb) Update dependency com.vanniktech:gradle-maven-publish-plugin to v0.25.3 ([#1282](https://redirect.github.com/square/okio/issues/1282))
   - [ea82713](https://github.com/square/okio/commit/ea827139afef064ddd0078607719d32d7c154c0f) Merge pull request [#1281](https://redirect.github.com/square/okio/issues/1281) from square/renovate/gradle-7.x
   - [3569daa](https://github.com/square/okio/commit/3569daa8b8d039a8989440abcc970b7f35171d49) Update dependency gradle to v7.6.2
   - [e937a50](https://github.com/square/okio/commit/e937a50ffc482f9777b639d3399ba331b167107a) Merge pull request [#1277](https://redirect.github.com/square/okio/issues/1277) from sifmelcara/fix-int-sign-conversion
   - [81bce1a](https://github.com/square/okio/commit/81bce1a30af244550b0324597720e4799281da7b) Fix a bug where xlen larger than 0x7fff was rejected ([#1280](https://redirect.github.com/square/okio/issues/1280))
   - Additional commits viewable in [compare view](https://github.com/square/okio/compare/okio-parent-1.17.2...parent-3.4.0)
   
   
   
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=com.squareup.okio:okio=maven=1.17.2=3.4.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it 

[GitHub] [pulsar] github-actions[bot] commented on pull request #20802: Bump okio from 1.17.2 to 3.4.0 in /pulsar-sql

2023-07-13 Thread via GitHub


github-actions[bot] commented on PR #20802:
URL: https://github.com/apache/pulsar/pull/20802#issuecomment-1634598867

   @dependabot[bot] Please add the following content to your PR description and 
select a checkbox:
   ```
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   ```





[GitHub] [pulsar] dependabot[bot] opened a new pull request, #20802: Bump okio from 1.17.2 to 3.4.0 in /pulsar-sql

2023-07-13 Thread via GitHub


dependabot[bot] opened a new pull request, #20802:
URL: https://github.com/apache/pulsar/pull/20802

   Bumps [okio](https://github.com/square/okio) from 1.17.2 to 3.4.0.
   
   Changelog
   Sourced from [okio's changelog](https://github.com/square/okio/blob/master/CHANGELOG.md).
   
   Version 3.4.0
   2023-07-07
   
   - New: Adapt a Java NIO FileSystem (`java.nio.file.FileSystem`) as an Okio
     FileSystem using `fileSystem.asOkioFileSystem()`.
   - New: Adapt Android’s `AssetManager` as an Okio FileSystem using
     `AssetFileSystem`. This is in the new `okio-assetfilesystem` module.
     Android applications should prefer this over `FileSystem.RESOURCES` as
     it’s faster to load.
   - Fix: Don't crash decoding GZIP files when the optional extra data (XLEN)
     is 32 KiB or larger.
   - Fix: Resolve symlinks in `FakeFileSystem.canonicalize()`.
   - Fix: Report the correct `createdAtMillis` in `NodeJsFileSystem` file
     metadata. We were incorrectly using `ctimeMs`, where `c` means changed,
     not created.
   - Fix: `UnsafeCursor` is now `Closeable`.
   
   Version 3.3.0
   2023-01-07
   
   - Fix: Don't leak resources when `use {}` is used with a non-local return.
     We introduced this performance and stability bug by not considering that
     non-local returns execute neither the `return` nor `catch` control flows.
   - Fix: Use a sealed interface for `BufferedSink` and `BufferedSource`. These
     were never intended for end-users to implement, and we're happy that
     Kotlin now allows us to express that in our API.
   - New: Change internal locks from `synchronized` to `ReentrantLock` and
     `Condition`. We expect this to help when using Okio with Java virtual
     threads ([Project Loom][loom]).
   - Upgrade: [Kotlin 1.8.0][kotlin_1_8_0].
   
   Version 3.2.0
   2022-06-26
   
   - Fix: Configure the multiplatform artifact (`com.squareup.okio:okio:3.x.x`)
     to depend on the JVM artifact (`com.squareup.okio:okio-jvm:3.x.x`) for
     Maven builds. This should work around an issue where Maven doesn't
     interpret Gradle metadata.
   - Fix: Change `CipherSource` and `CipherSink` to recover if the cipher
     doesn't support streaming. This should work around a crash with AES/GCM
     ciphers on Android.
   - New: Enable compatibility with non-hierarchical projects.
   
   Version 3.1.0
   2022-04-19
   
   - Upgrade: [Kotlin 1.6.20][kotlin_1_6_20].
   - New: Support [Hierarchical project structure][hierarchical_projects]. If
     you're using Okio in a multiplatform project please upgrade your project
     to Kotlin 1.6.20 (or newer) to take advantage of this. With hierarchical
     projects it's easier to use properties like `FileSystem.SYSTEM` that
   
   
   
   ... (truncated)
   
   
   Commits
   
   - [a161b07](https://github.com/square/okio/commit/a161b07fb1b459371458ae6d9508ec31df280428) Prepare for release 3.4.0.
   - [c5f462b](https://github.com/square/okio/commit/c5f462b0b51979f0b23b08bff123011bb01045ea) Copyright to files in build-support ([#1285](https://redirect.github.com/square/okio/issues/1285))
   - [f21714d](https://github.com/square/okio/commit/f21714d492f054ae689b455284816721498775eb) Upgrade Gradle and JMH ([#1283](https://redirect.github.com/square/okio/issues/1283))
   - [5f5db4a](https://github.com/square/okio/commit/5f5db4a0d2b1a3a0147c6bc18aeaba5a4ffa4037) Merge pull request [#1284](https://redirect.github.com/square/okio/issues/1284) from square/renovate/com.google.jimfs
   - [8af8d2a](https://github.com/square/okio/commit/8af8d2a87b0c71ced5d16c44daef20ab0c5d48c8) Update dependency com.google.jimfs:jimfs to v1.3.0
   - [b64c198](https://github.com/square/okio/commit/b64c198b790804eea26a05f5409bffb1a4a2d8eb) Update dependency com.vanniktech:gradle-maven-publish-plugin to v0.25.3 ([#1282](https://redirect.github.com/square/okio/issues/1282))
   - [ea82713](https://github.com/square/okio/commit/ea827139afef064ddd0078607719d32d7c154c0f) Merge pull request [#1281](https://redirect.github.com/square/okio/issues/1281) from square/renovate/gradle-7.x
   - [3569daa](https://github.com/square/okio/commit/3569daa8b8d039a8989440abcc970b7f35171d49) Update dependency gradle to v7.6.2
   - [e937a50](https://github.com/square/okio/commit/e937a50ffc482f9777b639d3399ba331b167107a) Merge pull request [#1277](https://redirect.github.com/square/okio/issues/1277) from sifmelcara/fix-int-sign-conversion
   - [81bce1a](https://github.com/square/okio/commit/81bce1a30af244550b0324597720e4799281da7b) Fix a bug where xlen larger than 0x7fff was rejected ([#1280](https://redirect.github.com/square/okio/issues/1280))
   - Additional commits viewable in [compare view](https://github.com/square/okio/compare/okio-parent-1.17.2...parent-3.4.0)
   
   
   
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=com.squareup.okio:okio=maven=1.17.2=3.4.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it 

[pulsar] branch dependabot/maven/pulsar-sql/com.squareup.okio-okio-3.4.0 created (now b38d3c4b2fc)

2023-07-13 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/pulsar-sql/com.squareup.okio-okio-3.4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


  at b38d3c4b2fc Bump okio from 1.17.2 to 3.4.0 in /pulsar-sql

No new revisions were added by this update.



[GitHub] [pulsar-client-cpp] BewareMyPower opened a new pull request, #302: Fix the build failure with C++20 standard

2023-07-13 Thread via GitHub


BewareMyPower opened a new pull request, #302:
URL: https://github.com/apache/pulsar-client-cpp/pull/302

   ### Motivation
   
   When building the project with the `-DCMAKE_CXX_STANDARD=20` option and GCC 
11.3, it failed. There are two main reasons.
   
   One is the `ObjectPool.h`, see http://eel.is/c++draft/diff.cpp17.class#2
   
   In short, see the code below:
   
   ```c++
   template <typename T>
   struct A {
     // A<T>() {}  // error: simple-template-id not allowed for constructor
     A() {}        // OK, injected-class-name used
   };
   ```
   
   The other reason is deeply hidden and OS-specific. When building the target 
for the unit test, the `lib/` directory is added to the include directories. 
So for `#include "Semaphore.h"`, the `Semaphore.h` header is looked up 
first in the `lib/` directory. However, C++20 introduced a `<semaphore>` 
header, which looks for the POSIX semaphore header `semaphore.h` in the system path.
   
   For example, the include order in `ubuntu:22.04` arm64 container is:
   - `$PROJECT_DIR/lib/` (where `Semaphore.h` is)
   - ...
   - `/usr/lib/gcc/aarch64-linux-gnu/11/include` (where `semaphore.h` is)
   
   Header lookup here is case-insensitive, so `lib/Semaphore.h` is picked up 
instead of `semaphore.h` by the `<semaphore>` header, which is implicitly 
included by ``. Our own `Semaphore.h` does not have the POSIX semaphore struct 
definitions, so the build failed.
   
   ### Modifications
   
   - Fix the semantics error in `ObjectPool.h`
   - Remove the `lib/` directory from the include directories of the unit test 
and include headers in the `lib/` directory as `lib/xxx.h`.
   - Add a workflow to verify that the project builds with C++20





[GitHub] [pulsar] JeffBolle commented on pull request #20623: [improve][io] Elastic Search Sink can now handle raw Record

2023-07-13 Thread via GitHub


JeffBolle commented on PR #20623:
URL: https://github.com/apache/pulsar/pull/20623#issuecomment-1634484762

   > @JeffBolle Please rebase (or merge origin/master) to your PR branch and 
resolve the conflict.
   
   Roger, taking care of it now.





[GitHub] [pulsar] lhotari commented on pull request #20623: [improve][io] Elastic Search Sink can now handle raw Record

2023-07-13 Thread via GitHub


lhotari commented on PR #20623:
URL: https://github.com/apache/pulsar/pull/20623#issuecomment-1634397306

   @JeffBolle Please rebase (or merge origin/master) to your PR branch and 
resolve the conflict.





[GitHub] [pulsar] shibd commented on pull request #20791: [improve][io] Add notifyError method on PushSource.

2023-07-13 Thread via GitHub


shibd commented on PR #20791:
URL: https://github.com/apache/pulsar/pull/20791#issuecomment-1634373968

   > @shibd Would this PR change the interface for the ecosystem component? If 
yes, I think we shouldn't add it to branch-3.0. I remove the label 
`release-3.0.2` first. But feel free to confirm it and add back the label if 
it's suitable to cherry-pick to the LTS branch.
   
   It doesn't change the behavior of any existing connector; it only adds a new 
method, `notifyError`.
   
   I think this change is more like an interface enhancement or bug fix, so we 
should release it in 3.0.2.
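   
   For readers unfamiliar with the idea, here is a minimal sketch of how a 
push-style source could hand an error from its worker thread to the thread 
that reads records. It is not the actual `PushSource` code: the class 
`PushSourceSketch`, the `Item` wrapper, and the queue-based design are all 
assumptions made for illustration. Only the method name `notifyError` comes 
from this PR.
   
   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   // Hypothetical stand-in for a push-style source; not the Pulsar class.
   class PushSourceSketch<T> {
       // Values and errors travel through the same queue, so the reader
       // sees them in the order the worker produced them.
       private static final class Item<V> {
           final V value;
           final Exception error;

           Item(V value, Exception error) {
               this.value = value;
               this.error = error;
           }
       }

       private final BlockingQueue<Item<T>> queue = new LinkedBlockingQueue<>();

       // Called by the worker thread for each record it produces.
       public void consume(T value) {
           queue.add(new Item<>(value, null));
       }

       // Called by the worker thread when it hits an unrecoverable error.
       public void notifyError(Exception error) {
           queue.add(new Item<>(null, error));
       }

       // Called by the reader; rethrows an error queued by the worker.
       public T read() throws Exception {
           Item<T> item = queue.take();
           if (item.error != null) {
               throw item.error;
           }
           return item.value;
       }

       public static void main(String[] args) throws Exception {
           PushSourceSketch<String> source = new PushSourceSketch<>();
           source.consume("record-1");
           source.notifyError(new IllegalStateException("poll failed"));
           System.out.println(source.read());
           try {
               source.read();
           } catch (IllegalStateException e) {
               System.out.println("reader saw: " + e.getMessage());
           }
       }
   }
   ```
   
   In a design like this the error surfaces in the caller of `read()`, so the 
worker thread does not need its own uncaught-exception handler.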





[pulsar] branch master updated: [fix] [io] elastic-search sink connector not support JSON.String schema. (#20741)

2023-07-13 Thread baodi
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new fa72ab4d1fc [fix] [io] elastic-search sink connector not support 
JSON.String schema. (#20741)
fa72ab4d1fc is described below

commit fa72ab4d1fc9991c20025ef86bb8a0b8d65e625b
Author: Baodi Shi 
AuthorDate: Thu Jul 13 22:40:23 2023 +0800

[fix] [io] elastic-search sink connector not support JSON.String schema. 
(#20741)
---
 .../pulsar/io/elasticsearch/ElasticSearchSink.java | 13 +++-
 .../io/elasticsearch/ElasticSearchSinkTests.java   | 80 ++
 2 files changed, 91 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index e2566d20638..db2a96624bc 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -373,13 +373,22 @@ public class ElasticSearchSink implements 
Sink {
 return node;
 }
 
-public static JsonNode extractJsonNode(Schema schema, Object val) {
+public JsonNode extractJsonNode(Schema schema, Object val) throws 
JsonProcessingException {
 if (val == null) {
 return null;
 }
 switch (schema.getSchemaInfo().getType()) {
 case JSON:
-return (JsonNode) ((GenericRecord) val).getNativeObject();
+Object nativeObject = ((GenericRecord) val).getNativeObject();
+if (nativeObject instanceof String) {
+try {
+return objectMapper.readTree((String) nativeObject);
+} catch (JsonProcessingException e) {
+log.error("Failed to read JSON string: {}", 
nativeObject, e);
+throw e;
+}
+}
+return (JsonNode) nativeObject;
 case AVRO:
 org.apache.avro.generic.GenericRecord node = 
(org.apache.avro.generic.GenericRecord)
 ((GenericRecord) val).getNativeObject();
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 9fad03c3579..62592f5f09b 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.io.elasticsearch;
 
 import co.elastic.clients.transport.ElasticsearchTransport;
+import com.fasterxml.jackson.core.JsonParseException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -243,6 +244,85 @@ public abstract class ElasticSearchSinkTests extends 
ElasticSearchTestBase {
 verify(mockRecord, times(1)).ack();
 }
 
+@Test
+public final void sendJsonStringSchemaTest() throws Exception {
+
+when(mockRecord.getMessage()).thenAnswer(new 
Answer>>() {
+@Override
+public Optional> answer(InvocationOnMock 
invocation) throws Throwable {
+final MessageImpl mock = mock(MessageImpl.class);
+
when(mock.getData()).thenReturn("{\"a\":1}".getBytes(StandardCharsets.UTF_8));
+return Optional.of(mock);
+}
+});
+
+when(mockRecord.getKey()).thenAnswer(new Answer>() {
+public Optional answer(InvocationOnMock invocation) throws 
Throwable {
+return Optional.empty();
+}
+});
+
+GenericRecord genericRecord = mock(GenericRecord.class);
+when(genericRecord.getNativeObject()).thenReturn("{\"a\":1}");
+when(genericRecord.getSchemaType()).thenReturn(SchemaType.JSON);
+when(mockRecord.getValue()).thenAnswer(new Answer() {
+public GenericRecord answer(InvocationOnMock invocation) throws 
Throwable {
+return genericRecord;
+}
+});
+
+when(mockRecord.getSchema()).thenAnswer(new Answer() {
+public Schema answer(InvocationOnMock invocation) throws Throwable 
{
+return Schema.JSON(String.class);
+}
+});
+
+map.put("indexName", "test-index");
+map.put("schemaEnable", "true");
+sink.open(map, mockSinkContext);
+sink.write(mockRecord);
+verify(mockRecord, times(1)).ack();
+}
+
+

[GitHub] [pulsar] shibd merged pull request #20741: [fix] [io] elastic-search sink connector not support JSON.String schema.

2023-07-13 Thread via GitHub


shibd merged PR #20741:
URL: https://github.com/apache/pulsar/pull/20741





[GitHub] [pulsar] alexesom opened a new issue, #20801: [Doc] Lack of parameters for admin.namespaces().splitNamespaceBundle in docs

2023-07-13 Thread via GitHub


alexesom opened a new issue, #20801:
URL: https://github.com/apache/pulsar/issues/20801

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### What issue do you find in Pulsar docs?
   
   In [Split namespace 
bundles](https://pulsar.apache.org/docs/next/admin-api-namespaces/#split-namespace-bundles)
 the number of parameters is incorrect. 
   
   In the Namespaces.java:
   /**
* Split namespace bundle.
*
* @param namespace
* @param bundle range of bundle to split
* @param unloadSplitBundles
* @param splitAlgorithmName
* @throws PulsarAdminException
*/
   void splitNamespaceBundle(String namespace, String bundle, boolean 
unloadSplitBundles, String splitAlgorithmName)
   throws PulsarAdminException;
   
   ### What is your suggestion?
   
   Correct the signature of the method and provide explanations for the new 
parameters.
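   
   As an illustration of what a corrected docs example might look like, here 
is a small sketch of a call that passes all four arguments. To keep it 
runnable without a broker, it uses a hypothetical `NamespacesSketch` 
interface that mirrors only the signature quoted above (without the checked 
`PulsarAdminException`) and a recording stub. The namespace, bundle range, 
and algorithm name are example values and should be treated as assumptions.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in mirroring the four-argument signature quoted in
   // this issue; the real method is declared in Namespaces.java.
   interface NamespacesSketch {
       void splitNamespaceBundle(String namespace, String bundle,
                                 boolean unloadSplitBundles, String splitAlgorithmName);
   }

   class SplitBundleSketch {
       // Records each call instead of contacting a broker.
       static final class RecordingNamespaces implements NamespacesSketch {
           final List<String> calls = new ArrayList<>();

           @Override
           public void splitNamespaceBundle(String namespace, String bundle,
                                            boolean unloadSplitBundles, String splitAlgorithmName) {
               calls.add(namespace + " " + bundle + " " + unloadSplitBundles + " " + splitAlgorithmName);
           }
       }

       public static void main(String[] args) {
           RecordingNamespaces namespaces = new RecordingNamespaces();
           // All argument values below are assumed example values.
           namespaces.splitNamespaceBundle(
                   "my-tenant/my-ns",          // namespace
                   "0x00000000_0xffffffff",    // bundle range to split
                   true,                       // unloadSplitBundles
                   "range_equally_divide");    // splitAlgorithmName
           System.out.println(namespaces.calls.get(0));
       }
   }
   ```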
   
   ### Any reference?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!





[pulsar] branch master updated: [fix][io] Not restart instance when kafka source poll exception. (#20795)

2023-07-13 Thread baodi
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 911fbf5fa2b [fix][io] Not restart instance when kafka source poll 
exception. (#20795)
911fbf5fa2b is described below

commit 911fbf5fa2b49825d7dcbf2270f0329a5267a2fa
Author: Baodi Shi 
AuthorDate: Thu Jul 13 21:11:06 2023 +0800

[fix][io] Not restart instance when kafka source poll exception. (#20795)
---
 .../pulsar/io/kafka/KafkaAbstractSource.java   | 53 --
 .../io/kafka/source/KafkaAbstractSourceTest.java   | 43 +-
 2 files changed, 52 insertions(+), 44 deletions(-)

diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 012e4143744..3d4612c039f 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -27,7 +27,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -133,7 +132,6 @@ public abstract class KafkaAbstractSource extends 
PushSource {
 throw new IllegalArgumentException("Unable to instantiate Kafka 
consumer", ex);
 }
 this.start();
-running = true;
 }
 
 protected Properties beforeCreateConsumer(Properties props) {
@@ -158,47 +156,36 @@ public abstract class KafkaAbstractSource extends 
PushSource {
 
 @SuppressWarnings("unchecked")
 public void start() {
+LOG.info("Starting subscribe kafka source on {}", 
kafkaSourceConfig.getTopic());
+
consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
 runnerThread = new Thread(() -> {
-LOG.info("Starting kafka source on {}", 
kafkaSourceConfig.getTopic());
-
consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
 LOG.info("Kafka source started.");
 while (running) {
-ConsumerRecords consumerRecords = 
consumer.poll(Duration.ofSeconds(1L));
-CompletableFuture[] futures = new 
CompletableFuture[consumerRecords.count()];
-int index = 0;
-for (ConsumerRecord consumerRecord : 
consumerRecords) {
-KafkaRecord record = buildRecord(consumerRecord);
-if (LOG.isDebugEnabled()) {
-LOG.debug("Write record {} {} {}", record.getKey(), 
record.getValue(), record.getSchema());
+try {
+ConsumerRecords consumerRecords = 
consumer.poll(Duration.ofSeconds(1L));
+CompletableFuture[] futures = new 
CompletableFuture[consumerRecords.count()];
+int index = 0;
+for (ConsumerRecord consumerRecord : 
consumerRecords) {
+KafkaRecord record = buildRecord(consumerRecord);
+if (LOG.isDebugEnabled()) {
+LOG.debug("Write record {} {} {}", 
record.getKey(), record.getValue(), record.getSchema());
+}
+consume(record);
+futures[index] = record.getCompletableFuture();
+index++;
 }
-consume(record);
-futures[index] = record.getCompletableFuture();
-index++;
-}
-if (!kafkaSourceConfig.isAutoCommitEnabled()) {
-try {
+if (!kafkaSourceConfig.isAutoCommitEnabled()) {
 CompletableFuture.allOf(futures).get();
 consumer.commitSync();
-} catch (InterruptedException ex) {
-break;
-} catch (ExecutionException ex) {
-LOG.error("Error while processing records", ex);
-break;
 }
+} catch (Exception e) {
+LOG.error("Error while processing records", e);
+notifyError(e);
+break;
 }
 }
 });
-runnerThread.setUncaughtExceptionHandler(
-(t, e) -> {
-new Thread(() -> {
-LOG.error("[{}] Error while consuming records", 
t.getName(), e);
-try {
-this.close();
-} catch (Exception ex) {
-  

[GitHub] [pulsar] shibd merged pull request #20795: [fix][io] Not restart instance when kafka source poll exception.

2023-07-13 Thread via GitHub


shibd merged PR #20795:
URL: https://github.com/apache/pulsar/pull/20795





[GitHub] [pulsar] github-actions[bot] commented on pull request #20800: [improve][broker] Add the MessageExpirer interface to make code clear

2023-07-13 Thread via GitHub


github-actions[bot] commented on PR #20800:
URL: https://github.com/apache/pulsar/pull/20800#issuecomment-1634165256

   @BewareMyPower Please add the following content to your PR description and 
select a checkbox:
   ```
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   ```





[GitHub] [pulsar] BewareMyPower opened a new pull request, #20800: [improve][broker] Add the MessageExpirer interface to make code clear

2023-07-13 Thread via GitHub


BewareMyPower opened a new pull request, #20800:
URL: https://github.com/apache/pulsar/pull/20800

   ### Motivation
   
   When I reviewed https://github.com/apache/pulsar/pull/20597, the unrelated 
changes in `PersistentTopicsBase` are hard to read. The logic could be 
simplified to:
   
   ```java
   PersistentSubscription sub = null;
   PersistentReplicator repl = null;
   if (metSomeCondition()) {
       repl = /* ... */;
       if (repl == null) {
           /* ... */
           return;
       }
   } else {
       sub = /* ... */;
       if (sub == null) {
           /* ... */
           return;
       }
   }
   final PersistentSubscription finalSub = sub;
   final PersistentReplicator finalRepl = repl;
   future.thenAccept(__ -> {
       if (metSomeCondition()) {
           finalRepl.expireMessages(/* ... */);
       } else {
           finalSub.expireMessages(/* ... */);
       }
   });
   ```
   
   The code above is a mess. It adds two final variables because a lambda can 
only capture final (or effectively final) variables, and the 
`metSomeCondition()` check is needlessly performed twice. The original code is 
even harder to read because the logic in `/* ... */` spans several lines, so 
the two calls to `metSomeCondition()` are far apart.
   
   From the code search I see all these classes implement two `expireMessages` 
methods that accept an integer or a position.
   
   - PersistentMessageExpiryMonitor
   - PersistentSubscription
   - PersistentReplicator
   - NonPersistentSubscription
   
   The code can be simplified by introducing a new interface.
   
   ### Modifications
   
   Introduce a `MessageExpirer` interface and change the class hierarchy to:
   
   ```
   // [I] is interface, [C] is class
   [I] MessageExpirer
     [I] Subscription
       [C] PersistentSubscription
       [C] NonPersistentSubscription
     [C] PersistentReplicator
     [C] PersistentMessageExpiryMonitor
   ```
   
   The method invocation becomes much simpler, as shown in this patch.
   
   P.S. Inserting such an interface in the type hierarchy does not even break 
the ABI compatibility, see 
https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html
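   
   A minimal sketch of the simplification, under stated assumptions: the 
`MessageExpirer` below is a reduced stand-in with a single method, and 
`SketchSubscription`, `SketchReplicator`, and `MessageExpirerSketch` are 
hypothetical classes, not the broker types. The point is only that one 
effectively final variable of the interface type replaces the two captured 
variables and the second condition check.
   
   ```java
   import java.util.concurrent.CompletableFuture;

   // Reduced stand-in for the interface introduced by this PR; the real
   // methods and their parameters may differ.
   interface MessageExpirer {
       boolean expireMessages(int messageTtlInSeconds);
   }

   // Hypothetical stand-in for a subscription.
   class SketchSubscription implements MessageExpirer {
       @Override
       public boolean expireMessages(int messageTtlInSeconds) {
           return messageTtlInSeconds > 0;
       }
   }

   // Hypothetical stand-in for a replicator.
   class SketchReplicator implements MessageExpirer {
       @Override
       public boolean expireMessages(int messageTtlInSeconds) {
           return messageTtlInSeconds > 0;
       }
   }

   class MessageExpirerSketch {
       static CompletableFuture<String> expire(boolean useReplicator, int ttlSeconds) {
           // The condition is evaluated once; the lambda captures a single
           // variable and no longer cares which implementation it holds.
           final MessageExpirer expirer =
                   useReplicator ? new SketchReplicator() : new SketchSubscription();
           CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
           return future.thenApply(__ ->
                   expirer.getClass().getSimpleName() + ":" + expirer.expireMessages(ttlSeconds));
       }

       public static void main(String[] args) {
           System.out.println(expire(true, 60).join());
           System.out.println(expire(false, 0).join());
       }
   }
   ```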
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [x] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository:





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20748: [pip][design] PIP-281: Optimize Bundle Unload(Transfer) Protocol for ExtensibleLoadManager

2023-07-13 Thread via GitHub


Demogorgon314 commented on code in PR #20748:
URL: https://github.com/apache/pulsar/pull/20748#discussion_r1262482139


##
pip/pip-281.md:
##
@@ -0,0 +1,233 @@
+
+
+# Background knowledge
+
+- Pulsar broker load balancer periodically unloads bundles from overloaded 
brokers. During this unload process, previous owner brokers close topic 
sessions(e.g. producers, subscriptions(consumers), managed ledgers). When 
re-assigned, new owner brokers recreate the topic sessions.
+
+- Pulsar clients request `CommandLookupTopic` to lookup or assign owner 
brokers for topics and connect to them.
+
+- PIP-192, the extensible load balancer introduced the bundle state channel 
that event-sources this unloading process in a state machine manner, from 
`releasing,` `assigned`, to `owned` state order. At `releasing,` the owner 
broker "releases" the bundle ownership(close topic sessions).
+
+- PIP-192, the extensible load balancer introduced TransferShedder, a new 
shedding strategy, which pre-assigns new owner brokers beforehand.
+
+
+# Motivation
+
+- When unloading closes many topic sessions, then many clients need to request 
CommandLookupTopic at the same time, which could cause many lookup requests on 
brokers. This unloading process can be further optimized if we can let the 
client directly connect to the new owner broker without following 
`CommandLookupTopic` requests.
+- In the new load balancer(pip-192), since the owner broker is already known, 
we can modify the close command protocol to pass the new destination broker URL 
and skip the lookup requests.
+- Also, when unloading, we can gracefully shutdown ledgers -- we always close 
old managed ledgers first and then recreate it on the new owner without 
conflicts.
+
+# Goals
+- Remove clients' lookup requests in the unload protocol
+- Gracefully shutdown managed ledgers before new owners create them.
+
+## In Scope
+
+
+
+- This change will be added in the extensible load balancer.
+
+## Out of Scope
+
+
+
+- This won't change the existing load balancer behavior(modular load manager).
+
+
+
+# High Level Design
+
+
+
+Current Unload and Lookup Sequence in Extensible Load Balancer
+```mermaid
+sequenceDiagram
+participant Clients
+participant Owner Broker
+participant New Owner Broker
+participant Leader Broker
+Leader Broker ->> Owner Broker: "state:Releasing:" close topic
+Owner Broker ->> Owner Broker: close broker topic sessions
+Owner Broker ->> Clients: close producers and consumers
+Clients ->> Clients: reconnecting (inital delay 100ms)
+Owner Broker ->> New Owner Broker: "state:Assign:" assign new ownership
+New Owner Broker ->> Owner Broker: "state:Owned:" ack new ownership
+Clients ->> Owner Broker: lookup
+Owner Broker ->> Clients: redirect
+Clients ->> New Owner Broker: lookup
+New Owner Broker ->> Clients: return(connected)
+```
+
+Proposed Unload Sequence in Extensible Load Balancer without Lookup
+```mermaid
+sequenceDiagram
+participant Clients
+participant Owner Broker
+participant New Owner Broker
+participant Leader Broker
+Leader Broker ->> Owner Broker: "state:Releasing:" close topic
+Owner Broker ->> Owner Broker: close broker topic sessions(e.g ledgers) 
without disconnecting producers/consumers(fenced)
+Clients -->> Owner Broker: message pubs are ignored
+Owner Broker ->> New Owner Broker: "state:Assign:" assign new ownership
+New Owner Broker ->> Owner Broker: "state:Owned:" ack new ownership
+Owner Broker ->> Owner Broker: close the fenced broker topic sessions
+Owner Broker ->> Clients: close producers and consumers (with 
newOwnerBrokerUrl)
+Clients ->> New Owner Broker: immediately connect
+```
+
+
+# Detailed Design
+
+## Design & Implementation Details
+
+
+
+- Modify CommandCloseProducer, CommandCloseConsumer to pass optional 
brokerServiceUrls
+```
+message CommandCloseProducer {

Review Comment:
   We also need to consider the Pulsar proxy, right?
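
A minimal sketch of the client-side decision that the quoted proposal describes: if the close command carries the new owner's URL, connect to it directly, otherwise fall back to a lookup. All names here (`CloseCommand`, `assignedBrokerServiceUrl`, `LookupService`, `resolveReconnectTarget`) are hypothetical; the actual protocol fields are not visible in the quoted diff, and the sketch does not cover the proxy case raised above.

```java
import java.util.Optional;

// Hypothetical model of "close with an optional new-owner URL"; not Pulsar's API.
final class CloseRedirectSketch {

    /** Stand-in for a close command; the optional URL models the proposed field. */
    static final class CloseCommand {
        final Optional<String> assignedBrokerServiceUrl;

        CloseCommand(String assignedBrokerServiceUrl) {
            this.assignedBrokerServiceUrl = Optional.ofNullable(assignedBrokerServiceUrl);
        }
    }

    /** Stand-in for the existing topic lookup path. */
    interface LookupService {
        String lookup(String topic);
    }

    /** Use the URL from the close command when present; otherwise do a lookup. */
    static String resolveReconnectTarget(CloseCommand close, String topic, LookupService lookup) {
        return close.assignedBrokerServiceUrl.orElseGet(() -> lookup.lookup(topic));
    }

    public static void main(String[] args) {
        LookupService lookup = topic -> "pulsar://looked-up:6650";
        System.out.println(resolveReconnectTarget(
                new CloseCommand("pulsar://new-owner:6650"), "my-topic", lookup));
        System.out.println(resolveReconnectTarget(
                new CloseCommand(null), "my-topic", lookup));
    }
}
```

Because `orElseGet` is lazy, the lookup is only issued when the close command does not name a destination, which is the saving the proposal is after.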






[GitHub] [pulsar] rob-kvietkauskas-ign commented on issue #20722: [Bug] Unable to create Connectors on Pulsar 2.9.1 (bare metal cluster)

2023-07-13 Thread via GitHub


rob-kvietkauskas-ign commented on issue #20722:
URL: https://github.com/apache/pulsar/issues/20722#issuecomment-1634149488

   Yesterday I spent some time analyzing the Pulsar 2.11.1 source code, trying 
to figure out what may cause the ZooKeeper connection to `localhost` (even 
though neither the broker, the bookie, nor the functions worker configuration 
contains any entries related to `localhost`). After digging through the code 
mentioned in the stack traces I provided and in the stack traces I get with 
Pulsar 2.11.1, I have a strong feeling that the problem is located in the 
`uploadToBookKeeper(Namespace, InputStream, String)` method call in the 
[WorkerUtils 
class](https://github.com/apache/pulsar/blob/7233f0e6616ea54d841be7d17bf2abef4d3827c7/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java#L98).
 After this method gets called, the BookKeeper client tries to access 
BookKeeper via ZooKeeper, but somehow (not sure how exactly) ZooKeeperClient 
gets initialized with the localhost value.





[GitHub] [pulsar] codecov-commenter commented on pull request #20623: [improve][io] Elastic Search Sink can now handle raw Record

2023-07-13 Thread via GitHub


codecov-commenter commented on PR #20623:
URL: https://github.com/apache/pulsar/pull/20623#issuecomment-1634147460

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/20623?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report
   > Merging [#20623](https://app.codecov.io/gh/apache/pulsar/pull/20623?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) (3556617) into [master](https://app.codecov.io/gh/apache/pulsar/commit/677d160148afc935c34a072fb62b765a8a65045c?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) (677d160) will **increase** coverage by `41.18%`.
   > The diff coverage is `74.07%`.
   
   [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/20623/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/20623?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   ```diff
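   @@              Coverage Diff              @@
   ##             master   #20623       +/-   ##
   =============================================
   + Coverage     31.93%   73.11%   +41.18%
   - Complexity    11779    32039    +20260
   =============================================
     Files          1498     1866      +368
     Lines        114595   139072    +24477
     Branches      12428    15302     +2874
   =============================================
   + Hits          36591   101689    +65098
   + Misses        73158    29323    -43835
   - Partials       4846     8060     +3214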
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | inttests | `24.16% <ø> (?)` | |
   | systests | `24.92% <0.00%> (?)` | |
   | unittests | `72.41% <74.07%> (+40.47%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pulsar/pull/20623?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...che/pulsar/io/elasticsearch/ElasticSearchSink.java](https://app.codecov.io/gh/apache/pulsar/pull/20623?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWlvL2VsYXN0aWMtc2VhcmNoL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9wdWxzYXIvaW8vZWxhc3RpY3NlYXJjaC9FbGFzdGljU2VhcmNoU2luay5qYXZh) | `74.61% <74.07%> (ø)` | |
   
   ... and [1566 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/20623/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   





[GitHub] [pulsar] daziz opened a new pull request, #20798: Correcting spelling mistakes in the pulsar-broker module

2023-07-13 Thread via GitHub


daziz opened a new pull request, #20798:
URL: https://github.com/apache/pulsar/pull/20798

   
   
   
   
   Fixes #xyz
   
   
   
   Main Issue: #xyz
   
   
   
   PIP: #xyz 
   
   
   
   ### Motivation
   
   
   
   ### Modifications
   
   Corrected all spelling mistakes I found in the pulsar-broker module.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [X] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   





[GitHub] [pulsar] lhotari commented on issue #20797: [Tests] Thread resource leaks in unit tests

2023-07-13 Thread via GitHub


lhotari commented on issue #20797:
URL: https://github.com/apache/pulsar/issues/20797#issuecomment-1634081930

   A simple way to reproduce the thread leak in ServerCnxTest is to execute a 
single test method:
   ```
   mvn -DredirectTestOutputToFile=false -DtestRetryCount=0 test -pl pulsar-broker -Dtest=ServerCnxTest#testConnectCommandWithEnum
   ```





[GitHub] [pulsar] lhotari opened a new issue, #20797: [Tests] Thread resource leaks in unit tests

2023-07-13 Thread via GitHub


lhotari opened a new issue, #20797:
URL: https://github.com/apache/pulsar/issues/20797

   Thread leaks:
   ```
   ❯ grep -shEr "created [0-9]+ new threads" pulsar-broker | awk -F "Summary: " '{ print $2 }' | awk '{ print $(NF-2), $0}' |sort -rn | cut -f2- -d' '
   Tests in class org.apache.pulsar.broker.service.ServerCnxTest created 849 
new threads
   Tests in class org.apache.pulsar.broker.service.ClusterMigrationTest created 
743 new threads
   Tests in class 
org.apache.pulsar.broker.transaction.AuthenticatedTransactionProducerConsumerTest
 created 100 new threads
   Tests in class org.apache.pulsar.broker.loadbalance.LoadBalancerTest created 
92 new threads
   Tests in class 
org.apache.pulsar.broker.loadbalance.MultiBrokerLeaderElectionTest created 89 
new threads
   Tests in class 
org.apache.pulsar.broker.loadbalance.MultiBrokerLeaderElectionExpirationTest 
created 81 new threads
   Tests in class 
org.apache.pulsar.broker.transaction.pendingack.PendingAckInMemoryDeleteTest 
created 78 new threads
   Tests in class 
org.apache.pulsar.broker.transaction.buffer.TransactionStablePositionTest 
created 78 new threads
   Tests in class org.apache.pulsar.broker.service.MessageCumulativeAckTest 
created 62 new threads
   Tests in class org.apache.pulsar.broker.service.ReplicatorTest created 61 
new threads
   Tests in class org.apache.pulsar.broker.service.BrokerBookieIsolationTest 
created 51 new threads
   Tests in class 
org.apache.pulsar.broker.loadbalance.SimpleLoadManagerImplTest created 51 new 
threads
   Tests in class 
org.apache.pulsar.broker.loadbalance.ModularLoadManagerImplTest created 47 new 
threads
   Tests in class org.apache.pulsar.broker.service.TopicOwnerTest created 40 
new threads
   Tests in class org.apache.pulsar.broker.service.BrokerServiceTest created 37 
new threads
   Tests in class org.apache.pulsar.broker.service.BrokerBkEnsemblesTests 
created 32 new threads
   Tests in class org.apache.pulsar.broker.SLAMonitoringTest created 31 new 
threads
   Tests in class org.apache.pulsar.broker.service.ReplicatorAdminTlsTest 
created 26 new threads
   Tests in class 
org.apache.pulsar.broker.service.ReplicatorAdminTlsWithKeyStoreTest created 25 
new threads
   Tests in class 
org.apache.pulsar.broker.service.persistent.PersistentSubscriptionTest created 
25 new threads
   Tests in class org.apache.pulsar.broker.service.PeerReplicatorTest created 
24 new threads
   Tests in class org.apache.pulsar.broker.service.ReplicatorSubscriptionTest 
created 23 new threads
   Tests in class org.apache.pulsar.broker.service.MaxMessageSizeTest created 
23 new threads
   Tests in class org.apache.pulsar.broker.service.ReplicatorTopicPoliciesTest 
created 22 new threads
   Tests in class org.apache.pulsar.broker.service.OneWayReplicatorTest created 
20 new threads
   Tests in class org.apache.pulsar.broker.service.schema.SchemaServiceTest 
created 18 new threads
   Tests in class org.apache.pulsar.broker.service.ReplicatorTlsTest created 18 
new threads
   Tests in class org.apache.pulsar.broker.service.ReplicatorRemoveClusterTest 
created 18 new threads
   Tests in class org.apache.pulsar.broker.service.BacklogQuotaManagerTest 
created 17 new threads
   Tests in class 
org.apache.pulsar.broker.service.schema.PartitionedTopicsSchemaTest created 16 
new threads
   Tests in class org.apache.pulsar.broker.loadbalance.SimpleBrokerStartTest 
created 13 new threads
   Tests in class 
org.apache.pulsar.broker.loadbalance.LeaderElectionServiceTest created 13 new 
threads
   Tests in class 
org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTaskTest created 13 new 
threads
   Tests in class org.apache.pulsar.broker.auth.AuthorizationTest created 12 
new threads
   Tests in class 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest
 created 11 new threads
   Tests in class 
org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadSchedulerTest 
created 10 new threads
   Tests in class 
org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest created 10 
new threads
   Tests in class org.apache.pulsar.broker.service.OpportunisticStripingTest 
created 9 new threads
   Tests in class org.apache.pulsar.broker.service.ManagedLedgerCompressionTest 
created 9 new threads
   Tests in class 
org.apache.pulsar.broker.loadbalance.extensions.AntiAffinityNamespaceGroupExtensionTest
 created 8 new threads
   Tests in class org.apache.pulsar.broker.stats.ManagedCursorMetricsTest 
created 7 new threads
   Tests in class org.apache.pulsar.broker.service.BusyWaitServiceTest created 
7 new threads
   Tests in class 
org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistryTest created 7 
new threads
   Tests in class org.apache.pulsar.broker.web.WebServiceTest created 6 new 
threads
   Tests in class org.apache.pulsar.broker.service.PersistentMessageFinderTest 
created 6 new threads
   Tests in class 

[GitHub] [pulsar] lhotari commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest

2023-07-13 Thread via GitHub


lhotari commented on issue #20673:
URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1634054904

   Actually, setting `svcConfig.setBrokerShutdownTimeoutMs(5000L);` in 
ServerCnxTest didn't make the thread leak go away. There must be some other 
problem in the shutdown sequence. I'll file another bug report for it.





[GitHub] [pulsar] lhotari commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest

2023-07-13 Thread via GitHub


lhotari commented on issue #20673:
URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1634051197

   UPDATE: the thread leak detection raises some false alarms because broker 
shutdown is asynchronous in tests for performance reasons. For example, 
in ServerCnxTest, `svcConfig.setBrokerShutdownTimeoutMs(5000L);` makes the 
shutdown wait for completion, but that makes the test very slow. /cc @gaoran10 
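
To illustrate why an asynchronous shutdown can look like a leak, here is a minimal sketch that compares the set of live threads before and after some work. It is not the project's `ThreadLeakDetectorListener`; the class, the helper names, and the `async-broker-shutdown` thread are hypothetical and only model a shutdown that is still running when the check happens.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of snapshot-based thread leak detection.
final class ThreadLeakSketch {

    /** Names of all threads that are alive right now. */
    static Set<String> liveThreadNames() {
        Set<String> names = new TreeSet<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive()) {
                names.add(t.getName());
            }
        }
        return names;
    }

    /** Threads alive now that were not present in the earlier snapshot. */
    static Set<String> newThreadsSince(Set<String> before) {
        Set<String> added = liveThreadNames();
        added.removeAll(before);
        return added;
    }

    /** Returns {reported without waiting, reported after waiting for shutdown}. */
    static boolean[] demo() {
        Set<String> before = liveThreadNames();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finishShutdown = new CountDownLatch(1);
        // Models a shutdown that keeps running after the test method has returned.
        Thread shutdown = new Thread(() -> {
            started.countDown();
            try {
                finishShutdown.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "async-broker-shutdown");
        shutdown.start();
        try {
            started.await();
            boolean withoutWaiting = newThreadsSince(before).contains("async-broker-shutdown");
            finishShutdown.countDown();
            shutdown.join(); // corresponds to waiting for shutdown to complete
            boolean afterWaiting = newThreadsSince(before).contains("async-broker-shutdown");
            return new boolean[] {withoutWaiting, afterWaiting};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("reported without waiting: " + result[0]);
        System.out.println("reported after waiting:   " + result[1]);
    }
}
```

In this model the thread is reported only when the check runs before the shutdown has finished, which is the false-alarm case; a thread that is still reported after waiting would point to a real leak.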





[GitHub] [pulsar-client-go] RobertIndie commented on pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema

2023-07-13 Thread via GitHub


RobertIndie commented on PR #1055:
URL: 
https://github.com/apache/pulsar-client-go/pull/1055#issuecomment-1634027811

   > Calculate the required resources (semaphore/memory; when chunking, more 
than one semaphore; we cache the compressedPayload/meta in the sendRequest while 
calculating) before we put a request into the dataChan. If there are not enough 
resources, fail fast. In this way, we can delete the sendRequest.blockCh field 
and there is no need to block.
   
   When chunking is enabled, we cannot get the total number of chunks before 
pushing the request to the dataChan. There may also be a deadlock issue similar 
to https://github.com/apache/pulsar/issues/17446
   
   > Add a sendRequest.done() method and call it when a request is done 
(succeeded or failed). In this method, release the resources the request holds, 
run the callback, report metrics, and write debug logs. In this way, we manage 
the resources and logic together and don't have to do these things across the 
whole file.
   
   +1 for this. It's good practice to manage the resources in one place.
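
The `done()` idea can be sketched as follows. The example is written in Java rather than Go and is not the client's code: `SendRequestSketch`, `reserve`, and the plain byte counter standing in for the memory limiter are hypothetical, and the memory limit check itself is omitted.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch: every resource a request holds is released in one place.
final class SendRequestSketch {
    private final Semaphore publishPermits;
    private final AtomicLong reservedBytes;
    private final long size;
    private final Consumer<Exception> callback;
    private final AtomicBoolean finished = new AtomicBoolean(false);

    private SendRequestSketch(Semaphore publishPermits, AtomicLong reservedBytes,
                              long size, Consumer<Exception> callback) {
        this.publishPermits = publishPermits;
        this.reservedBytes = reservedBytes;
        this.size = size;
        this.callback = callback;
    }

    /** Reserve a permit and memory up front; return null (fail fast) if no permit is free. */
    static SendRequestSketch reserve(Semaphore publishPermits, AtomicLong reservedBytes,
                                     long size, Consumer<Exception> callback) {
        if (!publishPermits.tryAcquire()) {
            return null;
        }
        reservedBytes.addAndGet(size);
        return new SendRequestSketch(publishPermits, reservedBytes, size, callback);
    }

    /** Called on success (error == null) or failure; safe to call more than once. */
    void done(Exception error) {
        if (!finished.compareAndSet(false, true)) {
            return; // resources were already released
        }
        publishPermits.release();
        reservedBytes.addAndGet(-size);
        callback.accept(error);
    }

    public static void main(String[] args) {
        Semaphore permits = new Semaphore(1);
        AtomicLong bytes = new AtomicLong();
        SendRequestSketch request = reserve(permits, bytes, 6, e -> System.out.println("callback ran"));
        request.done(null);
        request.done(null); // no double release
        System.out.println(permits.availablePermits() + " permit(s), " + bytes.get() + " byte(s) reserved");
    }
}
```

Guarding `done()` with a compare-and-set keeps the release idempotent, so a request that fails on one path and times out on another cannot return its permit twice.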





[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema

2023-07-13 Thread via GitHub


RobertIndie commented on code in PR #1055:
URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1262383421


##
pulsar/producer_partition.go:
##
@@ -542,6 +538,11 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
 
uncompressedSize := len(uncompressedPayload)
 
+   // try to reserve memory for uncompressedPayload
+   if !p.canReserveMem(request, int64(uncompressedSize)) {
+   return

Review Comment:
   I'm +1 for moving the semaphore release out of `canReserveMem`. It's better 
that we release it here than in the `canReserveMem` before we find a good 
solution for it.






[GitHub] [pulsar-client-go] RobertIndie commented on a diff in pull request #1055: [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema

2023-07-13 Thread via GitHub


RobertIndie commented on code in PR #1055:
URL: https://github.com/apache/pulsar-client-go/pull/1055#discussion_r1262354724


##
pulsar/producer_partition.go:
##
@@ -1407,31 +1410,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size 
int64) {
p.client.memLimit.ReleaseMemory(size)
 }
 
-func (p *partitionProducer) canAddToQueue(sr *sendRequest, 
uncompressedPayloadSize int64) bool {
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
if p.options.DisableBlockIfQueueFull {
if !p.publishSemaphore.TryAcquire() {
runCallback(sr.callback, nil, sr.msg, 
errSendQueueIsFull)
return false
}
-   if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) 
{
+   } else {
+   if !p.publishSemaphore.Acquire(sr.ctx) {
+   runCallback(sr.callback, nil, sr.msg, errContextExpired)
+   return false
+   }
+   }
+   p.metrics.MessagesPending.Inc()
+   p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))

Review Comment:
   I think we should consider `msg.Schema` here when a schema is used.
   Also, why do we add the pending bytes here instead of in `canReserveMem`?



##
pulsar/producer_test.go:
##
@@ -1924,6 +1924,159 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
assert.NoError(t, err)
 }
 
+func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {
+
+   c, err := NewClient(ClientOptions{
+   URL:  serviceURL,
+   MemoryLimitBytes: 100 * 6,
+   })
+   assert.NoError(t, err)
+   defer c.Close()
+
+   schema := NewAvroSchema(`{"fields":
+   [
+   
{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
+   ],
+   "name":"MyAvro","namespace":"schemaNotFoundTestCase","type":"record"}`, 
nil)
+
+   topicName := newTopicName()
+   producer1, _ := c.CreateProducer(ProducerOptions{
+   Topic:   topicName,
+   DisableBlockIfQueueFull: true,
+   DisableBatching: false,
+   BatchingMaxPublishDelay: 100 * time.Second,
+   SendTimeout: 2 * time.Second,
+   })
+
+   producer2, _ := c.CreateProducer(ProducerOptions{
+   Topic:   topicName,
+   DisableBlockIfQueueFull: true,
+   DisableBatching: false,
+   BatchingMaxPublishDelay: 100 * time.Second,
+   SendTimeout: 2 * time.Second,
+   })
+
+   // the size of encoded value is 6 bytes
+   value := map[string]interface{}{
+   "id": 0,
+   "name": map[string]interface{}{
+   "string": "abc",
+   },
+   }
+
+   n := 101
+   for i := 0; i < n/2; i++ {
+   producer1.SendAsync(context.Background(), &ProducerMessage{
+   Value:  value,
+   Schema: schema,
+   }, func(id MessageID, message *ProducerMessage, e error) {})
+
+   producer2.SendAsync(context.Background(), &ProducerMessage{
+   Value:  value,
+   Schema: schema,
+   }, func(id MessageID, message *ProducerMessage, e error) {})
+   }
+   // Last message in order to reach the limit
+   producer1.SendAsync(context.Background(), &ProducerMessage{
+   Value:  value,
+   Schema: schema,
+   }, func(id MessageID, message *ProducerMessage, e error) {})
+   time.Sleep(100 * time.Millisecond)
+   assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage())
+
+   _, err = producer1.Send(context.Background(), &ProducerMessage{
+   Value:  value,
+   Schema: schema,
+   })
+   assert.Error(t, err)
+   assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+   _, err = producer2.Send(context.Background(), &ProducerMessage{
+   Value:  value,
+   Schema: schema,
+   })
+   assert.Error(t, err)
+   assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+   // flush pending msg
+   err = producer1.Flush()
+   assert.NoError(t, err)
+   err = producer2.Flush()
+   assert.NoError(t, err)
+   assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
+
+   _, err = producer1.Send(context.Background(), &ProducerMessage{
+   Value:  value,
+   Schema: schema,
+   })
+   assert.NoError(t, err)
+   _, err = producer2.Send(context.Background(), &ProducerMessage{
+   Value:  value,
+   Schema: schema,
+   })
+   assert.NoError(t, err)
+}
+
+func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
+
+   c, err := NewClient(ClientOptions{
+   URL:  serviceURL,
+   

[GitHub] [pulsar] gaoran10 commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest

2023-07-13 Thread via GitHub


gaoran10 commented on issue #20673:
URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1633998198

   @lhotari Thanks for this discovery, I'll check the dump and try to fix this 
problem.





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #20330: [optimize][admin]Enhancing Transaction Buffer Stats and Introducing TransactionBufferInternalStats API

2023-07-13 Thread via GitHub


gaoran10 commented on code in PR #20330:
URL: https://github.com/apache/pulsar/pull/20330#discussion_r1262322512


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##
@@ -431,6 +437,70 @@ protected 
CompletableFuture internalGetPendi
 );
 }
 
+protected CompletableFuture 
internalGetTransactionBufferInternalStats(
+boolean authoritative, boolean metadata) {
+TransactionBufferInternalStats transactionBufferInternalStats = new 
TransactionBufferInternalStats();
+return getExistingPersistentTopicAsync(authoritative)
+.thenCompose(topic -> {
+TransactionBuffer.SnapshotType snapshotType = 
topic.getTransactionBuffer().getSnapshotType();
+if (snapshotType == null) {
+return FutureUtil.failedFuture(new 
RestException(NOT_FOUND,
+"Transaction buffer Snapshot for the topic 
does not exist"));
+} else if (snapshotType == 
TransactionBuffer.SnapshotType.Segment) {
+transactionBufferInternalStats.snapshotType = 
snapshotType.toString();
+TopicName segmentTopic = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+return getTxnSnapshotInternalStats(segmentTopic, 
metadata)
+.thenApply(snapshotSystemTopicInternalStats -> 
{

Review Comment:
   Maybe we can use the method `thenAccept` here.
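
A small sketch of the difference this suggestion points at: `thenApply` has to return a value even when only the side effect matters, while `thenAccept` takes a consumer and yields a `CompletableFuture<Void>`. The `InternalStats` holder and `fetchSegmentStats()` are hypothetical stand-ins, not the types from the diff.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch comparing thenApply and thenAccept for a side-effect-only stage.
final class ThenAcceptSketch {

    /** Stand-in for the stats object that the callback fills in. */
    static final class InternalStats {
        String segmentInternalStats;
    }

    /** Stand-in for an async lookup; completes immediately to keep the sketch self-contained. */
    static CompletableFuture<String> fetchSegmentStats() {
        return CompletableFuture.completedFuture("segment-stats");
    }

    /** thenApply: the lambda must return something, here the holder itself. */
    static CompletableFuture<InternalStats> withThenApply(InternalStats stats) {
        return fetchSegmentStats().thenApply(segment -> {
            stats.segmentInternalStats = segment;
            return stats;
        });
    }

    /** thenAccept: side effect only; the resulting stage carries no value. */
    static CompletableFuture<Void> withThenAccept(InternalStats stats) {
        return fetchSegmentStats().thenAccept(segment -> stats.segmentInternalStats = segment);
    }

    public static void main(String[] args) {
        InternalStats stats = new InternalStats();
        withThenAccept(stats).join();
        System.out.println(stats.segmentInternalStats);
    }
}
```

If a later stage ignores the previous result anyway, the `thenAccept` form states that intent directly.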



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java:
##
@@ -66,9 +67,10 @@ public interface AbortedTxnProcessor {
 
 /**
  * Get the lastSnapshotTimestamps.
- * @return the lastSnapshotTimestamps.
+ *
+ * @return a transactionBufferStats with the stats in the 
abortedTxnProcessor.
  */
-long getLastSnapshotTimestamps();
+TransactionBufferStats generateSnapshotStats(boolean segmentStats);

Review Comment:
   Do we need to add a new method for compatibility?



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3547,8 +3547,8 @@ public boolean checkSubscriptionTypesEnable(SubType 
subType) {
 return subTypesEnabled != null && subTypesEnabled.contains(subType);
 }
 
-public TransactionBufferStats getTransactionBufferStats(boolean 
lowWaterMarks) {
-return this.transactionBuffer.getStats(lowWaterMarks);
+public TransactionBufferStats getTransactionBufferStats(boolean 
lowWaterMarks, boolean segmentStats) {

Review Comment:
   Do we need to add a new method for compatibility? /cc @BewareMyPower 
@codelipenghui 



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##
@@ -431,6 +437,70 @@ protected 
CompletableFuture internalGetPendi
 );
 }
 
+protected CompletableFuture 
internalGetTransactionBufferInternalStats(
+boolean authoritative, boolean metadata) {
+TransactionBufferInternalStats transactionBufferInternalStats = new 
TransactionBufferInternalStats();
+return getExistingPersistentTopicAsync(authoritative)
+.thenCompose(topic -> {
+TransactionBuffer.SnapshotType snapshotType = 
topic.getTransactionBuffer().getSnapshotType();
+if (snapshotType == null) {
+return FutureUtil.failedFuture(new 
RestException(NOT_FOUND,
+"Transaction buffer Snapshot for the topic 
does not exist"));
+} else if (snapshotType == 
TransactionBuffer.SnapshotType.Segment) {
+transactionBufferInternalStats.snapshotType = 
snapshotType.toString();
+TopicName segmentTopic = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+return getTxnSnapshotInternalStats(segmentTopic, 
metadata)
+.thenApply(snapshotSystemTopicInternalStats -> 
{
+
transactionBufferInternalStats.segmentInternalStats =
+snapshotSystemTopicInternalStats;
+return transactionBufferInternalStats;
+}).thenCompose(ignore -> {

Review Comment:
   Maybe we can use `thenCombine`, like this:
   
   ```
   var segmentStatsFuture = ...;
   var indexStatsFuture = ...;
   segmentStatsFuture.thenCombine(indexStatsFuture, (segmentStats, indexStats) 
-> {
 transactionBufferInternalStats.segmentInternalStats = segmentStats;
 

[GitHub] [pulsar] lhotari commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest

2023-07-13 Thread via GitHub


lhotari commented on issue #20673:
URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1633958559

   thread leaks:
   ```
   ❯ grep -shEr "created [0-9][0-9]+ new threads" pulsar-broker
   2023-07-13T06:18:12,115 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class org.apache.pulsar.broker.service.ReplicatorAdminTlsTest 
created 26 new threads
   2023-07-13T06:09:10,589 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class org.apache.pulsar.broker.service.TopicOwnerTest created 
40 new threads
   2023-07-13T06:18:27,708 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class org.apache.pulsar.broker.service.BrokerBkEnsemblesTests 
created 32 new threads
   2023-07-13T05:59:04,195 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.loadbalance.SimpleBrokerStartTest created 13 new 
threads
   2023-07-13T06:02:54,883 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadSchedulerTest 
created 10 new threads
   2023-07-13T06:27:27,595 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.service.BacklogQuotaManagerTest created 17 new threads
   2023-07-13T06:24:54,267 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class org.apache.pulsar.broker.service.BrokerServiceTest 
created 37 new threads
   2023-07-13T06:31:26,617 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.service.ReplicatorSubscriptionTest created 23 new 
threads
   2023-07-13T06:14:34,465 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class org.apache.pulsar.broker.service.ReplicatorTlsTest 
created 18 new threads
   2023-07-13T06:34:40,969 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.transaction.buffer.TransactionStablePositionTest 
created 78 new threads
   2023-07-13T06:32:03,632 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.service.BrokerBookieIsolationTest created 51 new 
threads
   2023-07-13T06:00:16,670 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTaskTest created 13 new 
threads
   2023-07-13T06:07:32,859 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class org.apache.pulsar.broker.service.PeerReplicatorTest 
created 24 new threads
   2023-07-13T06:05:18,690 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class org.apache.pulsar.broker.auth.AuthorizationTest created 
12 new threads
   2023-07-13T06:05:50,722 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.loadbalance.SimpleLoadManagerImplTest created 51 new 
threads
   2023-07-13T06:04:49,333 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.loadbalance.MultiBrokerLeaderElectionTest created 89 
new threads
   2023-07-13T06:10:48,796 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersTest
 created 11 new threads
   2023-07-13T06:30:07,831 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.service.ReplicatorTopicPoliciesTest created 22 new 
threads
   2023-07-13T06:31:28,204 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest created 10 
new threads
   2023-07-13T06:20:06,450 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class org.apache.pulsar.broker.service.ClusterMigrationTest 
created 743 new threads
   2023-07-13T06:20:30,680 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.service.schema.PartitionedTopicsSchemaTest created 16 
new threads
   2023-07-13T06:08:03,645 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.service.ReplicatorRemoveClusterTest created 18 new 
threads
   2023-07-13T06:06:43,539 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.service.ReplicatorAdminTlsWithKeyStoreTest created 25 
new threads
   2023-07-13T06:27:39,557 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 
org.apache.pulsar.broker.service.MessageCumulativeAckTest created 62 new threads
   2023-07-13T06:29:35,866 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class org.apache.pulsar.broker.service.ServerCnxTest created 
849 new threads
   2023-07-13T06:05:18,567 - WARN  - [main:ThreadLeakDetectorListener@60] - 
Summary: Tests in class 

[GitHub] [pulsar] oneforalone added a comment to the discussion: Broker restarted due to `ZooKeeper client is disconnected`

2023-07-13 Thread via GitHub


GitHub user oneforalone added a comment to the discussion: Broker restarted due 
to `ZooKeeper client is disconnected`

@lhotari The problem remains the same. Is it related to the slow fsync?

GitHub link: 
https://github.com/apache/pulsar/discussions/20773#discussioncomment-6436778


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



[GitHub] [pulsar] rob-kvietkauskas-ign edited a discussion: Functions worker configuration for bare-metal cluster

2023-07-13 Thread via GitHub


GitHub user rob-kvietkauskas-ign edited a discussion: Functions worker 
configuration for bare-metal cluster

Hello! :wave: 
Could someone help me with advice or tips on functions worker configuration 
in a bare-metal cluster? I have a cluster of 6 machines (3 for brokers and 
bookies, 3 for ZooKeeper). The ZooKeeper cluster is functioning fine, and so 
are the bookies (I successfully ran the sanity check and _simpletest_). The 
brokers also work fine. I am trying to run functions workers with the brokers, 
but after I perform the setup steps described in the documentation, I get 
BookKeeper (ledger allocation) errors, which in turn seem to be caused by 
incorrect configuration, every time I try to create a (source or sink) 
connector. I have created an 
[issue](https://github.com/apache/pulsar/issues/20722), but haven't received 
any replies yet, so I am trying my luck in the Discussions section. I 
encounter the same errors in different Pulsar versions. I have deployed four 
separate clusters: two running version 2.9.1 and the other two running 
version 2.11.1. The result is the same in both cases.

GitHub link: https://github.com/apache/pulsar/discussions/20796





[GitHub] [pulsar] lhotari commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest

2023-07-13 Thread via GitHub


lhotari commented on issue #20673:
URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1633924891

   @gaoran10 It looks like there's a resource leak somewhere. When the test 
times out there are 718  threads:
   
https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_dump
   
   Lots of threads with the same name, for example:
   
[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47c3c3a20)awaiting
 notification on [ 
[0x0800337ab3f8](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0800337ab3f8)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47ca98f80)awaiting
 notification on [ 
[0x08cab0d0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x08cab0d0)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47caa29b0)awaiting
 notification on [ 
[0x0800336f0e18](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0800336f0e18)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cc88f60)awaiting
 notification on [ 
[0x08001c456320](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x08001c456320)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cd2fd00)awaiting
 notification on [ 
[0x08001c4453c8](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x08001c4453c8)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cd34020)awaiting
 notification on [ 
[0x0803f5e314f0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f5e314f0)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cd38a20)awaiting
 notification on [ 
[0x0803f5e1e848](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f5e1e848)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cd64200)awaiting
 notification on [ 
[0x0800167b9f60](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0800167b9f60)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47cd6cce0)awaiting
 notification on [ 
[0x0803f5e29468](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f5e29468)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47d6d9810)awaiting
 notification on [ 
[0x0803f5e35928](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f5e35928)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47d7382f0)awaiting
 notification on [ 
[0x0803f49f6c58](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f49f6c58)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47d9dd150)awaiting
 notification on [ 
[0x0803f5e1a4c8](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_sync_0x0803f5e1a4c8)
 ]

[pulsar-backlog-quota-checker-OrderedScheduler-0-0](https://jstack.review/?https://gist.github.com/lhotari/a4276e94f10c8f62cc0fd3c1257624f6#tda_1_threaddetails_0x7fe47da15b80)awaiting
 notification on [ 

[pulsar-client-go] branch master updated: [Fix][Producer] Stop block request even if Value and Payload are both set (#1052)

2023-07-13 Thread zike
This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new e45122c  [Fix][Producer] Stop block request even if Value and Payload 
are both set (#1052)
e45122c is described below

commit e45122c2defc5efd4efc493d0acef278a7ccfc01
Author: gunli <24350...@qq.com>
AuthorDate: Thu Jul 13 17:15:37 2023 +0800

[Fix][Producer] Stop block request even if Value and Payload are both set 
(#1052)

### Motivation
Currently, if `!p.options.DisableBlockIfQueueFull` and `msg.Value != nil && 
msg.Payload != nil`, request will be blocked forever 'cause `defer 
request.stopBlock()` is set up after the verify logic.
```go
if msg.Value != nil && msg.Payload != nil {
p.log.Error("Can not set Value and Payload both")
runCallback(request.callback, nil, request.msg, errors.New("can not 
set Value and Payload both"))
return
}

// The block chan must be closed when returned with exception
defer request.stopBlock()
```
Here is the PR to stop block request even if Value and Payload are both set
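
A minimal sketch of why the original ordering blocks, assuming a hypothetical
`sendRequest` type with a `blockCh` channel (these names are illustrative and
are not the client's real fields). An early `return` placed before the `defer`
statement skips the deferred call, so the channel is never closed. The commit
itself avoids the problem by moving the check into `internalSendAsync`; the
sketch only contrasts the two orderings.

```go
package main

import (
	"errors"
	"fmt"
)

// sendRequest is a hypothetical stand-in for the producer's request type.
type sendRequest struct {
	value, payload []byte
	blockCh        chan struct{}
}

// stopBlock closes blockCh so that a caller waiting on it can proceed.
func (r *sendRequest) stopBlock() { close(r.blockCh) }

var errBoth = errors.New("can not set Value and Payload both")

// validateThenDefer mirrors the problematic order: the early return happens
// before the defer is registered, so stopBlock never runs on that path.
func validateThenDefer(r *sendRequest) error {
	if r.value != nil && r.payload != nil {
		return errBoth
	}
	defer r.stopBlock()
	return nil
}

// deferThenValidate registers the defer first, so every return path closes
// the channel.
func deferThenValidate(r *sendRequest) error {
	defer r.stopBlock()
	if r.value != nil && r.payload != nil {
		return errBoth
	}
	return nil
}

// isClosed reports whether ch has been closed, without blocking.
func isClosed(ch chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

// newRequest builds a request with both Value and Payload set.
func newRequest() *sendRequest {
	return &sendRequest{
		value:   []byte("v"),
		payload: []byte("p"),
		blockCh: make(chan struct{}),
	}
}

func main() {
	a, b := newRequest(), newRequest()
	_ = validateThenDefer(a)
	_ = deferThenValidate(b)
	fmt.Println(isClosed(a.blockCh), isClosed(b.blockCh))
}
```

Running the sketch prints `false true`: only the second ordering releases a
caller waiting on the channel.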

### Modifications

- pulsar/producer_partition.go


-

Co-authored-by: gunli 
---
 pulsar/producer_partition.go | 11 ++-
 pulsar/producer_test.go  | 15 +++
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index dd45ff2..48411b4 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -481,11 +481,6 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
 
var schemaPayload []byte
var err error
-   if msg.Value != nil && msg.Payload != nil {
-   p.log.Error("Can not set Value and Payload both")
-   runCallback(request.callback, nil, request.msg, errors.New("can 
not set Value and Payload both"))
-   return
-   }
 
// The block chan must be closed when returned with exception
defer request.stopBlock()
@@ -1117,6 +1112,12 @@ func (p *partitionProducer) internalSendAsync(ctx 
context.Context, msg *Producer
return
}
 
+   if msg.Value != nil && msg.Payload != nil {
+   p.log.Error("Can not set Value and Payload both")
+   runCallback(callback, nil, msg, newError(InvalidMessage, "Can 
not set Value and Payload both"))
+   return
+   }
+
// Register transaction operation to transaction and the transaction 
coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
var txn *transaction
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 11ff089..adbdc71 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -111,6 +111,12 @@ func TestSimpleProducer(t *testing.T) {
 
_, err = producer.Send(context.Background(), nil)
assert.NotNil(t, err)
+
+   _, err = producer.Send(context.Background(), &ProducerMessage{
+   Payload: []byte("hello"),
+   Value:   []byte("hello"),
+   })
+   assert.NotNil(t, err)
 }
 
 func TestProducerAsyncSend(t *testing.T) {
@@ -163,6 +169,15 @@ func TestProducerAsyncSend(t *testing.T) {
wg.Done()
})
wg.Wait()
+
+   wg.Add(1)
+   producer.SendAsync(context.Background(), &ProducerMessage{Payload: 
[]byte("hello"), Value: []byte("hello")},
+   func(id MessageID, m *ProducerMessage, e error) {
+   assert.NotNil(t, e)
+   assert.Nil(t, id)
+   wg.Done()
+   })
+   wg.Wait()
 }
 
 func TestProducerCompression(t *testing.T) {



[GitHub] [pulsar-client-go] RobertIndie merged pull request #1052: [Fix][Producer] Stop block request even if Value and Payload are both set

2023-07-13 Thread via GitHub


RobertIndie merged PR #1052:
URL: https://github.com/apache/pulsar-client-go/pull/1052





[GitHub] [pulsar] lhotari commented on issue #20673: Flaky-test: TransactionBufferCloseTest.deleteTopicCloseTransactionBufferTest

2023-07-13 Thread via GitHub


lhotari commented on issue #20673:
URL: https://github.com/apache/pulsar/issues/20673#issuecomment-1633812413

   again: 
https://github.com/apache/pulsar/actions/runs/5536635370/jobs/10110676512?pr=20623#step:10:1201





[GitHub] [pulsar] fabien-dlb added a comment to the discussion: Client-Server compatibility matrix

2023-07-13 Thread via GitHub


GitHub user fabien-dlb added a comment to the discussion: Client-Server 
compatibility matrix

An update after reading the [announcement of Pulsar 
3.0.0](https://pulsar.apache.org/blog/2023/05/02/announcing-apache-pulsar-3-0/#compatibility-between-releases):

> Before Pulsar 3.0, upgrades should be performed linearly through each feature 
> version. For example, when upgrading from 2.8 to 2.10, it is important to 
> upgrade to 2.9 before going to 2.10.

So this confirms that brokers need to go through every feature version. As I 
understand it, from 2.7.x we need to upgrade to 2.8.x on the broker side and 
then the client side, then to 2.9.x on the broker side and then the client 
side, and so on up to 2.11.x, which is the only version from which we can 
upgrade to 3.0.0 (note: I am still looking for confirmation that we can 
upgrade from 2.x to 3.x, and from which 2.x version(s) it is possible).
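
The linear upgrade rule quoted above can be sketched as a small helper. This is
only an illustration: `featureReleases` and `upgradePath` are hypothetical
names, and the list of 2.x feature lines is an assumption based on the versions
mentioned in this thread.

```go
package main

import "fmt"

// featureReleases lists the 2.x feature lines in release order.
// Assumption: taken from the versions discussed in this thread.
var featureReleases = []string{"2.7", "2.8", "2.9", "2.10", "2.11"}

// upgradePath returns every feature version to step through when moving
// from `from` (exclusive) to `to` (inclusive), one feature line at a time.
func upgradePath(from, to string) ([]string, error) {
	start, end := -1, -1
	for i, v := range featureReleases {
		if v == from {
			start = i
		}
		if v == to {
			end = i
		}
	}
	if start < 0 || end < 0 || start > end {
		return nil, fmt.Errorf("unsupported upgrade %s -> %s", from, to)
	}
	return featureReleases[start+1 : end+1], nil
}

func main() {
	path, err := upgradePath("2.7", "2.11")
	if err != nil {
		fmt.Println(err)
		return
	}
	// Each step would be applied to the brokers first, then to the clients.
	fmt.Println(path)
}
```

For 2.7 to 2.11 this yields the steps 2.8, 2.9, 2.10 and 2.11. Whether 3.0.0
can follow directly is the open question noted above, so it is left out of the
list.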

GitHub link: 
https://github.com/apache/pulsar/discussions/20109#discussioncomment-6435673





[GitHub] [pulsar] shibd commented on pull request #20795: [fix][io] Not restart instance when kafka source poll exception.

2023-07-13 Thread via GitHub


shibd commented on PR #20795:
URL: https://github.com/apache/pulsar/pull/20795#issuecomment-1633790526

   @Technoboy- @poorbarcode Thanks for the feedback.
   
   About this comment: 
https://github.com/apache/pulsar/pull/20795#discussion_r1261959072
   
   I refactored the start logic:
   1. `consumer.subscribe` now runs on the start thread, so it fails fast 
when subscribing throws an exception.
   2. Fixed `InterruptedException` being swallowed: when an 
`InterruptedException` is encountered, it is now rethrown.
   
   PTAL.





[pulsar] annotated tag v2.10.5-candidate-1 updated (82c589fe97a -> 8aacaf9d16f)

2023-07-13 Thread xiangying
This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a change to annotated tag v2.10.5-candidate-1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


*** WARNING: tag v2.10.5-candidate-1 was modified! ***

from 82c589fe97a (commit)
  to 8aacaf9d16f (tag)
 tagging 82c589fe97a1a76d4a580845eb07e205a20a0356 (commit)
 replaces v2.10.4
  by xiangying
  on Thu Jul 13 14:57:11 2023 +0800

- Log -
Release v2.10.5-candidate-1
-BEGIN PGP SIGNATURE-

iQJJBAABCgAzFiEE+P0S7h/2fz2fR3D9AvkjT8MxZvcFAmSvoEcVHHhpYW5neWlu
Z0BhcGFjaGUub3JnAAoJEAL5I0/DMWb3WcgQAKD4vwyHwxfsQI5v6MbN0x+QkXAn
+FqJTbPttCOwmWoxfhFeErON29HIEe6Rxx6UrCQKKwl83sQw5IyyawbsRnVKP/tP
eMQBsaQ5nM1Nv4KhoRXh+j7P863pJag5J8WeDbYi6YrhVXjgma14ZjqXhL9mqv1z
tkS1IAq0AhH0/w61tebcASVOuSWYpkyHCx7EqWepQxDUEvsRb2rIEo+oUUa+cpAE
r3faKK8OABwVG8DKZtmz4qC5VZv70JMWaoVEdLquzLVRVNsIicPK3HwNK7UKnKEH
hEgAd4xEQLF1JpA5eqqEyuewSu8hgXXPyZoA40Z9Xo8V9qWUSoj8zjElQ8wIh3Ls
ZIbVDfnLG5mkomcIV3+epIQRTnX7/Gq+GXr03Cgdj5IdI5sEOyXLZLMuknfbNP9L
FdoMEsgeSf3bJWz2J9399VPBG6EJhxArmJMYXUIdDlu5roPnKfAUHOAuapsSeq2b
tZ2uZHLk7I1cGRIaBBx9nggjOMParFddmBbsfhwIwabgPv2cRrWaaAyof5FABRw5
xBq8iCbmozdia6Zow8YRzC5NlNs8IaOaPB5+VT7segeCD8/Sxo7sjXiuH+ixlvdN
ttOJLvQoT7bmMozfNbcx7SFlCiU+6Z5PCM+0wxDGAtBuybkAMdmL2NF0lAOUUZuj
Pd9iVSt1QrsgTrzo
=s+UB
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #20718: [feat][broker][PIP-278] Support pluggable topic compaction service - part2

2023-07-13 Thread via GitHub


BewareMyPower commented on code in PR #20718:
URL: https://github.com/apache/pulsar/pull/20718#discussion_r1262144922


##
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##
@@ -400,6 +400,16 @@ private static Pair<String, Integer> 
extractKeyAndSize(RawMessage m) {
 }
 }
 
+protected List<ImmutableTriple<MessageId, String, Integer>> 
+extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
+throws IOException {
+return RawBatchConverter.extractIdsAndKeysAndSize(msg);
+}
+
+protected Optional<RawMessage> rebatchMessage(RawMessage msg, 
+BiPredicate<String, MessageId> filter)
+throws IOException {
+return RawBatchConverter.rebatchMessage(msg, filter);
+}

Review Comment:
   Let's keep it `protected` for now.






[GitHub] [pulsar-client-cpp] BewareMyPower commented on pull request #300: [fix] Fix the assertion of `ConsumerWrapper`.receiveAtMost

2023-07-13 Thread via GitHub


BewareMyPower commented on PR #300:
URL: 
https://github.com/apache/pulsar-client-cpp/pull/300#issuecomment-1633722258

   Oh, that's right. But `receiveAtLeast` is also wrong, because this method 
cannot receive more than N messages.
   
   A proper implementation of `receiveAtMost` might be:
   
   ```c++
   Result receiveAtMost(int numMessages) {
   Message msg;
   for (int i = 0; i < numMessages; i++) {
   auto result = consumer_.receive(msg, 3000);
   if (result != ResultOk) {
   return result;
   }
   messageIdList_.emplace_back(msg.getMessageId());
   }
   return ResultOk;
   }
   ```





[GitHub] [pulsar] oneforalone added a comment to the discussion: Broker restarted due to `ZooKeeper client is disconnected`

2023-07-13 Thread via GitHub


GitHub user oneforalone added a comment to the discussion: Broker restarted due 
to `ZooKeeper client is disconnected`

Okay, I appreciate the suggestion. Does this slow fsync matter? Would it 
affect the startup/deployment procedure, or would it just slow down message 
syncing?

GitHub link: 
https://github.com/apache/pulsar/discussions/20773#discussioncomment-6435113





[GitHub] [pulsar] oneforalone added a comment to the discussion: Broker restarted due to `ZooKeeper client is disconnected`

2023-07-13 Thread via GitHub


GitHub user oneforalone added a comment to the discussion: Broker restarted due 
to `ZooKeeper client is disconnected`

> The pulsar-all 3.0.0 image is invalid. It's actually Pulsar 2.11

So, what exact version should I use? Is 2.9.5 OK?

GitHub link: 
https://github.com/apache/pulsar/discussions/20773#discussioncomment-6435092





[GitHub] [pulsar-client-cpp] RobertIndie commented on pull request #300: [fix] Fix the assertion of `ConsumerWrapper`.receiveAtMost

2023-07-13 Thread via GitHub


RobertIndie commented on PR #300:
URL: 
https://github.com/apache/pulsar-client-cpp/pull/300#issuecomment-1633696870

   > It's designed to be the current behavior. We can receive some messages 
first, then try to receive the rest messages.
   > You'd better add the assertion explicitly after calling receiveAtMost. 
From the references of receiveAtMost (in AcknowledgeTest), I think we don't 
need to assert there is no more messages
   
   Currently, `receiveAtMost` doesn't implement the 'at most' guarantee. 
This could confuse developers.
   I think we could have two options:
   1. Implement the at most guarantee for `receiveAtMost`
   2. Change it to `receiveAtLeast`
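
For option 1, a minimal sketch in Go of what an at-most guarantee could look
like. The `receiver` interface, the `queue` type and `errTimeout` are
hypothetical and only stand in for a consumer with a receive timeout; this is
not the C++ client's `Consumer` API.

```go
package main

import (
	"errors"
	"fmt"
)

// errTimeout is a hypothetical error standing in for a receive timeout.
var errTimeout = errors.New("receive timed out")

// receiver is a hypothetical minimal consumer interface.
type receiver interface {
	Receive() (string, error)
}

// queue is an in-memory receiver used only for this sketch.
type queue struct{ msgs []string }

// Receive pops the next message, or reports a timeout when none is left.
func (q *queue) Receive() (string, error) {
	if len(q.msgs) == 0 {
		return "", errTimeout
	}
	m := q.msgs[0]
	q.msgs = q.msgs[1:]
	return m, nil
}

// receiveAtMost reads up to n messages and stops at the first error.
// It never returns more than n messages, even if more are available.
func receiveAtMost(r receiver, n int) ([]string, error) {
	var got []string
	for i := 0; i < n; i++ {
		m, err := r.Receive()
		if err != nil {
			return got, err
		}
		got = append(got, m)
	}
	return got, nil
}

func main() {
	q := &queue{msgs: []string{"a", "b", "c", "d", "e"}}
	got, err := receiveAtMost(q, 3)
	fmt.Println(len(got), err, len(q.msgs))
}
```

With five queued messages and `n = 3`, the call returns three messages and
leaves two in the queue; a test that needs "no more messages" would assert
that separately after the call.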
   
   





[pulsar] branch branch-2.10 updated: Release 2.10.5

2023-07-13 Thread xiangying
This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
 new 82c589fe97a Release 2.10.5
 new c185ae4c0cf Merge remote-tracking branch 'origin/branch-2.10' into 
branch-2.10
82c589fe97a is described below

commit 82c589fe97a1a76d4a580845eb07e205a20a0356
Author: xiangying <1984997...@qq.com>
AuthorDate: Thu Jul 13 14:54:19 2023 +0800

Release 2.10.5
---
 bouncy-castle/bc/pom.xml  | 2 +-
 bouncy-castle/bcfips-include-test/pom.xml | 2 +-
 bouncy-castle/bcfips/pom.xml  | 2 +-
 bouncy-castle/pom.xml | 2 +-
 buildtools/pom.xml| 2 +-
 distribution/io/pom.xml   | 2 +-
 distribution/offloaders/pom.xml   | 2 +-
 distribution/pom.xml  | 2 +-
 distribution/server/pom.xml   | 2 +-
 docker/pom.xml| 2 +-
 docker/pulsar-all/pom.xml | 2 +-
 docker/pulsar/pom.xml | 2 +-
 jclouds-shaded/pom.xml| 2 +-
 kafka-connect-avro-converter-shaded/pom.xml   | 2 +-
 managed-ledger/pom.xml| 2 +-
 pom.xml   | 2 +-
 pulsar-broker-auth-athenz/pom.xml | 2 +-
 pulsar-broker-auth-sasl/pom.xml   | 2 +-
 pulsar-broker-common/pom.xml  | 2 +-
 pulsar-broker-shaded/pom.xml  | 2 +-
 pulsar-broker/pom.xml | 2 +-
 pulsar-client-1x-base/pom.xml | 2 +-
 pulsar-client-1x-base/pulsar-client-1x/pom.xml| 2 +-
 pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml | 2 +-
 pulsar-client-admin-api/pom.xml   | 2 +-
 pulsar-client-admin-shaded/pom.xml| 2 +-
 pulsar-client-admin/pom.xml   | 2 +-
 pulsar-client-all/pom.xml | 2 +-
 pulsar-client-api/pom.xml | 2 +-
 pulsar-client-auth-athenz/pom.xml | 2 +-
 pulsar-client-auth-sasl/pom.xml   | 2 +-
 pulsar-client-messagecrypto-bc/pom.xml| 2 +-
 pulsar-client-shaded/pom.xml  | 2 +-
 pulsar-client-tools-test/pom.xml  | 2 +-
 pulsar-client-tools/pom.xml   | 2 +-
 pulsar-client/pom.xml | 2 +-
 pulsar-common/pom.xml | 2 +-
 pulsar-config-validation/pom.xml  | 2 +-
 pulsar-functions/api-java/pom.xml | 2 +-
 pulsar-functions/instance/pom.xml | 2 +-
 pulsar-functions/java-examples/pom.xml| 2 +-
 pulsar-functions/localrun-shaded/pom.xml  | 2 +-
 pulsar-functions/localrun/pom.xml | 2 +-
 pulsar-functions/pom.xml  | 2 +-
 pulsar-functions/proto/pom.xml| 2 +-
 pulsar-functions/runtime-all/pom.xml  | 2 +-
 pulsar-functions/runtime/pom.xml  | 2 +-
 pulsar-functions/secrets/pom.xml  | 2 +-
 pulsar-functions/utils/pom.xml| 2 +-
 pulsar-functions/worker/pom.xml   | 2 +-
 pulsar-io/aerospike/pom.xml   | 2 +-
 pulsar-io/aws/pom.xml | 2 +-
 pulsar-io/batch-data-generator/pom.xml| 2 +-
 pulsar-io/batch-discovery-triggerers/pom.xml  | 2 +-
 pulsar-io/canal/pom.xml   | 2 +-
 pulsar-io/cassandra/pom.xml   | 2 +-
 pulsar-io/common/pom.xml  | 2 +-
 pulsar-io/core/pom.xml| 2 +-
 pulsar-io/data-generator/pom.xml  | 2 +-
 pulsar-io/debezium/core/pom.xml   | 2 +-
 pulsar-io/debezium/mongodb/pom.xml| 2 +-
 pulsar-io/debezium/mssql/pom.xml  | 2 +-
 pulsar-io/debezium/mysql/pom.xml  | 2 +-
 pulsar-io/debezium/oracle/pom.xml | 2 +-
 pulsar-io/debezium/pom.xml| 2 +-
 pulsar-io/debezium/postgres/pom.xml   | 2 +-
 pulsar-io/docs/pom.xml| 2 +-
 pulsar-io/dynamodb/pom.xml| 2 +-
 pulsar-io/elastic-search/pom.xml  | 2 +-
 pulsar-io/file/pom.xml| 2 +-
 pulsar-io/flume/pom.xml   | 2 +-
 pulsar-io/hbase/pom.xml

[GitHub] [pulsar] aloyszhang commented on a diff in pull request #20750: [fix][client] fix negative message re-delivery twice issue

2023-07-13 Thread via GitHub


aloyszhang commented on code in PR #20750:
URL: https://github.com/apache/pulsar/pull/20750#discussion_r1262092557


##
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java:
##
@@ -146,6 +146,9 @@ public void testNegativeAcks(boolean batching, boolean 
usePartitions, Subscripti
 consumer.negativeAcknowledge(msg);
 }
 
+assertTrue(consumer instanceof ConsumerBase);
+assertEquals(((ConsumerBase) 
consumer).getUnAckedMessageTracker().size(), 0);

Review Comment:
   `ConsumerImpl` with non-batched messages is sure to pass this test.
   Problems only happen in the two situations described in the Motivation.






[GitHub] [pulsar] littlecatjianjiao closed issue #20754: [Bug] flink consume multi-partition topic error

2023-07-13 Thread via GitHub


littlecatjianjiao closed issue #20754: [Bug] flink consume multi-partition 
topic error
URL: https://github.com/apache/pulsar/issues/20754





[GitHub] [pulsar] littlecatjianjiao commented on issue #20754: [Bug] flink consume multi-partition topic error

2023-07-13 Thread via GitHub


littlecatjianjiao commented on issue #20754:
URL: https://github.com/apache/pulsar/issues/20754#issuecomment-1633660913

   > @cbornet Please confirm, but I think that this is a limitation in the 
flink job and it will only consume from pulsar in a single thread because input 
topic processing is in the job thread and not the task threads.
   
   Thank you for your reply. We have resolved this issue by upgrading the Flink 
version. It appears to be an issue with the pulsarsource provided by Flink, and 
I will close this issue.
   





[pulsar] branch branch-2.10 updated: [fix] [broker] Can not receive any messages after switch to standby cluster (#20767)

2023-07-13 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
 new 20adfe4854b [fix] [broker] Can not receive any messages after switch 
to standby cluster (#20767)
20adfe4854b is described below

commit 20adfe4854b5d6c98b0ca5d03eda7a714854759a
Author: fengyubiao 
AuthorDate: Thu Jul 13 09:51:45 2023 +0800

[fix] [broker] Can not receive any messages after switch to standby cluster 
(#20767)

(cherry picked from commit 465fac523da946553b09298e13dc7dfcecfb6c78)
---
 .../ReplicatedSubscriptionsController.java | 10 ++-
 .../broker/service/ReplicatorSubscriptionTest.java | 91 ++
 2 files changed, 98 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 1e1245ed36b..cf1603788f1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -192,10 +192,14 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
 sub.acknowledgeMessage(Collections.singletonList(pos), 
AckType.Cumulative, Collections.emptyMap());
 } else {
 // Subscription doesn't exist. We need to force the creation of 
the subscription in this cluster, because
-log.info("[{}][{}] Creating subscription at {}:{} after receiving 
update from replicated subcription",
+log.info("[{}][{}] Creating subscription at {}:{} after receiving 
update from replicated subscription",
 topic, update.getSubscriptionName(), 
updatedMessageId.getLedgerId(), pos);
-topic.createSubscription(update.getSubscriptionName(),
-InitialPosition.Latest, true /* replicateSubscriptionState 
*/, null);
+topic.createSubscription(update.getSubscriptionName(), 
InitialPosition.Earliest,
+true /* replicateSubscriptionState */, 
Collections.emptyMap())
+.thenAccept(subscriptionCreated -> {
+
subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos),
+AckType.Cumulative, Collections.emptyMap());
+});
 }
 }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 046adaa5ec2..250d971b9fb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -24,10 +24,12 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.Map;
@@ -41,6 +43,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -48,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,6 +158,93 @@ public class ReplicatorSubscriptionTest extends 
ReplicatorTestBase {
 "messages don't match.");
 }
 
+@Test
+public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws 
Exception {
+final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/tp_");
+final String subscriptionName = 

[pulsar] branch branch-2.10 updated: [improve] [broker] Add consumer-id into the log when doing subscribe. (#20568)

2023-07-13 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
 new e25d764eaea [improve] [broker] Add consumer-id into the log when doing 
subscribe. (#20568)
e25d764eaea is described below

commit e25d764eaea13b7435a915e9aabe8b05929026b1
Author: fengyubiao 
AuthorDate: Wed Jul 12 22:46:38 2023 +0800

[improve] [broker] Add consumer-id into the log when doing subscribe. 
(#20568)

- Since `cnx.address + consumerId` is the identifier of one consumer; add 
`consumer-id` into the log when doing subscribe.
- add a test to confirm that even if the error occurs when sending messages 
to the client, the consumption is still OK.
- print debug log if ack-command was discarded due to `ConsumerFuture is 
not complete.`
- print debug log if sending a message to the client fails.

(cherry picked from commit a41ac49d9f30c415d87ce747393a16fa724cf4c9)
---
 .../org/apache/pulsar/broker/service/Consumer.java |  6 +++
 .../apache/pulsar/broker/service/ServerCnx.java|  9 +++-
 .../client/api/SimpleProducerConsumerTest.java | 50 ++
 3 files changed, 64 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index f2cd77e485e..8924b750eb6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -318,6 +318,12 @@ public class Consumer {
 msgOutCounter.add(totalMessages);
 bytesOutCounter.add(totalBytes);
 chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 
0);
+} else {
+if (log.isDebugEnabled()) {
+log.debug("[{}-{}] Sent messages to client fail by IO 
exception[{}], close the connection"
++ " immediately. Consumer: {}",  
topicName, subscription,
+status.cause() == null ? "" : 
status.cause().getMessage(), this.toString());
+}
 }
 });
 return writeAndFlushPromise;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c1045c3d14b..e369e7d7b68 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -994,7 +994,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 remoteAddress, getPrincipal());
 }
 
-log.info("[{}] Subscribing on topic {} / {}", remoteAddress, 
topicName, subscriptionName);
+log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", 
this.ctx().channel().toString(),
+topicName, subscriptionName, consumerId);
 try {
 Metadata.validateMetadata(metadata,
 
service.getPulsar().getConfiguration().getMaxConsumerMetadataSize());
@@ -1573,6 +1574,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 }
 return null;
 });
+} else {
+if (log.isDebugEnabled()) {
+log.debug("Consumer future is not complete(not complete or 
error), but received command ack. so discard"
++ " this command. consumerId: {}, cnx: {}, 
messageIdCount: {}", ack.getConsumerId(),
+this.ctx().channel().toString(), 
ack.getMessageIdsCount());
+}
 }
 }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 16a56af2039..577b9d5a648 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -39,6 +39,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.Timeout;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -79,6 +81,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
 import org.apache.commons.lang3.RandomUtils;
 

[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20795: [fix][io] Not restart instance when kafka source poll exception.

2023-07-13 Thread via GitHub


poorbarcode commented on code in PR #20795:
URL: https://github.com/apache/pulsar/pull/20795#discussion_r1262054448


##
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java:
##
@@ -190,14 +190,12 @@ public void start() {
 });
 runnerThread.setUncaughtExceptionHandler(
 (t, e) -> {
-new Thread(() -> {
-LOG.error("[{}] Error while consuming records", 
t.getName(), e);
-try {
-this.close();
-} catch (Exception ex) {
-LOG.error("[{}] Close kafka source error", 
t.getName(), e);
-}
-}, "Kafka Source Close Task Thread").start();
+LOG.error("[{}] Error while consuming records", 
t.getName(), e);
+try {
+notifyError((Exception) e);

Review Comment:
   `e` is a `Throwable` and may not be an instance of `Exception`; if it is an `Error`, this cast will throw a `ClassCastException`.
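
   One possible way to avoid the unchecked cast, as a minimal sketch only: the class `UncaughtHandlerSketch` and the helper `toException` are hypothetical names that are not part of this PR, and the `AtomicReference` stands in for the `notifyError(...)` call in the diff. The idea is to pass an `Exception` through unchanged and wrap any other `Throwable` before reporting it.

```java
import java.util.concurrent.atomic.AtomicReference;

class UncaughtHandlerSketch {

    // Hypothetical helper (not part of the PR): return an Exception as-is and
    // wrap any other Throwable (for example an Error) so no cast can fail.
    public static Exception toException(Throwable t) {
        return (t instanceof Exception) ? (Exception) t : new RuntimeException(t);
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for notifyError(...): records what the handler would report.
        AtomicReference<Exception> reported = new AtomicReference<>();

        Thread runner = new Thread(() -> {
            // An Error, not an Exception: a direct (Exception) cast would fail here.
            throw new AssertionError("not an Exception");
        }, "runner");
        runner.setUncaughtExceptionHandler((t, e) -> reported.set(toException(e)));
        runner.start();
        runner.join();

        // The original Error is preserved as the cause of the wrapper.
        System.out.println(reported.get().getCause().getClass().getSimpleName());
    }
}
```

   Wrapping keeps the original throwable available through `getCause()`, so nothing is lost for logging; whether the source should be restarted on an `Error` at all is a separate design question.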


