[GitHub] [pulsar-site] tisonkun opened a new pull request, #561: cleanup we are no longer supporting DC/OS

2023-05-07 Thread via GitHub


tisonkun opened a new pull request, #561:
URL: https://github.com/apache/pulsar-site/pull/561

   This closes https://github.com/apache/pulsar/issues/11699.
   
   ### Documentation
   
   
   
   - [x] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun commented on issue #11480: 服务重启无法注册上,pulsar-manager上无法查看namespaces

2023-05-07 Thread via GitHub


tisonkun commented on issue #11480:
URL: https://github.com/apache/pulsar/issues/11480#issuecomment-1537786306

   Close as stale. The development of pulsar-manager has been permanently moved 
to https://github.com/apache/pulsar-manager


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun closed issue #11480: 服务重启无法注册上,pulsar-manager上无法查看namespaces

2023-05-07 Thread via GitHub


tisonkun closed issue #11480: 服务重启无法注册上,pulsar-manager上无法查看namespaces
URL: https://github.com/apache/pulsar/issues/11480


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun closed issue #11043: Sources on Maven Central

2023-05-07 Thread via GitHub


tisonkun closed issue #11043: Sources on Maven Central
URL: https://github.com/apache/pulsar/issues/11043


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun commented on issue #11043: Sources on Maven Central

2023-05-07 Thread via GitHub


tisonkun commented on issue #11043:
URL: https://github.com/apache/pulsar/issues/11043#issuecomment-1537784693

   This is resolved at https://github.com/apache/pulsar/pull/19956


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun closed issue #10496: Latest Spark Connectors for Pulsar

2023-05-07 Thread via GitHub


tisonkun closed issue #10496: Latest Spark Connectors for Pulsar 
URL: https://github.com/apache/pulsar/issues/10496


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun closed issue #10587: Support alias for the Schema and Message

2023-05-07 Thread via GitHub


tisonkun closed issue #10587: Support alias for the Schema and Message
URL: https://github.com/apache/pulsar/issues/10587


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] abhilashmandaliya created a discussion: Support alias for the Schema and Message

2023-05-07 Thread GitBox


GitHub user abhilashmandaliya created a discussion: Support alias for the 
Schema and Message

Schema and message should support different names/alias while performing SerDe. 
Like `ObjectMapper` has  `@JsonAlias` and `@JsonProperty`. Probably we can use 
the same feature as Pulsar Schemas use `ObjectMapper` internally.

GitHub link: https://github.com/apache/pulsar/discussions/20248


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



[GitHub] [pulsar] eolivelli added a comment to the discussion: Support alias for the Schema and Message

2023-05-07 Thread GitBox


GitHub user eolivelli added a comment to the discussion: Support alias for the 
Schema and Message

I see your problem.

but we have to find some way in order to abstract from Jackson Mapper because 
relying on Jackson Mapper will add an hard dependency for Pulsar.

in the case of Avro it is fine to support their annotations because we are 
supporting Apache Avro.

but in this case Pulsar supports  JSON in general and not Jackson Mapper, that 
is a specific library.

 


GitHub link: 
https://github.com/apache/pulsar/discussions/20248#discussioncomment-5834415


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



[GitHub] [pulsar] codelipenghui added a comment to the discussion: Support alias for the Schema and Message

2023-05-07 Thread GitBox


GitHub user codelipenghui added a comment to the discussion: Support alias for 
the Schema and Message

The issue had no activity for 30 days, mark with Stale label.

GitHub link: 
https://github.com/apache/pulsar/discussions/20248#discussioncomment-5834416


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



[GitHub] [pulsar] tisonkun closed issue #10602: [Tests] Broker startup sporadically deadlocks when ZK client thread gets blocked in SimpleLoadManagerImpl.updateRanking

2023-05-07 Thread via GitHub


tisonkun closed issue #10602: [Tests] Broker startup sporadically deadlocks 
when ZK client thread gets blocked in SimpleLoadManagerImpl.updateRanking
URL: https://github.com/apache/pulsar/issues/10602


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun commented on issue #10602: [Tests] Broker startup sporadically deadlocks when ZK client thread gets blocked in SimpleLoadManagerImpl.updateRanking

2023-05-07 Thread via GitHub


tisonkun commented on issue #10602:
URL: https://github.com/apache/pulsar/issues/10602#issuecomment-1537782777

   Close as stale. IIRC there are a few related fixes and it's hard to check if 
this issue still remains on the master branch. Feel free to open a new issue if 
it's still relevant.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun closed issue #10643: Error when following getting started with Java libraries

2023-05-07 Thread via GitHub


tisonkun closed issue #10643: Error when following getting started with Java 
libraries
URL: https://github.com/apache/pulsar/issues/10643


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun commented on issue #10643: Error when following getting started with Java libraries

2023-05-07 Thread via GitHub


tisonkun commented on issue #10643:
URL: https://github.com/apache/pulsar/issues/10643#issuecomment-1537782097

   Close as stale. Please open a new issue if the getting started journey is 
