(pulsar) branch master updated: [improve][pip] PIP-347: Add role field in consumer's stat (#22562)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f3e52b568ec [improve][pip] PIP-347: Add role field in consumer's stat (#22562) f3e52b568ec is described below commit f3e52b568ec7e86e7582bdc425321fe172bc4deb Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Thu May 16 13:29:26 2024 +0800 [improve][pip] PIP-347: Add role field in consumer's stat (#22562) --- pip/pip-347.md | 2 +- .../org/apache/pulsar/broker/service/Consumer.java | 1 + .../stats/AuthenticatedConsumerStatsTest.java | 169 + .../pulsar/broker/stats/ConsumerStatsTest.java | 2 +- .../pulsar/common/policies/data/ConsumerStats.java | 3 + .../policies/data/stats/ConsumerStatsImpl.java | 3 + 6 files changed, 178 insertions(+), 2 deletions(-) diff --git a/pip/pip-347.md b/pip/pip-347.md index 5326fed3533..a5d5d76ae17 100644 --- a/pip/pip-347.md +++ b/pip/pip-347.md @@ -34,4 +34,4 @@ Fully compatible. Updated afterwards --> * Mailing List discussion thread: https://lists.apache.org/thread/p9y9r8pb7ygk8f0jd121c1121phvzd09 -* Mailing List voting thread: +* Mailing List voting thread: https://lists.apache.org/thread/sfv0vq498dnjx6k6zdrnn0cw8f22tz05 diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index fe9fbe6a400..c9f417c4bc4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -208,6 +208,7 @@ public class Consumer { stats = new ConsumerStatsImpl(); stats.setAddress(cnx.clientSourceAddressAndPort()); stats.consumerName = consumerName; +stats.appId = appId; stats.setConnectedSince(DateFormatter.format(connectedSince)); stats.setClientVersion(cnx.getClientVersion()); stats.metadata = this.metadata; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java new file mode 100644 index 000..e8cadb72e1e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.time.Duration; +import java.util.Base64; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Properties; +import java.util.Set; + +public class AuthenticatedConsumerStatsTest extends ConsumerStatsTest{ +private final String ADMIN_TOKEN; +private final String TOKEN_PUBLIC_KEY; +private final KeyPair kp; + +AuthenticatedConsumerStatsTest() th
(pulsar) branch branch-3.2 updated: [fix][broker] Fix cursor should use latest ledger config (#22644)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 3de8358b22a [fix][broker] Fix cursor should use latest ledger config (#22644) 3de8358b22a is described below commit 3de8358b22aa48bb3fba5d301938609868791924 Author: Zixuan Liu AuthorDate: Fri May 10 10:37:44 2024 +0800 [fix][broker] Fix cursor should use latest ledger config (#22644) Signed-off-by: Zixuan Liu --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 59 +++--- .../mledger/impl/ManagedCursorMXBeanImpl.java | 3 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 8 +-- .../mledger/impl/NonDurableCursorImpl.java | 5 +- .../bookkeeper/mledger/impl/OpReadEntry.java | 3 +- .../bookkeeper/mledger/impl/RangeSetWrapper.java | 2 +- .../mledger/impl/ReadOnlyCursorImpl.java | 5 +- .../mledger/impl/ReadOnlyManagedLedgerImpl.java| 2 +- ...ManagedCursorIndividualDeletedMessagesTest.java | 3 +- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 7 ++- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +- .../broker/service/BrokerBkEnsemblesTests.java | 8 +-- .../service/persistent/PersistentTopicTest.java| 25 + 13 files changed, 76 insertions(+), 56 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 7065af203da..911eca48bac 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -119,7 +119,6 @@ public class ManagedCursorImpl implements ManagedCursor { return 0; }; protected final BookKeeper bookkeeper; -protected final ManagedLedgerConfig config; protected final ManagedLedgerImpl ledger; private final String name; @@ -299,31 +298,30 @@ public class ManagedCursorImpl implements ManagedCursor { void operationFailed(ManagedLedgerException exception); } -ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) { +ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String cursorName) { this.bookkeeper = bookkeeper; this.cursorProperties = Collections.emptyMap(); -this.config = config; this.ledger = ledger; this.name = cursorName; this.individualDeletedMessages = new RangeSetWrapper<>(positionRangeConverter, positionRangeReverseConverter, this); -if (config.isDeletionAtBatchIndexLevelEnabled()) { +if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { this.batchDeletedIndexes = new ConcurrentSkipListMap<>(); } else { this.batchDeletedIndexes = null; } -this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType()); +this.digestType = BookKeeper.DigestType.fromApiDigestType(getConfig().getDigestType()); STATE_UPDATER.set(this, State.Uninitialized); PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0); PENDING_READ_OPS_UPDATER.set(this, 0); RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE); WAITING_READ_OP_UPDATER.set(this, null); -this.clock = config.getClock(); +this.clock = getConfig().getClock(); this.lastActive = this.clock.millis(); this.lastLedgerSwitchTimestamp = this.clock.millis(); -if (config.getThrottleMarkDelete() > 0.0) { -markDeleteLimiter = RateLimiter.create(config.getThrottleMarkDelete()); +if (getConfig().getThrottleMarkDelete() > 0.0) { +markDeleteLimiter = RateLimiter.create(getConfig().getThrottleMarkDelete()); } else { // Disable mark-delete rate limiter markDeleteLimiter = null; @@ -602,7 +600,7 @@ public class ManagedCursorImpl implements ManagedCursor { if (positionInfo.getIndividualDeletedMessagesCount() > 0) { recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); } -if (config.isDeletionAtBatchIndexLevelEnabled() +if (getConfig().isDeletionAtBatchIndexLevelEnabled() && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) { recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); } @@ -611,7 +609,8 @@ public class ManagedCursorImpl implements ManagedCursor { }, null); }; try { -boo
(pulsar) branch branch-3.2 updated: [cleanup][ml] ManagedCursor clean up. (#22246)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 781a02b2085 [cleanup][ml] ManagedCursor clean up. (#22246) 781a02b2085 is described below commit 781a02b20859e61361f1d18c369c5d00d1b2f7fd Author: 道君 AuthorDate: Tue Mar 12 23:36:59 2024 +0800 [cleanup][ml] ManagedCursor clean up. (#22246) --- .../java/org/apache/bookkeeper/mledger/impl/EntryImpl.java| 7 ++- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 6512399173f..80397931357 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -42,6 +42,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr private long timestamp; private long ledgerId; private long entryId; +private PositionImpl position; ByteBuf data; private Runnable onDeallocate; @@ -151,7 +152,10 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr @Override public PositionImpl getPosition() { -return new PositionImpl(ledgerId, entryId); +if (position == null) { +position = PositionImpl.get(ledgerId, entryId); +} +return position; } @Override @@ -197,6 +201,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr timestamp = -1; ledgerId = -1; entryId = -1; +position = null; recyclerHandle.recycle(this); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 0c8dedd6b21..7065af203da 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1506,10 +1506,7 @@ public class ManagedCursorImpl implements ManagedCursor { Set alreadyAcknowledgedPositions = new HashSet<>(); lock.readLock().lock(); try { -positions.stream() -.filter(position -> ((PositionImpl) position).compareTo(markDeletePosition) <= 0 -|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) -.forEach(alreadyAcknowledgedPositions::add); + positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add); } finally { lock.readLock().unlock(); } @@ -2281,8 +2278,7 @@ public class ManagedCursorImpl implements ManagedCursor { return; } -if (position.compareTo(markDeletePosition) <= 0 -|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) { +if (isMessageDeleted(position)) { if (config.isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); if (bitSetRecyclable != null) { @@ -3517,8 +3513,7 @@ public class ManagedCursorImpl implements ManagedCursor { @Override public void trimDeletedEntries(List entries) { entries.removeIf(entry -> { -boolean isDeleted = markDeletePosition.compareTo(entry.getLedgerId(), entry.getEntryId()) >= 0 -|| individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()); +boolean isDeleted = isMessageDeleted(entry.getPosition()); if (isDeleted) { entry.release(); }
(pulsar) branch branch-3.2 updated: [fix][admin] Fix can't delete tenant for v1 (#22550)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new aa7f0676e5a [fix][admin] Fix can't delete tenant for v1 (#22550) aa7f0676e5a is described below commit aa7f0676e5a1b4b450da569fb70385523393d8e1 Author: Jiwei Guo AuthorDate: Tue Apr 23 22:04:13 2024 +0800 [fix][admin] Fix can't delete tenant for v1 (#22550) --- .../pulsar/broker/resources/TopicResources.java| 2 +- .../pulsar/broker/auth/AuthorizationTest.java | 28 ++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java index 0963f25c3d3..413184764f5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java @@ -120,7 +120,7 @@ public class TopicResources { return store.exists(path) .thenCompose(exists -> { if (exists) { -return store.delete(path, Optional.empty()); +return store.deleteRecursive(path); } else { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index e9ad401b878..6c913d42908 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -33,6 +33,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; @@ -59,11 +60,15 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { conf.setSystemTopicEnabled(false); conf.setForceDeleteNamespaceAllowed(true); conf.setAuthenticationEnabled(true); +conf.setForceDeleteNamespaceAllowed(true); +conf.setForceDeleteTenantAllowed(true); conf.setAuthenticationProviders( Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider")); conf.setAuthorizationEnabled(true); conf.setAuthorizationAllowWildcardsMatching(true); conf.setSuperUserRoles(Sets.newHashSet("pulsar.super_user", "pass.pass")); + conf.setBrokerClientAuthenticationPlugin(MockAuthentication.class.getName()); +conf.setBrokerClientAuthenticationParameters("user:pass.pass"); internalSetup(); } @@ -72,6 +77,11 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { pulsarAdminBuilder.authentication(new MockAuthentication("pass.pass")); } +@Override +protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { +clientBuilder.authentication(new MockAuthentication("pass.pass")); +} + @AfterClass(alwaysRun = true) @Override public void cleanup() throws Exception { @@ -237,6 +247,24 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { admin.namespaces().deleteNamespace("p1/c1/ns1", true); admin.tenants().deleteTenant("p1"); + +admin.clusters().deleteCluster("c1"); +} + +@Test +public void testDeleteV1Tenant() throws Exception { +admin.clusters().createCluster("c1", ClusterData.builder().build()); +admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1"))); +waitForChange(); +admin.namespaces().createNamespace("p1/c1/ns1"); +waitForChange(); + + +String topic = "persistent://p1/c1/ns1/ds2"; +admin.topics().createNonPartitionedTopic(topic); + +admin.namespaces().deleteNamespace("p1/c1/ns1", true); +admin.tenants().deleteTenant("p1", true); admin.clusters().deleteCluster("c1"); }
(pulsar) branch master updated: [fix][misc] Correct the description of patternAutoDiscoveryPeriod (#22615)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3b24b6e0b72 [fix][misc] Correct the description of patternAutoDiscoveryPeriod (#22615) 3b24b6e0b72 is described below commit 3b24b6e0b7250f531c86e5ee2635a9b23467419c Author: jito AuthorDate: Mon May 13 09:29:38 2024 +0900 [fix][misc] Correct the description of patternAutoDiscoveryPeriod (#22615) Signed-off-by: jitokim --- .../src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java | 5 +++-- .../apache/pulsar/client/impl/conf/ConsumerConfigurationData.java| 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 863432b478f..6f3c3be9727 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -464,7 +464,7 @@ public interface ConsumerBuilder extends Cloneable { ConsumerBuilder readCompacted(boolean readCompacted); /** - * Sets topic's auto-discovery period when using a pattern for topics consumer. + * Sets topic's auto-discovery period when using a pattern for topic's consumer. * The period is in minutes, and the default and minimum values are 1 minute. * * @param periodInMinutes @@ -476,7 +476,8 @@ public interface ConsumerBuilder extends Cloneable { /** - * Sets topic's auto-discovery period when using a pattern for topics consumer. + * Sets topic's auto-discovery period when using a pattern for topic's consumer. + * The default value of period is 1 minute, with a minimum of 1 second. * * @param interval *the amount of delay between checks for diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 3ae0e977d13..18529276c9c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -310,7 +310,7 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { name = "patternAutoDiscoveryPeriod", value = "Topic auto discovery period when using a pattern for topic's consumer.\n" + "\n" -+ "The default and minimum value is 1 minute." ++ "The default value is 1 minute, with a minimum of 1 second." ) private int patternAutoDiscoveryPeriod = 60;
(pulsar) branch branch-3.2 updated: [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new e98370e21cb [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) e98370e21cb is described below commit e98370e21cb6a6ddf0b5ef9c8123046c7e5b8e3d Author: Hang Chen AuthorDate: Thu May 9 20:45:56 2024 +0800 [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) --- conf/broker.conf | 4 + conf/standalone.conf | 4 + .../apache/pulsar/broker/ServiceConfiguration.java | 7 ++ .../broker/service/persistent/PersistentTopic.java | 7 +- .../pulsar/broker/service/MessageTTLTest.java | 96 ++ 5 files changed, 117 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index e0ebbe3043a..c5beda206a4 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -180,6 +180,10 @@ backlogQuotaDefaultRetentionPolicy=producer_request_hold # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) allowAutoTopicCreation=true diff --git a/conf/standalone.conf b/conf/standalone.conf index 5eb9fadcf19..5ca0c683d74 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -121,6 +121,10 @@ backlogQuotaDefaultLimitSecond=-1 # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter. # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true. brokerDeleteInactiveTopicsEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f53eb7e183f..10ffced0321 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -646,6 +646,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int ttlDurationDefaultInSeconds = 0; +@FieldContext( +category = CATEGORY_POLICIES, +doc = "Additional system subscriptions that will be ignored by ttl check. " ++ "The cursor names are comma separated. Default is empty." +) +private Set additionalSystemCursorNames = new TreeSet<>(); + @FieldContext( category = CATEGORY_POLICIES, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index b4f295dbd97..472387a0a9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -272,6 +273,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private final ExecutorService orderedExecutor; private volatile CloseFutures closeFutures; +private Set additionalSystemCursorNames = new TreeSet<>(); /*** * We use 3 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return @@ -384,6 +386,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { shadowSourceTopic = null; } +additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames(); } @Override @@ -1888,7 +1891,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
(pulsar) branch master updated: [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new bed032e714a [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) bed032e714a is described below commit bed032e714aff9f5d2594bdc80a3e7888e53b1bf Author: Hang Chen AuthorDate: Thu May 9 20:45:56 2024 +0800 [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) --- conf/broker.conf | 4 + conf/standalone.conf | 4 + .../apache/pulsar/broker/ServiceConfiguration.java | 7 ++ .../broker/service/persistent/PersistentTopic.java | 7 +- .../pulsar/broker/service/MessageTTLTest.java | 96 ++ 5 files changed, 117 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index 1b51ff47551..1ef68a0395c 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -180,6 +180,10 @@ backlogQuotaDefaultRetentionPolicy=producer_request_hold # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) allowAutoTopicCreation=true diff --git a/conf/standalone.conf b/conf/standalone.conf index 51035235d4d..a8615b70293 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -121,6 +121,10 @@ backlogQuotaDefaultLimitSecond=-1 # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter. # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true. brokerDeleteInactiveTopicsEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a9d170ea5de..9efe1856509 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -652,6 +652,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int ttlDurationDefaultInSeconds = 0; +@FieldContext( +category = CATEGORY_POLICIES, +doc = "Additional system subscriptions that will be ignored by ttl check. " ++ "The cursor names are comma separated. Default is empty." +) +private Set additionalSystemCursorNames = new TreeSet<>(); + @FieldContext( category = CATEGORY_POLICIES, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7228bdeb2d3..28bc27f7961 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -279,6 +280,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private final ExecutorService orderedExecutor; private volatile CloseFutures closeFutures; +private Set additionalSystemCursorNames = new TreeSet<>(); @Getter private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); @@ -414,6 +416,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { shadowSourceTopic = null; } +additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames(); } @Override @@ -1934,7 +1937,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal int messageTtlInSeconds = topicPolic
(pulsar) branch master updated: [fix][ml] Remove duplicated field initialization of ML (#22676)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 88feb874bb3 [fix][ml] Remove duplicated field initialization of ML (#22676) 88feb874bb3 is described below commit 88feb874bb3ad58a74b3d40d931b2aa7380dc7e1 Author: 道君 AuthorDate: Thu May 9 08:53:59 2024 +0800 [fix][ml] Remove duplicated field initialization of ML (#22676) --- .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index e5e163127f7..b12346cadc9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -365,9 +365,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { this.mlOwnershipChecker = mlOwnershipChecker; this.propertiesMap = new ConcurrentHashMap<>(); this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs(); -if (config.getManagedLedgerInterceptor() != null) { -this.managedLedgerInterceptor = config.getManagedLedgerInterceptor(); -} this.minBacklogCursorsForCaching = config.getMinimumBacklogCursorsForCaching(); this.minBacklogEntriesForCaching = config.getMinimumBacklogEntriesForCaching(); this.maxBacklogBetweenCursorsForCaching = config.getMaxBacklogBetweenCursorsForCaching();
(pulsar) branch branch-3.2 updated: Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new f7984d74d19 Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668) f7984d74d19 is described below commit f7984d74d19e50d31f7ea2abacef8430e4cf95bd Author: Lari Hotari AuthorDate: Wed May 8 13:43:24 2024 +0300 Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668) --- pom.xml | 1 - pulsar-io/debezium/oracle/pom.xml | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index bd231f92a4d..c817c8e1858 100644 --- a/pom.xml +++ b/pom.xml @@ -198,7 +198,6 @@ flexible messaging model and an intuitive client API. 1.2.4 8.12.1 1.9.7.Final -2.2.0.Final 42.5.0 8.0.30 diff --git a/pulsar-io/debezium/oracle/pom.xml b/pulsar-io/debezium/oracle/pom.xml index 214e9c15c3a..1018d5f9573 100644 --- a/pulsar-io/debezium/oracle/pom.xml +++ b/pulsar-io/debezium/oracle/pom.xml @@ -48,8 +48,7 @@ io.debezium debezium-connector-oracle - ${debezium.oracle.version} - runtime + ${debezium.version}
(pulsar) branch master updated: Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new ca44b9bc7c4 Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668) ca44b9bc7c4 is described below commit ca44b9bc7c48eca59692744399872e1f14f4fe6f Author: Lari Hotari AuthorDate: Wed May 8 13:43:24 2024 +0300 Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668) --- pom.xml | 1 - pulsar-io/debezium/oracle/pom.xml | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index cec3b3c60db..c2f563eb60e 100644 --- a/pom.xml +++ b/pom.xml @@ -199,7 +199,6 @@ flexible messaging model and an intuitive client API. 1.2.4 8.12.1 1.9.7.Final -2.2.0.Final 42.5.0 8.0.30 diff --git a/pulsar-io/debezium/oracle/pom.xml b/pulsar-io/debezium/oracle/pom.xml index b22a5785dfb..c69640ecff7 100644 --- a/pulsar-io/debezium/oracle/pom.xml +++ b/pulsar-io/debezium/oracle/pom.xml @@ -48,8 +48,7 @@ io.debezium debezium-connector-oracle - ${debezium.oracle.version} - runtime + ${debezium.version}
(pulsar) branch branch-3.2 updated: [fix] Fix Reader can be stuck from transaction aborted messages. (#22610)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 763f90f6dd3 [fix] Fix Reader can be stuck from transaction aborted messages. (#22610) 763f90f6dd3 is described below commit 763f90f6dd317819d93990348bfc8519029c727d Author: 道君 AuthorDate: Tue May 7 20:45:16 2024 +0800 [fix] Fix Reader can be stuck from transaction aborted messages. (#22610) --- .../mledger/util/ManagedLedgerImplUtils.java | 17 ++ .../broker/service/persistent/PersistentTopic.java | 24 .../pulsar/broker/transaction/TransactionTest.java | 68 ++ .../buffer/TopicTransactionBufferTest.java | 36 4 files changed, 110 insertions(+), 35 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java index cd8671b0e62..01de115290a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java @@ -38,11 +38,7 @@ public class ManagedLedgerImplUtils { final Predicate predicate, final PositionImpl startPosition) { CompletableFuture future = new CompletableFuture<>(); -if (!ledger.isValidPosition(startPosition)) { -future.complete(startPosition); -} else { -internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); -} +internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); return future; } @@ -50,6 +46,10 @@ public class ManagedLedgerImplUtils { final Predicate predicate, final PositionImpl position, final CompletableFuture future) { +if (!ledger.isValidPosition(position)) { +future.complete(position); +return; +} ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { @@ -60,12 +60,7 @@ public class ManagedLedgerImplUtils { return; } PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position); -if (!ledger.isValidPosition(previousPosition)) { -future.complete(previousPosition); -} else { -internalAsyncReverseFindPositionOneByOne(ledger, predicate, -ledger.getPreviousPosition((PositionImpl) position), future); -} +internalAsyncReverseFindPositionOneByOne(ledger, predicate, previousPosition, future); } catch (Exception e) { future.completeExceptionally(e); } finally { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 60eb700fc06..fa731b860f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3561,18 +3561,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public CompletableFuture getLastDispatchablePosition() { -PositionImpl maxReadPosition = getMaxReadPosition(); -// If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions. -// so return `maxRedPosition` directly. -if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) { -return CompletableFuture.completedFuture(maxReadPosition); -} else { -return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { -MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); -// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer -return !Markers.isServerOnlyMarker(md); -}, maxReadPosition); -} +return ManagedLedgerImplUtils.asyncGetLastVali
(pulsar) branch branch-3.2 updated: [fix][broker] avoid offload system topic (#22497)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new c3c17dee756 [fix][broker] avoid offload system topic (#22497) c3c17dee756 is described below commit c3c17dee7567d0a182affb1991e1e35098689d9b Author: Qiang Zhao AuthorDate: Wed May 8 13:10:49 2024 +0800 [fix][broker] avoid offload system topic (#22497) Co-authored-by: 道君 --- .../pulsar/broker/service/BrokerService.java | 8 +- .../pulsar/broker/service/BrokerServiceTest.java | 94 ++ 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 032d4dd9369..60d56c0d908 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1906,7 +1906,13 @@ public class BrokerService implements Closeable { topicLevelOffloadPolicies, OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), getPulsar().getConfig().getProperties()); -if (NamespaceService.isSystemServiceNamespace(namespace.toString())) { +if (NamespaceService.isSystemServiceNamespace(namespace.toString()) +|| SystemTopicNames.isSystemTopic(topicName)) { +/* + Avoid setting broker internal system topics using off-loader because some of them are the + preconditions of other topics. The slow replying log speed will cause a delay in all the topic + loading.(timeout) + */ managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE); } else { if (topicLevelOffloadPolicies != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index fcf11fad708..ab0b8f813ea 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -67,12 +67,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.StringUtils; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; @@ -111,6 +114,9 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; @@ -1772,4 +1778,92 @@ public class BrokerServiceTest extends BrokerTestBase { fail("Unsubscribe failed"); } } + + +@Test +public void testOffloadConfShouldNotAppliedForSystemTopic() throws PulsarAdminException { +final String driver = "aws-s3"; +final String region = "test-region"; +final String bucket = "test-bucket"; +final String role = "test-role"; +final String roleSessionName = "test-role-session-name"; +final String credentialId = "test-credential-id"; +final String credentialSecret = "test-credential-secret"; +final String endPoint = "test-endpoint"; +final Integer maxBlockSizeInBytes = 5; +final Integer readBufferSizeInBytes = 2; +final Long offloadThresholdInBytes = 10L; +final Long offloadThresholdInSeconds = 1000L;
(pulsar) branch branch-3.2 updated: [improve][ws] Add memory limit configuration for Pulsar client used in Websocket proxy (#22666)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new e32cdfb113b [improve][ws] Add memory limit configuration for Pulsar client used in Websocket proxy (#22666) e32cdfb113b is described below commit e32cdfb113b1693a2420e39ab40f985b59a44899 Author: Lari Hotari AuthorDate: Wed May 8 06:56:35 2024 +0300 [improve][ws] Add memory limit configuration for Pulsar client used in Websocket proxy (#22666) --- conf/broker.conf | 3 +++ conf/standalone.conf | 3 +++ conf/websocket.conf| 3 +++ .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++ .../main/java/org/apache/pulsar/websocket/WebSocketService.java| 3 ++- .../pulsar/websocket/service/WebSocketProxyConfiguration.java | 3 +++ 6 files changed, 21 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index dd0f3e49e1f..e0ebbe3043a 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1539,6 +1539,9 @@ webSocketNumServiceThreads= # Number of connections per Broker in Pulsar Client used in WebSocket proxy webSocketConnectionsPerBroker= +# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy +webSocketPulsarClientMemoryLimitInMB=0 + # Time in milliseconds that idle WebSocket session times out webSocketSessionIdleTimeoutMillis=30 diff --git a/conf/standalone.conf b/conf/standalone.conf index 316143ab49d..5eb9fadcf19 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -967,6 +967,9 @@ webSocketNumIoThreads=8 # Number of connections per Broker in Pulsar Client used in WebSocket proxy webSocketConnectionsPerBroker=8 +# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy +webSocketPulsarClientMemoryLimitInMB=0 + # Time in milliseconds that idle WebSocket session times out webSocketSessionIdleTimeoutMillis=30 diff --git a/conf/websocket.conf b/conf/websocket.conf index 9051f3b590c..91f7f7d4c23 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -71,6 +71,9 @@ numHttpServerThreads= # Number of connections per Broker in Pulsar Client used in WebSocket proxy webSocketConnectionsPerBroker= +# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy +webSocketPulsarClientMemoryLimitInMB=0 + # Time in milliseconds that idle WebSocket session times out webSocketSessionIdleTimeoutMillis=30 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index d1f2e9b585f..f53eb7e183f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2892,6 +2892,13 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Number of connections per Broker in Pulsar Client used in WebSocket proxy" ) private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors(); + +@FieldContext( +category = CATEGORY_WEBSOCKET, +doc = "Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy" +) +private int webSocketPulsarClientMemoryLimitInMB = 0; + @FieldContext( category = CATEGORY_WEBSOCKET, doc = "Time in milliseconds that idle WebSocket session times out" diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 66b2a0075ec..889f4431cc3 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -195,7 +195,8 @@ public class WebSocketService implements Closeable { private PulsarClient createClientInstance(ClusterData clusterData) throws IOException { ClientBuilder clientBuilder = PulsarClient.builder() // -.memoryLimit(0, SizeUnit.BYTES) + .memoryLimit(SizeUnit.MEGA_BYTES.toBytes(config.getWebSocketPulsarClientMemoryLimitInMB()), +SizeUnit.BYTES) .statsInterval(0, TimeUnit.SECONDS) // .enableTls(config.isTlsEnabled()) // .allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) // diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/a
(pulsar) branch branch-3.2 updated: [fix][broker] Disable system topic message deduplication (#22582)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new b6f464ea0a1 [fix][broker] Disable system topic message deduplication (#22582) b6f464ea0a1 is described below commit b6f464ea0a17786fb857aac5111dcc394cba8f56 Author: Qiang Zhao AuthorDate: Wed May 8 10:53:53 2024 +0800 [fix][broker] Disable system topic message deduplication (#22582) --- .../org/apache/pulsar/broker/service/Topic.java| 10 +++ .../service/persistent/MessageDeduplication.java | 6 +--- .../broker/service/persistent/PersistentTopic.java | 9 +++--- .../broker/service/persistent/SystemTopic.java | 16 +++ .../service/persistent/MessageDuplicationTest.java | 32 ++ 5 files changed, 64 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 1da8cfce4ee..c2eefcd18e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -213,6 +213,16 @@ public interface Topic { void checkCursorsToCacheEntries(); +/** + * Indicate if the current topic enabled server side deduplication. + * This is a dynamic configuration, user may update it by namespace/topic policies. + * + * @return whether enabled server side deduplication + */ +default boolean isDeduplicationEnabled() { +return false; +} + void checkDeduplicationSnapshot(); void checkMessageExpiry(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e508661364d..ab3b799093b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -217,7 +217,7 @@ public class MessageDeduplication { * returning a future to track the completion of the task */ public CompletableFuture checkStatus() { -boolean shouldBeEnabled = isDeduplicationEnabled(); +boolean shouldBeEnabled = topic.isDeduplicationEnabled(); synchronized (this) { if (status == Status.Recovering || status == Status.Removing) { // If there's already a transition happening, check later for status @@ -472,10 +472,6 @@ public class MessageDeduplication { }, null); } -private boolean isDeduplicationEnabled() { -return topic.getHierarchyTopicPolicies().getDeduplicationEnabled().get(); -} - /** * Topic will call this method whenever a producer connects. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e99bd1425f4..60eb700fc06 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2146,10 +2146,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return future; } -public boolean isDeduplicationEnabled() { -return messageDeduplication.isEnabled(); -} - @Override public int getNumberOfConsumers() { int count = 0; @@ -4080,6 +4076,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return ledger.isMigrated(); } +public boolean isDeduplicationEnabled() { +return getHierarchyTopicPolicies().getDeduplicationEnabled().get(); +} + public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) { return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID); } @@ -4104,4 +4104,5 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal public Optional getShadowSourceTopic() { return Optional.ofNullable(shadowSourceTopic); } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 720ae3c5189..f2cec2138a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -80,6 +80,22 @@ public class
(pulsar) branch branch-3.2 updated: [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes (#22501)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 2c92ae31722 [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes (#22501) 2c92ae31722 is described below commit 2c92ae317222ac3e434a497aa458792f88debe75 Author: Rui Fu AuthorDate: Wed May 1 13:18:05 2024 +0800 [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes (#22501) --- .../apache/pulsar/functions/instance/JavaInstanceRunnable.java| 8 +++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 21f125d3497..f1b9af00f9d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -283,13 +283,19 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); Thread currentThread = Thread.currentThread(); +ClassLoader clsLoader = currentThread.getContextClassLoader(); Consumer fatalHandler = throwable -> { this.deathException = throwable; currentThread.interrupt(); }; -return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider, +try { +Thread.currentThread().setContextClassLoader(functionClassLoader); +return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider, collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager, pulsarAdmin, clientBuilder, fatalHandler); +} finally { +Thread.currentThread().setContextClassLoader(clsLoader); +} } public interface AsyncResultConsumer {
(pulsar) branch branch-3.2 updated: [fix][test] Clear MockedPulsarServiceBaseTest fields to prevent test runtime memory leak (#22659)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new ebbdad1e6ab [fix][test] Clear MockedPulsarServiceBaseTest fields to prevent test runtime memory leak (#22659) ebbdad1e6ab is described below commit ebbdad1e6abfe0233ddc715a49f1facccd6991f8 Author: Lari Hotari AuthorDate: Mon May 6 21:48:47 2024 +0300 [fix][test] Clear MockedPulsarServiceBaseTest fields to prevent test runtime memory leak (#22659) --- .../org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java | 7 +++ 1 file changed, 7 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index bd08ced1e03..248bd0e720e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -275,15 +275,22 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { } if (brokerGateway != null) { brokerGateway.close(); +brokerGateway = null; } if (pulsarTestContext != null) { pulsarTestContext.close(); pulsarTestContext = null; } + resetConfig(); callCloseables(closeables); closeables.clear(); onCleanup(); + +// clear fields to avoid test runtime memory leak, pulsarTestContext already handles closing of these instances +pulsar = null; +mockZooKeeper = null; +mockZooKeeperGlobal = null; } protected void closeAdmin() {
(pulsar) branch branch-3.2 updated: [fix][sec] Upgrade Debezium oracle connector version to avoid CVE-2023-4586 (#22641)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 412a02a4157 [fix][sec] Upgrade Debezium oracle connector version to avoid CVE-2023-4586 (#22641) 412a02a4157 is described below commit 412a02a4157a1ce0f16f8a2c6e119913352cba80 Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com> AuthorDate: Sat May 4 02:00:28 2024 +0530 [fix][sec] Upgrade Debezium oracle connector version to avoid CVE-2023-4586 (#22641) --- pom.xml | 1 + pulsar-io/debezium/oracle/pom.xml | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c817c8e1858..bd231f92a4d 100644 --- a/pom.xml +++ b/pom.xml @@ -198,6 +198,7 @@ flexible messaging model and an intuitive client API. 1.2.4 8.12.1 1.9.7.Final +2.2.0.Final 42.5.0 8.0.30 diff --git a/pulsar-io/debezium/oracle/pom.xml b/pulsar-io/debezium/oracle/pom.xml index 1018d5f9573..214e9c15c3a 100644 --- a/pulsar-io/debezium/oracle/pom.xml +++ b/pulsar-io/debezium/oracle/pom.xml @@ -48,7 +48,8 @@ io.debezium debezium-connector-oracle - ${debezium.version} + ${debezium.oracle.version} + runtime
(pulsar) branch branch-3.2 updated: [fix][sec] Upgrade elasticsearch-java version to avoid CVE-2023-4043 (#22640)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new e5af8ef5ad3 [fix][sec] Upgrade elasticsearch-java version to avoid CVE-2023-4043 (#22640) e5af8ef5ad3 is described below commit e5af8ef5ad34c94b7e18a311055fc25e766ea646 Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com> AuthorDate: Sat May 4 02:51:48 2024 +0530 [fix][sec] Upgrade elasticsearch-java version to avoid CVE-2023-4043 (#22640) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 8c98cacfbb6..c817c8e1858 100644 --- a/pom.xml +++ b/pom.xml @@ -196,7 +196,7 @@ flexible messaging model and an intuitive client API. 3.3.5 2.4.10 1.2.4 -8.5.2 +8.12.1 1.9.7.Final 42.5.0 8.0.30
(pulsar) branch branch-3.2 updated: [fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 (#22633)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 40eb3914651 [fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 (#22633) 40eb3914651 is described below commit 40eb39146513c0f9f74142b5c12e9c53e74f318d Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com> AuthorDate: Thu May 2 18:58:02 2024 +0530 [fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 (#22633) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 8aa8bf36c98..8c98cacfbb6 100644 --- a/pom.xml +++ b/pom.xml @@ -182,7 +182,7 @@ flexible messaging model and an intuitive client API. 4.5.0 3.4.0 5.18.0 -1.12.262 +1.12.638 1.11.3 2.10.10 2.6.0
(pulsar) branch branch-3.2 updated: [fix] [client] Fix Consumer should return configured batch receive max messages (#22619)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new bc0c4a41b33 [fix] [client] Fix Consumer should return configured batch receive max messages (#22619) bc0c4a41b33 is described below commit bc0c4a41b33bae1344e19555bd5b8fd268997a5c Author: Rajan Dhabalia AuthorDate: Thu May 2 16:57:49 2024 -0700 [fix] [client] Fix Consumer should return configured batch receive max messages (#22619) --- .../client/api/ConsumerBatchReceiveTest.java | 8 +++--- .../client/api/SimpleProducerConsumerTest.java | 29 ++ .../pulsar/client/impl/ConsumerBuilderImpl.java| 4 +++ 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java index d54b1c99e3e..974d25aad64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java @@ -112,7 +112,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), true, 50, false }, // Number of message limitation exceed receiverQueue size and timeout limitation @@ -147,7 +147,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), false, 50, false }, // Number of message limitation exceed receiverQueue size and timeout limitation @@ -248,7 +248,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), true, 50, true }, // Number of message limitation exceed receiverQueue size and timeout limitation @@ -283,7 +283,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), false, 50, true }, // Number of message limitation exceed receiverQueue size and timeout limitation diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 691f501777e..e9bb86fa33b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4825,6 +4825,35 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { admin.topics().delete(topic, false); } +/** + * It verifies that consumer receives configured number of messages into the batch. + * @throws Exception + */ +@Test +public void testBatchReceiveWithMaxBatchSize() throws Exception { +int maxBatchSize = 100; +final int internalQueueSize = 10; +final int maxBytes = 200; +final int timeOutInSeconds = 900; +final String topic = "persistent://my-property/my-ns/testBatchReceive"; +BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(maxBytes) +.maxNumMessages(maxBatchSize).timeout(timeOutInSeconds, TimeUnit.SECONDS).build(); +@Cleanup +Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) +.subscriptionName("my-subscriber-name") +.receiverQueueSize(internalQueueSize) +.batchReceivePolicy(batchReceivePolicy).subscribe(); +@Cleanup +Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + +final i
(pulsar) branch branch-3.2 updated: [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 892151bfcc2 [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573) 892151bfcc2 is described below commit 892151bfcc2b64b1ee0a9f05a182a488d1554ef5 Author: Yunze Xu AuthorDate: Fri Apr 26 21:30:15 2024 +0800 [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573) --- .../org/apache/pulsar/broker/PulsarService.java| 3 + .../store/TableViewLoadDataStoreImpl.java | 6 +- .../pulsar/broker/service/BrokerService.java | 11 +++ .../extensions/ExtensibleLoadManagerCloseTest.java | 107 + 4 files changed, 122 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 499a981259f..bf266d44d83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -431,6 +431,9 @@ public class PulsarService implements AutoCloseable, ShutdownService { return closeFuture; } LOG.info("Closing PulsarService"); +if (brokerService != null) { +brokerService.unloadNamespaceBundlesGracefully(); +} state = State.Closing; // close the service in reverse order v.s. in which they are started diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index d916e917162..81cf33b4a55 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -161,12 +161,8 @@ public class TableViewLoadDataStoreImpl implements LoadDataStore { } private void validateProducer() { -if (producer == null || !producer.isConnected()) { +if (producer == null) { try { -if (producer != null) { -producer.close(); -} -producer = null; startProducer(); log.info("Restarted producer on {}", topic); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ea55a43c7f0..032d4dd9369 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -288,6 +288,7 @@ public class BrokerService implements Closeable { private Set brokerEntryPayloadProcessors; private final TopicEventsDispatcher topicEventsDispatcher = new TopicEventsDispatcher(); +private volatile boolean unloaded = false; public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception { this.pulsar = pulsar; @@ -869,9 +870,13 @@ public class BrokerService implements Closeable { } public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean closeWithoutWaitingClientDisconnect) { +if (unloaded) { +return; +} try { log.info("Unloading namespace-bundles..."); // make broker-node unavailable from the cluster +long disableBrokerStartTime = System.nanoTime(); if (pulsar.getLoadManager() != null && pulsar.getLoadManager().get() != null) { try { pulsar.getLoadManager().get().disableBroker(); @@ -880,6 +885,10 @@ public class BrokerService implements Closeable { // still continue and release bundle ownership as broker's registration node doesn't exist. } } +double disableBrokerTimeSeconds = +TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - disableBrokerStartTime)) +/ 1000.0; +log.info("Disable broker in load manager completed in {} seconds", disableBrokerTimeSeconds); // unload all namespace-bundles gracefully long closeTopicsStartTime = System.nanoTime(); @@ -909,6 +918,8 @@ public class BrokerService implements Closeable { } } catch (Exception e) {
(pulsar) branch branch-3.2 updated: [fix][io] Fix es index creation (#22654)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new b86ebaa05d6 [fix][io] Fix es index creation (#22654) b86ebaa05d6 is described below commit b86ebaa05d62a8c79e3f8f2bc91e72c40cc23e4d Author: Zixuan Liu AuthorDate: Mon May 6 20:35:51 2024 +0800 [fix][io] Fix es index creation (#22654) Signed-off-by: Zixuan Liu --- .../client/elastic/ElasticSearchJavaRestClient.java | 4 ++-- .../pulsar/io/elasticsearch/ElasticSearchSinkTests.java | 13 + 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index afda5ba0e74..133daa8cd6a 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -144,7 +144,7 @@ public class ElasticSearchJavaRestClient extends RestClient { public boolean deleteDocument(String index, String documentId) throws IOException { final DeleteRequest req = new DeleteRequest.Builder() -.index(config.getIndexName()) +.index(index) .id(documentId) .build(); @@ -156,7 +156,7 @@ public class ElasticSearchJavaRestClient extends RestClient { public boolean indexDocument(String index, String documentId, String documentSource) throws IOException { final Map mapped = objectMapper.readValue(documentSource, Map.class); final IndexRequest indexRequest = new IndexRequest.Builder<>() -.index(config.getIndexName()) +.index(index) .document(mapped) .id(documentId) .build(); diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java index 9a2cb4ab565..f1da6fd0c7e 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import co.elastic.clients.transport.ElasticsearchTransport; import com.fasterxml.jackson.core.JsonParseException; @@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericObject; @@ -152,6 +154,7 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase { }); when(mockRecord.getSchema()).thenAnswer((Answer>>) invocation -> kvSchema); +when(mockRecord.getEventTime()).thenAnswer(invocation -> Optional.of(System.currentTimeMillis())); } @AfterMethod(alwaysRun = true) @@ -209,6 +212,16 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase { verify(mockRecord, times(100)).ack(); } +@Test +public final void send1WithFormattedIndexTest() throws Exception { +map.put("indexName", "test-formatted-index-%{+-MM-dd}"); +sink.open(map, mockSinkContext); +send(1); +verify(mockRecord, times(1)).ack(); +String value = getHitIdAtIndex("test-formatted-index-*", 0); +assertTrue(StringUtils.isNotBlank(value)); +} + @Test public final void sendNoSchemaTest() throws Exception {
(pulsar) branch master updated (7e88463d9a5 -> 816755429a3)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 7e88463d9a5 [fix] Fix Reader can be stuck from transaction aborted messages. (#22610) add 816755429a3 [improve][test] Clear fields in AuthZTest classes at cleanup (#22661) No new revisions were added by this update. Summary of changes: .../org/apache/pulsar/broker/admin/AuthZTest.java | 15 ++ .../pulsar/broker/admin/NamespaceAuthZTest.java| 4 +++ .../apache/pulsar/broker/admin/TopicAuthZTest.java | 32 +- .../broker/admin/TopicPoliciesAuthZTest.java | 2 ++ .../admin/TransactionAndSchemaAuthZTest.java | 14 +++--- .../pulsar/security/MockedPulsarStandalone.java| 3 ++ 6 files changed, 41 insertions(+), 29 deletions(-)
(pulsar) branch master updated: [improve][test] Add policy authentication test for namespace API (#22593)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1bb9378b50a [improve][test] Add policy authentication test for namespace API (#22593) 1bb9378b50a is described below commit 1bb9378b50aa891834b64cd39f55ae0e32a055bb Author: Cong Zhao AuthorDate: Sun Apr 28 10:37:37 2024 +0800 [improve][test] Add policy authentication test for namespace API (#22593) --- .../pulsar/broker/admin/NamespaceAuthZTest.java| 1248 ++-- 1 file changed, 1140 insertions(+), 108 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index 5358295b785..ec6a122f7df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -20,9 +20,11 @@ package org.apache.pulsar.broker.admin; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.deleteNamespaceWithRetry; +import static org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import com.google.common.collect.Sets; import io.jsonwebtoken.Jwts; import java.io.File; import java.util.ArrayList; @@ -32,6 +34,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; @@ -44,17 +48,33 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; import org.apache.pulsar.packages.management.core.common.PackageMetadata; import org.apache.pulsar.security.MockedPulsarStandalone; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; @@ -72,7 +92,7 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { private AuthorizationService authorizationService; -private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); +private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); private static final String TENANT_ADMIN_TOKEN = Jwts.builder() .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); @@ -122,16 +142,46 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { superUserAdmin.namespaces().createNamespace("public/default"); } -private void setAuthorizationOperationChecker(String role, NamespaceOperation operation) { +private AtomicBoolean setAuthorizationOperationChecker(String role, NamespaceOperation operation) { +AtomicBoolean execFlag = new AtomicBoolean(false); Mockito.doAnswer(invocationOnMock -> { String role_ = invocat
(pulsar) branch branch-3.2 updated: [improve][admin] Check if the topic existed before the permission operations (#22547)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new adac20a72ef [improve][admin] Check if the topic existed before the permission operations (#22547) adac20a72ef is described below commit adac20a72efdc2b1d9b16464ebffb569c41014e9 Author: Jiwei Guo AuthorDate: Fri Apr 26 14:05:30 2024 +0800 [improve][admin] Check if the topic existed before the permission operations (#22547) --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 9 ++--- .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java| 1 + .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 12 .../apache/pulsar/broker/admin/PersistentTopicsTest.java | 10 -- .../org/apache/pulsar/broker/auth/AuthorizationTest.java | 14 +- .../client/api/AuthenticatedProducerConsumerTest.java | 4 +++- .../client/api/AuthorizationProducerConsumerTest.java | 2 ++ .../pulsar/websocket/proxy/ProxyAuthorizationTest.java | 8 +--- 8 files changed, 46 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index b0968f494ee..4b29452f98c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -207,6 +207,7 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture>> internalGetPermissionsOnTopic() { // This operation should be reading from zookeeper and it should be allowed without having admin privileges return validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName)); } @@ -258,9 +259,10 @@ public class PersistentTopicsBase extends AdminResource { Set actions) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) -.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 -> -grantPermissionsAsync(topicName, role, actions) -.thenAccept(unused -> asyncResponse.resume(Response.noContent().build() +.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) +.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions)) +.thenAccept(unused -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause); @@ -272,6 +274,7 @@ public class PersistentTopicsBase extends AdminResource { protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java index e89b4ff5e83..2dcb930fbe7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java @@ -120,6 +120,7 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest { .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) .authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN) .build(); +admin.topics().createNonPartitionedTopic(topicName); admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
(pulsar) branch master updated: [improve][admin] Check if the topic existed before the permission operations (#22547)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 69a600e86bb [improve][admin] Check if the topic existed before the permission operations (#22547) 69a600e86bb is described below commit 69a600e86bb5110a118d836125411e941b83764d Author: Jiwei Guo AuthorDate: Fri Apr 26 14:05:30 2024 +0800 [improve][admin] Check if the topic existed before the permission operations (#22547) --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 9 ++--- .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java | 1 + .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 12 .../apache/pulsar/broker/admin/PersistentTopicsTest.java| 10 -- .../org/apache/pulsar/broker/auth/AuthorizationTest.java| 13 - .../client/api/AuthenticatedProducerConsumerTest.java | 4 +++- .../client/api/AuthorizationProducerConsumerTest.java | 2 ++ .../pulsar/websocket/proxy/ProxyAuthorizationTest.java | 8 +--- 8 files changed, 45 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 63ea987bb07..682f41dcdb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -205,6 +205,7 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture>> internalGetPermissionsOnTopic() { // This operation should be reading from zookeeper and it should be allowed without having admin privileges return validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName)); } @@ -256,9 +257,10 @@ public class PersistentTopicsBase extends AdminResource { Set actions) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) -.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 -> -grantPermissionsAsync(topicName, role, actions) -.thenAccept(unused -> asyncResponse.resume(Response.noContent().build() +.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) +.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions)) +.thenAccept(unused -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause); @@ -270,6 +272,7 @@ public class PersistentTopicsBase extends AdminResource { protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java index e89b4ff5e83..2dcb930fbe7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java @@ -120,6 +120,7 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest { .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) .authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN) .build(); +admin.topics().createNonPartitionedTopic(topicName); admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
(pulsar) branch master updated: [fix][admin] Fix namespace admin api exception response (#22587)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f25776d7fe6 [fix][admin] Fix namespace admin api exception response (#22587) f25776d7fe6 is described below commit f25776d7fe6812f11b17226995d989c5a2364920 Author: Cong Zhao AuthorDate: Fri Apr 26 09:18:27 2024 +0800 [fix][admin] Fix namespace admin api exception response (#22587) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 5 +- .../pulsar/broker/admin/NamespaceAuthZTest.java| 60 -- 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index bbadc7bb331..5f2dccc3e9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2019,7 +2019,7 @@ public abstract class NamespacesBase extends AdminResource { } protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){ -validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); +validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) { throw new RestException(Status.PRECONDITION_FAILED, @@ -2125,9 +2125,10 @@ public abstract class NamespacesBase extends AdminResource { f.complete(null); }) .exceptionally(t -> { +Throwable cause = FutureUtil.unwrapCompletionException(t); log.error("[{}] Failed to update offloadThresholdInSeconds configuration for namespace {}", clientAppId(), namespaceName, t); -f.completeExceptionally(new RestException(t)); +f.completeExceptionally(new RestException(cause)); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index d5a0468f340..5358295b785 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.admin; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.deleteNamespaceWithRetry; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -58,7 +59,6 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-admin") @@ -72,8 +72,6 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { private AuthorizationService authorizationService; -private AuthorizationService orignalAuthorizationService; - private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); private static final String TENANT_ADMIN_TOKEN = Jwts.builder() .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); @@ -100,6 +98,9 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) .build(); this.pulsarClient = super.getPulsarService().getClient(); +this.authorizationService = Mockito.spy(getPulsarService().getBrokerService().getAuthorizationService()); +FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", +authorizationService, true); } @@ -115,19 +116,9 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { close(); } -@BeforeMethod -public void before() throws IllegalAccessException { -orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); -authorizationService = Mockito.spy(orignalAuthorizationService); -FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", -authorizationService, true); -} - @AfterMethod -public
(pulsar) branch branch-3.0 updated: [improve][sec] Align some namespace level policy authorisation check (#21640)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7fdfbf46f9b [improve][sec] Align some namespace level policy authorisation check (#21640) 7fdfbf46f9b is described below commit 7fdfbf46f9b01e55cac270782b57008ec02eb5b2 Author: Qiang Zhao AuthorDate: Mon Dec 4 22:15:19 2023 +0800 [improve][sec] Align some namespace level policy authorisation check (#21640) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 30 +- .../apache/pulsar/broker/admin/v2/Namespaces.java | 3 ++- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 04ec944aab4..d3c5f681b6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1204,7 +1204,8 @@ public abstract class NamespacesBase extends AdminResource { protected CompletableFuture internalSetPublishRateAsync(PublishRate maxPublishMessageRate) { log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate); -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(), maxPublishMessageRate); log.info("[{}] Successfully updated the publish_max_message_rate for cluster on namespace {}", clientAppId(), namespaceName); @@ -1233,7 +1234,8 @@ public abstract class NamespacesBase extends AdminResource { protected CompletableFuture internalRemovePublishRateAsync() { log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), namespaceName, topicName); -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { if (policies.publishMaxMessageRate != null) { policies.publishMaxMessageRate.remove(pulsar().getConfiguration().getClusterName()); } @@ -1253,7 +1255,8 @@ public abstract class NamespacesBase extends AdminResource { @SuppressWarnings("deprecation") protected CompletableFuture internalSetTopicDispatchRateAsync(DispatchRateImpl dispatchRate) { log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate); -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); policies.clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); log.info("[{}] Successfully updated the dispatchRate for cluster on namespace {}", clientAppId(), @@ -1263,7 +1266,8 @@ public abstract class NamespacesBase extends AdminResource { } protected CompletableFuture internalDeleteTopicDispatchRateAsync() { -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.topicDispatchRate.remove(pulsar().getConfiguration().getClusterName()); policies.clusterDispatchRate.remove(pulsar().getConfiguration().getClusterName()); log.info("[{}] Successfully delete the dispatchRate for cluster on namespace {}", clientAppId(), @@ -1280,7 +1284,7 @@ public abstract class NamespacesBase extends AdminResource { } protected CompletableFuture internalSetSubscriptionDispatchRateAsync(DispatchRateImpl dispatchRate) { -return validateSuperUserAccessAsync() +return validateNamespacePolicyOperationAsync(namespa
(pulsar) branch master updated: [improve][test] Add topic policy test for topic API (#22546)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3a0f908e80d [improve][test] Add topic policy test for topic API (#22546) 3a0f908e80d is described below commit 3a0f908e80d0863920a1258362fd782e95fe8f17 Author: Jiwei Guo AuthorDate: Mon Apr 22 19:47:03 2024 +0800 [improve][test] Add topic policy test for topic API (#22546) --- .../org/apache/pulsar/broker/admin/AuthZTest.java | 113 ++ .../apache/pulsar/broker/admin/TopicAuthZTest.java | 1121 ++-- .../admin/TransactionAndSchemaAuthZTest.java | 359 +++ 3 files changed, 1270 insertions(+), 323 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java new file mode 100644 index 000..a710a03970d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import io.jsonwebtoken.Jwts; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.TopicOperation; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.mockito.Mockito.doReturn; + +public class AuthZTest extends MockedPulsarStandalone { + +protected PulsarAdmin superUserAdmin; + +protected PulsarAdmin tenantManagerAdmin; + +protected AuthorizationService authorizationService; + +protected AuthorizationService orignalAuthorizationService; + +protected static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); +protected static final String TENANT_ADMIN_TOKEN = Jwts.builder() +.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); + +@BeforeMethod(alwaysRun = true) +public void before() throws IllegalAccessException { +orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); +authorizationService = Mockito.spy(orignalAuthorizationService); +FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", +authorizationService, true); +} + +@AfterMethod(alwaysRun = true) +public void after() throws IllegalAccessException { +FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", +orignalAuthorizationService, true); +} + +protected AtomicBoolean setAuthorizationTopicOperationChecker(String role, Object operation) { +AtomicBoolean execFlag = new AtomicBoolean(false); +if (operation instanceof TopicOperation) { +Mockito.doAnswer(invocationOnMock -> { +String role_ = invocationOnMock.getArgument(2); +if (role.equals(role_)) { +TopicOperation operation_ = invocationOnMock.getArgument(1); +Assert.assertEquals(operation_, operation); +} +execFlag.set(true); +return invocationOnMock.callRealMethod(); + }).when(authorizationService).allowTopicOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), +Mockito.any(), Mockito.any()); +} else if (operation instanceof NamespaceOperation) { +doReturn(true) + .when(authorizationService).isValidOriginalPrincipal(Mockito.any(), Mockito.any(), Mockito.any()); +Mockito.doAnswer(invocationOnMock -> { +
(pulsar) branch master updated: [improve][admin] Align the auth and check it at the first place for topic related API (#22507)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8ca01cd42ed [improve][admin] Align the auth and check it at the first place for topic related API (#22507) 8ca01cd42ed is described below commit 8ca01cd42edfd4efd986f752f6f8538ea5bf4f94 Author: Jiwei Guo AuthorDate: Wed Apr 17 18:46:22 2024 +0800 [improve][admin] Align the auth and check it at the first place for topic related API (#22507) --- .../broker/admin/impl/PersistentTopicsBase.java| 419 ++--- .../pulsar/broker/admin/v2/PersistentTopics.java | 44 ++- .../apache/pulsar/broker/admin/TopicAuthZTest.java | 257 +++-- 3 files changed, 447 insertions(+), 273 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ab74b1e2bcc..1f8d0657190 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -128,8 +128,6 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.PolicyName; -import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; @@ -2727,14 +2725,14 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) { -CompletableFuture future; -if (topicName.isGlobal()) { -future = validateGlobalNamespaceOwnershipAsync(namespaceName); -} else { -future = CompletableFuture.completedFuture(null); -} - +CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES); return future.thenCompose(__ -> { +if (topicName.isGlobal()) { +return validateGlobalNamespaceOwnershipAsync(namespaceName); +} else { +return CompletableFuture.completedFuture(null); +} +}).thenCompose(__ -> { if (topicName.isPartitioned()) { return CompletableFuture.completedFuture(null); } else { @@ -2748,7 +2746,6 @@ public class PersistentTopicsBase extends AdminResource { }); } }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) -.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenCompose(topic -> { if (!(topic instanceof PersistentTopic)) { @@ -3158,65 +3155,56 @@ public class PersistentTopicsBase extends AdminResource { protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative) { -CompletableFuture ret; -// If the topic name is a partition name, no need to get partition topic metadata again -if (!topicName.isPartitioned()) { -ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false) -.thenCompose(topicMetadata -> { -if (topicMetadata.partitions > 0) { -log.warn("[{}] Not supported calculate backlog size operation on partitioned-topic {}", -clientAppId(), topicName); -asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, -"calculate backlog size is not allowed for partitioned-topic")); -} -return CompletableFuture.completedFuture(null); -}); -} else { -ret = CompletableFuture.completedFuture(null); -} -CompletableFuture future; -if (topicName.isGlobal()) { -future = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName)); -} else { -future = ret; -} -future.thenAccept(__ -> val
(pulsar) branch master updated: [improve][broker] Optimize gzip compression for /metrics endpoint by sharing/caching compressed result (#22521)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 94f6c7ccd2b [improve][broker] Optimize gzip compression for /metrics endpoint by sharing/caching compressed result (#22521) 94f6c7ccd2b is described below commit 94f6c7ccd2bf8bc261d45ab41f6c7f123359fa47 Author: Lari Hotari AuthorDate: Wed Apr 17 03:15:01 2024 -0700 [improve][broker] Optimize gzip compression for /metrics endpoint by sharing/caching compressed result (#22521) --- .../stats/prometheus/PrometheusMetricsServlet.java | 1 + .../apache/pulsar/broker/web/GzipHandlerUtil.java | 21 +++ .../pulsar/broker/web/GzipHandlerUtilTest.java | 36 + .../org/apache/pulsar/broker/PulsarService.java| 3 +- .../prometheus/PrometheusMetricsGenerator.java | 176 +++-- .../prometheus/PulsarPrometheusMetricsServlet.java | 28 +++- .../apache/pulsar/PrometheusMetricsTestUtil.java | 2 +- 7 files changed, 253 insertions(+), 14 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java index 8a41bed29d4..8685348174c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java @@ -39,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PrometheusMetricsServlet extends HttpServlet { +public static final String DEFAULT_METRICS_PATH = "/metrics"; private static final long serialVersionUID = 1L; static final int HTTP_STATUS_OK_200 = 200; static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java index 37c9c05e5d5..9e980cecb79 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java @@ -19,8 +19,10 @@ package org.apache.pulsar.broker.web; import java.util.List; +import org.eclipse.jetty.http.pathmap.PathSpecSet; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.eclipse.jetty.util.IncludeExclude; public class GzipHandlerUtil { public static Handler wrapWithGzipHandler(Handler innerHandler, List gzipCompressionExcludedPaths) { @@ -45,4 +47,23 @@ public class GzipHandlerUtil { && (gzipCompressionExcludedPaths.get(0).equals("^.*") || gzipCompressionExcludedPaths.get(0).equals("^.*$")); } + +/** + * Check if GZIP compression is enabled for the given endpoint. + * @param gzipCompressionExcludedPaths list of paths that should not be compressed + * @param endpoint the endpoint to check + * @return true if GZIP compression is enabled for the endpoint, false otherwise + */ +public static boolean isGzipCompressionEnabledForEndpoint(List gzipCompressionExcludedPaths, + String endpoint) { +if (gzipCompressionExcludedPaths == null || gzipCompressionExcludedPaths.isEmpty()) { +return true; +} +if (isGzipCompressionCompletelyDisabled(gzipCompressionExcludedPaths)) { +return false; +} +IncludeExclude paths = new IncludeExclude<>(PathSpecSet.class); +paths.exclude(gzipCompressionExcludedPaths.toArray(new String[0])); +return paths.test(endpoint); +} } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java new file mode 100644 index 000..d6958695dec --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + *
(pulsar) branch master updated: [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1dd82a0affd [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519) 1dd82a0affd is described below commit 1dd82a0affd6ec3686fa85d444c354e9ce12 Author: hanmz AuthorDate: Wed Apr 17 18:14:38 2024 +0800 [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519) --- .../src/main/java/org/apache/pulsar/broker/service/BrokerService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2687532693a..249008bad91 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -357,7 +357,9 @@ public class BrokerService implements Closeable { this.entryFilterProvider = new EntryFilterProvider(pulsar.getConfiguration()); pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges); - pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); +if (pulsar.getConfigurationMetadataStore() != pulsar.getLocalMetadataStore()) { + pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); +} this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder() .name("pulsar-inactivity-monitor")
(pulsar) branch master updated (70b401b1de9 -> ffdfc0c4e08)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 70b401b1de9 [improve][fn] Expose `RuntimeFlags` as CLI option for Pulsar Functions and Connectors (#22514) add ffdfc0c4e08 [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500) No new revisions were added by this update. Summary of changes: .../client/api/SimpleProducerConsumerTest.java | 66 +- 1 file changed, 38 insertions(+), 28 deletions(-)
(pulsar) branch master updated: [improve][fn] Expose `RuntimeFlags` as CLI option for Pulsar Functions and Connectors (#22514)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 70b401b1de9 [improve][fn] Expose `RuntimeFlags` as CLI option for Pulsar Functions and Connectors (#22514) 70b401b1de9 is described below commit 70b401b1de9df685283140cff1f83252abc27045 Author: Rui Fu AuthorDate: Tue Apr 16 19:53:29 2024 +0800 [improve][fn] Expose `RuntimeFlags` as CLI option for Pulsar Functions and Connectors (#22514) --- .../test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java| 4 +++- .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java| 7 +++ .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java| 6 ++ .../src/main/java/org/apache/pulsar/admin/cli/CmdSources.java | 6 ++ 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 4d906af9424..d3087b7fc87 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -165,7 +165,8 @@ public class CmdFunctionsTest { "--className", DummyFunction.class.getName(), "--dead-letter-topic", "test-dead-letter-topic", "--custom-runtime-options", "custom-runtime-options", -"--user-config", "{\"key\": [\"value1\", \"value2\"]}" +"--user-config", "{\"key\": [\"value1\", \"value2\"]}", +"--runtime-flags", "--add-opens java.base/java.lang=ALL-UNNAMED" }); CreateFunction creater = cmd.getCreater(); @@ -175,6 +176,7 @@ public class CmdFunctionsTest { assertEquals(Boolean.FALSE, creater.getAutoAck()); assertEquals("test-dead-letter-topic", creater.getDeadLetterTopic()); assertEquals("custom-runtime-options", creater.getCustomRuntimeOptions()); +assertEquals("--add-opens java.base/java.lang=ALL-UNNAMED", creater.getRuntimeFlags()); verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString()); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 15b8fca0761..5e80c168d92 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -377,6 +377,9 @@ public class CmdFunctions extends CmdBase { @Option(names = "--dead-letter-topic", description = "The topic where messages that are not processed successfully are sent to #Java") protected String deadLetterTopic; +@Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime" ++ " (for process & Kubernetes runtime only).") +protected String runtimeFlags; protected FunctionConfig functionConfig; protected String userCodeFile; @@ -676,6 +679,10 @@ public class CmdFunctions extends CmdBase { userCodeFile = functionConfig.getGo(); } +if (null != runtimeFlags) { +functionConfig.setRuntimeFlags(runtimeFlags); +} + // check if configs are valid validateFunctionConfigs(functionConfig); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index f3172a49b01..be1cd0af960 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -404,6 +404,9 @@ public class CmdSinks extends CmdBase { protected String transformFunctionConfig; @Option(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced") protected String logTopic; +@Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime" ++ " (for process & Kubernetes runtime only).") +protected String runtimeFlags; protected SinkConfig sinkConfig; @@ -602,6 +605,9 @@ public class CmdSinks extends CmdBase {
(pulsar) branch master updated: [improve][test] Add topic operation checker for topic API (#22468)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9d72e6bd847 [improve][test] Add topic operation checker for topic API (#22468) 9d72e6bd847 is described below commit 9d72e6bd847df85a7d18f1827274df96a446798f Author: Cong Zhao AuthorDate: Mon Apr 15 16:15:59 2024 +0800 [improve][test] Add topic operation checker for topic API (#22468) --- .../apache/pulsar/broker/admin/TopicAuthZTest.java | 156 ++--- 1 file changed, 135 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java index d09bc0a3ffd..e6ff0ce2bb4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java @@ -20,9 +20,18 @@ package org.apache.pulsar.broker.admin; import io.jsonwebtoken.Jwts; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import lombok.SneakyThrows; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -37,22 +46,21 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.security.MockedPulsarStandalone; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; @Test(groups = "broker-admin") public class TopicAuthZTest extends MockedPulsarStandalone { @@ -61,13 +69,17 @@ public class TopicAuthZTest extends MockedPulsarStandalone { private PulsarAdmin tenantManagerAdmin; +private AuthorizationService authorizationService; + +private AuthorizationService orignalAuthorizationService; + private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); private static final String TENANT_ADMIN_TOKEN = Jwts.builder() .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); @SneakyThrows @BeforeClass(alwaysRun = true) -public void before() { +public void setup() { configureTokenAuthentication(); configureDefaultAuthorization(); enableTransaction(); @@ -99,7 +111,7 @@ public class TopicAuthZTest extends MockedPulsarStandalone { @SneakyThrows @AfterClass(alwaysRun = true) -public void after() { +public void cleanup() { if (superUserAdmin != null) { superUserAdmin.close(); } @@ -109,6 +121,51 @@ public class TopicAuthZTest extends MockedPulsarStandalone { close(); } +@BeforeMethod +public void before() throws IllegalAccessException { +orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); +authorizationService = Mockito.spy(orignalAuthorizationService); +FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", +authorizationService, true); +} + +@AfterMethod +public void after() throws IllegalAccessException { +FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", +orignalAuthorizationService, true); +} + +private AtomicBoolean setAuthorizationTopicOperationChecker(String role, Object operation) { +AtomicBoolean execFlag = new AtomicBoolean(false); +
(pulsar) branch master updated: [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d9a43dd2160 [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489) d9a43dd2160 is described below commit d9a43dd21605930e16bb038095e36fceff3a4a40 Author: Baodi Shi AuthorDate: Mon Apr 15 13:55:34 2024 +0800 [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489) --- .../service/PersistentMessageFinderTest.java | 42 +++--- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 6965ac28068..0972c9098b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.service; -import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; @@ -383,7 +382,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { * * @throws Exception */ -@Test(groups = "flaky") +@Test void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers"; @@ -402,11 +401,15 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { for (int i = 0; i < totalEntries; i++) { ledger.addEntry(createMessageWrittenToLedger("msg" + i)); } +Awaitility.await().untilAsserted(() -> +assertEquals(ledger.getState(), ManagedLedgerImpl.State.LedgerOpened)); List ledgers = ledger.getLedgersInfoAsList(); LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1); - -assertEquals(ledgers.size(), totalEntries / entriesPerLedger); +// The `lastLedgerInfo` should be newly opened, and it does not contain any entries. +// Please refer to: https://github.com/apache/pulsar/pull/22034 +assertEquals(lastLedgerInfo.getEntries(), 0); +assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1); // this will make sure that all entries should be deleted Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds)); @@ -420,19 +423,13 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); -Position previousMarkDelete = null; -for (int i = 0; i < totalEntries; i++) { -monitor.expireMessages(1); -Position previousPos = previousMarkDelete; -retryStrategically( -(test) -> c1.getMarkDeletedPosition() != null && !c1.getMarkDeletedPosition().equals(previousPos), -5, 100); -previousMarkDelete = c1.getMarkDeletedPosition(); -} - -PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition(); -assertEquals(lastLedgerInfo.getLedgerId(), markDeletePosition.getLedgerId()); -assertEquals(lastLedgerInfo.getEntries() - 1, markDeletePosition.getEntryId()); +assertTrue(monitor.expireMessages(ttlSeconds)); +Awaitility.await().untilAsserted(() -> { +PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition(); +// The markDeletePosition points to the last entry of the previous ledger in lastLedgerInfo. +assertEquals(markDeletePosition.getLedgerId(), lastLedgerInfo.getLedgerId() - 1); +assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 1); +}); c1.close(); ledger.close(); @@ -440,20 +437,25 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { } -@Test(groups = "flaky") +@Test public void testIncorrectClientClock() throws Exception { final String ledgerAndCursorName = "testIncorrectClientClock"; int maxTTLSeconds = 1; +int entriesNum = 10; ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(
svn commit: r68514 - /release/pulsar/pulsar-client-cpp-3.4.0/
Author: technoboy Date: Mon Apr 15 01:28:45 2024 New Revision: 68514 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.4.0/
svn commit: r68517 - /release/pulsar/pulsar-client-cpp-3.1.1/
Author: technoboy Date: Mon Apr 15 01:29:42 2024 New Revision: 68517 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.1.1/
svn commit: r68516 - /release/pulsar/pulsar-client-cpp-3.1.0/
Author: technoboy Date: Mon Apr 15 01:29:27 2024 New Revision: 68516 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.1.0/
svn commit: r68515 - /release/pulsar/pulsar-client-cpp-3.4.1/
Author: technoboy Date: Mon Apr 15 01:28:58 2024 New Revision: 68515 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.4.1/
svn commit: r68513 - /release/pulsar/pulsar-client-go-0.11.0/
Author: technoboy Date: Mon Apr 15 01:28:14 2024 New Revision: 68513 Log: cleanup Removed: release/pulsar/pulsar-client-go-0.11.0/
svn commit: r68512 - /release/pulsar/pulsar-dotpulsar-3.1.1/
Author: technoboy Date: Mon Apr 15 01:27:45 2024 New Revision: 68512 Log: cleanup Removed: release/pulsar/pulsar-dotpulsar-3.1.1/
svn commit: r68511 - /release/pulsar/pulsar-dotpulsar-3.1.0/
Author: technoboy Date: Mon Apr 15 01:27:29 2024 New Revision: 68511 Log: cleanup Removed: release/pulsar/pulsar-dotpulsar-3.1.0/
svn commit: r68510 - /release/pulsar/pulsar-client-go-0.12.0/
Author: technoboy Date: Mon Apr 15 01:26:17 2024 New Revision: 68510 Log: cleanup Removed: release/pulsar/pulsar-client-go-0.12.0/
svn commit: r68509 - /release/pulsar/pulsar-client-reactive-0.5.2/
Author: technoboy Date: Mon Apr 15 01:25:44 2024 New Revision: 68509 Log: cleanup Removed: release/pulsar/pulsar-client-reactive-0.5.2/
svn commit: r68508 - /release/pulsar/pulsar-client-reactive-0.5.1/
Author: technoboy Date: Mon Apr 15 01:25:31 2024 New Revision: 68508 Log: cleanup Removed: release/pulsar/pulsar-client-reactive-0.5.1/
svn commit: r68507 - /release/pulsar/pulsar-client-reactive-0.5.0/
Author: technoboy Date: Mon Apr 15 01:25:17 2024 New Revision: 68507 Log: cleanup Removed: release/pulsar/pulsar-client-reactive-0.5.0/
svn commit: r68506 - /release/pulsar/pulsar-3.2.1/
Author: technoboy Date: Mon Apr 15 01:24:19 2024 New Revision: 68506 Log: cleanup Removed: release/pulsar/pulsar-3.2.1/
svn commit: r68505 - /release/pulsar/pulsar-3.2.0/
Author: technoboy Date: Mon Apr 15 01:23:56 2024 New Revision: 68505 Log: cleanup Removed: release/pulsar/pulsar-3.2.0/
svn commit: r68502 - /release/pulsar/pulsar-3.1.0/
Author: technoboy Date: Mon Apr 15 01:22:31 2024 New Revision: 68502 Log: cleanup Removed: release/pulsar/pulsar-3.1.0/
svn commit: r68504 - /release/pulsar/pulsar-3.1.2/
Author: technoboy Date: Mon Apr 15 01:23:19 2024 New Revision: 68504 Log: cleanup Removed: release/pulsar/pulsar-3.1.2/
svn commit: r68503 - /release/pulsar/pulsar-3.1.1/
Author: technoboy Date: Mon Apr 15 01:22:58 2024 New Revision: 68503 Log: cleanup Removed: release/pulsar/pulsar-3.1.1/
svn commit: r68501 - /release/pulsar/pulsar-3.0.3/
Author: technoboy Date: Mon Apr 15 01:22:04 2024 New Revision: 68501 Log: cleanup Removed: release/pulsar/pulsar-3.0.3/
svn commit: r68500 - /release/pulsar/pulsar-3.0.2/
Author: technoboy Date: Mon Apr 15 01:21:51 2024 New Revision: 68500 Log: cleanup Removed: release/pulsar/pulsar-3.0.2/
svn commit: r68499 - /release/pulsar/pulsar-3.0.1/
Author: technoboy Date: Mon Apr 15 01:21:23 2024 New Revision: 68499 Log: cleanup Removed: release/pulsar/pulsar-3.0.1/
svn commit: r68498 - /release/pulsar/pulsar-3.0.0/
Author: technoboy Date: Mon Apr 15 01:21:08 2024 New Revision: 68498 Log: cleanup Removed: release/pulsar/pulsar-3.0.0/
svn commit: r68496 - /release/pulsar/pulsar-2.11.3/
Author: technoboy Date: Mon Apr 15 01:20:06 2024 New Revision: 68496 Log: cleanup Removed: release/pulsar/pulsar-2.11.3/
svn commit: r68497 - /release/pulsar/pulsar-2.9.4/
Author: technoboy Date: Mon Apr 15 01:20:37 2024 New Revision: 68497 Log: cleanup Removed: release/pulsar/pulsar-2.9.4/
svn commit: r68493 - /release/pulsar/pulsar-2.11.0/
Author: technoboy Date: Mon Apr 15 01:19:17 2024 New Revision: 68493 Log: cleanup Removed: release/pulsar/pulsar-2.11.0/
svn commit: r68494 - /release/pulsar/pulsar-2.11.1/
Author: technoboy Date: Mon Apr 15 01:19:42 2024 New Revision: 68494 Log: cleanup Removed: release/pulsar/pulsar-2.11.1/
svn commit: r68495 - /release/pulsar/pulsar-2.11.2/
Author: technoboy Date: Mon Apr 15 01:19:53 2024 New Revision: 68495 Log: cleanup Removed: release/pulsar/pulsar-2.11.2/
svn commit: r68490 - /release/pulsar/pulsar-2.10.3/
Author: technoboy Date: Mon Apr 15 01:18:19 2024 New Revision: 68490 Log: cleanup Removed: release/pulsar/pulsar-2.10.3/
svn commit: r68492 - /release/pulsar/pulsar-2.10.5/
Author: technoboy Date: Mon Apr 15 01:18:56 2024 New Revision: 68492 Log: cleanup Removed: release/pulsar/pulsar-2.10.5/
svn commit: r68491 - /release/pulsar/pulsar-2.10.4/
Author: technoboy Date: Mon Apr 15 01:18:39 2024 New Revision: 68491 Log: cleanup Removed: release/pulsar/pulsar-2.10.4/
svn commit: r68489 - /release/pulsar/pulsar-2.10.2/
Author: technoboy Date: Mon Apr 15 01:17:31 2024 New Revision: 68489 Log: cleanup Removed: release/pulsar/pulsar-2.10.2/
(pulsar) branch master updated: [fix][broker] Fix message drop record in producer stat (#22458)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new cea1a9ba9b5 [fix][broker] Fix message drop record in producer stat (#22458) cea1a9ba9b5 is described below commit cea1a9ba9b576bf43f0a45ff8d65369b0f2bbb36 Author: zhangqian <503837...@qq.com> AuthorDate: Wed Apr 10 16:51:26 2024 +0800 [fix][broker] Fix message drop record in producer stat (#22458) Co-authored-by: ceceezhang --- .../src/main/java/org/apache/pulsar/broker/service/Producer.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 7e4459505a5..9cfde67802b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -749,7 +749,7 @@ public class Producer { } if (this.isNonPersistentTopic) { msgDrop.calculateRate(); -((NonPersistentPublisherStatsImpl) stats).msgDropRate = msgDrop.getRate(); +((NonPersistentPublisherStatsImpl) stats).msgDropRate = msgDrop.getValueRate(); } }
(pulsar) branch master updated: [improve][admin] Add authorization test for schema and align auth for transaction (#22399)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 904ee2c [improve][admin] Add authorization test for schema and align auth for transaction (#22399) 904ee2c is described below commit 904ee2c7adf9febddc585a699a1fdb724013 Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com> AuthorDate: Tue Apr 9 22:43:35 2024 +0800 [improve][admin] Add authorization test for schema and align auth for transaction (#22399) --- .../apache/pulsar/broker/admin/TopicAuthZTest.java | 249 + .../pulsar/security/MockedPulsarStandalone.java| 4 +- 2 files changed, 252 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java index 2e75b59ec85..d09bc0a3ffd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java @@ -20,19 +20,27 @@ package org.apache.pulsar.broker.admin; import io.jsonwebtoken.Jwts; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.security.MockedPulsarStandalone; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -62,7 +70,9 @@ public class TopicAuthZTest extends MockedPulsarStandalone { public void before() { configureTokenAuthentication(); configureDefaultAuthorization(); +enableTransaction(); start(); +createTransactionCoordinatorAssign(16); this.superUserAdmin =PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) @@ -74,8 +84,18 @@ public class TopicAuthZTest extends MockedPulsarStandalone { .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) .build(); + +superUserAdmin.tenants().createTenant("pulsar", tenantInfo); +superUserAdmin.namespaces().createNamespace("pulsar/system"); } +protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) throws MetadataStoreException { +getPulsarService().getPulsarResources() +.getNamespaceResources() +.getPartitionedTopicResources() + .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, +new PartitionedTopicMetadata(numPartitionsOfTC)); +} @SneakyThrows @AfterClass(alwaysRun = true) @@ -1086,6 +1106,235 @@ public class TopicAuthZTest extends MockedPulsarStandalone { deleteTopic(topic, false); } +public enum OperationAuthType { +Lookup, +Produce, +Consume, +AdminOrSuperUser, +NOAuth +} + +private final String testTopic = "persistent://public/default/" + UUID.randomUUID().toString(); +@FunctionalInterface +public interface ThrowingBiConsumer { +void accept(T t) throws PulsarAdminException; +} + +@DataProvider(name = "authFunction") +public Object[][] authFunction () throws Exception { +String sub = "my-sub"; +createTopic(testTopic, false); +@Cleanup final PulsarClient pulsarClient = PulsarClient.builder() +.serviceUrl(getPulsarService().getBrokerServiceUrl()) +.authentication(new AuthenticationToken(SUPER_USER_TOKEN)) +.enableTransaction(true) +.build(); +@Cleanup final Producer producer =
(pulsar) branch master updated: [improve][test] Add operation authentication test for namespace API (#22398)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6de711d4008 [improve][test] Add operation authentication test for namespace API (#22398) 6de711d4008 is described below commit 6de711d4008338a875c5c145e856c90dcb041f8f Author: Cong Zhao AuthorDate: Tue Apr 9 16:38:18 2024 +0800 [improve][test] Add operation authentication test for namespace API (#22398) --- .../pulsar/broker/admin/NamespaceAuthZTest.java| 882 - 1 file changed, 875 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index ce0b925614c..d5a0468f340 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -19,23 +19,47 @@ package org.apache.pulsar.broker.admin; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import io.jsonwebtoken.Jwts; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import lombok.Cleanup; import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; +import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; +import org.apache.pulsar.packages.management.core.common.PackageMetadata; import org.apache.pulsar.security.MockedPulsarStandalone; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.UUID; @Test(groups = "broker-admin") public class NamespaceAuthZTest extends MockedPulsarStandalone { @@ -44,17 +68,27 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { private PulsarAdmin tenantManagerAdmin; +private PulsarClient pulsarClient; + +private AuthorizationService authorizationService; + +private AuthorizationService orignalAuthorizationService; + private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); private static final String TENANT_ADMIN_TOKEN = Jwts.builder() .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); @SneakyThrows @BeforeClass -public void before() { +public void setup() { +getServiceConfiguration().setEnablePackagesManagement(true); + getServiceConfiguration().setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName()); +getServiceConfiguration().setDefaultNumberOfNamespaceBundles(1); +getServiceConfiguration().setForceDeleteNamespaceAllowed(true); configureTokenAuthentication(); configureDefaultAuthorization(); start(); -this.superUserAdmin =PulsarAdmin.builder() +this.superUserAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) .build(); @@ -65,12 +99,13 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) .build(); +this.pulsarClient = super.getPulsarService().getClient(); } @SneakyThrows @AfterCl
(pulsar) branch branch-3.0 updated: [fix][ml] No rollover inactive ledgers when metadata service invalid (#22284)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new b9bf0a83b5c [fix][ml] No rollover inactive ledgers when metadata service invalid (#22284) b9bf0a83b5c is described below commit b9bf0a83b5ccf89e036cf77a1c4cb764acf59bba Author: houxiaoyu AuthorDate: Sat Mar 30 21:38:55 2024 +0800 [fix][ml] No rollover inactive ledgers when metadata service invalid (#22284) ### Motivation We should not rollover inactive ledgers when metadata service is invailable. ### Modifications Checking metadata service is vailable when schedule `checkInactiveLedgerAndRollOver` --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 --- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 24 ++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index da9b1fde27a..8a7b376a7f5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -4464,9 +4464,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public boolean checkInactiveLedgerAndRollOver() { -long currentTimeMs = System.currentTimeMillis(); -if (currentLedgerEntries > 0 && inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > (lastAddEntryTimeMs -+ inactiveLedgerRollOverTimeMs)) { +if (factory.isMetadataServiceAvailable() +&& currentLedgerEntries > 0 +&& inactiveLedgerRollOverTimeMs > 0 +&& System.currentTimeMillis() > (lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) { log.info("[{}] Closing inactive ledger, last-add entry {}", name, lastAddEntryTimeMs); if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) { LedgerHandle currentLedger = this.currentLedger; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index a08b51cf29f..f918ffdc755 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3922,6 +3922,30 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ledger.close(); } +@Test +public void testDontRollOverInactiveLedgersWhenMetadataServiceInvalid() throws Exception { +int inactiveLedgerRollOverTimeMs = 5; +@Cleanup("shutdown") +ManagedLedgerFactoryImpl factory = spy(new ManagedLedgerFactoryImpl(metadataStore, bkc)); +// mock metadata service invalid +when(factory.isMetadataServiceAvailable()).thenReturn(false); +ManagedLedgerConfig config = new ManagedLedgerConfig(); +config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, TimeUnit.MILLISECONDS); +ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("rollover_inactive", config); + +long ledgerId = ledger.currentLedger.getId(); + +Thread.sleep(inactiveLedgerRollOverTimeMs * 5); +ledger.checkInactiveLedgerAndRollOver(); + +Thread.sleep(inactiveLedgerRollOverTimeMs * 5); +ledger.checkInactiveLedgerAndRollOver(); + +assertEquals(ledger.currentLedger.getId(), ledgerId); + +ledger.close(); +} + @Test public void testOffloadTaskCancelled() throws Exception { ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
(pulsar) branch branch-3.0 updated: [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new a503efc826e [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376) a503efc826e is described below commit a503efc826e60b8e26f9792aeb45223374b8f4ca Author: Baodi Shi AuthorDate: Fri Mar 29 08:33:27 2024 +0800 [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376) --- pulsar-io/jdbc/core/pom.xml| 7 + .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java | 5 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 25 + .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 31 +++--- 4 files changed, 59 insertions(+), 9 deletions(-) diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml index 3f44a062fb9..0322f6e11f1 100644 --- a/pulsar-io/jdbc/core/pom.xml +++ b/pulsar-io/jdbc/core/pom.xml @@ -71,6 +71,13 @@ provided + + ${project.groupId} + pulsar-client-original + ${project.version} + test + + \ No newline at end of file diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java index 36c36740919..3655688c0f3 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId; @@ -137,6 +138,10 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink data.get(k); } else { +SchemaType schemaType = message.getSchema().getSchemaInfo().getType(); +if (schemaType.isPrimitive()) { +throw new UnsupportedOperationException("Primitive schema is not supported: " + schemaType); +} recordValueGetter = (key) -> ((GenericRecord) record).getField(key); } String action = message.getProperties().get(ACTION_PROPERTY); diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java index b15eb832242..c088dd3c42c 100644 --- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java +++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java @@ -22,6 +22,10 @@ import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.util.Utf8; +import org.apache.pulsar.client.api.schema.GenericObject; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; +import org.apache.pulsar.functions.api.Record; import org.testng.Assert; import org.testng.annotations.Test; @@ -143,5 +147,26 @@ public class BaseJdbcAutoSchemaSinkTest { return consumer.apply(record).endRecord().getFields().get(0).schema(); } +@Test(expectedExceptions = UnsupportedOperationException.class, +expectedExceptionsMessageRegExp = "Primitive schema is not supported.*") +@SuppressWarnings("unchecked") +public void testNotSupportPrimitiveSchema() { +BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new BaseJdbcAutoSchemaSink() {}; +AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); + autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING); +Record record = new Record() { +@Override +public org.apache.pulsar.client.api.Schema getSchema() { +return autoConsumeSchema; +} + +@Override +public GenericRecord getValue() { +return null; +} +}; +baseJdbcAutoSchemaSink.createMutation((Record) record); +} + } \ No newline at end of file diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java index d9ed4cbd442..ca01615bef1 100644 --- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java +++ b/pulsar-io/jdb
(pulsar) branch branch-3.0 updated: [admin][broker] Fix force delete subscription not working (#22423)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 110add00c1c [admin][broker] Fix force delete subscription not working (#22423) 110add00c1c is described below commit 110add00c1cdc59542e7ef906cd5f4409e63dc04 Author: 道君 AuthorDate: Thu Apr 4 23:08:45 2024 +0800 [admin][broker] Fix force delete subscription not working (#22423) --- .../broker/admin/impl/PersistentTopicsBase.java| 5 ++-- .../pulsar/broker/admin/PersistentTopicsTest.java | 30 ++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 00c2eed2763..602cd47e595 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1656,7 +1656,7 @@ public class PersistentTopicsBase extends AdminResource { for (int i = 0; i < partitionMetadata.partitions; i++) { TopicName topicNamePartition = topicName.getPartition(i); futures.add(adminClient.topics() - .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false)); + .deleteSubscriptionAsync(topicNamePartition.toString(), subName, force)); } return FutureUtil.waitForAll(futures).handle((result, exception) -> { @@ -1675,8 +1675,7 @@ public class PersistentTopicsBase extends AdminResource { return null; }); } -return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, -force); +return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, force); }); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 59c3dbf6ff3..d7ffa656bdb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -77,11 +77,13 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -1696,6 +1698,34 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics().createMissedPartitions(topicName)); } +@Test +public void testForceDeleteSubscription() throws Exception { +try { +pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); +String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testForceDeleteSubscription"; +String subName = "sub1"; +admin.topics().createNonPartitionedTopic(topicName); +admin.topics().createSubscription(topicName, subName, MessageId.latest); + +@Cleanup +Consumer c0 = pulsarClient.newConsumer(Schema.STRING) +.topic(topicName) +.subscriptionName(subName) +.subscriptionType(SubscriptionType.Shared) +.subscribe(); +@Cleanup +Consumer c1 = pulsarClient.newConsumer(Schema.STRING) +.topic(topicName) +.subscriptionName(subName) +.subscriptionType(SubscriptionType.Shared) +.subscribe(); + +admin.topics().deleteSubscription(topicName, subName, true); +} finally { +pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true)
(pulsar) branch branch-3.0 updated: [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 40329ee3615 [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454) 40329ee3615 is described below commit 40329ee3615ebeff6eaa0dca2d454d6389fa6f43 Author: Jiwei Guo AuthorDate: Mon Apr 8 18:22:05 2024 +0800 [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 ++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +--- .../service/persistent/PersistentSubscription.java | 4 +- .../service/persistent/PersistentTopicTest.java| 56 +- 4 files changed, 65 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 995f980953b..a8e2a6931af 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -985,6 +985,11 @@ public class ManagedCursorImpl implements ManagedCursor { log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition); } +if (isClosed()) { +callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx); +return; +} + if (!hasMoreEntries()) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(), diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 73dfc86e1ad..da9b1fde27a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1033,6 +1033,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { + consumerName), ctx); return; } else if (!cursor.isDurable()) { +cursor.setState(ManagedCursorImpl.State.Closed); cursors.removeCursor(consumerName); deactivateCursorByName(consumerName); callback.deleteCursorComplete(ctx); @@ -3805,13 +3806,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } public void addWaitingCursor(ManagedCursorImpl cursor) { -if (cursor instanceof NonDurableCursorImpl) { -if (cursor.isActive()) { -this.waitingCursors.add(cursor); -} -} else { -this.waitingCursors.add(cursor); -} +this.waitingCursors.add(cursor); } public boolean isCursorActive(ManagedCursor cursor) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 949dc398b32..f745f95b3dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -320,7 +320,6 @@ public class PersistentSubscription extends AbstractSubscription implements Subs if (dispatcher != null && dispatcher.getConsumers().isEmpty()) { deactivateCursor(); -topic.getManagedLedger().removeWaitingCursor(cursor); if (!cursor.isDurable()) { // If cursor is not durable, we need to clean up the subscription as well @@ -349,11 +348,14 @@ public class PersistentSubscription extends AbstractSubscription implements Subs if (!isResetCursor) { try { topic.getManagedLedger().deleteCursor(cursor.getName()); + topic.getManagedLedger().removeWaitingCursor(cursor); } catch (InterruptedException | ManagedLedgerException e) { log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e); } } }); +} else { +topic.getManagedLedger().removeWaitingCursor(cursor); } } diff --git a/puls
(pulsar) branch branch-3.2 updated: [admin][broker] Fix force delete subscription not working (#22423)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new ed58028e0bd [admin][broker] Fix force delete subscription not working (#22423) ed58028e0bd is described below commit ed58028e0bd0939b616d29ea31558fedcbc563d7 Author: 道君 AuthorDate: Thu Apr 4 23:08:45 2024 +0800 [admin][broker] Fix force delete subscription not working (#22423) --- .../broker/admin/impl/PersistentTopicsBase.java| 5 ++-- .../pulsar/broker/admin/PersistentTopicsTest.java | 30 ++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 3ef8773..318e2bc2cde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1566,7 +1566,7 @@ public class PersistentTopicsBase extends AdminResource { for (int i = 0; i < partitionMetadata.partitions; i++) { TopicName topicNamePartition = topicName.getPartition(i); futures.add(adminClient.topics() - .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false)); + .deleteSubscriptionAsync(topicNamePartition.toString(), subName, force)); } return FutureUtil.waitForAll(futures).handle((result, exception) -> { @@ -1585,8 +1585,7 @@ public class PersistentTopicsBase extends AdminResource { return null; }); } -return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, -force); +return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, force); }); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 6641a5805c0..8e1375303ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -79,11 +79,13 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -1793,6 +1795,34 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics().createMissedPartitions(topicName)); } +@Test +public void testForceDeleteSubscription() throws Exception { +try { +pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); +String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testForceDeleteSubscription"; +String subName = "sub1"; +admin.topics().createNonPartitionedTopic(topicName); +admin.topics().createSubscription(topicName, subName, MessageId.latest); + +@Cleanup +Consumer c0 = pulsarClient.newConsumer(Schema.STRING) +.topic(topicName) +.subscriptionName(subName) +.subscriptionType(SubscriptionType.Shared) +.subscribe(); +@Cleanup +Consumer c1 = pulsarClient.newConsumer(Schema.STRING) +.topic(topicName) +.subscriptionName(subName) +.subscriptionType(SubscriptionType.Shared) +.subscribe(); + +admin.topics().deleteSubscription(topicName, subName, true); +} finally { +pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true)
(pulsar) 01/02: [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 7259fd0b26a246340262242a792dd57b35af2970 Author: Jiwei Guo AuthorDate: Mon Apr 8 18:22:05 2024 +0800 [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 ++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +--- .../service/persistent/PersistentSubscription.java | 4 +- .../service/persistent/PersistentTopicTest.java| 56 +- 4 files changed, 65 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index dbdde7cf707..9bbcda327f0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -985,6 +985,11 @@ public class ManagedCursorImpl implements ManagedCursor { log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition); } +if (isClosed()) { +callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx); +return; +} + if (!hasMoreEntries()) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(), diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 3d66bc8d6c0..426ac8df218 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1032,6 +1032,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { + consumerName), ctx); return; } else if (!cursor.isDurable()) { +cursor.setState(ManagedCursorImpl.State.Closed); cursors.removeCursor(consumerName); deactivateCursorByName(consumerName); callback.deleteCursorComplete(ctx); @@ -3804,13 +3805,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } public void addWaitingCursor(ManagedCursorImpl cursor) { -if (cursor instanceof NonDurableCursorImpl) { -if (cursor.isActive()) { -this.waitingCursors.add(cursor); -} -} else { -this.waitingCursors.add(cursor); -} +this.waitingCursors.add(cursor); } public boolean isCursorActive(ManagedCursor cursor) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 3bedf497f87..fdb977aa366 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -308,7 +308,6 @@ public class PersistentSubscription extends AbstractSubscription implements Subs if (dispatcher != null && dispatcher.getConsumers().isEmpty()) { deactivateCursor(); -topic.getManagedLedger().removeWaitingCursor(cursor); if (!cursor.isDurable()) { // If cursor is not durable, we need to clean up the subscription as well. No need to check for active @@ -338,11 +337,14 @@ public class PersistentSubscription extends AbstractSubscription implements Subs if (!isResetCursor) { try { topic.getManagedLedger().deleteCursor(cursor.getName()); + topic.getManagedLedger().removeWaitingCursor(cursor); } catch (InterruptedException | ManagedLedgerException e) { log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e); } } }); +} else { +topic.getManagedLedger().removeWaitingCursor(cursor); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 618f
(pulsar) branch branch-3.2 updated (4a5d14ce683 -> de5a2ebd609)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a change to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git from 4a5d14ce683 [improve][misc] Specify valid home dir for the default user in the Ubuntu based docker image (#22446) new 7259fd0b26a [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454) new de5a2ebd609 [improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 ++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +--- pom.xml| 2 +- .../service/persistent/PersistentSubscription.java | 4 +- .../service/persistent/PersistentTopicTest.java| 56 +- 5 files changed, 66 insertions(+), 10 deletions(-)
(pulsar) 02/02: [improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit de5a2ebd609e75c11857d5b02b58ef196efeb689 Author: Lari Hotari AuthorDate: Thu Apr 4 06:39:53 2024 -0700 [improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7a831b51bd1..85b3f9b8ea9 100644 --- a/pom.xml +++ b/pom.xml @@ -228,7 +228,7 @@ flexible messaging model and an intuitive client API. 0.9.1 2.1.0 3.24.2 -1.18.30 +1.18.32 1.3.2 2.3.1 1.2.0
(pulsar) branch master updated: [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 57a616eaa79 [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454) 57a616eaa79 is described below commit 57a616eaa79096af5b49db89c99cd39ccc94ec00 Author: Jiwei Guo AuthorDate: Mon Apr 8 18:22:05 2024 +0800 [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 ++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +--- .../service/persistent/PersistentSubscription.java | 4 +- .../service/persistent/PersistentTopicTest.java| 56 +- 4 files changed, 65 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 4daa06cad57..69b130a98c8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -990,6 +990,11 @@ public class ManagedCursorImpl implements ManagedCursor { log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition); } +if (isClosed()) { +callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx); +return; +} + if (!hasMoreEntries()) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(), diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 3a12cb2ad6c..698563ed7a1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1032,6 +1032,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { + consumerName), ctx); return; } else if (!cursor.isDurable()) { +cursor.setState(ManagedCursorImpl.State.Closed); cursors.removeCursor(consumerName); deactivateCursorByName(consumerName); callback.deleteCursorComplete(ctx); @@ -3814,13 +3815,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } public void addWaitingCursor(ManagedCursorImpl cursor) { -if (cursor instanceof NonDurableCursorImpl) { -if (cursor.isActive()) { -this.waitingCursors.add(cursor); -} -} else { -this.waitingCursors.add(cursor); -} +this.waitingCursors.add(cursor); } public boolean isCursorActive(ManagedCursor cursor) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 6e8e94baeae..dbbf92aa76d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -308,7 +308,6 @@ public class PersistentSubscription extends AbstractSubscription { if (dispatcher != null && dispatcher.getConsumers().isEmpty()) { deactivateCursor(); -topic.getManagedLedger().removeWaitingCursor(cursor); if (!cursor.isDurable()) { // If cursor is not durable, we need to clean up the subscription as well. No need to check for active @@ -338,11 +337,14 @@ public class PersistentSubscription extends AbstractSubscription { if (!isResetCursor) { try { topic.getManagedLedger().deleteCursor(cursor.getName()); + topic.getManagedLedger().removeWaitingCursor(cursor); } catch (InterruptedException | ManagedLedgerException e) { log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e); } } }); +} else { +topic.getManagedLedger().removeWaitingCursor(cursor); } } diff --git a/pulsar-broke
(pulsar) branch branch-3.2 updated: [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 1639ae2a0ec [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376) 1639ae2a0ec is described below commit 1639ae2a0ec34ce475bb813f629a3ce97a3c5e14 Author: Baodi Shi AuthorDate: Fri Mar 29 08:33:27 2024 +0800 [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376) --- pulsar-io/jdbc/core/pom.xml| 7 + .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java | 5 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 25 + .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 31 +++--- 4 files changed, 59 insertions(+), 9 deletions(-) diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml index e1f15332a6c..7617e221105 100644 --- a/pulsar-io/jdbc/core/pom.xml +++ b/pulsar-io/jdbc/core/pom.xml @@ -71,6 +71,13 @@ provided + + ${project.groupId} + pulsar-client-original + ${project.version} + test + + \ No newline at end of file diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java index 36c36740919..3655688c0f3 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId; @@ -137,6 +138,10 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink data.get(k); } else { +SchemaType schemaType = message.getSchema().getSchemaInfo().getType(); +if (schemaType.isPrimitive()) { +throw new UnsupportedOperationException("Primitive schema is not supported: " + schemaType); +} recordValueGetter = (key) -> ((GenericRecord) record).getField(key); } String action = message.getProperties().get(ACTION_PROPERTY); diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java index b15eb832242..c088dd3c42c 100644 --- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java +++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java @@ -22,6 +22,10 @@ import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.util.Utf8; +import org.apache.pulsar.client.api.schema.GenericObject; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; +import org.apache.pulsar.functions.api.Record; import org.testng.Assert; import org.testng.annotations.Test; @@ -143,5 +147,26 @@ public class BaseJdbcAutoSchemaSinkTest { return consumer.apply(record).endRecord().getFields().get(0).schema(); } +@Test(expectedExceptions = UnsupportedOperationException.class, +expectedExceptionsMessageRegExp = "Primitive schema is not supported.*") +@SuppressWarnings("unchecked") +public void testNotSupportPrimitiveSchema() { +BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new BaseJdbcAutoSchemaSink() {}; +AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); + autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING); +Record record = new Record() { +@Override +public org.apache.pulsar.client.api.Schema getSchema() { +return autoConsumeSchema; +} + +@Override +public GenericRecord getValue() { +return null; +} +}; +baseJdbcAutoSchemaSink.createMutation((Record) record); +} + } \ No newline at end of file diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java index d9ed4cbd442..ca01615bef1 100644 --- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java +++ b/pulsar-io/jdb
(pulsar) branch branch-3.2 updated: [fix][ml] No rollover inactive ledgers when metadata service invalid (#22284)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new a83c892f1a9 [fix][ml] No rollover inactive ledgers when metadata service invalid (#22284) a83c892f1a9 is described below commit a83c892f1a954564f6672253b8d533b020c0fce5 Author: houxiaoyu AuthorDate: Sat Mar 30 21:38:55 2024 +0800 [fix][ml] No rollover inactive ledgers when metadata service invalid (#22284) ### Motivation We should not rollover inactive ledgers when metadata service is invailable. ### Modifications Checking metadata service is vailable when schedule `checkInactiveLedgerAndRollOver` --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 --- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 24 ++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ee8e7c430ef..3d66bc8d6c0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -4468,9 +4468,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public boolean checkInactiveLedgerAndRollOver() { -long currentTimeMs = System.currentTimeMillis(); -if (currentLedgerEntries > 0 && inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > (lastAddEntryTimeMs -+ inactiveLedgerRollOverTimeMs)) { +if (factory.isMetadataServiceAvailable() +&& currentLedgerEntries > 0 +&& inactiveLedgerRollOverTimeMs > 0 +&& System.currentTimeMillis() > (lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) { log.info("[{}] Closing inactive ledger, last-add entry {}", name, lastAddEntryTimeMs); if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) { LedgerHandle currentLedger = this.currentLedger; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 0d9d6c0e573..9b4375dbedf 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3922,6 +3922,30 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ledger.close(); } +@Test +public void testDontRollOverInactiveLedgersWhenMetadataServiceInvalid() throws Exception { +int inactiveLedgerRollOverTimeMs = 5; +@Cleanup("shutdown") +ManagedLedgerFactoryImpl factory = spy(new ManagedLedgerFactoryImpl(metadataStore, bkc)); +// mock metadata service invalid +when(factory.isMetadataServiceAvailable()).thenReturn(false); +ManagedLedgerConfig config = new ManagedLedgerConfig(); +config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, TimeUnit.MILLISECONDS); +ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("rollover_inactive", config); + +long ledgerId = ledger.currentLedger.getId(); + +Thread.sleep(inactiveLedgerRollOverTimeMs * 5); +ledger.checkInactiveLedgerAndRollOver(); + +Thread.sleep(inactiveLedgerRollOverTimeMs * 5); +ledger.checkInactiveLedgerAndRollOver(); + +assertEquals(ledger.currentLedger.getId(), ledgerId); + +ledger.close(); +} + @Test public void testOffloadTaskCancelled() throws Exception { @Cleanup("shutdown")
(pulsar) branch branch-3.2 updated: [improve] [broker] Servlet support response compression (#21667)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new c8591994807 [improve] [broker] Servlet support response compression (#21667) c8591994807 is described below commit c859199480756eee5bf2dec1b3e6ebf2af9cec21 Author: Hang Chen AuthorDate: Wed Mar 13 14:52:43 2024 +0800 [improve] [broker] Servlet support response compression (#21667) --- .../org/apache/pulsar/broker/web/WebService.java | 16 +++-- .../apache/pulsar/broker/web/WebServiceTest.java | 72 ++ 2 files changed, 82 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 902593b7bf6..a7c42448990 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -44,6 +44,7 @@ import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.server.handler.RequestLogHandler; import org.eclipse.jetty.server.handler.ResourceHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -258,15 +259,18 @@ public class WebService implements AutoCloseable { public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication, Map attributeMap) { -ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); +ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); // Notice: each context path should be unique, but there's nothing here to verify that -context.setContextPath(path); -context.addServlet(servletHolder, MATCH_ALL); +servletContextHandler.setContextPath(path); +servletContextHandler.addServlet(servletHolder, MATCH_ALL); if (attributeMap != null) { -attributeMap.forEach(context::setAttribute); +attributeMap.forEach(servletContextHandler::setAttribute); } -filterInitializer.addFilters(context, requiresAuthentication); -handlers.add(context); +filterInitializer.addFilters(servletContextHandler, requiresAuthentication); + +GzipHandler gzipHandler = new GzipHandler(); +gzipHandler.setHandler(servletContextHandler); +handlers.add(gzipHandler); } public void addStaticResources(String basePath, String resourcePath) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index 405f3a11b5d..5386363373a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -21,11 +21,14 @@ package org.apache.pulsar.broker.web; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; + import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.io.CharStreams; import com.google.common.io.Closeables; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -42,6 +45,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.zip.GZIPInputStream; +import java.util.zip.ZipException; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; @@ -353,6 +358,73 @@ public class WebServiceTest { assertEquals(res.getResponseBody(), "ok"); } +@Test +public void testCompressOutputMetricsInPrometheus() throws Exception { + +setupEnv(true, false, false, false, -1, false); + +String metricsUrl = pulsar.getWebServiceAddress() + "/metrics/"; + +String[] command = {"curl", "-H", "Accept-Encoding: gzip", metricsUrl}; + +ProcessBuilder processBuilder = new ProcessBuilder(command); +Process process = processBuilder.start(); + +InputStream inputStream = process.getInputStream(); + +try { +GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream); + +// Process the decompressed content +
(pulsar) branch master updated: [improve][admin] Align the auth and check it at the first place for topic related API (#22342)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 50121e7f7be [improve][admin] Align the auth and check it at the first place for topic related API (#22342) 50121e7f7be is described below commit 50121e7f7be541f45bb6dc976f51e30658b1cb8d Author: Jiwei Guo AuthorDate: Mon Apr 1 20:46:18 2024 +0800 [improve][admin] Align the auth and check it at the first place for topic related API (#22342) --- .../apache/pulsar/broker/admin/AdminResource.java |4 +- .../broker/admin/impl/PersistentTopicsBase.java| 1284 ++-- .../pulsar/broker/admin/v2/PersistentTopics.java |3 +- .../apache/pulsar/broker/admin/TopicAuthZTest.java | 759 4 files changed, 1388 insertions(+), 662 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 618c4ca73e1..a1bfeb2142f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -480,9 +480,9 @@ public abstract class AdminResource extends PulsarWebResource { // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can // serve/redirect request else fail partitioned-metadata-request so, client fails while creating // producer/consumer -return validateClusterOwnershipAsync(topicName.getCluster()) +return validateTopicOperationAsync(topicName, TopicOperation.LOOKUP) +.thenCompose(__ -> validateClusterOwnershipAsync(topicName.getCluster())) .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject())) -.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.LOOKUP)) .thenCompose(__ -> { if (checkAllowAutoCreation) { return pulsar().getBrokerService() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 86993f749b5..16d088756f5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -553,8 +553,8 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture> internalGetPropertiesAsync(boolean authoritative) { -return validateTopicOwnershipAsync(topicName, authoritative) -.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA)) +return validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA) +.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) .thenCompose(__ -> { if (topicName.isPartitioned()) { return getPropertiesAsync(); @@ -586,27 +586,27 @@ public class PersistentTopicsBase extends AdminResource { log.warn("[{}] [{}] properties is empty, ignore update", clientAppId(), topicName); return CompletableFuture.completedFuture(null); } -return validateTopicOwnershipAsync(topicName, authoritative) -.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UPDATE_METADATA)) -.thenCompose(__ -> { -if (topicName.isPartitioned()) { -return internalUpdateNonPartitionedTopicProperties(properties); -} else { -return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName) -.thenCompose(metadata -> { -if (metadata.partitions == 0) { -return internalUpdateNonPartitionedTopicProperties(properties); -} -return namespaceResources() - .getPartitionedTopicResources().updatePartitionedTopicAsync(topicName, -p -> new PartitionedTopicMetadata(p.partitions, -p.properties == null ? properties -: MapUtils.putAll(p.properties, properties.entrySet().toArray(; -}); -} -}).thenAccept(__ -> -log.info("[{}] [{}] update properties success with properties {}"
(pulsar) branch master updated: [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync fo
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3eb3b1cd23d [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379) 3eb3b1cd23d is described below commit 3eb3b1cd23d2cc11424bf882e244d3bc2e92bf27 Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com> AuthorDate: Mon Apr 1 01:52:21 2024 -0700 [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379) --- .../extensions/manager/UnloadManager.java | 14 ++- .../pulsar/broker/service/BrokerService.java | 11 +- .../pulsar/broker/web/PulsarWebResource.java | 5 +++ .../ExtensibleLoadManagerImplBaseTest.java | 8 .../extensions/ExtensibleLoadManagerImplTest.java | 27 + .../extensions/manager/UnloadManagerTest.java | 44 -- 6 files changed, 80 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java index b210dedbfe8..ffae9475243 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.manager; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown; import com.google.common.annotations.VisibleForTesting; @@ -28,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; @@ -170,6 +173,15 @@ public class UnloadManager implements StateChangeListener { @Override public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { +ServiceUnitState state = ServiceUnitStateData.state(data); + +if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) { +if (log.isDebugEnabled()) { +log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit); +} +return; +} + if (t != null) { if (log.isDebugEnabled()) { log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t); @@ -181,7 +193,7 @@ public class UnloadManager implements StateChangeListener { if (log.isDebugEnabled()) { log.debug("Handling {} for service unit {}", data, serviceUnit); } -ServiceUnitState state = ServiceUnitStateData.state(data); + switch (state) { case Free, Owned -> complete(serviceUnit, t); case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 98a0ed95b1a..549dfef896c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2244,9 +2244,18 @@ public class BrokerService implements Closeable { closeFutures.add(topicFuture .thenCompose(t -> t.isPresent() ? t.get().close( disconnectClients, closeWithoutWaitingClientDisconnect) -: CompletableFuture.completedFuture(null))); +: CompletableFuture.completedFuture(null)) +.exceptionally(e -> { +if (e.getCause() instanc
(pulsar) branch master updated: [improve][fn] Pass FunctionDetails to Go instance (#22350)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7315aeb6258 [improve][fn] Pass FunctionDetails to Go instance (#22350) 7315aeb6258 is described below commit 7315aeb6258b7adc9d874268d50acb95ffc0cf2b Author: jiangpengcheng AuthorDate: Fri Mar 29 13:47:29 2024 +0800 [improve][fn] Pass FunctionDetails to Go instance (#22350) --- pulsar-function-go/conf/conf.go| 2 + pulsar-function-go/pf/instanceConf.go | 11 ++ pulsar-function-go/pf/instanceConf_test.go | 207 + .../functions/instance/go/GoInstanceConfig.java| 2 + .../pulsar/functions/runtime/RuntimeUtils.java | 6 + .../runtime/kubernetes/KubernetesRuntimeTest.java | 8 + 6 files changed, 236 insertions(+) diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go index 03513648fac..1442a0f865f 100644 --- a/pulsar-function-go/conf/conf.go +++ b/pulsar-function-go/conf/conf.go @@ -91,6 +91,8 @@ type Conf struct { UserConfig string `json:"userConfig" yaml:"userConfig"` //metrics config MetricsPort int `json:"metricsPort" yaml:"metricsPort"` + // FunctionDetails + FunctionDetails string `json:"functionDetails" yaml:"functionDetails"` } var ( diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go index 4cb60dd258a..844a2bc9b89 100644 --- a/pulsar-function-go/pf/instanceConf.go +++ b/pulsar-function-go/pf/instanceConf.go @@ -25,7 +25,9 @@ import ( "time" "github.com/apache/pulsar/pulsar-function-go/conf" + log "github.com/apache/pulsar/pulsar-function-go/logutil" pb "github.com/apache/pulsar/pulsar-function-go/pb" + "google.golang.org/protobuf/encoding/protojson" ) // This is the config passed to the Golang Instance. Contains all the information @@ -122,6 +124,15 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf { tlsAllowInsecure:cfg.TLSAllowInsecureConnection, tlsHostnameVerification: cfg.TLSHostnameVerificationEnable, } + // parse the raw function details and ignore the unmarshal error(fallback to original way) + if cfg.FunctionDetails != "" { + functionDetails := pb.FunctionDetails{} + if err := protojson.Unmarshal([]byte(cfg.FunctionDetails), ); err != nil { + log.Errorf("Failed to unmarshal function details: %v", err) + } else { + instanceConf.funcDetails = functionDetails + } + } if instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_EFFECTIVELY_ONCE { panic("Go instance current not support EFFECTIVELY_ONCE processing guarantees.") diff --git a/pulsar-function-go/pf/instanceConf_test.go b/pulsar-function-go/pf/instanceConf_test.go index 02aef913ebc..cc5f46e2fe1 100644 --- a/pulsar-function-go/pf/instanceConf_test.go +++ b/pulsar-function-go/pf/instanceConf_test.go @@ -20,6 +20,7 @@ package pf import ( + "fmt" "testing" cfg "github.com/apache/pulsar/pulsar-function-go/conf" @@ -113,3 +114,209 @@ func TestInstanceConf_Fail(t *testing.T) { newInstanceConfWithConf({ProcessingGuarantees: 3}) }, "Should have a panic") } + +func TestInstanceConf_WithDetails(t *testing.T) { + cfg := { + FunctionDetails: `{"tenant":"public","namespace":"default","name":"test-function","className":"process", +"logTopic":"test-logs","userConfig":"{\"key1\":\"value1\"}","runtime":"GO","autoAck":true,"parallelism":1, +"source":{"configs":"{\"username\":\"admin\"}","typeClassName":"string","timeoutMs":"15000", +"subscriptionName":"test-subscription","inputSpecs":{"input":{"schemaType":"avro","receiverQueueSize":{"value":1000}, +"schemaProperties":{"schema_prop1":"schema1"},"consumerProperties":{"consumer_prop1":"consumer1"},"cryptoSpec": +{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"SEND","consumerCryptoFailureAction":"CONSUME"}}} +,"negativeAckRedeliveryDelayMs
(pulsar) branch master updated: [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription (#22359)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6f9c8e7f70e [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription (#22359) 6f9c8e7f70e is described below commit 6f9c8e7f70ec201d65c7fc270480bed3aa3b5aba Author: sherlock-lin <1193179...@qq.com> AuthorDate: Thu Mar 28 11:58:47 2024 +0800 [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription (#22359) --- .../pulsar/broker/service/nonpersistent/NonPersistentSubscription.java | 3 +-- .../pulsar/broker/service/persistent/PersistentSubscription.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 92aba6221da..cfe05cc32b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -40,7 +40,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFence import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.GetStatsOptions; -import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; @@ -53,7 +52,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NonPersistentSubscription extends AbstractSubscription implements Subscription { +public class NonPersistentSubscription extends AbstractSubscription { private final NonPersistentTopic topic; private volatile NonPersistentDispatcher dispatcher; private final String topicName; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 50e84310ac1..6e8e94baeae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -97,7 +97,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PersistentSubscription extends AbstractSubscription implements Subscription { +public class PersistentSubscription extends AbstractSubscription { protected final PersistentTopic topic; protected final ManagedCursor cursor; protected volatile Dispatcher dispatcher;
(pulsar) branch master updated: [fix][broker] Fix typos in PersistentTopic class (#22364)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 32037c3b098 [fix][broker] Fix typos in PersistentTopic class (#22364) 32037c3b098 is described below commit 32037c3b0982aa00a7cb5ee7e17a6b235a8c2d7f Author: hanmz AuthorDate: Thu Mar 28 10:31:09 2024 +0800 [fix][broker] Fix typos in PersistentTopic class (#22364) --- .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 6179e73169f..1650e449a3f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2753,7 +2753,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal ledger.asyncMigrate(); } if (log.isDebugEnabled()) { -log.debug("{} has replication backlog and applied migraiton", topic); +log.debug("{} has replication backlog and applied migration", topic); } return CompletableFuture.completedFuture(null); }
(pulsar) branch master updated: [improve][broker] Optimize web interface deleteDynamicConfiguration return error message (#22356)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new e4553391f96 [improve][broker] Optimize web interface deleteDynamicConfiguration return error message (#22356) e4553391f96 is described below commit e4553391f96af3bda3d8252b97cac3de1f39a1b5 Author: hanmz AuthorDate: Thu Mar 28 00:07:54 2024 +0800 [improve][broker] Optimize web interface deleteDynamicConfiguration return error message (#22356) --- .../src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 61b354610ac..83067e9f296 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -526,7 +526,7 @@ public class BrokersBase extends AdminResource { private CompletableFuture internalDeleteDynamicConfigurationOnMetadataAsync(String configName) { if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) { -throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration"); +throw new RestException(Status.PRECONDITION_FAILED, "Can't delete non-dynamic configuration"); } else { return dynamicConfigurationResources().setDynamicConfigurationAsync(old -> { if (old != null) {
(pulsar) branch master updated: [cleanup][cli] Cleanup jcommander (#22337)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d23a8f64acb [cleanup][cli] Cleanup jcommander (#22337) d23a8f64acb is described below commit d23a8f64acbfb4179b9f2f64e1e9dd0756742a5b Author: Zixuan Liu AuthorDate: Wed Mar 27 16:36:36 2024 +0800 [cleanup][cli] Cleanup jcommander (#22337) Signed-off-by: Zixuan Liu --- distribution/shell/src/assemble/LICENSE.bin.txt| 1 - pom.xml| 7 --- pulsar-cli-utils/pom.xml | 5 -- .../cli/converters/TimeUnitToSecondsConverter.java | 42 -- .../validators/IntegerMaxValueLongValidator.java | 30 -- .../cli/validators/MinNegativeOneValidator.java| 30 -- .../cli/validators/NonNegativeValueValidator.java | 30 -- .../validators/PositiveIntegerValueValidator.java | 31 --- .../cli/validators/PositiveLongValueValidator.java | 31 --- .../apache/pulsar/cli/validators/package-info.java | 19 --- .../pulsar/cli/converters/TimeConversionTest.java | 5 +- .../cli/validators/CliUtilValidatorsTest.java | 64 -- .../cli/utils/NameValueParameterSplitterTest.java | 52 -- .../apache/pulsar/admin/cli/utils/CmdUtils.java| 11 ++-- .../cli/utils/NameValueParameterSplitter.java | 61 - .../org/apache/pulsar/client/cli/CmdProduce.java | 9 ++- .../java/org/apache/pulsar/client/cli/CmdRead.java | 3 +- .../apache/pulsar/admin/cli/TestCmdSources.java| 3 +- 18 files changed, 14 insertions(+), 420 deletions(-) diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 2b2f1c26be1..e735bd454ee 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -309,7 +309,6 @@ pulsar-client-cpp/lib/checksum/crc32c_sw.cc This projects includes binary packages with the following licenses: The Apache Software License, Version 2.0 - * JCommander -- jcommander-1.82.jar * Picocli - picocli-4.7.5.jar - picocli-shell-jline3-4.7.5.jar diff --git a/pom.xml b/pom.xml index caa2fc49b27..da7f2c76421 100644 --- a/pom.xml +++ b/pom.xml @@ -213,7 +213,6 @@ flexible messaging model and an intuitive client API. 6.2.8 0.20 2.12.1 -1.82 3.11 1.10 2.8.0 @@ -693,12 +692,6 @@ flexible messaging model and an intuitive client API. linux-aarch_64 - -com.beust -jcommander -${jcommander.version} - - info.picocli picocli diff --git a/pulsar-cli-utils/pom.xml b/pulsar-cli-utils/pom.xml index ac442b4004e..1638029f4c8 100644 --- a/pulsar-cli-utils/pom.xml +++ b/pulsar-cli-utils/pom.xml @@ -35,11 +35,6 @@ Isolated CLI utility module - - com.beust - jcommander - compile - info.picocli picocli diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToSecondsConverter.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToSecondsConverter.java deleted file mode 100644 index 3aca2e95d25..000 --- a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/TimeUnitToSecondsConverter.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.cli.converters; - -import static org.apache.pulsar.cli.ValueValidationUtil.emptyCheck; -import com.beust.jcommander.ParameterException; -import com.beust.jcommander.converters.BaseConverter; -import java.util.concurrent.TimeUnit; - -public class TimeUnitToSecondsConverter extends BaseConverter { - -public TimeUnitToSecondsConverter(String optionName) { -super(optionName); -} - -@Override -public Long convert(String str) { -emptyCheck(getOptionName(), str); -try { -return TimeUn
(pulsar) branch master updated: [fix][cli] Fix typos in CmdSinks class (#22358)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 023446b7328 [fix][cli] Fix typos in CmdSinks class (#22358) 023446b7328 is described below commit 023446b73287dea25c22c6db307e5d723306e765 Author: hanmz AuthorDate: Wed Mar 27 11:40:28 2024 +0800 [fix][cli] Fix typos in CmdSinks class (#22358) --- .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java | 2 +- .../src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 47af7e6794c..f3172a49b01 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -617,7 +617,7 @@ public class CmdSinks extends CmdBase { protected void validateSinkConfigs(SinkConfig sinkConfig) { if (isBlank(sinkConfig.getArchive())) { -throw new ParameterException("Sink archive not specfied"); +throw new ParameterException("Sink archive not specified"); } org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index c68bbd20ab8..6fbe3bc5da2 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -283,7 +283,7 @@ public class TestCmdSinks { } @Test(expectedExceptions = CliCommand.ParameterException.class, -expectedExceptionsMessageRegExp = "Sink archive not specfied") +expectedExceptionsMessageRegExp = "Sink archive not specified") public void testMissingArchive() throws Exception { SinkConfig sinkConfig = getSinkConfig(); sinkConfig.setArchive(null); @@ -503,7 +503,7 @@ public class TestCmdSinks { testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig); } -@Test(expectedExceptions = CliCommand.ParameterException.class, expectedExceptionsMessageRegExp = "Sink archive not specfied") +@Test(expectedExceptions = CliCommand.ParameterException.class, expectedExceptionsMessageRegExp = "Sink archive not specified") public void testCmdSinkConfigFileMissingJar() throws Exception { SinkConfig testSinkConfig = getSinkConfig(); testSinkConfig.setArchive(null);
(pulsar) branch branch-2.10 updated: [fix][broker] Fast fix infinite HTTP call getSubscriptions caused by wrong topicName (#22357)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new 13938c1b051 [fix][broker] Fast fix infinite HTTP call getSubscriptions caused by wrong topicName (#22357) 13938c1b051 is described below commit 13938c1b051786077744cbf362cd68a0f0875f16 Author: fengyubiao AuthorDate: Tue Mar 26 19:50:15 2024 +0800 [fix][broker] Fast fix infinite HTTP call getSubscriptions caused by wrong topicName (#22357) Co-authored-by: Jiwe Guo --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java| 2 +- .../api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java| 9 ++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index dbcfd734a37..6e537900f39 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -,7 +,7 @@ public class PersistentTopicsBase extends AdminResource { getPartitionedTopicMetadataAsync(topicName, authoritative, allowAutoTopicCreation)) .thenAccept(partitionMetadata -> { final int numPartitions = partitionMetadata.partitions; -if (numPartitions > 0) { +if (partitionMetadata.partitions > 0 && !isUnexpectedTopicName(partitionMetadata)) { final CompletableFuture future = new CompletableFuture<>(); final AtomicInteger count = new AtomicInteger(numPartitions); final AtomicInteger failureCount = new AtomicInteger(0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java index 6d3806f312e..25eee62609f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java @@ -52,7 +52,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo } @Test -public void testInfiniteHttpCallGetSubscriptions() throws Exception { +public void testInfiniteHttpCallGetOrCreateSubscriptions() throws Exception { final String randomStr = UUID.randomUUID().toString().replaceAll("-", ""); final String partitionedTopicName = "persistent://my-property/my-ns/tp1_" + randomStr; final String topic_p0 = partitionedTopicName + TopicName.PARTITIONED_TOPIC_SUFFIX + "0"; @@ -64,6 +64,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo // Do test. ProducerAndConsumerEntry pcEntry = triggerDLQCreated(topic_p0, topicDLQ, subscriptionName); admin.topics().getSubscriptions(topicDLQ); +admin.topics().createSubscription(topicDLQ, "s1", MessageId.earliest); // cleanup. pcEntry.consumer.close(); @@ -72,7 +73,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo } @Test -public void testInfiniteHttpCallGetSubscriptions2() throws Exception { +public void testInfiniteHttpCallGetOrCreateSubscriptions2() throws Exception { final String randomStr = UUID.randomUUID().toString().replaceAll("-", ""); final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0-abc"; Producer producer = pulsarClient.newProducer(Schema.STRING) @@ -81,13 +82,14 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo // Do test. admin.topics().getSubscriptions(topicName); +admin.topics().createSubscription(topicName, "s1", MessageId.earliest); // cleanup. producer.close(); } @Test -public void testInfiniteHttpCallGetSubscriptions3() throws Exception { +public void testInfiniteHttpCallGetOrCreateSubscriptions3() throws Exception { final String randomStr = UUID.randomUUID().toString().replaceAll("-", ""); final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0"; Producer producer = pulsarClient.newProducer(Schema.STRING) @@ -96,6 +98,7 @@ public class Top
(pulsar) branch master updated: [improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf (#22303)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 69c45ad5300 [improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf (#22303) 69c45ad5300 is described below commit 69c45ad5300e36a62a923b8eaa58aab99c6e02fb Author: crossoverJie AuthorDate: Fri Mar 22 09:12:37 2024 +0800 [improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf (#22303) Co-authored-by: Zixuan Liu --- .../cli/converters/ByteUnitToLongConverter.java| 39 - .../pulsar/cli/converters/ByteConversionTest.java | 9 +- pulsar-testclient/pom.xml | 4 +- .../proxy/socket/client/PerformanceClient.java | 65 +++ .../apache/pulsar/testclient/BrokerMonitor.java| 30 +++ .../testclient/CmdGenerateDocumentation.java | 67 +-- .../pulsar/testclient/LoadSimulationClient.java| 34 .../testclient/LoadSimulationController.java | 68 +++ .../pulsar/testclient/ManagedLedgerWriter.java | 57 ++--- .../testclient/PerformanceBaseArguments.java | 59 +++-- .../pulsar/testclient/PerformanceConsumer.java | 65 --- .../pulsar/testclient/PerformanceProducer.java | 96 +++--- .../pulsar/testclient/PerformanceReader.java | 19 +++-- .../testclient/PerformanceTopicListArguments.java | 10 ++- .../pulsar/testclient/PerformanceTransaction.java | 45 +- ...or.java => PositiveNumberParameterConvert.java} | 15 ++-- .../pulsar/testclient/GenerateDocumentionTest.java | 37 + 17 files changed, 377 insertions(+), 342 deletions(-) diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java deleted file mode 100644 index 6170fb489d4..000 --- a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.cli.converters; - -import static org.apache.pulsar.cli.ValueValidationUtil.emptyCheck; -import com.beust.jcommander.converters.BaseConverter; - -public class ByteUnitToLongConverter extends BaseConverter { - -public ByteUnitToLongConverter(String optionName) { -super(optionName); -} - -@Override -public Long convert(String argStr) { -return parseBytes(argStr); -} - -Long parseBytes(String argStr) { -emptyCheck(getOptionName(), argStr); -return ByteUnitUtil.validateSizeString(argStr); -} -} diff --git a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java index 283e94bfb9c..6e7a2e6d7e3 100644 --- a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java +++ b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.cli.converters; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertThrows; import org.apache.pulsar.cli.converters.picocli.ByteUnitToIntegerConverter; +import org.apache.pulsar.cli.converters.picocli.ByteUnitToLongConverter; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import picocli.CommandLine.TypeConversionException; @@ -59,8 +60,8 @@ public class ByteConversionTest { } @Test(dataProvider = "successfulByteUnitUtilTestCases") -public void testSuccessfulByteUnitToLongConverter(String input, long expected) { -ByteUnitToLongConverter converter = new ByteUnitToLongConverter("optionName"); +public void testSuccessfulByteUnitToLongConverter(String input, long expected) throws Exception{ +ByteUnitToLongConverter converter = new ByteU
(pulsar) branch master updated: [improve][admin] Fix the `createMissingPartitions` doesn't response correctly (#22311)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5cabcacbfa8 [improve][admin] Fix the `createMissingPartitions` doesn't response correctly (#22311) 5cabcacbfa8 is described below commit 5cabcacbfa8874931d501cd040f7a8ac3d6d1923 Author: Jiwei Guo AuthorDate: Thu Mar 21 15:24:50 2024 +0800 [improve][admin] Fix the `createMissingPartitions` doesn't response correctly (#22311) --- .../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 4 +++- .../java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 7 +++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 10cf5edd3c3..86993f749b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -487,7 +487,7 @@ public class PersistentTopicsBase extends AdminResource { protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { -if (metadata != null) { +if (metadata != null && metadata.partitions > 0) { CompletableFuture future = validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.CREATE_TOPIC); future.thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> { @@ -497,6 +497,8 @@ public class PersistentTopicsBase extends AdminResource { resumeAsyncResponseExceptionally(asyncResponse, e); return null; }); +} else { +throw new RestException(Status.NOT_FOUND, String.format("Topic %s does not exist", topicName)); } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 23cb413614f..9a292175caa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -33,6 +33,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.ArrayList; @@ -1779,4 +1780,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { assertTrue(namespaces.contains(ns1V2)); assertTrue(namespaces.contains(ns1V1)); } + +@Test +public void testCreateMissingPartitions() throws Exception { +String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testCreateMissingPartitions"; +assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics().createMissedPartitions(topicName)); +} }
(pulsar) branch branch-3.0 updated: [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22309)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new b51b74883fb [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22309) b51b74883fb is described below commit b51b74883fb66673161d0b73c6a7257d073c57a5 Author: Jiwei Guo AuthorDate: Wed Mar 20 20:37:32 2024 +0800 [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22309) --- .../authorization/PulsarAuthorizationProvider.java | 1 + .../apache/pulsar/broker/admin/AdminResource.java | 7 +- .../pulsar/broker/admin/impl/NamespacesBase.java | 166 +- .../broker/admin/impl/PersistentTopicsBase.java| 161 +- .../pulsar/broker/admin/NamespaceAuthZTest.java| 164 ++ .../apache/pulsar/broker/admin/TopicAuthZTest.java | 346 + 6 files changed, 680 insertions(+), 165 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 162c44bec38..9e9f2a446de 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -542,6 +542,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { case COMPACT: case OFFLOAD: case UNLOAD: +case TRIM_TOPIC: case DELETE_METADATA: case UPDATE_METADATA: case ADD_BUNDLE_RANGE: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index e5806b7bec2..f048d650f8e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -59,8 +59,6 @@ import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.PolicyName; -import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; @@ -710,10 +708,7 @@ public abstract class AdminResource extends PulsarWebResource { } protected CompletableFuture getSchemaCompatibilityStrategyAsync() { -return validateTopicPolicyOperationAsync(topicName, -PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, -PolicyOperation.READ) -.thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> { +return getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> { if (ex != null) { log.error("[{}] Failed to get schema compatibility strategy of topic {} {}", clientAppId(), topicName, ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 46b1712e7dd..72f5f1439d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2378,102 +2378,110 @@ public abstract class NamespacesBase extends AdminResource { } protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - updatePoliciesAsync(namespaceName, policies -> { - policies.properties.put(key, value); - return policies; - }).thenAccept(v -> { - log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key, - namespaceName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key, - namespaceName
(pulsar) branch branch-3.2 updated: [fix] [client] Unclear error message when creating a consumer with two same topics (#22255)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 0edb8a93470 [fix] [client] Unclear error message when creating a consumer with two same topics (#22255) 0edb8a93470 is described below commit 0edb8a934704ede1cc134983a84016e611ac8cec Author: fengyubiao AuthorDate: Tue Mar 19 17:23:13 2024 +0800 [fix] [client] Unclear error message when creating a consumer with two same topics (#22255) --- .../pulsar/client/api/MultiTopicsConsumerTest.java | 27 ++ .../client/impl/MultiTopicsConsumerImpl.java | 21 + 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index bb8bab29ad9..7a12acd47ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -26,9 +26,11 @@ import static org.mockito.Mockito.verify; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Lists; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -42,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -50,6 +53,7 @@ import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.mockito.AdditionalAnswers; import org.mockito.Mockito; @@ -372,6 +376,29 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase { assertTrue(consumer.isConnected()); } +@Test +public void testSameTopics() throws Exception { +final String topic1 = BrokerTestUtil.newUniqueName("public/default/tp"); +final String topic2 = "persistent://" + topic1; +admin.topics().createNonPartitionedTopic(topic2); +// Create consumer with two same topics. +try { + pulsarClient.newConsumer(Schema.INT32).topics(Arrays.asList(topic1, topic2)) +.subscriptionName("s1").subscribe(); +fail("Do not allow use two same topics."); +} catch (Exception e) { +if (e instanceof PulsarClientException && e.getCause() != null) { +e = (Exception) e.getCause(); +} +Throwable unwrapEx = FutureUtil.unwrapCompletionException(e); +assertTrue(unwrapEx instanceof IllegalArgumentException); +assertTrue(e.getMessage().contains( "Subscription topics include duplicate items" ++ " or invalid names")); +} +// cleanup. +admin.topics().delete(topic2); +} + @Test(timeOut = 3) public void testSubscriptionNotFound() throws PulsarAdminException, PulsarClientException { final var topic1 = newTopicName(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index d18af475d61..20fd03d6a28 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -165,7 +165,8 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { return; } -checkArgument(topicNamesValid(conf.getTopicNames()), "Topics is invalid."); +checkArgument(topicNamesValid(conf.getTopicNames()), "Subscription topics include duplicate items" ++ " or invalid names."); List> futures = conf.getTopicNames().stream() .map(t -> subscribeAsync(t, createTopicIfDoesNotExist)) @@ -202,21 +203,21 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { checkState(topics != null && topics.size() >= 1,
(pulsar) branch branch-3.2 updated: [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 35ea4712a60 [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279) 35ea4712a60 is described below commit 35ea4712a60992deeb93171c8b0e6d48c84e530a Author: Yunze Xu AuthorDate: Sat Mar 16 14:56:34 2024 +0800 [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279) --- .../PersistentDispatcherSingleActiveConsumer.java | 26 +- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 387ba83d9cd..7494d11421a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -74,6 +74,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher protected volatile int readBatchSize; protected final Backoff readFailureBackoff; private volatile ScheduledFuture readOnActiveConsumerTask = null; +private final Object lockForReadOnActiveConsumerTask = new Object(); private final RedeliveryTracker redeliveryTracker; @@ -123,18 +124,23 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher return; } -readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> { -if (log.isDebugEnabled()) { -log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name, - serviceConfig.getActiveConsumerFailoverDelayTimeMillis()); +synchronized (lockForReadOnActiveConsumerTask) { +if (readOnActiveConsumerTask != null) { +return; } -Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); -cursor.rewind(activeConsumer != null && activeConsumer.readCompacted()); +readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> { +if (log.isDebugEnabled()) { +log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name, + serviceConfig.getActiveConsumerFailoverDelayTimeMillis()); +} +Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); +cursor.rewind(activeConsumer != null && activeConsumer.readCompacted()); -notifyActiveConsumerChanged(activeConsumer); -readMoreEntries(activeConsumer); -readOnActiveConsumerTask = null; -}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS); +notifyActiveConsumerChanged(activeConsumer); +readMoreEntries(activeConsumer); +readOnActiveConsumerTask = null; +}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS); +} } @Override
(pulsar) branch branch-3.2 updated: [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22305)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 6ffe667cdda [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22305) 6ffe667cdda is described below commit 6ffe667cddad3e959e02ce31fd09b2f9a439d50a Author: Jiwei Guo AuthorDate: Wed Mar 20 13:49:54 2024 +0800 [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22305) --- .../authorization/PulsarAuthorizationProvider.java | 1 + .../apache/pulsar/broker/admin/AdminResource.java | 7 +- .../pulsar/broker/admin/impl/NamespacesBase.java | 166 +- .../broker/admin/impl/PersistentTopicsBase.java| 172 +- .../pulsar/broker/admin/NamespaceAuthZTest.java| 163 ++ .../apache/pulsar/broker/admin/TopicAuthZTest.java | 345 + 6 files changed, 683 insertions(+), 171 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index acb6fce9b92..a39c3d05607 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -597,6 +597,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { case COMPACT: case OFFLOAD: case UNLOAD: +case TRIM_TOPIC: case DELETE_METADATA: case UPDATE_METADATA: case ADD_BUNDLE_RANGE: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 2ceec189975..b0d66c27d63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -62,8 +62,6 @@ import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.PolicyName; -import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; @@ -714,10 +712,7 @@ public abstract class AdminResource extends PulsarWebResource { } protected CompletableFuture getSchemaCompatibilityStrategyAsync() { -return validateTopicPolicyOperationAsync(topicName, -PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, -PolicyOperation.READ) -.thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> { +return getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> { if (ex != null) { log.error("[{}] Failed to get schema compatibility strategy of topic {} {}", clientAppId(), topicName, ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index f274cffa46b..b2e427b6e46 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2339,102 +2339,110 @@ public abstract class NamespacesBase extends AdminResource { } protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - updatePoliciesAsync(namespaceName, policies -> { - policies.properties.put(key, value); - return policies; - }).thenAccept(v -> { - log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key, - namespaceName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key, - namespaceName
(pulsar) branch master updated: [improve][broker] Remove the atomicity on active consumer of a dispatcher (#22285)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1b1bd4b610d [improve][broker] Remove the atomicity on active consumer of a dispatcher (#22285) 1b1bd4b610d is described below commit 1b1bd4b610dd768a6908964ef841a6790bb0f4f0 Author: Yunze Xu AuthorDate: Tue Mar 19 18:57:10 2024 +0800 [improve][broker] Remove the atomicity on active consumer of a dispatcher (#22285) --- .../AbstractDispatcherSingleActiveConsumer.java| 30 ++-- ...onPersistentDispatcherSingleActiveConsumer.java | 4 +- .../PersistentDispatcherSingleActiveConsumer.java | 56 +++--- 3 files changed, 45 insertions(+), 45 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 7726eb814a0..9980b6ae97c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; @@ -47,9 +46,6 @@ import org.slf4j.LoggerFactory; public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBaseDispatcher { protected final String topicName; -protected static final AtomicReferenceFieldUpdater -ACTIVE_CONSUMER_UPDATER = AtomicReferenceFieldUpdater.newUpdater( -AbstractDispatcherSingleActiveConsumer.class, Consumer.class, "activeConsumer"); private volatile Consumer activeConsumer = null; protected final CopyOnWriteArrayList consumers; protected StickyKeyConsumerSelector stickyKeyConsumerSelector; @@ -78,11 +74,16 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas this.partitionIndex = partitionIndex; this.subscriptionType = subscriptionType; this.cursor = cursor; -ACTIVE_CONSUMER_UPDATER.set(this, null); } +/** + * @apiNote this method does not need to be thread safe + */ protected abstract void scheduleReadOnActiveConsumer(); +/** + * @apiNote this method does not need to be thread safe + */ protected abstract void cancelPendingRead(); protected void notifyActiveConsumerChanged(Consumer activeConsumer) { @@ -99,6 +100,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas * distributed partitions evenly across consumers with highest priority level. * * @return the true consumer if the consumer is changed, otherwise false. + * @apiNote this method is not thread safe */ protected boolean pickAndScheduleActiveConsumer() { checkArgument(!consumers.isEmpty()); @@ -128,14 +130,14 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas ? partitionIndex % consumersSize : peekConsumerIndexFromHashRing(makeHashRing(consumersSize)); -Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index)); +Consumer selectedConsumer = consumers.get(index); -Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); -if (prevConsumer == activeConsumer) { +if (selectedConsumer == activeConsumer) { // Active consumer did not change. Do nothing at this point return false; } else { // If the active consumer is changed, send notification. +activeConsumer = selectedConsumer; scheduleReadOnActiveConsumer(); return true; } @@ -167,7 +169,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas } if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) { -Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this); +Consumer actConsumer = getActiveConsumer(); if (actConsumer != null) { return actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive -> { if (actConsumerStillAlive.isEmpty() || actConsumerStillAlive.get()) { @@ -210,7 +212,7 @@
(pulsar) branch master updated: [improve][misc] Upgrade jersey to 2.41 (#22290)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 0c9d8601698 [improve][misc] Upgrade jersey to 2.41 (#22290) 0c9d8601698 is described below commit 0c9d86016983b9683aa3b637be38e281090f40f0 Author: Zixuan Liu AuthorDate: Mon Mar 18 22:14:11 2024 +0800 [improve][misc] Upgrade jersey to 2.41 (#22290) --- distribution/server/src/assemble/LICENSE.bin.txt | 20 ++-- distribution/shell/src/assemble/LICENSE.bin.txt | 14 +++--- pom.xml | 2 +- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 9e12db74eb1..dac03d966a6 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -582,16 +582,16 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt - org.glassfish.hk2-osgi-resource-locator-1.0.3.jar - org.glassfish.hk2.external-aopalliance-repackaged-2.6.1.jar * Jersey -- org.glassfish.jersey.containers-jersey-container-servlet-2.34.jar -- org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar -- org.glassfish.jersey.core-jersey-client-2.34.jar -- org.glassfish.jersey.core-jersey-common-2.34.jar -- org.glassfish.jersey.core-jersey-server-2.34.jar -- org.glassfish.jersey.ext-jersey-entity-filtering-2.34.jar -- org.glassfish.jersey.media-jersey-media-json-jackson-2.34.jar -- org.glassfish.jersey.media-jersey-media-multipart-2.34.jar -- org.glassfish.jersey.inject-jersey-hk2-2.34.jar - * Mimepull -- org.jvnet.mimepull-mimepull-1.9.13.jar +- org.glassfish.jersey.containers-jersey-container-servlet-2.41.jar +- org.glassfish.jersey.containers-jersey-container-servlet-core-2.41.jar +- org.glassfish.jersey.core-jersey-client-2.41.jar +- org.glassfish.jersey.core-jersey-common-2.41.jar +- org.glassfish.jersey.core-jersey-server-2.41.jar +- org.glassfish.jersey.ext-jersey-entity-filtering-2.41.jar +- org.glassfish.jersey.media-jersey-media-json-jackson-2.41.jar +- org.glassfish.jersey.media-jersey-media-multipart-2.41.jar +- org.glassfish.jersey.inject-jersey-hk2-2.41.jar + * Mimepull -- org.jvnet.mimepull-mimepull-1.9.15.jar Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt * Jakarta Activation diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index a9bbd541448..7ae4241dfb9 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -441,13 +441,13 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt - aopalliance-repackaged-2.6.1.jar - osgi-resource-locator-1.0.3.jar * Jersey -- jersey-client-2.34.jar -- jersey-common-2.34.jar -- jersey-entity-filtering-2.34.jar -- jersey-media-json-jackson-2.34.jar -- jersey-media-multipart-2.34.jar -- jersey-hk2-2.34.jar - * Mimepull -- mimepull-1.9.13.jar +- jersey-client-2.41.jar +- jersey-common-2.41.jar +- jersey-entity-filtering-2.41.jar +- jersey-media-json-jackson-2.41.jar +- jersey-media-multipart-2.41.jar +- jersey-hk2-2.41.jar + * Mimepull -- mimepull-1.9.15.jar Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt * Jakarta Activation diff --git a/pom.xml b/pom.xml index 98eea81c30a..caa2fc49b27 100644 --- a/pom.xml +++ b/pom.xml @@ -148,7 +148,7 @@ flexible messaging model and an intuitive client API. 0.0.24.Final 9.4.54.v20240208 2.5.2 -2.34 +2.41 1.10.50 0.16.0 4.3.8
(pulsar) branch branch-3.2 updated: [improve] [broker] Support create RawReader based on configuration (#22280)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new f1bdb78564e [improve] [broker] Support create RawReader based on configuration (#22280) f1bdb78564e is described below commit f1bdb78564e739ed558fef1e0c9b8ea1d54c402c Author: Hang Chen AuthorDate: Mon Mar 18 18:49:52 2024 +0800 [improve] [broker] Support create RawReader based on configuration (#22280) --- .../org/apache/pulsar/client/api/RawReader.java| 11 ++ .../apache/pulsar/client/impl/RawReaderImpl.java | 8 + .../apache/pulsar/client/impl/RawReaderTest.java | 39 -- 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index b7805c36b3b..55483708fdf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.RawReaderImpl; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; /** * Topic reader which receives raw messages (i.e. as they are stored in the managed ledger). @@ -43,6 +44,16 @@ public interface RawReader { return future.thenApply(__ -> r); } +static CompletableFuture create(PulsarClient client, + ConsumerConfigurationData consumerConfiguration, + boolean createTopicIfDoesNotExist) { +CompletableFuture> future = new CompletableFuture<>(); +RawReader r = new RawReaderImpl((PulsarClientImpl) client, +consumerConfiguration, future, createTopicIfDoesNotExist); +return future.thenApply(__ -> r); +} + + /** * Get the topic for the reader. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 3d7ad9f5865..5ac051d2271 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -65,6 +65,14 @@ public class RawReaderImpl implements RawReader { consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist); } +public RawReaderImpl(PulsarClientImpl client, ConsumerConfigurationData consumerConfiguration, + CompletableFuture> consumerFuture, + boolean createTopicIfDoesNotExist) { +this.consumerConfiguration = consumerConfiguration; +consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist); +} + + @Override public String getTopic() { return consumerConfiguration.getTopicNames().stream() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index d3fcc36a546..d9ddc00b2e8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -44,6 +44,9 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.ClusterData; @@ -56,6 +59,8 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE; + @Test(groups = "broker-impl") @Slf4j public class RawReaderTest extends MockedPulsarServiceBaseTest { @@ -195,6 +200,36 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { reader.closeAsync().get(3, TimeUnit.SECONDS); } +@Test +public void testRawReaderWithConfigurationCreation() throws Exception { +int numKeys = 10; + +String topic = "persistent://my-property/my-ns/" +
(pulsar) branch master updated: [improve] [broker] Support create RawReader based on configuration (#22280)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 34f8e0e9456 [improve] [broker] Support create RawReader based on configuration (#22280) 34f8e0e9456 is described below commit 34f8e0e9456674cd6459105cd7a3619b113b06cf Author: Hang Chen AuthorDate: Mon Mar 18 18:49:52 2024 +0800 [improve] [broker] Support create RawReader based on configuration (#22280) --- .../org/apache/pulsar/client/api/RawReader.java| 11 ++ .../apache/pulsar/client/impl/RawReaderImpl.java | 8 + .../apache/pulsar/client/impl/RawReaderTest.java | 39 -- 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index b7805c36b3b..55483708fdf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.RawReaderImpl; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; /** * Topic reader which receives raw messages (i.e. as they are stored in the managed ledger). @@ -43,6 +44,16 @@ public interface RawReader { return future.thenApply(__ -> r); } +static CompletableFuture create(PulsarClient client, + ConsumerConfigurationData consumerConfiguration, + boolean createTopicIfDoesNotExist) { +CompletableFuture> future = new CompletableFuture<>(); +RawReader r = new RawReaderImpl((PulsarClientImpl) client, +consumerConfiguration, future, createTopicIfDoesNotExist); +return future.thenApply(__ -> r); +} + + /** * Get the topic for the reader. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 3d7ad9f5865..5ac051d2271 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -65,6 +65,14 @@ public class RawReaderImpl implements RawReader { consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist); } +public RawReaderImpl(PulsarClientImpl client, ConsumerConfigurationData consumerConfiguration, + CompletableFuture> consumerFuture, + boolean createTopicIfDoesNotExist) { +this.consumerConfiguration = consumerConfiguration; +consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist); +} + + @Override public String getTopic() { return consumerConfiguration.getTopicNames().stream() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index d3fcc36a546..d9ddc00b2e8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -44,6 +44,9 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.ClusterData; @@ -56,6 +59,8 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE; + @Test(groups = "broker-impl") @Slf4j public class RawReaderTest extends MockedPulsarServiceBaseTest { @@ -195,6 +200,36 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { reader.closeAsync().get(3, TimeUnit.SECONDS); } +@Test +public void testRawReaderWithConfigurationCreation() throws Exception { +int numKeys = 10; + +String topic = "persistent://my-property/my-ns/" +
(pulsar) branch branch-3.2 updated: [fix][ci] Enable CI for branch-3.2 (#22287)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 5fae440e58f [fix][ci] Enable CI for branch-3.2 (#22287) 5fae440e58f is described below commit 5fae440e58fab38ea57a6e7ce1927f01427f951d Author: Lishen Yao AuthorDate: Mon Mar 18 18:37:33 2024 +0800 [fix][ci] Enable CI for branch-3.2 (#22287) --- .github/workflows/pulsar-ci.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index 02496a82392..ba043ded210 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -22,6 +22,7 @@ on: pull_request: branches: - master + - branch-3.2 schedule: # scheduled job with JDK 17 - cron: '0 12 * * *'