(pulsar) branch master updated: [improve][pip] PIP-347: Add role field in consumer's stat (#22562)

2024-05-15 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new f3e52b568ec [improve][pip] PIP-347: Add role field in consumer's stat 
(#22562)
f3e52b568ec is described below

commit f3e52b568ec7e86e7582bdc425321fe172bc4deb
Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com>
AuthorDate: Thu May 16 13:29:26 2024 +0800

[improve][pip] PIP-347: Add role field in consumer's stat (#22562)
---
 pip/pip-347.md |   2 +-
 .../org/apache/pulsar/broker/service/Consumer.java |   1 +
 .../stats/AuthenticatedConsumerStatsTest.java  | 169 +
 .../pulsar/broker/stats/ConsumerStatsTest.java |   2 +-
 .../pulsar/common/policies/data/ConsumerStats.java |   3 +
 .../policies/data/stats/ConsumerStatsImpl.java |   3 +
 6 files changed, 178 insertions(+), 2 deletions(-)

diff --git a/pip/pip-347.md b/pip/pip-347.md
index 5326fed3533..a5d5d76ae17 100644
--- a/pip/pip-347.md
+++ b/pip/pip-347.md
@@ -34,4 +34,4 @@ Fully compatible.
 Updated afterwards
 -->
 * Mailing List discussion thread: 
https://lists.apache.org/thread/p9y9r8pb7ygk8f0jd121c1121phvzd09
-* Mailing List voting thread:
+* Mailing List voting thread: 
https://lists.apache.org/thread/sfv0vq498dnjx6k6zdrnn0cw8f22tz05
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index fe9fbe6a400..c9f417c4bc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -208,6 +208,7 @@ public class Consumer {
 stats = new ConsumerStatsImpl();
 stats.setAddress(cnx.clientSourceAddressAndPort());
 stats.consumerName = consumerName;
+stats.appId = appId;
 stats.setConnectedSince(DateFormatter.format(connectedSince));
 stats.setClientVersion(cnx.getClientVersion());
 stats.metadata = this.metadata;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
new file mode 100644
index 00000000000..e8cadb72e1e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Set;
+
+public class AuthenticatedConsumerStatsTest extends ConsumerStatsTest{
+private final String ADMIN_TOKEN;
+private final String TOKEN_PUBLIC_KEY;
+private final KeyPair kp;
+
+AuthenticatedConsumerStatsTest() th

(pulsar) branch branch-3.2 updated: [fix][broker] Fix cursor should use latest ledger config (#22644)

2024-05-14 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 3de8358b22a [fix][broker] Fix cursor should use latest ledger config 
(#22644)
3de8358b22a is described below

commit 3de8358b22aa48bb3fba5d301938609868791924
Author: Zixuan Liu 
AuthorDate: Fri May 10 10:37:44 2024 +0800

[fix][broker] Fix cursor should use latest ledger config (#22644)

Signed-off-by: Zixuan Liu 
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 59 +++---
 .../mledger/impl/ManagedCursorMXBeanImpl.java  |  3 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  8 +--
 .../mledger/impl/NonDurableCursorImpl.java |  5 +-
 .../bookkeeper/mledger/impl/OpReadEntry.java   |  3 +-
 .../bookkeeper/mledger/impl/RangeSetWrapper.java   |  2 +-
 .../mledger/impl/ReadOnlyCursorImpl.java   |  5 +-
 .../mledger/impl/ReadOnlyManagedLedgerImpl.java|  2 +-
 ...ManagedCursorIndividualDeletedMessagesTest.java |  3 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  7 ++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  2 +-
 .../broker/service/BrokerBkEnsemblesTests.java |  8 +--
 .../service/persistent/PersistentTopicTest.java| 25 +
 13 files changed, 76 insertions(+), 56 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 7065af203da..911eca48bac 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -119,7 +119,6 @@ public class ManagedCursorImpl implements ManagedCursor {
 return 0;
 };
 protected final BookKeeper bookkeeper;
-protected final ManagedLedgerConfig config;
 protected final ManagedLedgerImpl ledger;
 private final String name;
 
@@ -299,31 +298,30 @@ public class ManagedCursorImpl implements ManagedCursor {
 void operationFailed(ManagedLedgerException exception);
 }
 
-ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, 
ManagedLedgerImpl ledger, String cursorName) {
+ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String 
cursorName) {
 this.bookkeeper = bookkeeper;
 this.cursorProperties = Collections.emptyMap();
-this.config = config;
 this.ledger = ledger;
 this.name = cursorName;
 this.individualDeletedMessages = new 
RangeSetWrapper<>(positionRangeConverter,
 positionRangeReverseConverter, this);
-if (config.isDeletionAtBatchIndexLevelEnabled()) {
+if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
 this.batchDeletedIndexes = new ConcurrentSkipListMap<>();
 } else {
 this.batchDeletedIndexes = null;
 }
-this.digestType = 
BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
+this.digestType = 
BookKeeper.DigestType.fromApiDigestType(getConfig().getDigestType());
 STATE_UPDATER.set(this, State.Uninitialized);
 PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0);
 PENDING_READ_OPS_UPDATER.set(this, 0);
 RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
 WAITING_READ_OP_UPDATER.set(this, null);
-this.clock = config.getClock();
+this.clock = getConfig().getClock();
 this.lastActive = this.clock.millis();
 this.lastLedgerSwitchTimestamp = this.clock.millis();
 
-if (config.getThrottleMarkDelete() > 0.0) {
-markDeleteLimiter = 
RateLimiter.create(config.getThrottleMarkDelete());
+if (getConfig().getThrottleMarkDelete() > 0.0) {
+markDeleteLimiter = 
RateLimiter.create(getConfig().getThrottleMarkDelete());
 } else {
 // Disable mark-delete rate limiter
 markDeleteLimiter = null;
@@ -602,7 +600,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
 
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
 }
-if (config.isDeletionAtBatchIndexLevelEnabled()
+if (getConfig().isDeletionAtBatchIndexLevelEnabled()
 && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 
0) {
 
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
 }
@@ -611,7 +609,8 @@ public class ManagedCursorImpl implements ManagedCursor {
 }, null);
 };
 try {
-boo

(pulsar) branch branch-3.2 updated: [cleanup][ml] ManagedCursor clean up. (#22246)

2024-05-14 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 781a02b2085 [cleanup][ml] ManagedCursor clean up. (#22246)
781a02b2085 is described below

commit 781a02b20859e61361f1d18c369c5d00d1b2f7fd
Author: 道君 
AuthorDate: Tue Mar 12 23:36:59 2024 +0800

[cleanup][ml] ManagedCursor clean up. (#22246)
---
 .../java/org/apache/bookkeeper/mledger/impl/EntryImpl.java|  7 ++-
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index 6512399173f..80397931357 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -42,6 +42,7 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted implements Entr
 private long timestamp;
 private long ledgerId;
 private long entryId;
+private PositionImpl position;
 ByteBuf data;
 
 private Runnable onDeallocate;
@@ -151,7 +152,10 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted implements Entr
 
 @Override
 public PositionImpl getPosition() {
-return new PositionImpl(ledgerId, entryId);
+if (position == null) {
+position = PositionImpl.get(ledgerId, entryId);
+}
+return position;
 }
 
 @Override
@@ -197,6 +201,7 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted implements Entr
 timestamp = -1;
 ledgerId = -1;
 entryId = -1;
+position = null;
 recyclerHandle.recycle(this);
 }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 0c8dedd6b21..7065af203da 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1506,10 +1506,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 Set<Position> alreadyAcknowledgedPositions = new HashSet<>();
 lock.readLock().lock();
 try {
-positions.stream()
-.filter(position -> ((PositionImpl) 
position).compareTo(markDeletePosition) <= 0
-|| 
individualDeletedMessages.contains(position.getLedgerId(), 
position.getEntryId()))
-.forEach(alreadyAcknowledgedPositions::add);
+
positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
 } finally {
 lock.readLock().unlock();
 }
@@ -2281,8 +2278,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 return;
 }
 
-if (position.compareTo(markDeletePosition) <= 0
-|| 
individualDeletedMessages.contains(position.getLedgerId(), 
position.getEntryId())) {
+if (isMessageDeleted(position)) {
 if (config.isDeletionAtBatchIndexLevelEnabled()) {
 BitSetRecyclable bitSetRecyclable = 
batchDeletedIndexes.remove(position);
 if (bitSetRecyclable != null) {
@@ -3517,8 +3513,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 @Override
 public void trimDeletedEntries(List<Entry> entries) {
 entries.removeIf(entry -> {
-boolean isDeleted = 
markDeletePosition.compareTo(entry.getLedgerId(), entry.getEntryId()) >= 0
-|| individualDeletedMessages.contains(entry.getLedgerId(), 
entry.getEntryId());
+boolean isDeleted = isMessageDeleted(entry.getPosition());
 if (isDeleted) {
 entry.release();
 }



(pulsar) branch branch-3.2 updated: [fix][admin] Fix can't delete tenant for v1 (#22550)

2024-05-14 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new aa7f0676e5a [fix][admin] Fix can't delete tenant for v1 (#22550)
aa7f0676e5a is described below

commit aa7f0676e5a1b4b450da569fb70385523393d8e1
Author: Jiwei Guo 
AuthorDate: Tue Apr 23 22:04:13 2024 +0800

[fix][admin] Fix can't delete tenant for v1 (#22550)
---
 .../pulsar/broker/resources/TopicResources.java|  2 +-
 .../pulsar/broker/auth/AuthorizationTest.java  | 28 ++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
index 0963f25c3d3..413184764f5 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -120,7 +120,7 @@ public class TopicResources {
 return store.exists(path)
 .thenCompose(exists -> {
 if (exists) {
-return store.delete(path, Optional.empty());
+return store.deleteRecursive(path);
 } else {
 return CompletableFuture.completedFuture(null);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index e9ad401b878..6c913d42908 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -33,6 +33,7 @@ import 
org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -59,11 +60,15 @@ public class AuthorizationTest extends 
MockedPulsarServiceBaseTest {
 conf.setSystemTopicEnabled(false);
 conf.setForceDeleteNamespaceAllowed(true);
 conf.setAuthenticationEnabled(true);
+conf.setForceDeleteNamespaceAllowed(true);
+conf.setForceDeleteTenantAllowed(true);
 conf.setAuthenticationProviders(
 
Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider"));
 conf.setAuthorizationEnabled(true);
 conf.setAuthorizationAllowWildcardsMatching(true);
 conf.setSuperUserRoles(Sets.newHashSet("pulsar.super_user", 
"pass.pass"));
+
conf.setBrokerClientAuthenticationPlugin(MockAuthentication.class.getName());
+conf.setBrokerClientAuthenticationParameters("user:pass.pass");
 internalSetup();
 }
 
@@ -72,6 +77,11 @@ public class AuthorizationTest extends 
MockedPulsarServiceBaseTest {
 pulsarAdminBuilder.authentication(new MockAuthentication("pass.pass"));
 }
 
+@Override
+protected void customizeNewPulsarClientBuilder(ClientBuilder 
clientBuilder) {
+clientBuilder.authentication(new MockAuthentication("pass.pass"));
+}
+
 @AfterClass(alwaysRun = true)
 @Override
 public void cleanup() throws Exception {
@@ -237,6 +247,24 @@ public class AuthorizationTest extends 
MockedPulsarServiceBaseTest {
 
 admin.namespaces().deleteNamespace("p1/c1/ns1", true);
 admin.tenants().deleteTenant("p1");
+
+admin.clusters().deleteCluster("c1");
+}
+
+@Test
+public void testDeleteV1Tenant() throws Exception {
+admin.clusters().createCluster("c1", ClusterData.builder().build());
+admin.tenants().createTenant("p1", new 
TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
+waitForChange();
+admin.namespaces().createNamespace("p1/c1/ns1");
+waitForChange();
+
+
+String topic = "persistent://p1/c1/ns1/ds2";
+admin.topics().createNonPartitionedTopic(topic);
+
+admin.namespaces().deleteNamespace("p1/c1/ns1", true);
+admin.tenants().deleteTenant("p1", true);
 admin.clusters().deleteCluster("c1");
 }
 



(pulsar) branch master updated: [fix][misc] Correct the description of patternAutoDiscoveryPeriod (#22615)

2024-05-12 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 3b24b6e0b72 [fix][misc] Correct the description of 
patternAutoDiscoveryPeriod (#22615)
3b24b6e0b72 is described below

commit 3b24b6e0b7250f531c86e5ee2635a9b23467419c
Author: jito 
AuthorDate: Mon May 13 09:29:38 2024 +0900

[fix][misc] Correct the description of patternAutoDiscoveryPeriod (#22615)

Signed-off-by: jitokim 
---
 .../src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java  | 5 +++--
 .../apache/pulsar/client/impl/conf/ConsumerConfigurationData.java| 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 863432b478f..6f3c3be9727 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -464,7 +464,7 @@ public interface ConsumerBuilder extends Cloneable {
 ConsumerBuilder<T> readCompacted(boolean readCompacted);
 
 /**
- * Sets topic's auto-discovery period when using a pattern for topics 
consumer.
+ * Sets topic's auto-discovery period when using a pattern for topic's 
consumer.
  * The period is in minutes, and the default and minimum values are 1 
minute.
  *
  * @param periodInMinutes
@@ -476,7 +476,8 @@ public interface ConsumerBuilder extends Cloneable {
 
 
 /**
- * Sets topic's auto-discovery period when using a pattern for topics 
consumer.
+ * Sets topic's auto-discovery period when using a pattern for topic's 
consumer.
+ * The default value of period is 1 minute, with a minimum of 1 second.
  *
  * @param interval
  *the amount of delay between checks for
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 3ae0e977d13..18529276c9c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -310,7 +310,7 @@ public class ConsumerConfigurationData implements 
Serializable, Cloneable {
 name = "patternAutoDiscoveryPeriod",
 value = "Topic auto discovery period when using a pattern for 
topic's consumer.\n"
 + "\n"
-+ "The default and minimum value is 1 minute."
++ "The default value is 1 minute, with a minimum of 1 
second."
 )
 private int patternAutoDiscoveryPeriod = 60;
 



(pulsar) branch branch-3.2 updated: [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614)

2024-05-09 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new e98370e21cb [improve] [broker] Add additionalSystemCursorNames ignore 
list for TTL check (#22614)
e98370e21cb is described below

commit e98370e21cb6a6ddf0b5ef9c8123046c7e5b8e3d
Author: Hang Chen 
AuthorDate: Thu May 9 20:45:56 2024 +0800

[improve] [broker] Add additionalSystemCursorNames ignore list for TTL 
check (#22614)
---
 conf/broker.conf   |  4 +
 conf/standalone.conf   |  4 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 ++
 .../broker/service/persistent/PersistentTopic.java |  7 +-
 .../pulsar/broker/service/MessageTTLTest.java  | 96 ++
 5 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index e0ebbe3043a..c5beda206a4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -180,6 +180,10 @@ backlogQuotaDefaultRetentionPolicy=producer_request_hold
 # Default ttl for namespaces if ttl is not already configured at namespace 
policies. (disable default-ttl with value 0)
 ttlDurationDefaultInSeconds=0
 
+# Additional system subscriptions that will be ignored by ttl check. The 
cursor names are comma separated.
+# Default is empty.
+# additionalSystemCursorNames=
+
 # Enable topic auto creation if new producer or consumer connected (disable 
auto creation with value false)
 allowAutoTopicCreation=true
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 5eb9fadcf19..5ca0c683d74 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -121,6 +121,10 @@ backlogQuotaDefaultLimitSecond=-1
 # Default ttl for namespaces if ttl is not already configured at namespace 
policies. (disable default-ttl with value 0)
 ttlDurationDefaultInSeconds=0
 
+# Additional system subscriptions that will be ignored by ttl check.  The 
cursor names are comma separated.
+# Default is empty.
+# additionalSystemCursorNames=
+
 # Enable the deletion of inactive topics. This parameter need to cooperate 
with the allowAutoTopicCreation parameter.
 # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that 
allowAutoTopicCreation is also set to true.
 brokerDeleteInactiveTopicsEnabled=true
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f53eb7e183f..10ffced0321 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -646,6 +646,13 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 )
 private int ttlDurationDefaultInSeconds = 0;
 
+@FieldContext(
+category = CATEGORY_POLICIES,
+doc = "Additional system subscriptions that will be ignored by ttl 
check. "
++ "The cursor names are comma separated. Default is empty."
+)
+private Set<String> additionalSystemCursorNames = new TreeSet<>();
+
 @FieldContext(
 category = CATEGORY_POLICIES,
 dynamic = true,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b4f295dbd97..472387a0a9b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -272,6 +273,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 private final ExecutorService orderedExecutor;
 
 private volatile CloseFutures closeFutures;
+private Set<String> additionalSystemCursorNames = new TreeSet<>();
 
 /***
  * We use 3 futures to prevent a new closing if there is an in-progress 
deletion or closing.  We make Pulsar return
@@ -384,6 +386,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 } else {
 shadowSourceTopic = null;
 }
+additionalSystemCursorNames = 
brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
 }
 
 @Override
@@ -1888,7 +1891,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal

(pulsar) branch master updated: [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614)

2024-05-09 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new bed032e714a [improve] [broker] Add additionalSystemCursorNames ignore 
list for TTL check (#22614)
bed032e714a is described below

commit bed032e714aff9f5d2594bdc80a3e7888e53b1bf
Author: Hang Chen 
AuthorDate: Thu May 9 20:45:56 2024 +0800

[improve] [broker] Add additionalSystemCursorNames ignore list for TTL 
check (#22614)
---
 conf/broker.conf   |  4 +
 conf/standalone.conf   |  4 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 ++
 .../broker/service/persistent/PersistentTopic.java |  7 +-
 .../pulsar/broker/service/MessageTTLTest.java  | 96 ++
 5 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 1b51ff47551..1ef68a0395c 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -180,6 +180,10 @@ backlogQuotaDefaultRetentionPolicy=producer_request_hold
 # Default ttl for namespaces if ttl is not already configured at namespace 
policies. (disable default-ttl with value 0)
 ttlDurationDefaultInSeconds=0
 
+# Additional system subscriptions that will be ignored by ttl check. The 
cursor names are comma separated.
+# Default is empty.
+# additionalSystemCursorNames=
+
 # Enable topic auto creation if new producer or consumer connected (disable 
auto creation with value false)
 allowAutoTopicCreation=true
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 51035235d4d..a8615b70293 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -121,6 +121,10 @@ backlogQuotaDefaultLimitSecond=-1
 # Default ttl for namespaces if ttl is not already configured at namespace 
policies. (disable default-ttl with value 0)
 ttlDurationDefaultInSeconds=0
 
+# Additional system subscriptions that will be ignored by ttl check.  The 
cursor names are comma separated.
+# Default is empty.
+# additionalSystemCursorNames=
+
 # Enable the deletion of inactive topics. This parameter need to cooperate 
with the allowAutoTopicCreation parameter.
 # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that 
allowAutoTopicCreation is also set to true.
 brokerDeleteInactiveTopicsEnabled=true
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a9d170ea5de..9efe1856509 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -652,6 +652,13 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 )
 private int ttlDurationDefaultInSeconds = 0;
 
+@FieldContext(
+category = CATEGORY_POLICIES,
+doc = "Additional system subscriptions that will be ignored by ttl 
check. "
++ "The cursor names are comma separated. Default is empty."
+)
+private Set<String> additionalSystemCursorNames = new TreeSet<>();
+
 @FieldContext(
 category = CATEGORY_POLICIES,
 dynamic = true,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7228bdeb2d3..28bc27f7961 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -279,6 +280,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 private final ExecutorService orderedExecutor;
 
 private volatile CloseFutures closeFutures;
+private Set<String> additionalSystemCursorNames = new TreeSet<>();
 
 @Getter
 private final PersistentTopicMetrics persistentTopicMetrics = new 
PersistentTopicMetrics();
@@ -414,6 +416,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 } else {
 shadowSourceTopic = null;
 }
+additionalSystemCursorNames = 
brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
 }
 
 @Override
@@ -1934,7 +1937,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 int messageTtlInSeconds = topicPolic

(pulsar) branch master updated: [fix][ml] Remove duplicated field initialization of ML (#22676)

2024-05-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 88feb874bb3 [fix][ml] Remove duplicated field initialization of ML 
(#22676)
88feb874bb3 is described below

commit 88feb874bb3ad58a74b3d40d931b2aa7380dc7e1
Author: 道君 
AuthorDate: Thu May 9 08:53:59 2024 +0800

[fix][ml] Remove duplicated field initialization of ML (#22676)
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 ---
 1 file changed, 3 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e5e163127f7..b12346cadc9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -365,9 +365,6 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 this.mlOwnershipChecker = mlOwnershipChecker;
 this.propertiesMap = new ConcurrentHashMap<>();
 this.inactiveLedgerRollOverTimeMs = 
config.getInactiveLedgerRollOverTimeMs();
-if (config.getManagedLedgerInterceptor() != null) {
-this.managedLedgerInterceptor = 
config.getManagedLedgerInterceptor();
-}
 this.minBacklogCursorsForCaching = 
config.getMinimumBacklogCursorsForCaching();
 this.minBacklogEntriesForCaching = 
config.getMinimumBacklogEntriesForCaching();
 this.maxBacklogBetweenCursorsForCaching = 
config.getMaxBacklogBetweenCursorsForCaching();



(pulsar) branch branch-3.2 updated: Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668)

2024-05-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new f7984d74d19 Revert "[fix][sec] Upgrade Debezium oracle connector 
version to avoid… (#22668)
f7984d74d19 is described below

commit f7984d74d19e50d31f7ea2abacef8430e4cf95bd
Author: Lari Hotari 
AuthorDate: Wed May 8 13:43:24 2024 +0300

Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… 
(#22668)
---
 pom.xml   | 1 -
 pulsar-io/debezium/oracle/pom.xml | 3 +--
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index bd231f92a4d..c817c8e1858 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,7 +198,6 @@ flexible messaging model and an intuitive client 
API.
 1.2.4
 8.12.1
 1.9.7.Final
-<debezium.oracle.version>2.2.0.Final</debezium.oracle.version>
 42.5.0
 8.0.30
 
diff --git a/pulsar-io/debezium/oracle/pom.xml 
b/pulsar-io/debezium/oracle/pom.xml
index 214e9c15c3a..1018d5f9573 100644
--- a/pulsar-io/debezium/oracle/pom.xml
+++ b/pulsar-io/debezium/oracle/pom.xml
@@ -48,8 +48,7 @@
 
   <groupId>io.debezium</groupId>
   <artifactId>debezium-connector-oracle</artifactId>
-  <version>${debezium.oracle.version}</version>
-  <scope>runtime</scope>
+  <version>${debezium.version}</version>
 
 
   



(pulsar) branch master updated: Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668)

2024-05-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new ca44b9bc7c4 Revert "[fix][sec] Upgrade Debezium oracle connector 
version to avoid… (#22668)
ca44b9bc7c4 is described below

commit ca44b9bc7c48eca59692744399872e1f14f4fe6f
Author: Lari Hotari 
AuthorDate: Wed May 8 13:43:24 2024 +0300

Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… 
(#22668)
---
 pom.xml   | 1 -
 pulsar-io/debezium/oracle/pom.xml | 3 +--
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index cec3b3c60db..c2f563eb60e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -199,7 +199,6 @@ flexible messaging model and an intuitive client 
API.
 1.2.4
 8.12.1
 1.9.7.Final
-<debezium.oracle.version>2.2.0.Final</debezium.oracle.version>
 42.5.0
 8.0.30
 
diff --git a/pulsar-io/debezium/oracle/pom.xml 
b/pulsar-io/debezium/oracle/pom.xml
index b22a5785dfb..c69640ecff7 100644
--- a/pulsar-io/debezium/oracle/pom.xml
+++ b/pulsar-io/debezium/oracle/pom.xml
@@ -48,8 +48,7 @@
 
   <groupId>io.debezium</groupId>
   <artifactId>debezium-connector-oracle</artifactId>
-  <version>${debezium.oracle.version}</version>
-  <scope>runtime</scope>
+  <version>${debezium.version}</version>
 
 
   



(pulsar) branch branch-3.2 updated: [fix] Fix Reader can be stuck from transaction aborted messages. (#22610)

2024-05-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 763f90f6dd3 [fix] Fix Reader can be stuck from transaction aborted 
messages. (#22610)
763f90f6dd3 is described below

commit 763f90f6dd317819d93990348bfc8519029c727d
Author: 道君 
AuthorDate: Tue May 7 20:45:16 2024 +0800

[fix] Fix Reader can be stuck from transaction aborted messages. (#22610)
---
 .../mledger/util/ManagedLedgerImplUtils.java   | 17 ++
 .../broker/service/persistent/PersistentTopic.java | 24 
 .../pulsar/broker/transaction/TransactionTest.java | 68 ++
 .../buffer/TopicTransactionBufferTest.java | 36 
 4 files changed, 110 insertions(+), 35 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
index cd8671b0e62..01de115290a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
@@ -38,11 +38,7 @@ public class ManagedLedgerImplUtils {
 final 
Predicate predicate,
 final 
PositionImpl startPosition) {
 CompletableFuture future = new CompletableFuture<>();
-if (!ledger.isValidPosition(startPosition)) {
-future.complete(startPosition);
-} else {
-internalAsyncReverseFindPositionOneByOne(ledger, predicate, 
startPosition, future);
-}
+internalAsyncReverseFindPositionOneByOne(ledger, predicate, 
startPosition, future);
 return future;
 }
 
@@ -50,6 +46,10 @@ public class ManagedLedgerImplUtils {
  final 
Predicate predicate,
  final 
PositionImpl position,
  final 
CompletableFuture future) {
+if (!ledger.isValidPosition(position)) {
+future.complete(position);
+return;
+}
 ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() 
{
 @Override
 public void readEntryComplete(Entry entry, Object ctx) {
@@ -60,12 +60,7 @@ public class ManagedLedgerImplUtils {
 return;
 }
 PositionImpl previousPosition = 
ledger.getPreviousPosition((PositionImpl) position);
-if (!ledger.isValidPosition(previousPosition)) {
-future.complete(previousPosition);
-} else {
-internalAsyncReverseFindPositionOneByOne(ledger, 
predicate,
-ledger.getPreviousPosition((PositionImpl) 
position), future);
-}
+internalAsyncReverseFindPositionOneByOne(ledger, 
predicate, previousPosition, future);
 } catch (Exception e) {
 future.completeExceptionally(e);
 } finally {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 60eb700fc06..fa731b860f7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3561,18 +3561,18 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
 @Override
 public CompletableFuture getLastDispatchablePosition() {
-PositionImpl maxReadPosition = getMaxReadPosition();
-// If `maxReadPosition` is not equal to `LastPosition`. It means that 
there are uncommitted transactions.
-// so return `maxRedPosition` directly.
-if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) {
-return CompletableFuture.completedFuture(maxReadPosition);
-} else {
-return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
-MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
-// If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
-return !Markers.isServerOnlyMarker(md);
-}, maxReadPosition);
-}
+return 
ManagedLedgerImplUtils.asyncGetLastVali

(pulsar) branch branch-3.2 updated: [fix][broker] avoid offload system topic (#22497)

2024-05-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new c3c17dee756 [fix][broker] avoid offload system topic (#22497)
c3c17dee756 is described below

commit c3c17dee7567d0a182affb1991e1e35098689d9b
Author: Qiang Zhao 
AuthorDate: Wed May 8 13:10:49 2024 +0800

[fix][broker] avoid offload system topic (#22497)

Co-authored-by: 道君 
---
 .../pulsar/broker/service/BrokerService.java   |  8 +-
 .../pulsar/broker/service/BrokerServiceTest.java   | 94 ++
 2 files changed, 101 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 032d4dd9369..60d56c0d908 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1906,7 +1906,13 @@ public class BrokerService implements Closeable {
 topicLevelOffloadPolicies,
 
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
 getPulsar().getConfig().getProperties());
-if 
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+if (NamespaceService.isSystemServiceNamespace(namespace.toString())
+|| SystemTopicNames.isSystemTopic(topicName)) {
+/*
+ Avoid setting broker internal system topics using off-loader 
because some of them are the
+ preconditions of other topics. The slow replying log speed 
will cause a delay in all the topic
+ loading.(timeout)
+ */
 
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
 } else  {
 if (topicLevelOffloadPolicies != null) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index fcf11fad708..ab0b8f813ea 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -67,12 +67,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -111,6 +114,9 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
@@ -1772,4 +1778,92 @@ public class BrokerServiceTest extends BrokerTestBase {
 fail("Unsubscribe failed");
 }
 }
+
+
+@Test
+public void testOffloadConfShouldNotAppliedForSystemTopic() throws 
PulsarAdminException {
+final String driver = "aws-s3";
+final String region = "test-region";
+final String bucket = "test-bucket";
+final String role = "test-role";
+final String roleSessionName = "test-role-session-name";
+final String credentialId = "test-credential-id";
+final String credentialSecret = "test-credential-secret";
+final String endPoint = "test-endpoint";
+final Integer maxBlockSizeInBytes = 5;
+final Integer readBufferSizeInBytes = 2;
+final Long offloadThresholdInBytes = 10L;
+final Long offloadThresholdInSeconds = 1000L;

(pulsar) branch branch-3.2 updated: [improve][ws] Add memory limit configuration for Pulsar client used in Websocket proxy (#22666)

2024-05-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new e32cdfb113b [improve][ws] Add memory limit configuration for Pulsar 
client used in Websocket proxy (#22666)
e32cdfb113b is described below

commit e32cdfb113b1693a2420e39ab40f985b59a44899
Author: Lari Hotari 
AuthorDate: Wed May 8 06:56:35 2024 +0300

[improve][ws] Add memory limit configuration for Pulsar client used in 
Websocket proxy (#22666)
---
 conf/broker.conf   | 3 +++
 conf/standalone.conf   | 3 +++
 conf/websocket.conf| 3 +++
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java   | 7 +++
 .../main/java/org/apache/pulsar/websocket/WebSocketService.java| 3 ++-
 .../pulsar/websocket/service/WebSocketProxyConfiguration.java  | 3 +++
 6 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index dd0f3e49e1f..e0ebbe3043a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1539,6 +1539,9 @@ webSocketNumServiceThreads=
 # Number of connections per Broker in Pulsar Client used in WebSocket proxy
 webSocketConnectionsPerBroker=
 
+# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket 
proxy
+webSocketPulsarClientMemoryLimitInMB=0
+
 # Time in milliseconds that idle WebSocket session times out
 webSocketSessionIdleTimeoutMillis=30
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 316143ab49d..5eb9fadcf19 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -967,6 +967,9 @@ webSocketNumIoThreads=8
 # Number of connections per Broker in Pulsar Client used in WebSocket proxy
 webSocketConnectionsPerBroker=8
 
+# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket 
proxy
+webSocketPulsarClientMemoryLimitInMB=0
+
 # Time in milliseconds that idle WebSocket session times out
 webSocketSessionIdleTimeoutMillis=30
 
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 9051f3b590c..91f7f7d4c23 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -71,6 +71,9 @@ numHttpServerThreads=
 # Number of connections per Broker in Pulsar Client used in WebSocket proxy
 webSocketConnectionsPerBroker=
 
+# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket 
proxy
+webSocketPulsarClientMemoryLimitInMB=0
+
 # Time in milliseconds that idle WebSocket session times out
 webSocketSessionIdleTimeoutMillis=30
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d1f2e9b585f..f53eb7e183f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2892,6 +2892,13 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 doc = "Number of connections per Broker in Pulsar Client used in 
WebSocket proxy"
 )
 private int webSocketConnectionsPerBroker = 
Runtime.getRuntime().availableProcessors();
+
+@FieldContext(
+category = CATEGORY_WEBSOCKET,
+doc = "Memory limit in MBs for direct memory in Pulsar Client used 
in WebSocket proxy"
+)
+private int webSocketPulsarClientMemoryLimitInMB = 0;
+
 @FieldContext(
 category = CATEGORY_WEBSOCKET,
 doc = "Time in milliseconds that idle WebSocket session times out"
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 66b2a0075ec..889f4431cc3 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -195,7 +195,8 @@ public class WebSocketService implements Closeable {
 
 private PulsarClient createClientInstance(ClusterData clusterData) throws 
IOException {
 ClientBuilder clientBuilder = PulsarClient.builder() //
-.memoryLimit(0, SizeUnit.BYTES)
+
.memoryLimit(SizeUnit.MEGA_BYTES.toBytes(config.getWebSocketPulsarClientMemoryLimitInMB()),
+SizeUnit.BYTES)
 .statsInterval(0, TimeUnit.SECONDS) //
 .enableTls(config.isTlsEnabled()) //
 
.allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
 
b/pulsar-websocket/src/main/java/org/a

(pulsar) branch branch-3.2 updated: [fix][broker] Disable system topic message deduplication (#22582)

2024-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new b6f464ea0a1 [fix][broker] Disable system topic message deduplication 
(#22582)
b6f464ea0a1 is described below

commit b6f464ea0a17786fb857aac5111dcc394cba8f56
Author: Qiang Zhao 
AuthorDate: Wed May 8 10:53:53 2024 +0800

[fix][broker] Disable system topic message deduplication (#22582)
---
 .../org/apache/pulsar/broker/service/Topic.java| 10 +++
 .../service/persistent/MessageDeduplication.java   |  6 +---
 .../broker/service/persistent/PersistentTopic.java |  9 +++---
 .../broker/service/persistent/SystemTopic.java | 16 +++
 .../service/persistent/MessageDuplicationTest.java | 32 ++
 5 files changed, 64 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 1da8cfce4ee..c2eefcd18e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -213,6 +213,16 @@ public interface Topic {
 
 void checkCursorsToCacheEntries();
 
+/**
+ * Indicate if the current topic enabled server side deduplication.
+ * This is a dynamic configuration, user may update it by namespace/topic 
policies.
+ *
+ * @return whether enabled server side deduplication
+ */
+default boolean isDeduplicationEnabled() {
+return false;
+}
+
 void checkDeduplicationSnapshot();
 
 void checkMessageExpiry();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e508661364d..ab3b799093b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -217,7 +217,7 @@ public class MessageDeduplication {
  * returning a future to track the completion of the task
  */
 public CompletableFuture checkStatus() {
-boolean shouldBeEnabled = isDeduplicationEnabled();
+boolean shouldBeEnabled = topic.isDeduplicationEnabled();
 synchronized (this) {
 if (status == Status.Recovering || status == Status.Removing) {
 // If there's already a transition happening, check later for 
status
@@ -472,10 +472,6 @@ public class MessageDeduplication {
 }, null);
 }
 
-private boolean isDeduplicationEnabled() {
-return 
topic.getHierarchyTopicPolicies().getDeduplicationEnabled().get();
-}
-
 /**
  * Topic will call this method whenever a producer connects.
  */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e99bd1425f4..60eb700fc06 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2146,10 +2146,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 return future;
 }
 
-public boolean isDeduplicationEnabled() {
-return messageDeduplication.isEnabled();
-}
-
 @Override
 public int getNumberOfConsumers() {
 int count = 0;
@@ -4080,6 +4076,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 return ledger.isMigrated();
 }
 
+public boolean isDeduplicationEnabled() {
+return getHierarchyTopicPolicies().getDeduplicationEnabled().get();
+}
+
 public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID 
txnID, String subName) {
 return 
this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID);
 }
@@ -4104,4 +4104,5 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 public Optional getShadowSourceTopic() {
 return Optional.ofNullable(shadowSourceTopic);
 }
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 720ae3c5189..f2cec2138a3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -80,6 +80,22 @@ public class

(pulsar) branch branch-3.2 updated: [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes (#22501)

2024-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 2c92ae31722 [fix][fn]make sure the classloader for ContextImpl is 
`functionClassLoader` in different runtimes (#22501)
2c92ae31722 is described below

commit 2c92ae317222ac3e434a497aa458792f88debe75
Author: Rui Fu 
AuthorDate: Wed May 1 13:18:05 2024 +0800

[fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` 
in different runtimes (#22501)
---
 .../apache/pulsar/functions/instance/JavaInstanceRunnable.java| 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 21f125d3497..f1b9af00f9d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -283,13 +283,19 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
 Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
 "function-" + instanceConfig.getFunctionDetails().getName());
 Thread currentThread = Thread.currentThread();
+ClassLoader clsLoader = currentThread.getContextClassLoader();
 Consumer<Throwable> fatalHandler = throwable -> {
 this.deathException = throwable;
 currentThread.interrupt();
 };
-return new ContextImpl(instanceConfig, instanceLog, client, 
secretsProvider,
+try {
+Thread.currentThread().setContextClassLoader(functionClassLoader);
+return new ContextImpl(instanceConfig, instanceLog, client, 
secretsProvider,
 collectorRegistry, metricsLabels, this.componentType, 
this.stats, stateManager,
 pulsarAdmin, clientBuilder, fatalHandler);
+} finally {
+Thread.currentThread().setContextClassLoader(clsLoader);
+}
 }
 
 public interface AsyncResultConsumer {



(pulsar) branch branch-3.2 updated: [fix][test] Clear MockedPulsarServiceBaseTest fields to prevent test runtime memory leak (#22659)

2024-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new ebbdad1e6ab [fix][test] Clear MockedPulsarServiceBaseTest fields to 
prevent test runtime memory leak (#22659)
ebbdad1e6ab is described below

commit ebbdad1e6abfe0233ddc715a49f1facccd6991f8
Author: Lari Hotari 
AuthorDate: Mon May 6 21:48:47 2024 +0300

[fix][test] Clear MockedPulsarServiceBaseTest fields to prevent test 
runtime memory leak (#22659)
---
 .../org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java | 7 +++
 1 file changed, 7 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index bd08ced1e03..248bd0e720e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -275,15 +275,22 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
 }
 if (brokerGateway != null) {
 brokerGateway.close();
+brokerGateway = null;
 }
 if (pulsarTestContext != null) {
 pulsarTestContext.close();
 pulsarTestContext = null;
 }
+
 resetConfig();
 callCloseables(closeables);
 closeables.clear();
 onCleanup();
+
+// clear fields to avoid test runtime memory leak, pulsarTestContext 
already handles closing of these instances
+pulsar = null;
+mockZooKeeper = null;
+mockZooKeeperGlobal = null;
 }
 
 protected void closeAdmin() {



(pulsar) branch branch-3.2 updated: [fix][sec] Upgrade Debezium oracle connector version to avoid CVE-2023-4586 (#22641)

2024-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 412a02a4157 [fix][sec] Upgrade Debezium oracle connector version to 
avoid CVE-2023-4586 (#22641)
412a02a4157 is described below

commit 412a02a4157a1ce0f16f8a2c6e119913352cba80
Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com>
AuthorDate: Sat May 4 02:00:28 2024 +0530

[fix][sec] Upgrade Debezium oracle connector version to avoid CVE-2023-4586 
(#22641)
---
 pom.xml   | 1 +
 pulsar-io/debezium/oracle/pom.xml | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index c817c8e1858..bd231f92a4d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,6 +198,7 @@ flexible messaging model and an intuitive client 
API.
 1.2.4
 8.12.1
 1.9.7.Final
+<debezium.oracle.version>2.2.0.Final</debezium.oracle.version>
 42.5.0
 8.0.30
 
diff --git a/pulsar-io/debezium/oracle/pom.xml 
b/pulsar-io/debezium/oracle/pom.xml
index 1018d5f9573..214e9c15c3a 100644
--- a/pulsar-io/debezium/oracle/pom.xml
+++ b/pulsar-io/debezium/oracle/pom.xml
@@ -48,7 +48,8 @@
 
   <groupId>io.debezium</groupId>
   <artifactId>debezium-connector-oracle</artifactId>
-  <version>${debezium.version}</version>
+  <version>${debezium.oracle.version}</version>
+  <scope>runtime</scope>
 
 
   



(pulsar) branch branch-3.2 updated: [fix][sec] Upgrade elasticsearch-java version to avoid CVE-2023-4043 (#22640)

2024-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new e5af8ef5ad3 [fix][sec] Upgrade elasticsearch-java version to avoid 
CVE-2023-4043 (#22640)
e5af8ef5ad3 is described below

commit e5af8ef5ad34c94b7e18a311055fc25e766ea646
Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com>
AuthorDate: Sat May 4 02:51:48 2024 +0530

[fix][sec] Upgrade elasticsearch-java version to avoid CVE-2023-4043 
(#22640)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 8c98cacfbb6..c817c8e1858 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,7 +196,7 @@ flexible messaging model and an intuitive client 
API.
 3.3.5
 2.4.10
 1.2.4
-8.5.2
+8.12.1
 1.9.7.Final
 42.5.0
 8.0.30



(pulsar) branch branch-3.2 updated: [fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 (#22633)

2024-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 40eb3914651 [fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 
(#22633)
40eb3914651 is described below

commit 40eb39146513c0f9f74142b5c12e9c53e74f318d
Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com>
AuthorDate: Thu May 2 18:58:02 2024 +0530

[fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 (#22633)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 8aa8bf36c98..8c98cacfbb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -182,7 +182,7 @@ flexible messaging model and an intuitive client 
API.
 4.5.0
 3.4.0
 5.18.0
-<aws-sdk.version>1.12.262</aws-sdk.version>
+<aws-sdk.version>1.12.638</aws-sdk.version>
 1.11.3
 2.10.10
 2.6.0



(pulsar) branch branch-3.2 updated: [fix] [client] Fix Consumer should return configured batch receive max messages (#22619)

2024-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new bc0c4a41b33 [fix] [client] Fix Consumer should return configured batch 
receive max messages (#22619)
bc0c4a41b33 is described below

commit bc0c4a41b33bae1344e19555bd5b8fd268997a5c
Author: Rajan Dhabalia 
AuthorDate: Thu May 2 16:57:49 2024 -0700

[fix] [client] Fix Consumer should return configured batch receive max 
messages (#22619)
---
 .../client/api/ConsumerBatchReceiveTest.java   |  8 +++---
 .../client/api/SimpleProducerConsumerTest.java | 29 ++
 .../pulsar/client/impl/ConsumerBuilderImpl.java|  4 +++
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index d54b1c99e3e..974d25aad64 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -112,7 +112,7 @@ public class ConsumerBatchReceiveTest extends 
ProducerConsumerBase {
 // Number of message limitation exceed receiverQueue size
 {
 BatchReceivePolicy.builder()
-.maxNumMessages(70)
+.maxNumMessages(50)
 .build(), true, 50, false
 },
 // Number of message limitation exceed receiverQueue size and 
timeout limitation
@@ -147,7 +147,7 @@ public class ConsumerBatchReceiveTest extends 
ProducerConsumerBase {
 // Number of message limitation exceed receiverQueue size
 {
 BatchReceivePolicy.builder()
-.maxNumMessages(70)
+.maxNumMessages(50)
 .build(), false, 50, false
 },
 // Number of message limitation exceed receiverQueue size and 
timeout limitation
@@ -248,7 +248,7 @@ public class ConsumerBatchReceiveTest extends 
ProducerConsumerBase {
 // Number of message limitation exceed receiverQueue size
 {
 BatchReceivePolicy.builder()
-.maxNumMessages(70)
+.maxNumMessages(50)
 .build(), true, 50, true
 },
 // Number of message limitation exceed receiverQueue size and 
timeout limitation
@@ -283,7 +283,7 @@ public class ConsumerBatchReceiveTest extends 
ProducerConsumerBase {
 // Number of message limitation exceed receiverQueue size
 {
 BatchReceivePolicy.builder()
-.maxNumMessages(70)
+.maxNumMessages(50)
 .build(), false, 50, true
 },
 // Number of message limitation exceed receiverQueue size and 
timeout limitation
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 691f501777e..e9bb86fa33b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4825,6 +4825,35 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 admin.topics().delete(topic, false);
 }
 
+/**
+ * It verifies that consumer receives configured number of messages into 
the batch.
+ * @throws Exception
+ */
+@Test
+public void testBatchReceiveWithMaxBatchSize() throws Exception {
+int maxBatchSize = 100;
+final int internalQueueSize = 10;
+final int maxBytes = 200;
+final int timeOutInSeconds = 900;
+final String topic = "persistent://my-property/my-ns/testBatchReceive";
+BatchReceivePolicy batchReceivePolicy = 
BatchReceivePolicy.builder().maxNumBytes(maxBytes)
+.maxNumMessages(maxBatchSize).timeout(timeOutInSeconds, 
TimeUnit.SECONDS).build();
+@Cleanup
+Consumer consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+.subscriptionName("my-subscriber-name")
+.receiverQueueSize(internalQueueSize)
+.batchReceivePolicy(batchReceivePolicy).subscribe();
+@Cleanup
+Producer producer = 
pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+
+final i

(pulsar) branch branch-3.2 updated: [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573)

2024-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 892151bfcc2 [fix][broker] Avoid being stuck when closing the broker 
with extensible load manager (#22573)
892151bfcc2 is described below

commit 892151bfcc2b64b1ee0a9f05a182a488d1554ef5
Author: Yunze Xu 
AuthorDate: Fri Apr 26 21:30:15 2024 +0800

[fix][broker] Avoid being stuck when closing the broker with extensible 
load manager (#22573)
---
 .../org/apache/pulsar/broker/PulsarService.java|   3 +
 .../store/TableViewLoadDataStoreImpl.java  |   6 +-
 .../pulsar/broker/service/BrokerService.java   |  11 +++
 .../extensions/ExtensibleLoadManagerCloseTest.java | 107 +
 4 files changed, 122 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 499a981259f..bf266d44d83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -431,6 +431,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 return closeFuture;
 }
 LOG.info("Closing PulsarService");
+if (brokerService != null) {
+brokerService.unloadNamespaceBundlesGracefully();
+}
 state = State.Closing;
 
 // close the service in reverse order v.s. in which they are 
started
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index d916e917162..81cf33b4a55 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -161,12 +161,8 @@ public class TableViewLoadDataStoreImpl implements 
LoadDataStore {
 }
 
 private void validateProducer() {
-if (producer == null || !producer.isConnected()) {
+if (producer == null) {
 try {
-if (producer != null) {
-producer.close();
-}
-producer = null;
 startProducer();
 log.info("Restarted producer on {}", topic);
 } catch (Exception e) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ea55a43c7f0..032d4dd9369 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -288,6 +288,7 @@ public class BrokerService implements Closeable {
 private Set brokerEntryPayloadProcessors;
 
 private final TopicEventsDispatcher topicEventsDispatcher = new 
TopicEventsDispatcher();
+private volatile boolean unloaded = false;
 
 public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) 
throws Exception {
 this.pulsar = pulsar;
@@ -869,9 +870,13 @@ public class BrokerService implements Closeable {
 }
 
 public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, 
boolean closeWithoutWaitingClientDisconnect) {
+if (unloaded) {
+return;
+}
 try {
 log.info("Unloading namespace-bundles...");
 // make broker-node unavailable from the cluster
+long disableBrokerStartTime = System.nanoTime();
 if (pulsar.getLoadManager() != null && 
pulsar.getLoadManager().get() != null) {
 try {
 pulsar.getLoadManager().get().disableBroker();
@@ -880,6 +885,10 @@ public class BrokerService implements Closeable {
 // still continue and release bundle ownership as broker's 
registration node doesn't exist.
 }
 }
+double disableBrokerTimeSeconds =
+TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - 
disableBrokerStartTime))
+/ 1000.0;
+log.info("Disable broker in load manager completed in {} seconds", 
disableBrokerTimeSeconds);
 
 // unload all namespace-bundles gracefully
 long closeTopicsStartTime = System.nanoTime();
@@ -909,6 +918,8 @@ public class BrokerService implements Closeable {
 }
 } catch (Exception e) {
 

(pulsar) branch branch-3.2 updated: [fix][io] Fix es index creation (#22654)

2024-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new b86ebaa05d6 [fix][io] Fix es index creation (#22654)
b86ebaa05d6 is described below

commit b86ebaa05d62a8c79e3f8f2bc91e72c40cc23e4d
Author: Zixuan Liu 
AuthorDate: Mon May 6 20:35:51 2024 +0800

[fix][io] Fix es index creation (#22654)

Signed-off-by: Zixuan Liu 
---
 .../client/elastic/ElasticSearchJavaRestClient.java |  4 ++--
 .../pulsar/io/elasticsearch/ElasticSearchSinkTests.java | 13 +
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index afda5ba0e74..133daa8cd6a 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -144,7 +144,7 @@ public class ElasticSearchJavaRestClient extends RestClient 
{
 public boolean deleteDocument(String index, String documentId) throws 
IOException {
 final DeleteRequest req = new
 DeleteRequest.Builder()
-.index(config.getIndexName())
+.index(index)
 .id(documentId)
 .build();
 
@@ -156,7 +156,7 @@ public class ElasticSearchJavaRestClient extends RestClient 
{
 public boolean indexDocument(String index, String documentId, String 
documentSource) throws IOException {
 final Map mapped = objectMapper.readValue(documentSource, Map.class);
 final IndexRequest indexRequest = new IndexRequest.Builder<>()
-.index(config.getIndexName())
+.index(index)
 .document(mapped)
 .id(documentId)
 .build();
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 9a2cb4ab565..f1da6fd0c7e 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import co.elastic.clients.transport.ElasticsearchTransport;
 import com.fasterxml.jackson.core.JsonParseException;
@@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -152,6 +154,7 @@ public abstract class ElasticSearchSinkTests extends 
ElasticSearchTestBase {
 });
 
 
when(mockRecord.getSchema()).thenAnswer((Answer>>) invocation -> kvSchema);
+when(mockRecord.getEventTime()).thenAnswer(invocation -> 
Optional.of(System.currentTimeMillis()));
 }
 
 @AfterMethod(alwaysRun = true)
@@ -209,6 +212,16 @@ public abstract class ElasticSearchSinkTests extends 
ElasticSearchTestBase {
 verify(mockRecord, times(100)).ack();
 }
 
+@Test
+public final void send1WithFormattedIndexTest() throws Exception {
+map.put("indexName", "test-formatted-index-%{+yyyy-MM-dd}");
+sink.open(map, mockSinkContext);
+send(1);
+verify(mockRecord, times(1)).ack();
+String value = getHitIdAtIndex("test-formatted-index-*", 0);
+assertTrue(StringUtils.isNotBlank(value));
+}
+
 @Test
 public final void sendNoSchemaTest() throws Exception {
 



(pulsar) branch master updated (7e88463d9a5 -> 816755429a3)

2024-05-07 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 7e88463d9a5 [fix] Fix Reader can be stuck from transaction aborted 
messages. (#22610)
 add 816755429a3 [improve][test] Clear fields in AuthZTest classes at 
cleanup (#22661)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/pulsar/broker/admin/AuthZTest.java  | 15 ++
 .../pulsar/broker/admin/NamespaceAuthZTest.java|  4 +++
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 32 +-
 .../broker/admin/TopicPoliciesAuthZTest.java   |  2 ++
 .../admin/TransactionAndSchemaAuthZTest.java   | 14 +++---
 .../pulsar/security/MockedPulsarStandalone.java|  3 ++
 6 files changed, 41 insertions(+), 29 deletions(-)



(pulsar) branch master updated: [improve][test] Add policy authentication test for namespace API (#22593)

2024-04-27 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 1bb9378b50a [improve][test] Add policy authentication test for 
namespace API (#22593)
1bb9378b50a is described below

commit 1bb9378b50aa891834b64cd39f55ae0e32a055bb
Author: Cong Zhao 
AuthorDate: Sun Apr 28 10:37:37 2024 +0800

[improve][test] Add policy authentication test for namespace API (#22593)
---
 .../pulsar/broker/admin/NamespaceAuthZTest.java| 1248 ++--
 1 file changed, 1140 insertions(+), 108 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
index 5358295b785..ec6a122f7df 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -20,9 +20,11 @@
 package org.apache.pulsar.broker.admin;
 
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.deleteNamespaceWithRetry;
+import static 
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
 import io.jsonwebtoken.Jwts;
 import java.io.File;
 import java.util.ArrayList;
@@ -32,6 +34,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.StringUtils;
@@ -44,17 +48,33 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.EntryFilters;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import 
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
 import org.apache.pulsar.packages.management.core.common.PackageMetadata;
 import org.apache.pulsar.security.MockedPulsarStandalone;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -72,7 +92,7 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 
 private AuthorizationService authorizationService;
 
-private static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
+private static final String TENANT_ADMIN_SUBJECT = 
UUID.randomUUID().toString();
 private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
 .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
 
@@ -122,16 +142,46 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 superUserAdmin.namespaces().createNamespace("public/default");
 }
 
-private void setAuthorizationOperationChecker(String role, 
NamespaceOperation operation) {
+private AtomicBoolean setAuthorizationOperationChecker(String role, 
NamespaceOperation operation) {
+AtomicBoolean execFlag = new AtomicBoolean(false);
 Mockito.doAnswer(invocationOnMock -> {
 String role_ = invocat

(pulsar) branch branch-3.2 updated: [improve][admin] Check if the topic existed before the permission operations (#22547)

2024-04-26 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new adac20a72ef [improve][admin] Check if the topic existed before the 
permission operations (#22547)
adac20a72ef is described below

commit adac20a72efdc2b1d9b16464ebffb569c41014e9
Author: Jiwei Guo 
AuthorDate: Fri Apr 26 14:05:30 2024 +0800

[improve][admin] Check if the topic existed before the permission 
operations (#22547)
---
 .../pulsar/broker/admin/impl/PersistentTopicsBase.java |  9 ++---
 .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java|  1 +
 .../java/org/apache/pulsar/broker/admin/AdminApiTest.java  | 12 
 .../apache/pulsar/broker/admin/PersistentTopicsTest.java   | 10 --
 .../org/apache/pulsar/broker/auth/AuthorizationTest.java   | 14 +-
 .../client/api/AuthenticatedProducerConsumerTest.java  |  4 +++-
 .../client/api/AuthorizationProducerConsumerTest.java  |  2 ++
 .../pulsar/websocket/proxy/ProxyAuthorizationTest.java |  8 +---
 8 files changed, 46 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b0968f494ee..4b29452f98c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -207,6 +207,7 @@ public class PersistentTopicsBase extends AdminResource {
 protected CompletableFuture<Map<String, Set<AuthAction>>> 
internalGetPermissionsOnTopic() {
 // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
 return validateAdminAccessForTenantAsync(namespaceName.getTenant())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
 .thenCompose(__ -> 
getAuthorizationService().getPermissionsAsync(topicName));
 }
 
@@ -258,9 +259,10 @@ public class PersistentTopicsBase extends AdminResource {
Set actions) {
 // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
 validateAdminAccessForTenantAsync(namespaceName.getTenant())
-.thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
-grantPermissionsAsync(topicName, role, actions)
-.thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()
+.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
+.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, 
actions))
+.thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
 Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
 log.error("[{}] Failed to get permissions for topic {}", 
clientAppId(), topicName, realCause);
@@ -272,6 +274,7 @@ public class PersistentTopicsBase extends AdminResource {
 protected void internalRevokePermissionsOnTopic(AsyncResponse 
asyncResponse, String role) {
 // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
 validateAdminAccessForTenantAsync(namespaceName.getTenant())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
 .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
true, false)
 .thenCompose(metadata -> {
 int numPartitions = metadata.partitions;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
index e89b4ff5e83..2dcb930fbe7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -120,6 +120,7 @@ public class AdminApiSchemaWithAuthTest extends 
MockedPulsarServiceBaseTest {
 .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : 
brokerUrlTls.toString())
 .authentication(AuthenticationToken.class.getName(), 
PRODUCE_TOKEN)
 .build();
+admin.topics().createNonPartitionedTopic(topicName);
 admin.topics().grantPermission(topicName, "consumer", 
EnumSet.of(AuthAction.consume));
 

(pulsar) branch master updated: [improve][admin] Check if the topic existed before the permission operations (#22547)

2024-04-26 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 69a600e86bb [improve][admin] Check if the topic existed before the 
permission operations (#22547)
69a600e86bb is described below

commit 69a600e86bb5110a118d836125411e941b83764d
Author: Jiwei Guo 
AuthorDate: Fri Apr 26 14:05:30 2024 +0800

[improve][admin] Check if the topic existed before the permission 
operations (#22547)
---
 .../pulsar/broker/admin/impl/PersistentTopicsBase.java  |  9 ++---
 .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java |  1 +
 .../java/org/apache/pulsar/broker/admin/AdminApiTest.java   | 12 
 .../apache/pulsar/broker/admin/PersistentTopicsTest.java| 10 --
 .../org/apache/pulsar/broker/auth/AuthorizationTest.java| 13 -
 .../client/api/AuthenticatedProducerConsumerTest.java   |  4 +++-
 .../client/api/AuthorizationProducerConsumerTest.java   |  2 ++
 .../pulsar/websocket/proxy/ProxyAuthorizationTest.java  |  8 +---
 8 files changed, 45 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 63ea987bb07..682f41dcdb6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -205,6 +205,7 @@ public class PersistentTopicsBase extends AdminResource {
 protected CompletableFuture<Map<String, Set<AuthAction>>> 
internalGetPermissionsOnTopic() {
 // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
 return validateAdminAccessForTenantAsync(namespaceName.getTenant())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
 .thenCompose(__ -> 
getAuthorizationService().getPermissionsAsync(topicName));
 }
 
@@ -256,9 +257,10 @@ public class PersistentTopicsBase extends AdminResource {
Set actions) {
 // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
 validateAdminAccessForTenantAsync(namespaceName.getTenant())
-.thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
-grantPermissionsAsync(topicName, role, actions)
-.thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()
+.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
+.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, 
actions))
+.thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
 Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
 log.error("[{}] Failed to get permissions for topic {}", 
clientAppId(), topicName, realCause);
@@ -270,6 +272,7 @@ public class PersistentTopicsBase extends AdminResource {
 protected void internalRevokePermissionsOnTopic(AsyncResponse 
asyncResponse, String role) {
 // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
 validateAdminAccessForTenantAsync(namespaceName.getTenant())
+.thenCompose(__ -> internalCheckTopicExists(topicName))
 .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
true, false)
 .thenCompose(metadata -> {
 int numPartitions = metadata.partitions;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
index e89b4ff5e83..2dcb930fbe7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -120,6 +120,7 @@ public class AdminApiSchemaWithAuthTest extends 
MockedPulsarServiceBaseTest {
 .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : 
brokerUrlTls.toString())
 .authentication(AuthenticationToken.class.getName(), 
PRODUCE_TOKEN)
 .build();
+admin.topics().createNonPartitionedTopic(topicName);
 admin.topics().grantPermission(topicName, "consumer", 
EnumSet.of(AuthAction.consume));
 

(pulsar) branch master updated: [fix][admin] Fix namespace admin api exception response (#22587)

2024-04-25 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new f25776d7fe6 [fix][admin] Fix namespace admin api exception response 
(#22587)
f25776d7fe6 is described below

commit f25776d7fe6812f11b17226995d989c5a2364920
Author: Cong Zhao 
AuthorDate: Fri Apr 26 09:18:27 2024 +0800

[fix][admin] Fix namespace admin api exception response (#22587)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  5 +-
 .../pulsar/broker/admin/NamespaceAuthZTest.java| 60 --
 2 files changed, 48 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index bbadc7bb331..5f2dccc3e9c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2019,7 +2019,7 @@ public abstract class NamespacesBase extends 
AdminResource {
 }
 
 protected void internalSetMaxSubscriptionsPerTopic(Integer 
maxSubscriptionsPerTopic){
-validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
+validateNamespacePolicyOperation(namespaceName, 
PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
 validatePoliciesReadOnlyAccess();
 if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) {
 throw new RestException(Status.PRECONDITION_FAILED,
@@ -2125,9 +2125,10 @@ public abstract class NamespacesBase extends 
AdminResource {
 f.complete(null);
 })
 .exceptionally(t -> {
+Throwable cause = FutureUtil.unwrapCompletionException(t);
 log.error("[{}] Failed to update offloadThresholdInSeconds 
configuration for namespace {}",
 clientAppId(), namespaceName, t);
-f.completeExceptionally(new RestException(t));
+f.completeExceptionally(new RestException(cause));
 return null;
 });
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
index d5a0468f340..5358295b785 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.broker.admin;
 
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.deleteNamespaceWithRetry;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -58,7 +59,6 @@ import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
@@ -72,8 +72,6 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 
 private AuthorizationService authorizationService;
 
-private AuthorizationService orignalAuthorizationService;
-
 private static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
 private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
 .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
@@ -100,6 +98,9 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
 .build();
 this.pulsarClient = super.getPulsarService().getClient();
+this.authorizationService = 
Mockito.spy(getPulsarService().getBrokerService().getAuthorizationService());
+FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+authorizationService, true);
 }
 
 
@@ -115,19 +116,9 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 close();
 }
 
-@BeforeMethod
-public void before() throws IllegalAccessException {
-orignalAuthorizationService = 
getPulsarService().getBrokerService().getAuthorizationService();
-authorizationService = Mockito.spy(orignalAuthorizationService);
-FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
-authorizationService, true);
-}
-
 @AfterMethod
-public 

(pulsar) branch branch-3.0 updated: [improve][sec] Align some namespace level policy authorisation check (#21640)

2024-04-25 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 7fdfbf46f9b [improve][sec] Align some namespace level policy 
authorisation check (#21640)
7fdfbf46f9b is described below

commit 7fdfbf46f9b01e55cac270782b57008ec02eb5b2
Author: Qiang Zhao 
AuthorDate: Mon Dec 4 22:15:19 2023 +0800

[improve][sec] Align some namespace level policy authorisation check 
(#21640)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 30 +-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  3 ++-
 2 files changed, 20 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 04ec944aab4..d3c5f681b6d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1204,7 +1204,8 @@ public abstract class NamespacesBase extends 
AdminResource {
 
 protected CompletableFuture internalSetPublishRateAsync(PublishRate 
maxPublishMessageRate) {
 log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), 
namespaceName, maxPublishMessageRate);
-return validateSuperUserAccessAsync().thenCompose(__ -> 
updatePoliciesAsync(namespaceName, policies -> {
+return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
+.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
 
policies.publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(),
 maxPublishMessageRate);
 log.info("[{}] Successfully updated the publish_max_message_rate 
for cluster on namespace {}",
 clientAppId(), namespaceName);
@@ -1233,7 +1234,8 @@ public abstract class NamespacesBase extends 
AdminResource {
 
 protected CompletableFuture internalRemovePublishRateAsync() {
 log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), 
namespaceName, topicName);
-return validateSuperUserAccessAsync().thenCompose(__ -> 
updatePoliciesAsync(namespaceName, policies -> {
+return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
+.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
 if (policies.publishMaxMessageRate != null) {
 
policies.publishMaxMessageRate.remove(pulsar().getConfiguration().getClusterName());
 }
@@ -1253,7 +1255,8 @@ public abstract class NamespacesBase extends 
AdminResource {
 @SuppressWarnings("deprecation")
 protected CompletableFuture 
internalSetTopicDispatchRateAsync(DispatchRateImpl dispatchRate) {
 log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), 
namespaceName, dispatchRate);
-return validateSuperUserAccessAsync().thenCompose(__ -> 
updatePoliciesAsync(namespaceName, policies -> {
+return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
+.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
 
policies.topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), 
dispatchRate);
 
policies.clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), 
dispatchRate);
 log.info("[{}] Successfully updated the dispatchRate for cluster 
on namespace {}", clientAppId(),
@@ -1263,7 +1266,8 @@ public abstract class NamespacesBase extends 
AdminResource {
 }
 
 protected CompletableFuture internalDeleteTopicDispatchRateAsync() {
-return validateSuperUserAccessAsync().thenCompose(__ -> 
updatePoliciesAsync(namespaceName, policies -> {
+return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RATE, PolicyOperation.WRITE)
+.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
 
policies.topicDispatchRate.remove(pulsar().getConfiguration().getClusterName());
 
policies.clusterDispatchRate.remove(pulsar().getConfiguration().getClusterName());
 log.info("[{}] Successfully delete the dispatchRate for cluster on 
namespace {}", clientAppId(),
@@ -1280,7 +1284,7 @@ public abstract class NamespacesBase extends 
AdminResource {
 }
 
 protected CompletableFuture 
internalSetSubscriptionDispatchRateAsync(DispatchRateImpl dispatchRate) {
-return validateSuperUserAccessAsync()
+return validateNamespacePolicyOperationAsync(namespa

(pulsar) branch master updated: [improve][test] Add topic policy test for topic API (#22546)

2024-04-22 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 3a0f908e80d [improve][test] Add topic policy test for topic API 
(#22546)
3a0f908e80d is described below

commit 3a0f908e80d0863920a1258362fd782e95fe8f17
Author: Jiwei Guo 
AuthorDate: Mon Apr 22 19:47:03 2024 +0800

[improve][test] Add topic policy test for topic API (#22546)
---
 .../org/apache/pulsar/broker/admin/AuthZTest.java  |  113 ++
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 1121 ++--
 .../admin/TransactionAndSchemaAuthZTest.java   |  359 +++
 3 files changed, 1270 insertions(+), 323 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java
new file mode 100644
index 00000000000..a710a03970d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static org.mockito.Mockito.doReturn;
+
+public class AuthZTest extends MockedPulsarStandalone {
+
+protected PulsarAdmin superUserAdmin;
+
+protected PulsarAdmin tenantManagerAdmin;
+
+protected AuthorizationService authorizationService;
+
+protected AuthorizationService orignalAuthorizationService;
+
+protected static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
+protected static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+
+@BeforeMethod(alwaysRun = true)
+public void before() throws IllegalAccessException {
+orignalAuthorizationService = 
getPulsarService().getBrokerService().getAuthorizationService();
+authorizationService = Mockito.spy(orignalAuthorizationService);
+FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+authorizationService, true);
+}
+
+@AfterMethod(alwaysRun = true)
+public void after() throws IllegalAccessException {
+FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+orignalAuthorizationService, true);
+}
+
+protected AtomicBoolean setAuthorizationTopicOperationChecker(String role, 
Object operation) {
+AtomicBoolean execFlag = new AtomicBoolean(false);
+if (operation instanceof TopicOperation) {
+Mockito.doAnswer(invocationOnMock -> {
+String role_ = invocationOnMock.getArgument(2);
+if (role.equals(role_)) {
+TopicOperation operation_ = 
invocationOnMock.getArgument(1);
+Assert.assertEquals(operation_, operation);
+}
+execFlag.set(true);
+return invocationOnMock.callRealMethod();
+
}).when(authorizationService).allowTopicOperationAsync(Mockito.any(), 
Mockito.any(), Mockito.any(),
+Mockito.any(), Mockito.any());
+} else if (operation instanceof NamespaceOperation) {
+doReturn(true)
+
.when(authorizationService).isValidOriginalPrincipal(Mockito.any(), 
Mockito.any(), Mockito.any());
+Mockito.doAnswer(invocationOnMock -> {
+ 

(pulsar) branch master updated: [improve][admin] Align the auth and check it at the first place for topic related API (#22507)

2024-04-17 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 8ca01cd42ed [improve][admin]  Align the auth and check it at the first 
place for topic related API (#22507)
8ca01cd42ed is described below

commit 8ca01cd42edfd4efd986f752f6f8538ea5bf4f94
Author: Jiwei Guo 
AuthorDate: Wed Apr 17 18:46:22 2024 +0800

[improve][admin]  Align the auth and check it at the first place for topic 
related API (#22507)
---
 .../broker/admin/impl/PersistentTopicsBase.java| 419 ++---
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  44 ++-
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 257 +++--
 3 files changed, 447 insertions(+), 273 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ab74b1e2bcc..1f8d0657190 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -128,8 +128,6 @@ import 
org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -2727,14 +2725,14 @@ public class PersistentTopicsBase extends AdminResource 
{
 }
 
 protected CompletableFuture 
internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) {
-CompletableFuture future;
-if (topicName.isGlobal()) {
-future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-} else {
-future = CompletableFuture.completedFuture(null);
-}
-
+CompletableFuture future = 
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
 return future.thenCompose(__ -> {
+if (topicName.isGlobal()) {
+return validateGlobalNamespaceOwnershipAsync(namespaceName);
+} else {
+return CompletableFuture.completedFuture(null);
+}
+}).thenCompose(__ -> {
 if (topicName.isPartitioned()) {
 return CompletableFuture.completedFuture(null);
 } else {
@@ -2748,7 +2746,6 @@ public class PersistentTopicsBase extends AdminResource {
 });
 }
 }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
 .thenCompose(__ -> getTopicReferenceAsync(topicName))
 .thenCompose(topic -> {
 if (!(topic instanceof PersistentTopic)) {
@@ -3158,65 +3155,56 @@ public class PersistentTopicsBase extends AdminResource 
{
 
 protected void internalGetBacklogSizeByMessageId(AsyncResponse 
asyncResponse,
  MessageIdImpl messageId, 
boolean authoritative) {
-CompletableFuture ret;
-// If the topic name is a partition name, no need to get partition 
topic metadata again
-if (!topicName.isPartitioned()) {
-ret = getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
-.thenCompose(topicMetadata -> {
-if (topicMetadata.partitions > 0) {
-log.warn("[{}] Not supported calculate backlog 
size operation on partitioned-topic {}",
-clientAppId(), topicName);
-asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED,
-"calculate backlog size is not allowed for 
partitioned-topic"));
-}
-return CompletableFuture.completedFuture(null);
-});
-} else {
-ret = CompletableFuture.completedFuture(null);
-}
-CompletableFuture future;
-if (topicName.isGlobal()) {
-future = ret.thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName));
-} else {
-future = ret;
-}
-future.thenAccept(__ -> val

(pulsar) branch master updated: [improve][broker] Optimize gzip compression for /metrics endpoint by sharing/caching compressed result (#22521)

2024-04-17 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 94f6c7ccd2b [improve][broker] Optimize gzip compression for /metrics 
endpoint by sharing/caching compressed result (#22521)
94f6c7ccd2b is described below

commit 94f6c7ccd2bf8bc261d45ab41f6c7f123359fa47
Author: Lari Hotari 
AuthorDate: Wed Apr 17 03:15:01 2024 -0700

[improve][broker] Optimize gzip compression for /metrics endpoint by 
sharing/caching compressed result (#22521)
---
 .../stats/prometheus/PrometheusMetricsServlet.java |   1 +
 .../apache/pulsar/broker/web/GzipHandlerUtil.java  |  21 +++
 .../pulsar/broker/web/GzipHandlerUtilTest.java |  36 +
 .../org/apache/pulsar/broker/PulsarService.java|   3 +-
 .../prometheus/PrometheusMetricsGenerator.java | 176 +++--
 .../prometheus/PulsarPrometheusMetricsServlet.java |  28 +++-
 .../apache/pulsar/PrometheusMetricsTestUtil.java   |   2 +-
 7 files changed, 253 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 8a41bed29d4..8685348174c 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PrometheusMetricsServlet extends HttpServlet {
+public static final String DEFAULT_METRICS_PATH = "/metrics";
 private static final long serialVersionUID = 1L;
 static final int HTTP_STATUS_OK_200 = 200;
 static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
index 37c9c05e5d5..9e980cecb79 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.broker.web;
 
 import java.util.List;
+import org.eclipse.jetty.http.pathmap.PathSpecSet;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+import org.eclipse.jetty.util.IncludeExclude;
 
 public class GzipHandlerUtil {
 public static Handler wrapWithGzipHandler(Handler innerHandler, 
List gzipCompressionExcludedPaths) {
@@ -45,4 +47,23 @@ public class GzipHandlerUtil {
 && (gzipCompressionExcludedPaths.get(0).equals("^.*")
 || gzipCompressionExcludedPaths.get(0).equals("^.*$"));
 }
+
+/**
+ * Check if GZIP compression is enabled for the given endpoint.
+ * @param gzipCompressionExcludedPaths list of paths that should not be 
compressed
+ * @param endpoint the endpoint to check
+ * @return true if GZIP compression is enabled for the endpoint, false 
otherwise
+ */
+public static boolean isGzipCompressionEnabledForEndpoint(List 
gzipCompressionExcludedPaths,
+  String endpoint) 
{
+if (gzipCompressionExcludedPaths == null || 
gzipCompressionExcludedPaths.isEmpty()) {
+return true;
+}
+if (isGzipCompressionCompletelyDisabled(gzipCompressionExcludedPaths)) 
{
+return false;
+}
+IncludeExclude paths = new IncludeExclude<>(PathSpecSet.class);
+paths.exclude(gzipCompressionExcludedPaths.toArray(new String[0]));
+return paths.test(endpoint);
+}
 }
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java
new file mode 100644
index 00000000000..d6958695dec
--- /dev/null
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ *

(pulsar) branch master updated: [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519)

2024-04-17 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 1dd82a0affd [improve][broker] Repeat the handleMetadataChanges 
callback when configurationMetadataStore equals localMetadataStore (#22519)
1dd82a0affd is described below

commit 1dd82a0affd6ec3686fa85d444c354e9ce12
Author: hanmz 
AuthorDate: Wed Apr 17 18:14:38 2024 +0800

[improve][broker] Repeat the handleMetadataChanges callback when 
configurationMetadataStore equals localMetadataStore (#22519)
---
 .../src/main/java/org/apache/pulsar/broker/service/BrokerService.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2687532693a..249008bad91 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -357,7 +357,9 @@ public class BrokerService implements Closeable {
 this.entryFilterProvider = new 
EntryFilterProvider(pulsar.getConfiguration());
 
 
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
-
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
+if (pulsar.getConfigurationMetadataStore() != 
pulsar.getLocalMetadataStore()) {
+
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
+}
 
 this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
 .name("pulsar-inactivity-monitor")



(pulsar) branch master updated (70b401b1de9 -> ffdfc0c4e08)

2024-04-16 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 70b401b1de9 [improve][fn] Expose `RuntimeFlags` as CLI option for 
Pulsar Functions and Connectors (#22514)
 add ffdfc0c4e08 [fix][test] SchemaMap in AutoConsumeSchema has been reused 
(#22500)

No new revisions were added by this update.

Summary of changes:
 .../client/api/SimpleProducerConsumerTest.java | 66 +-
 1 file changed, 38 insertions(+), 28 deletions(-)



(pulsar) branch master updated: [improve][fn] Expose `RuntimeFlags` as CLI option for Pulsar Functions and Connectors (#22514)

2024-04-16 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 70b401b1de9 [improve][fn] Expose `RuntimeFlags` as CLI option for 
Pulsar Functions and Connectors (#22514)
70b401b1de9 is described below

commit 70b401b1de9df685283140cff1f83252abc27045
Author: Rui Fu 
AuthorDate: Tue Apr 16 19:53:29 2024 +0800

[improve][fn] Expose `RuntimeFlags` as CLI option for Pulsar Functions and 
Connectors (#22514)
---
 .../test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java| 4 +++-
 .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java| 7 +++
 .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java| 6 ++
 .../src/main/java/org/apache/pulsar/admin/cli/CmdSources.java  | 6 ++
 4 files changed, 22 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 4d906af9424..d3087b7fc87 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -165,7 +165,8 @@ public class CmdFunctionsTest {
 "--className", DummyFunction.class.getName(),
 "--dead-letter-topic", "test-dead-letter-topic",
 "--custom-runtime-options", "custom-runtime-options",
-"--user-config", "{\"key\": [\"value1\", \"value2\"]}"
+"--user-config", "{\"key\": [\"value1\", \"value2\"]}",
+"--runtime-flags", "--add-opens java.base/java.lang=ALL-UNNAMED"
 });
 
 CreateFunction creater = cmd.getCreater();
@@ -175,6 +176,7 @@ public class CmdFunctionsTest {
 assertEquals(Boolean.FALSE, creater.getAutoAck());
 assertEquals("test-dead-letter-topic", creater.getDeadLetterTopic());
 assertEquals("custom-runtime-options", 
creater.getCustomRuntimeOptions());
+assertEquals("--add-opens java.base/java.lang=ALL-UNNAMED", 
creater.getRuntimeFlags());
 
 verify(functions, times(1)).createFunction(any(FunctionConfig.class), 
anyString());
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 15b8fca0761..5e80c168d92 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -377,6 +377,9 @@ public class CmdFunctions extends CmdBase {
 @Option(names = "--dead-letter-topic",
 description = "The topic where messages that are not processed 
successfully are sent to #Java")
 protected String deadLetterTopic;
+@Option(names = "--runtime-flags", description = "Any flags that you 
want to pass to a runtime"
++ " (for process & Kubernetes runtime only).")
+protected String runtimeFlags;
 protected FunctionConfig functionConfig;
 protected String userCodeFile;
 
@@ -676,6 +679,10 @@ public class CmdFunctions extends CmdBase {
 userCodeFile = functionConfig.getGo();
 }
 
+if (null != runtimeFlags) {
+functionConfig.setRuntimeFlags(runtimeFlags);
+}
+
 // check if configs are valid
 validateFunctionConfigs(functionConfig);
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index f3172a49b01..be1cd0af960 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -404,6 +404,9 @@ public class CmdSinks extends CmdBase {
 protected String transformFunctionConfig;
 @Option(names = "--log-topic", description = "The topic to which the 
logs of a Pulsar Sink are produced")
 protected String logTopic;
+@Option(names = "--runtime-flags", description = "Any flags that you 
want to pass to a runtime"
++ " (for process & Kubernetes runtime only).")
+protected String runtimeFlags;
 
 protected SinkConfig sinkConfig;
 
@@ -602,6 +605,9 @@ public class CmdSinks extends CmdBase {
   

(pulsar) branch master updated: [improve][test] Add topic operation checker for topic API (#22468)

2024-04-15 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 9d72e6bd847 [improve][test] Add topic operation checker for topic API 
(#22468)
9d72e6bd847 is described below

commit 9d72e6bd847df85a7d18f1827274df96a446798f
Author: Cong Zhao 
AuthorDate: Mon Apr 15 16:15:59 2024 +0800

[improve][test] Add topic operation checker for topic API (#22468)
---
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 156 ++---
 1 file changed, 135 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
index d09bc0a3ffd..e6ff0ce2bb4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -20,9 +20,18 @@
 package org.apache.pulsar.broker.admin;
 
 import io.jsonwebtoken.Jwts;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -37,22 +46,21 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 
 @Test(groups = "broker-admin")
 public class TopicAuthZTest extends MockedPulsarStandalone {
@@ -61,13 +69,17 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
 
 private PulsarAdmin tenantManagerAdmin;
 
+private AuthorizationService authorizationService;
+
+private AuthorizationService orignalAuthorizationService;
+
 private static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
 private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
 .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
 
 @SneakyThrows
 @BeforeClass(alwaysRun = true)
-public void before() {
+public void setup() {
 configureTokenAuthentication();
 configureDefaultAuthorization();
 enableTransaction();
@@ -99,7 +111,7 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
 
 @SneakyThrows
 @AfterClass(alwaysRun = true)
-public void after() {
+public void cleanup() {
 if (superUserAdmin != null) {
 superUserAdmin.close();
 }
@@ -109,6 +121,51 @@ public class TopicAuthZTest extends MockedPulsarStandalone 
{
 close();
 }
 
+@BeforeMethod
+public void before() throws IllegalAccessException {
+orignalAuthorizationService = 
getPulsarService().getBrokerService().getAuthorizationService();
+authorizationService = Mockito.spy(orignalAuthorizationService);
+FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+authorizationService, true);
+}
+
+@AfterMethod
+public void after() throws IllegalAccessException {
+FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+orignalAuthorizationService, true);
+}
+
+private AtomicBoolean setAuthorizationTopicOperationChecker(String role, 
Object operation) {
+AtomicBoolean execFlag = new AtomicBoolean(false);
+   

(pulsar) branch master updated: [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489)

2024-04-14 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d9a43dd2160 [fix][test] Flaky-test: 
testMessageExpiryWithTimestampNonRecoverableException and 
testIncorrectClientClock (#22489)
d9a43dd2160 is described below

commit d9a43dd21605930e16bb038095e36fceff3a4a40
Author: Baodi Shi 
AuthorDate: Mon Apr 15 13:55:34 2024 +0800

[fix][test] Flaky-test: 
testMessageExpiryWithTimestampNonRecoverableException and 
testIncorrectClientClock (#22489)
---
 .../service/PersistentMessageFinderTest.java   | 42 +++---
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 6965ac28068..0972c9098b5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
@@ -383,7 +382,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
  *
  * @throws Exception
  */
-@Test(groups = "flaky")
+@Test
 void testMessageExpiryWithTimestampNonRecoverableException() throws 
Exception {
 
 final String ledgerAndCursorName = 
"testPersistentMessageExpiryWithNonRecoverableLedgers";
@@ -402,11 +401,15 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 for (int i = 0; i < totalEntries; i++) {
 ledger.addEntry(createMessageWrittenToLedger("msg" + i));
 }
+Awaitility.await().untilAsserted(() ->
+assertEquals(ledger.getState(), 
ManagedLedgerImpl.State.LedgerOpened));
 
 List ledgers = ledger.getLedgersInfoAsList();
 LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
-
-assertEquals(ledgers.size(), totalEntries / entriesPerLedger);
+// The `lastLedgerInfo` should be newly opened, and it does not 
contain any entries. 
+// Please refer to: https://github.com/apache/pulsar/pull/22034
+assertEquals(lastLedgerInfo.getEntries(), 0);
+assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);
 
 // this will make sure that all entries should be deleted
 Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds));
@@ -420,19 +423,13 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
 
 PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
-Position previousMarkDelete = null;
-for (int i = 0; i < totalEntries; i++) {
-monitor.expireMessages(1);
-Position previousPos = previousMarkDelete;
-retryStrategically(
-(test) -> c1.getMarkDeletedPosition() != null && 
!c1.getMarkDeletedPosition().equals(previousPos),
-5, 100);
-previousMarkDelete = c1.getMarkDeletedPosition();
-}
-
-PositionImpl markDeletePosition = (PositionImpl) 
c1.getMarkDeletedPosition();
-assertEquals(lastLedgerInfo.getLedgerId(), 
markDeletePosition.getLedgerId());
-assertEquals(lastLedgerInfo.getEntries() - 1, 
markDeletePosition.getEntryId());
+assertTrue(monitor.expireMessages(ttlSeconds));
+Awaitility.await().untilAsserted(() -> {
+PositionImpl markDeletePosition = (PositionImpl) 
c1.getMarkDeletedPosition();
+// The markDeletePosition points to the last entry of the previous 
ledger in lastLedgerInfo.
+assertEquals(markDeletePosition.getLedgerId(), 
lastLedgerInfo.getLedgerId() - 1);
+assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 
1); 
+});
 
 c1.close();
 ledger.close();
@@ -440,20 +437,25 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 
 }
 
-@Test(groups = "flaky")
+@Test
 public void testIncorrectClientClock() throws Exception {
 final String ledgerAndCursorName = "testIncorrectClientClock";
 int maxTTLSeconds = 1;
+int entriesNum = 10;
 ManagedLedgerConfig config = new ManagedLedgerConfig();
 config.setMaxEntriesPerLedger(

svn commit: r68514 - /release/pulsar/pulsar-client-cpp-3.4.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:28:45 2024
New Revision: 68514

Log:
cleanup

Removed:
release/pulsar/pulsar-client-cpp-3.4.0/



svn commit: r68517 - /release/pulsar/pulsar-client-cpp-3.1.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:29:42 2024
New Revision: 68517

Log:
cleanup

Removed:
release/pulsar/pulsar-client-cpp-3.1.1/



svn commit: r68516 - /release/pulsar/pulsar-client-cpp-3.1.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:29:27 2024
New Revision: 68516

Log:
cleanup

Removed:
release/pulsar/pulsar-client-cpp-3.1.0/



svn commit: r68515 - /release/pulsar/pulsar-client-cpp-3.4.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:28:58 2024
New Revision: 68515

Log:
cleanup

Removed:
release/pulsar/pulsar-client-cpp-3.4.1/



svn commit: r68513 - /release/pulsar/pulsar-client-go-0.11.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:28:14 2024
New Revision: 68513

Log:
cleanup

Removed:
release/pulsar/pulsar-client-go-0.11.0/



svn commit: r68512 - /release/pulsar/pulsar-dotpulsar-3.1.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:27:45 2024
New Revision: 68512

Log:
cleanup

Removed:
release/pulsar/pulsar-dotpulsar-3.1.1/



svn commit: r68511 - /release/pulsar/pulsar-dotpulsar-3.1.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:27:29 2024
New Revision: 68511

Log:
cleanup

Removed:
release/pulsar/pulsar-dotpulsar-3.1.0/



svn commit: r68510 - /release/pulsar/pulsar-client-go-0.12.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:26:17 2024
New Revision: 68510

Log:
cleanup

Removed:
release/pulsar/pulsar-client-go-0.12.0/



svn commit: r68509 - /release/pulsar/pulsar-client-reactive-0.5.2/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:25:44 2024
New Revision: 68509

Log:
cleanup

Removed:
release/pulsar/pulsar-client-reactive-0.5.2/



svn commit: r68508 - /release/pulsar/pulsar-client-reactive-0.5.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:25:31 2024
New Revision: 68508

Log:
cleanup

Removed:
release/pulsar/pulsar-client-reactive-0.5.1/



svn commit: r68507 - /release/pulsar/pulsar-client-reactive-0.5.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:25:17 2024
New Revision: 68507

Log:
cleanup

Removed:
release/pulsar/pulsar-client-reactive-0.5.0/



svn commit: r68506 - /release/pulsar/pulsar-3.2.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:24:19 2024
New Revision: 68506

Log:
cleanup

Removed:
release/pulsar/pulsar-3.2.1/



svn commit: r68505 - /release/pulsar/pulsar-3.2.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:23:56 2024
New Revision: 68505

Log:
cleanup

Removed:
release/pulsar/pulsar-3.2.0/



svn commit: r68502 - /release/pulsar/pulsar-3.1.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:22:31 2024
New Revision: 68502

Log:
cleanup

Removed:
release/pulsar/pulsar-3.1.0/



svn commit: r68504 - /release/pulsar/pulsar-3.1.2/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:23:19 2024
New Revision: 68504

Log:
cleanup

Removed:
release/pulsar/pulsar-3.1.2/



svn commit: r68503 - /release/pulsar/pulsar-3.1.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:22:58 2024
New Revision: 68503

Log:
cleanup

Removed:
release/pulsar/pulsar-3.1.1/



svn commit: r68501 - /release/pulsar/pulsar-3.0.3/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:22:04 2024
New Revision: 68501

Log:
cleanup

Removed:
release/pulsar/pulsar-3.0.3/



svn commit: r68500 - /release/pulsar/pulsar-3.0.2/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:21:51 2024
New Revision: 68500

Log:
cleanup

Removed:
release/pulsar/pulsar-3.0.2/



svn commit: r68499 - /release/pulsar/pulsar-3.0.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:21:23 2024
New Revision: 68499

Log:
cleanup

Removed:
release/pulsar/pulsar-3.0.1/



svn commit: r68498 - /release/pulsar/pulsar-3.0.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:21:08 2024
New Revision: 68498

Log:
cleanup

Removed:
release/pulsar/pulsar-3.0.0/



svn commit: r68496 - /release/pulsar/pulsar-2.11.3/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:20:06 2024
New Revision: 68496

Log:
cleanup

Removed:
release/pulsar/pulsar-2.11.3/



svn commit: r68497 - /release/pulsar/pulsar-2.9.4/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:20:37 2024
New Revision: 68497

Log:
cleanup

Removed:
release/pulsar/pulsar-2.9.4/



svn commit: r68493 - /release/pulsar/pulsar-2.11.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:19:17 2024
New Revision: 68493

Log:
cleanup

Removed:
release/pulsar/pulsar-2.11.0/



svn commit: r68494 - /release/pulsar/pulsar-2.11.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:19:42 2024
New Revision: 68494

Log:
cleanup

Removed:
release/pulsar/pulsar-2.11.1/



svn commit: r68495 - /release/pulsar/pulsar-2.11.2/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:19:53 2024
New Revision: 68495

Log:
cleanup

Removed:
release/pulsar/pulsar-2.11.2/



svn commit: r68490 - /release/pulsar/pulsar-2.10.3/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:18:19 2024
New Revision: 68490

Log:
cleanup

Removed:
release/pulsar/pulsar-2.10.3/



svn commit: r68492 - /release/pulsar/pulsar-2.10.5/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:18:56 2024
New Revision: 68492

Log:
cleanup

Removed:
release/pulsar/pulsar-2.10.5/



svn commit: r68491 - /release/pulsar/pulsar-2.10.4/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:18:39 2024
New Revision: 68491

Log:
cleanup

Removed:
release/pulsar/pulsar-2.10.4/



svn commit: r68489 - /release/pulsar/pulsar-2.10.2/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:17:31 2024
New Revision: 68489

Log:
cleanup

Removed:
release/pulsar/pulsar-2.10.2/



(pulsar) branch master updated: [fix][broker] Fix message drop record in producer stat (#22458)

2024-04-10 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new cea1a9ba9b5 [fix][broker] Fix message drop record in producer stat 
(#22458)
cea1a9ba9b5 is described below

commit cea1a9ba9b576bf43f0a45ff8d65369b0f2bbb36
Author: zhangqian <503837...@qq.com>
AuthorDate: Wed Apr 10 16:51:26 2024 +0800

[fix][broker] Fix message drop record in producer stat (#22458)

Co-authored-by: ceceezhang 
---
 .../src/main/java/org/apache/pulsar/broker/service/Producer.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 7e4459505a5..9cfde67802b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -749,7 +749,7 @@ public class Producer {
 }
 if (this.isNonPersistentTopic) {
 msgDrop.calculateRate();
-((NonPersistentPublisherStatsImpl) stats).msgDropRate = 
msgDrop.getRate();
+((NonPersistentPublisherStatsImpl) stats).msgDropRate = 
msgDrop.getValueRate();
 }
 }
 



(pulsar) branch master updated: [improve][admin] Add authorization test for schema and align auth for transaction (#22399)

2024-04-09 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 904ee2c [improve][admin] Add authorization test for schema and 
align auth for transaction (#22399)
904ee2c is described below

commit 904ee2c7adf9febddc585a699a1fdb724013
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Tue Apr 9 22:43:35 2024 +0800

[improve][admin] Add authorization test for schema and align auth for 
transaction (#22399)
---
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 249 +
 .../pulsar/security/MockedPulsarStandalone.java|   4 +-
 2 files changed, 252 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
index 2e75b59ec85..d09bc0a3ffd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -20,19 +20,27 @@
 package org.apache.pulsar.broker.admin;
 
 import io.jsonwebtoken.Jwts;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.security.MockedPulsarStandalone;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -62,7 +70,9 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
 public void before() {
 configureTokenAuthentication();
 configureDefaultAuthorization();
+enableTransaction();
 start();
+createTransactionCoordinatorAssign(16);
 this.superUserAdmin =PulsarAdmin.builder()
 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
 .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
@@ -74,8 +84,18 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
 .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
 .build();
+
+superUserAdmin.tenants().createTenant("pulsar", tenantInfo);
+superUserAdmin.namespaces().createNamespace("pulsar/system");
 }
 
+protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) 
throws MetadataStoreException {
+getPulsarService().getPulsarResources()
+.getNamespaceResources()
+.getPartitionedTopicResources()
+
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+new PartitionedTopicMetadata(numPartitionsOfTC));
+}
 
 @SneakyThrows
 @AfterClass(alwaysRun = true)
@@ -1086,6 +1106,235 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
 deleteTopic(topic, false);
 }
 
+public enum OperationAuthType {
+Lookup,
+Produce,
+Consume,
+AdminOrSuperUser,
+NOAuth
+}
+
+private final String testTopic = "persistent://public/default/" + 
UUID.randomUUID().toString();
+@FunctionalInterface
+public interface ThrowingBiConsumer {
+void accept(T t) throws PulsarAdminException;
+}
+
+@DataProvider(name = "authFunction")
+public Object[][] authFunction () throws Exception {
+String sub = "my-sub";
+createTopic(testTopic, false);
+@Cleanup final PulsarClient pulsarClient = PulsarClient.builder()
+.serviceUrl(getPulsarService().getBrokerServiceUrl())
+.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+.enableTransaction(true)
+.build();
+@Cleanup final Producer producer = 

(pulsar) branch master updated: [improve][test] Add operation authentication test for namespace API (#22398)

2024-04-09 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 6de711d4008 [improve][test] Add operation authentication test for 
namespace API (#22398)
6de711d4008 is described below

commit 6de711d4008338a875c5c145e856c90dcb041f8f
Author: Cong Zhao 
AuthorDate: Tue Apr 9 16:38:18 2024 +0800

[improve][test] Add operation authentication test for namespace API (#22398)
---
 .../pulsar/broker/admin/NamespaceAuthZTest.java| 882 -
 1 file changed, 875 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
index ce0b925614c..d5a0468f340 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -19,23 +19,47 @@
 
 package org.apache.pulsar.broker.admin;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import io.jsonwebtoken.Jwts;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
+import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import 
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
+import org.apache.pulsar.packages.management.core.common.PackageMetadata;
 import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
 
 @Test(groups = "broker-admin")
 public class NamespaceAuthZTest extends MockedPulsarStandalone {
@@ -44,17 +68,27 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 
 private PulsarAdmin tenantManagerAdmin;
 
+private PulsarClient pulsarClient;
+
+private AuthorizationService authorizationService;
+
+private AuthorizationService orignalAuthorizationService;
+
 private static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
 private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
 .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
 
 @SneakyThrows
 @BeforeClass
-public void before() {
+public void setup() {
+getServiceConfiguration().setEnablePackagesManagement(true);
+
getServiceConfiguration().setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
+getServiceConfiguration().setDefaultNumberOfNamespaceBundles(1);
+getServiceConfiguration().setForceDeleteNamespaceAllowed(true);
 configureTokenAuthentication();
 configureDefaultAuthorization();
 start();
-this.superUserAdmin =PulsarAdmin.builder()
+this.superUserAdmin = PulsarAdmin.builder()
 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
 .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
 .build();
@@ -65,12 +99,13 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
 .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
 .build();
+this.pulsarClient = super.getPulsarService().getClient();
 }
 
 
 @SneakyThrows
 @AfterCl

(pulsar) branch branch-3.0 updated: [fix][ml] No rollover inactive ledgers when metadata service invalid (#22284)

2024-04-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new b9bf0a83b5c [fix][ml] No rollover inactive ledgers when metadata 
service invalid (#22284)
b9bf0a83b5c is described below

commit b9bf0a83b5ccf89e036cf77a1c4cb764acf59bba
Author: houxiaoyu 
AuthorDate: Sat Mar 30 21:38:55 2024 +0800

[fix][ml] No rollover inactive ledgers when metadata service invalid 
(#22284)

### Motivation

We should not roll over inactive ledgers when the metadata service is unavailable.

### Modifications

Check that the metadata service is available when scheduling 
`checkInactiveLedgerAndRollOver`.
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  7 ---
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 24 ++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index da9b1fde27a..8a7b376a7f5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4464,9 +4464,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
 @Override
 public boolean checkInactiveLedgerAndRollOver() {
-long currentTimeMs = System.currentTimeMillis();
-if (currentLedgerEntries > 0 && inactiveLedgerRollOverTimeMs > 0 && 
currentTimeMs > (lastAddEntryTimeMs
-+ inactiveLedgerRollOverTimeMs)) {
+if (factory.isMetadataServiceAvailable()
+&& currentLedgerEntries > 0
+&& inactiveLedgerRollOverTimeMs > 0
+&& System.currentTimeMillis() > (lastAddEntryTimeMs + 
inactiveLedgerRollOverTimeMs)) {
 log.info("[{}] Closing inactive ledger, last-add entry {}", name, 
lastAddEntryTimeMs);
 if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened, 
State.ClosingLedger)) {
 LedgerHandle currentLedger = this.currentLedger;
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index a08b51cf29f..f918ffdc755 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3922,6 +3922,30 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 ledger.close();
 }
 
+@Test
+public void testDontRollOverInactiveLedgersWhenMetadataServiceInvalid() 
throws Exception {
+int inactiveLedgerRollOverTimeMs = 5;
+@Cleanup("shutdown")
+ManagedLedgerFactoryImpl factory = spy(new 
ManagedLedgerFactoryImpl(metadataStore, bkc));
+// mock metadata service invalid
+when(factory.isMetadataServiceAvailable()).thenReturn(false);
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, 
TimeUnit.MILLISECONDS);
+ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("rollover_inactive", config);
+
+long ledgerId = ledger.currentLedger.getId();
+
+Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+ledger.checkInactiveLedgerAndRollOver();
+
+Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+ledger.checkInactiveLedgerAndRollOver();
+
+assertEquals(ledger.currentLedger.getId(), ledgerId);
+
+ledger.close();
+}
+
 @Test
 public void testOffloadTaskCancelled() throws Exception {
 ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);



(pulsar) branch branch-3.0 updated: [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376)

2024-04-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new a503efc826e [improve][io]: Add validation for JDBC sink not supporting 
primitive schema (#22376)
a503efc826e is described below

commit a503efc826e60b8e26f9792aeb45223374b8f4ca
Author: Baodi Shi 
AuthorDate: Fri Mar 29 08:33:27 2024 +0800

[improve][io]: Add validation for JDBC sink not supporting primitive schema 
(#22376)
---
 pulsar-io/jdbc/core/pom.xml|  7 +
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java |  5 
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 25 +
 .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java  | 31 +++---
 4 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml
index 3f44a062fb9..0322f6e11f1 100644
--- a/pulsar-io/jdbc/core/pom.xml
+++ b/pulsar-io/jdbc/core/pom.xml
@@ -71,6 +71,13 @@
   provided
 
 
+
+  ${project.groupId}
+  pulsar-client-original
+  ${project.version}
+  test
+
+
   
 
 
\ No newline at end of file
diff --git 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index 36c36740919..3655688c0f3 100644
--- 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 
@@ -137,6 +138,10 @@ public abstract class BaseJdbcAutoSchemaSink extends 
JdbcAbstractSink data.get(k);
 } else {
+SchemaType schemaType = 
message.getSchema().getSchemaInfo().getType();
+if (schemaType.isPrimitive()) {
+throw new UnsupportedOperationException("Primitive schema is 
not supported: " + schemaType);
+}
 recordValueGetter = (key) -> ((GenericRecord) 
record).getField(key);
 }
 String action = message.getProperties().get(ACTION_PROPERTY);
diff --git 
a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
 
b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
index b15eb832242..c088dd3c42c 100644
--- 
a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
+++ 
b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -22,6 +22,10 @@ import java.util.function.Function;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.util.Utf8;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.functions.api.Record;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -143,5 +147,26 @@ public class BaseJdbcAutoSchemaSinkTest {
 return consumer.apply(record).endRecord().getFields().get(0).schema();
 }
 
+@Test(expectedExceptions = UnsupportedOperationException.class,
+expectedExceptionsMessageRegExp = "Primitive schema is not 
supported.*")
+@SuppressWarnings("unchecked")
+public void testNotSupportPrimitiveSchema() {
+BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new 
BaseJdbcAutoSchemaSink() {};
+AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+
autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING);
+Record record = new Record() {
+@Override
+public org.apache.pulsar.client.api.Schema 
getSchema() {
+return autoConsumeSchema;
+}
+
+@Override
+public GenericRecord getValue() {
+return null;
+}
+};
+baseJdbcAutoSchemaSink.createMutation((Record) record);
+}
+
 
 }
\ No newline at end of file
diff --git 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index d9ed4cbd442..ca01615bef1 100644
--- 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ 
b/pulsar-io/jdb

(pulsar) branch branch-3.0 updated: [admin][broker] Fix force delete subscription not working (#22423)

2024-04-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 110add00c1c [admin][broker] Fix force delete subscription not working 
(#22423)
110add00c1c is described below

commit 110add00c1cdc59542e7ef906cd5f4409e63dc04
Author: 道君 
AuthorDate: Thu Apr 4 23:08:45 2024 +0800

[admin][broker] Fix force delete subscription not working (#22423)
---
 .../broker/admin/impl/PersistentTopicsBase.java|  5 ++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 30 ++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 00c2eed2763..602cd47e595 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1656,7 +1656,7 @@ public class PersistentTopicsBase extends AdminResource {
 for (int i = 0; i < partitionMetadata.partitions; i++) 
{
 TopicName topicNamePartition = 
topicName.getPartition(i);
 futures.add(adminClient.topics()
-
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
+
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, force));
 }
 
 return FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
@@ -1675,8 +1675,7 @@ public class PersistentTopicsBase extends AdminResource {
 return null;
 });
 }
-return 
internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative,
-force);
+return 
internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, 
force);
 });
 }
 });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 59c3dbf6ff3..d7ffa656bdb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -77,11 +77,13 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -1696,6 +1698,34 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 assertThrows(PulsarAdminException.NotFoundException.class, () -> 
admin.topics().createMissedPartitions(topicName));
 }
 
+@Test
+public void testForceDeleteSubscription() throws Exception {
+try {
+pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+String topicName = "persistent://" + testTenant + "/" + 
testNamespaceLocal + "/testForceDeleteSubscription";
+String subName = "sub1";
+admin.topics().createNonPartitionedTopic(topicName);
+admin.topics().createSubscription(topicName, subName, 
MessageId.latest);
+
+@Cleanup
+Consumer c0 = pulsarClient.newConsumer(Schema.STRING)
+.topic(topicName)
+.subscriptionName(subName)
+.subscriptionType(SubscriptionType.Shared)
+.subscribe();
+@Cleanup
+Consumer c1 = pulsarClient.newConsumer(Schema.STRING)
+.topic(topicName)
+.subscriptionName(subName)
+.subscriptionType(SubscriptionType.Shared)
+.subscribe();
+
+admin.topics().deleteSubscription(topicName, subName, true);
+} finally {
+pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true)

(pulsar) branch branch-3.0 updated: [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)

2024-04-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 40329ee3615 [fix][broker] Fix consumer stops receiving messages when 
with large backlogs processing (#22454)
40329ee3615 is described below

commit 40329ee3615ebeff6eaa0dca2d454d6389fa6f43
Author: Jiwei Guo 
AuthorDate: Mon Apr 8 18:22:05 2024 +0800

[fix][broker] Fix consumer stops receiving messages when with large 
backlogs processing (#22454)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +---
 .../service/persistent/PersistentSubscription.java |  4 +-
 .../service/persistent/PersistentTopicTest.java| 56 +-
 4 files changed, 65 insertions(+), 9 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 995f980953b..a8e2a6931af 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -985,6 +985,11 @@ public class ManagedCursorImpl implements ManagedCursor {
 log.debug("[{}] [{}] Re-trying the read at position {}", 
ledger.getName(), name, op.readPosition);
 }
 
+if (isClosed()) {
+callback.readEntriesFailed(new 
CursorAlreadyClosedException("Cursor was already closed"), ctx);
+return;
+}
+
 if (!hasMoreEntries()) {
 if (log.isDebugEnabled()) {
 log.debug("[{}] [{}] Still no entries available. Register 
for notification", ledger.getName(),
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 73dfc86e1ad..da9b1fde27a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1033,6 +1033,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 + consumerName), ctx);
 return;
 } else if (!cursor.isDurable()) {
+cursor.setState(ManagedCursorImpl.State.Closed);
 cursors.removeCursor(consumerName);
 deactivateCursorByName(consumerName);
 callback.deleteCursorComplete(ctx);
@@ -3805,13 +3806,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 }
 
 public void addWaitingCursor(ManagedCursorImpl cursor) {
-if (cursor instanceof NonDurableCursorImpl) {
-if (cursor.isActive()) {
-this.waitingCursors.add(cursor);
-}
-} else {
-this.waitingCursors.add(cursor);
-}
+this.waitingCursors.add(cursor);
 }
 
 public boolean isCursorActive(ManagedCursor cursor) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 949dc398b32..f745f95b3dc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -320,7 +320,6 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
 
 if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
 deactivateCursor();
-topic.getManagedLedger().removeWaitingCursor(cursor);
 
 if (!cursor.isDurable()) {
 // If cursor is not durable, we need to clean up the 
subscription as well
@@ -349,11 +348,14 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
 if (!isResetCursor) {
 try {
 
topic.getManagedLedger().deleteCursor(cursor.getName());
+
topic.getManagedLedger().removeWaitingCursor(cursor);
 } catch (InterruptedException | ManagedLedgerException 
e) {
 log.warn("[{}] [{}] Failed to remove non durable 
cursor", topic.getName(), subName, e);
 }
 }
 });
+} else {
+topic.getManagedLedger().removeWaitingCursor(cursor);
 }
 }
 
diff --git 
a/puls

(pulsar) branch branch-3.2 updated: [admin][broker] Fix force delete subscription not working (#22423)

2024-04-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new ed58028e0bd [admin][broker] Fix force delete subscription not working 
(#22423)
ed58028e0bd is described below

commit ed58028e0bd0939b616d29ea31558fedcbc563d7
Author: 道君 
AuthorDate: Thu Apr 4 23:08:45 2024 +0800

[admin][broker] Fix force delete subscription not working (#22423)
---
 .../broker/admin/impl/PersistentTopicsBase.java|  5 ++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 30 ++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3ef8773..318e2bc2cde 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1566,7 +1566,7 @@ public class PersistentTopicsBase extends AdminResource {
 for (int i = 0; i < partitionMetadata.partitions; i++) 
{
 TopicName topicNamePartition = 
topicName.getPartition(i);
 futures.add(adminClient.topics()
-
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
+
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, force));
 }
 
 return FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
@@ -1585,8 +1585,7 @@ public class PersistentTopicsBase extends AdminResource {
 return null;
 });
 }
-return 
internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative,
-force);
+return 
internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, 
force);
 });
 }
 });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 6641a5805c0..8e1375303ce 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -79,11 +79,13 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -1793,6 +1795,34 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 assertThrows(PulsarAdminException.NotFoundException.class, () -> 
admin.topics().createMissedPartitions(topicName));
 }
 
+@Test
+public void testForceDeleteSubscription() throws Exception {
+try {
+pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+String topicName = "persistent://" + testTenant + "/" + 
testNamespaceLocal + "/testForceDeleteSubscription";
+String subName = "sub1";
+admin.topics().createNonPartitionedTopic(topicName);
+admin.topics().createSubscription(topicName, subName, 
MessageId.latest);
+
+@Cleanup
+Consumer c0 = pulsarClient.newConsumer(Schema.STRING)
+.topic(topicName)
+.subscriptionName(subName)
+.subscriptionType(SubscriptionType.Shared)
+.subscribe();
+@Cleanup
+Consumer c1 = pulsarClient.newConsumer(Schema.STRING)
+.topic(topicName)
+.subscriptionName(subName)
+.subscriptionType(SubscriptionType.Shared)
+.subscribe();
+
+admin.topics().deleteSubscription(topicName, subName, true);
+} finally {
+pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true)

(pulsar) 01/02: [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)

2024-04-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7259fd0b26a246340262242a792dd57b35af2970
Author: Jiwei Guo 
AuthorDate: Mon Apr 8 18:22:05 2024 +0800

[fix][broker] Fix consumer stops receiving messages when with large 
backlogs processing (#22454)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +---
 .../service/persistent/PersistentSubscription.java |  4 +-
 .../service/persistent/PersistentTopicTest.java| 56 +-
 4 files changed, 65 insertions(+), 9 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index dbdde7cf707..9bbcda327f0 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -985,6 +985,11 @@ public class ManagedCursorImpl implements ManagedCursor {
 log.debug("[{}] [{}] Re-trying the read at position {}", 
ledger.getName(), name, op.readPosition);
 }
 
+if (isClosed()) {
+callback.readEntriesFailed(new 
CursorAlreadyClosedException("Cursor was already closed"), ctx);
+return;
+}
+
 if (!hasMoreEntries()) {
 if (log.isDebugEnabled()) {
 log.debug("[{}] [{}] Still no entries available. Register 
for notification", ledger.getName(),
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3d66bc8d6c0..426ac8df218 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1032,6 +1032,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 + consumerName), ctx);
 return;
 } else if (!cursor.isDurable()) {
+cursor.setState(ManagedCursorImpl.State.Closed);
 cursors.removeCursor(consumerName);
 deactivateCursorByName(consumerName);
 callback.deleteCursorComplete(ctx);
@@ -3804,13 +3805,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 }
 
 public void addWaitingCursor(ManagedCursorImpl cursor) {
-if (cursor instanceof NonDurableCursorImpl) {
-if (cursor.isActive()) {
-this.waitingCursors.add(cursor);
-}
-} else {
-this.waitingCursors.add(cursor);
-}
+this.waitingCursors.add(cursor);
 }
 
 public boolean isCursorActive(ManagedCursor cursor) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 3bedf497f87..fdb977aa366 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -308,7 +308,6 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
 
 if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
 deactivateCursor();
-topic.getManagedLedger().removeWaitingCursor(cursor);
 
 if (!cursor.isDurable()) {
 // If cursor is not durable, we need to clean up the 
subscription as well. No need to check for active
@@ -338,11 +337,14 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
 if (!isResetCursor) {
 try {
 
topic.getManagedLedger().deleteCursor(cursor.getName());
+
topic.getManagedLedger().removeWaitingCursor(cursor);
 } catch (InterruptedException | ManagedLedgerException 
e) {
 log.warn("[{}] [{}] Failed to remove non durable 
cursor", topic.getName(), subName, e);
 }
 }
 });
+} else {
+topic.getManagedLedger().removeWaitingCursor(cursor);
 }
 }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 618f

(pulsar) branch branch-3.2 updated (4a5d14ce683 -> de5a2ebd609)

2024-04-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 4a5d14ce683 [improve][misc] Specify valid home dir for the default 
user in the Ubuntu based docker image (#22446)
 new 7259fd0b26a [fix][broker] Fix consumer stops receiving messages when 
with large backlogs processing (#22454)
 new de5a2ebd609 [improve][build] Upgrade Lombok to 1.18.32 for Java 22 
support (#22425)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +---
 pom.xml|  2 +-
 .../service/persistent/PersistentSubscription.java |  4 +-
 .../service/persistent/PersistentTopicTest.java| 56 +-
 5 files changed, 66 insertions(+), 10 deletions(-)



(pulsar) 02/02: [improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425)

2024-04-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit de5a2ebd609e75c11857d5b02b58ef196efeb689
Author: Lari Hotari 
AuthorDate: Thu Apr 4 06:39:53 2024 -0700

[improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 7a831b51bd1..85b3f9b8ea9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -228,7 +228,7 @@ flexible messaging model and an intuitive client 
API.
 0.9.1
 2.1.0
 3.24.2
-1.18.30
+1.18.32
 1.3.2
 2.3.1
 1.2.0



(pulsar) branch master updated: [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)

2024-04-08 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 57a616eaa79 [fix][broker] Fix consumer stops receiving messages when 
with large backlogs processing (#22454)
57a616eaa79 is described below

commit 57a616eaa79096af5b49db89c99cd39ccc94ec00
Author: Jiwei Guo 
AuthorDate: Mon Apr 8 18:22:05 2024 +0800

[fix][broker] Fix consumer stops receiving messages when with large 
backlogs processing (#22454)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +---
 .../service/persistent/PersistentSubscription.java |  4 +-
 .../service/persistent/PersistentTopicTest.java| 56 +-
 4 files changed, 65 insertions(+), 9 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 4daa06cad57..69b130a98c8 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -990,6 +990,11 @@ public class ManagedCursorImpl implements ManagedCursor {
 log.debug("[{}] [{}] Re-trying the read at position {}", 
ledger.getName(), name, op.readPosition);
 }
 
+if (isClosed()) {
+callback.readEntriesFailed(new 
CursorAlreadyClosedException("Cursor was already closed"), ctx);
+return;
+}
+
 if (!hasMoreEntries()) {
 if (log.isDebugEnabled()) {
 log.debug("[{}] [{}] Still no entries available. Register 
for notification", ledger.getName(),
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3a12cb2ad6c..698563ed7a1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1032,6 +1032,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 + consumerName), ctx);
 return;
 } else if (!cursor.isDurable()) {
+cursor.setState(ManagedCursorImpl.State.Closed);
 cursors.removeCursor(consumerName);
 deactivateCursorByName(consumerName);
 callback.deleteCursorComplete(ctx);
@@ -3814,13 +3815,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 }
 
 public void addWaitingCursor(ManagedCursorImpl cursor) {
-if (cursor instanceof NonDurableCursorImpl) {
-if (cursor.isActive()) {
-this.waitingCursors.add(cursor);
-}
-} else {
-this.waitingCursors.add(cursor);
-}
+this.waitingCursors.add(cursor);
 }
 
 public boolean isCursorActive(ManagedCursor cursor) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 6e8e94baeae..dbbf92aa76d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -308,7 +308,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
 
 if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
 deactivateCursor();
-topic.getManagedLedger().removeWaitingCursor(cursor);
 
 if (!cursor.isDurable()) {
 // If cursor is not durable, we need to clean up the 
subscription as well. No need to check for active
@@ -338,11 +337,14 @@ public class PersistentSubscription extends 
AbstractSubscription {
 if (!isResetCursor) {
 try {
 
topic.getManagedLedger().deleteCursor(cursor.getName());
+
topic.getManagedLedger().removeWaitingCursor(cursor);
 } catch (InterruptedException | ManagedLedgerException 
e) {
 log.warn("[{}] [{}] Failed to remove non durable 
cursor", topic.getName(), subName, e);
 }
 }
 });
+} else {
+topic.getManagedLedger().removeWaitingCursor(cursor);
 }
 }
 
diff --git 
a/pulsar-broke

(pulsar) branch branch-3.2 updated: [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376)

2024-04-02 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 1639ae2a0ec [improve][io]: Add validation for JDBC sink not supporting 
primitive schema (#22376)
1639ae2a0ec is described below

commit 1639ae2a0ec34ce475bb813f629a3ce97a3c5e14
Author: Baodi Shi 
AuthorDate: Fri Mar 29 08:33:27 2024 +0800

[improve][io]: Add validation for JDBC sink not supporting primitive schema 
(#22376)
---
 pulsar-io/jdbc/core/pom.xml|  7 +
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java |  5 
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 25 +
 .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java  | 31 +++---
 4 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml
index e1f15332a6c..7617e221105 100644
--- a/pulsar-io/jdbc/core/pom.xml
+++ b/pulsar-io/jdbc/core/pom.xml
@@ -71,6 +71,13 @@
   provided
 
 
+
+  ${project.groupId}
+  pulsar-client-original
+  ${project.version}
+  test
+
+
   
 
 
\ No newline at end of file
diff --git 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index 36c36740919..3655688c0f3 100644
--- 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 
@@ -137,6 +138,10 @@ public abstract class BaseJdbcAutoSchemaSink extends 
JdbcAbstractSink data.get(k);
 } else {
+SchemaType schemaType = 
message.getSchema().getSchemaInfo().getType();
+if (schemaType.isPrimitive()) {
+throw new UnsupportedOperationException("Primitive schema is 
not supported: " + schemaType);
+}
 recordValueGetter = (key) -> ((GenericRecord) 
record).getField(key);
 }
 String action = message.getProperties().get(ACTION_PROPERTY);
diff --git 
a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
 
b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
index b15eb832242..c088dd3c42c 100644
--- 
a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
+++ 
b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -22,6 +22,10 @@ import java.util.function.Function;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.util.Utf8;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.functions.api.Record;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -143,5 +147,26 @@ public class BaseJdbcAutoSchemaSinkTest {
 return consumer.apply(record).endRecord().getFields().get(0).schema();
 }
 
+@Test(expectedExceptions = UnsupportedOperationException.class,
+expectedExceptionsMessageRegExp = "Primitive schema is not 
supported.*")
+@SuppressWarnings("unchecked")
+public void testNotSupportPrimitiveSchema() {
+BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new 
BaseJdbcAutoSchemaSink() {};
+AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+
autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING);
+Record record = new Record() {
+@Override
+public org.apache.pulsar.client.api.Schema 
getSchema() {
+return autoConsumeSchema;
+}
+
+@Override
+public GenericRecord getValue() {
+return null;
+}
+};
+baseJdbcAutoSchemaSink.createMutation((Record) record);
+}
+
 
 }
\ No newline at end of file
diff --git 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index d9ed4cbd442..ca01615bef1 100644
--- 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ 
b/pulsar-io/jdb

(pulsar) branch branch-3.2 updated: [fix][ml] No rollover inactive ledgers when metadata service invalid (#22284)

2024-04-02 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new a83c892f1a9 [fix][ml] No rollover inactive ledgers when metadata 
service invalid (#22284)
a83c892f1a9 is described below

commit a83c892f1a954564f6672253b8d533b020c0fce5
Author: houxiaoyu 
AuthorDate: Sat Mar 30 21:38:55 2024 +0800

[fix][ml] No rollover inactive ledgers when metadata service invalid 
(#22284)

### Motivation

We should not roll over inactive ledgers when the metadata service is unavailable.

### Modifications

Check that the metadata service is available when scheduling 
`checkInactiveLedgerAndRollOver`
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  7 ---
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 24 ++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ee8e7c430ef..3d66bc8d6c0 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4468,9 +4468,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
 @Override
 public boolean checkInactiveLedgerAndRollOver() {
-long currentTimeMs = System.currentTimeMillis();
-if (currentLedgerEntries > 0 && inactiveLedgerRollOverTimeMs > 0 && 
currentTimeMs > (lastAddEntryTimeMs
-+ inactiveLedgerRollOverTimeMs)) {
+if (factory.isMetadataServiceAvailable()
+&& currentLedgerEntries > 0
+&& inactiveLedgerRollOverTimeMs > 0
+&& System.currentTimeMillis() > (lastAddEntryTimeMs + 
inactiveLedgerRollOverTimeMs)) {
 log.info("[{}] Closing inactive ledger, last-add entry {}", name, 
lastAddEntryTimeMs);
 if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened, 
State.ClosingLedger)) {
 LedgerHandle currentLedger = this.currentLedger;
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 0d9d6c0e573..9b4375dbedf 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3922,6 +3922,30 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 ledger.close();
 }
 
+@Test
+public void testDontRollOverInactiveLedgersWhenMetadataServiceInvalid() 
throws Exception {
+int inactiveLedgerRollOverTimeMs = 5;
+@Cleanup("shutdown")
+ManagedLedgerFactoryImpl factory = spy(new 
ManagedLedgerFactoryImpl(metadataStore, bkc));
+// mock metadata service invalid
+when(factory.isMetadataServiceAvailable()).thenReturn(false);
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, 
TimeUnit.MILLISECONDS);
+ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("rollover_inactive", config);
+
+long ledgerId = ledger.currentLedger.getId();
+
+Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+ledger.checkInactiveLedgerAndRollOver();
+
+Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+ledger.checkInactiveLedgerAndRollOver();
+
+assertEquals(ledger.currentLedger.getId(), ledgerId);
+
+ledger.close();
+}
+
 @Test
 public void testOffloadTaskCancelled() throws Exception {
 @Cleanup("shutdown")



(pulsar) branch branch-3.2 updated: [improve] [broker] Servlet support response compression (#21667)

2024-04-02 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new c8591994807 [improve] [broker] Servlet support response compression 
(#21667)
c8591994807 is described below

commit c859199480756eee5bf2dec1b3e6ebf2af9cec21
Author: Hang Chen 
AuthorDate: Wed Mar 13 14:52:43 2024 +0800

[improve] [broker] Servlet support response compression (#21667)
---
 .../org/apache/pulsar/broker/web/WebService.java   | 16 +++--
 .../apache/pulsar/broker/web/WebServiceTest.java   | 72 ++
 2 files changed, 82 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 902593b7bf6..a7c42448990 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -44,6 +44,7 @@ import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.server.handler.ResourceHandler;
 import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
 import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -258,15 +259,18 @@ public class WebService implements AutoCloseable {
 
 public void addServlet(String path, ServletHolder servletHolder, boolean 
requiresAuthentication,
Map attributeMap) {
-ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
+ServletContextHandler servletContextHandler = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
 // Notice: each context path should be unique, but there's nothing 
here to verify that
-context.setContextPath(path);
-context.addServlet(servletHolder, MATCH_ALL);
+servletContextHandler.setContextPath(path);
+servletContextHandler.addServlet(servletHolder, MATCH_ALL);
 if (attributeMap != null) {
-attributeMap.forEach(context::setAttribute);
+attributeMap.forEach(servletContextHandler::setAttribute);
 }
-filterInitializer.addFilters(context, requiresAuthentication);
-handlers.add(context);
+filterInitializer.addFilters(servletContextHandler, 
requiresAuthentication);
+
+GzipHandler gzipHandler = new GzipHandler();
+gzipHandler.setHandler(servletContextHandler);
+handlers.add(gzipHandler);
 }
 
 public void addStaticResources(String basePath, String resourcePath) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 405f3a11b5d..5386363373a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -21,11 +21,14 @@ package org.apache.pulsar.broker.web;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.io.CharStreams;
 import com.google.common.io.Closeables;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -42,6 +45,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipException;
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
@@ -353,6 +358,73 @@ public class WebServiceTest {
 assertEquals(res.getResponseBody(), "ok");
 }
 
+@Test
+public void testCompressOutputMetricsInPrometheus() throws Exception {
+
+setupEnv(true, false, false, false, -1, false);
+
+String metricsUrl = pulsar.getWebServiceAddress() + "/metrics/";
+
+String[] command = {"curl", "-H", "Accept-Encoding: gzip", metricsUrl};
+
+ProcessBuilder processBuilder = new ProcessBuilder(command);
+Process process = processBuilder.start();
+
+InputStream inputStream = process.getInputStream();
+
+try {
+GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
+
+// Process the decompressed content
+   

(pulsar) branch master updated: [improve][admin] Align the auth and check it at the first place for topic related API (#22342)

2024-04-01 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 50121e7f7be [improve][admin] Align the auth and check it at the first 
place for topic related API  (#22342)
50121e7f7be is described below

commit 50121e7f7be541f45bb6dc976f51e30658b1cb8d
Author: Jiwei Guo 
AuthorDate: Mon Apr 1 20:46:18 2024 +0800

[improve][admin] Align the auth and check it at the first place for topic 
related API  (#22342)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |4 +-
 .../broker/admin/impl/PersistentTopicsBase.java| 1284 ++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   |3 +-
 .../apache/pulsar/broker/admin/TopicAuthZTest.java |  759 
 4 files changed, 1388 insertions(+), 662 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 618c4ca73e1..a1bfeb2142f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -480,9 +480,9 @@ public abstract class AdminResource extends 
PulsarWebResource {
 // validates global-namespace contains local/peer cluster: if 
peer/local cluster present then lookup can
 // serve/redirect request else fail partitioned-metadata-request so, 
client fails while creating
 // producer/consumer
-return validateClusterOwnershipAsync(topicName.getCluster())
+return validateTopicOperationAsync(topicName, TopicOperation.LOOKUP)
+.thenCompose(__ -> 
validateClusterOwnershipAsync(topicName.getCluster()))
 .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
-.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.LOOKUP))
 .thenCompose(__ -> {
 if (checkAllowAutoCreation) {
 return pulsar().getBrokerService()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 86993f749b5..16d088756f5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -553,8 +553,8 @@ public class PersistentTopicsBase extends AdminResource {
 }
 
 protected CompletableFuture> 
internalGetPropertiesAsync(boolean authoritative) {
-return validateTopicOwnershipAsync(topicName, authoritative)
-.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.GET_METADATA))
+return validateTopicOperationAsync(topicName, 
TopicOperation.GET_METADATA)
+.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
 .thenCompose(__ -> {
 if (topicName.isPartitioned()) {
 return getPropertiesAsync();
@@ -586,27 +586,27 @@ public class PersistentTopicsBase extends AdminResource {
 log.warn("[{}] [{}] properties is empty, ignore update", 
clientAppId(), topicName);
 return CompletableFuture.completedFuture(null);
 }
-return validateTopicOwnershipAsync(topicName, authoritative)
-.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.UPDATE_METADATA))
-.thenCompose(__ -> {
-if (topicName.isPartitioned()) {
-return 
internalUpdateNonPartitionedTopicProperties(properties);
-} else {
-return 
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
-.thenCompose(metadata -> {
-if (metadata.partitions == 0) {
-return 
internalUpdateNonPartitionedTopicProperties(properties);
-}
-return namespaceResources()
-
.getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
-p -> new 
PartitionedTopicMetadata(p.partitions,
-p.properties == null ? properties
-: MapUtils.putAll(p.properties, 
properties.entrySet().toArray())));
-});
-}
-}).thenAccept(__ ->
-log.info("[{}] [{}] update properties success with properties 
{}"

(pulsar) branch master updated: [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379)

2024-04-01 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 3eb3b1cd23d [fix][broker] Skip topic.close during unloading if the 
topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to 
use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379)
3eb3b1cd23d is described below

commit 3eb3b1cd23d2cc11424bf882e244d3bc2e92bf27
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Mon Apr 1 01:52:21 2024 -0700

[fix][broker] Skip topic.close during unloading if the topic future fails 
with ownership check, and fix isBundleOwnedByAnyBroker to use 
ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379)
---
 .../extensions/manager/UnloadManager.java  | 14 ++-
 .../pulsar/broker/service/BrokerService.java   | 11 +-
 .../pulsar/broker/web/PulsarWebResource.java   |  5 +++
 .../ExtensibleLoadManagerImplBaseTest.java |  8 
 .../extensions/ExtensibleLoadManagerImplTest.java  | 27 +
 .../extensions/manager/UnloadManagerTest.java  | 44 --
 6 files changed, 80 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index b210dedbfe8..ffae9475243 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.manager;
 
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
 import com.google.common.annotations.VisibleForTesting;
@@ -28,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
@@ -170,6 +173,15 @@ public class UnloadManager implements StateChangeListener {
 
 @Override
 public void handleEvent(String serviceUnit, ServiceUnitStateData data, 
Throwable t) {
+ServiceUnitState state = ServiceUnitStateData.state(data);
+
+if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || 
state == Assigning)) {
+if (log.isDebugEnabled()) {
+log.debug("Skipping {} for service unit {} from the assignment 
command.", data, serviceUnit);
+}
+return;
+}
+
 if (t != null) {
 if (log.isDebugEnabled()) {
 log.debug("Handling {} for service unit {} with exception.", 
data, serviceUnit, t);
@@ -181,7 +193,7 @@ public class UnloadManager implements StateChangeListener {
 if (log.isDebugEnabled()) {
 log.debug("Handling {} for service unit {}", data, serviceUnit);
 }
-ServiceUnitState state = ServiceUnitStateData.state(data);
+
 switch (state) {
 case Free, Owned -> complete(serviceUnit, t);
 case Releasing -> 
LatencyMetric.RELEASE.endMeasurement(serviceUnit);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 98a0ed95b1a..549dfef896c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2244,9 +2244,18 @@ public class BrokerService implements Closeable {
 closeFutures.add(topicFuture
 .thenCompose(t -> t.isPresent() ? t.get().close(
 disconnectClients, 
closeWithoutWaitingClientDisconnect)
-: CompletableFuture.completedFuture(null)));
+: CompletableFuture.completedFuture(null))
+.exceptionally(e -> {
+if (e.getCause() instanc

(pulsar) branch master updated: [improve][fn] Pass FunctionDetails to Go instance (#22350)

2024-03-28 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 7315aeb6258 [improve][fn] Pass FunctionDetails to Go instance (#22350)
7315aeb6258 is described below

commit 7315aeb6258b7adc9d874268d50acb95ffc0cf2b
Author: jiangpengcheng 
AuthorDate: Fri Mar 29 13:47:29 2024 +0800

[improve][fn] Pass FunctionDetails to Go instance (#22350)
---
 pulsar-function-go/conf/conf.go|   2 +
 pulsar-function-go/pf/instanceConf.go  |  11 ++
 pulsar-function-go/pf/instanceConf_test.go | 207 +
 .../functions/instance/go/GoInstanceConfig.java|   2 +
 .../pulsar/functions/runtime/RuntimeUtils.java |   6 +
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |   8 +
 6 files changed, 236 insertions(+)

diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index 03513648fac..1442a0f865f 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -91,6 +91,8 @@ type Conf struct {
UserConfig  string `json:"userConfig" yaml:"userConfig"`
//metrics config
MetricsPort int `json:"metricsPort" yaml:"metricsPort"`
+   // FunctionDetails
+   FunctionDetails string `json:"functionDetails" yaml:"functionDetails"`
 }
 
 var (
diff --git a/pulsar-function-go/pf/instanceConf.go 
b/pulsar-function-go/pf/instanceConf.go
index 4cb60dd258a..844a2bc9b89 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -25,7 +25,9 @@ import (
"time"
 
"github.com/apache/pulsar/pulsar-function-go/conf"
+   log "github.com/apache/pulsar/pulsar-function-go/logutil"
pb "github.com/apache/pulsar/pulsar-function-go/pb"
+   "google.golang.org/protobuf/encoding/protojson"
 )
 
 // This is the config passed to the Golang Instance. Contains all the 
information
@@ -122,6 +124,15 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf 
{
tlsAllowInsecure:cfg.TLSAllowInsecureConnection,
tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
}
+   // parse the raw function details and ignore the unmarshal 
error(fallback to original way)
+   if cfg.FunctionDetails != "" {
+   functionDetails := pb.FunctionDetails{}
+   if err := protojson.Unmarshal([]byte(cfg.FunctionDetails), 
&functionDetails); err != nil {
+   log.Errorf("Failed to unmarshal function details: %v", 
err)
+   } else {
+   instanceConf.funcDetails = functionDetails
+   }
+   }
 
if instanceConf.funcDetails.ProcessingGuarantees == 
pb.ProcessingGuarantees_EFFECTIVELY_ONCE {
panic("Go instance current not support EFFECTIVELY_ONCE 
processing guarantees.")
diff --git a/pulsar-function-go/pf/instanceConf_test.go 
b/pulsar-function-go/pf/instanceConf_test.go
index 02aef913ebc..cc5f46e2fe1 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -20,6 +20,7 @@
 package pf
 
 import (
+   "fmt"
"testing"
 
cfg "github.com/apache/pulsar/pulsar-function-go/conf"
@@ -113,3 +114,209 @@ func TestInstanceConf_Fail(t *testing.T) {
newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 3})
}, "Should have a panic")
 }
+
+func TestInstanceConf_WithDetails(t *testing.T) {
+   cfg := &cfg.Conf{
+   FunctionDetails: 
`{"tenant":"public","namespace":"default","name":"test-function","className":"process",
+"logTopic":"test-logs","userConfig":"{\"key1\":\"value1\"}","runtime":"GO","autoAck":true,"parallelism":1,
+"source":{"configs":"{\"username\":\"admin\"}","typeClassName":"string","timeoutMs":"15000",
+"subscriptionName":"test-subscription","inputSpecs":{"input":{"schemaType":"avro","receiverQueueSize":{"value":1000},
+"schemaProperties":{"schema_prop1":"schema1"},"consumerProperties":{"consumer_prop1":"consumer1"},"cryptoSpec":
+{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"SEND","consumerCryptoFailureAction":"CONSUME"}}}
+,"negativeAckRedeliveryDelayMs

(pulsar) branch master updated: [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription (#22359)

2024-03-27 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 6f9c8e7f70e [fix][broker] Fix PersistentSubscription duplicate 
implementation interface Subscription (#22359)
6f9c8e7f70e is described below

commit 6f9c8e7f70ec201d65c7fc270480bed3aa3b5aba
Author: sherlock-lin <1193179...@qq.com>
AuthorDate: Thu Mar 28 11:58:47 2024 +0800

[fix][broker] Fix PersistentSubscription duplicate implementation interface 
Subscription (#22359)
---
 .../pulsar/broker/service/nonpersistent/NonPersistentSubscription.java | 3 +--
 .../pulsar/broker/service/persistent/PersistentSubscription.java   | 2 +-
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 92aba6221da..cfe05cc32b7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -40,7 +40,6 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFence
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.GetStatsOptions;
-import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -53,7 +52,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NonPersistentSubscription extends AbstractSubscription implements 
Subscription {
+public class NonPersistentSubscription extends AbstractSubscription {
 private final NonPersistentTopic topic;
 private volatile NonPersistentDispatcher dispatcher;
 private final String topicName;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 50e84310ac1..6e8e94baeae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -97,7 +97,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PersistentSubscription extends AbstractSubscription implements 
Subscription {
+public class PersistentSubscription extends AbstractSubscription {
 protected final PersistentTopic topic;
 protected final ManagedCursor cursor;
 protected volatile Dispatcher dispatcher;



(pulsar) branch master updated: [fix][broker] Fix typos in PersistentTopic class (#22364)

2024-03-27 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 32037c3b098 [fix][broker] Fix typos in PersistentTopic class (#22364)
32037c3b098 is described below

commit 32037c3b0982aa00a7cb5ee7e17a6b235a8c2d7f
Author: hanmz 
AuthorDate: Thu Mar 28 10:31:09 2024 +0800

[fix][broker] Fix typos in PersistentTopic class (#22364)
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6179e73169f..1650e449a3f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2753,7 +2753,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 ledger.asyncMigrate();
 }
 if (log.isDebugEnabled()) {
-log.debug("{} has replication backlog and applied 
migraiton", topic);
+log.debug("{} has replication backlog and applied 
migration", topic);
 }
 return CompletableFuture.completedFuture(null);
 }



(pulsar) branch master updated: [improve][broker] Optimize web interface deleteDynamicConfiguration return error message (#22356)

2024-03-27 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new e4553391f96 [improve][broker] Optimize web interface 
deleteDynamicConfiguration return error message (#22356)
e4553391f96 is described below

commit e4553391f96af3bda3d8252b97cac3de1f39a1b5
Author: hanmz 
AuthorDate: Thu Mar 28 00:07:54 2024 +0800

[improve][broker] Optimize web interface deleteDynamicConfiguration return 
error message (#22356)
---
 .../src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 61b354610ac..83067e9f296 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -526,7 +526,7 @@ public class BrokersBase extends AdminResource {
 
 private CompletableFuture 
internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {
 if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) {
-throw new RestException(Status.PRECONDITION_FAILED, " Can't update 
non-dynamic configuration");
+throw new RestException(Status.PRECONDITION_FAILED, "Can't delete 
non-dynamic configuration");
 } else {
 return 
dynamicConfigurationResources().setDynamicConfigurationAsync(old -> {
 if (old != null) {



(pulsar) branch master updated: [cleanup][cli] Cleanup jcommander (#22337)

2024-03-27 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d23a8f64acb [cleanup][cli] Cleanup jcommander (#22337)
d23a8f64acb is described below

commit d23a8f64acbfb4179b9f2f64e1e9dd0756742a5b
Author: Zixuan Liu 
AuthorDate: Wed Mar 27 16:36:36 2024 +0800

[cleanup][cli] Cleanup jcommander (#22337)

Signed-off-by: Zixuan Liu 
---
 distribution/shell/src/assemble/LICENSE.bin.txt|  1 -
 pom.xml|  7 ---
 pulsar-cli-utils/pom.xml   |  5 --
 .../cli/converters/TimeUnitToSecondsConverter.java | 42 --
 .../validators/IntegerMaxValueLongValidator.java   | 30 --
 .../cli/validators/MinNegativeOneValidator.java| 30 --
 .../cli/validators/NonNegativeValueValidator.java  | 30 --
 .../validators/PositiveIntegerValueValidator.java  | 31 ---
 .../cli/validators/PositiveLongValueValidator.java | 31 ---
 .../apache/pulsar/cli/validators/package-info.java | 19 ---
 .../pulsar/cli/converters/TimeConversionTest.java  |  5 +-
 .../cli/validators/CliUtilValidatorsTest.java  | 64 --
 .../cli/utils/NameValueParameterSplitterTest.java  | 52 --
 .../apache/pulsar/admin/cli/utils/CmdUtils.java| 11 ++--
 .../cli/utils/NameValueParameterSplitter.java  | 61 -
 .../org/apache/pulsar/client/cli/CmdProduce.java   |  9 ++-
 .../java/org/apache/pulsar/client/cli/CmdRead.java |  3 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java|  3 +-
 18 files changed, 14 insertions(+), 420 deletions(-)

diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 2b2f1c26be1..e735bd454ee 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -309,7 +309,6 @@ pulsar-client-cpp/lib/checksum/crc32c_sw.cc
 This projects includes binary packages with the following licenses:
 
 The Apache Software License, Version 2.0
- * JCommander -- jcommander-1.82.jar
  * Picocli
  - picocli-4.7.5.jar
  - picocli-shell-jline3-4.7.5.jar
diff --git a/pom.xml b/pom.xml
index caa2fc49b27..da7f2c76421 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,7 +213,6 @@ flexible messaging model and an intuitive client 
API.
 6.2.8
 0.20
 2.12.1
-1.82
 3.11
 1.10
 2.8.0
@@ -693,12 +692,6 @@ flexible messaging model and an intuitive client 
API.
 linux-aarch_64
   
 
-  
-com.beust
-jcommander
-${jcommander.version}
-  
-
   
 info.picocli
 picocli
diff --git a/pulsar-cli-utils/pom.xml b/pulsar-cli-utils/pom.xml
index ac442b4004e..1638029f4c8 100644
--- a/pulsar-cli-utils/pom.xml
+++ b/pulsar-cli-utils/pom.xml
@@ -35,11 +35,6 @@
   Isolated CLI utility module
 
   
-
-  com.beust
-  jcommander
-  compile
-
 
   info.picocli
   picocli
diff --git 
a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToSecondsConverter.java
 
b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToSecondsConverter.java
deleted file mode 100644
index 3aca2e95d25..00000000000
--- 
a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToSecondsConverter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.cli.converters;
-
-import static org.apache.pulsar.cli.ValueValidationUtil.emptyCheck;
-import com.beust.jcommander.ParameterException;
-import com.beust.jcommander.converters.BaseConverter;
-import java.util.concurrent.TimeUnit;
-
-public class TimeUnitToSecondsConverter extends BaseConverter {
-
-public TimeUnitToSecondsConverter(String optionName) {
-super(optionName);
-}
-
-@Override
-public Long convert(String str) {
-emptyCheck(getOptionName(), str);
-try {
-return TimeUn

(pulsar) branch master updated: [fix][cli] Fix typos in CmdSinks class (#22358)

2024-03-26 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 023446b7328 [fix][cli] Fix typos in CmdSinks class (#22358)
023446b7328 is described below

commit 023446b73287dea25c22c6db307e5d723306e765
Author: hanmz 
AuthorDate: Wed Mar 27 11:40:28 2024 +0800

[fix][cli] Fix typos in CmdSinks class (#22358)
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java   | 2 +-
 .../src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 47af7e6794c..f3172a49b01 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -617,7 +617,7 @@ public class CmdSinks extends CmdBase {
 protected void validateSinkConfigs(SinkConfig sinkConfig) {
 
 if (isBlank(sinkConfig.getArchive())) {
-throw new ParameterException("Sink archive not specfied");
+throw new ParameterException("Sink archive not specified");
 }
 
 
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index c68bbd20ab8..6fbe3bc5da2 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -283,7 +283,7 @@ public class TestCmdSinks {
 }
 
 @Test(expectedExceptions = CliCommand.ParameterException.class,
-expectedExceptionsMessageRegExp = "Sink archive not specfied")
+expectedExceptionsMessageRegExp = "Sink archive not specified")
 public void testMissingArchive() throws Exception {
 SinkConfig sinkConfig = getSinkConfig();
 sinkConfig.setArchive(null);
@@ -503,7 +503,7 @@ public class TestCmdSinks {
 testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
 }
 
-@Test(expectedExceptions = CliCommand.ParameterException.class, 
expectedExceptionsMessageRegExp = "Sink archive not specfied")
+@Test(expectedExceptions = CliCommand.ParameterException.class, 
expectedExceptionsMessageRegExp = "Sink archive not specified")
 public void testCmdSinkConfigFileMissingJar() throws Exception {
 SinkConfig testSinkConfig = getSinkConfig();
 testSinkConfig.setArchive(null);



(pulsar) branch branch-2.10 updated: [fix][broker] Fast fix infinite HTTP call getSubscriptions caused by wrong topicName (#22357)

2024-03-26 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
 new 13938c1b051 [fix][broker] Fast fix infinite HTTP call getSubscriptions 
caused by wrong topicName (#22357)
13938c1b051 is described below

commit 13938c1b051786077744cbf362cd68a0f0875f16
Author: fengyubiao 
AuthorDate: Tue Mar 26 19:50:15 2024 +0800

[fix][broker] Fast fix infinite HTTP call getSubscriptions caused by wrong 
topicName (#22357)

Co-authored-by: Jiwe Guo 
---
 .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java| 2 +-
 .../api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java| 9 ++---
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index dbcfd734a37..6e537900f39 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -,7 +,7 @@ public class PersistentTopicsBase extends AdminResource {
 getPartitionedTopicMetadataAsync(topicName, 
authoritative, allowAutoTopicCreation))
 .thenAccept(partitionMetadata -> {
 final int numPartitions = partitionMetadata.partitions;
-if (numPartitions > 0) {
+if (partitionMetadata.partitions > 0 && 
!isUnexpectedTopicName(partitionMetadata)) {
 final CompletableFuture future = new 
CompletableFuture<>();
 final AtomicInteger count = new 
AtomicInteger(numPartitions);
 final AtomicInteger failureCount = new 
AtomicInteger(0);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
index 6d3806f312e..25eee62609f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
@@ -52,7 +52,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest 
extends ProducerCo
 }
 
 @Test
-public void testInfiniteHttpCallGetSubscriptions() throws Exception {
+public void testInfiniteHttpCallGetOrCreateSubscriptions() throws 
Exception {
 final String randomStr = UUID.randomUUID().toString().replaceAll("-", 
"");
 final String partitionedTopicName = 
"persistent://my-property/my-ns/tp1_" + randomStr;
 final String topic_p0 = partitionedTopicName + 
TopicName.PARTITIONED_TOPIC_SUFFIX + "0";
@@ -64,6 +64,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest 
extends ProducerCo
 // Do test.
 ProducerAndConsumerEntry pcEntry = triggerDLQCreated(topic_p0, 
topicDLQ, subscriptionName);
 admin.topics().getSubscriptions(topicDLQ);
+admin.topics().createSubscription(topicDLQ, "s1", MessageId.earliest);
 
 // cleanup.
 pcEntry.consumer.close();
@@ -72,7 +73,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest 
extends ProducerCo
 }
 
 @Test
-public void testInfiniteHttpCallGetSubscriptions2() throws Exception {
+public void testInfiniteHttpCallGetOrCreateSubscriptions2() throws 
Exception {
 final String randomStr = UUID.randomUUID().toString().replaceAll("-", 
"");
 final String topicName = "persistent://my-property/my-ns/tp1_" + 
randomStr + "-partition-0-abc";
 Producer producer = pulsarClient.newProducer(Schema.STRING)
@@ -81,13 +82,14 @@ public class 
TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
 
 // Do test.
 admin.topics().getSubscriptions(topicName);
+admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
 
 // cleanup.
 producer.close();
 }
 
 @Test
-public void testInfiniteHttpCallGetSubscriptions3() throws Exception {
+public void testInfiniteHttpCallGetOrCreateSubscriptions3() throws 
Exception {
 final String randomStr = UUID.randomUUID().toString().replaceAll("-", 
"");
 final String topicName = "persistent://my-property/my-ns/tp1_" + 
randomStr + "-partition-0";
 Producer producer = pulsarClient.newProducer(Schema.STRING)
@@ -96,6 +98,7 @@ public class Top

(pulsar) branch master updated: [improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf (#22303)

2024-03-21 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 69c45ad5300 [improve][cli] PIP-343: Use picocli instead of jcommander 
in pulsar-perf (#22303)
69c45ad5300 is described below

commit 69c45ad5300e36a62a923b8eaa58aab99c6e02fb
Author: crossoverJie 
AuthorDate: Fri Mar 22 09:12:37 2024 +0800

[improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf 
(#22303)

Co-authored-by: Zixuan Liu 
---
 .../cli/converters/ByteUnitToLongConverter.java| 39 -
 .../pulsar/cli/converters/ByteConversionTest.java  |  9 +-
 pulsar-testclient/pom.xml  |  4 +-
 .../proxy/socket/client/PerformanceClient.java | 65 +++
 .../apache/pulsar/testclient/BrokerMonitor.java| 30 +++
 .../testclient/CmdGenerateDocumentation.java   | 67 +--
 .../pulsar/testclient/LoadSimulationClient.java| 34 
 .../testclient/LoadSimulationController.java   | 68 +++
 .../pulsar/testclient/ManagedLedgerWriter.java | 57 ++---
 .../testclient/PerformanceBaseArguments.java   | 59 +++--
 .../pulsar/testclient/PerformanceConsumer.java | 65 ---
 .../pulsar/testclient/PerformanceProducer.java | 96 +++---
 .../pulsar/testclient/PerformanceReader.java   | 19 +++--
 .../testclient/PerformanceTopicListArguments.java  | 10 ++-
 .../pulsar/testclient/PerformanceTransaction.java  | 45 +-
 ...or.java => PositiveNumberParameterConvert.java} | 15 ++--
 .../pulsar/testclient/GenerateDocumentionTest.java | 37 +
 17 files changed, 377 insertions(+), 342 deletions(-)

diff --git 
a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java
 
b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java
deleted file mode 100644
index 6170fb489d4..00000000000
--- 
a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.cli.converters;
-
-import static org.apache.pulsar.cli.ValueValidationUtil.emptyCheck;
-import com.beust.jcommander.converters.BaseConverter;
-
-public class ByteUnitToLongConverter extends BaseConverter {
-
-public ByteUnitToLongConverter(String optionName) {
-super(optionName);
-}
-
-@Override
-public Long convert(String argStr) {
-return parseBytes(argStr);
-}
-
-Long parseBytes(String argStr) {
-emptyCheck(getOptionName(), argStr);
-return ByteUnitUtil.validateSizeString(argStr);
-}
-}
diff --git 
a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java
 
b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java
index 283e94bfb9c..6e7a2e6d7e3 100644
--- 
a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java
+++ 
b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.cli.converters;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertThrows;
 import org.apache.pulsar.cli.converters.picocli.ByteUnitToIntegerConverter;
+import org.apache.pulsar.cli.converters.picocli.ByteUnitToLongConverter;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import picocli.CommandLine.TypeConversionException;
@@ -59,8 +60,8 @@ public class ByteConversionTest {
 }
 
 @Test(dataProvider = "successfulByteUnitUtilTestCases")
-public void testSuccessfulByteUnitToLongConverter(String input, long 
expected) {
-ByteUnitToLongConverter converter = new 
ByteUnitToLongConverter("optionName");
+public void testSuccessfulByteUnitToLongConverter(String input, long 
expected) throws Exception{
+ByteUnitToLongConverter converter = new ByteU

(pulsar) branch master updated: [improve][admin] Fix the `createMissingPartitions` doesn't response correctly (#22311)

2024-03-21 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 5cabcacbfa8 [improve][admin] Fix the `createMissingPartitions` doesn't 
response correctly (#22311)
5cabcacbfa8 is described below

commit 5cabcacbfa8874931d501cd040f7a8ac3d6d1923
Author: Jiwei Guo 
AuthorDate: Thu Mar 21 15:24:50 2024 +0800

[improve][admin] Fix the `createMissingPartitions` doesn't response 
correctly (#22311)
---
 .../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java  | 4 +++-
 .../java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java  | 7 +++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 10cf5edd3c3..86993f749b5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -487,7 +487,7 @@ public class PersistentTopicsBase extends AdminResource {
 
 protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) 
{
 getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
-if (metadata != null) {
+if (metadata != null && metadata.partitions > 0) {
 CompletableFuture future = 
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
 NamespaceOperation.CREATE_TOPIC);
 future.thenCompose(__ -> 
tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
@@ -497,6 +497,8 @@ public class PersistentTopicsBase extends AdminResource {
 resumeAsyncResponseExceptionally(asyncResponse, e);
 return null;
 });
+} else {
+throw new RestException(Status.NOT_FOUND, String.format("Topic 
%s does not exist", topicName));
 }
 }).exceptionally(ex -> {
 // If the exception is not redirect exception we need to log it.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 23cb413614f..9a292175caa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -1779,4 +1780,10 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 assertTrue(namespaces.contains(ns1V2));
 assertTrue(namespaces.contains(ns1V1));
 }
+
+@Test
+public void testCreateMissingPartitions() throws Exception {
+String topicName = "persistent://" + testTenant + "/" + 
testNamespaceLocal + "/testCreateMissingPartitions";
+assertThrows(PulsarAdminException.NotFoundException.class, () -> 
admin.topics().createMissedPartitions(topicName));
+}
 }



(pulsar) branch branch-3.0 updated: [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22309)

2024-03-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new b51b74883fb [improve][broker] Add fine-grain authorization to ns/topic 
management endpoints (#22309)
b51b74883fb is described below

commit b51b74883fb66673161d0b73c6a7257d073c57a5
Author: Jiwei Guo 
AuthorDate: Wed Mar 20 20:37:32 2024 +0800

[improve][broker] Add fine-grain authorization to ns/topic management 
endpoints (#22309)
---
 .../authorization/PulsarAuthorizationProvider.java |   1 +
 .../apache/pulsar/broker/admin/AdminResource.java  |   7 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 166 +-
 .../broker/admin/impl/PersistentTopicsBase.java| 161 +-
 .../pulsar/broker/admin/NamespaceAuthZTest.java| 164 ++
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 346 +
 6 files changed, 680 insertions(+), 165 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 162c44bec38..9e9f2a446de 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -542,6 +542,7 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
 case COMPACT:
 case OFFLOAD:
 case UNLOAD:
+case TRIM_TOPIC:
 case DELETE_METADATA:
 case UPDATE_METADATA:
 case ADD_BUNDLE_RANGE:
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index e5806b7bec2..f048d650f8e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -59,8 +59,6 @@ import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -710,10 +708,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
 }
 
 protected CompletableFuture 
getSchemaCompatibilityStrategyAsync() {
-return validateTopicPolicyOperationAsync(topicName,
-PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-PolicyOperation.READ)
-.thenCompose((__) -> 
getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
+return 
getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> {
 if (ex != null) {
 log.error("[{}] Failed to get schema compatibility 
strategy of topic {} {}",
 clientAppId(), topicName, ex);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 46b1712e7dd..72f5f1439d9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2378,102 +2378,110 @@ public abstract class NamespacesBase extends 
AdminResource {
}
 
protected void internalSetProperty(String key, String value, AsyncResponse 
asyncResponse) {
-   validatePoliciesReadOnlyAccess();
-   updatePoliciesAsync(namespaceName, policies -> {
-   policies.properties.put(key, value);
-   return policies;
-   }).thenAccept(v -> {
-   log.info("[{}] Successfully set property for key {} on namespace 
{}", clientAppId(), key,
-   namespaceName);
-   asyncResponse.resume(Response.noContent().build());
-   }).exceptionally(ex -> {
-   Throwable cause = ex.getCause();
-   log.error("[{}] Failed to set property for key {} on namespace {}", 
clientAppId(), key,
-   namespaceName

(pulsar) branch branch-3.2 updated: [fix] [client] Unclear error message when creating a consumer with two same topics (#22255)

2024-03-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 0edb8a93470 [fix] [client] Unclear error message when creating a 
consumer with two same topics (#22255)
0edb8a93470 is described below

commit 0edb8a934704ede1cc134983a84016e611ac8cec
Author: fengyubiao 
AuthorDate: Tue Mar 19 17:23:13 2024 +0800

[fix] [client] Unclear error message when creating a consumer with two same 
topics (#22255)
---
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 27 ++
 .../client/impl/MultiTopicsConsumerImpl.java   | 21 +
 2 files changed, 38 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index bb8bab29ad9..7a12acd47ed 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -26,9 +26,11 @@ import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -42,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -50,6 +53,7 @@ import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.mockito.AdditionalAnswers;
 import org.mockito.Mockito;
@@ -372,6 +376,29 @@ public class MultiTopicsConsumerTest extends 
ProducerConsumerBase {
 assertTrue(consumer.isConnected());
 }
 
+@Test
+public void testSameTopics() throws Exception {
+final String topic1 = 
BrokerTestUtil.newUniqueName("public/default/tp");
+final String topic2 = "persistent://" + topic1;
+admin.topics().createNonPartitionedTopic(topic2);
+// Create consumer with two same topics.
+try {
+
pulsarClient.newConsumer(Schema.INT32).topics(Arrays.asList(topic1, topic2))
+.subscriptionName("s1").subscribe();
+fail("Do not allow use two same topics.");
+} catch (Exception e) {
+if (e instanceof PulsarClientException && e.getCause() != null) {
+e = (Exception) e.getCause();
+}
+Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
+assertTrue(unwrapEx instanceof IllegalArgumentException);
+assertTrue(e.getMessage().contains( "Subscription topics include 
duplicate items"
++ " or invalid names"));
+}
+// cleanup.
+admin.topics().delete(topic2);
+}
+
 @Test(timeOut = 30000)
 public void testSubscriptionNotFound() throws PulsarAdminException, 
PulsarClientException {
 final var topic1 = newTopicName();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index d18af475d61..20fd03d6a28 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -165,7 +165,8 @@ public class MultiTopicsConsumerImpl extends 
ConsumerBase {
 return;
 }
 
-checkArgument(topicNamesValid(conf.getTopicNames()), "Topics is 
invalid.");
+checkArgument(topicNamesValid(conf.getTopicNames()), "Subscription 
topics include duplicate items"
++ " or invalid names.");
 
 List<CompletableFuture<Void>> futures = conf.getTopicNames().stream()
 .map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
@@ -202,21 +203,21 @@ public class MultiTopicsConsumerImpl extends 
ConsumerBase {
 checkState(topics != null && topics.size() >= 1,
   

(pulsar) branch branch-3.2 updated: [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)

2024-03-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 35ea4712a60 [fix][broker] Fix wrong double-checked locking for 
readOnActiveConsumerTask in dispatcher (#22279)
35ea4712a60 is described below

commit 35ea4712a60992deeb93171c8b0e6d48c84e530a
Author: Yunze Xu 
AuthorDate: Sat Mar 16 14:56:34 2024 +0800

[fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask 
in dispatcher (#22279)
---
 .../PersistentDispatcherSingleActiveConsumer.java  | 26 +-
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 387ba83d9cd..7494d11421a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -74,6 +74,7 @@ public class PersistentDispatcherSingleActiveConsumer extends 
AbstractDispatcher
 protected volatile int readBatchSize;
 protected final Backoff readFailureBackoff;
 private volatile ScheduledFuture readOnActiveConsumerTask = null;
+private final Object lockForReadOnActiveConsumerTask = new Object();
 
 private final RedeliveryTracker redeliveryTracker;
 
@@ -123,18 +124,23 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 return;
 }
 
-readOnActiveConsumerTask = 
topic.getBrokerService().executor().schedule(() -> {
-if (log.isDebugEnabled()) {
-log.debug("[{}] Rewind cursor and read more entries after {} 
ms delay", name,
-
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+synchronized (lockForReadOnActiveConsumerTask) {
+if (readOnActiveConsumerTask != null) {
+return;
 }
-Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-cursor.rewind(activeConsumer != null && 
activeConsumer.readCompacted());
+readOnActiveConsumerTask = 
topic.getBrokerService().executor().schedule(() -> {
+if (log.isDebugEnabled()) {
+log.debug("[{}] Rewind cursor and read more entries after 
{} ms delay", name,
+
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+}
+Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+cursor.rewind(activeConsumer != null && 
activeConsumer.readCompacted());
 
-notifyActiveConsumerChanged(activeConsumer);
-readMoreEntries(activeConsumer);
-readOnActiveConsumerTask = null;
-}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
+notifyActiveConsumerChanged(activeConsumer);
+readMoreEntries(activeConsumer);
+readOnActiveConsumerTask = null;
+}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
+}
 }
 
 @Override



(pulsar) branch branch-3.2 updated: [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22305)

2024-03-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 6ffe667cdda [improve][broker] Add fine-grain authorization to ns/topic 
management endpoints (#22305)
6ffe667cdda is described below

commit 6ffe667cddad3e959e02ce31fd09b2f9a439d50a
Author: Jiwei Guo 
AuthorDate: Wed Mar 20 13:49:54 2024 +0800

[improve][broker] Add fine-grain authorization to ns/topic management 
endpoints (#22305)
---
 .../authorization/PulsarAuthorizationProvider.java |   1 +
 .../apache/pulsar/broker/admin/AdminResource.java  |   7 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 166 +-
 .../broker/admin/impl/PersistentTopicsBase.java| 172 +-
 .../pulsar/broker/admin/NamespaceAuthZTest.java| 163 ++
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 345 +
 6 files changed, 683 insertions(+), 171 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index acb6fce9b92..a39c3d05607 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -597,6 +597,7 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
 case COMPACT:
 case OFFLOAD:
 case UNLOAD:
+case TRIM_TOPIC:
 case DELETE_METADATA:
 case UPDATE_METADATA:
 case ADD_BUNDLE_RANGE:
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 2ceec189975..b0d66c27d63 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -62,8 +62,6 @@ import 
org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -714,10 +712,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
 }
 
 protected CompletableFuture 
getSchemaCompatibilityStrategyAsync() {
-return validateTopicPolicyOperationAsync(topicName,
-PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-PolicyOperation.READ)
-.thenCompose((__) -> 
getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
+return 
getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> {
 if (ex != null) {
 log.error("[{}] Failed to get schema compatibility 
strategy of topic {} {}",
 clientAppId(), topicName, ex);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index f274cffa46b..b2e427b6e46 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2339,102 +2339,110 @@ public abstract class NamespacesBase extends 
AdminResource {
}
 
protected void internalSetProperty(String key, String value, AsyncResponse 
asyncResponse) {
-   validatePoliciesReadOnlyAccess();
-   updatePoliciesAsync(namespaceName, policies -> {
-   policies.properties.put(key, value);
-   return policies;
-   }).thenAccept(v -> {
-   log.info("[{}] Successfully set property for key {} on namespace 
{}", clientAppId(), key,
-   namespaceName);
-   asyncResponse.resume(Response.noContent().build());
-   }).exceptionally(ex -> {
-   Throwable cause = ex.getCause();
-   log.error("[{}] Failed to set property for key {} on namespace {}", 
clientAppId(), key,
-   namespaceName

(pulsar) branch master updated: [improve][broker] Remove the atomicity on active consumer of a dispatcher (#22285)

2024-03-19 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 1b1bd4b610d [improve][broker] Remove the atomicity on active consumer 
of a dispatcher (#22285)
1b1bd4b610d is described below

commit 1b1bd4b610dd768a6908964ef841a6790bb0f4f0
Author: Yunze Xu 
AuthorDate: Tue Mar 19 18:57:10 2024 +0800

[improve][broker] Remove the atomicity on active consumer of a dispatcher 
(#22285)
---
 .../AbstractDispatcherSingleActiveConsumer.java| 30 ++--
 ...onPersistentDispatcherSingleActiveConsumer.java |  4 +-
 .../PersistentDispatcherSingleActiveConsumer.java  | 56 +++---
 3 files changed, 45 insertions(+), 45 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 7726eb814a0..9980b6ae97c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
@@ -47,9 +46,6 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractDispatcherSingleActiveConsumer extends 
AbstractBaseDispatcher {
 
 protected final String topicName;
-protected static final 
AtomicReferenceFieldUpdater
-ACTIVE_CONSUMER_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
-AbstractDispatcherSingleActiveConsumer.class, Consumer.class, 
"activeConsumer");
 private volatile Consumer activeConsumer = null;
 protected final CopyOnWriteArrayList consumers;
 protected StickyKeyConsumerSelector stickyKeyConsumerSelector;
@@ -78,11 +74,16 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
 this.partitionIndex = partitionIndex;
 this.subscriptionType = subscriptionType;
 this.cursor = cursor;
-ACTIVE_CONSUMER_UPDATER.set(this, null);
 }
 
+/**
+ * @apiNote this method does not need to be thread safe
+ */
 protected abstract void scheduleReadOnActiveConsumer();
 
+/**
+ * @apiNote this method does not need to be thread safe
+ */
 protected abstract void cancelPendingRead();
 
 protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
@@ -99,6 +100,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer 
extends AbstractBas
  * distributed partitions evenly across consumers with highest priority 
level.
  *
  * @return the true consumer if the consumer is changed, otherwise false.
+ * @apiNote this method is not thread safe
  */
 protected boolean pickAndScheduleActiveConsumer() {
 checkArgument(!consumers.isEmpty());
@@ -128,14 +130,14 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
 ? partitionIndex % consumersSize
 : peekConsumerIndexFromHashRing(makeHashRing(consumersSize));
 
-Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, 
consumers.get(index));
+Consumer selectedConsumer = consumers.get(index);
 
-Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-if (prevConsumer == activeConsumer) {
+if (selectedConsumer == activeConsumer) {
 // Active consumer did not change. Do nothing at this point
 return false;
 } else {
 // If the active consumer is changed, send notification.
+activeConsumer = selectedConsumer;
 scheduleReadOnActiveConsumer();
 return true;
 }
@@ -167,7 +169,7 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
 }
 
 if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
-Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+Consumer actConsumer = getActiveConsumer();
 if (actConsumer != null) {
 return 
actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive 
-> {
 if (actConsumerStillAlive.isEmpty() || 
actConsumerStillAlive.get()) {
@@ -210,7 +212,7 @@ 

(pulsar) branch master updated: [improve][misc] Upgrade jersey to 2.41 (#22290)

2024-03-18 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 0c9d8601698 [improve][misc] Upgrade jersey to 2.41 (#22290)
0c9d8601698 is described below

commit 0c9d86016983b9683aa3b637be38e281090f40f0
Author: Zixuan Liu 
AuthorDate: Mon Mar 18 22:14:11 2024 +0800

[improve][misc] Upgrade jersey to 2.41 (#22290)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 20 ++--
 distribution/shell/src/assemble/LICENSE.bin.txt  | 14 +++---
 pom.xml  |  2 +-
 3 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 9e12db74eb1..dac03d966a6 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -582,16 +582,16 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
 - org.glassfish.hk2-osgi-resource-locator-1.0.3.jar
 - org.glassfish.hk2.external-aopalliance-repackaged-2.6.1.jar
  * Jersey
-- org.glassfish.jersey.containers-jersey-container-servlet-2.34.jar
-- org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar
-- org.glassfish.jersey.core-jersey-client-2.34.jar
-- org.glassfish.jersey.core-jersey-common-2.34.jar
-- org.glassfish.jersey.core-jersey-server-2.34.jar
-- org.glassfish.jersey.ext-jersey-entity-filtering-2.34.jar
-- org.glassfish.jersey.media-jersey-media-json-jackson-2.34.jar
-- org.glassfish.jersey.media-jersey-media-multipart-2.34.jar
-- org.glassfish.jersey.inject-jersey-hk2-2.34.jar
- * Mimepull -- org.jvnet.mimepull-mimepull-1.9.13.jar
+- org.glassfish.jersey.containers-jersey-container-servlet-2.41.jar
+- org.glassfish.jersey.containers-jersey-container-servlet-core-2.41.jar
+- org.glassfish.jersey.core-jersey-client-2.41.jar
+- org.glassfish.jersey.core-jersey-common-2.41.jar
+- org.glassfish.jersey.core-jersey-server-2.41.jar
+- org.glassfish.jersey.ext-jersey-entity-filtering-2.41.jar
+- org.glassfish.jersey.media-jersey-media-json-jackson-2.41.jar
+- org.glassfish.jersey.media-jersey-media-multipart-2.41.jar
+- org.glassfish.jersey.inject-jersey-hk2-2.41.jar
+ * Mimepull -- org.jvnet.mimepull-mimepull-1.9.15.jar
 
 Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
  * Jakarta Activation
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index a9bbd541448..7ae4241dfb9 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -441,13 +441,13 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
 - aopalliance-repackaged-2.6.1.jar
 - osgi-resource-locator-1.0.3.jar
  * Jersey
-- jersey-client-2.34.jar
-- jersey-common-2.34.jar
-- jersey-entity-filtering-2.34.jar
-- jersey-media-json-jackson-2.34.jar
-- jersey-media-multipart-2.34.jar
-- jersey-hk2-2.34.jar
- * Mimepull -- mimepull-1.9.13.jar
+- jersey-client-2.41.jar
+- jersey-common-2.41.jar
+- jersey-entity-filtering-2.41.jar
+- jersey-media-json-jackson-2.41.jar
+- jersey-media-multipart-2.41.jar
+- jersey-hk2-2.41.jar
+ * Mimepull -- mimepull-1.9.15.jar
 
 Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
  * Jakarta Activation
diff --git a/pom.xml b/pom.xml
index 98eea81c30a..caa2fc49b27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,7 +148,7 @@ flexible messaging model and an intuitive client 
API.
 0.0.24.Final
 9.4.54.v20240208
 2.5.2
-2.34
+2.41
 1.10.50
 0.16.0
 4.3.8



(pulsar) branch branch-3.2 updated: [improve] [broker] Support create RawReader based on configuration (#22280)

2024-03-18 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new f1bdb78564e [improve] [broker] Support create RawReader based on 
configuration (#22280)
f1bdb78564e is described below

commit f1bdb78564e739ed558fef1e0c9b8ea1d54c402c
Author: Hang Chen 
AuthorDate: Mon Mar 18 18:49:52 2024 +0800

[improve] [broker] Support create RawReader based on configuration (#22280)
---
 .../org/apache/pulsar/client/api/RawReader.java| 11 ++
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  8 +
 .../apache/pulsar/client/impl/RawReaderTest.java   | 39 --
 3 files changed, 56 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index b7805c36b3b..55483708fdf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.RawReaderImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 
 /**
  * Topic reader which receives raw messages (i.e. as they are stored in the 
managed ledger).
@@ -43,6 +44,16 @@ public interface RawReader {
 return future.thenApply(__ -> r);
 }
 
+static CompletableFuture<RawReader> create(PulsarClient client,
+   ConsumerConfigurationData<byte[]> consumerConfiguration,
+   boolean createTopicIfDoesNotExist) {
+CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
+RawReader r = new RawReaderImpl((PulsarClientImpl) client,
+consumerConfiguration, future, createTopicIfDoesNotExist);
+return future.thenApply(__ -> r);
+}
+
+
 /**
  * Get the topic for the reader.
  *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 3d7ad9f5865..5ac051d2271 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -65,6 +65,14 @@ public class RawReaderImpl implements RawReader {
 consumer = new RawConsumerImpl(client, consumerConfiguration, 
consumerFuture, createTopicIfDoesNotExist);
 }
 
+public RawReaderImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> consumerConfiguration,
+ CompletableFuture<Consumer<byte[]>> consumerFuture,
+ boolean createTopicIfDoesNotExist) {
+this.consumerConfiguration = consumerConfiguration;
+consumer = new RawConsumerImpl(client, consumerConfiguration, 
consumerFuture, createTopicIfDoesNotExist);
+}
+
+
 @Override
 public String getTopic() {
 return consumerConfiguration.getTopicNames().stream()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index d3fcc36a546..d9ddc00b2e8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -44,6 +44,9 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -56,6 +59,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static 
org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE;
+
 @Test(groups = "broker-impl")
 @Slf4j
 public class RawReaderTest extends MockedPulsarServiceBaseTest {
@@ -195,6 +200,36 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
 reader.closeAsync().get(3, TimeUnit.SECONDS);
 }
 
+@Test
+public void testRawReaderWithConfigurationCreation() throws Exception {
+int numKeys = 10;
+
+String topic = "persistent://my-property/my-ns/" +

(pulsar) branch master updated: [improve] [broker] Support create RawReader based on configuration (#22280)

2024-03-18 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 34f8e0e9456 [improve] [broker] Support create RawReader based on 
configuration (#22280)
34f8e0e9456 is described below

commit 34f8e0e9456674cd6459105cd7a3619b113b06cf
Author: Hang Chen 
AuthorDate: Mon Mar 18 18:49:52 2024 +0800

[improve] [broker] Support create RawReader based on configuration (#22280)
---
 .../org/apache/pulsar/client/api/RawReader.java| 11 ++
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  8 +
 .../apache/pulsar/client/impl/RawReaderTest.java   | 39 --
 3 files changed, 56 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index b7805c36b3b..55483708fdf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.RawReaderImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 
 /**
  * Topic reader which receives raw messages (i.e. as they are stored in the 
managed ledger).
@@ -43,6 +44,16 @@ public interface RawReader {
 return future.thenApply(__ -> r);
 }
 
+static CompletableFuture<RawReader> create(PulsarClient client,
+   ConsumerConfigurationData<byte[]> consumerConfiguration,
+   boolean createTopicIfDoesNotExist) {
+CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
+RawReader r = new RawReaderImpl((PulsarClientImpl) client,
+consumerConfiguration, future, createTopicIfDoesNotExist);
+return future.thenApply(__ -> r);
+}
+
+
 /**
  * Get the topic for the reader.
  *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 3d7ad9f5865..5ac051d2271 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -65,6 +65,14 @@ public class RawReaderImpl implements RawReader {
 consumer = new RawConsumerImpl(client, consumerConfiguration, 
consumerFuture, createTopicIfDoesNotExist);
 }
 
+public RawReaderImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> consumerConfiguration,
+ CompletableFuture<Consumer<byte[]>> consumerFuture,
+ boolean createTopicIfDoesNotExist) {
+this.consumerConfiguration = consumerConfiguration;
+consumer = new RawConsumerImpl(client, consumerConfiguration, 
consumerFuture, createTopicIfDoesNotExist);
+}
+
+
 @Override
 public String getTopic() {
 return consumerConfiguration.getTopicNames().stream()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index d3fcc36a546..d9ddc00b2e8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -44,6 +44,9 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -56,6 +59,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static 
org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE;
+
 @Test(groups = "broker-impl")
 @Slf4j
 public class RawReaderTest extends MockedPulsarServiceBaseTest {
@@ -195,6 +200,36 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
 reader.closeAsync().get(3, TimeUnit.SECONDS);
 }
 
+@Test
+public void testRawReaderWithConfigurationCreation() throws Exception {
+int numKeys = 10;
+
+String topic = "persistent://my-property/my-ns/" +

(pulsar) branch branch-3.2 updated: [fix][ci] Enable CI for branch-3.2 (#22287)

2024-03-18 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 5fae440e58f [fix][ci] Enable CI for branch-3.2 (#22287)
5fae440e58f is described below

commit 5fae440e58fab38ea57a6e7ce1927f01427f951d
Author: Lishen Yao 
AuthorDate: Mon Mar 18 18:37:33 2024 +0800

[fix][ci] Enable CI for branch-3.2 (#22287)
---
 .github/workflows/pulsar-ci.yaml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index 02496a82392..ba043ded210 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -22,6 +22,7 @@ on:
   pull_request:
 branches:
   - master
+  - branch-3.2
   schedule:
 # scheduled job with JDK 17
 - cron: '0 12 * * *'



  1   2   3   4   5   6   7   8   9   10   >