still problematic on maintained versions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[pulsar] branch branch-2.11 updated: [improve] [broker] Skip split boundle if only one broker (#20190)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
 new 8346ad394aa [improve] [broker] Skip split boundle if only one broker 
(#20190)
8346ad394aa is described below

commit 8346ad394aa3dcb173d49d424ae77f3ef70bb0ed
Author: fengyubiao 
AuthorDate: Sat Apr 29 01:54:05 2023 +0800

[improve] [broker] Skip split boundle if only one broker (#20190)

Co-authored-by: Zixuan Liu 
(cherry picked from commit d135c4a115038dc61f8fe2d230cb1f0c02239f92)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  2 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  9 
 .../pulsar/client/api/BrokerServiceLookupTest.java | 58 ++
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 1cd22e8233e..8876e2818a2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -748,7 +748,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
 public void checkNamespaceBundleSplit() {
 
 if (!conf.isLoadBalancerAutoBundleSplitEnabled() || 
pulsar.getLeaderElectionService() == null
-|| !pulsar.getLeaderElectionService().isLeader()) {
+|| !pulsar.getLeaderElectionService().isLeader() || 
knownBrokers.size() <= 1) {
 return;
 }
 final boolean unloadSplitBundles = 
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index b234676bab3..2cbe7486d56 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -36,6 +36,7 @@ import java.nio.charset.StandardCharsets;
 import java.text.NumberFormat;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -73,6 +74,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.zookeeper.CreateMode;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -550,6 +552,10 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 c1.acknowledge(c1.receive());
 }
 
+// Mock another broker to make split task work.
+String mockedBroker = "/loadbalance/brokers/127.0.0.1:0";
+mockZooKeeper.create(mockedBroker, new byte[]{0}, 
Collections.emptyList(), CreateMode.EPHEMERAL);
+
 pulsar.getBrokerService().updateRates();
 Awaitility.await().untilAsserted(() -> 
assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
 ModularLoadManagerWrapper loadManager = 
(ModularLoadManagerWrapper)pulsar.getLoadManager().get();
@@ -574,6 +580,9 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage"));
 
 assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total"));
+
+// cleanup.
+mockZooKeeper.delete(mockedBroker, 0);
 }
 
 @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index d1aa9c5bc8c..8b9691293a2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -66,6 +66,7 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -78,6 +79,7 @@ import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.common.naming.NamespaceBundle;

[GitHub] [pulsar] tisonkun commented on issue #20234: [Bug] JavaDoc (generated?) has Chinese text in it (as appears in Intellij)

2023-05-07 Thread via GitHub


tisonkun commented on issue #20234:
URL: https://github.com/apache/pulsar/issues/20234#issuecomment-1537695620

   You may check 
https://www.jetbrains.com/help/idea/working-with-code-documentation.html#generate-javadoc
 about setting locales. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[pulsar] branch branch-2.10 updated: [improve] [broker] Skip split boundle if only one broker (#20190)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
 new 7da78737808 [improve] [broker] Skip split boundle if only one broker 
(#20190)
7da78737808 is described below

commit 7da787378089e0142a5235a15d0915925c177edd
Author: fengyubiao 
AuthorDate: Sat Apr 29 01:54:05 2023 +0800

[improve] [broker] Skip split boundle if only one broker (#20190)

Co-authored-by: Zixuan Liu 
(cherry picked from commit d135c4a115038dc61f8fe2d230cb1f0c02239f92)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  2 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 10 +++-
 .../pulsar/client/api/BrokerServiceLookupTest.java | 58 ++
 3 files changed, 68 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index d81f6949f43..293ff2760f0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -700,7 +700,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
 public void checkNamespaceBundleSplit() {
 
 if (!conf.isLoadBalancerAutoBundleSplitEnabled() || 
pulsar.getLeaderElectionService() == null
-|| !pulsar.getLeaderElectionService().isLeader()) {
+|| !pulsar.getLeaderElectionService().isLeader() || 
loadData.getBrokerData().size() <= 1) {
 return;
 }
 final boolean unloadSplitBundles = 
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index c99717e676b..a01f5ee4edb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -36,6 +36,7 @@ import java.nio.charset.StandardCharsets;
 import java.text.NumberFormat;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -73,6 +74,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.zookeeper.CreateMode;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -547,6 +549,10 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 c1.acknowledge(c1.receive());
 }
 
+// Mock another broker to make split task work.
+String mockedBroker = "/loadbalance/brokers/127.0.0.1:0";
+mockZooKeeper.create(mockedBroker, new byte[]{0}, 
Collections.emptyList(), CreateMode.EPHEMERAL);
+
 pulsar.getBrokerService().updateRates();
 Awaitility.await().untilAsserted(() -> 
assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
 ModularLoadManagerWrapper loadManager = 
(ModularLoadManagerWrapper)pulsar.getLoadManager().get();
@@ -570,7 +576,9 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 assertTrue(metrics.containsKey("pulsar_lb_bandwidth_in_usage"));
 assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage"));
 
-assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count"));
+assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total"));
+
+mockZooKeeper.delete(mockedBroker, 0);
 }
 
 @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index e3bb6a92dc0..396fda82b59 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -66,6 +66,7 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -78,6 +79,7 @@ import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
 import 

[GitHub] [pulsar] coderzc closed pull request #20176: [fix][broker] Fix `RoaringBitmap.contains` can't check value 65535

2023-05-07 Thread via GitHub


coderzc closed pull request #20176: [fix][broker] Fix `RoaringBitmap.contains` 
can't check value 65535
URL: https://github.com/apache/pulsar/pull/20176


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun commented on issue #20234: [Bug] JavaDoc (generated?) has Chinese text in it (as appears in Intellij)

2023-05-07 Thread via GitHub


tisonkun commented on issue #20234:
URL: https://github.com/apache/pulsar/issues/20234#issuecomment-1537689188

   Cannot reproduce. It seems an IDE settings issue, especially the Chinese 
content is not API content but the display template.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[pulsar] branch branch-2.10 updated: [fix][txn] Fix transaction is not aborted when send or ACK failed (#20240)

2023-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
 new f6eee8de974 [fix][txn] Fix transaction is not aborted when send or ACK 
failed (#20240)
f6eee8de974 is described below

commit f6eee8de9748789d70dde6cb23b19d70cb1c029b
Author: Yunze Xu 
AuthorDate: Mon May 8 11:00:23 2023 +0800

[fix][txn] Fix transaction is not aborted when send or ACK failed (#20240)
---
 .../broker/transaction/TransactionProduceTest.java | 26 +++
 .../client/impl/transaction/TransactionImpl.java   | 88 ++
 2 files changed, 67 insertions(+), 47 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 0d1bbda4568..7ec36214047 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
@@ -43,6 +45,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -51,6 +54,7 @@ import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -370,4 +374,26 @@ public class TransactionProduceTest extends 
TransactionTestBase {
 }
 
 
+@Test
+public void testCommitFailure() throws Exception {
+Transaction txn = pulsarClient.newTransaction().build().get();
+final String topic = NAMESPACE1 + "/test-commit-failure";
+@Cleanup
+final Producer producer = 
pulsarClient.newProducer().topic(topic).create();
+producer.newMessage(txn).value(new byte[1024 * 1024 * 10]).sendAsync();
+try {
+txn.commit().get();
+Assert.fail();
+} catch (ExecutionException e) {
+Assert.assertTrue(e.getCause() instanceof 
PulsarClientException.InvalidMessageException);
+Assert.assertEquals(txn.getState(), Transaction.State.ABORTED);
+}
+try {
+
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txn.getTxnID())
+.getNow(null);
+Assert.fail();
+} catch (CompletionException e) {
+Assert.assertTrue(e.getCause() instanceof 
CoordinatorException.TransactionNotFoundException);
+}
+}
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index fc9cdf501f7..554c47257b9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -156,12 +157,13 @@ public class TransactionImpl implements Transaction , 
TimerTask {
 @Override
 public CompletableFuture commit() {
 timeout.cancel();
-return checkIfOpenOrCommitting().thenCompose((value) -> {
+return checkState(State.OPEN, State.COMMITTING).thenCompose((value) -> 
{
 CompletableFuture commitFuture = new CompletableFuture<>();
 this.state = State.COMMITTING;
 allOpComplete().whenComplete((v, e) -> {
 if (e != null) {
-abort().whenComplete((vx, ex) -> 
commitFuture.completeExceptionally(e));
+checkState(State.COMMITTING).thenCompose(__ -> 

[GitHub] [pulsar] Technoboy- merged pull request #20240: [fix][txn][branch-2.10] Fix transaction is not aborted when send or ACK failed

2023-05-07 Thread via GitHub


Technoboy- merged PR #20240:
URL: https://github.com/apache/pulsar/pull/20240


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar-client-node] shibd commented on a diff in pull request #322: [feat]: Support client.getPartitionsForTopic (#320)

2023-05-07 Thread via GitHub


shibd commented on code in PR #322:
URL: 
https://github.com/apache/pulsar-client-node/pull/322#discussion_r1186980916


##
tests/end_to_end.test.js:
##
@@ -1242,5 +1242,21 @@ const Pulsar = require('../index.js');
   await consumer.close();
   await client.close();
 });
+
+test('Client/getPartitionsForTopic', async () => {

Review Comment:
   This test should belong `client.test.js`.



##
tests/end_to_end.test.js:
##
@@ -1242,5 +1242,21 @@ const Pulsar = require('../index.js');
   await consumer.close();
   await client.close();
 });
+
+test('Client/getPartitionsForTopic', async () => {
+  const client = new Pulsar.Client({
+serviceUrl: 'pulsar://localhost:6650',
+operationTimeoutSeconds: 30,
+  });
+
+  const topic = 'persistent://public/default/get-partitions-for-topic';
+  await client.createProducer({

Review Comment:
   You can use admin [REST 
API](https://pulsar.apache.org/admin-rest-api/?version=3.0.0#) to create 
specified partitions topic, and then to assert `getPartitionsForTopic`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] codecov-commenter commented on pull request #20233: [fix][broker] Fix the behavior of delayed message in Key_Shared mode

2023-05-07 Thread via GitHub


codecov-commenter commented on PR #20233:
URL: https://github.com/apache/pulsar/pull/20233#issuecomment-1537660680

   ## 
[Codecov](https://app.codecov.io/gh/apache/pulsar/pull/20233?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)
 Report
   > Merging 
[#20233](https://app.codecov.io/gh/apache/pulsar/pull/20233?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)
 (f4040d0) into 
[master](https://app.codecov.io/gh/apache/pulsar/commit/0dd238abf3798ba1f5b83182c1c4fb36e6ba6511?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)
 (0dd238a) will **increase** coverage by `38.44%`.
   > The diff coverage is `80.00%`.
   
   [![Impacted file tree 
graph](https://app.codecov.io/gh/apache/pulsar/pull/20233/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)](https://app.codecov.io/gh/apache/pulsar/pull/20233?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)
   
   ```diff
   @@  Coverage Diff  @@
   ## master   #20233   +/-   ##
   =
   + Coverage 34.48%   72.93%   +38.44% 
   - Complexity1253731964+19427 
   =
 Files  1614 1868  +254 
 Lines126170   138591+12421 
 Branches  1377115246 +1475 
   =
   + Hits  43509   101076+57567 
   + Misses7705329470-47583 
   - Partials   5608 8045 +2437 
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | inttests | `24.11% <60.00%> (-0.02%)` | :arrow_down: |
   | systests | `24.80% <40.00%> (?)` | |
   | unittests | `72.22% <80.00%> (+39.11%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   | [Impacted 
Files](https://app.codecov.io/gh/apache/pulsar/pull/20233?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)
 | Coverage Δ | |
   |---|---|---|
   | 
[...mon/policies/data/stats/SubscriptionStatsImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/20233?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi9wb2xpY2llcy9kYXRhL3N0YXRzL1N1YnNjcmlwdGlvblN0YXRzSW1wbC5qYXZh)
 | `69.56% <50.00%> (-0.35%)` | :arrow_down: |
   | 
[...sistent/PersistentDispatcherMultipleConsumers.java](https://app.codecov.io/gh/apache/pulsar/pull/20233?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudERpc3BhdGNoZXJNdWx0aXBsZUNvbnN1bWVycy5qYXZh)
 | `76.63% <100.00%> (+20.26%)` | :arrow_up: |
   
   ... and [1510 files with indirect coverage 
changes](https://app.codecov.io/gh/apache/pulsar/pull/20233/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar-client-node] XXXMrG commented on pull request #322: [feat]: Support client.getPartitionsForTopic (#320)

2023-05-07 Thread via GitHub


XXXMrG commented on PR #322:
URL: 
https://github.com/apache/pulsar-client-node/pull/322#issuecomment-1537652243

   Done. Would you mind giving my code a quick review? @shibd 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] poorbarcode commented on a diff in pull request #20235: [fix][txn] Implement compatibility for transaction buffer segmented snapshot feature upgrade

2023-05-07 Thread via GitHub


poorbarcode commented on code in PR #20235:
URL: https://github.com/apache/pulsar/pull/20235#discussion_r1186970727


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java:
##
@@ -265,7 +266,7 @@ public CompletableFuture 
recoverFromSnapshot() {
 PositionImpl finalStartReadCursorPosition = 
startReadCursorPosition;
 TransactionBufferSnapshotIndexes 
finalPersistentSnapshotIndexes = persistentSnapshotIndexes;
 if (persistentSnapshotIndexes == null) {

Review Comment:
   After turning on the feature `transactionBufferSegmentedSnapshotEnabled,` if 
users create a new topic, the method `recoverOldSnapshot` will trigger the 
creation of the topic `__transaction_buffer_snapshot`, but it is not needed, 
right?
   
   And can we add a test to guarantee this issue will not occur?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun commented on a diff in pull request #20242: [fix][client] Java Client's Seek Logic Not Threadsafe #1

2023-05-07 Thread via GitHub


tisonkun commented on code in PR #20242:
URL: https://github.com/apache/pulsar/pull/20242#discussion_r1186971096


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##
@@ -2153,9 +2153,14 @@ private CompletableFuture seekAsyncInternal(long 
requestId, ByteBuf seek,
 
 MessageIdAdv originSeekMessageId = seekMessageId;
 seekMessageId = (MessageIdAdv) seekId;
-duringSeek.set(true);
+
+if (!duringSeek.compareAndSet(false, true)) {
+log.warn("[{}][{}] Attempting to seek operation that is already in 
progress, cancelling {}", 
+topic, subscription, seekBy);
+seekFuture.cancel(true);

Review Comment:
   After a closer look `f.cancel(true)` is the same as 
`f.completeExceptionally(new CancellationException())`. So either way should be 
OK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[pulsar] branch branch-2.9 updated: [improve][build] Upgrade dependencies to reduce CVE (#20228)

2023-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
 new 57b18e4f182 [improve][build] Upgrade dependencies to reduce CVE 
(#20228)
57b18e4f182 is described below

commit 57b18e4f1825470a4816a56c69d2da7895980e49
Author: Jiwei Guo 
AuthorDate: Mon May 8 10:00:16 2023 +0800

[improve][build] Upgrade dependencies to reduce CVE (#20228)
---
 distribution/server/src/assemble/LICENSE.bin.txt   | 70 +++---
 pom.xml| 21 ---
 pulsar-io/canal/pom.xml| 43 -
 pulsar-io/flume/pom.xml|  8 ++-
 pulsar-io/hdfs2/pom.xml|  2 +-
 .../apache/pulsar/io/hdfs2/AbstractHdfsConfig.java |  2 +-
 .../pulsar/io/hdfs2/AbstractHdfsConnector.java |  2 +-
 .../pulsar/io/hdfs2/sink/HdfsAbstractSink.java |  2 +-
 .../pulsar/io/hdfs2/sink/HdfsSinkConfig.java   |  2 +-
 pulsar-io/hdfs3/pom.xml|  2 +-
 .../apache/pulsar/io/hdfs3/AbstractHdfsConfig.java |  2 +-
 .../pulsar/io/hdfs3/AbstractHdfsConnector.java |  2 +-
 .../pulsar/io/hdfs3/sink/HdfsAbstractSink.java |  2 +-
 .../pulsar/io/hdfs3/sink/HdfsSinkConfig.java   |  2 +-
 pulsar-sql/presto-distribution/LICENSE | 64 ++--
 pulsar-sql/presto-pulsar/pom.xml   | 22 +++
 tiered-storage/file-system/pom.xml |  9 +++
 17 files changed, 171 insertions(+), 86 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 7803532c882..9395cb42829 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -312,14 +312,14 @@ The Apache Software License, Version 2.0
  * JCommander -- com.beust-jcommander-1.78.jar
  * High Performance Primitive Collections for Java -- 
com.carrotsearch-hppc-0.7.3.jar
  * Jackson
- - com.fasterxml.jackson.core-jackson-annotations-2.13.4.jar
- - com.fasterxml.jackson.core-jackson-core-2.13.4.jar
- - com.fasterxml.jackson.core-jackson-databind-2.13.4.2.jar
- - com.fasterxml.jackson.dataformat-jackson-dataformat-yaml-2.13.4.jar
- - com.fasterxml.jackson.jaxrs-jackson-jaxrs-base-2.13.4.jar
- - com.fasterxml.jackson.jaxrs-jackson-jaxrs-json-provider-2.13.4.jar
- - com.fasterxml.jackson.module-jackson-module-jaxb-annotations-2.13.4.jar
- - com.fasterxml.jackson.module-jackson-module-jsonSchema-2.13.4.jar
+ - com.fasterxml.jackson.core-jackson-annotations-2.14.2.jar
+ - com.fasterxml.jackson.core-jackson-core-2.14.2.jar
+ - com.fasterxml.jackson.core-jackson-databind-2.14.2.jar
+ - com.fasterxml.jackson.dataformat-jackson-dataformat-yaml-2.14.2.jar
+ - com.fasterxml.jackson.jaxrs-jackson-jaxrs-base-2.14.2.jar
+ - com.fasterxml.jackson.jaxrs-jackson-jaxrs-json-provider-2.14.2.jar
+ - com.fasterxml.jackson.module-jackson-module-jaxb-annotations-2.14.2.jar
+ - com.fasterxml.jackson.module-jackson-module-jsonSchema-2.14.2.jar
  * Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar
  * Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
  * Proto Google Common Protos -- 
com.google.api.grpc-proto-google-common-protos-2.0.1.jar
@@ -334,9 +334,9 @@ The Apache Software License, Version 2.0
  * J2ObjC Annotations -- com.google.j2objc-j2objc-annotations-1.3.jar
  * Netty Reactive Streams -- 
com.typesafe.netty-netty-reactive-streams-2.0.6.jar
  * Swagger
-- io.swagger-swagger-annotations-1.6.2.jar
-- io.swagger-swagger-core-1.6.2.jar
-- io.swagger-swagger-models-1.6.2.jar
+- io.swagger-swagger-annotations-1.6.10.jar
+- io.swagger-swagger-core-1.6.10.jar
+- io.swagger-swagger-models-1.6.10.jar
  * DataSketches
 - com.yahoo.datasketches-memory-0.8.3.jar
 - com.yahoo.datasketches-sketches-core-0.8.3.jar
@@ -430,26 +430,26 @@ The Apache Software License, Version 2.0
 - org.asynchttpclient-async-http-client-2.12.1.jar
 - org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
  * Jetty
-- org.eclipse.jetty-jetty-client-9.4.48.v20220622.jar
-- org.eclipse.jetty-jetty-continuation-9.4.48.v20220622.jar
-- org.eclipse.jetty-jetty-http-9.4.48.v20220622.jar
-- org.eclipse.jetty-jetty-io-9.4.48.v20220622.jar
-- org.eclipse.jetty-jetty-proxy-9.4.48.v20220622.jar
-- org.eclipse.jetty-jetty-security-9.4.48.v20220622.jar
-- org.eclipse.jetty-jetty-server-9.4.48.v20220622.jar
-- org.eclipse.jetty-jetty-servlet-9.4.48.v20220622.jar
-- org.eclipse.jetty-jetty-servlets-9.4.48.v20220622.jar
-- org.eclipse.jetty-jetty-util-9.4.48.v20220622.jar
-- org.eclipse.jetty-jetty-util-ajax-9.4.48.v20220622.jar
-- 

[GitHub] [pulsar] Technoboy- merged pull request #20228: [improve][build] Upgrade dependencies to reduce CVE

2023-05-07 Thread via GitHub


Technoboy- merged PR #20228:
URL: https://github.com/apache/pulsar/pull/20228


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] poorbarcode commented on issue #16859: [broker] Cursor status has always been SwitchingLedger and pendingMarkDeleteOps has accumulated tens of thousands of requests

2023-05-07 Thread via GitHub


poorbarcode commented on issue #16859:
URL: https://github.com/apache/pulsar/issues/16859#issuecomment-1537631227

   This issue might be fixed by https://github.com/apache/pulsar/pull/17971


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] poorbarcode closed issue #16859: [broker] Cursor status has always been SwitchingLedger and pendingMarkDeleteOps has accumulated tens of thousands of requests

2023-05-07 Thread via GitHub


poorbarcode closed issue #16859: [broker] Cursor status has always been 
SwitchingLedger and  pendingMarkDeleteOps has accumulated tens of thousands of 
requests
URL: https://github.com/apache/pulsar/issues/16859


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] github-actions[bot] commented on issue #20033: PIP-263: Just auto-create no-partitioned DLQ And Prevent auto-create a DLQ for a DLQ

2023-05-07 Thread via GitHub


github-actions[bot] commented on issue #20033:
URL: https://github.com/apache/pulsar/issues/20033#issuecomment-1537628175

   The issue had no activity for 30 days, mark with Stale label.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[pulsar-site] branch 2.10.4-release-note deleted (was 4022dfe26d8)

2023-05-07 Thread tison
This is an automated email from the ASF dual-hosted git repository.

tison pushed a change to branch 2.10.4-release-note
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


 was 4022dfe26d8 release blog

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



[pulsar-site] branch main updated: Release blog 2.10.4 (#553)

2023-05-07 Thread tison
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
 new ed3060e8a5b Release blog 2.10.4 (#553)
ed3060e8a5b is described below

commit ed3060e8a5bd585b3926df489a168effa5c520a7
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Mon May 8 09:51:01 2023 +0800

Release blog 2.10.4 (#553)

* release blog

* release blog
---
 blog/2023-04-19-Apache-Pulsar-2-10-4.md | 61 +
 data/release-pulsar.js  |  2 +-
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/blog/2023-04-19-Apache-Pulsar-2-10-4.md 
b/blog/2023-04-19-Apache-Pulsar-2-10-4.md
new file mode 100644
index 000..74d52a5cbd5
--- /dev/null
+++ b/blog/2023-04-19-Apache-Pulsar-2-10-4.md
@@ -0,0 +1,61 @@
+---
+title: "What’s New in Apache Pulsar 2.10.4"
+date: 2023-04-19
+author: "liangyepianzhou, Anonymitaet, momo-jun"
+---
+
+The Apache Pulsar community releases version 2.10.4! 37 contributors provided 
improvements and bug fixes that delivered 12 commits. Thanks for all your 
contributions.
+
+This blog walks through the most noteworthy changes. For the complete list, 
including all feature enhancements and bug fixes, check out the [Pulsar 2.10.4 
Release 
Notes](https://pulsar.apache.org/release-notes/versioned/pulsar-2.10.4/).
+
+
+
+### Improve performance in Key_Shared subscriptions 
([PR-19167](https://github.com/apache/pulsar/pull/19167))
+
+ Issue
+In earlier versions, Key_Shared subscriptions suffered from performance issues 
due to the lack of a ref count for sticky hash.
+
+ Resolution
+A significant performance improvement has been made to the Key_Shared 
subscriptions by adding a ref count for sticky hash. This optimization reduces 
the time required to check if messages contain the hash from a given sticky 
hash set, resulting in improved performance and efficiency.
+
+### AbstractBatchedMetadataStore - Use AlreadyClosedException instead of 
IllegalStateException ([PR-19284](https://github.com/apache/pulsar/pull/19284))
+
+ Issue
+In earlier versions, when the broker was shutting down, the 
`AbstractBatchedMetadataStore` would complete pending operations with a generic 
`IllegalStateException`. However, code dependent on the `MetadataStore` usually 
expected instances of `MetadataStoreException` and might not react properly to 
this error.
+
+ Resolution
+This PR improves the error handling during broker shutdown by completing 
pending operations with an `AlreadyClosedException` instead of 
`IllegalStateException`. This ensures that the dependent code responds more 
appropriately to the error, even if the broker is shutting down.
+
+### Fix open cursor with null-initialPosition, resulting in the earliest 
position ([PR-18416](https://github.com/apache/pulsar/pull/18416))
+
+ Issue
+In earlier versions, called `ledger.openCursor("xxx", null)` with a null 
`initialPosition` parameter would result in the cursor being set to the 
earliest position, causing unexpected behavior. The root cause of this issue 
was due to incorrect handling of the `initialPosition` in 
`ManagedLedgerImpl.java`.
+
+
+ Resolution
+This PR fixes the issue by ensuring that the correct initial position is used 
when opening a cursor with a null `initialPosition`. The code now sets the 
cursor to the latest position, as expected.
+
+
+### Add isActive in ManagedCursorImpl 
([PR-19341](https://github.com/apache/pulsar/pull/19341))
+
+ Issue
+In previous versions, when there were many concurrent subscriptions in a 
topic, broker performance would degrade due to many io-threads waiting for the 
lock, synchronized (activeCursors) while calling checkBackloggedCursors.
+
+ Resolution
+This PR adds an `isActive` variable in `ManagedCursorImpl` to minimize the 
access to `activeCursors` in `ManagedLedgerImpl,` which reduces the contention 
on the lock and improves the broker performance with many concurrent 
subscriptions.
+
+### Support deleting partitioned topics with the keyword -partition- 
([PR-19230](https://github.com/apache/pulsar/pull/19230))
+
+ Issue
+In earlier versions, although users could create partitioned topics using the 
client when partitioned type auto-creation was enabled, there was no support 
for deleting these partitioned topics.
+
+ Resolution
+This PR adds support for deleting partitioned topics using the keyword 
`-partition-`, making it easier for users to manage their partitioned topics.
+
+# What’s Next?
+
+If you are interested in learning more about Pulsar 2.10.4, you can 
[download](https://pulsar.apache.org/download/) and try it out now!
+
+For more information about the Apache Pulsar project and current progress, 
visit
+the [Pulsar website](https://pulsar.apache.org), follow the project on 

[GitHub] [pulsar-site] tisonkun merged pull request #553: Release blog 2.10.4

2023-05-07 Thread via GitHub


tisonkun merged PR #553:
URL: https://github.com/apache/pulsar-site/pull/553


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar-site] momo-jun commented on pull request #555: Document OpenID Connect Auth Provider

2023-05-07 Thread via GitHub


momo-jun commented on PR #555:
URL: https://github.com/apache/pulsar-site/pull/555#issuecomment-1537618192

   IIUC, this PR also adds docs for https://github.com/apache/pulsar/pull/19888.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar-site] Anonymitaet commented on pull request #557: Improve OIDC client authentication docs

2023-05-07 Thread via GitHub


Anonymitaet commented on PR #557:
URL: https://github.com/apache/pulsar-site/pull/557#issuecomment-1537613871

   Thanks for your contribution!
   
   Have you previewed your changes and ensured everything goes as expected? 
   
   If not, please [preview your changes 
locally](https://pulsar.apache.org/contribute/document-preview/#preview-changes)
 and attach the screenshots to this PR. In this way, you can get your PR merged 
more quickly. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar-site] Anonymitaet commented on a diff in pull request #558: Minor improvements to OIDC docs

2023-05-07 Thread via GitHub


Anonymitaet commented on code in PR #558:
URL: https://github.com/apache/pulsar-site/pull/558#discussion_r1186952146


##
docs/security-openid-connect.md:
##
@@ -21,9 +25,9 @@ After authenticating with the Identity Provider, the Pulsar 
client gets an acces
 7. When token validation is successful, the Pulsar Server extracts the `sub` 
claim from the token (or the configured `openIDRoleClaim`) and uses it as the 
principal for authorization.
 8. When the token expires, the Pulsar Server challenges the client to 
re-authenticate with the Identity Provider and provide a new access token. If 
the client fails to re-authenticate, the Pulsar Server closes the connection.
 
-## Enable OpenID Connect Authentication in the Brokers, Proxies, and WebSocket 
Proxies
+## Enable OpenID Connect Authentication in the Broker and Proxy
 
-To configure Pulsar Servers to authenticate clients using OpenID Connect, add 
the following parameters to the `conf/broker.conf`, the `conf/proxy.conf`, and 
the `conf/websocket.conf` files. If you use a standalone Pulsar, you need to 
add these parameters to the `conf/standalone.conf` file:
+To configure Pulsar Servers to authenticate clients using OpenID Connect, add 
the following parameters to the `conf/broker.conf` and the `conf/proxy.conf`. 
If you use a standalone Pulsar, you need to add these parameters to the 
`conf/standalone.conf` file:

Review Comment:
   ```suggestion
   To configure Pulsar servers to authenticate clients using OpenID Connect, 
add the following parameters to the `conf/broker.conf` and the 
`conf/proxy.conf`. If you use a standalone Pulsar, add these parameters to the 
`conf/standalone.conf` file:
   ```



##
docs/security-openid-connect.md:
##
@@ -8,6 +8,10 @@ Apache Pulsar supports authenticating clients using [OpenID 
Connect](https://ope
 
 The source code for the OpenID Connect implementation is in the 
[pulsar-broker-auth-oidc](https://github.com/apache/pulsar/blob/master/pulsar-broker-auth-oidc/)
 submodule in the Apache Pulsar git repo.
 
+:::note
+Pulsar's OpenID Connect integration was introduced in Pulsar 3.0.0. As always, 
if you encounter any issues, please ask questions on Pulsar channels and open 
issues in GitHub.

Review Comment:
   ```suggestion
   Pulsar's OpenID Connect integration is available from 3.0.0. 
   ```
   
   > As always, if you encounter any issues, please ask questions on Pulsar 
channels and open issues in GitHub.
   
   We do not need to add this because:
   1) it's a common practice for developers
   2) it's applicable for all features/improvements/... If we add this here, we 
need to add this everywhere
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar-site] momo-jun commented on pull request #543: Add doc for transform functions before sinks

2023-05-07 Thread via GitHub


momo-jun commented on PR #543:
URL: https://github.com/apache/pulsar-site/pull/543#issuecomment-1537607778

   This PR adds docs for https://github.com/apache/pulsar/pull/16740. 
   CC @Anonymitaet to review and track.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[pulsar-site] branch main updated: [improve][doc] Update links for new Client/Feature Matrix page (#546)

2023-05-07 Thread junma
This is an automated email from the ASF dual-hosted git repository.

junma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
 new db62c2ca3ed [improve][doc] Update links for new Client/Feature Matrix 
page (#546)
db62c2ca3ed is described below

commit db62c2ca3ed26e00ad683bb6bfbebea97fe23989
Author: Jun Ma <60642177+momo-...@users.noreply.github.com>
AuthorDate: Mon May 8 09:12:05 2023 +0800

[improve][doc] Update links for new Client/Feature Matrix page (#546)

* Update sidebars.json

* add the new client feature matrix into the navigation and update 
reference links

* Update sidebars.json

* Update the contribution guide

* fix
---
 contribute/document-contribution.md | 24 +---
 contribute/site-intro.md| 26 +++---
 docs/client-libraries-cpp.md|  4 ++--
 docs/client-libraries-dotnet.md |  4 ++--
 docs/client-libraries-go.md |  4 ++--
 docs/client-libraries-java.md   |  4 ++--
 docs/client-libraries-node.md   |  4 ++--
 docs/client-libraries-python.md |  4 ++--
 docs/client-libraries-websocket.md  |  2 +-
 docs/client-libraries.md|  2 +-
 sidebars.json   | 11 ---
 11 files changed, 50 insertions(+), 39 deletions(-)

diff --git a/contribute/document-contribution.md 
b/contribute/document-contribution.md
index f99d410b0f0..cbe5d84fe22 100644
--- a/contribute/document-contribution.md
+++ b/contribute/document-contribution.md
@@ -77,21 +77,23 @@ Docs for external command-line tools or bare scripts are 
updated **manually**:
 | pulsar-daemon | 
[pulsar-daemon.md](https://github.com/apache/pulsar-site/blob/main/static/reference/next/pulsar-daemon/pulsar-daemon.md)
 |
 | bookkeeper| 
[bookkeeper.md](https://github.com/apache/pulsar-site/blob/main/static/reference/next/bookkeeper/bookkeeper.md)
  |
 
-## Update client/function matrix
+## Update feature matrix
 
-[Pulsar Feature 
Matrix](https://docs.google.com/spreadsheets/d/1YHYTkIXR8-Ql103u-IMI18TXLlGStK8uJjDsOOA0T20/edit#gid=1784579914)
 outlines every feature supported by the Pulsar client and function.
+Pulsar feature matrix introduces the features supported by language-specific 
clients and functions. It includes:
+* [Client Feature Matrix](pathname:///client-feature-matrix)
+* [Function Feature 
Matrix](https://docs.google.com/spreadsheets/d/1YHYTkIXR8-Ql103u-IMI18TXLlGStK8uJjDsOOA0T20/edit#gid=328808194)
 
-:::note
+You need to update the feature matrix as soon as your related commits get 
merged. The workflow is illustrated as follows.
 
-* It's public and everyone has access to edit it. You can reach out to 
`d...@pulsar.apache.org` if you have problems in editing.
-* This matrix will be moved to the Pulsar website (instead of the spreadsheet) 
in the future.
+![Client Feature Matrix Workflow](media/client-matrix-workflow.png)
 
-:::
+1. Submit your code and doc PRs.
+2. Get your PRs reviewed and merged.
+3. Update the feature matrix to flag your contribution.
 
-If you want to update the Pulsar Feature Matrix, follow the steps below.
+:::note
 
-1. Submit your code and doc PRs.
-2. Get your PR reviewed and merged.
-3. In the [Pulsar Feature 
Matrix](https://docs.google.com/spreadsheets/d/1YHYTkIXR8-Ql103u-IMI18TXLlGStK8uJjDsOOA0T20/edit#gid=1784579914),
 check the box in the corresponding cell with the links of PRs and doc site.
+* [Client Feature Matrix](pathname:///client-feature-matrix) is in the 
transition phase from the spreadsheet to the webpage. For how to update it, see 
[How to update data-driven 
pages](site-intro.md#how-to-update-data-driven-pages).
+* If you have problems in editing the spreadsheet of [Function Feature 
Matrix](https://docs.google.com/spreadsheets/d/1YHYTkIXR8-Ql103u-IMI18TXLlGStK8uJjDsOOA0T20/edit#gid=328808194),
 you can reach out to `d...@pulsar.apache.org`.
 
-![Client Feature Matrix Workflow](media/client-matrix-workflow.png)
+:::
\ No newline at end of file
diff --git a/contribute/site-intro.md b/contribute/site-intro.md
index 7c1a3db2270..3e5567626a7 100644
--- a/contribute/site-intro.md
+++ b/contribute/site-intro.md
@@ -1,6 +1,6 @@
 # Introduction
 
-The Pulsar site is built with [Docusaurus](http://docusaurus.io/) framework. 
You can find all technical details on [its docs](https://docusaurus.io/docs).
+The Pulsar site is built with [Docusaurus](http://docusaurus.io/) framework. 
You can find all the technical details on [its 
docs](https://docusaurus.io/docs).
 
 Specifically, this chapter provides a [writing syntax](document-syntax.md) 
guide selecting knowledge for writing content of the site.
 
@@ -14,14 +14,15 @@ Docusaurus provides three kinds of pages out-of-the-box: 
[docs](https://docusaur
 
 The Pulsar site pages are of:
 
-| Page   | Type  | Source  
 

[GitHub] [pulsar-site] momo-jun merged pull request #546: [improve][doc] Update links for new Client/Feature Matrix page

2023-05-07 Thread via GitHub


momo-jun merged PR #546:
URL: https://github.com/apache/pulsar-site/pull/546


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar-site] Anonymitaet commented on pull request #542: Add doc for available-functions and functions reload

2023-05-07 Thread via GitHub


Anonymitaet commented on PR #542:
URL: https://github.com/apache/pulsar-site/pull/542#issuecomment-1537604351

   Can you apply the same doc changes to the 3.0.x doc set (it was released 
last week)?
   
   > Those commands are available starting Pulsar 3.0


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #20233: [fix][broker] Fix the behavior of delayed message in Key_Shared mode

2023-05-07 Thread via GitHub


codelipenghui commented on PR #20233:
URL: https://github.com/apache/pulsar/pull/20233#issuecomment-1537593154

   @poorbarcode The 
[document](https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#delayed-message-delivery)
 said
   
   ```
   Only shared and key-shared subscriptions support delayed message delivery. 
In other subscriptions, delayed messages are dispatched immediately.
   ```
   
   The key-shared subscription can allow out-of-order delivery, or we can say a 
loose ordering guarantee. So it still makes sense to let them work together.
   
   But we should have clear documentation for the expected behavior of the 
consumption of the delayed message under a key-shared subscription.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #20233: [fix][broker] Fix the behavior of delayed message in Key_Shared mode

2023-05-07 Thread via GitHub


codelipenghui commented on PR #20233:
URL: https://github.com/apache/pulsar/pull/20233#issuecomment-1537590598

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar-adapters] avinash-fk opened a new issue, #50: Shaded client throw NoSuchMethodError when using a http url for bootstrap servers

2023-05-07 Thread via GitHub


avinash-fk opened a new issue, #50:
URL: https://github.com/apache/pulsar-adapters/issues/50

   **Describe the bug**
   
   When using the shaded kafka pulsar adapter with HTTP URL for 
bootstrap.servers, results in the following error
   
   ```java
   Exception in thread "main" java.lang.NoSuchMethodError: 
org.asynchttpclient.DefaultAsyncHttpClientConfig$Builder.setEventLoopGroup(Lorg/apache/pulsar/shade/io/netty/channel/EventLoopGroup;)Lorg/asynchttpclient/DefaultAsyncHttpClientConfig$Builder;
at org.apache.pulsar.client.impl.HttpClient.(HttpClient.java:158)
at 
org.apache.pulsar.client.impl.HttpLookupService.(HttpLookupService.java:67)
at 
org.apache.pulsar.client.impl.PulsarClientImpl.(PulsarClientImpl.java:207)
at 
org.apache.pulsar.client.impl.PulsarClientImpl.(PulsarClientImpl.java:154)
at 
org.apache.pulsar.client.impl.ClientBuilderImpl.build(ClientBuilderImpl.java:63)
at 
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:151)
at 
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:106)
at 
com.flipkart.viesti.producer.KafkaProducerClient.(KafkaProducerClient.java:41)
at 
com.flipkart.viesti.producer.ProducerMain$SyncProduce.singleThreaded(ProducerMain.java:32)
at com.flipkart.viesti.producer.ProducerMain.main(ProducerMain.java:19)
   ```
   
   Looks like async HTTP client is not part of the shaded jar and netty 
included in leading to this issue.
   
   **To Reproduce**
   Run a producer or consumer with the shaded version of the adapter as the 
dependency.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[pulsar] branch branch-3.0 updated: [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 20c2b5bd653 [fix] [broker] Fix infinite ack of Replicator after topic 
is closed (#20232)
20c2b5bd653 is described below

commit 20c2b5bd653c9c738279f2bd41a7eaa2da797b8f
Author: fengyubiao 
AuthorDate: Sun May 7 14:21:51 2023 +0800

[fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)

(cherry picked from commit 98413642995eb6e562f6a591dcf56e20ac0cc7ef)
---
 .../service/persistent/PersistentReplicator.java   |  7 
 .../pulsar/broker/service/ReplicatorTest.java  | 45 +++---
 2 files changed, 38 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index a556237f434..d882cbf56b2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -545,6 +545,13 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 public void deleteFailed(ManagedLedgerException exception, Object ctx) {
 log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx,
 exception.getMessage(), exception);
+if (exception instanceof CursorAlreadyClosedException) {
+log.error("[{}] Asynchronous ack failure because replicator is 
already deleted and cursor is already"
++ " closed {}, ({})", replicatorId, ctx, 
exception.getMessage(), exception);
+// replicator is already deleted and cursor is already closed so, 
producer should also be stopped
+closeProducerAsync();
+return;
+}
 if (ctx instanceof PositionImpl) {
 PositionImpl deletedEntry = (PositionImpl) ctx;
 if (deletedEntry.compareTo((PositionImpl) 
cursor.getMarkDeletedPosition()) > 0) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 901451c022b..176eab0e94b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
@@ -51,8 +52,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -60,7 +63,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.State;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -1645,20 +1647,41 @@ public class ReplicatorTest extends ReplicatorTestBase {
 log.info("--- Starting ReplicatorTest::testReplication ---");
 
 String namespace = "pulsar/global/ns2";
-admin1.namespaces().createNamespace(namespace);
-admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
 final TopicName dest = TopicName
 .get(BrokerTestUtil.newUniqueName("persistent://" + namespace 
+ "/ackFailedTopic"));
 
 @Cleanup
 MessageProducer producer1 = new MessageProducer(url1, dest);
-log.info("--- Starting producer --- " + url1);
 
+PersistentTopic topic = (PersistentTopic) 

[pulsar] branch branch-3.0 updated: [fix] [broker] Producer created by replicator is not displayed in topic stats (#20229)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new d1348522d82 [fix] [broker] Producer created by replicator is not 
displayed in topic stats (#20229)
d1348522d82 is described below

commit d1348522d82f7fd972b26af6fd23e55049b0943e
Author: fengyubiao 
AuthorDate: Sat May 6 13:46:05 2023 +0800

[fix] [broker] Producer created by replicator is not displayed in topic 
stats (#20229)

### Motivation

A producer of the remote cluster is automatically created when replication 
is turned on. But we can't see anything about it from the response of `(remote 
cluster) pulsar-admin topic stats`

### Modifications

Make this producer displayed in the topic stats

(cherry picked from commit 9f7a539593de57ad8c4d224fde81a9e04ac38494)
---
 .../broker/service/persistent/PersistentTopic.java |   3 +-
 .../broker/service/OneWayReplicatorTest.java   |  78 
 .../broker/service/OneWayReplicatorTestBase.java   | 219 +
 3 files changed, 298 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index fe181bb1c01..15854f55c5c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2226,9 +2226,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
 if (producer.isRemote()) {
 remotePublishersStats.put(producer.getRemoteCluster(), 
publisherStats);
-} else {
-stats.addPublisher(publisherStats);
 }
+stats.addPublisher(publisherStats);
 });
 
 stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : 
(stats.msgThroughputIn / stats.msgRateIn);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
new file mode 100644
index 000..e8a21502fb1
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
+
+@Override
+@BeforeClass(alwaysRun = true, timeOut = 30)
+public void setup() throws Exception {
+super.setup();
+}
+
+@Override
+@AfterClass(alwaysRun = true, timeOut = 30)
+public void cleanup() throws Exception {
+super.cleanup();
+}
+
+@Test
+public void testReplicatorProducerStatInTopic() throws Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
+final String subscribeName = "subscribe_1";
+final byte[] msgValue = "test".getBytes();
+
+admin1.topics().createNonPartitionedTopic(topicName);
+admin2.topics().createNonPartitionedTopic(topicName);
+admin1.topics().createSubscription(topicName, subscribeName, 
MessageId.earliest);
+admin2.topics().createSubscription(topicName, subscribeName, 
MessageId.earliest);
+
+// Verify replicator works.
+Producer producer1 = 

[pulsar] branch branch-3.0 updated: [fix][broker] Fix Return value of getPartitionedStats doesn't contain subscription type (#20210)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 10019391ea7 [fix][broker] Fix Return value of getPartitionedStats 
doesn't contain subscription type (#20210)
10019391ea7 is described below

commit 10019391ea7491965d12371d8f614eb0783d7362
Author: jack zhang 
AuthorDate: Fri May 5 21:25:14 2023 +0800

[fix][broker] Fix Return value of getPartitionedStats doesn't contain 
subscription type (#20210)

(cherry picked from commit 4ac117ffc86111e7c38ddadbf933c8be0749f507)
---
 .../org/apache/pulsar/broker/admin/AdminApiTest.java   | 18 ++
 .../policies/data/stats/SubscriptionStatsImpl.java |  2 ++
 2 files changed, 20 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index e33108203cc..f0ac63f08ae 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -1266,6 +1266,24 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 0);
 }
 
+@Test
+public void testGetPartitionedStatsContainSubscriptionType() throws 
Exception {
+final String topic = "persistent://prop-xyz/ns1/my-topic" + 
UUID.randomUUID();
+final int numPartitions = 4;
+admin.topics().createPartitionedTopic(topic, numPartitions);
+
+// create consumer and subscription
+final String subName = "my-sub";
+@Cleanup Consumer exclusiveConsumer = 
pulsarClient.newConsumer().topic(topic)
+.subscriptionName(subName)
+.subscriptionType(SubscriptionType.Exclusive)
+.subscribe();
+
+TopicStats topicStats = admin.topics().getPartitionedStats(topic, 
false);
+assertEquals(topicStats.getSubscriptions().size(), 1);
+assertEquals(topicStats.getSubscriptions().get(subName).getType(), 
SubscriptionType.Exclusive.toString());
+}
+
 
 @Test
 public void testGetPartitionedStatsInternal() throws Exception {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 7ae439829b3..ea7639a8cd2 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -166,6 +166,7 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
 backlogSize = 0;
 msgBacklogNoDelayed = 0;
 unackedMessages = 0;
+type = null;
 msgRateExpired = 0;
 totalMsgExpired = 0;
 lastExpireTimestamp = 0L;
@@ -199,6 +200,7 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
 this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
 this.msgDelayed += stats.msgDelayed;
 this.unackedMessages += stats.unackedMessages;
+this.type = stats.type;
 this.msgRateExpired += stats.msgRateExpired;
 this.totalMsgExpired += stats.totalMsgExpired;
 this.isReplicated |= stats.isReplicated;



[pulsar] branch branch-3.0 updated: [fix][broker] Fix the reason label of authentication metrics (#20030)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 21c2eddeb8a [fix][broker] Fix the reason label of authentication 
metrics (#20030)
21c2eddeb8a is described below

commit 21c2eddeb8a912b5ad88f7c4071d6e3f71627c22
Author: ran 
AuthorDate: Fri May 5 08:48:59 2023 +0800

[fix][broker] Fix the reason label of authentication metrics (#20030)

(cherry picked from commit 2b515ffb389c4b4fe3cb5a9c5f3d7eb0a4c9ef99)
---
 .../AuthenticationProviderAthenz.java  | 20 --
 .../oidc/AuthenticationProviderOpenID.java |  2 +-
 .../authentication/AuthenticationProvider.java |  5 
 .../AuthenticationProviderBasic.java   | 21 ---
 .../authentication/AuthenticationProviderList.java | 20 +-
 .../authentication/AuthenticationProviderTls.java  | 12 +++--
 .../AuthenticationProviderToken.java   | 31 ++
 .../metrics/AuthenticationMetrics.java | 16 +++
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  6 ++---
 9 files changed, 103 insertions(+), 30 deletions(-)

diff --git 
a/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
 
b/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
index 2e062b87a83..652a922b9a5 100644
--- 
a/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
+++ 
b/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
@@ -43,6 +43,15 @@ public class AuthenticationProviderAthenz implements 
AuthenticationProvider {
 private List domainNameList = null;
 private int allowedOffset = 30;
 
+public enum ErrorCode {
+UNKNOWN,
+NO_CLIENT,
+NO_TOKEN,
+NO_PUBLIC_KEY,
+DOMAIN_MISMATCH,
+INVALID_TOKEN,
+}
+
 @Override
 public void initialize(ServiceConfiguration config) throws IOException {
 String domainNames;
@@ -81,11 +90,13 @@ public class AuthenticationProviderAthenz implements 
AuthenticationProvider {
 public String authenticate(AuthenticationDataSource authData) throws 
AuthenticationException {
 SocketAddress clientAddress;
 String roleToken;
+ErrorCode errorCode = ErrorCode.UNKNOWN;
 try {
 
 if (authData.hasDataFromPeer()) {
 clientAddress = authData.getPeerAddress();
 } else {
+errorCode = ErrorCode.NO_CLIENT;
 throw new AuthenticationException("Authentication data source 
does not have a client address");
 }
 
@@ -94,13 +105,16 @@ public class AuthenticationProviderAthenz implements 
AuthenticationProvider {
 } else if (authData.hasDataFromHttp()) {
 roleToken = 
authData.getHttpHeader(AuthZpeClient.ZPE_TOKEN_HDR);
 } else {
+errorCode = ErrorCode.NO_TOKEN;
 throw new AuthenticationException("Authentication data source 
does not have a role token");
 }
 
 if (roleToken == null) {
+errorCode = ErrorCode.NO_TOKEN;
 throw new AuthenticationException("Athenz token is null, can't 
authenticate");
 }
 if (roleToken.isEmpty()) {
+errorCode = ErrorCode.NO_TOKEN;
 throw new AuthenticationException("Athenz RoleToken is empty, 
Server is Using Athenz Authentication");
 }
 if (log.isDebugEnabled()) {
@@ -110,6 +124,7 @@ public class AuthenticationProviderAthenz implements 
AuthenticationProvider {
 RoleToken token = new RoleToken(roleToken);
 
 if (!domainNameList.contains(token.getDomain())) {
+errorCode = ErrorCode.DOMAIN_MISMATCH;
 throw new AuthenticationException(
 String.format("Athenz RoleToken Domain mismatch, 
Expected: %s, Found: %s",
 domainNameList.toString(), token.getDomain()));
@@ -120,6 +135,7 @@ public class AuthenticationProviderAthenz implements 
AuthenticationProvider {
 PublicKey ztsPublicKey = 
AuthZpeClient.getZtsPublicKey(token.getKeyId());
 
 if (ztsPublicKey == null) {
+errorCode = ErrorCode.NO_PUBLIC_KEY;
 throw new AuthenticationException("Unable to retrieve ZTS 
Public Key");
 }
 
@@ -128,13 +144,13 @@ public class AuthenticationProviderAthenz implements 
AuthenticationProvider {
 
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), 
getAuthMethodName());
 

[pulsar] branch branch-3.0 updated: [fix][docs] Remove old template inlined (#20208)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new e184605f15f [fix][docs] Remove old template inlined (#20208)
e184605f15f is described below

commit e184605f15fae02e088a5f9d685a3ce24006b541
Author: Asaf Mesika 
AuthorDate: Sun Apr 30 12:58:53 2023 +0300

[fix][docs] Remove old template inlined (#20208)

(cherry picked from commit e8d63952a40b79499eb3fd524f80db6bc986ed34)
---
 wiki/proposals/PIP.md | 35 +--
 1 file changed, 1 insertion(+), 34 deletions(-)

diff --git a/wiki/proposals/PIP.md b/wiki/proposals/PIP.md
index 159c9199895..fc68905726e 100644
--- a/wiki/proposals/PIP.md
+++ b/wiki/proposals/PIP.md
@@ -116,38 +116,5 @@ Some examples:
 
 ## Template for a PIP design doc
 
-```
-## Motivation
+Read [the template file](/.github/ISSUE_TEMPLATE/pip.md).
 
-Explain why this change is needed, what benefits it would bring to Apache 
Pulsar
-and what problem it's trying to solve.
-
-## Goal
-
-Define the scope of this proposal. Given the motivation stated above, what are
-the problems that this proposal is addressing and what other items will be
-considering out of scope, perhaps to be left to a different PIP.
-
-## API Changes
-
-Illustrate all the proposed changes to the API or wire protocol, with examples
-of all the newly added classes/methods, including Javadoc.
-
-## Implementation
-
-This should be a detailed description of all the changes that are
-expected to be made. It should be detailed enough that any developer that is
-familiar with Pulsar internals would be able to understand all the parts of the
-code changes for this proposal.
-
-This should also serve as documentation for any person that is trying to
-understand or debug the behavior of a certain feature.
-
-
-## Reject Alternatives
-
-If there are alternatives that were already considered by the authors or,
-after the discussion, by the community, and were rejected, please list them
-here along with the reason why they were rejected.
-
-```



[pulsar] branch branch-3.0 updated: [improve] [broker] Skip split boundle if only one broker (#20190)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 89dafa5e031 [improve] [broker] Skip split boundle if only one broker 
(#20190)
89dafa5e031 is described below

commit 89dafa5e03129b4f35db31232fd816acd454cf11
Author: fengyubiao 
AuthorDate: Sat Apr 29 01:54:05 2023 +0800

[improve] [broker] Skip split boundle if only one broker (#20190)

Co-authored-by: Zixuan Liu 
(cherry picked from commit d135c4a115038dc61f8fe2d230cb1f0c02239f92)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  2 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  9 
 .../pulsar/client/api/BrokerServiceLookupTest.java | 58 ++
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index c6f406a7c8a..30a2ef5cdf2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -744,7 +744,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
 public void checkNamespaceBundleSplit() {
 
 if (!conf.isLoadBalancerAutoBundleSplitEnabled() || 
pulsar.getLeaderElectionService() == null
-|| !pulsar.getLeaderElectionService().isLeader()) {
+|| !pulsar.getLeaderElectionService().isLeader() || 
knownBrokers.size() <= 1) {
 return;
 }
 final boolean unloadSplitBundles = 
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index f5ae8459f18..bf141e10aa1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -38,6 +38,7 @@ import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -80,6 +81,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.zookeeper.CreateMode;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -772,6 +774,10 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 c1.acknowledge(c1.receive());
 }
 
+// Mock another broker to make split task work.
+String mockedBroker = "/loadbalance/brokers/127.0.0.1:0";
+mockZooKeeper.create(mockedBroker, new byte[]{0}, 
Collections.emptyList(), CreateMode.EPHEMERAL);
+
 pulsar.getBrokerService().updateRates();
 Awaitility.await().untilAsserted(() -> 
assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
 ModularLoadManagerWrapper loadManager = 
(ModularLoadManagerWrapper)pulsar.getLoadManager().get();
@@ -796,6 +802,9 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage"));
 
 assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total"));
+
+// cleanup.
+mockZooKeeper.delete(mockedBroker, 0);
 }
 
 @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 2af9f450dd3..8597e0a8799 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -67,6 +67,7 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -80,6 +81,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.common.naming.NamespaceBundle;
+import 

[pulsar] branch branch-3.0 updated: [fix][admin] Fix examine messages if total message is zero (#20152)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 8da40f0f033 [fix][admin] Fix examine messages if total message is zero 
(#20152)
8da40f0f033 is described below

commit 8da40f0f033cfad7d7d8a2b005ced69e549f7e86
Author: houxiaoyu 
AuthorDate: Fri Apr 28 16:46:51 2023 +0800

[fix][admin] Fix examine messages if total message is zero (#20152)

(cherry picked from commit 8b7aa6312b2af49d0c6edde85c6b0108007a0a96)
---
 .../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 4 
 .../java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 8 
 2 files changed, 12 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 6b163ae0446..fcade8270cb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3061,6 +3061,10 @@ public class PersistentTopicsBase extends AdminResource {
 try {
 PersistentTopic persistentTopic = (PersistentTopic) 
topic;
 long totalMessage = 
persistentTopic.getNumberOfEntries();
+if (totalMessage <= 0) {
+throw new RestException(Status.PRECONDITION_FAILED,
+"Could not examine messages due to the 
total message is zero");
+}
 PositionImpl startPosition = 
persistentTopic.getFirstPosition();
 
 long messageToSkip = 
initialPositionLocal.equals("earliest") ? messagePositionLocal :
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index f80d9863a26..284e50c8302 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1151,6 +1151,14 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 + "topic partition");
 }
 
+try {
+admin.topics().examineMessage(topicName + "-partition-0", 
"earliest", 1);
+Assert.fail();
+} catch (PulsarAdminException e) {
+Assert.assertEquals(e.getMessage(),
+"Could not examine messages due to the total message is 
zero");
+}
+
 producer.send("message1");
 Assert.assertEquals(
 new String(admin.topics().examineMessage(topicName + 
"-partition-0", "earliest", 1).getData()),



[pulsar] branch branch-3.0 updated: [fix][test] Fix flaky test `ConcurrentBitmapSortedLongPairSetTest` (#20165)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 29eee9495ce [fix][test] Fix flaky test 
`ConcurrentBitmapSortedLongPairSetTest` (#20165)
29eee9495ce is described below

commit 29eee9495cea82cc3547d238d73611c246adae35
Author: Jiwei Guo 
AuthorDate: Wed Apr 26 22:16:39 2023 +0800

[fix][test] Fix flaky test `ConcurrentBitmapSortedLongPairSetTest` (#20165)

(cherry picked from commit 0ec576b25fdd7e4890799184ac03f66deb4735f6)
---
 .../pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java   | 8 ++--
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
index 47379c3b6f1..5f8f13288cf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
@@ -26,7 +26,6 @@ import 
org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
 import org.testng.annotations.Test;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -188,15 +187,12 @@ public class ConcurrentBitmapSortedLongPairSetTest {
 List> futures = new ArrayList<>();
 for (int i = 0; i < nThreads; i++) {
 final int threadIdx = i;
-
 futures.add(executor.submit(() -> {
-Random random = new Random();
 
+int start = N * (threadIdx + 1);
 for (int j = 0; j < N; j++) {
-int key = random.nextInt();
+int key = start + j;
 // Ensure keys are unique
-key -= key % (threadIdx + 1);
-key = Math.abs(key);
 set.add(key, key);
 }
 }));



[pulsar] branch branch-3.0 updated (43e81029c98 -> 6ca4029af15)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 43e81029c98 [fix][meta] deadlock of zkSessionWatcher when zkConnection 
loss (#20122)
 new 66f04fafa46 [fix][broker] Remove useless field in the 
DelayedMessageIndexBucketSegment.proto (#20166)
 new 6ca4029af15 [fix][broker] Release EntryBuffer after parse proto object 
(#20170)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bucket/BookkeeperBucketSnapshotStorage.java| 30 ++
 .../proto/DelayedMessageIndexBucketSegment.proto   |  1 -
 2 files changed, 25 insertions(+), 6 deletions(-)



[pulsar] 01/02: [fix][broker] Remove useless field in the DelayedMessageIndexBucketSegment.proto (#20166)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 66f04fafa46a9ca96861785ae8a086b6c2886a6b
Author: Cong Zhao 
AuthorDate: Mon Apr 24 00:26:23 2023 +0800

[fix][broker] Remove useless field in the 
DelayedMessageIndexBucketSegment.proto (#20166)

(cherry picked from commit ab8a8c9ef58c0d3b7c0838df81f9762637c24f90)
---
 pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto | 1 -
 1 file changed, 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto 
b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
index 633d6a8f161..a6ed30cfe8c 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
+++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
@@ -31,5 +31,4 @@ message DelayedIndex {
 
 message SnapshotSegment {
 repeated DelayedIndex indexes = 1;
-map delayed_index_bit_map = 2;
 }



[pulsar] 02/02: [fix][broker] Release EntryBuffer after parse proto object (#20170)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6ca4029af151903d78c9dd46f099761e54c726b6
Author: Cong Zhao 
AuthorDate: Mon Apr 24 10:34:20 2023 +0800

[fix][broker] Release EntryBuffer after parse proto object (#20170)

(cherry picked from commit 35e9897742b7db4bd29349940075a819b2ad6999)
---
 .../bucket/BookkeeperBucketSnapshotStorage.java| 30 ++
 1 file changed, 25 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 040bbbc586f..e99f39b382f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.delayed.bucket;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -38,6 +39,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
 import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
@@ -60,8 +62,9 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 public CompletableFuture createBucketSnapshot(SnapshotMetadata 
snapshotMetadata,
 List 
bucketSnapshotSegments,
 String bucketKey, 
String topicName, String cursorName) {
+ByteBuf metadataByteBuf = 
Unpooled.wrappedBuffer(snapshotMetadata.toByteArray());
 return createLedger(bucketKey, topicName, cursorName)
-.thenCompose(ledgerHandle -> addEntry(ledgerHandle, 
snapshotMetadata.toByteArray())
+.thenCompose(ledgerHandle -> addEntry(ledgerHandle, 
metadataByteBuf)
 .thenCompose(__ -> addSnapshotSegments(ledgerHandle, 
bucketSnapshotSegments))
 .thenCompose(__ -> closeLedger(ledgerHandle))
 .thenApply(__ -> ledgerHandle.getId()));
@@ -117,19 +120,32 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 private CompletableFuture addSnapshotSegments(LedgerHandle 
ledgerHandle,
 List 
bucketSnapshotSegments) {
 List> addFutures = new ArrayList<>();
+ByteBuf byteBuf;
 for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
-addFutures.add(addEntry(ledgerHandle, 
bucketSnapshotSegment.toByteArray()));
+byteBuf = 
PulsarByteBufAllocator.DEFAULT.directBuffer(bucketSnapshotSegment.getSerializedSize());
+try {
+bucketSnapshotSegment.writeTo(byteBuf);
+} catch (Exception e){
+byteBuf.release();
+throw e;
+}
+addFutures.add(addEntry(ledgerHandle, byteBuf));
 }
 
 return FutureUtil.waitForAll(addFutures);
 }
 
 private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry 
ledgerEntry) {
+ByteBuf entryBuffer = null;
 try {
-ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
+entryBuffer = ledgerEntry.getEntryBuffer();
 return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
 } catch (InvalidProtocolBufferException e) {
 throw new BucketSnapshotSerializationException(e);
+} finally {
+if (entryBuffer != null) {
+entryBuffer.release();
+}
 }
 }
 
@@ -139,7 +155,11 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 LedgerEntry ledgerEntry = entryEnumeration.nextElement();
 SnapshotSegment snapshotSegment = new SnapshotSegment();
 ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
-snapshotSegment.parseFrom(entryBuffer, 
entryBuffer.readableBytes());
+try {
+snapshotSegment.parseFrom(entryBuffer, 
entryBuffer.readableBytes());
+} finally {
+entryBuffer.release();
+}
 snapshotMetadataList.add(snapshotSegment);
 }
 return snapshotMetadataList;
@@ -208,7 +228,7 @@ public class BookkeeperBucketSnapshotStorage implements 

[GitHub] [pulsar] poorbarcode commented on pull request #20028: [improve][cli] Add `--cleanupSubscription` to pulsar-admin

2023-05-07 Thread via GitHub


poorbarcode commented on PR #20028:
URL: https://github.com/apache/pulsar/pull/20028#issuecomment-1537564507

   @jiangpengcheng @Anonymitaet 
   
   If it should be cherry-picked into `branch-3.0`, please ping me


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[pulsar] branch branch-3.0 updated: [fix][meta] deadlock of zkSessionWatcher when zkConnection loss (#20122)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 43e81029c98 [fix][meta] deadlock of zkSessionWatcher when zkConnection 
loss (#20122)
43e81029c98 is described below

commit 43e81029c98fa72070200684423a6baabada7849
Author: tiny-rain 
AuthorDate: Sun Apr 23 13:45:03 2023 +0800

[fix][meta] deadlock of zkSessionWatcher when zkConnection loss (#20122)

(cherry picked from commit d3d36bdbd22a1b1fa6cee41a91acd9ec8a4c9341)
---
 .../java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java| 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
index 1ce01f57d4f..a8407210230 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
@@ -81,7 +81,11 @@ public class ZKSessionWatcher implements AutoCloseable, 
Watcher {
 }
 
 // task that runs every TICK_TIME to check zk connection
-private synchronized void checkConnectionStatus() {
+// NOT ThreadSafe:
+// If zk client can't ensure the order, it may lead to problems.
+// Currently,we only use it in single thread, it will be fine. but we 
shouldn't leave any potential problems
+// in the future.
+private void checkConnectionStatus() {
 try {
 CompletableFuture future = new 
CompletableFuture<>();
 zk.exists("/", false, (StatCallback) (rc, path, ctx, stat) -> {
@@ -126,7 +130,7 @@ public class ZKSessionWatcher implements AutoCloseable, 
Watcher {
 currentStatus = SessionEvent.SessionLost;
 }
 
-private void checkState(Watcher.Event.KeeperState zkClientState) {
+private synchronized void checkState(Watcher.Event.KeeperState 
zkClientState) {
 switch (zkClientState) {
 case Expired:
 if (currentStatus != SessionEvent.SessionLost) {



[pulsar] branch branch-3.0 updated: [fix][broker] Fix getPartitionedStats miss subscription's messageAckRate (#19870)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new cde687a8e85 [fix][broker] Fix getPartitionedStats miss subscription's 
messageAckRate (#19870)
cde687a8e85 is described below

commit cde687a8e853b6944e5f8942bcbb9fb29f80e632
Author: WangJialing <65590138+wangjialing...@users.noreply.github.com>
AuthorDate: Sun Apr 23 10:07:02 2023 +0800

[fix][broker] Fix getPartitionedStats miss subscription's messageAckRate 
(#19870)

(cherry picked from commit d3158bfebf9565ea9422a9116749628648e9f90d)
---
 .../pulsar/common/policies/data/stats/SubscriptionStatsImpl.java  | 4 
 1 file changed, 4 insertions(+)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 25fa666523f..7ae439829b3 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -160,6 +160,8 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
 bytesOutCounter = 0;
 msgOutCounter = 0;
 msgRateRedeliver = 0;
+messageAckRate = 0;
+chunkedMessageRate = 0;
 msgBacklog = 0;
 backlogSize = 0;
 msgBacklogNoDelayed = 0;
@@ -190,6 +192,8 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
 this.bytesOutCounter += stats.bytesOutCounter;
 this.msgOutCounter += stats.msgOutCounter;
 this.msgRateRedeliver += stats.msgRateRedeliver;
+this.messageAckRate += stats.messageAckRate;
+this.chunkedMessageRate += stats.chunkedMessageRate;
 this.msgBacklog += stats.msgBacklog;
 this.backlogSize += stats.backlogSize;
 this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;



[pulsar] branch branch-3.0 updated: [fix] [broker] Fast fix infinite HTTP call getSubscriptions caused by wrong topicName (#20131)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 214a6bd3f55 [fix] [broker] Fast fix infinite HTTP call 
getSubscriptions caused by wrong topicName (#20131)
214a6bd3f55 is described below

commit 214a6bd3f55048ebd39efdc999a801b52fa8a0a1
Author: fengyubiao 
AuthorDate: Sun Apr 23 09:41:28 2023 +0800

[fix] [broker] Fast fix infinite HTTP call getSubscriptions caused by wrong 
topicName (#20131)

(cherry picked from commit 0c50866fbc18525f82a04c3a918628b8b50a4de8)
---
 .../broker/admin/impl/PersistentTopicsBase.java|  32 +++--
 ...ameForInfiniteHttpCallGetSubscriptionsTest.java | 143 +
 2 files changed, 167 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7347d6dbf20..6b163ae0446 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.impl;
 
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
+import static 
org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.github.zafarkhaja.semver.Version;
@@ -1171,6 +1172,21 @@ public class PersistentTopicsBase extends AdminResource {
 .thenCompose(__ -> 
pulsar().getBrokerService().deleteTopic(topicName.toString(), force));
 }
 
+/**
+ * There has a known bug will make Pulsar misidentifies 
"tp-partition-0-DLQ-partition-0" as "tp-partition-0-DLQ".
+ * You can see the details from PR 
https://github.com/apache/pulsar/pull/19841.
+ * This method is a quick fix and will be removed in master branch after 
#19841 and PIP 263 are done.
+ */
+private boolean isUnexpectedTopicName(PartitionedTopicMetadata 
topicMetadata) {
+if (!topicName.toString().contains(PARTITIONED_TOPIC_SUFFIX)){
+return false;
+}
+if (topicMetadata.partitions <= 0){
+return false;
+}
+return 
topicName.getPartition(0).toString().equals(topicName.toString());
+}
+
 protected void internalGetSubscriptions(AsyncResponse asyncResponse, 
boolean authoritative) {
 CompletableFuture future;
 if (topicName.isGlobal()) {
@@ -1188,7 +1204,7 @@ public class PersistentTopicsBase extends AdminResource {
 } else {
 getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
 .thenAccept(partitionMetadata -> {
-if (partitionMetadata.partitions > 0) {
+if (partitionMetadata.partitions > 0 && 
!isUnexpectedTopicName(partitionMetadata)) {
 try {
 final Set subscriptions =
 Collections.newSetFromMap(
@@ -3716,7 +3732,7 @@ public class PersistentTopicsBase extends AdminResource {
 .thenCompose(metadata -> {
 if (metadata.partitions > 0) {
 return 
validateTopicOwnershipAsync(TopicName.get(topicName.toString()
-+ TopicName.PARTITIONED_TOPIC_SUFFIX + 0), 
authoritative);
++ PARTITIONED_TOPIC_SUFFIX + 0), 
authoritative);
 } else {
 return 
validateTopicOwnershipAsync(topicName, authoritative);
 }
@@ -4543,7 +4559,7 @@ public class PersistentTopicsBase extends AdminResource {
 private CompletableFuture validatePartitionTopicUpdateAsync(String 
topicName, int numberOfPartition) {
 return internalGetListAsync().thenCompose(existingTopicList -> {
 TopicName partitionTopicName = TopicName.get(domain(), 
namespaceName, topicName);
-String prefix = partitionTopicName.getPartitionedTopicName() + 
TopicName.PARTITIONED_TOPIC_SUFFIX;
+String prefix = partitionTopicName.getPartitionedTopicName() + 
PARTITIONED_TOPIC_SUFFIX;
 return getPartitionedTopicMetadataAsync(partitionTopicName, false, 
false)
 .thenAccept(metadata -> {
 int oldPartition = metadata.partitions;
@@ -4551,8 +4567,8 @@ public class 

[pulsar] branch branch-3.0 updated: [fix][test] fix bug for testcase(PersistentTopicsTest#testExamineMessage) (#20154)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 54382a85a18 [fix][test] fix bug for 
testcase(PersistentTopicsTest#testExamineMessage) (#20154)
54382a85a18 is described below

commit 54382a85a188e700cd6abf1374df571d9eb4b9af
Author: StevenLuMT 
AuthorDate: Sat Apr 22 21:22:26 2023 +0800

[fix][test] fix bug for testcase(PersistentTopicsTest#testExamineMessage) 
(#20154)

Co-authored-by: lushiji 
(cherry picked from commit c58da14968f27cf2a4813db01f387b61a5a03636)
---
 .../test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 6949fe931ca..f80d9863a26 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1144,6 +1144,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 // Check examine message not allowed on partitioned topic.
 try {
 admin.topics().examineMessage(topicName, "earliest", 1);
+Assert.fail("fail to check examine message not allowed on 
partitioned topic");
 } catch (PulsarAdminException e) {
 Assert.assertEquals(e.getMessage(),
 "Examine messages on a partitioned topic is not allowed, 
please try examine message on specific "



[pulsar] branch branch-3.0 updated (0743c774fd6 -> 3a5ffb82eb7)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 0743c774fd6 [improve][broker] Optimization protobuf code in the bucket 
delayed tracker (#20158)
 new 044dd84966f [improve][schema] Add admin cli for testCompatibility 
(#19974)
 new 3a5ffb82eb7 [improve][broker] Move bitmap from lastMutableBucket to 
ImmutableBucket (#20156)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bucket/BucketDelayedDeliveryTracker.java   |  3 +-
 .../broker/delayed/bucket/MutableBucket.java   | 41 +++---
 .../pulsar/broker/admin/AdminApiSchemaTest.java| 29 +++
 .../protocol/schema/IsCompatibilityResponse.java   |  1 -
 .../pulsar/admin/cli/PulsarAdminToolTest.java  |  5 +++
 .../org/apache/pulsar/admin/cli/CmdSchemas.java| 17 +
 6 files changed, 82 insertions(+), 14 deletions(-)



[pulsar] 01/02: [improve][schema] Add admin cli for testCompatibility (#19974)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 044dd84966fe1faad725568456fbab26a75eebe8
Author: congbo <39078850+congbobo...@users.noreply.github.com>
AuthorDate: Fri Apr 21 09:57:06 2023 +0800

[improve][schema] Add admin cli for testCompatibility (#19974)











### Motivation

1. add admin cli `testCompatibility`
2. add `admin.schemas().testCompatibility()` test
### Modifications

1. add admin cli `testCompatibility`
2. add `admin.schemas().testCompatibility()` test
### Verifying this change
add test for it
- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
  - *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:



*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [x] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation



- [ ] `doc` 
- [ ] `doc-required` 
- [x] `doc-not-needed` 
- [ ] `doc-complete` 

### Matching PR in forked repository

PR in forked repository: 
https://github.com/congbobo184/pulsar/pull/14



(cherry picked from commit fce6e737a7ff785859117b216cbb12f999ddeb94)
---
 .../pulsar/broker/admin/AdminApiSchemaTest.java| 29 ++
 .../protocol/schema/IsCompatibilityResponse.java   |  1 -
 .../pulsar/admin/cli/PulsarAdminToolTest.java  |  5 
 .../org/apache/pulsar/admin/cli/CmdSchemas.java| 17 +
 4 files changed, 51 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index b37114f1802..f67bd6fcfce 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -49,6 +49,8 @@ import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import 
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
+import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -438,4 +440,31 @@ public class AdminApiSchemaTest extends 
MockedPulsarServiceBaseTest {
 
admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
 SchemaCompatibilityStrategy.UNDEFINED));
 }
+
+@Test
+public void testCompatibility() throws Exception {
+String topicName = schemaCompatibilityNamespace + "/testCompatibility";
+try {
+admin.schemas().getSchemaInfo(topicName);
+fail();
+} catch (PulsarAdminException.NotFoundException e) {
+assertEquals(e.getMessage(), "Schema not found");
+}
+Map properties = new HashMap<>();
+PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", 
"", properties);
+admin.schemas().createSchema(topicName, postSchemaPayload);
+IsCompatibilityResponse isCompatibilityResponse =
+admin.schemas().testCompatibility(topicName, 
postSchemaPayload);
+
+assertTrue(isCompatibilityResponse.isCompatibility());
+assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(), 
SchemaCompatibilityStrategy.FULL.name());
+postSchemaPayload = new PostSchemaPayload("INT8", "", properties);
+try {
+admin.schemas().testCompatibility(topicName, postSchemaPayload);
+fail();
+} catch (Exception e) {
+assertTrue(e instanceof 

[pulsar] 02/02: [improve][broker] Move bitmap from lastMutableBucket to ImmutableBucket (#20156)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3a5ffb82eb7be4197db08d25de143b2dc9486269
Author: Cong Zhao 
AuthorDate: Fri Apr 21 16:06:22 2023 +0800

[improve][broker] Move bitmap from lastMutableBucket to ImmutableBucket 
(#20156)

(cherry picked from commit e5a833a2dcb7ce13ada4ca94714cc045a02de276)
---
 .../bucket/BucketDelayedDeliveryTracker.java   |  3 +-
 .../broker/delayed/bucket/MutableBucket.java   | 41 +++---
 2 files changed, 31 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index c90064c9137..67a7de1f013 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -168,7 +168,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 }
 
 try {
-
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 2, 
TimeUnit.SECONDS);
+
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 5, 
TimeUnit.SECONDS);
 } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
 log.error("[{}] Failed to recover delayed message index bucket 
snapshot.", dispatcher.getName(), e);
 if (e instanceof InterruptedException) {
@@ -343,6 +343,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 // If (ledgerId < startLedgerId || existBucket) means that message 
index belong to previous bucket range,
 // enter sharedBucketPriorityQueue directly
 sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
+lastMutableBucket.putIndexBit(ledgerId, entryId);
 } else {
 checkArgument(ledgerId >= lastMutableBucket.endLedgerId);
 lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index b7e9e68f1bd..1173a401a89 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.delayed.bucket;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -74,6 +74,8 @@ class MutableBucket extends Bucket implements AutoCloseable {
 
 List bucketSnapshotSegments = new ArrayList<>();
 List segmentMetadataList = new ArrayList<>();
+Map immutableBucketBitMap = new HashMap<>();
+
 Map bitMap = new HashMap<>();
 SnapshotSegment snapshotSegment = new SnapshotSegment();
 SnapshotSegmentMetadata.Builder segmentMetadataBuilder = 
SnapshotSegmentMetadata.newBuilder();
@@ -82,18 +84,20 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
 long currentTimestampUpperLimit = 0;
 long currentFirstTimestamp = 0L;
 while (!delayedIndexQueue.isEmpty()) {
-DelayedIndex delayedIndex = snapshotSegment.addIndexe();
-delayedIndexQueue.popToObject(delayedIndex);
-
-long timestamp = delayedIndex.getTimestamp();
+final long timestamp = delayedIndexQueue.peekTimestamp();
 if (currentTimestampUpperLimit == 0) {
 currentFirstTimestamp = timestamp;
 firstScheduleTimestamps.add(currentFirstTimestamp);
 currentTimestampUpperLimit = timestamp + 
timeStepPerBucketSnapshotSegment - 1;
 }
 
-long ledgerId = delayedIndex.getLedgerId();
-long entryId = delayedIndex.getEntryId();
+DelayedIndex delayedIndex = snapshotSegment.addIndexe();
+delayedIndexQueue.popToObject(delayedIndex);
+
+final long ledgerId = delayedIndex.getLedgerId();
+final long entryId = delayedIndex.getEntryId();
+
+removeIndexBit(ledgerId, entryId);
 
 checkArgument(ledgerId >= startLedgerId && ledgerId <= 
endLedgerId);
 
@@ -102,10 +106,10 @@ class MutableBucket extends Bucket implements 
AutoCloseable {
 sharedQueue.add(timestamp, ledgerId, entryId);
 }
 
-numMessages++;
-
 

[GitHub] [pulsar] poorbarcode commented on pull request #19974: [improve][schema] Add admin cli for testCompatibility

2023-05-07 Thread via GitHub


poorbarcode commented on PR #19974:
URL: https://github.com/apache/pulsar/pull/19974#issuecomment-1537563160

   Since this patch is a help tools API, I cherry-picked it into `branch-3.0`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[pulsar] 02/03: [fix][txn] Fix transaction is not aborted when send or ACK failed (#20055)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d5edcc449b4e8511697d16151721ada218c90a3e
Author: Yunze Xu 
AuthorDate: Thu Apr 20 18:11:52 2023 +0800

[fix][txn] Fix transaction is not aborted when send or ACK failed (#20055)

(cherry picked from commit 00d09cbbd2b3063fecbdc3988b5eef7824f40ce0)
---
 .../broker/transaction/TransactionProduceTest.java | 35 +--
 .../client/impl/transaction/TransactionImpl.java   | 70 ++
 2 files changed, 62 insertions(+), 43 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index cdbb1563280..ddd8cf07903 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
@@ -43,6 +45,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -51,10 +54,11 @@ import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 /**
@@ -70,7 +74,7 @@ public class TransactionProduceTest extends 
TransactionTestBase {
 private static final String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit";
 private static final String ACK_ABORT_TOPIC = NAMESPACE1 + "/ack-abort";
 private static final int NUM_PARTITIONS = 16;
-@BeforeMethod
+@BeforeClass
 protected void setup() throws Exception {
 setUpBase(1, NUM_PARTITIONS, PRODUCE_COMMIT_TOPIC, TOPIC_PARTITION);
 admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, 
TOPIC_PARTITION);
@@ -78,7 +82,7 @@ public class TransactionProduceTest extends 
TransactionTestBase {
 admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, 
TOPIC_PARTITION);
 }
 
-@AfterMethod(alwaysRun = true)
+@AfterClass(alwaysRun = true)
 protected void cleanup() throws Exception {
 super.internalCleanup();
 }
@@ -369,5 +373,26 @@ public class TransactionProduceTest extends 
TransactionTestBase {
 return pendingAckCount;
 }
 
-
+@Test
+public void testCommitFailure() throws Exception {
+Transaction txn = pulsarClient.newTransaction().build().get();
+final String topic = NAMESPACE1 + "/test-commit-failure";
+@Cleanup
+final Producer producer = 
pulsarClient.newProducer().topic(topic).create();
+producer.newMessage(txn).value(new byte[1024 * 1024 * 10]).sendAsync();
+try {
+txn.commit().get();
+Assert.fail();
+} catch (ExecutionException e) {
+Assert.assertTrue(e.getCause() instanceof 
PulsarClientException.TransactionHasOperationFailedException);
+Assert.assertEquals(txn.getState(), Transaction.State.ABORTED);
+}
+try {
+
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txn.getTxnID())
+.getNow(null);
+Assert.fail();
+} catch (CompletionException e) {
+Assert.assertTrue(e.getCause() instanceof 
CoordinatorException.TransactionNotFoundException);
+}
+}
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index b7e085ed82a..d1260ba045e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ 

[pulsar] 01/03: [fix][broker] Fix entry filter feature for the non-persistent topic (#20141)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bafae5d3f48c6c858e48f3cfc7e0c7f212ece2f4
Author: ran 
AuthorDate: Thu Apr 20 16:18:06 2023 +0800

[fix][broker] Fix entry filter feature for the non-persistent topic (#20141)

(cherry picked from commit 575cf2331dd3ac0048923487c6c7904eda4301e6)
---
 .../service/nonpersistent/NonPersistentTopic.java  |  5 +--
 .../broker/service/plugin/FilterEntryTest.java | 15 +---
 .../pulsar/broker/stats/SubscriptionStatsTest.java | 42 +++---
 3 files changed, 36 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 317b8df6b9a..33258b06726 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -26,7 +26,7 @@ import com.carrotsearch.hppc.ObjectObjectHashMap;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -199,7 +199,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 // entry internally retains data so, duplicateBuffer should be 
release here
 duplicateBuffer.release();
 if (subscription.getDispatcher() != null) {
-
subscription.getDispatcher().sendMessages(Collections.singletonList(entry));
+// Dispatcher needs to call the set method to support entry 
filter feature.
+
subscription.getDispatcher().sendMessages(Arrays.asList(entry));
 } else {
 // it happens when subscription is created but dispatcher is 
not created as consumer is not added
 // yet
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index 4b9d91fbde2..b868858646c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.EntryFilterSupport;
+import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -286,10 +287,16 @@ public class FilterEntryTest extends BrokerTestBase {
 
 }
 
+@DataProvider(name = "topicProvider")
+public Object[][] topicProvider() {
+return new Object[][]{
+{"persistent://prop/ns-abc/topic" + UUID.randomUUID()},
+{"non-persistent://prop/ns-abc/topic" + UUID.randomUUID()},
+};
+}
 
-@Test
-public void testFilteredMsgCount() throws Throwable {
-String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+@Test(dataProvider = "topicProvider")
+public void testFilteredMsgCount(String topic) throws Throwable {
 String subName = "sub";
 
 try (Producer producer = 
pulsarClient.newProducer(Schema.STRING)
@@ -298,7 +305,7 @@ public class FilterEntryTest extends BrokerTestBase {
  .subscriptionName(subName).subscribe()) {
 
 // mock entry filters
-PersistentSubscription subscription = (PersistentSubscription) 
pulsar.getBrokerService()
+Subscription subscription = pulsar.getBrokerService()
 .getTopicReference(topic).get().getSubscription(subName);
 Dispatcher dispatcher = subscription.getDispatcher();
 Field field = 
EntryFilterSupport.class.getDeclaredField("entryFilters");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index bf9c1d540bf..d5e0066a86f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -211,21 +211,22 @@ public class SubscriptionStatsTest extends 
ProducerConsumerBase {
 

[pulsar] 03/03: [improve][broker] Optimization protobuf code in the bucket delayed tracker (#20158)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0743c774fd6621a8ddcdf50b7f28dc8f68a6e492
Author: Cong Zhao 
AuthorDate: Fri Apr 21 09:51:55 2023 +0800

[improve][broker] Optimization protobuf code in the bucket delayed tracker 
(#20158)

(cherry picked from commit a20d5e96acc91f8099845e8924e84cb0b5632f3f)
---
 pulsar-broker/pom.xml  |  2 +
 .../bucket/BookkeeperBucketSnapshotStorage.java| 24 +++---
 .../pulsar/broker/delayed/bucket/Bucket.java   |  7 +-
 .../bucket/BucketDelayedDeliveryTracker.java   | 13 ++-
 .../delayed/bucket/BucketSnapshotStorage.java  |  4 +-
 .../bucket/CombinedSegmentDelayedIndexQueue.java   | 22 +++--
 .../broker/delayed/bucket/DelayedIndexQueue.java   | 12 ++-
 .../broker/delayed/bucket/ImmutableBucket.java | 19 ++---
 .../broker/delayed/bucket/MutableBucket.java   | 29 ---
 .../TripleLongPriorityDelayedIndexQueue.java   | 25 --
 ...oto => DelayedMessageIndexBucketMetadata.proto} | 11 +--
 ...roto => DelayedMessageIndexBucketSegment.proto} | 12 +--
 .../BookkeeperBucketSnapshotStorageTest.java   | 95 +++---
 .../broker/delayed/MockBucketSnapshotStorage.java  | 14 ++--
 .../delayed/bucket/DelayedIndexQueueTest.java  | 70 +++-
 15 files changed, 177 insertions(+), 182 deletions(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 31b335e1aea..25b31b4f2b7 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -566,6 +566,7 @@
   
 **/ResourceUsage.proto
 **/TransactionPendingAck.proto
+**/DelayedMessageIndexBucketSegment.proto
   
 
 
@@ -610,6 +611,7 @@
   
 
${project.basedir}/src/main/proto/TransactionPendingAck.proto
 
${project.basedir}/src/main/proto/ResourceUsage.proto
+
${project.basedir}/src/main/proto/DelayedMessageIndexBucketSegment.proto
   
   
generated-sources/lightproto/java
   
generated-sources/lightproto/java
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 18a4c322f7b..040bbbc586f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.delayed.bucket;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
+import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -36,8 +36,8 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
-import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
@@ -126,7 +126,8 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 
 private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry 
ledgerEntry) {
 try {
-return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
+ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
+return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
 } catch (InvalidProtocolBufferException e) {
 throw new BucketSnapshotSerializationException(e);
 }
@@ -134,15 +135,14 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 
 private List 
parseSnapshotSegmentEntries(Enumeration entryEnumeration) {
 List snapshotMetadataList = new ArrayList<>();
-try {
-while (entryEnumeration.hasMoreElements()) {
-LedgerEntry ledgerEntry = entryEnumeration.nextElement();
-
snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
-}
-return snapshotMetadataList;
-} catch (IOException e) {
-throw new BucketSnapshotSerializationException(e);
+while (entryEnumeration.hasMoreElements()) {
+LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+SnapshotSegment snapshotSegment = new SnapshotSegment();
+ByteBuf 

[pulsar] branch branch-3.0 updated (38360d06ba1 -> 0743c774fd6)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 38360d06ba1 [revert] "[fix][broker] Fix tenant admin authorization 
bug. (#20068)" (#20143)
 new bafae5d3f48 [fix][broker] Fix entry filter feature for the 
non-persistent topic (#20141)
 new d5edcc449b4 [fix][txn] Fix transaction is not aborted when send or ACK 
failed (#20055)
 new 0743c774fd6 [improve][broker] Optimization protobuf code in the bucket 
delayed tracker (#20158)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pulsar-broker/pom.xml  |  2 +
 .../bucket/BookkeeperBucketSnapshotStorage.java| 24 +++---
 .../pulsar/broker/delayed/bucket/Bucket.java   |  7 +-
 .../bucket/BucketDelayedDeliveryTracker.java   | 13 ++-
 .../delayed/bucket/BucketSnapshotStorage.java  |  4 +-
 .../bucket/CombinedSegmentDelayedIndexQueue.java   | 22 +++--
 .../broker/delayed/bucket/DelayedIndexQueue.java   | 12 ++-
 .../broker/delayed/bucket/ImmutableBucket.java | 19 ++---
 .../broker/delayed/bucket/MutableBucket.java   | 29 ---
 .../TripleLongPriorityDelayedIndexQueue.java   | 25 --
 .../service/nonpersistent/NonPersistentTopic.java  |  5 +-
 ...oto => DelayedMessageIndexBucketMetadata.proto} | 11 +--
 ...roto => DelayedMessageIndexBucketSegment.proto} | 12 +--
 .../BookkeeperBucketSnapshotStorageTest.java   | 95 +++---
 .../broker/delayed/MockBucketSnapshotStorage.java  | 14 ++--
 .../delayed/bucket/DelayedIndexQueueTest.java  | 70 +++-
 .../broker/service/plugin/FilterEntryTest.java | 15 +++-
 .../pulsar/broker/stats/SubscriptionStatsTest.java | 42 +-
 .../broker/transaction/TransactionProduceTest.java | 35 ++--
 .../client/impl/transaction/TransactionImpl.java   | 70 
 20 files changed, 275 insertions(+), 251 deletions(-)
 copy 
pulsar-broker/src/main/proto/{DelayedMessageIndexBucketSnapshotFormat.proto => 
DelayedMessageIndexBucketMetadata.proto} (85%)
 rename 
pulsar-broker/src/main/proto/{DelayedMessageIndexBucketSnapshotFormat.proto => 
DelayedMessageIndexBucketSegment.proto} (80%)



[pulsar] 03/05: [improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage (#20117)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c4a1ce026db0602cda1d3c65ce9211e9e88f02b
Author: Cong Zhao 
AuthorDate: Thu Apr 20 09:26:30 2023 +0800

[improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage 
(#20117)

(cherry picked from commit d3fa998aa7c0a7a9452079ef2ff05bccf6b273cf)
---
 .../bucket/BookkeeperBucketSnapshotStorage.java| 52 +++---
 .../BookkeeperBucketSnapshotStorageTest.java   | 43 ++
 2 files changed, 69 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 9c30ccf1c0b..18a4c322f7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.validation.constraints.NotNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
@@ -48,6 +49,8 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 private final ServiceConfiguration config;
 private BookKeeper bookKeeper;
 
+private final Map> 
ledgerHandleFutureCache = new ConcurrentHashMap<>();
+
 public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
 this.pulsar = pulsar;
 this.config = pulsar.getConfig();
@@ -66,45 +69,30 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 
 @Override
 public CompletableFuture getBucketSnapshotMetadata(long 
bucketId) {
-return openLedger(bucketId).thenCompose(ledgerHandle -> {
-CompletableFuture snapshotFuture =
-getLedgerEntry(ledgerHandle, 0, 0)
-.thenApply(entryEnumeration -> 
parseSnapshotMetadataEntry(entryEnumeration.nextElement()));
-
-snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-return snapshotFuture;
-});
+return getLedgerHandle(bucketId).thenCompose(ledgerHandle -> 
getLedgerEntry(ledgerHandle, 0, 0)
+.thenApply(entryEnumeration -> 
parseSnapshotMetadataEntry(entryEnumeration.nextElement(;
 }
 
 @Override
 public CompletableFuture> 
getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
  
long lastSegmentEntryId) {
-return openLedger(bucketId).thenCompose(ledgerHandle -> {
-CompletableFuture> parseFuture =
-getLedgerEntry(ledgerHandle, firstSegmentEntryId, 
lastSegmentEntryId)
-.thenApply(this::parseSnapshotSegmentEntries);
-
-parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-return parseFuture;
-});
+return getLedgerHandle(bucketId).thenCompose(
+ledgerHandle -> getLedgerEntry(ledgerHandle, 
firstSegmentEntryId, lastSegmentEntryId)
+.thenApply(this::parseSnapshotSegmentEntries));
 }
 
 @Override
 public CompletableFuture getBucketSnapshotLength(long bucketId) {
-return openLedger(bucketId).thenCompose(ledgerHandle -> {
-CompletableFuture lengthFuture =
-
CompletableFuture.completedFuture(ledgerHandle.getLength());
-
-lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-return lengthFuture;
-});
+return getLedgerHandle(bucketId).thenCompose(
+ledgerHandle -> 
CompletableFuture.completedFuture(ledgerHandle.getLength()));
 }
 
 @Override
 public CompletableFuture deleteBucketSnapshot(long bucketId) {
+CompletableFuture ledgerHandleFuture = 
ledgerHandleFutureCache.remove(bucketId);
+if (ledgerHandleFuture != null) {
+ledgerHandleFuture.whenComplete((lh, ex) -> closeLedger(lh));
+}
 return deleteLedger(bucketId);
 }
 
@@ -178,6 +166,18 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 return future;
 }
 
+private CompletableFuture getLedgerHandle(Long ledgerId) {
+CompletableFuture ledgerHandleCompletableFuture =
+ledgerHandleFutureCache.computeIfAbsent(ledgerId, k -> 
openLedger(ledgerId));
+// remove future of completed exceptionally
+ledgerHandleCompletableFuture.whenComplete((__, ex) -> {
+   

[pulsar] 04/05: [fix][broker] Producer/Consumer should call allowTopicOperationAsync (#20142)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 800a82a3a9f0d480b594cf8268410ada00b4d313
Author: Michael Marshall 
AuthorDate: Thu Apr 20 00:17:02 2023 -0500

[fix][broker] Producer/Consumer should call allowTopicOperationAsync 
(#20142)

Fixes: https://github.com/apache/pulsar/issues/20066

### Motivation

In https://github.com/apache/pulsar/pull/20068 we changed the way that the 
`AuthorizationService` is implemented. I think we should change the `Consumer` 
and the `Producer` logic to call the correct `AuthorizationService` method.

Given that our goal is to deprecate the `AuthorizationService` methods for 
`canProduce` and `canConsume`, this change helps us move in the right direction.

### Modifications

* Update `Producer` and `Consumer` in broker to call the 
`AuthorizationService#allowTopicOperationAsync` method.

### Verifying this change

This change is trivial.

### Documentation

- [x] `doc-not-needed`

### Matching PR in forked repository

PR in forked repository: Skipping PR as I ran tests locally.

(cherry picked from commit dc5e497b2a42910146690c6b4e922e0a80bf1587)
---
 .../src/main/java/org/apache/pulsar/broker/service/Consumer.java  | 8 ++--
 .../src/main/java/org/apache/pulsar/broker/service/Producer.java  | 3 ++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 1ee3f513ef2..a3f9da41e6b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageId;
@@ -56,6 +57,7 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -901,8 +903,10 @@ public class Consumer {
 public CompletableFuture checkPermissionsAsync() {
 TopicName topicName = TopicName.get(subscription.getTopicName());
 if (cnx.getBrokerService().getAuthorizationService() != null) {
-return 
cnx.getBrokerService().getAuthorizationService().canConsumeAsync(topicName, 
appId,
-cnx.getAuthenticationData(), 
subscription.getName())
+AuthenticationDataSubscription authData =
+new 
AuthenticationDataSubscription(cnx.getAuthenticationData(), 
subscription.getName());
+return cnx.getBrokerService().getAuthorizationService()
+.allowTopicOperationAsync(topicName, 
TopicOperation.CONSUME, appId, authData)
 .handle((ok, e) -> {
 if (e != null) {
 log.warn("[{}] Get unexpected error while 
authorizing [{}]  {}", appId,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 5b62e3261e6..53b79f06e8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import 
org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
@@ -781,7 +782,7 @@ public class Producer {
 TopicName topicName = TopicName.get(topic.getName());
 if (cnx.getBrokerService().getAuthorizationService() != null) {
 return 

[pulsar] 05/05: [revert] "[fix][broker] Fix tenant admin authorization bug. (#20068)" (#20143)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 38360d06ba16fbf6339d325ae200d603850b47ec
Author: Michael Marshall 
AuthorDate: Thu Apr 20 00:17:22 2023 -0500

[revert] "[fix][broker] Fix tenant admin authorization bug. (#20068)" 
(#20143)

This reverts commit fc17c1d98a3c1edd975c131d174a9ef69887d9cd.

### Motivation

In https://github.com/apache/pulsar/pull/20068 we changed the way that the 
`AuthorizationService` is implemented. I think this approach could have 
unintended consequences. Instead, I think we should change the `Consumer` and 
the `Producer` logic to call the correct `AuthorizationService` method. I 
propose an update to the `Consumer` and `Producer` logic here #20142.

Given that our goal is to deprecate the `AuthorizationService` methods for 
`canProduce` and `canConsume`, I think we should not change their 
implementations.

### Modifications

* Revert https://github.com/apache/pulsar/pull/20068

### Verifying this change

This change is trivial. It removes certain test changes that were only made 
to make the previous PR work.

### Documentation

- [x] `doc-not-needed`

### Matching PR in forked repository

PR in forked repository: Skipping PR as I ran tests locally.

(cherry picked from commit 00dc7a0691b496065dba6af0c6de64af4973886e)
---
 .../broker/authorization/AuthorizationService.java | 27 +-
 .../pulsar/broker/auth/AuthorizationTest.java  | 22 --
 .../api/AuthorizationProducerConsumerTest.java | 16 -
 .../websocket/proxy/ProxyAuthorizationTest.java| 14 +++
 4 files changed, 33 insertions(+), 46 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index a9225f5e48f..0c61219b57a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -28,7 +28,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.authentication.AuthenticationParameters;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -183,7 +182,13 @@ public class AuthorizationService {
 if (!this.conf.isAuthorizationEnabled()) {
 return CompletableFuture.completedFuture(true);
 }
-return provider.allowTopicOperationAsync(topicName, role, 
TopicOperation.PRODUCE, authenticationData);
+return provider.isSuperUser(role, authenticationData, 
conf).thenComposeAsync(isSuperUser -> {
+if (isSuperUser) {
+return CompletableFuture.completedFuture(true);
+} else {
+return provider.canProduceAsync(topicName, role, 
authenticationData);
+}
+});
 }
 
 /**
@@ -202,9 +207,13 @@ public class AuthorizationService {
 if (!this.conf.isAuthorizationEnabled()) {
 return CompletableFuture.completedFuture(true);
 }
-
-return provider.allowTopicOperationAsync(topicName, role, 
TopicOperation.CONSUME,
-new AuthenticationDataSubscription(authenticationData, 
subscription));
+return provider.isSuperUser(role, authenticationData, 
conf).thenComposeAsync(isSuperUser -> {
+if (isSuperUser) {
+return CompletableFuture.completedFuture(true);
+} else {
+return provider.canConsumeAsync(topicName, role, 
authenticationData, subscription);
+}
+});
 }
 
 public boolean canProduce(TopicName topicName, String role, 
AuthenticationDataSource authenticationData)
@@ -280,7 +289,13 @@ public class AuthorizationService {
 if (!this.conf.isAuthorizationEnabled()) {
 return CompletableFuture.completedFuture(true);
 }
-return provider.allowTopicOperationAsync(topicName, role, 
TopicOperation.LOOKUP, authenticationData);
+return provider.isSuperUser(role, authenticationData, 
conf).thenComposeAsync(isSuperUser -> {
+if (isSuperUser) {
+return CompletableFuture.completedFuture(true);
+} else {
+return provider.canLookupAsync(topicName, role, 
authenticationData);
+}
+});
 }
 
 public 

[pulsar] 02/05: [improve][broker] Optimize delayed metadata index bitmap (#20136)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 17b59a23244d6b00031cddd5ee33ba8aac2df6f4
Author: Qiang Zhao 
AuthorDate: Thu Apr 20 08:21:08 2023 +0800

[improve][broker] Optimize delayed metadata index bitmap (#20136)

(cherry picked from commit 6dc0b0ea38ab9dc410a24b19fd7567b9e013837d)
---
 .../bucket/BucketDelayedDeliveryTracker.java   |  3 ++
 .../broker/delayed/bucket/ImmutableBucket.java | 53 +-
 .../broker/delayed/bucket/MutableBucket.java   | 11 +++--
 3 files changed, 42 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index b4d1745e22f..b17387e276e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -485,6 +485,9 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 });
 });
 }
+
+// optimize bm
+
delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
 
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
 
 
afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 82e98cefa5d..57de5c84fcd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -20,11 +20,10 @@ package org.apache.pulsar.broker.delayed.bucket;
 
 import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
 import static 
org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.NULL_LONG_PROMISE;
-import com.google.protobuf.ByteString;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
@@ -37,8 +36,8 @@ import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotF
 import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
 import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.roaringbitmap.InvalidRoaringFormat;
 import org.roaringbitmap.RoaringBitmap;
-import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 @Slf4j
 class ImmutableBucket extends Bucket {
@@ -98,7 +97,7 @@ class ImmutableBucket extends Bucket {
 this.setLastSegmentEntryId(metadataList.size());
 
this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
 List firstScheduleTimestamps = 
metadataList.stream().map(
-
SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
+
SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
 
this.setFirstScheduleTimestamps(firstScheduleTimestamps);
 
 return nextSnapshotEntryIndex + 1;
@@ -139,25 +138,37 @@ class ImmutableBucket extends Bucket {
 });
 }
 
+/**
+ * Recover delayed index bit map and message numbers.
+ * @throws InvalidRoaringFormat invalid bitmap serialization format
+ */
 private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex,
-
List segmentMetadata) {
-this.delayedIndexBitMap.clear();
-MutableLong numberMessages = new MutableLong(0);
-for (int i = startSnapshotIndex; i < segmentMetadata.size(); i++) {
-Map bitByteStringMap = 
segmentMetadata.get(i).getDelayedIndexBitMapMap();
-bitByteStringMap.forEach((leaderId, bitSetString) -> {
-boolean exist = this.delayedIndexBitMap.containsKey(leaderId);
-RoaringBitmap bitSet =
-new 
ImmutableRoaringBitmap(bitSetString.asReadOnlyByteBuffer()).toRoaringBitmap();
-numberMessages.add(bitSet.getCardinality());
-if (!exist) {
-

[pulsar] 01/05: [fix][broker] Fix tenant admin authorization bug. (#20068)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4c5f9d2acf94fab01206258ffbb1068bd9afd97e
Author: Shen Liu 
AuthorDate: Wed Apr 19 23:40:28 2023 +0800

[fix][broker] Fix tenant admin authorization bug. (#20068)

Co-authored-by: druidliu 
(cherry picked from commit fc17c1d98a3c1edd975c131d174a9ef69887d9cd)
---
 .../broker/authorization/AuthorizationService.java | 27 +-
 .../pulsar/broker/auth/AuthorizationTest.java  | 22 ++
 .../api/AuthorizationProducerConsumerTest.java | 16 +
 .../websocket/proxy/ProxyAuthorizationTest.java| 14 ---
 4 files changed, 46 insertions(+), 33 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 0c61219b57a..a9225f5e48f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.authentication.AuthenticationParameters;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -182,13 +183,7 @@ public class AuthorizationService {
 if (!this.conf.isAuthorizationEnabled()) {
 return CompletableFuture.completedFuture(true);
 }
-return provider.isSuperUser(role, authenticationData, 
conf).thenComposeAsync(isSuperUser -> {
-if (isSuperUser) {
-return CompletableFuture.completedFuture(true);
-} else {
-return provider.canProduceAsync(topicName, role, 
authenticationData);
-}
-});
+return provider.allowTopicOperationAsync(topicName, role, 
TopicOperation.PRODUCE, authenticationData);
 }
 
 /**
@@ -207,13 +202,9 @@ public class AuthorizationService {
 if (!this.conf.isAuthorizationEnabled()) {
 return CompletableFuture.completedFuture(true);
 }
-return provider.isSuperUser(role, authenticationData, 
conf).thenComposeAsync(isSuperUser -> {
-if (isSuperUser) {
-return CompletableFuture.completedFuture(true);
-} else {
-return provider.canConsumeAsync(topicName, role, 
authenticationData, subscription);
-}
-});
+
+return provider.allowTopicOperationAsync(topicName, role, 
TopicOperation.CONSUME,
+new AuthenticationDataSubscription(authenticationData, 
subscription));
 }
 
 public boolean canProduce(TopicName topicName, String role, 
AuthenticationDataSource authenticationData)
@@ -289,13 +280,7 @@ public class AuthorizationService {
 if (!this.conf.isAuthorizationEnabled()) {
 return CompletableFuture.completedFuture(true);
 }
-return provider.isSuperUser(role, authenticationData, 
conf).thenComposeAsync(isSuperUser -> {
-if (isSuperUser) {
-return CompletableFuture.completedFuture(true);
-} else {
-return provider.canLookupAsync(topicName, role, 
authenticationData);
-}
-});
+return provider.allowTopicOperationAsync(topicName, role, 
TopicOperation.LOOKUP, authenticationData);
 }
 
 public CompletableFuture allowFunctionOpsAsync(NamespaceName 
namespaceName, String role,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 58cf4ee418e..4fce7c50e1c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -79,10 +79,13 @@ public class AuthorizationTest extends 
MockedPulsarServiceBaseTest {
 public void simple() throws Exception {
 AuthorizationService auth = 
pulsar.getBrokerService().getAuthorizationService();
 
-
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"my-role", null));
-
+try {
+
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"my-role", null));
+fail("Should throw exception when tenant not exist");
+} catch (Exception ignored) {}
 

[pulsar] branch branch-3.0 updated (d08c3cbc704 -> 38360d06ba1)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from d08c3cbc704 [improve][broker] Make timer execute immediately after 
load index (#20126)
 new 4c5f9d2acf9 [fix][broker] Fix tenant admin authorization bug. (#20068)
 new 17b59a23244 [improve][broker] Optimize delayed metadata index bitmap 
(#20136)
 new 3c4a1ce026d [improve][broker] Cache LedgerHandle in 
BookkeeperBucketSnapshotStorage (#20117)
 new 800a82a3a9f [fix][broker] Producer/Consumer should call 
allowTopicOperationAsync (#20142)
 new 38360d06ba1 [revert] "[fix][broker] Fix tenant admin authorization 
bug. (#20068)" (#20143)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bucket/BookkeeperBucketSnapshotStorage.java| 52 ++---
 .../bucket/BucketDelayedDeliveryTracker.java   |  3 ++
 .../broker/delayed/bucket/ImmutableBucket.java | 53 +-
 .../broker/delayed/bucket/MutableBucket.java   | 11 +++--
 .../org/apache/pulsar/broker/service/Consumer.java |  8 +++-
 .../org/apache/pulsar/broker/service/Producer.java |  3 +-
 .../BookkeeperBucketSnapshotStorageTest.java   | 43 ++
 7 files changed, 119 insertions(+), 54 deletions(-)



[GitHub] [pulsar] poorbarcode commented on pull request #20143: [revert] "[fix][broker] Fix tenant admin authorization bug. (#20068)"

2023-05-07 Thread via GitHub


poorbarcode commented on PR #20143:
URL: https://github.com/apache/pulsar/pull/20143#issuecomment-1537561223

   @michaeljmarshall 
   
   To avoid conflicts, I cherry-picked all three patches into branch 3.1.
   - https://github.com/apache/pulsar/pull/20068
   - https://github.com/apache/pulsar/pull/20142
   - https://github.com/apache/pulsar/pull/20143


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] poorbarcode commented on pull request #20142: [fix][broker] Producer/Consumer should call allowTopicOperationAsync

2023-05-07 Thread via GitHub


poorbarcode commented on PR #20142:
URL: https://github.com/apache/pulsar/pull/20142#issuecomment-1537561187

   @michaeljmarshall 
   
   To avoid conflicts, I cherry-picked all three patches into branch 3.1.
   - https://github.com/apache/pulsar/pull/20068
   - https://github.com/apache/pulsar/pull/20142
   - https://github.com/apache/pulsar/pull/20143


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] poorbarcode commented on pull request #20068: [fix][broker] Fix tenant admin authorization bug.

2023-05-07 Thread via GitHub


poorbarcode commented on PR #20068:
URL: https://github.com/apache/pulsar/pull/20068#issuecomment-1537561118

   @michaeljmarshall 
   
   To avoid conflicts, I cherry-picked all three patches into branch 3.1.
   - https://github.com/apache/pulsar/pull/20068
   - https://github.com/apache/pulsar/pull/20142
   - https://github.com/apache/pulsar/pull/20143


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[pulsar] branch branch-3.0 updated: [improve][broker] Make timer execute immediately after load index (#20126)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new d08c3cbc704 [improve][broker] Make timer execute immediately after 
load index (#20126)
d08c3cbc704 is described below

commit d08c3cbc70468011430bd7482431d34014c90ed5
Author: Cong Zhao 
AuthorDate: Wed Apr 19 16:44:12 2023 +0800

[improve][broker] Make timer execute immediately after load index (#20126)

(cherry picked from commit 9b723022436cc1a150af765103e8d343679f92ce)
---
 .../broker/delayed/bucket/BucketDelayedDeliveryTracker.java   | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index f57248acbb7..b4d1745e22f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -541,7 +541,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
 @Override
 public synchronized NavigableSet getScheduledMessages(int 
maxMessages) {
-if (!checkPendingOpDone()) {
+if (!checkPendingLoadDone()) {
 if (log.isDebugEnabled()) {
 log.debug("[{}] Skip getScheduledMessages to wait for bucket 
snapshot load finish.",
 dispatcher.getName());
@@ -628,11 +628,11 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 if (timeout != null) {
 timeout.cancel();
 }
-timeout = timer.newTimeout(this, tickTimeMillis, 
TimeUnit.MILLISECONDS);
+timeout = timer.newTimeout(this, 0, 
TimeUnit.MILLISECONDS);
 }
 });
 
-if (!checkPendingOpDone() || 
loadFuture.isCompletedExceptionally()) {
+if (!checkPendingLoadDone() || 
loadFuture.isCompletedExceptionally()) {
 break;
 }
 }
@@ -651,7 +651,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 return positions;
 }
 
-private synchronized boolean checkPendingOpDone() {
+private synchronized boolean checkPendingLoadDone() {
 if (pendingLoad == null || pendingLoad.isDone()) {
 pendingLoad = null;
 return true;



[pulsar] branch branch-3.0 updated: [fix][sec] spring.version=5.3.27 to fix CVE-2023-20863 (#20124)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 88e05b5b9ee [fix][sec] spring.version=5.3.27 to fix CVE-2023-20863 
(#20124)
88e05b5b9ee is described below

commit 88e05b5b9ee65d245276dfdca87e9200206fede6
Author: tison 
AuthorDate: Wed Apr 19 14:45:29 2023 +0800

[fix][sec] spring.version=5.3.27 to fix CVE-2023-20863 (#20124)

Signed-off-by: tison 
(cherry picked from commit 866d405726bb6527db954201f2260620181747dc)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index e7e00db74e4..252cedbf659 100644
--- a/pom.xml
+++ b/pom.xml
@@ -238,7 +238,7 @@ flexible messaging model and an intuitive client 
API.
 1.6.0
 1.0
 9.1.6
-5.3.26
+5.3.27
 4.5.13
 4.4.15
 0.5.11



[pulsar] branch branch-3.0 updated: [fix][io] KCA: handle kafka sources that use commitRecord (#20121)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new dc339df94ab [fix][io] KCA: handle kafka sources that use commitRecord 
(#20121)
dc339df94ab is described below

commit dc339df94abf89e8f8513c37adeb261931489813
Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com>
AuthorDate: Tue Apr 18 14:37:41 2023 -0700

[fix][io] KCA: handle kafka sources that use commitRecord (#20121)

(cherry picked from commit 46a65fdc182a34753ebabfa35f63c0f6c765462f)
---
 .../io/kafka/connect/KafkaConnectSource.java   |  43 +++-
 .../kafka/connect/ErrRecFileStreamSourceTask.java  |  33 ++
 .../connect/KafkaConnectSourceErrRecTest.java  | 118 +
 3 files changed, 193 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index f5f6efd08bd..f2ee8a8e6ca 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -27,6 +27,8 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.pulsar.client.api.Schema;
@@ -57,6 +59,7 @@ public class KafkaConnectSource extends 
AbstractKafkaConnectSource>
+public class KafkaSourceRecord extends 
AbstractKafkaSourceRecord>
 implements KVRecord {
 
+final int keySize;
+final int valueSize;
+
+final SourceRecord srcRecord;
+
 KafkaSourceRecord(SourceRecord srcRecord) {
 super(srcRecord);
+this.srcRecord = srcRecord;
+
 byte[] keyBytes = keyConverter.fromConnectData(
 srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
+keySize = keyBytes != null ? keyBytes.length : 0;
 this.key = keyBytes != null ? 
Optional.of(Base64.getEncoder().encodeToString(keyBytes)) : Optional.empty();
 
 byte[] valueBytes = valueConverter.fromConnectData(
 srcRecord.topic(), srcRecord.valueSchema(), 
srcRecord.value());
+valueSize = valueBytes != null ? valueBytes.length : 0;
 
 this.value = new KeyValue<>(keyBytes, valueBytes);
 
@@ -145,6 +157,35 @@ public class KafkaConnectSource extends 
AbstractKafkaConnectSourcehttp://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.file.FileStreamSourceTask;
+import org.apache.kafka.connect.source.SourceRecord;
+
+public class ErrRecFileStreamSourceTask extends FileStreamSourceTask {
+
+@Override
+public void commitRecord(SourceRecord record, RecordMetadata metadata) 
throws InterruptedException {
+throw new org.apache.kafka.connect.errors.ConnectException("blah");
+}
+
+}
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrRecTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrRecTest.java
new file mode 100644
index 000..9872e1fbc7e
--- /dev/null
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrRecTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 

[pulsar] branch branch-3.0 updated: [improve] [broker] Close temporary open ledger in BookkeeperBucketSnapshotStorage (#20111)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new d7f97dd6a7b [improve] [broker] Close temporary open ledger in 
BookkeeperBucketSnapshotStorage (#20111)
d7f97dd6a7b is described below

commit d7f97dd6a7b6cf8f063c3c9a17bc6e58eaca1caa
Author: lifepuzzlefun 
AuthorDate: Mon Apr 17 19:32:13 2023 +0800

[improve] [broker] Close temporary open ledger in 
BookkeeperBucketSnapshotStorage (#20111)

(cherry picked from commit b50e8802a5224dd68832e263e7046650771a1a4e)
---
 .../bucket/BookkeeperBucketSnapshotStorage.java| 33 +-
 1 file changed, 26 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index e7d4f9301dd..9c30ccf1c0b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -66,22 +66,41 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 
 @Override
 public CompletableFuture getBucketSnapshotMetadata(long 
bucketId) {
-return openLedger(bucketId).thenCompose(
-ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
-thenApply(entryEnumeration -> 
parseSnapshotMetadataEntry(entryEnumeration.nextElement(;
+return openLedger(bucketId).thenCompose(ledgerHandle -> {
+CompletableFuture snapshotFuture =
+getLedgerEntry(ledgerHandle, 0, 0)
+.thenApply(entryEnumeration -> 
parseSnapshotMetadataEntry(entryEnumeration.nextElement()));
+
+snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+return snapshotFuture;
+});
 }
 
 @Override
 public CompletableFuture> 
getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
  
long lastSegmentEntryId) {
-return openLedger(bucketId).thenCompose(
-ledgerHandle -> getLedgerEntry(ledgerHandle, 
firstSegmentEntryId,
-
lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+return openLedger(bucketId).thenCompose(ledgerHandle -> {
+CompletableFuture> parseFuture =
+getLedgerEntry(ledgerHandle, firstSegmentEntryId, 
lastSegmentEntryId)
+.thenApply(this::parseSnapshotSegmentEntries);
+
+parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+return parseFuture;
+});
 }
 
 @Override
 public CompletableFuture getBucketSnapshotLength(long bucketId) {
-return openLedger(bucketId).thenApply(LedgerHandle::getLength);
+return openLedger(bucketId).thenCompose(ledgerHandle -> {
+CompletableFuture lengthFuture =
+
CompletableFuture.completedFuture(ledgerHandle.getLength());
+
+lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+return lengthFuture;
+});
 }
 
 @Override



[pulsar] branch branch-3.0 updated: [improve][ci]Add branch-3.0 to owasp checks (#20081)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new add2594c0fa [improve][ci]Add branch-3.0 to owasp checks (#20081)
add2594c0fa is described below

commit add2594c0fa9c472232338ea94d9467fd9bb6df3
Author: Christophe Bornet 
AuthorDate: Thu Apr 13 08:51:52 2023 +0200

[improve][ci]Add branch-3.0 to owasp checks (#20081)

(cherry picked from commit 4d8156fca0582a350d30557d9243c851c4a45830)
---
 .github/workflows/ci-owasp-dependency-check.yaml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.github/workflows/ci-owasp-dependency-check.yaml 
b/.github/workflows/ci-owasp-dependency-check.yaml
index dba1c92ca28..8aec501f8d8 100644
--- a/.github/workflows/ci-owasp-dependency-check.yaml
+++ b/.github/workflows/ci-owasp-dependency-check.yaml
@@ -39,6 +39,7 @@ jobs:
   matrix:
 include:
   - branch: master
+  - branch: branch-3.0
   - branch: branch-2.11
   - branch: branch-2.10
 jdk: 11



[pulsar] 01/02: [fix][doc] Add link to PIP proposal (#19911)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b47af87a4ff4438549b7e9f3242ef5e3f7e33e11
Author: Asaf Mesika 
AuthorDate: Sun Apr 16 17:05:46 2023 +0300

[fix][doc] Add link to PIP proposal (#19911)

### Motivation

Add a link to see existing PIP issues so you can select a number

(cherry picked from commit 47b24b071353f9285704e29e397b851e31df4037)
---
 wiki/proposals/PIP.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/wiki/proposals/PIP.md b/wiki/proposals/PIP.md
index e10452b107f..159c9199895 100644
--- a/wiki/proposals/PIP.md
+++ b/wiki/proposals/PIP.md
@@ -80,7 +80,7 @@ The process works in the following way:
 1. The author(s) of the proposal will create a GitHub issue ticket choosing the
template for PIP proposals. The issue title should be "PIP-xxx: title", 
where
the "xxx" number should be chosen to be the next number from the existing 
PIP 
-   issues, listed [here]([url](https://github.com/apache/pulsar/labels/PIP)).
+   issues, listed 
[here](https://github.com/apache/pulsar/issues?q=is%3Aissue+label%3APIP+)
 2. The author(s) will send a note to the d...@pulsar.apache.org mailing list
to start the discussion, using subject prefix `[DISCUSS] PIP-xxx: {PIP 
TITLE}`. The discussion
need to happen in the mailing list. Please avoid discussing it using



[pulsar] branch branch-3.0 updated (63646567917 -> 9f98caa6032)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 63646567917 [fix][test]Fix flaky test produceCommitTest (#20006)
 new b47af87a4ff [fix][doc] Add link to PIP proposal (#19911)
 new 9f98caa6032 [fix][doc] ConnectorDocGenerator support Java 9+ (#20100)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pulsar/io/docs/ConnectorDocGenerator.java  | 69 --
 .../io/elasticsearch/ElasticSearchConfig.java  |  5 ++
 wiki/proposals/PIP.md  |  2 +-
 3 files changed, 32 insertions(+), 44 deletions(-)



[pulsar] 02/02: [fix][doc] ConnectorDocGenerator support Java 9+ (#20100)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9f98caa6032f9e8ae0fbd5aecb8a5b07713853b8
Author: tison 
AuthorDate: Mon Apr 17 10:59:32 2023 +0800

[fix][doc] ConnectorDocGenerator support Java 9+ (#20100)

Signed-off-by: tison 
(cherry picked from commit b7eab9469177eda2c56e36bb9871aab48a17d4ec)
---
 .../pulsar/io/docs/ConnectorDocGenerator.java  | 69 --
 .../io/elasticsearch/ElasticSearchConfig.java  |  5 ++
 2 files changed, 31 insertions(+), 43 deletions(-)

diff --git 
a/pulsar-io/docs/src/main/java/org/apache/pulsar/io/docs/ConnectorDocGenerator.java
 
b/pulsar-io/docs/src/main/java/org/apache/pulsar/io/docs/ConnectorDocGenerator.java
index fe12b2b11ce..fec7b120879 100644
--- 
a/pulsar-io/docs/src/main/java/org/apache/pulsar/io/docs/ConnectorDocGenerator.java
+++ 
b/pulsar-io/docs/src/main/java/org/apache/pulsar/io/docs/ConnectorDocGenerator.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.docs;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.google.common.base.Strings;
+import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -28,11 +29,9 @@ import java.io.PrintWriter;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.net.URL;
-import java.net.URLClassLoader;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
+import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import lombok.extern.slf4j.Slf4j;
@@ -47,21 +46,12 @@ public class ConnectorDocGenerator {
 private static final String INDENT = "  ";
 
 private static Reflections newReflections() throws Exception {
-List urls = new ArrayList<>();
-ClassLoader[] classLoaders = new ClassLoader[] {
-ConnectorDocGenerator.class.getClassLoader(),
-Thread.currentThread().getContextClassLoader()
-};
-for (int i = 0; i < classLoaders.length; i++) {
-if (classLoaders[i] instanceof URLClassLoader) {
-urls.addAll(Arrays.asList(((URLClassLoader) 
classLoaders[i]).getURLs()));
-} else {
-throw new RuntimeException("ClassLoader '" + classLoaders[i] + 
" is not an instance of URLClassLoader");
-}
+final String[] classpathList = 
System.getProperty("java.class.path").split(":");
+final List urlList = new ArrayList<>();
+for (String file : classpathList) {
+urlList.add(new File(file).toURI().toURL());
 }
-ConfigurationBuilder confBuilder = new ConfigurationBuilder();
-confBuilder.setUrls(urls);
-return new Reflections(confBuilder);
+return new Reflections(new ConfigurationBuilder().setUrls(urlList));
 }
 
 private final Reflections reflections;
@@ -70,7 +60,7 @@ public class ConnectorDocGenerator {
 this.reflections = newReflections();
 }
 
-private void generateConnectorYaml(Class configClass, PrintWriter writer) {
+private void generateConnectorYamlFile(Class configClass, PrintWriter 
writer) {
 log.info("Processing connector config class : {}", configClass);
 
 writer.println("configs:");
@@ -82,7 +72,9 @@ public class ConnectorDocGenerator {
 }
 FieldDoc fieldDoc = field.getDeclaredAnnotation(FieldDoc.class);
 if (null == fieldDoc) {
-throw new RuntimeException("Missing `FieldDoc` for field '" + 
field.getName() + "'");
+final String message = "Missing FieldDoc for field '%s' in 
class '%s'."
+.formatted(field.getName(), 
configClass.getCanonicalName());
+throw new RuntimeException(message);
 }
 writer.println(INDENT + "# " + fieldDoc.help());
 String fieldPrefix = "";
@@ -99,28 +91,28 @@ public class ConnectorDocGenerator {
 writer.flush();
 }
 
-private void generateConnectorYaml(Class connectorClass, Connector 
connectorDef, PrintWriter writer) {
+private void generateConnectorYamlFile(Class connectorClass, Connector 
connectorDef, PrintWriter writer) {
 log.info("Processing connector definition : {}", connectorDef);
 writer.println("# " + connectorDef.type() + " connector : " + 
connectorClass.getName());
 writer.println();
 writer.println("# " + connectorDef.help());
 writer.println();
-generateConnectorYaml(connectorDef.configClass(), writer);
+generateConnectorYamlFile(connectorDef.configClass(), writer);
 }
 
-private void generatorConnectorYamls(String outputDir) throws IOException  
{
+private void generatorConnectorYamlFiles(String outputDir) throws 
IOException {

[pulsar] branch branch-3.0 updated: [fix][test]Fix flaky test produceCommitTest (#20006)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 63646567917 [fix][test]Fix flaky test produceCommitTest (#20006)
63646567917 is described below

commit 636465679175b07e260f2c668c8d4e4f4a683487
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Thu Apr 13 23:09:10 2023 +0800

[fix][test]Fix flaky test produceCommitTest (#20006)

Fixes https://github.com/apache/pulsar/issues/18466
### Motivation
There are two main goals in solving this issue:

1. Fix the unstable tests in `produceCommitTest`.
2. Prevent transaction timeouts created by other tests from affecting the 
`testTxnTimeoutAtTransactionMetadataStore` test during its execution.

### Modification
1. Change the message-sending method to synchronous. (fix 
`produceCommitTest`)
2. Increase the transaction timeout to 10 minutes (fix 
`testTxnTimeoutAtTransactionMetadataStore`).

(cherry picked from commit 653271e8d8aeb361ec260780ba88ccf3a880f403)
---
 .../java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 83feaa3ac11..34cc3bc1ca5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -280,9 +280,9 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 int messageCnt = 1000;
 for (int i = 0; i < messageCnt; i++) {
 if (i % 5 == 0) {
-producer.newMessage(txn1).value(("Hello Txn - " + 
i).getBytes(UTF_8)).send();
+producer.newMessage(txn1).value(("Hello Txn - " + 
i).getBytes(UTF_8)).sendAsync();
 } else {
-producer.newMessage(txn2).value(("Hello Txn - " + 
i).getBytes(UTF_8)).send();
+producer.newMessage(txn2).value(("Hello Txn - " + 
i).getBytes(UTF_8)).sendAsync();
 }
 txnMessageCnt++;
 }
@@ -811,7 +811,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 public Transaction getTxn() throws Exception {
 return pulsarClient
 .newTransaction()
-.withTransactionTimeout(10, TimeUnit.SECONDS)
+.withTransactionTimeout(10, TimeUnit.MINUTES)
 .build()
 .get();
 }



[pulsar] branch branch-3.0 updated: [fix][fn] check user metric len before iterating (#20021)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 9a271ae4454 [fix][fn] check user metric len before iterating (#20021)
9a271ae4454 is described below

commit 9a271ae445471d8ba516abe53747eca0ae617b69
Author: Andy Walker 
AuthorDate: Wed Apr 12 20:28:42 2023 -0400

[fix][fn] check user metric len before iterating (#20021)

Co-authored-by: Andy Walker 
(cherry picked from commit 52e8144587548a692e550a8538f4d2667b5499d6)
---
 pulsar-function-go/pf/instance.go  |  3 ++
 .../pf/instanceControlServicer_test.go | 53 ++
 pulsar-function-go/pf/stats_test.go| 27 +++
 3 files changed, 83 insertions(+)

diff --git a/pulsar-function-go/pf/instance.go 
b/pulsar-function-go/pf/instance.go
index 5d17cfe0c33..a82273031ec 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -669,6 +669,9 @@ func (gi *goInstance) getTotalReceived1min() float32 {
 func (gi *goInstance) getUserMetricsMap() map[string]float64 {
userMetricMap := map[string]float64{}
filteredMetricFamilies := 
gi.getFilteredMetricFamilies(PulsarFunctionMetricsPrefix + UserMetric)
+   if len(filteredMetricFamilies) == 0 {
+   return userMetricMap
+   }
for _, m := range filteredMetricFamilies[0].GetMetric() {
var isFuncMetric bool
var userLabelName string
diff --git a/pulsar-function-go/pf/instanceControlServicer_test.go 
b/pulsar-function-go/pf/instanceControlServicer_test.go
index 836ec6e5c79..9344d0a5915 100644
--- a/pulsar-function-go/pf/instanceControlServicer_test.go
+++ b/pulsar-function-go/pf/instanceControlServicer_test.go
@@ -21,6 +21,7 @@ package pf
 
 import (
"context"
+   "fmt"
"log"
"net"
"testing"
@@ -76,3 +77,55 @@ func 
TestInstanceControlServicer_serve_creates_valid_instance(t *testing.T) {
log.Printf("Response: %+v", resp.Success)
assert.Equal(t, resp.Success, true)
 }
+
+func instanceCommunicationClient(t *testing.T, instance *goInstance) 
pb.InstanceControlClient {
+   t.Helper()
+
+   if instance == nil {
+   t.Fatalf("cannot create communication client for nil instance")
+   }
+
+   var (
+   ctx context.Context = context.Background()
+   cf  context.CancelFunc
+   )
+
+   if testDeadline, ok := t.Deadline(); ok {
+   ctx, cf = context.WithDeadline(context.Background(), 
testDeadline)
+   t.Cleanup(cf)
+   }
+
+   lis = bufconn.Listen(bufSize)
+   t.Cleanup(func() {
+   lis.Close()
+   })
+   // create a gRPC server object
+   grpcServer := grpc.NewServer()
+   t.Cleanup(func() {
+   grpcServer.Stop()
+   })
+
+   servicer := InstanceControlServicer{instance}
+   // must register before we start the service.
+   pb.RegisterInstanceControlServer(grpcServer, )
+
+   // start the server
+   t.Logf("Serving InstanceCommunication on port %d", 
instance.context.GetPort())
+
+   go func() {
+   if err := grpcServer.Serve(lis); err != nil {
+   panic(fmt.Sprintf("grpc server exited with error: %v", 
err))
+   }
+   }()
+
+   // Now we can setup the client:
+   conn, err := grpc.DialContext(ctx, "bufnet", 
grpc.WithContextDialer(getBufDialer(lis)), grpc.WithInsecure())
+   if err != nil {
+   t.Fatalf("Failed to dial bufnet: %v", err)
+   }
+   t.Cleanup(func() {
+   conn.Close()
+   })
+   client := pb.NewInstanceControlClient(conn)
+   return client
+}
diff --git a/pulsar-function-go/pf/stats_test.go 
b/pulsar-function-go/pf/stats_test.go
index d52b08b7173..7b415ef5eff 100644
--- a/pulsar-function-go/pf/stats_test.go
+++ b/pulsar-function-go/pf/stats_test.go
@@ -20,6 +20,7 @@
 package pf
 
 import (
+   "context"
"fmt"
"io/ioutil"
"math"
@@ -28,6 +29,7 @@ import (
"time"
 
"github.com/golang/protobuf/proto"
+   "github.com/golang/protobuf/ptypes/empty"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
 
@@ -257,3 +259,28 @@ func TestUserMetrics(t *testing.T) {
gi.close()
metricsServicer.close()
 }
+
+func TestInstanceControlMetrics(t *testing.T) {
+   instance := newGoInstance()
+   t.Cleanup(instance.close)
+   instanceClient := instanceCommunicationClient(t, instance)
+   _, err := instanceClient.GetMetrics(context.Background(), 
{})
+   assert.NoError(t, err, "err communicating with instance control: %v", 
err)
+
+   testLabels := []string{"userMetricControlTest1", 

[pulsar] branch branch-3.0 updated: [fix] [broker] error TimeUnit to record publish latency (#20074)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new c7cffba6b62 [fix] [broker] error TimeUnit to record publish latency 
(#20074)
c7cffba6b62 is described below

commit c7cffba6b623183f82ec48c6c83a098dcd1a1fa8
Author: ken <1647023...@qq.com>
AuthorDate: Thu Apr 13 01:22:02 2023 +0800

[fix] [broker] error TimeUnit to record publish latency (#20074)

Co-authored-by: fanjianye 
Co-authored-by: tison 
(cherry picked from commit 547d792439e7b8bfefb0236b078ed145731da658)
---
 .../java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java
index d7cee8b600b..3c9adbd3e4f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java
@@ -53,7 +53,7 @@ public class RestMessagePublishContext implements 
Topic.PublishContext {
 + "triggered send callback.",
 topic.getName(), ledgerId, entryId);
 }
-topic.recordAddLatency(System.nanoTime() - startTimeNs, 
TimeUnit.MICROSECONDS);
+topic.recordAddLatency(System.nanoTime() - startTimeNs, 
TimeUnit.NANOSECONDS);
 positionFuture.complete(PositionImpl.get(ledgerId, entryId));
 }
 recycle();



[GitHub] [pulsar-site] visortelle opened a new pull request, #560: PIP-249 site redesign

2023-05-07 Thread via GitHub


visortelle opened a new pull request, #560:
URL: https://github.com/apache/pulsar-site/pull/560

   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   Live deployment: https://pulsar-site-pip-249.vercel.app/
   
   Visible changes:
   - [ ] The home page.
   - [ ] Docs page.
   - [ ] Blog page.
   - [ ] Ecosystem page.
   - [ ] Case studies page.
   - [ ] Resources page.
   - [ ] Events page.
   
   What’s left:
   - [ ] Ensure that homepage animation performance is ok, otherwise, replace 
it with something else. 
   - [ ] Also, we need to decide what to do with the animation in Firefox. See 
attached screenshot. Maybe use a static illustration variant for Firefox?
   - [ ] Announcement bar. The styles are ready. Need to integrate it with 
docusaurus.config.js (simple). ~0.5h.
   - [ ] Powered by page. ~0.5h.
   - [ ] Community page. ~3h.
   I’ll resolve these points while @tisonkun reviews the changes.
   
   Needs recheck:
   - [ ] Contribute page and the recent Client Feature Matrix page.
   - [ ] Maybe some hidden pages I don’t know about. I checked all listed under 
src/pages/*.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[pulsar] branch branch-2.11 updated: [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
 new a9ec89379e7 [fix] [broker] Fix infinite ack of Replicator after topic 
is closed (#20232)
a9ec89379e7 is described below

commit a9ec89379e74246c6b6d5150bea28341a8bc16b6
Author: fengyubiao 
AuthorDate: Sun May 7 14:21:51 2023 +0800

[fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)

(cherry picked from commit 98413642995eb6e562f6a591dcf56e20ac0cc7ef)
---
 .../service/persistent/PersistentReplicator.java   |  8 
 .../pulsar/broker/service/ReplicatorTest.java  | 46 +++---
 2 files changed, 39 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 76a032c0217..0471f12f3c9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -669,6 +669,14 @@ public class PersistentReplicator extends 
AbstractReplicator
 public void deleteFailed(ManagedLedgerException exception, Object ctx) {
 log.error("[{}][{} -> {}] Failed to delete message at {}: {}", 
topicName, localCluster, remoteCluster, ctx,
 exception.getMessage(), exception);
+if (exception instanceof CursorAlreadyClosedException) {
+log.error("[{}][{} -> {}] Asynchronous ack failure because 
replicator is already deleted and cursor is"
++ " already closed {}, ({})", topic, localCluster, 
remoteCluster, ctx,
+exception.getMessage(), exception);
+// replicator is already deleted and cursor is already closed so, 
producer should also be stopped
+closeProducerAsync();
+return;
+}
 if (ctx instanceof PositionImpl) {
 PositionImpl deletedEntry = (PositionImpl) ctx;
 if (deletedEntry.compareTo((PositionImpl) 
cursor.getMarkDeletedPosition()) > 0) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 23491972f4a..f80863cda5b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
@@ -50,9 +51,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
-
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -60,7 +62,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.State;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -1620,20 +1621,41 @@ public class ReplicatorTest extends ReplicatorTestBase {
 log.info("--- Starting ReplicatorTest::testReplication ---");
 
 String namespace = "pulsar/global/ns2";
-admin1.namespaces().createNamespace(namespace);
-admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
 final TopicName dest = TopicName
 .get(BrokerTestUtil.newUniqueName("persistent://" + namespace 
+ "/ackFailedTopic"));
 
 @Cleanup
 MessageProducer producer1 = new MessageProducer(url1, dest);
-log.info("--- Starting producer --- " 

[pulsar] branch branch-2.11 updated: [fix][monitor] Fix the partitioned publisher topic stat aggregation bug (#18807)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
 new a764bbd2a7b [fix][monitor] Fix the partitioned publisher topic stat 
aggregation bug (#18807)
a764bbd2a7b is described below

commit a764bbd2a7b5c7053ba0f50ed1aa3197cba6e7de
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Wed Jan 4 06:04:06 2023 -0800

[fix][monitor] Fix the partitioned publisher topic stat aggregation bug 
(#18807)

(cherry picked from commit 8790ed18fc037988b044fed18202a1f3b50f7c65)
---
 .../NonPersistentPartitionedTopicStatsImpl.java|  1 +
 .../data/stats/NonPersistentTopicStatsImpl.java| 31 +++---
 .../data/stats/PartitionedTopicStatsImpl.java  |  1 +
 .../common/policies/data/stats/TopicStatsImpl.java | 29 +++-
 .../NonPersistentPartitionedTopicStatsTest.java| 16 +--
 .../policies/data/PersistentTopicStatsTest.java| 30 +
 6 files changed, 73 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
index 2a9fe423284..2ba9682383f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
@@ -27,6 +27,7 @@ import 
org.apache.pulsar.common.policies.data.NonPersistentPartitionedTopicStats
 
 /**
  * Statistics for a non-persistent partitioned topic.
+ * This class is not thread-safe.
  */
 @SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
 public class NonPersistentPartitionedTopicStatsImpl extends 
NonPersistentTopicStatsImpl
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
index 23e603ea028..c798c00acec 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.common.policies.data.PublisherStats;
 
 /**
  * Statistics for a non-persistent topic.
+ * This class is not thread-safe.
  */
 @SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
 public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements 
NonPersistentTopicStats {
@@ -148,14 +149,14 @@ public class NonPersistentTopicStatsImpl extends 
TopicStatsImpl implements NonPe
 }
 
 // if the stats are added for the 1st time, we will need to make a copy of 
these stats and add it to the current
-// stats.
+// stats. This stat addition is not thread-safe.
 public NonPersistentTopicStatsImpl add(NonPersistentTopicStats ts) {
 NonPersistentTopicStatsImpl stats = (NonPersistentTopicStatsImpl) ts;
 Objects.requireNonNull(stats);
 super.add(stats);
 this.msgDropRate += stats.msgDropRate;
-
-stats.getNonPersistentPublishers().forEach(s -> {
+for (int index = 0; index < stats.getNonPersistentPublishers().size(); 
index++) {
+NonPersistentPublisherStats s = 
stats.getNonPersistentPublishers().get(index);
 if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
 ((NonPersistentPublisherStatsImpl) 
this.nonPersistentPublishersMap
 .computeIfAbsent(s.getProducerName(), key -> {
@@ -165,20 +166,20 @@ public class NonPersistentTopicStatsImpl extends 
TopicStatsImpl implements NonPe
 return newStats;
 })).add((NonPersistentPublisherStatsImpl) s);
 } else {
-if (this.nonPersistentPublishers.size() != 
stats.getNonPersistentPublishers().size()) {
-for (int i = 0; i < 
stats.getNonPersistentPublishers().size(); i++) {
-NonPersistentPublisherStatsImpl newStats = new 
NonPersistentPublisherStatsImpl();
-newStats.setSupportsPartialProducer(false);
-
this.nonPersistentPublishers.add(newStats.add((NonPersistentPublisherStatsImpl) 
s));
-}
-} else {
-for (int i = 0; i < 
stats.getNonPersistentPublishers().size(); i++) {
-((NonPersistentPublisherStatsImpl) 
this.nonPersistentPublishers.get(i))
-.add((NonPersistentPublisherStatsImpl) s);
-   

[pulsar] branch branch-2.10 updated: [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
 new c934df79234 [fix] [broker] Fix infinite ack of Replicator after topic 
is closed (#20232)
c934df79234 is described below

commit c934df792343fa169178f85331a0d15cae2c149e
Author: fengyubiao 
AuthorDate: Sun May 7 14:21:51 2023 +0800

[fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232)

(cherry picked from commit 98413642995eb6e562f6a591dcf56e20ac0cc7ef)
---
 .../service/persistent/PersistentReplicator.java   |  8 
 .../pulsar/broker/service/ReplicatorTest.java  | 46 +++---
 2 files changed, 39 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 76a032c0217..ceddfc47749 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -669,6 +669,14 @@ public class PersistentReplicator extends 
AbstractReplicator
 public void deleteFailed(ManagedLedgerException exception, Object ctx) {
 log.error("[{}][{} -> {}] Failed to delete message at {}: {}", 
topicName, localCluster, remoteCluster, ctx,
 exception.getMessage(), exception);
+if (exception instanceof CursorAlreadyClosedException) {
+log.error("[{}][{} -> {}] Asynchronous ack failure because 
replicator is already deleted and cursor is"
++ " already closed {}, ({})", topic, localCluster, 
remoteCluster, ctx, exception.getMessage(),
+exception);
+// replicator is already deleted and cursor is already closed so, 
producer should also be stopped
+closeProducerAsync();
+return;
+}
 if (ctx instanceof PositionImpl) {
 PositionImpl deletedEntry = (PositionImpl) ctx;
 if (deletedEntry.compareTo((PositionImpl) 
cursor.getMarkDeletedPosition()) > 0) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 1e76d216986..158d223336b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
@@ -50,9 +51,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
-
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -60,7 +62,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.State;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -1614,20 +1615,41 @@ public class ReplicatorTest extends ReplicatorTestBase {
 log.info("--- Starting ReplicatorTest::testReplication ---");
 
 String namespace = "pulsar/global/ns2";
-admin1.namespaces().createNamespace(namespace);
-admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
 final TopicName dest = TopicName
 .get(BrokerTestUtil.newUniqueName("persistent://" + namespace 
+ "/ackFailedTopic"));
 
 @Cleanup
 MessageProducer producer1 = new MessageProducer(url1, dest);
-log.info("--- Starting producer --- " + 

[pulsar] branch branch-2.10 updated: [fix][monitor] Fix the partitioned publisher topic stat aggregation bug (#18807)

2023-05-07 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
 new b5b2de6596a [fix][monitor] Fix the partitioned publisher topic stat 
aggregation bug (#18807)
b5b2de6596a is described below

commit b5b2de6596af45c371c3014aef4c52903f40c310
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Wed Jan 4 06:04:06 2023 -0800

[fix][monitor] Fix the partitioned publisher topic stat aggregation bug 
(#18807)

(cherry picked from commit 8790ed18fc037988b044fed18202a1f3b50f7c65)
---
 .../NonPersistentPartitionedTopicStatsImpl.java|  1 +
 .../data/stats/NonPersistentTopicStatsImpl.java| 31 +++---
 .../data/stats/PartitionedTopicStatsImpl.java  |  1 +
 .../common/policies/data/stats/TopicStatsImpl.java | 29 +++-
 .../NonPersistentPartitionedTopicStatsTest.java| 16 +--
 .../policies/data/PersistentTopicStatsTest.java| 30 +
 6 files changed, 73 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
index 2a9fe423284..2ba9682383f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
@@ -27,6 +27,7 @@ import 
org.apache.pulsar.common.policies.data.NonPersistentPartitionedTopicStats
 
 /**
  * Statistics for a non-persistent partitioned topic.
+ * This class is not thread-safe.
  */
 @SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
 public class NonPersistentPartitionedTopicStatsImpl extends 
NonPersistentTopicStatsImpl
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
index 23e603ea028..c798c00acec 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.common.policies.data.PublisherStats;
 
 /**
  * Statistics for a non-persistent topic.
+ * This class is not thread-safe.
  */
 @SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
 public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements 
NonPersistentTopicStats {
@@ -148,14 +149,14 @@ public class NonPersistentTopicStatsImpl extends 
TopicStatsImpl implements NonPe
 }
 
 // if the stats are added for the 1st time, we will need to make a copy of 
these stats and add it to the current
-// stats.
+// stats. This stat addition is not thread-safe.
 public NonPersistentTopicStatsImpl add(NonPersistentTopicStats ts) {
 NonPersistentTopicStatsImpl stats = (NonPersistentTopicStatsImpl) ts;
 Objects.requireNonNull(stats);
 super.add(stats);
 this.msgDropRate += stats.msgDropRate;
-
-stats.getNonPersistentPublishers().forEach(s -> {
+for (int index = 0; index < stats.getNonPersistentPublishers().size(); 
index++) {
+NonPersistentPublisherStats s = 
stats.getNonPersistentPublishers().get(index);
 if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
 ((NonPersistentPublisherStatsImpl) 
this.nonPersistentPublishersMap
 .computeIfAbsent(s.getProducerName(), key -> {
@@ -165,20 +166,20 @@ public class NonPersistentTopicStatsImpl extends 
TopicStatsImpl implements NonPe
 return newStats;
 })).add((NonPersistentPublisherStatsImpl) s);
 } else {
-if (this.nonPersistentPublishers.size() != 
stats.getNonPersistentPublishers().size()) {
-for (int i = 0; i < 
stats.getNonPersistentPublishers().size(); i++) {
-NonPersistentPublisherStatsImpl newStats = new 
NonPersistentPublisherStatsImpl();
-newStats.setSupportsPartialProducer(false);
-
this.nonPersistentPublishers.add(newStats.add((NonPersistentPublisherStatsImpl) 
s));
-}
-} else {
-for (int i = 0; i < 
stats.getNonPersistentPublishers().size(); i++) {
-((NonPersistentPublisherStatsImpl) 
this.nonPersistentPublishers.get(i))
-.add((NonPersistentPublisherStatsImpl) s);
-   

[GitHub] [pulsar] poorbarcode commented on pull request #18807: [fix][monitor] Fix the partitioned publisher topic stat aggregation bug

2023-05-07 Thread via GitHub


poorbarcode commented on PR #18807:
URL: https://github.com/apache/pulsar/pull/18807#issuecomment-1537483222

   @heesung-sn 
   
   I have add the label `[release/2.10.5, release/2.11.2]` and cherry-picked 
this patch to there branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun commented on a diff in pull request #20242: [fix][client] Java Client's Seek Logic Not Threadsafe #1

2023-05-07 Thread via GitHub


tisonkun commented on code in PR #20242:
URL: https://github.com/apache/pulsar/pull/20242#discussion_r1186877917


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##
@@ -2153,9 +2153,14 @@ private CompletableFuture seekAsyncInternal(long 
requestId, ByteBuf seek,
 
 MessageIdAdv originSeekMessageId = seekMessageId;
 seekMessageId = (MessageIdAdv) seekId;
-duringSeek.set(true);
+
+if (!duringSeek.compareAndSet(false, true)) {
+log.warn("[{}][{}] Attempting to seek operation that is already in 
progress, cancelling {}", 
+topic, subscription, seekBy);
+seekFuture.cancel(true);

Review Comment:
   Sounds reasonable with your test case. I'll give another look tomorrow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] tisonkun commented on pull request #20242: [fix][client] Java Client's Seek Logic Not Threadsafe #1

2023-05-07 Thread via GitHub


tisonkun commented on PR #20242:
URL: https://github.com/apache/pulsar/pull/20242#issuecomment-1537479452

   > tool
   
   Generally, I just write in text :D
   
   It's about:
   
   1. Define the actors.
   2. Describe at T_N, what actor takes what action
   3. The order of T_N
   4. What actions cause race condition
   
   But any tool you searched is acceptable as long as the diagram is clear 
(with text, reviewers manually dump to diagram).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar-adapters] avinash-fk commented on pull request #49: [Issue#48]: Fixed bug in doCommitOffsets of PulsarKafkaConsumer in version 2.11.0

2023-05-07 Thread via GitHub


avinash-fk commented on PR #49:
URL: https://github.com/apache/pulsar-adapters/pull/49#issuecomment-1537476986

   @eolivelli @dlg99 added a unit test for commitSync, specifically on how 
acknowledgeCumulativeAsync is called inside doCommitOffsets.  Let me know if 
there is any thing else required.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] JooHyukKim commented on pull request #20242: [fix][client] Java Client's Seek Logic Not Threadsafe #1

2023-05-07 Thread via GitHub


JooHyukKim commented on PR #20242:
URL: https://github.com/apache/pulsar/pull/20242#issuecomment-1537476267

   > I'd appreciate it if you can provide a timing diagram to prove the fix. 
You may refer to 
[apache/curator#430](https://github.com/apache/curator/pull/430) as a typical 
description.
   
   Np .  May I ask what tool everyone conventionally uses? Or your suggestion 
@tisonkun ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] JooHyukKim commented on a diff in pull request #20242: [fix][client] Java Client's Seek Logic Not Threadsafe #1

2023-05-07 Thread via GitHub


JooHyukKim commented on code in PR #20242:
URL: https://github.com/apache/pulsar/pull/20242#discussion_r1186875184


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##
@@ -2153,9 +2153,14 @@ private CompletableFuture seekAsyncInternal(long 
requestId, ByteBuf seek,
 
 MessageIdAdv originSeekMessageId = seekMessageId;
 seekMessageId = (MessageIdAdv) seekId;
-duringSeek.set(true);
+
+if (!duringSeek.compareAndSet(false, true)) {
+log.warn("[{}][{}] Attempting to seek operation that is already in 
progress, cancelling {}", 
+topic, subscription, seekBy);
+seekFuture.cancel(true);

Review Comment:
   > completing with `null` will not set the message clear.
   
   That indeed the operation is "cancelled".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] JooHyukKim commented on a diff in pull request #20242: [fix][client] Java Client's Seek Logic Not Threadsafe #1

2023-05-07 Thread via GitHub


JooHyukKim commented on code in PR #20242:
URL: https://github.com/apache/pulsar/pull/20242#discussion_r1186873025


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##
@@ -2153,9 +2153,14 @@ private CompletableFuture seekAsyncInternal(long 
requestId, ByteBuf seek,
 
 MessageIdAdv originSeekMessageId = seekMessageId;
 seekMessageId = (MessageIdAdv) seekId;
-duringSeek.set(true);
+
+if (!duringSeek.compareAndSet(false, true)) {
+log.warn("[{}][{}] Attempting to seek operation that is already in 
progress, cancelling {}", 
+topic, subscription, seekBy);
+seekFuture.cancel(true);

Review Comment:
   But then again, completing with `null` will not set the message clear. I 
think we should just cancel the `seekFuture`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >