(pulsar) branch master updated: [improve] [pip] PIP-357: Correct the conf name in load balance module. (#22823)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new fb80007a47d [improve] [pip] PIP-357: Correct the conf name in load balance module. (#22823) fb80007a47d is described below commit fb80007a47deaadb82d0b1b1e4fcd6ca04c05c9c Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Thu Jun 6 16:16:14 2024 +0800 [improve] [pip] PIP-357: Correct the conf name in load balance module. (#22823) --- pip/pip-357.md | 35 +++ 1 file changed, 35 insertions(+) diff --git a/pip/pip-357.md b/pip/pip-357.md new file mode 100644 index 000..716a7d5f504 --- /dev/null +++ b/pip/pip-357.md @@ -0,0 +1,35 @@ +# PIP-357: Correct the conf name in load balance module. + +# Background knowledge + +We use `loadBalancerBandwithInResourceWeight` and `loadBalancerBandwithOutResourceWeight` to calculate the broker's load in the load balance module. However, the correct conf name should be `loadBalancerBandwidthInResourceWeight` and `loadBalancerBandwidthOutResourceWeight`. This PIP is to correct the conf name in the load balance module. + +# Motivation + +The current conf name is incorrect. + + +# Detailed Design + +- deprecated `loadBalancerBandwithInResourceWeight` and `loadBalancerBandwithOutResourceWeight` in the load balance module. +- add `loadBalancerBandwidthInResourceWeight` and `loadBalancerBandwidthOutResourceWeight` in the load balance module. + +In case of users upgrading to this version don't notice the change, we will still support the old conf name in following way: +- If a configuration is not the default configuration, use that configuration. +- If both the new and the old are configured different from the default value, use the new one. + +# Backward & Forward Compatibility + +Backward compatible, users can upgrade to this version without doing any changes and the old conf name will still work. +If user want to use the new conf name, they can change the conf name in the configuration file. +Just remember that if both the new and the old are configured different from the default value, the new one will be used. + +# General Notes + +# Links + + +* Mailing List discussion thread: https://lists.apache.org/thread/31wfq2hhprn4zknp4jv21lzf5809q6lf +* Mailing List voting thread: https://lists.apache.org/thread/0pggcploqw43mo134cwmk7b3p7t13848
(pulsar) branch branch-3.3 updated: [fix][broker] fix replicated subscriptions for transactional messages (#22452)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new dd408d4b9dc [fix][broker] fix replicated subscriptions for transactional messages (#22452) dd408d4b9dc is described below commit dd408d4b9dcd3c4985df4b9a060912acbc2e0596 Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Mon May 13 20:04:55 2024 +0800 [fix][broker] fix replicated subscriptions for transactional messages (#22452) --- .../broker/service/persistent/PersistentTopic.java | 21 ++- .../ReplicatedSubscriptionsController.java | 4 +- .../transaction/buffer/TransactionBuffer.java | 3 +- .../buffer/impl/InMemTransactionBuffer.java| 13 +- .../buffer/impl/TopicTransactionBuffer.java| 70 ++-- .../buffer/impl/TransactionBufferDisable.java | 13 +- .../pulsar/broker/service/PersistentTopicTest.java | 4 +- .../broker/service/ReplicatorSubscriptionTest.java | 25 +++ .../TransactionalReplicateSubscriptionTest.java| 182 + .../broker/transaction/TransactionProduceTest.java | 36 .../pulsar/broker/transaction/TransactionTest.java | 2 +- 11 files changed, 342 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0d5fc94f302..18e69250c16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -134,6 +134,7 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.broker.stats.ReplicationMetrics; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -272,10 +273,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Getter protected final TransactionBuffer transactionBuffer; +@Getter +private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack = +(oldPosition, newPosition) -> updateMaxReadPositionMovedForwardTimestamp(); -// Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic +// Record the last time max read position is moved forward, unless it's a marker message. @Getter -private volatile long lastDataMessagePublishedTimestamp = 0; +private volatile long lastMaxReadPositionMovedForwardTimestamp = 0; @Getter private final ExecutorService orderedExecutor; @@ -410,7 +414,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { this.transactionBuffer = new TransactionBufferDisable(this); } -transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); +transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), true); if (ledger instanceof ShadowManagedLedgerImpl) { shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource()); } else { @@ -719,6 +723,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } } +private void updateMaxReadPositionMovedForwardTimestamp() { +lastMaxReadPositionMovedForwardTimestamp = Clock.systemUTC().millis(); +} + @Override public void addComplete(Position pos, ByteBuf entryData, Object ctx) { PublishContext publishContext = (PublishContext) ctx; @@ -727,12 +735,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal // Message has been successfully persisted messageDeduplication.recordMessagePersisted(publishContext, position); -if (!publishContext.isMarkerMessage()) { -lastDataMessagePublishedTimestamp = Clock.systemUTC().millis(); -} - // in order to sync the max position when cursor read entries -transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); +transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), +publishContext.isMark
(pulsar) branch branch-3.3 updated: [improve][broker] Remove ClassLoaderSwitcher to avoid objects allocations and consistent the codestyle (#22796)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 3448f8127df [improve][broker] Remove ClassLoaderSwitcher to avoid objects allocations and consistent the codestyle (#22796) 3448f8127df is described below commit 3448f8127df9d0f3a9fe1c04f8b16924bbf6e302 Author: 道君 AuthorDate: Wed May 29 22:19:47 2024 +0800 [improve][broker] Remove ClassLoaderSwitcher to avoid objects allocations and consistent the codestyle (#22796) --- .../apache/pulsar/broker/ClassLoaderSwitcher.java | 37 -- .../servlet/AdditionalServletWithClassLoader.java | 25 +--- .../protocol/ProtocolHandlerWithClassLoader.java | 44 +- 3 files changed, 55 insertions(+), 51 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ClassLoaderSwitcher.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ClassLoaderSwitcher.java deleted file mode 100644 index 55cb9198da2..000 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ClassLoaderSwitcher.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker; - -/** - * Help to switch the class loader of current thread to the NarClassLoader, and change it back when it's done. - * With the help of try-with-resources statement, the code would be cleaner than using try finally every time. - */ -public class ClassLoaderSwitcher implements AutoCloseable { -private final ClassLoader prevClassLoader; - -public ClassLoaderSwitcher(ClassLoader classLoader) { -prevClassLoader = Thread.currentThread().getContextClassLoader(); -Thread.currentThread().setContextClassLoader(classLoader); -} - -@Override -public void close() { -Thread.currentThread().setContextClassLoader(prevClassLoader); -} -} \ No newline at end of file diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java index c2b4b900733..bc1f25c5af9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java @@ -22,7 +22,6 @@ import java.io.IOException; import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.ClassLoaderSwitcher; import org.apache.pulsar.common.configuration.PulsarConfiguration; import org.apache.pulsar.common.nar.NarClassLoader; import org.eclipse.jetty.servlet.ServletHolder; @@ -40,29 +39,45 @@ public class AdditionalServletWithClassLoader implements AdditionalServlet { @Override public void loadConfig(PulsarConfiguration pulsarConfiguration) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +ClassLoader prevClassLoader = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(classLoader); servlet.loadConfig(pulsarConfiguration); +} finally { +Thread.currentThread().setContextClassLoader(prevClassLoader); } } @Override public String getBasePath() { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +ClassLoader prevClassLoader = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(classLoader); return servlet.getBasePath(); +} finally { +Thread.currentThread().setContextClassLoader(prevClassLoader); } } @Override public ServletHolder getServletHolder() { -try (ClassLoaderSwitcher ignored = new ClassLoader
(pulsar) branch branch-3.3 updated: [fix][cli] Fix expiration of tokens created with "pulsar tokens create" (#22815)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 40e7ebcf985 [fix][cli] Fix expiration of tokens created with "pulsar tokens create" (#22815) 40e7ebcf985 is described below commit 40e7ebcf9850c58a57fa1ef9d9ff87eec78c672e Author: entvex <1580435+ent...@users.noreply.github.com> AuthorDate: Mon Jun 3 18:33:44 2024 +0200 [fix][cli] Fix expiration of tokens created with "pulsar tokens create" (#22815) Co-authored-by: David Jensen --- .../pulsar/utils/auth/tokens/TokensCliUtils.java | 4 +- .../utils/auth/tokens/TokensCliUtilsTest.java | 58 ++ 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java index 4ae28b2c0bd..2a69d1d95a0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java @@ -39,7 +39,7 @@ import java.util.concurrent.Callable; import javax.crypto.SecretKey; import lombok.Cleanup; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; -import org.apache.pulsar.cli.converters.picocli.TimeUnitToSecondsConverter; +import org.apache.pulsar.cli.converters.picocli.TimeUnitToMillisConverter; import org.apache.pulsar.docs.tools.CmdGenerateDocs; import picocli.CommandLine; import picocli.CommandLine.Command; @@ -127,7 +127,7 @@ public class TokensCliUtils { "--expiry-time"}, description = "Relative expiry time for the token (eg: 1h, 3d, 10y)." + " (m=minutes) Default: no expiration", -converter = TimeUnitToSecondsConverter.class) +converter = TimeUnitToMillisConverter.class) private Long expiryTime = null; @Option(names = {"-sk", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtilsTest.java index d5dc259438e..c541f8cee42 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtilsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtilsTest.java @@ -19,10 +19,17 @@ package org.apache.pulsar.utils.auth.tokens; import static org.testng.Assert.assertTrue; +import io.jsonwebtoken.Claims; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.io.Decoders; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.lang.reflect.Field; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Arrays; +import java.util.Date; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import picocli.CommandLine.Option; @@ -31,6 +38,57 @@ import picocli.CommandLine.Option; */ public class TokensCliUtilsTest { + +@DataProvider(name = "desiredExpireTime") +public Object[][] desiredExpireTime() { +return new Object[][] { +{"600", 600}, //10m +{"5m", 300}, +{"1h", 3600}, +{"1d", 86400}, +{"1w", 604800}, +{"1y", 31536000} +}; +} + +@Test(dataProvider = "desiredExpireTime") +public void commandCreateToken_WhenCreatingATokenWithExpiryTime_ShouldHaveTheDesiredExpireTime(String expireTime, int expireAsSec) throws Exception { +PrintStream oldStream = System.out; +try { +//Arrange +ByteArrayOutputStream baoStream = new ByteArrayOutputStream(); +System.setOut(new PrintStream(baoStream)); + +String[] command = {"create", "--secret-key", + "data:;base64,u+FxaxYWpsTfxeEmMh8fQeS3g2jfXw4+sGIv+PTY+BY=", +"--subject", "test", +"--expiry-time", expireTime, +}; + +new TokensCliUtils().execute(command); +String token = baoStream.toString(); + +Instant start = (new Date().toInstant().plus(expireAsSec - 5, ChronoUnit.SECONDS)); +Instant stop = (new Date().toInstant().plus(expireAsSec + 5, ChronoUnit.SECONDS)); + +//Act +Claims jwt = Jwts.parserBuilder() + .setSigningKey(Decoders.BASE64.decode("u+FxaxYWpsTfxeEmMh8fQeS3g2jfXw4+sGIv+PTY+BY=")) +.build() +
(pulsar) branch branch-3.3 updated: [improve] [test] Add a test to guarantee the TNX topics will not be replicated (#22721)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 2274e717813 [improve] [test] Add a test to guarantee the TNX topics will not be replicated (#22721) 2274e717813 is described below commit 2274e717813802ecec81b0c374792f2a4f678253 Author: fengyubiao AuthorDate: Fri May 17 01:18:49 2024 +0800 [improve] [test] Add a test to guarantee the TNX topics will not be replicated (#22721) --- .../broker/service/OneWayReplicatorTest.java | 9 - .../broker/service/OneWayReplicatorTestBase.java | 44 +++- .../pulsar/broker/service/ReplicationTxnTest.java | 262 + 3 files changed, 297 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 99fd4d877c1..fae72e8eac2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -96,15 +96,6 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { super.cleanup(); } -private void waitReplicatorStarted(String topicName) { -Awaitility.await().untilAsserted(() -> { -Optional topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get(); -assertTrue(topicOptional2.isPresent()); -PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get(); -assertFalse(persistentTopic2.getProducers().isEmpty()); -}); -} - private void waitReplicatorStopped(String topicName) { Awaitility.await().untilAsserted(() -> { Optional topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index b4eed00c447..317e43306e3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import java.net.URL; import java.time.Duration; @@ -29,6 +31,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.policies.data.ClusterData; @@ -55,7 +58,7 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { protected ZookeeperServerTest brokerConfigZk1; protected LocalBookkeeperEnsemble bkEnsemble1; protected PulsarService pulsar1; -protected BrokerService ns1; +protected BrokerService broker1; protected PulsarAdmin admin1; protected PulsarClient client1; @@ -66,7 +69,7 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { protected ZookeeperServerTest brokerConfigZk2; protected LocalBookkeeperEnsemble bkEnsemble2; protected PulsarService pulsar2; -protected BrokerService ns2; +protected BrokerService broker2; protected PulsarAdmin admin2; protected PulsarClient client2; @@ -89,23 +92,29 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1); pulsar1 = new PulsarService(config1); pulsar1.start(); -ns1 = pulsar1.getBrokerService(); - +broker1 = pulsar1.getBrokerService(); url1 = new URL(pulsar1.getWebServiceAddress()); urlTls1 = new URL(pulsar1.getWebServiceAddressTls()); -admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); -client1 = PulsarClient.builder().serviceUrl(url1.toString()).build(); // Start region 2 setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk2); pulsar2 = new PulsarService(config2); pulsar2.start(); -ns2 = pulsar2.getBrokerService(); - +broker2 = pulsar2.getBrokerService(); url2 = new URL(pulsar2.getWebServiceA
(pulsar) branch branch-3.3 updated: [fix][broker] EntryFilters fix NoClassDefFoundError due to closed classloader (#22767)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 6b216adc5b3 [fix][broker] EntryFilters fix NoClassDefFoundError due to closed classloader (#22767) 6b216adc5b3 is described below commit 6b216adc5b37e8c51de49c1c8231ba02ca5a311d Author: Enrico Olivelli AuthorDate: Wed May 29 17:27:00 2024 +0200 [fix][broker] EntryFilters fix NoClassDefFoundError due to closed classloader (#22767) --- .../broker/service/plugin/EntryFilterProvider.java | 3 ++- .../service/plugin/EntryFilterWithClassLoader.java | 29 +- .../broker/service/plugin/FilterEntryTest.java | 12 - .../pulsar/broker/stats/ConsumerStatsTest.java | 2 +- .../pulsar/broker/stats/SubscriptionStatsTest.java | 2 +- .../apache/pulsar/common/nar/NarClassLoader.java | 16 6 files changed, 49 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java index f93e561542e..53418744b54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java @@ -197,7 +197,8 @@ public class EntryFilterProvider implements AutoCloseable { + " does not implement entry filter interface"); } EntryFilter pi = (EntryFilter) filter; -return new EntryFilterWithClassLoader(pi, ncl); +// the classloader is shared with the broker, the instance doesn't own it +return new EntryFilterWithClassLoader(pi, ncl, false); } catch (Throwable e) { if (e instanceof IOException) { throw (IOException) e; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java index c5c57210877..aab46c62acd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java @@ -30,15 +30,23 @@ import org.apache.pulsar.common.nar.NarClassLoader; public class EntryFilterWithClassLoader implements EntryFilter { private final EntryFilter entryFilter; private final NarClassLoader classLoader; +private final boolean classLoaderOwned; -public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader) { +public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader, boolean classLoaderOwned) { this.entryFilter = entryFilter; this.classLoader = classLoader; +this.classLoaderOwned = classLoaderOwned; } @Override public FilterResult filterEntry(Entry entry, FilterContext context) { -return entryFilter.filterEntry(entry, context); +ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(classLoader); +return entryFilter.filterEntry(entry, context); +} finally { +Thread.currentThread().setContextClassLoader(currentClassLoader); +} } @VisibleForTesting @@ -48,11 +56,20 @@ public class EntryFilterWithClassLoader implements EntryFilter { @Override public void close() { -entryFilter.close(); +ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); try { -classLoader.close(); -} catch (IOException e) { -log.error("close EntryFilterWithClassLoader failed", e); +Thread.currentThread().setContextClassLoader(classLoader); +entryFilter.close(); +} finally { +Thread.currentThread().setContextClassLoader(currentClassLoader); +} +if (classLoaderOwned) { +log.info("Closing classloader {} for EntryFilter {}", classLoader, entryFilter.getClass().getName()); +try { +classLoader.close(); +} catch (IOException e) { +log.error("close EntryFilterWithClassLoader failed", e); +} } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 7b3daddcd9d..f7388ef9eb9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/br
(pulsar) branch branch-3.3 updated: [fix] [broker] maintain last active info in memory only. (#22794)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 527dcf8f81b [fix] [broker] maintain last active info in memory only. (#22794) 527dcf8f81b is described below commit 527dcf8f81b2cee1979ba4e7d188f882264ad273 Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Mon Jun 3 10:14:31 2024 +0800 [fix] [broker] maintain last active info in memory only. (#22794) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 - managed-ledger/src/main/proto/MLDataFormats.proto | 3 +- .../broker/service/persistent/PersistentTopic.java | 31 ++- .../pulsar/broker/service/BrokerServiceTest.java | 62 ++ 4 files changed, 82 insertions(+), 16 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 18d9cd7cb05..1d2065ef8e3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -475,9 +475,7 @@ public class ManagedCursorImpl implements ManagedCursor { ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback() { @Override public void operationComplete(ManagedCursorInfo info, Stat stat) { - updateCursorLedgerStat(info, stat); -lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive; if (log.isDebugEnabled()) { log.debug("[{}] [{}] Recover cursor last active to [{}]", ledger.getName(), name, lastActive); diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index c4e502819fa..fdffed6762d 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -124,7 +124,8 @@ message ManagedCursorInfo { // the current cursor position repeated LongProperty properties = 5; -optional int64 lastActive = 6; +// deprecated, do not persist this field anymore +optional int64 lastActive = 6 [deprecated = true]; // Store which index in the batch message has been deleted repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 28bc27f7961..0d5fc94f302 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3222,19 +3222,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal final Integer nsExpirationTime = policies.subscription_expiration_time_minutes; final long expirationTimeMillis = TimeUnit.MINUTES .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime); -if (expirationTimeMillis > 0) { -subscriptions.forEach((subName, sub) -> { -if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() -|| sub.isReplicated() -|| isCompactionSubscription(subName)) { -return; -} -if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) { -sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration " -+ "with last active [{}]", topic, subName, sub.cursor.getLastActive())); -} -}); -} +checkInactiveSubscriptions(expirationTimeMillis); } catch (Exception e) { if (log.isDebugEnabled()) { log.debug("[{}] Error getting policies", topic); @@ -3242,6 +3230,23 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } } +@VisibleForTesting +public void checkInactiveSubscriptions(long expirationTimeMillis) { +if (expirationTimeMillis > 0) { +subscriptions.forEach((subName, sub) -> { +if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() +|| sub.isReplicated() +
(pulsar) branch branch-3.3 updated: [improve][broker] avoid creating new objects when intercepting (#22790)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 956164fe6c6 [improve][broker] avoid creating new objects when intercepting (#22790) 956164fe6c6 is described below commit 956164fe6c62a78c4bd178d553a13715b1b83de7 Author: Qiang Zhao AuthorDate: Tue May 28 22:45:30 2024 +0800 [improve][broker] avoid creating new objects when intercepting (#22790) --- .../BrokerInterceptorWithClassLoader.java | 127 + .../intercept/BrokerInterceptorUtilsTest.java | 2 +- .../BrokerInterceptorWithClassLoaderTest.java | 2 +- 3 files changed, 105 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java index faee5799289..3997e214f43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java @@ -29,7 +29,6 @@ import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; -import org.apache.pulsar.broker.ClassLoaderSwitcher; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; @@ -51,16 +50,20 @@ import org.apache.pulsar.common.nar.NarClassLoader; public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { private final BrokerInterceptor interceptor; -private final NarClassLoader classLoader; +private final NarClassLoader narClassLoader; @Override public void beforeSendMessage(Subscription subscription, Entry entry, long[] ackSet, MessageMetadata msgMetadata) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.beforeSendMessage( subscription, entry, ackSet, msgMetadata); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @@ -70,25 +73,37 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { long[] ackSet, MessageMetadata msgMetadata, Consumer consumer) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.beforeSendMessage( subscription, entry, ackSet, msgMetadata, consumer); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void producerCreated(ServerCnx cnx, Producer producer, Map metadata){ -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.producerCreated(cnx, producer, metadata); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @@ -96,8 +111,12 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { public void producerClosed(ServerCnx cnx, Producer producer, Map metadata) { -try (ClassLoaderSwitcher
(pulsar) branch branch-3.3 updated: [fix] [broker] disable loadBalancerDirectMemoryResourceWeight by default (#22821)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 8dc5f2ef687 [fix] [broker] disable loadBalancerDirectMemoryResourceWeight by default (#22821) 8dc5f2ef687 is described below commit 8dc5f2ef687a8c234bb7e2071e13274180017e9c Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Tue Jun 4 17:11:34 2024 +0800 [fix] [broker] disable loadBalancerDirectMemoryResourceWeight by default (#22821) --- conf/standalone.conf | 2 +- deployment/terraform-ansible/templates/broker.conf | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/conf/standalone.conf b/conf/standalone.conf index a8615b70293..758a79d0fa1 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -925,7 +925,7 @@ loadBalancerMemoryResourceWeight=1.0 # The direct memory usage weight when calculating new resource usage. # It only takes effect in the ThresholdShedder strategy. -loadBalancerDirectMemoryResourceWeight=1.0 +loadBalancerDirectMemoryResourceWeight=0 # Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently. # It only takes effect in the ThresholdShedder strategy. diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index 291cdd92147..43bbdc0d52d 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -971,7 +971,7 @@ loadBalancerMemoryResourceWeight=1.0 # The direct memory usage weight when calculating new resource usage. # It only take effect in ThresholdShedder strategy. -loadBalancerDirectMemoryResourceWeight=1.0 +loadBalancerDirectMemoryResourceWeight=0 # Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently. # It only take effect in ThresholdShedder strategy.
(pulsar) branch branch-3.3 updated: [fix] [conf] fix configuration name and typo. (#22822)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 13ee738fa2d [fix] [conf] fix configuration name and typo. (#22822) 13ee738fa2d is described below commit 13ee738fa2d7268440c2d217f927b0810911f600 Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Tue Jun 4 17:12:31 2024 +0800 [fix] [conf] fix configuration name and typo. (#22822) --- deployment/terraform-ansible/templates/broker.conf | 20 ++-- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index fe3bae6bb15..291cdd92147 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -320,7 +320,7 @@ dispatcherMinReadBatchSize=1 # Max number of entries to dispatch for a shared subscription. By default it is 20 entries. dispatcherMaxRoundRobinBatchSize=20 -# Precise dispathcer flow control according to history message number of each entry +# Precise dispatcher flow control according to history message number of each entry preciseDispatcherFlowControl=false # Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic @@ -638,7 +638,7 @@ bookkeeperMetadataServiceUri= # Authentication plugin to use when connecting to bookies bookkeeperClientAuthenticationPlugin= -# BookKeeper auth plugin implementatation specifics parameters name and values +# BookKeeper auth plugin implementation specifics parameters name and values bookkeeperClientAuthenticationParametersName= bookkeeperClientAuthenticationParameters= @@ -944,7 +944,7 @@ defaultNamespaceBundleSplitAlgorithm=range_equally_divide loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder # The broker resource usage threshold. -# When the broker resource usage is gratter than the pulsar cluster average resource usge, +# When the broker resource usage is greater than the pulsar cluster average resource usge, # the threshold shedder will be triggered to offload bundles from the broker. # It only take effect in ThresholdShedder strategy. loadBalancerBrokerThresholdShedderPercentage=10 @@ -953,27 +953,27 @@ loadBalancerBrokerThresholdShedderPercentage=10 # It only take effect in ThresholdShedder strategy. loadBalancerHistoryResourcePercentage=0.9 -# The BandWithIn usage weight when calculating new resourde usage. +# The BandWithIn usage weight when calculating new resource usage. # It only take effect in ThresholdShedder strategy. loadBalancerBandwithInResourceWeight=1.0 -# The BandWithOut usage weight when calculating new resourde usage. +# The BandWithOut usage weight when calculating new resource usage. # It only take effect in ThresholdShedder strategy. loadBalancerBandwithOutResourceWeight=1.0 -# The CPU usage weight when calculating new resourde usage. +# The CPU usage weight when calculating new resource usage. # It only take effect in ThresholdShedder strategy. loadBalancerCPUResourceWeight=1.0 -# The heap memory usage weight when calculating new resourde usage. +# The heap memory usage weight when calculating new resource usage. # It only take effect in ThresholdShedder strategy. loadBalancerMemoryResourceWeight=1.0 -# The direct memory usage weight when calculating new resourde usage. +# The direct memory usage weight when calculating new resource usage. # It only take effect in ThresholdShedder strategy. loadBalancerDirectMemoryResourceWeight=1.0 -# Bundle unload minimum throughput threshold (MB), avoding bundle unload frequently. +# Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently. # It only take effect in ThresholdShedder strategy. loadBalancerBundleUnloadMinThroughputThreshold=10 @@ -995,7 +995,7 @@ replicatorPrefix=pulsar.repl # Duration to check replication policy to avoid replicator inconsistency # due to missing ZooKeeper watch (disable with value 0) -replicatioPolicyCheckDurationSeconds=600 +replicationPolicyCheckDurationSeconds=600 # Default message retention time. 0 means retention is disabled. -1 means data is not removed by time quota defaultRetentionTimeInMinutes=0
(pulsar) branch branch-3.3 updated: [improve] Validate range of argument before long -> int conversion (#22830)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new cabed9a92da [improve] Validate range of argument before long -> int conversion (#22830) cabed9a92da is described below commit cabed9a92da6992fa706aeee4422be385ab9a2a6 Author: Matteo Merli AuthorDate: Tue Jun 4 03:12:33 2024 -0700 [improve] Validate range of argument before long -> int conversion (#22830) --- .../org/apache/pulsar/broker/admin/impl/TransactionsBase.java| 9 - 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java index 1014c9fe8e3..4fef0802ed4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java @@ -564,8 +564,15 @@ public abstract class TransactionsBase extends AdminResource { protected CompletableFuture internalAbortTransaction(boolean authoritative, long mostSigBits, long leastSigBits) { + +if (mostSigBits < 0 || mostSigBits > Integer.MAX_VALUE) { +return CompletableFuture.failedFuture(new IllegalArgumentException("mostSigBits out of bounds")); +} + +int partitionIdx = (int) mostSigBits; + return validateTopicOwnershipAsync( - SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) mostSigBits), authoritative) + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(partitionIdx), authoritative) .thenCompose(__ -> validateSuperUserAccessAsync()) .thenCompose(__ -> pulsar().getTransactionMetadataStoreService() .endTransaction(new TxnID(mostSigBits, leastSigBits), TxnAction.ABORT_VALUE, false));
(pulsar) branch branch-3.3 updated: [fix][meta] Check if metadata store is closed in RocksdbMetadataStore (#22852)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 37f6239b5c0 [fix][meta] Check if metadata store is closed in RocksdbMetadataStore (#22852) 37f6239b5c0 is described below commit 37f6239b5c0e9f03169ad02bae0e0299e6107342 Author: Lari Hotari AuthorDate: Wed Jun 5 22:02:43 2024 +0300 [fix][meta] Check if metadata store is closed in RocksdbMetadataStore (#22852) --- .../metadata/impl/AbstractMetadataStore.java | 25 +++--- .../pulsar/metadata/impl/RocksdbMetadataStore.java | 15 + 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 0a356643914..77fd21f1342 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -257,8 +257,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co @Override public CompletableFuture> get(String path) { if (isClosed()) { -return FutureUtil.failedFuture( -new MetadataStoreException.AlreadyClosedException()); +return alreadyClosedFailedFuture(); } long start = System.currentTimeMillis(); if (!isValidPath(path)) { @@ -286,8 +285,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co @Override public final CompletableFuture> getChildren(String path) { if (isClosed()) { -return FutureUtil.failedFuture( -new MetadataStoreException.AlreadyClosedException()); +return alreadyClosedFailedFuture(); } if (!isValidPath(path)) { return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); @@ -298,8 +296,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co @Override public final CompletableFuture exists(String path) { if (isClosed()) { -return FutureUtil.failedFuture( -new MetadataStoreException.AlreadyClosedException()); +return alreadyClosedFailedFuture(); } if (!isValidPath(path)) { return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); @@ -361,8 +358,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co @Override public final CompletableFuture delete(String path, Optional expectedVersion) { if (isClosed()) { -return FutureUtil.failedFuture( -new MetadataStoreException.AlreadyClosedException()); +return alreadyClosedFailedFuture(); } long start = System.currentTimeMillis(); if (!isValidPath(path)) { @@ -411,8 +407,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co @Override public CompletableFuture deleteRecursive(String path) { if (isClosed()) { -return FutureUtil.failedFuture( -new MetadataStoreException.AlreadyClosedException()); +return alreadyClosedFailedFuture(); } return getChildren(path) .thenCompose(children -> FutureUtil.waitForAll( @@ -436,8 +431,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co public final CompletableFuture put(String path, byte[] data, Optional optExpectedVersion, EnumSet options) { if (isClosed()) { -return FutureUtil.failedFuture( -new MetadataStoreException.AlreadyClosedException()); +return alreadyClosedFailedFuture(); } long start = System.currentTimeMillis(); if (!isValidPath(path)) { @@ -517,10 +511,15 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co } } -private boolean isClosed() { +protected boolean isClosed() { return isClosed.get(); } +protected static CompletableFuture alreadyClosedFailedFuture() { +return FutureUtil.failedFuture( +new MetadataStoreException.AlreadyClosedException()); +} + @Override public void close() throws Exception { executor.shutdownNow(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java index 39f7edd5cee..06f7b260536 100644 --- a/pulsar-met
(pulsar) branch branch-3.3 updated: [improve] [broker] Do not call cursor.isCursorDataFullyPersistable if disabled dispatcherPauseOnAckStatePersistentEnabled (#22729)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new b33fc099e7d [improve] [broker] Do not call cursor.isCursorDataFullyPersistable if disabled dispatcherPauseOnAckStatePersistentEnabled (#22729) b33fc099e7d is described below commit b33fc099e7d663e1b93c6ca8ec95007187dbf8e7 Author: fengyubiao AuthorDate: Fri May 17 13:50:48 2024 +0800 [improve] [broker] Do not call cursor.isCursorDataFullyPersistable if disabled dispatcherPauseOnAckStatePersistentEnabled (#22729) --- .../PersistentDispatcherMultipleConsumers.java | 9 .../api/SubscriptionPauseOnAckStatPersistTest.java | 50 ++ 2 files changed, 59 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 49a19c0fe31..f20750fa0c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1087,6 +1087,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul @Override public boolean checkAndResumeIfPaused() { boolean paused = blockedDispatcherOnCursorDataCanNotFullyPersist == TRUE; +// Calling "cursor.isCursorDataFullyPersistable()" will loop the collection "individualDeletedMessages". It is +// not a light method. +// If never enabled "dispatcherPauseOnAckStatePersistentEnabled", skip the following checks to improve +// performance. +if (!paused && !topic.isDispatcherPauseOnAckStatePersistentEnabled()){ +// "true" means no need to pause. +return true; +} +// Enabled "dispatcherPauseOnAckStatePersistentEnabled" before. boolean shouldPauseNow = !cursor.isCursorDataFullyPersistable() && topic.isDispatcherPauseOnAckStatePersistentEnabled(); // No need to change. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java index 9a4de8ecf21..36c36735c06 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java @@ -23,8 +23,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; @@ -38,6 +42,7 @@ import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -549,4 +554,49 @@ public class SubscriptionPauseOnAckStatPersistTest extends ProducerConsumerBase c4.close(); admin.topics().delete(tpName, false); } + +@Test(dataProvider = "multiConsumerSubscriptionTypes") +public void testNeverCallCursorIsCursorDataFullyPersistableIfDisabledTheFeature(SubscriptionType subscriptionType) +throws Exception { +final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); +final String mlName = TopicName.get(tpName).getPersistenceNamingEncoding(); +final String subscription = "s1"; +final int msgSendCount = 100; +// Inject a injection to record the counter of calling "cursor.isCursorDataFullyPersistable". +final ManagedLedgerImpl ml = (ManagedLedgerImpl) pulsar.getBrokerService().getManagedLedgerFactory().open(mlName); +final ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor(subscription); +final ManagedCursorImpl spyCursor = M
(pulsar) branch branch-3.3 updated: [fix][ml]: subscription props could be lost in case of missing ledger during recovery (#22637)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 71d62b2613b [fix][ml]: subscription props could be lost in case of missing ledger during recovery (#22637) 71d62b2613b is described below commit 71d62b2613bbb7f378848e308e1616b65d223169 Author: Nicolò Boschi AuthorDate: Mon May 20 18:12:21 2024 +0200 [fix][ml]: subscription props could be lost in case of missing ledger during recovery (#22637) --- .../main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 35000361eca..18d9cd7cb05 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -543,7 +543,7 @@ public class ManagedCursorImpl implements ManagedCursor { log.error("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc)); // Rewind to oldest entry available -initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback); +initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; } else if (rc != BKException.Code.OK) { log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
(pulsar) branch branch-3.3 updated: [improve][broker] Close protocol handlers before unloading namespace bundles (#22728)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new d608073edab [improve][broker] Close protocol handlers before unloading namespace bundles (#22728) d608073edab is described below commit d608073edab5938f0274193b676f6fced21a82f2 Author: Yunze Xu AuthorDate: Tue May 21 16:26:36 2024 +0800 [improve][broker] Close protocol handlers before unloading namespace bundles (#22728) --- .../org/apache/pulsar/broker/PulsarService.java| 12 +- .../channel/ServiceUnitStateChannelImpl.java | 2 +- .../broker/protocol/PulsarClientBasedHandler.java | 152 + .../protocol/PulsarClientBasedHandlerTest.java | 87 .../protocol/SimpleProtocolHandlerTestsBase.java | 16 ++- 5 files changed, 258 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index ac37aca531a..db1e1dbe402 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -442,6 +442,12 @@ public class PulsarService implements AutoCloseable, ShutdownService { public CompletableFuture closeAsync() { mutex.lock(); try { +// Close protocol handler before unloading namespace bundles because protocol handlers might maintain +// Pulsar clients that could send lookup requests that affect unloading. +if (protocolHandlers != null) { +protocolHandlers.close(); +protocolHandlers = null; +} if (closeFuture != null) { return closeFuture; } @@ -449,6 +455,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { if (brokerService != null) { brokerService.unloadNamespaceBundlesGracefully(); } +// It only tells the Pulsar clients that this service is not ready to serve for the lookup requests state = State.Closing; // close the service in reverse order v.s. in which they are started @@ -510,11 +517,6 @@ public class PulsarService implements AutoCloseable, ShutdownService { (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * getConfiguration() .getBrokerShutdownTimeoutMs(; -// close protocol handler before closing broker service -if (protocolHandlers != null) { -protocolHandlers.close(); -protocolHandlers = null; -} // cancel loadShedding task and shutdown the loadManager executor before shutting down the broker cancelLoadBalancerTasks(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index bf6266482f8..5e3d1c78361 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -114,7 +114,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD; private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000; private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100; -private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; +public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; public static final long VERSION_ID_INIT = 1; // initial versionId public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java new file mode 100644 index 000..ed9881a8cad --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work
(pulsar) branch branch-3.3 updated: [improve][offload] Allow to disable the managedLedgerOffloadDeletionLagInMillis (#22689)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 4e467091ddf [improve][offload] Allow to disable the managedLedgerOffloadDeletionLagInMillis (#22689) 4e467091ddf is described below commit 4e467091ddfbb9f164cf45715dceb3b3d72ea974 Author: Yong Zhang AuthorDate: Thu May 16 20:54:32 2024 +0800 [improve][offload] Allow to disable the managedLedgerOffloadDeletionLagInMillis (#22689) --- .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java| 3 ++- .../org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java | 4 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 167c0a1bad3..1e3e7e065d7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2592,6 +2592,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { long elapsedMs = clock.millis() - offload.getTimestamp(); return offloadPolicies.filter(policies -> offload.getComplete() && !offload.getBookkeeperDeleted() && policies.getManagedLedgerOffloadDeletionLagInMillis() != null +&& policies.getManagedLedgerOffloadDeletionLagInMillis() >= 0 && elapsedMs > policies.getManagedLedgerOffloadDeletionLagInMillis()).isPresent(); } @@ -4574,4 +4575,4 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } return theSlowestNonDurableReadPosition; } -} \ No newline at end of file +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java index 56da315553e..b46f06106cf 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java @@ -383,6 +383,10 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase { needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertTrue(needsDelete); +offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(-1L); +needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); +Assert.assertFalse(needsDelete); + offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(1000L * 2); needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertFalse(needsDelete);
(pulsar) branch branch-3.3 updated: [fix][broker] usedLocallySinceLastReport should always be reset (#22672)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 00675bcea1f [fix][broker] usedLocallySinceLastReport should always be reset (#22672) 00675bcea1f is described below commit 00675bcea1facaf1fe6942476186f3449f5c8419 Author: Zixuan Liu AuthorDate: Thu May 9 09:42:17 2024 +0800 [fix][broker] usedLocallySinceLastReport should always be reset (#22672) Signed-off-by: Zixuan Liu --- .../pulsar/broker/resourcegroup/ResourceGroup.java | 3 +- .../ResourceGroupReportLocalUsageTest.java | 50 ++ 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index f8ec52bfe3c..541a645f18b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -458,14 +458,13 @@ public class ResourceGroup { bytesUsed = monEntity.usedLocallySinceLastReport.bytes; messagesUsed = monEntity.usedLocallySinceLastReport.messages; - +monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0; if (sendReport) { p.setBytesPerPeriod(bytesUsed); p.setMessagesPerPeriod(messagesUsed); monEntity.lastReportedValues.bytes = bytesUsed; monEntity.lastReportedValues.messages = messagesUsed; monEntity.numSuppressedUsageReports = 0; -monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0; monEntity.totalUsedLocally.bytes += bytesUsed; monEntity.totalUsedLocally.messages += messagesUsed; monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java index 658b7c94165..139d19886c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java @@ -72,34 +72,50 @@ public class ResourceGroupReportLocalUsageTest extends MockedPulsarServiceBaseTe rgConfig.setPublishRateInMsgs(2000); service.resourceGroupCreate(rgName, rgConfig); -org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName); BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount(); bytesAndMessagesCount.bytes = 20; bytesAndMessagesCount.messages = 10; - resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish, bytesAndMessagesCount); + +org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName); +for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { +resourceGroup.incrementLocalUsageStats(value, bytesAndMessagesCount); +} + +// Case1: Suppress report ResourceUsage. +needReport.set(false); ResourceUsage resourceUsage = new ResourceUsage(); resourceGroup.rgFillResourceUsage(resourceUsage); assertFalse(resourceUsage.hasDispatch()); assertFalse(resourceUsage.hasPublish()); +for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) { +PerMonitoringClassFields monitoredEntity = +resourceGroup.getMonitoredEntity(value); +assertEquals(monitoredEntity.usedLocallySinceLastReport.messages, 0); +assertEquals(monitoredEntity.usedLocallySinceLastReport.bytes, 0); +assertEquals(monitoredEntity.totalUsedLocally.messages, 0); +assertEquals(monitoredEntity.totalUsedLocally.bytes, 0); +assertEquals(monitoredEntity.lastReportedValues.messages, 0); +assertEquals(monitoredEntity.lastReportedValues.bytes, 0); +} -PerMonitoringClassFields publishMonitoredEntity = - resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish); - assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, bytesAndMessagesCount.messages); -assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, bytesAndMessagesCount.bytes); -assertEquals
(pulsar) branch branch-3.3 updated: [fix] [broker] rename to changeMaxReadPositionCount (#22656)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 4a1779e2b36 [fix] [broker] rename to changeMaxReadPositionCount (#22656) 4a1779e2b36 is described below commit 4a1779e2b369867e47979362a576fd81b25c41e3 Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Wed May 8 19:34:00 2024 +0800 [fix] [broker] rename to changeMaxReadPositionCount (#22656) --- .../transaction/buffer/impl/TopicTransactionBuffer.java | 16 .../pulsar/broker/transaction/TransactionTest.java | 12 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index a36216bd625..81c9ecfc728 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -76,8 +76,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen */ private final LinkedMap ongoingTxns = new LinkedMap<>(); -// when add abort or change max read position, the count will +1. Take snapshot will set 0 into it. -private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong(); +// when change max read position, the count will +1. Take snapshot will reset the count. +private final AtomicLong changeMaxReadPositionCount = new AtomicLong(); private final LongAdder txnCommittedCounter = new LongAdder(); @@ -429,15 +429,15 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen } private void takeSnapshotByChangeTimes() { -if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) { -this.changeMaxReadPositionAndAddAbortTimes.set(0); +if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) { +this.changeMaxReadPositionCount.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } } private void takeSnapshotByTimeout() { -if (changeMaxReadPositionAndAddAbortTimes.get() > 0) { -this.changeMaxReadPositionAndAddAbortTimes.set(0); +if (changeMaxReadPositionCount.get() > 0) { +this.changeMaxReadPositionCount.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } this.timer.newTimeout(TopicTransactionBuffer.this, @@ -454,7 +454,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) { -this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement(); +this.changeMaxReadPositionCount.getAndIncrement(); } } @@ -489,7 +489,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen } else if (checkIfReady()) { if (ongoingTxns.isEmpty()) { maxReadPosition = position; -changeMaxReadPositionAndAddAbortTimes.incrementAndGet(); +changeMaxReadPositionCount.incrementAndGet(); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index e8c15d193a2..5e806bb9cee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1095,10 +1095,10 @@ public class TransactionTest extends TransactionTestBase { } @Test -public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception { +public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot() throws Exception { PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0) .getBrokerService() -.getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true) +.getTopic(NAMESPACE1 + "/changeMaxReadPositionCount" + UUID.randomUUID(), true) .get().get(); TransactionBuffer buffer = persistentTopic.getTransa
(pulsar) branch branch-3.3 updated: [fix][sec] Upgrade postgresql version to avoid CVE-2024-1597 (#22635)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 72641cb1888 [fix][sec] Upgrade postgresql version to avoid CVE-2024-1597 (#22635) 72641cb1888 is described below commit 72641cb18883bc03041029e656b805bb94463a79 Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com> AuthorDate: Fri May 10 16:41:20 2024 +0530 [fix][sec] Upgrade postgresql version to avoid CVE-2024-1597 (#22635) --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index af45b297b8c..23e3d3a15e2 100644 --- a/pom.xml +++ b/pom.xml @@ -190,7 +190,7 @@ flexible messaging model and an intuitive client API. 5.1.0 3.42.0.0 8.0.11 -42.5.1 +42.5.5 0.4.6 2.7.5 0.4.4-hotfix1 @@ -199,7 +199,7 @@ flexible messaging model and an intuitive client API. 1.2.4 8.12.1 1.9.7.Final -42.5.0 +42.5.5 8.0.30 1.15.16.Final
(pulsar) branch branch-3.3 updated: [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 79de2daf507 [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) 79de2daf507 is described below commit 79de2daf50700dfb0ecd4fe00ee4af6af775a288 Author: Hang Chen AuthorDate: Thu May 9 20:45:56 2024 +0800 [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) --- conf/broker.conf | 4 + conf/standalone.conf | 4 + .../apache/pulsar/broker/ServiceConfiguration.java | 7 ++ .../broker/service/persistent/PersistentTopic.java | 7 +- .../pulsar/broker/service/MessageTTLTest.java | 96 ++ 5 files changed, 117 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index 1b51ff47551..1ef68a0395c 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -180,6 +180,10 @@ backlogQuotaDefaultRetentionPolicy=producer_request_hold # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) allowAutoTopicCreation=true diff --git a/conf/standalone.conf b/conf/standalone.conf index 51035235d4d..a8615b70293 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -121,6 +121,10 @@ backlogQuotaDefaultLimitSecond=-1 # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter. # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true. brokerDeleteInactiveTopicsEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a9d170ea5de..9efe1856509 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -652,6 +652,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int ttlDurationDefaultInSeconds = 0; +@FieldContext( +category = CATEGORY_POLICIES, +doc = "Additional system subscriptions that will be ignored by ttl check. " ++ "The cursor names are comma separated. Default is empty." +) +private Set additionalSystemCursorNames = new TreeSet<>(); + @FieldContext( category = CATEGORY_POLICIES, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7228bdeb2d3..28bc27f7961 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -279,6 +280,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private final ExecutorService orderedExecutor; private volatile CloseFutures closeFutures; +private Set additionalSystemCursorNames = new TreeSet<>(); @Getter private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); @@ -414,6 +416,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { shadowSourceTopic = null; } +additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames(); } @Override @@ -1934,7 +1937,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal int messageTtlInSeconds = topi
(pulsar) branch master updated (7b8f4a9159c -> 6701936939f)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 7b8f4a9159c [fix][ci] Fix snappy-java native lib fails to load in x86 alpine (#22804) add 6701936939f [fix] [broker] Fix doc of ThresholdShedder and remove useless method. (#22798) No new revisions were added by this update. Summary of changes: .../apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java | 2 +- .../pulsar/policies/data/loadbalancer/LocalBrokerData.java | 9 + 2 files changed, 2 insertions(+), 9 deletions(-)
(pulsar) branch master updated: [fix][broker] fix can not cleanup heartbeat data if scaling down broker (#22750)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 34898e36576 [fix][broker] fix can not cleanup heartbeat data if scaling down broker (#22750) 34898e36576 is described below commit 34898e365764f4dd2f1cfbcb9d7381b8e4f104e9 Author: ken <1647023...@qq.com> AuthorDate: Thu May 30 16:38:08 2024 +0800 [fix][broker] fix can not cleanup heartbeat data if scaling down broker (#22750) Co-authored-by: fanjianye --- .../org/apache/pulsar/broker/PulsarService.java| 42 ++ .../pulsar/broker/admin/impl/BrokersBase.java | 12 --- 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6482ead1f5a..722bfda426d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.admin.impl.BrokersBase.getHeartbeatTopicName; import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; import com.google.common.annotations.VisibleForTesting; @@ -72,6 +73,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.LedgerOffloaderStats; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.offload.Offloaders; @@ -414,6 +416,41 @@ public class PulsarService implements AutoCloseable, ShutdownService { } } +private boolean isManagedLedgerNotFoundException(Throwable e) { +Throwable realCause = e.getCause(); +return realCause instanceof ManagedLedgerException.MetadataNotFoundException +|| realCause instanceof MetadataStoreException.NotFoundException; +} + +private void deleteHeartbeatResource() { +if (this.brokerService != null) { +LOG.info("forcefully delete heartbeat topic when close broker"); + +String heartbeatTopicNameV1 = getHeartbeatTopicName(getBrokerId(), getConfiguration(), false); +String heartbeatTopicNameV2 = getHeartbeatTopicName(getBrokerId(), getConfiguration(), true); + +try { +this.brokerService.deleteTopic(heartbeatTopicNameV1, true).get(); +} catch (Exception e) { +if (!isManagedLedgerNotFoundException(e)) { +LOG.error("Closed with errors in delete heartbeat topic [{}]", +heartbeatTopicNameV1, e); +} +} + +try { +this.brokerService.deleteTopic(heartbeatTopicNameV2, true).get(); +} catch (Exception e) { +if (!isManagedLedgerNotFoundException(e)) { +LOG.error("Closed with errors in delete heartbeat topic [{}]", +heartbeatTopicNameV2, e); +} +} + +LOG.info("finish forcefully delete heartbeat topic when close broker"); +} +} + @Override public void close() throws PulsarServerException { try { @@ -460,6 +497,11 @@ public class PulsarService implements AutoCloseable, ShutdownService { // It only tells the Pulsar clients that this service is not ready to serve for the lookup requests state = State.Closing; +if (brokerId != null) { +// forcefully delete heartbeat topic when close broker +deleteHeartbeatResource(); +} + // close the service in reverse order v.s. in which they are started if (this.resourceUsageTransportManager != null) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 7eeea66db71..9db17f76a8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src
(pulsar) branch branch-3.2 updated: [improve][broker] Remove ClassLoaderSwitcher to avoid objects allocations and consistent the codestyle (#22796)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new a4b27cac0a4 [improve][broker] Remove ClassLoaderSwitcher to avoid objects allocations and consistent the codestyle (#22796) a4b27cac0a4 is described below commit a4b27cac0a4619859d95990db911e172311c6b3a Author: 道君 AuthorDate: Wed May 29 22:19:47 2024 +0800 [improve][broker] Remove ClassLoaderSwitcher to avoid objects allocations and consistent the codestyle (#22796) --- .../apache/pulsar/broker/ClassLoaderSwitcher.java | 37 -- .../servlet/AdditionalServletWithClassLoader.java | 25 +--- .../protocol/ProtocolHandlerWithClassLoader.java | 44 +- 3 files changed, 55 insertions(+), 51 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ClassLoaderSwitcher.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ClassLoaderSwitcher.java deleted file mode 100644 index 55cb9198da2..000 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ClassLoaderSwitcher.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker; - -/** - * Help to switch the class loader of current thread to the NarClassLoader, and change it back when it's done. - * With the help of try-with-resources statement, the code would be cleaner than using try finally every time. - */ -public class ClassLoaderSwitcher implements AutoCloseable { -private final ClassLoader prevClassLoader; - -public ClassLoaderSwitcher(ClassLoader classLoader) { -prevClassLoader = Thread.currentThread().getContextClassLoader(); -Thread.currentThread().setContextClassLoader(classLoader); -} - -@Override -public void close() { -Thread.currentThread().setContextClassLoader(prevClassLoader); -} -} \ No newline at end of file diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java index c2b4b900733..bc1f25c5af9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java @@ -22,7 +22,6 @@ import java.io.IOException; import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.ClassLoaderSwitcher; import org.apache.pulsar.common.configuration.PulsarConfiguration; import org.apache.pulsar.common.nar.NarClassLoader; import org.eclipse.jetty.servlet.ServletHolder; @@ -40,29 +39,45 @@ public class AdditionalServletWithClassLoader implements AdditionalServlet { @Override public void loadConfig(PulsarConfiguration pulsarConfiguration) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +ClassLoader prevClassLoader = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(classLoader); servlet.loadConfig(pulsarConfiguration); +} finally { +Thread.currentThread().setContextClassLoader(prevClassLoader); } } @Override public String getBasePath() { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +ClassLoader prevClassLoader = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(classLoader); return servlet.getBasePath(); +} finally { +Thread.currentThread().setContextClassLoader(prevClassLoader); } } @Override public ServletHolder getServletHolder() { -try (ClassLoaderSwitcher ignored = new ClassLoader
(pulsar) branch branch-3.2 updated: [improve][broker] avoid creating new objects when intercepting (#22790)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 69b521fef7c [improve][broker] avoid creating new objects when intercepting (#22790) 69b521fef7c is described below commit 69b521fef7c7606e66cafd9417b61f0af3624eec Author: Qiang Zhao AuthorDate: Tue May 28 22:45:30 2024 +0800 [improve][broker] avoid creating new objects when intercepting (#22790) --- .../BrokerInterceptorWithClassLoader.java | 127 + .../intercept/BrokerInterceptorUtilsTest.java | 2 +- .../BrokerInterceptorWithClassLoaderTest.java | 2 +- 3 files changed, 105 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java index faee5799289..3997e214f43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java @@ -29,7 +29,6 @@ import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; -import org.apache.pulsar.broker.ClassLoaderSwitcher; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; @@ -51,16 +50,20 @@ import org.apache.pulsar.common.nar.NarClassLoader; public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { private final BrokerInterceptor interceptor; -private final NarClassLoader classLoader; +private final NarClassLoader narClassLoader; @Override public void beforeSendMessage(Subscription subscription, Entry entry, long[] ackSet, MessageMetadata msgMetadata) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.beforeSendMessage( subscription, entry, ackSet, msgMetadata); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @@ -70,25 +73,37 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { long[] ackSet, MessageMetadata msgMetadata, Consumer consumer) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.beforeSendMessage( subscription, entry, ackSet, msgMetadata, consumer); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void producerCreated(ServerCnx cnx, Producer producer, Map metadata){ -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.producerCreated(cnx, producer, metadata); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @@ -96,8 +111,12 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { public void producerClosed(ServerCnx cnx, Producer producer, Map metadata) { -try (ClassLoaderSwitcher
(pulsar) branch branch-3.2 updated: [fix][broker] EntryFilters fix NoClassDefFoundError due to closed classloader (#22767)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 53167ff6b03 [fix][broker] EntryFilters fix NoClassDefFoundError due to closed classloader (#22767) 53167ff6b03 is described below commit 53167ff6b039d6ff69cae43501ee66983e2920e4 Author: Enrico Olivelli AuthorDate: Wed May 29 17:27:00 2024 +0200 [fix][broker] EntryFilters fix NoClassDefFoundError due to closed classloader (#22767) --- .../broker/service/plugin/EntryFilterProvider.java | 3 ++- .../service/plugin/EntryFilterWithClassLoader.java | 29 +- .../broker/service/plugin/FilterEntryTest.java | 12 - .../pulsar/broker/stats/ConsumerStatsTest.java | 2 +- .../pulsar/broker/stats/SubscriptionStatsTest.java | 2 +- .../apache/pulsar/common/nar/NarClassLoader.java | 16 6 files changed, 49 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java index f93e561542e..53418744b54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java @@ -197,7 +197,8 @@ public class EntryFilterProvider implements AutoCloseable { + " does not implement entry filter interface"); } EntryFilter pi = (EntryFilter) filter; -return new EntryFilterWithClassLoader(pi, ncl); +// the classloader is shared with the broker, the instance doesn't own it +return new EntryFilterWithClassLoader(pi, ncl, false); } catch (Throwable e) { if (e instanceof IOException) { throw (IOException) e; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java index c5c57210877..aab46c62acd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java @@ -30,15 +30,23 @@ import org.apache.pulsar.common.nar.NarClassLoader; public class EntryFilterWithClassLoader implements EntryFilter { private final EntryFilter entryFilter; private final NarClassLoader classLoader; +private final boolean classLoaderOwned; -public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader) { +public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader, boolean classLoaderOwned) { this.entryFilter = entryFilter; this.classLoader = classLoader; +this.classLoaderOwned = classLoaderOwned; } @Override public FilterResult filterEntry(Entry entry, FilterContext context) { -return entryFilter.filterEntry(entry, context); +ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(classLoader); +return entryFilter.filterEntry(entry, context); +} finally { +Thread.currentThread().setContextClassLoader(currentClassLoader); +} } @VisibleForTesting @@ -48,11 +56,20 @@ public class EntryFilterWithClassLoader implements EntryFilter { @Override public void close() { -entryFilter.close(); +ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); try { -classLoader.close(); -} catch (IOException e) { -log.error("close EntryFilterWithClassLoader failed", e); +Thread.currentThread().setContextClassLoader(classLoader); +entryFilter.close(); +} finally { +Thread.currentThread().setContextClassLoader(currentClassLoader); +} +if (classLoaderOwned) { +log.info("Closing classloader {} for EntryFilter {}", classLoader, entryFilter.getClass().getName()); +try { +classLoader.close(); +} catch (IOException e) { +log.error("close EntryFilterWithClassLoader failed", e); +} } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 7b3daddcd9d..f7388ef9eb9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/br
(pulsar) branch branch-3.2 updated: [improve][broker] Clear thread local BrokerEntryMetadata instance before reuse (#22752)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new d43a640f1c9 [improve][broker] Clear thread local BrokerEntryMetadata instance before reuse (#22752) d43a640f1c9 is described below commit d43a640f1c9860662bba19f70a079d1db7066de5 Author: Lari Hotari AuthorDate: Wed May 22 05:56:14 2024 +0300 [improve][broker] Clear thread local BrokerEntryMetadata instance before reuse (#22752) --- .../src/main/java/org/apache/pulsar/common/protocol/Commands.java| 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 65674af0ae1..8599ec2dd34 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1705,6 +1705,7 @@ public class Commands { // | 2 bytes| 4 bytes |BROKER_ENTRY_METADATA_SIZE bytes | BrokerEntryMetadata brokerEntryMetadata = BROKER_ENTRY_METADATA.get(); +brokerEntryMetadata.clear(); for (BrokerEntryMetadataInterceptor interceptor : brokerInterceptors) { interceptor.intercept(brokerEntryMetadata); if (numberOfMessages >= 0) {
(pulsar) branch branch-3.2 updated: [improve][broker] Close protocol handlers before unloading namespace bundles (#22728)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 895e05d088f [improve][broker] Close protocol handlers before unloading namespace bundles (#22728) 895e05d088f is described below commit 895e05d088f4db23f1ca4d30e9d2ecaf7d3a6761 Author: Yunze Xu AuthorDate: Tue May 21 16:26:36 2024 +0800 [improve][broker] Close protocol handlers before unloading namespace bundles (#22728) --- .../org/apache/pulsar/broker/PulsarService.java| 12 +- .../channel/ServiceUnitStateChannelImpl.java | 2 +- .../broker/protocol/PulsarClientBasedHandler.java | 152 + .../protocol/PulsarClientBasedHandlerTest.java | 87 .../protocol/SimpleProtocolHandlerTestsBase.java | 16 ++- 5 files changed, 258 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index bf266d44d83..1a45bedfce4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -427,6 +427,12 @@ public class PulsarService implements AutoCloseable, ShutdownService { public CompletableFuture closeAsync() { mutex.lock(); try { +// Close protocol handler before unloading namespace bundles because protocol handlers might maintain +// Pulsar clients that could send lookup requests that affect unloading. +if (protocolHandlers != null) { +protocolHandlers.close(); +protocolHandlers = null; +} if (closeFuture != null) { return closeFuture; } @@ -434,6 +440,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { if (brokerService != null) { brokerService.unloadNamespaceBundlesGracefully(); } +// It only tells the Pulsar clients that this service is not ready to serve for the lookup requests state = State.Closing; // close the service in reverse order v.s. in which they are started @@ -492,11 +499,6 @@ public class PulsarService implements AutoCloseable, ShutdownService { (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * getConfiguration() .getBrokerShutdownTimeoutMs(; -// close protocol handler before closing broker service -if (protocolHandlers != null) { -protocolHandlers.close(); -protocolHandlers = null; -} // cancel loadShedding task and shutdown the loadManager executor before shutting down the broker if (this.loadSheddingTask != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index c7702a40d0b..477a9239538 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -114,7 +114,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD; private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000; private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100; -private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; +public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; public static final long VERSION_ID_INIT = 1; // initial versionId public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java new file mode 100644 index 000..ed9881a8cad --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed
(pulsar) branch branch-3.0 updated: [improve][broker] Remove ClassLoaderSwitcher to avoid objects allocations and consistent the codestyle (#22796)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7918ed5805d [improve][broker] Remove ClassLoaderSwitcher to avoid objects allocations and consistent the codestyle (#22796) 7918ed5805d is described below commit 7918ed5805d8175dab2cb4cfa2c1e97b0eb80a09 Author: 道君 AuthorDate: Wed May 29 22:19:47 2024 +0800 [improve][broker] Remove ClassLoaderSwitcher to avoid objects allocations and consistent the codestyle (#22796) --- .../apache/pulsar/broker/ClassLoaderSwitcher.java | 37 -- .../servlet/AdditionalServletWithClassLoader.java | 25 +--- .../protocol/ProtocolHandlerWithClassLoader.java | 44 +- 3 files changed, 55 insertions(+), 51 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ClassLoaderSwitcher.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ClassLoaderSwitcher.java deleted file mode 100644 index 55cb9198da2..000 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ClassLoaderSwitcher.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker; - -/** - * Help to switch the class loader of current thread to the NarClassLoader, and change it back when it's done. - * With the help of try-with-resources statement, the code would be cleaner than using try finally every time. - */ -public class ClassLoaderSwitcher implements AutoCloseable { -private final ClassLoader prevClassLoader; - -public ClassLoaderSwitcher(ClassLoader classLoader) { -prevClassLoader = Thread.currentThread().getContextClassLoader(); -Thread.currentThread().setContextClassLoader(classLoader); -} - -@Override -public void close() { -Thread.currentThread().setContextClassLoader(prevClassLoader); -} -} \ No newline at end of file diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java index c2b4b900733..bc1f25c5af9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithClassLoader.java @@ -22,7 +22,6 @@ import java.io.IOException; import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.ClassLoaderSwitcher; import org.apache.pulsar.common.configuration.PulsarConfiguration; import org.apache.pulsar.common.nar.NarClassLoader; import org.eclipse.jetty.servlet.ServletHolder; @@ -40,29 +39,45 @@ public class AdditionalServletWithClassLoader implements AdditionalServlet { @Override public void loadConfig(PulsarConfiguration pulsarConfiguration) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +ClassLoader prevClassLoader = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(classLoader); servlet.loadConfig(pulsarConfiguration); +} finally { +Thread.currentThread().setContextClassLoader(prevClassLoader); } } @Override public String getBasePath() { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +ClassLoader prevClassLoader = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(classLoader); return servlet.getBasePath(); +} finally { +Thread.currentThread().setContextClassLoader(prevClassLoader); } } @Override public ServletHolder getServletHolder() { -try (ClassLoaderSwitcher ignored = new ClassLoader
(pulsar) branch branch-3.0 updated: [improve][broker] Clear thread local BrokerEntryMetadata instance before reuse (#22752)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 18db799297d [improve][broker] Clear thread local BrokerEntryMetadata instance before reuse (#22752) 18db799297d is described below commit 18db799297d6c6980100d9367231b3aa42db838e Author: Lari Hotari AuthorDate: Wed May 22 05:56:14 2024 +0300 [improve][broker] Clear thread local BrokerEntryMetadata instance before reuse (#22752) --- .../src/main/java/org/apache/pulsar/common/protocol/Commands.java| 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 39829000418..faa5fbcd301 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1675,6 +1675,7 @@ public class Commands { // | 2 bytes| 4 bytes |BROKER_ENTRY_METADATA_SIZE bytes | BrokerEntryMetadata brokerEntryMetadata = BROKER_ENTRY_METADATA.get(); +brokerEntryMetadata.clear(); for (BrokerEntryMetadataInterceptor interceptor : brokerInterceptors) { interceptor.intercept(brokerEntryMetadata); if (numberOfMessages >= 0) {
(pulsar) branch branch-3.0 updated: [fix][broker] EntryFilters fix NoClassDefFoundError due to closed classloader (#22767)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new caccd54f2ce [fix][broker] EntryFilters fix NoClassDefFoundError due to closed classloader (#22767) caccd54f2ce is described below commit caccd54f2ceabae0b24b8c14df088c94b7a72643 Author: Enrico Olivelli AuthorDate: Wed May 29 17:27:00 2024 +0200 [fix][broker] EntryFilters fix NoClassDefFoundError due to closed classloader (#22767) --- .../broker/service/plugin/EntryFilterProvider.java | 3 ++- .../service/plugin/EntryFilterWithClassLoader.java | 29 +- .../broker/service/plugin/FilterEntryTest.java | 12 - .../pulsar/broker/stats/ConsumerStatsTest.java | 2 +- .../pulsar/broker/stats/SubscriptionStatsTest.java | 2 +- .../apache/pulsar/common/nar/NarClassLoader.java | 16 6 files changed, 49 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java index f93e561542e..53418744b54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java @@ -197,7 +197,8 @@ public class EntryFilterProvider implements AutoCloseable { + " does not implement entry filter interface"); } EntryFilter pi = (EntryFilter) filter; -return new EntryFilterWithClassLoader(pi, ncl); +// the classloader is shared with the broker, the instance doesn't own it +return new EntryFilterWithClassLoader(pi, ncl, false); } catch (Throwable e) { if (e instanceof IOException) { throw (IOException) e; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java index c5c57210877..aab46c62acd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java @@ -30,15 +30,23 @@ import org.apache.pulsar.common.nar.NarClassLoader; public class EntryFilterWithClassLoader implements EntryFilter { private final EntryFilter entryFilter; private final NarClassLoader classLoader; +private final boolean classLoaderOwned; -public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader) { +public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader, boolean classLoaderOwned) { this.entryFilter = entryFilter; this.classLoader = classLoader; +this.classLoaderOwned = classLoaderOwned; } @Override public FilterResult filterEntry(Entry entry, FilterContext context) { -return entryFilter.filterEntry(entry, context); +ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(classLoader); +return entryFilter.filterEntry(entry, context); +} finally { +Thread.currentThread().setContextClassLoader(currentClassLoader); +} } @VisibleForTesting @@ -48,11 +56,20 @@ public class EntryFilterWithClassLoader implements EntryFilter { @Override public void close() { -entryFilter.close(); +ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); try { -classLoader.close(); -} catch (IOException e) { -log.error("close EntryFilterWithClassLoader failed", e); +Thread.currentThread().setContextClassLoader(classLoader); +entryFilter.close(); +} finally { +Thread.currentThread().setContextClassLoader(currentClassLoader); +} +if (classLoaderOwned) { +log.info("Closing classloader {} for EntryFilter {}", classLoader, entryFilter.getClass().getName()); +try { +classLoader.close(); +} catch (IOException e) { +log.error("close EntryFilterWithClassLoader failed", e); +} } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 1c4f88bc027..6c847db05a0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/br
(pulsar) branch branch-3.0 updated: [improve][broker] avoid creating new objects when intercepting (#22790)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 3e17c6338d3 [improve][broker] avoid creating new objects when intercepting (#22790) 3e17c6338d3 is described below commit 3e17c6338d3976beb1823b4f1838b38584e3a60a Author: Qiang Zhao AuthorDate: Tue May 28 22:45:30 2024 +0800 [improve][broker] avoid creating new objects when intercepting (#22790) --- .../BrokerInterceptorWithClassLoader.java | 127 + .../intercept/BrokerInterceptorUtilsTest.java | 2 +- .../BrokerInterceptorWithClassLoaderTest.java | 2 +- 3 files changed, 105 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java index faee5799289..3997e214f43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java @@ -29,7 +29,6 @@ import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; -import org.apache.pulsar.broker.ClassLoaderSwitcher; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; @@ -51,16 +50,20 @@ import org.apache.pulsar.common.nar.NarClassLoader; public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { private final BrokerInterceptor interceptor; -private final NarClassLoader classLoader; +private final NarClassLoader narClassLoader; @Override public void beforeSendMessage(Subscription subscription, Entry entry, long[] ackSet, MessageMetadata msgMetadata) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.beforeSendMessage( subscription, entry, ackSet, msgMetadata); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @@ -70,25 +73,37 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { long[] ackSet, MessageMetadata msgMetadata, Consumer consumer) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.beforeSendMessage( subscription, entry, ackSet, msgMetadata, consumer); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) { -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @Override public void producerCreated(ServerCnx cnx, Producer producer, Map metadata){ -try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { +final ClassLoader previousContext = Thread.currentThread().getContextClassLoader(); +try { +Thread.currentThread().setContextClassLoader(narClassLoader); this.interceptor.producerCreated(cnx, producer, metadata); +} finally { +Thread.currentThread().setContextClassLoader(previousContext); } } @@ -96,8 +111,12 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { public void producerClosed(ServerCnx cnx, Producer producer, Map metadata) { -try (ClassLoaderSwitcher
(pulsar) branch master updated: [fix][admin] Clearly define REST API on Open API for Topics (#22782)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new c25d7b20b21 [fix][admin] Clearly define REST API on Open API for Topics (#22782) c25d7b20b21 is described below commit c25d7b20b21e66f122f949b5a26fa32b433632b7 Author: Baodi Shi AuthorDate: Mon May 27 23:45:50 2024 +0800 [fix][admin] Clearly define REST API on Open API for Topics (#22782) --- .../broker/admin/v2/NonPersistentTopics.java | 19 +- .../pulsar/broker/admin/v2/PersistentTopics.java | 327 - 2 files changed, 270 insertions(+), 76 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 7de7d7363c0..5a7ea1b7632 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -52,8 +52,10 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.TopicStats; @@ -74,7 +76,7 @@ public class NonPersistentTopics extends PersistentTopics { @GET @Path("/{tenant}/{namespace}/{topic}/partitions") -@ApiOperation(value = "Get partitioned topic metadata.") +@ApiOperation(value = "Get partitioned topic metadata.", response = PartitionedTopicMetadata.class) @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"), @@ -102,7 +104,7 @@ public class NonPersistentTopics extends PersistentTopics { @GET @Path("{tenant}/{namespace}/{topic}/internalStats") -@ApiOperation(value = "Get the internal stats for the topic.") +@ApiOperation(value = "Get the internal stats for the topic.", response = PersistentTopicInternalStats.class) @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"), @@ -145,6 +147,7 @@ public class NonPersistentTopics extends PersistentTopics { @ApiOperation(value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.") @ApiResponses(value = { +@ApiResponse(code = 204, message = "Operation successful"), @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @@ -317,6 +320,7 @@ public class NonPersistentTopics extends PersistentTopics { @Path("/{tenant}/{namespace}/{topic}/unload") @ApiOperation(value = "Unload a topic") @ApiResponses(value = { +@ApiResponse(code = 204, message = "Operation successful"), @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "This operation requires super-user access"), @ApiResponse(code = 403, message = "Don't have admin permission"), @@ -502,6 +506,7 @@ public class NonPersistentTopics extends PersistentTopics { @ApiOperation(value = "Truncate a topic.", notes = "NonPersistentTopic does not support truncate.") @ApiResponses(value = { +@ApiResponse(code = 204, message = "Operation successful"), @ApiResponse(code = 412, message = "NonPersistentTopic does not support truncate.") }) public void truncateTopic( @@ -525,7 +530,7 @@ public class NonPersistentTopics extends PersistentTopics { @GET @Path(&
(pulsar) branch master updated: [fix][admin] Clearly define REST API on Open API (#22783)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new ba20e02f01d [fix][admin] Clearly define REST API on Open API (#22783) ba20e02f01d is described below commit ba20e02f01d75f0d4ec38393841bcf5c417e9363 Author: Baodi Shi AuthorDate: Mon May 27 21:45:51 2024 +0800 [fix][admin] Clearly define REST API on Open API (#22783) --- .../pulsar/broker/admin/impl/FunctionsBase.java| 27 ++- .../apache/pulsar/broker/admin/impl/SinksBase.java | 18 ++ .../pulsar/broker/admin/impl/SourcesBase.java | 20 +++ .../pulsar/broker/admin/impl/TenantsBase.java | 12 +-- .../pulsar/broker/admin/v2/ResourceGroups.java | 5 ++- .../pulsar/broker/admin/v2/ResourceQuotas.java | 4 ++- .../org/apache/pulsar/broker/admin/v2/Worker.java | 10 -- .../apache/pulsar/broker/admin/v3/Packages.java| 9 ++--- .../pulsar/broker/admin/v3/Transactions.java | 39 +++--- 9 files changed, 99 insertions(+), 45 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 4350316e2f0..42971ae231c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -39,7 +39,6 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.StreamingOutput; import org.apache.pulsar.broker.admin.AdminResource; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.FunctionDefinition; import org.apache.pulsar.common.functions.FunctionState; @@ -486,7 +485,7 @@ public class FunctionsBase extends AdminResource { @POST @ApiOperation( value = "Triggers a Pulsar Function with a user-specified value or file data", -response = Message.class +response = String.class ) @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), @@ -541,6 +540,7 @@ public class FunctionsBase extends AdminResource { value = "Put the state associated with a Pulsar Function" ) @ApiResponses(value = { +@ApiResponse(code = 200, message = "Operation successful"), @ApiResponse(code = 400, message = "Invalid request"), @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), @ApiResponse(code = 404, message = "The Pulsar Function does not exist"), @@ -557,8 +557,9 @@ public class FunctionsBase extends AdminResource { } @POST -@ApiOperation(value = "Restart an instance of a Pulsar Function", response = Void.class) +@ApiOperation(value = "Restart an instance of a Pulsar Function") @ApiResponses(value = { +@ApiResponse(code = 200, message = "Operation successful"), @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this function"), @ApiResponse(code = 400, message = "Invalid request"), @ApiResponse(code = 404, message = "The Pulsar Function does not exist"), @@ -578,8 +579,9 @@ public class FunctionsBase extends AdminResource { } @POST -@ApiOperation(value = "Restart all instances of a Pulsar Function", response = Void.class) +@ApiOperation(value = "Restart all instances of a Pulsar Function") @ApiResponses(value = { +@ApiResponse(code = 200, message = "Operation successful"), @ApiResponse(code = 400, message = "Invalid request"), @ApiResponse(code = 404, message = "The Pulsar Function does not exist"), @ApiResponse(code = 500, message = "Internal server error") @@ -597,8 +599,9 @@ public class FunctionsBase extends AdminResource { } @POST -@ApiOperation(value = "Stop an instance of a Pulsar Function", response = Void.class) +@ApiOperation(value = "Stop an instance of a Pulsar Function") @ApiResponses(value = { +@ApiResponse(code = 200, message = "Operation successful"), @ApiResponse(code = 400, message = "Invalid request"), @ApiResponse(code = 404, message = "The Pulsar Function does not exist"), @ApiResponse(code = 500, message = "Internal server error") @@ -617,8 +620,9 @@ public class FunctionsBase extends AdminReso
(pulsar) branch master updated: [fix][admin] Clearly define REST API on Open API for Namesaces@v2 (#22775)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9b3876df70f [fix][admin] Clearly define REST API on Open API for Namesaces@v2 (#22775) 9b3876df70f is described below commit 9b3876df70f3b1d8bc01a34308d718c456f1781b Author: Baodi Shi AuthorDate: Mon May 27 21:37:24 2024 +0800 [fix][admin] Clearly define REST API on Open API for Namesaces@v2 (#22775) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 2 +- .../apache/pulsar/broker/admin/v2/Namespaces.java | 453 +++-- 2 files changed, 329 insertions(+), 126 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index ca67a244607..afcf4e646fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2711,7 +2711,7 @@ public abstract class NamespacesBase extends AdminResource { })); } -protected CompletableFuture internalGetDispatcherPauseOnAckStatePersistentAsync() { +protected CompletableFuture internalGetDispatcherPauseOnAckStatePersistentAsync() { return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.READ) .thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName)) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 0e270ed34f7..3a7c614a7c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -23,6 +23,8 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Example; +import io.swagger.annotations.ExampleProperty; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; import java.util.HashSet; @@ -58,9 +60,11 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; @@ -72,6 +76,12 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; +import org.apache.pulsar.common.policies.data.TopicHashPositions; +import org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl; +import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; +import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; +import org.apache.pulsar.common.policies.data.impl.BookieAffinityGroupDataImpl; +import org.apache.pulsar.common.policies.data.impl.BundlesDataImpl; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -151,7 +161,9 @@ public class Namespaces extends NamespacesBase { @PUT @Path("/{tenant}/{namespace}") @ApiOperation(value = "Creates a new namespace with the specified policies") -@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), +@ApiResponses(value = { +@ApiResponse(code = 204, message = "Operation successful"), +@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster doesn't exist"), @ApiResponse(code = 409, message = "Namespace already exists"), @ApiResponse(
(pulsar) branch master updated: [improve][broker] Clear thread local BrokerEntryMetadata instance before reuse (#22752)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new adad1fb8420 [improve][broker] Clear thread local BrokerEntryMetadata instance before reuse (#22752) adad1fb8420 is described below commit adad1fb84200f3e6858cea453a7ce6f13ef93de2 Author: Lari Hotari AuthorDate: Wed May 22 05:56:14 2024 +0300 [improve][broker] Clear thread local BrokerEntryMetadata instance before reuse (#22752) --- .../src/main/java/org/apache/pulsar/common/protocol/Commands.java| 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 65674af0ae1..8599ec2dd34 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1705,6 +1705,7 @@ public class Commands { // | 2 bytes| 4 bytes |BROKER_ENTRY_METADATA_SIZE bytes | BrokerEntryMetadata brokerEntryMetadata = BROKER_ENTRY_METADATA.get(); +brokerEntryMetadata.clear(); for (BrokerEntryMetadataInterceptor interceptor : brokerInterceptors) { interceptor.intercept(brokerEntryMetadata); if (numberOfMessages >= 0) {
(pulsar) branch master updated (949260f190c -> ce9998655c7)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 949260f190c [fix][broker] Immediately tombstone Deleted and Free state bundles (#22743) add ce9998655c7 [improve] Allow to construct Oxia metadata-store directly from a client instance (#22756) No new revisions were added by this update. Summary of changes: .../pulsar/metadata/impl/oxia/OxiaMetadataStore.java | 20 +--- 1 file changed, 17 insertions(+), 3 deletions(-)
(pulsar) branch branch-3.2 updated: [improve] [broker] Trigger offload on topic load (#22652)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 5bbd6175a3a [improve] [broker] Trigger offload on topic load (#22652) 5bbd6175a3a is described below commit 5bbd6175a3aaf3a4413af784af9d06d5c748a32d Author: Hang Chen AuthorDate: Fri May 17 21:59:42 2024 +0800 [improve] [broker] Trigger offload on topic load (#22652) --- conf/broker.conf| 3 +++ conf/standalone.conf| 3 +++ .../apache/bookkeeper/mledger/ManagedLedgerConfig.java | 17 + .../mledger/impl/ManagedLedgerFactoryImpl.java | 5 + .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 ++-- .../org/apache/pulsar/broker/ServiceConfiguration.java | 9 +++-- .../org/apache/pulsar/broker/service/BrokerService.java | 1 + 7 files changed, 38 insertions(+), 4 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index c5beda206a4..1c9dc915465 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1206,6 +1206,9 @@ managedLedgerDataReadPriority=tiered-storage-first # (default is -1, which is disabled) managedLedgerOffloadThresholdInSeconds=-1 +# Trigger offload on topic load or not. Default is false. +# triggerOffloadOnTopicLoad=false + # Max number of entries to append to a cursor ledger managedLedgerCursorMaxEntriesPerLedger=5 diff --git a/conf/standalone.conf b/conf/standalone.conf index 5ca0c683d74..fe174cf53ae 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -835,6 +835,9 @@ managedLedgerPrometheusStatsLatencyRolloverSeconds=60 # Whether trace managed ledger task execution time managedLedgerTraceTaskExecution=true +# Trigger offload on topic load or not. Default is false. +# triggerOffloadOnTopicLoad=false + # If you want to custom bookie ID or use a dynamic network address for the bookie, # you can set this option. # Bookie advertises itself using bookieId rather than diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 0c93a5b642c..fb2c6de3c74 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -85,6 +85,7 @@ public class ManagedLedgerConfig { private int minimumBacklogCursorsForCaching = 0; private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; +private boolean triggerOffloadOnTopicLoad = false; @Getter @Setter @@ -748,6 +749,22 @@ public class ManagedLedgerConfig { this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching; } +/** + * Trigger offload on topic load. + * @return + */ +public boolean isTriggerOffloadOnTopicLoad() { +return triggerOffloadOnTopicLoad; +} + +/** + * Set trigger offload on topic load. + * @param triggerOffloadOnTopicLoad + */ +public void setTriggerOffloadOnTopicLoad(boolean triggerOffloadOnTopicLoad) { +this.triggerOffloadOnTopicLoad = triggerOffloadOnTopicLoad; +} + public String getShadowSource() { return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 5ce84b3ed85..d867f2f4c02 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.base.Predicates; import com.google.common.collect.Maps; @@ -395,6 +396,10 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { // May need to update the cursor position newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); +// May need to trigger offloading +if (config.isTriggerOffloadOnTopicLoad()) { + newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE
(pulsar) branch branch-3.2 updated: [fix][ml]: subscription props could be lost in case of missing ledger during recovery (#22637)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 35b36010d6e [fix][ml]: subscription props could be lost in case of missing ledger during recovery (#22637) 35b36010d6e is described below commit 35b36010d6ec2dbd658bd7c9f3cf440a7554a983 Author: Nicolò Boschi AuthorDate: Mon May 20 18:12:21 2024 +0200 [fix][ml]: subscription props could be lost in case of missing ledger during recovery (#22637) --- .../main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 911eca48bac..e47e03c39bf 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -538,7 +538,7 @@ public class ManagedCursorImpl implements ManagedCursor { log.error("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc)); // Rewind to oldest entry available -initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback); +initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; } else if (rc != BKException.Code.OK) { log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
(pulsar) branch branch-3.2 updated: [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger (#22221)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new acc1ff6da0e [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger (#1) acc1ff6da0e is described below commit acc1ff6da0e6e32a59942c8b7cdae55cf3ad76a7 Author: fengyubiao AuthorDate: Tue May 21 15:50:53 2024 +0800 [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger (#1) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 30 .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 39 +--- .../mledger/impl/ShadowManagedLedgerImpl.java | 3 ++ .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 53 +- .../bookkeeper/client/PulsarMockBookKeeper.java| 7 +++ .../bookkeeper/client/PulsarMockLedgerHandle.java | 7 +++ 6 files changed, 113 insertions(+), 26 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index fa2dc45357e..7701419ab40 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -59,6 +59,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; @@ -240,6 +241,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected volatile long lastAddEntryTimeMs = 0; private long inactiveLedgerRollOverTimeMs = 0; +/** A signal that may trigger all the subsequent OpAddEntry of current ledger to be failed due to timeout. **/ +protected volatile AtomicBoolean currentLedgerTimeoutTriggered; + protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3; protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60; private static final String MIGRATION_STATE_PROPERTY = "migrated"; @@ -532,6 +536,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { STATE_UPDATER.set(this, State.LedgerOpened); updateLastLedgerCreatedTimeAndScheduleRolloverTask(); currentLedger = lh; +currentLedgerTimeoutTriggered = new AtomicBoolean(); lastConfirmedEntry = new PositionImpl(lh.getId(), -1); // bypass empty ledgers, find last ledger with Message if possible. @@ -774,7 +779,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Jump to specific thread to avoid contention from writers writing from different threads executor.execute(() -> { -OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx); +OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx, +currentLedgerTimeoutTriggered); internalAsyncAddEntry(addOperation); }); } @@ -790,7 +796,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Jump to specific thread to avoid contention from writers writing from different threads executor.execute(() -> { -OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx); +OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx, +currentLedgerTimeoutTriggered); internalAsyncAddEntry(addOperation); }); } @@ -842,6 +849,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Write into lastLedger addOperation.setLedger(currentLedger); +addOperation.setTimeoutTriggered(currentLedgerTimeoutTriggered); ++currentLedgerEntries; currentLedgerSize += addOperation.data.readableBytes(); @@ -1585,6 +1593,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { LedgerHandle originalCurrentLedger = currentLedger; ledgers.put(lh.getId(), newLedger); currentLedger = lh; +currentLedgerTimeoutTriggered = new AtomicBoolean(); currentLedgerEntries = 0; c
(pulsar) branch branch-3.0 updated: [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger (#22221)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new b798e7f7d81 [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger (#1) b798e7f7d81 is described below commit b798e7f7d81e5b03f127a07b435c912109ff2e64 Author: fengyubiao AuthorDate: Tue May 21 15:50:53 2024 +0800 [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger (#1) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 30 .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 39 +--- .../mledger/impl/ShadowManagedLedgerImpl.java | 3 ++ .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 53 +- .../bookkeeper/client/PulsarMockBookKeeper.java| 7 +++ .../bookkeeper/client/PulsarMockLedgerHandle.java | 7 +++ 6 files changed, 113 insertions(+), 26 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index e3dbe2370ec..ba70fcd9833 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -59,6 +59,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; @@ -241,6 +242,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected volatile long lastAddEntryTimeMs = 0; private long inactiveLedgerRollOverTimeMs = 0; +/** A signal that may trigger all the subsequent OpAddEntry of current ledger to be failed due to timeout. **/ +protected volatile AtomicBoolean currentLedgerTimeoutTriggered; + protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3; protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60; private static final String MIGRATION_STATE_PROPERTY = "migrated"; @@ -533,6 +537,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { STATE_UPDATER.set(this, State.LedgerOpened); updateLastLedgerCreatedTimeAndScheduleRolloverTask(); currentLedger = lh; +currentLedgerTimeoutTriggered = new AtomicBoolean(); lastConfirmedEntry = new PositionImpl(lh.getId(), -1); // bypass empty ledgers, find last ledger with Message if possible. @@ -775,7 +780,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Jump to specific thread to avoid contention from writers writing from different threads executor.execute(() -> { -OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx); +OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx, +currentLedgerTimeoutTriggered); internalAsyncAddEntry(addOperation); }); } @@ -791,7 +797,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Jump to specific thread to avoid contention from writers writing from different threads executor.execute(() -> { -OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx); +OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx, +currentLedgerTimeoutTriggered); internalAsyncAddEntry(addOperation); }); } @@ -843,6 +850,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Write into lastLedger addOperation.setLedger(currentLedger); +addOperation.setTimeoutTriggered(currentLedgerTimeoutTriggered); ++currentLedgerEntries; currentLedgerSize += addOperation.data.readableBytes(); @@ -1586,6 +1594,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { LedgerHandle originalCurrentLedger = currentLedger; ledgers.put(lh.getId(), newLedger); currentLedger = lh; +currentLedgerTimeoutTriggered = new AtomicBoolean(); currentLedgerEntries = 0; c
(pulsar) branch branch-3.0 updated: [fix][ml]: subscription props could be lost in case of missing ledger during recovery (#22637)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5bbb62d9fc8 [fix][ml]: subscription props could be lost in case of missing ledger during recovery (#22637) 5bbb62d9fc8 is described below commit 5bbb62d9fc8845de8bbb61323453b8aad22c083d Author: Nicolò Boschi AuthorDate: Mon May 20 18:12:21 2024 +0200 [fix][ml]: subscription props could be lost in case of missing ledger during recovery (#22637) --- .../main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 722e833e7ab..f700b354cda 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -540,7 +540,7 @@ public class ManagedCursorImpl implements ManagedCursor { log.error("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc)); // Rewind to oldest entry available -initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback); +initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; } else if (rc != BKException.Code.OK) { log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
(pulsar) branch master updated: [improve][broker] Close protocol handlers before unloading namespace bundles (#22728)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new a66ff17b31a [improve][broker] Close protocol handlers before unloading namespace bundles (#22728) a66ff17b31a is described below commit a66ff17b31a01fd9ab151188e9afc9d2de8c141f Author: Yunze Xu AuthorDate: Tue May 21 16:26:36 2024 +0800 [improve][broker] Close protocol handlers before unloading namespace bundles (#22728) --- .../org/apache/pulsar/broker/PulsarService.java| 12 +- .../channel/ServiceUnitStateChannelImpl.java | 2 +- .../broker/protocol/PulsarClientBasedHandler.java | 152 + .../protocol/PulsarClientBasedHandlerTest.java | 87 .../protocol/SimpleProtocolHandlerTestsBase.java | 16 ++- 5 files changed, 258 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6ee35ad295f..6482ead1f5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -444,6 +444,12 @@ public class PulsarService implements AutoCloseable, ShutdownService { public CompletableFuture closeAsync() { mutex.lock(); try { +// Close protocol handler before unloading namespace bundles because protocol handlers might maintain +// Pulsar clients that could send lookup requests that affect unloading. +if (protocolHandlers != null) { +protocolHandlers.close(); +protocolHandlers = null; +} if (closeFuture != null) { return closeFuture; } @@ -451,6 +457,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { if (brokerService != null) { brokerService.unloadNamespaceBundlesGracefully(); } +// It only tells the Pulsar clients that this service is not ready to serve for the lookup requests state = State.Closing; // close the service in reverse order v.s. in which they are started @@ -512,11 +519,6 @@ public class PulsarService implements AutoCloseable, ShutdownService { (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * getConfiguration() .getBrokerShutdownTimeoutMs(; -// close protocol handler before closing broker service -if (protocolHandlers != null) { -protocolHandlers.close(); -protocolHandlers = null; -} // cancel loadShedding task and shutdown the loadManager executor before shutting down the broker cancelLoadBalancerTasks(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 9821ce56420..7c91cf92708 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -114,7 +114,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD; private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000; private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100; -private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; +public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; public static final long VERSION_ID_INIT = 1; // initial versionId public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java new file mode 100644 index 000..ed9881a8cad --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work
(pulsar) branch master updated: [improve][test] Restart broker in SimpleProducerConsumerTest if ns deletion times out (#22734)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 47fd1a3e414 [improve][test] Restart broker in SimpleProducerConsumerTest if ns deletion times out (#22734) 47fd1a3e414 is described below commit 47fd1a3e414982555d55ca92e669fc63d3095636 Author: Lari Hotari AuthorDate: Mon May 20 16:21:31 2024 +0300 [improve][test] Restart broker in SimpleProducerConsumerTest if ns deletion times out (#22734) --- .../client/api/SimpleProducerConsumerTest.java | 37 +- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index d37bd484bfb..a9d97b7febd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -121,6 +121,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.tests.ThreadDumpUtil; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,25 +148,31 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { } @AfterMethod(alwaysRun = true) -public void rest() throws Exception { -pulsar.getConfiguration().setForceDeleteTenantAllowed(true); -pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); +public void cleanupAfterMethod() throws Exception { +try { +pulsar.getConfiguration().setForceDeleteTenantAllowed(true); +pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); -for (String tenant : admin.tenants().getTenants()) { -for (String namespace : admin.namespaces().getNamespaces(tenant)) { -deleteNamespaceWithRetry(namespace, true); +for (String tenant : admin.tenants().getTenants()) { +for (String namespace : admin.namespaces().getNamespaces(tenant)) { +deleteNamespaceWithRetry(namespace, true); +} +admin.tenants().deleteTenant(tenant, true); } -admin.tenants().deleteTenant(tenant, true); -} - -for (String cluster : admin.clusters().getClusters()) { -admin.clusters().deleteCluster(cluster); -} -pulsar.getConfiguration().setForceDeleteTenantAllowed(false); -pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); +for (String cluster : admin.clusters().getClusters()) { +admin.clusters().deleteCluster(cluster); +} -super.producerBaseSetup(); +pulsar.getConfiguration().setForceDeleteTenantAllowed(false); +pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); +super.producerBaseSetup(); +} catch (Exception | AssertionError e) { +log.warn("Failed to clean up state. Restarting broker.", e); +log.warn("Thread dump:\n{}", ThreadDumpUtil.buildThreadDiagnosticString()); +cleanup(); +setup(); +} } @DataProvider
(pulsar) branch branch-3.0 updated: [improve][admin] Check if the topic existed before the permission operations (#22742)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 71640f696fd [improve][admin] Check if the topic existed before the permission operations (#22742) 71640f696fd is described below commit 71640f696fd9109bc677408e3c2cbacb2fb7252b Author: Jiwei Guo AuthorDate: Sat May 18 22:57:45 2024 +0800 [improve][admin] Check if the topic existed before the permission operations (#22742) --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java| 15 +-- .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java | 1 + .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 12 .../apache/pulsar/broker/admin/PersistentTopicsTest.java | 10 -- .../org/apache/pulsar/broker/auth/AuthorizationTest.java | 12 +++- .../client/api/AuthenticatedProducerConsumerTest.java | 5 +++-- .../client/api/AuthorizationProducerConsumerTest.java | 2 ++ .../pulsar/websocket/proxy/ProxyAuthorizationTest.java| 8 +--- .../java/org/apache/pulsar/sql/presto/TestPulsarAuth.java | 2 +- .../tests/integration/presto/TestPulsarSQLAuth.java | 7 ++- 10 files changed, 50 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ebc838756f9..978111d7187 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -218,6 +218,7 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture>> internalGetPermissionsOnTopic() { // This operation should be reading from zookeeper and it should be allowed without having admin privileges return validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName) .thenApply(policies -> { if (!policies.isPresent()) { @@ -298,9 +299,10 @@ public class PersistentTopicsBase extends AdminResource { Set actions) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) -.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 -> -grantPermissionsAsync(topicName, role, actions) -.thenAccept(unused -> asyncResponse.resume(Response.noContent().build() +.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) +.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions)) +.thenAccept(unused -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause); @@ -346,8 +348,9 @@ public class PersistentTopicsBase extends AdminResource { protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) -.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 -> -getPartitionedTopicMetadataAsync(topicName, true, false) +.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) +.thenCompose(unused1 -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; CompletableFuture future = CompletableFuture.completedFuture(null); @@ -360,7 +363,7 @@ public class PersistentTopicsBase extends AdminResource { } return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role, false)) .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())); -})) +
(pulsar) branch branch-3.0 updated: [improve][broker] do not grant permission for each partition to reduce unnecessary zk metadata (#18222)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 31d28efad11 [improve][broker] do not grant permission for each partition to reduce unnecessary zk metadata (#18222) 31d28efad11 is described below commit 31d28efad11017420ecc5010a317d2b0bdea8013 Author: ken <1647023...@qq.com> AuthorDate: Mon Jul 3 17:38:59 2023 +0800 [improve][broker] do not grant permission for each partition to reduce unnecessary zk metadata (#18222) Co-authored-by: fanjianye --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 17 +++-- .../pulsar/broker/admin/PersistentTopicsTest.java | 13 - .../client/api/AuthenticatedProducerConsumerTest.java | 5 - 3 files changed, 3 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d892c156839..ebc838756f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -299,20 +299,9 @@ public class PersistentTopicsBase extends AdminResource { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 -> - getPartitionedTopicMetadataAsync(topicName, true, false) - .thenCompose(metadata -> { - int numPartitions = metadata.partitions; - CompletableFuture future = CompletableFuture.completedFuture(null); - if (numPartitions > 0) { - for (int i = 0; i < numPartitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - future = future.thenCompose(unused -> grantPermissionsAsync(topicNamePartition, role, - actions)); - } - } - return future.thenCompose(unused -> grantPermissionsAsync(topicName, role, actions)) - .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())); - }))).exceptionally(ex -> { +grantPermissionsAsync(topicName, role, actions) +.thenAccept(unused -> asyncResponse.resume(Response.noContent().build() +.exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause); resumeAsyncResponseExceptionally(asyncResponse, realCause); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 2095582fd6f..23ea5838d56 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -942,19 +942,6 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Map> permissions = (Map>) responseCaptor.getValue(); Assert.assertEquals(permissions.get(role), expectActions); -TopicName topicName = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, -partitionedTopicName); -for (int i = 0; i < numPartitions; i++) { -TopicName partition = topicName.getPartition(i); -response = mock(AsyncResponse.class); -responseCaptor = ArgumentCaptor.forClass(Response.class); -persistentTopics.getPermissionsOnTopic(response, testTenant, testNamespace, -partition.getEncodedLocalName()); -verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); -Map> partitionPermissions = -(Map>) responseCaptor.getValue(); -Assert.assertEquals(partitionPermissions.get(role), expectActions); -} } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/cli
(pulsar) branch branch-3.0 updated: [improve][broker] checkTopicExists supports checking partitioned topic without index (#21701)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 8b8048c4b60 [improve][broker] checkTopicExists supports checking partitioned topic without index (#21701) 8b8048c4b60 is described below commit 8b8048c4b605650720878fa5a549f36376a3aa79 Author: Zixuan Liu AuthorDate: Tue Dec 19 12:00:54 2023 +0800 [improve][broker] checkTopicExists supports checking partitioned topic without index (#21701) Signed-off-by: Zixuan Liu --- .../pulsar/broker/namespace/NamespaceService.java | 66 +++--- .../broker/namespace/NamespaceServiceTest.java | 27 + 2 files changed, 59 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 55d2d83aa63..9c6fbc8cf6a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1320,42 +1320,40 @@ public class NamespaceService implements AutoCloseable { } public CompletableFuture checkTopicExists(TopicName topic) { -if (topic.isPersistent()) { -if (topic.isPartitioned()) { -return pulsar.getBrokerService() - .fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName())) -.thenCompose(metadata -> { -// Allow creating the non-partitioned persistent topic that name includes `-partition-` -if (metadata.partitions == 0 -|| topic.getPartitionIndex() < metadata.partitions) { -return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); -} -return CompletableFuture.completedFuture(false); -}); -} else { -return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); -} +CompletableFuture future; +// If the topic is persistent and the name includes `-partition-`, find the topic from the managed/ledger. +if (topic.isPersistent() && topic.isPartitioned()) { +future = pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); } else { -if (topic.isPartitioned()) { -final TopicName partitionedTopicName = TopicName.get(topic.getPartitionedTopicName()); -return pulsar.getBrokerService() - .fetchPartitionedTopicMetadataAsync(partitionedTopicName) -.thenApply((metadata) -> topic.getPartitionIndex() < metadata.partitions); -} else { -// only checks and don't do any topic creating and loading. -CompletableFuture> topicFuture = - pulsar.getBrokerService().getTopics().get(topic.toString()); -if (topicFuture == null) { -return CompletableFuture.completedFuture(false); -} else { -return topicFuture.thenApply(Optional::isPresent).exceptionally(throwable -> { -LOG.warn("[{}] topicFuture completed with exception when checkTopicExists, {}", -topic, throwable.getMessage()); -return false; -}); -} -} +future = CompletableFuture.completedFuture(false); } + +return future.thenCompose(found -> { +if (found != null && found) { +return CompletableFuture.completedFuture(true); +} + +return pulsar.getBrokerService() + .fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName())) +.thenCompose(metadata -> { +if (metadata.partitions > 0) { +return CompletableFuture.completedFuture(true); +} + +if (topic.isPersistent()) { +return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); +} else { +// The non-partitioned non-persistent topic only exist in the broker topics. +CompletableFuture> nonPersistentTopicFuture = + pulsar.getBrokerService().getTopics().get(topic.toString()); +
(pulsar) branch branch-3.0 updated: [fix][admin] Fix can't delete tenant for v1 (#22550)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5d0f1d6609e [fix][admin] Fix can't delete tenant for v1 (#22550) 5d0f1d6609e is described below commit 5d0f1d6609ee0c76177e8845300c9a20a2dc2172 Author: Jiwei Guo AuthorDate: Tue Apr 23 22:04:13 2024 +0800 [fix][admin] Fix can't delete tenant for v1 (#22550) --- .../pulsar/broker/resources/TopicResources.java| 2 +- .../pulsar/broker/auth/AuthorizationTest.java | 29 ++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java index 0963f25c3d3..413184764f5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java @@ -120,7 +120,7 @@ public class TopicResources { return store.exists(path) .thenCompose(exists -> { if (exists) { -return store.delete(path, Optional.empty()); +return store.deleteRecursive(path); } else { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index 58cf4ee418e..7acd39d741d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; @@ -55,12 +56,17 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { @Override public void setup() throws Exception { conf.setClusterName("c1"); +conf.setSystemTopicEnabled(false); conf.setAuthenticationEnabled(true); +conf.setForceDeleteNamespaceAllowed(true); +conf.setForceDeleteTenantAllowed(true); conf.setAuthenticationProviders( Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider")); conf.setAuthorizationEnabled(true); conf.setAuthorizationAllowWildcardsMatching(true); conf.setSuperUserRoles(Sets.newHashSet("pulsar.super_user", "pass.pass")); + conf.setBrokerClientAuthenticationPlugin(MockAuthentication.class.getName()); +conf.setBrokerClientAuthenticationParameters("user:pass.pass"); internalSetup(); } @@ -69,6 +75,11 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { pulsarAdminBuilder.authentication(new MockAuthentication("pass.pass")); } +@Override +protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { +clientBuilder.authentication(new MockAuthentication("pass.pass")); +} + @AfterClass(alwaysRun = true) @Override public void cleanup() throws Exception { @@ -232,6 +243,24 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { admin.namespaces().deleteNamespace("p1/c1/ns1"); admin.tenants().deleteTenant("p1"); + +admin.clusters().deleteCluster("c1"); +} + +@Test +public void testDeleteV1Tenant() throws Exception { +admin.clusters().createCluster("c1", ClusterData.builder().build()); +admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1"))); +waitForChange(); +admin.namespaces().createNamespace("p1/c1/ns1"); +waitForChange(); + + +String topic = "persistent://p1/c1/ns1/ds2"; +admin.topics().createNonPartitionedTopic(topic); + +admin.namespaces().deleteNamespace("p1/c1/ns1", true); +admin.tenants().deleteTenant("p1", true); admin.clusters().deleteCluster("c1"); }
(pulsar) branch master updated: [improve][pip] PIP-347: Add role field in consumer's stat (#22562)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f3e52b568ec [improve][pip] PIP-347: Add role field in consumer's stat (#22562) f3e52b568ec is described below commit f3e52b568ec7e86e7582bdc425321fe172bc4deb Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Thu May 16 13:29:26 2024 +0800 [improve][pip] PIP-347: Add role field in consumer's stat (#22562) --- pip/pip-347.md | 2 +- .../org/apache/pulsar/broker/service/Consumer.java | 1 + .../stats/AuthenticatedConsumerStatsTest.java | 169 + .../pulsar/broker/stats/ConsumerStatsTest.java | 2 +- .../pulsar/common/policies/data/ConsumerStats.java | 3 + .../policies/data/stats/ConsumerStatsImpl.java | 3 + 6 files changed, 178 insertions(+), 2 deletions(-) diff --git a/pip/pip-347.md b/pip/pip-347.md index 5326fed3533..a5d5d76ae17 100644 --- a/pip/pip-347.md +++ b/pip/pip-347.md @@ -34,4 +34,4 @@ Fully compatible. Updated afterwards --> * Mailing List discussion thread: https://lists.apache.org/thread/p9y9r8pb7ygk8f0jd121c1121phvzd09 -* Mailing List voting thread: +* Mailing List voting thread: https://lists.apache.org/thread/sfv0vq498dnjx6k6zdrnn0cw8f22tz05 diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index fe9fbe6a400..c9f417c4bc4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -208,6 +208,7 @@ public class Consumer { stats = new ConsumerStatsImpl(); stats.setAddress(cnx.clientSourceAddressAndPort()); stats.consumerName = consumerName; +stats.appId = appId; stats.setConnectedSince(DateFormatter.format(connectedSince)); stats.setClientVersion(cnx.getClientVersion()); stats.metadata = this.metadata; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java new file mode 100644 index 000..e8cadb72e1e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.time.Duration; +import java.util.Base64; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Properties; +import java.util.Set; + +public class AuthenticatedConsumerStatsTest extends ConsumerStatsTest{ +private final String ADMIN_TOKEN; +private final String TOKEN_PUBLIC_KEY; +private final KeyPair kp; + +AuthenticatedConsumerStatsTest() th
(pulsar) branch branch-3.2 updated: [fix][broker] Fix cursor should use latest ledger config (#22644)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 3de8358b22a [fix][broker] Fix cursor should use latest ledger config (#22644) 3de8358b22a is described below commit 3de8358b22aa48bb3fba5d301938609868791924 Author: Zixuan Liu AuthorDate: Fri May 10 10:37:44 2024 +0800 [fix][broker] Fix cursor should use latest ledger config (#22644) Signed-off-by: Zixuan Liu --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 59 +++--- .../mledger/impl/ManagedCursorMXBeanImpl.java | 3 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 8 +-- .../mledger/impl/NonDurableCursorImpl.java | 5 +- .../bookkeeper/mledger/impl/OpReadEntry.java | 3 +- .../bookkeeper/mledger/impl/RangeSetWrapper.java | 2 +- .../mledger/impl/ReadOnlyCursorImpl.java | 5 +- .../mledger/impl/ReadOnlyManagedLedgerImpl.java| 2 +- ...ManagedCursorIndividualDeletedMessagesTest.java | 3 +- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 7 ++- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +- .../broker/service/BrokerBkEnsemblesTests.java | 8 +-- .../service/persistent/PersistentTopicTest.java| 25 + 13 files changed, 76 insertions(+), 56 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 7065af203da..911eca48bac 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -119,7 +119,6 @@ public class ManagedCursorImpl implements ManagedCursor { return 0; }; protected final BookKeeper bookkeeper; -protected final ManagedLedgerConfig config; protected final ManagedLedgerImpl ledger; private final String name; @@ -299,31 +298,30 @@ public class ManagedCursorImpl implements ManagedCursor { void operationFailed(ManagedLedgerException exception); } -ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) { +ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String cursorName) { this.bookkeeper = bookkeeper; this.cursorProperties = Collections.emptyMap(); -this.config = config; this.ledger = ledger; this.name = cursorName; this.individualDeletedMessages = new RangeSetWrapper<>(positionRangeConverter, positionRangeReverseConverter, this); -if (config.isDeletionAtBatchIndexLevelEnabled()) { +if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { this.batchDeletedIndexes = new ConcurrentSkipListMap<>(); } else { this.batchDeletedIndexes = null; } -this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType()); +this.digestType = BookKeeper.DigestType.fromApiDigestType(getConfig().getDigestType()); STATE_UPDATER.set(this, State.Uninitialized); PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0); PENDING_READ_OPS_UPDATER.set(this, 0); RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE); WAITING_READ_OP_UPDATER.set(this, null); -this.clock = config.getClock(); +this.clock = getConfig().getClock(); this.lastActive = this.clock.millis(); this.lastLedgerSwitchTimestamp = this.clock.millis(); -if (config.getThrottleMarkDelete() > 0.0) { -markDeleteLimiter = RateLimiter.create(config.getThrottleMarkDelete()); +if (getConfig().getThrottleMarkDelete() > 0.0) { +markDeleteLimiter = RateLimiter.create(getConfig().getThrottleMarkDelete()); } else { // Disable mark-delete rate limiter markDeleteLimiter = null; @@ -602,7 +600,7 @@ public class ManagedCursorImpl implements ManagedCursor { if (positionInfo.getIndividualDeletedMessagesCount() > 0) { recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); } -if (config.isDeletionAtBatchIndexLevelEnabled() +if (getConfig().isDeletionAtBatchIndexLevelEnabled() && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) { recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); } @@ -611,7 +609,8 @@ public class ManagedCursorImpl implements ManagedCursor { }, null); }; try { -boo
(pulsar) branch branch-3.2 updated: [cleanup][ml] ManagedCursor clean up. (#22246)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 781a02b2085 [cleanup][ml] ManagedCursor clean up. (#22246) 781a02b2085 is described below commit 781a02b20859e61361f1d18c369c5d00d1b2f7fd Author: 道君 AuthorDate: Tue Mar 12 23:36:59 2024 +0800 [cleanup][ml] ManagedCursor clean up. (#22246) --- .../java/org/apache/bookkeeper/mledger/impl/EntryImpl.java| 7 ++- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 6512399173f..80397931357 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -42,6 +42,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr private long timestamp; private long ledgerId; private long entryId; +private PositionImpl position; ByteBuf data; private Runnable onDeallocate; @@ -151,7 +152,10 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr @Override public PositionImpl getPosition() { -return new PositionImpl(ledgerId, entryId); +if (position == null) { +position = PositionImpl.get(ledgerId, entryId); +} +return position; } @Override @@ -197,6 +201,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr timestamp = -1; ledgerId = -1; entryId = -1; +position = null; recyclerHandle.recycle(this); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 0c8dedd6b21..7065af203da 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1506,10 +1506,7 @@ public class ManagedCursorImpl implements ManagedCursor { Set alreadyAcknowledgedPositions = new HashSet<>(); lock.readLock().lock(); try { -positions.stream() -.filter(position -> ((PositionImpl) position).compareTo(markDeletePosition) <= 0 -|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) -.forEach(alreadyAcknowledgedPositions::add); + positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add); } finally { lock.readLock().unlock(); } @@ -2281,8 +2278,7 @@ public class ManagedCursorImpl implements ManagedCursor { return; } -if (position.compareTo(markDeletePosition) <= 0 -|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) { +if (isMessageDeleted(position)) { if (config.isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); if (bitSetRecyclable != null) { @@ -3517,8 +3513,7 @@ public class ManagedCursorImpl implements ManagedCursor { @Override public void trimDeletedEntries(List entries) { entries.removeIf(entry -> { -boolean isDeleted = markDeletePosition.compareTo(entry.getLedgerId(), entry.getEntryId()) >= 0 -|| individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()); +boolean isDeleted = isMessageDeleted(entry.getPosition()); if (isDeleted) { entry.release(); }
(pulsar) branch branch-3.2 updated: [fix][admin] Fix can't delete tenant for v1 (#22550)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new aa7f0676e5a [fix][admin] Fix can't delete tenant for v1 (#22550) aa7f0676e5a is described below commit aa7f0676e5a1b4b450da569fb70385523393d8e1 Author: Jiwei Guo AuthorDate: Tue Apr 23 22:04:13 2024 +0800 [fix][admin] Fix can't delete tenant for v1 (#22550) --- .../pulsar/broker/resources/TopicResources.java| 2 +- .../pulsar/broker/auth/AuthorizationTest.java | 28 ++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java index 0963f25c3d3..413184764f5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java @@ -120,7 +120,7 @@ public class TopicResources { return store.exists(path) .thenCompose(exists -> { if (exists) { -return store.delete(path, Optional.empty()); +return store.deleteRecursive(path); } else { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index e9ad401b878..6c913d42908 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -33,6 +33,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; @@ -59,11 +60,15 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { conf.setSystemTopicEnabled(false); conf.setForceDeleteNamespaceAllowed(true); conf.setAuthenticationEnabled(true); +conf.setForceDeleteNamespaceAllowed(true); +conf.setForceDeleteTenantAllowed(true); conf.setAuthenticationProviders( Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider")); conf.setAuthorizationEnabled(true); conf.setAuthorizationAllowWildcardsMatching(true); conf.setSuperUserRoles(Sets.newHashSet("pulsar.super_user", "pass.pass")); + conf.setBrokerClientAuthenticationPlugin(MockAuthentication.class.getName()); +conf.setBrokerClientAuthenticationParameters("user:pass.pass"); internalSetup(); } @@ -72,6 +77,11 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { pulsarAdminBuilder.authentication(new MockAuthentication("pass.pass")); } +@Override +protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { +clientBuilder.authentication(new MockAuthentication("pass.pass")); +} + @AfterClass(alwaysRun = true) @Override public void cleanup() throws Exception { @@ -237,6 +247,24 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { admin.namespaces().deleteNamespace("p1/c1/ns1", true); admin.tenants().deleteTenant("p1"); + +admin.clusters().deleteCluster("c1"); +} + +@Test +public void testDeleteV1Tenant() throws Exception { +admin.clusters().createCluster("c1", ClusterData.builder().build()); +admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1"))); +waitForChange(); +admin.namespaces().createNamespace("p1/c1/ns1"); +waitForChange(); + + +String topic = "persistent://p1/c1/ns1/ds2"; +admin.topics().createNonPartitionedTopic(topic); + +admin.namespaces().deleteNamespace("p1/c1/ns1", true); +admin.tenants().deleteTenant("p1", true); admin.clusters().deleteCluster("c1"); }
(pulsar) branch master updated: [fix][misc] Correct the description of patternAutoDiscoveryPeriod (#22615)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3b24b6e0b72 [fix][misc] Correct the description of patternAutoDiscoveryPeriod (#22615) 3b24b6e0b72 is described below commit 3b24b6e0b7250f531c86e5ee2635a9b23467419c Author: jito AuthorDate: Mon May 13 09:29:38 2024 +0900 [fix][misc] Correct the description of patternAutoDiscoveryPeriod (#22615) Signed-off-by: jitokim --- .../src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java | 5 +++-- .../apache/pulsar/client/impl/conf/ConsumerConfigurationData.java| 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 863432b478f..6f3c3be9727 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -464,7 +464,7 @@ public interface ConsumerBuilder extends Cloneable { ConsumerBuilder readCompacted(boolean readCompacted); /** - * Sets topic's auto-discovery period when using a pattern for topics consumer. + * Sets topic's auto-discovery period when using a pattern for topic's consumer. * The period is in minutes, and the default and minimum values are 1 minute. * * @param periodInMinutes @@ -476,7 +476,8 @@ public interface ConsumerBuilder extends Cloneable { /** - * Sets topic's auto-discovery period when using a pattern for topics consumer. + * Sets topic's auto-discovery period when using a pattern for topic's consumer. + * The default value of period is 1 minute, with a minimum of 1 second. * * @param interval *the amount of delay between checks for diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 3ae0e977d13..18529276c9c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -310,7 +310,7 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { name = "patternAutoDiscoveryPeriod", value = "Topic auto discovery period when using a pattern for topic's consumer.\n" + "\n" -+ "The default and minimum value is 1 minute." ++ "The default value is 1 minute, with a minimum of 1 second." ) private int patternAutoDiscoveryPeriod = 60;
(pulsar) branch branch-3.2 updated: [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new e98370e21cb [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) e98370e21cb is described below commit e98370e21cb6a6ddf0b5ef9c8123046c7e5b8e3d Author: Hang Chen AuthorDate: Thu May 9 20:45:56 2024 +0800 [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) --- conf/broker.conf | 4 + conf/standalone.conf | 4 + .../apache/pulsar/broker/ServiceConfiguration.java | 7 ++ .../broker/service/persistent/PersistentTopic.java | 7 +- .../pulsar/broker/service/MessageTTLTest.java | 96 ++ 5 files changed, 117 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index e0ebbe3043a..c5beda206a4 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -180,6 +180,10 @@ backlogQuotaDefaultRetentionPolicy=producer_request_hold # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) allowAutoTopicCreation=true diff --git a/conf/standalone.conf b/conf/standalone.conf index 5eb9fadcf19..5ca0c683d74 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -121,6 +121,10 @@ backlogQuotaDefaultLimitSecond=-1 # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter. # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true. brokerDeleteInactiveTopicsEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f53eb7e183f..10ffced0321 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -646,6 +646,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int ttlDurationDefaultInSeconds = 0; +@FieldContext( +category = CATEGORY_POLICIES, +doc = "Additional system subscriptions that will be ignored by ttl check. " ++ "The cursor names are comma separated. Default is empty." +) +private Set additionalSystemCursorNames = new TreeSet<>(); + @FieldContext( category = CATEGORY_POLICIES, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index b4f295dbd97..472387a0a9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -272,6 +273,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private final ExecutorService orderedExecutor; private volatile CloseFutures closeFutures; +private Set additionalSystemCursorNames = new TreeSet<>(); /*** * We use 3 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return @@ -384,6 +386,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { shadowSourceTopic = null; } +additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames(); } @Override @@ -1888,7 +1891,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
(pulsar) branch master updated: [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new bed032e714a [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) bed032e714a is described below commit bed032e714aff9f5d2594bdc80a3e7888e53b1bf Author: Hang Chen AuthorDate: Thu May 9 20:45:56 2024 +0800 [improve] [broker] Add additionalSystemCursorNames ignore list for TTL check (#22614) --- conf/broker.conf | 4 + conf/standalone.conf | 4 + .../apache/pulsar/broker/ServiceConfiguration.java | 7 ++ .../broker/service/persistent/PersistentTopic.java | 7 +- .../pulsar/broker/service/MessageTTLTest.java | 96 ++ 5 files changed, 117 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index 1b51ff47551..1ef68a0395c 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -180,6 +180,10 @@ backlogQuotaDefaultRetentionPolicy=producer_request_hold # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) allowAutoTopicCreation=true diff --git a/conf/standalone.conf b/conf/standalone.conf index 51035235d4d..a8615b70293 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -121,6 +121,10 @@ backlogQuotaDefaultLimitSecond=-1 # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0 +# Additional system subscriptions that will be ignored by ttl check. The cursor names are comma separated. +# Default is empty. +# additionalSystemCursorNames= + # Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter. # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true. brokerDeleteInactiveTopicsEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a9d170ea5de..9efe1856509 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -652,6 +652,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int ttlDurationDefaultInSeconds = 0; +@FieldContext( +category = CATEGORY_POLICIES, +doc = "Additional system subscriptions that will be ignored by ttl check. " ++ "The cursor names are comma separated. Default is empty." +) +private Set additionalSystemCursorNames = new TreeSet<>(); + @FieldContext( category = CATEGORY_POLICIES, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7228bdeb2d3..28bc27f7961 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -279,6 +280,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private final ExecutorService orderedExecutor; private volatile CloseFutures closeFutures; +private Set additionalSystemCursorNames = new TreeSet<>(); @Getter private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); @@ -414,6 +416,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { shadowSourceTopic = null; } +additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames(); } @Override @@ -1934,7 +1937,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal int messageTtlInSeconds = topicPolic
(pulsar) branch master updated: [fix][ml] Remove duplicated field initialization of ML (#22676)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 88feb874bb3 [fix][ml] Remove duplicated field initialization of ML (#22676) 88feb874bb3 is described below commit 88feb874bb3ad58a74b3d40d931b2aa7380dc7e1 Author: 道君 AuthorDate: Thu May 9 08:53:59 2024 +0800 [fix][ml] Remove duplicated field initialization of ML (#22676) --- .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index e5e163127f7..b12346cadc9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -365,9 +365,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { this.mlOwnershipChecker = mlOwnershipChecker; this.propertiesMap = new ConcurrentHashMap<>(); this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs(); -if (config.getManagedLedgerInterceptor() != null) { -this.managedLedgerInterceptor = config.getManagedLedgerInterceptor(); -} this.minBacklogCursorsForCaching = config.getMinimumBacklogCursorsForCaching(); this.minBacklogEntriesForCaching = config.getMinimumBacklogEntriesForCaching(); this.maxBacklogBetweenCursorsForCaching = config.getMaxBacklogBetweenCursorsForCaching();
(pulsar) branch branch-3.2 updated: Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new f7984d74d19 Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668) f7984d74d19 is described below commit f7984d74d19e50d31f7ea2abacef8430e4cf95bd Author: Lari Hotari AuthorDate: Wed May 8 13:43:24 2024 +0300 Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668) --- pom.xml | 1 - pulsar-io/debezium/oracle/pom.xml | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index bd231f92a4d..c817c8e1858 100644 --- a/pom.xml +++ b/pom.xml @@ -198,7 +198,6 @@ flexible messaging model and an intuitive client API. 1.2.4 8.12.1 1.9.7.Final -2.2.0.Final 42.5.0 8.0.30 diff --git a/pulsar-io/debezium/oracle/pom.xml b/pulsar-io/debezium/oracle/pom.xml index 214e9c15c3a..1018d5f9573 100644 --- a/pulsar-io/debezium/oracle/pom.xml +++ b/pulsar-io/debezium/oracle/pom.xml @@ -48,8 +48,7 @@ io.debezium debezium-connector-oracle - ${debezium.oracle.version} - runtime + ${debezium.version}
(pulsar) branch master updated: Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new ca44b9bc7c4 Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668) ca44b9bc7c4 is described below commit ca44b9bc7c48eca59692744399872e1f14f4fe6f Author: Lari Hotari AuthorDate: Wed May 8 13:43:24 2024 +0300 Revert "[fix][sec] Upgrade Debezium oracle connector version to avoid… (#22668) --- pom.xml | 1 - pulsar-io/debezium/oracle/pom.xml | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index cec3b3c60db..c2f563eb60e 100644 --- a/pom.xml +++ b/pom.xml @@ -199,7 +199,6 @@ flexible messaging model and an intuitive client API. 1.2.4 8.12.1 1.9.7.Final -2.2.0.Final 42.5.0 8.0.30 diff --git a/pulsar-io/debezium/oracle/pom.xml b/pulsar-io/debezium/oracle/pom.xml index b22a5785dfb..c69640ecff7 100644 --- a/pulsar-io/debezium/oracle/pom.xml +++ b/pulsar-io/debezium/oracle/pom.xml @@ -48,8 +48,7 @@ io.debezium debezium-connector-oracle - ${debezium.oracle.version} - runtime + ${debezium.version}
(pulsar) branch branch-3.2 updated: [fix] Fix Reader can be stuck from transaction aborted messages. (#22610)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 763f90f6dd3 [fix] Fix Reader can be stuck from transaction aborted messages. (#22610) 763f90f6dd3 is described below commit 763f90f6dd317819d93990348bfc8519029c727d Author: 道君 AuthorDate: Tue May 7 20:45:16 2024 +0800 [fix] Fix Reader can be stuck from transaction aborted messages. (#22610) --- .../mledger/util/ManagedLedgerImplUtils.java | 17 ++ .../broker/service/persistent/PersistentTopic.java | 24 .../pulsar/broker/transaction/TransactionTest.java | 68 ++ .../buffer/TopicTransactionBufferTest.java | 36 4 files changed, 110 insertions(+), 35 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java index cd8671b0e62..01de115290a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java @@ -38,11 +38,7 @@ public class ManagedLedgerImplUtils { final Predicate predicate, final PositionImpl startPosition) { CompletableFuture future = new CompletableFuture<>(); -if (!ledger.isValidPosition(startPosition)) { -future.complete(startPosition); -} else { -internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); -} +internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); return future; } @@ -50,6 +46,10 @@ public class ManagedLedgerImplUtils { final Predicate predicate, final PositionImpl position, final CompletableFuture future) { +if (!ledger.isValidPosition(position)) { +future.complete(position); +return; +} ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { @@ -60,12 +60,7 @@ public class ManagedLedgerImplUtils { return; } PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position); -if (!ledger.isValidPosition(previousPosition)) { -future.complete(previousPosition); -} else { -internalAsyncReverseFindPositionOneByOne(ledger, predicate, -ledger.getPreviousPosition((PositionImpl) position), future); -} +internalAsyncReverseFindPositionOneByOne(ledger, predicate, previousPosition, future); } catch (Exception e) { future.completeExceptionally(e); } finally { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 60eb700fc06..fa731b860f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3561,18 +3561,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public CompletableFuture getLastDispatchablePosition() { -PositionImpl maxReadPosition = getMaxReadPosition(); -// If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions. -// so return `maxRedPosition` directly. -if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) { -return CompletableFuture.completedFuture(maxReadPosition); -} else { -return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { -MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); -// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer -return !Markers.isServerOnlyMarker(md); -}, maxReadPosition); -} +return ManagedLedgerImplUtils.asyncGetLastVali
(pulsar) branch branch-3.2 updated: [fix][broker] avoid offload system topic (#22497)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new c3c17dee756 [fix][broker] avoid offload system topic (#22497) c3c17dee756 is described below commit c3c17dee7567d0a182affb1991e1e35098689d9b Author: Qiang Zhao AuthorDate: Wed May 8 13:10:49 2024 +0800 [fix][broker] avoid offload system topic (#22497) Co-authored-by: 道君 --- .../pulsar/broker/service/BrokerService.java | 8 +- .../pulsar/broker/service/BrokerServiceTest.java | 94 ++ 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 032d4dd9369..60d56c0d908 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1906,7 +1906,13 @@ public class BrokerService implements Closeable { topicLevelOffloadPolicies, OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), getPulsar().getConfig().getProperties()); -if (NamespaceService.isSystemServiceNamespace(namespace.toString())) { +if (NamespaceService.isSystemServiceNamespace(namespace.toString()) +|| SystemTopicNames.isSystemTopic(topicName)) { +/* + Avoid setting broker internal system topics using off-loader because some of them are the + preconditions of other topics. The slow replying log speed will cause a delay in all the topic + loading.(timeout) + */ managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE); } else { if (topicLevelOffloadPolicies != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index fcf11fad708..ab0b8f813ea 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -67,12 +67,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.StringUtils; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; @@ -111,6 +114,9 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; @@ -1772,4 +1778,92 @@ public class BrokerServiceTest extends BrokerTestBase { fail("Unsubscribe failed"); } } + + +@Test +public void testOffloadConfShouldNotAppliedForSystemTopic() throws PulsarAdminException { +final String driver = "aws-s3"; +final String region = "test-region"; +final String bucket = "test-bucket"; +final String role = "test-role"; +final String roleSessionName = "test-role-session-name"; +final String credentialId = "test-credential-id"; +final String credentialSecret = "test-credential-secret"; +final String endPoint = "test-endpoint"; +final Integer maxBlockSizeInBytes = 5; +final Integer readBufferSizeInBytes = 2; +final Long offloadThresholdInBytes = 10L; +final Long offloadThresholdInSeconds = 1000L;
(pulsar) branch branch-3.2 updated: [improve][ws] Add memory limit configuration for Pulsar client used in Websocket proxy (#22666)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new e32cdfb113b [improve][ws] Add memory limit configuration for Pulsar client used in Websocket proxy (#22666) e32cdfb113b is described below commit e32cdfb113b1693a2420e39ab40f985b59a44899 Author: Lari Hotari AuthorDate: Wed May 8 06:56:35 2024 +0300 [improve][ws] Add memory limit configuration for Pulsar client used in Websocket proxy (#22666) --- conf/broker.conf | 3 +++ conf/standalone.conf | 3 +++ conf/websocket.conf| 3 +++ .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++ .../main/java/org/apache/pulsar/websocket/WebSocketService.java| 3 ++- .../pulsar/websocket/service/WebSocketProxyConfiguration.java | 3 +++ 6 files changed, 21 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index dd0f3e49e1f..e0ebbe3043a 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1539,6 +1539,9 @@ webSocketNumServiceThreads= # Number of connections per Broker in Pulsar Client used in WebSocket proxy webSocketConnectionsPerBroker= +# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy +webSocketPulsarClientMemoryLimitInMB=0 + # Time in milliseconds that idle WebSocket session times out webSocketSessionIdleTimeoutMillis=30 diff --git a/conf/standalone.conf b/conf/standalone.conf index 316143ab49d..5eb9fadcf19 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -967,6 +967,9 @@ webSocketNumIoThreads=8 # Number of connections per Broker in Pulsar Client used in WebSocket proxy webSocketConnectionsPerBroker=8 +# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy +webSocketPulsarClientMemoryLimitInMB=0 + # Time in milliseconds that idle WebSocket session times out webSocketSessionIdleTimeoutMillis=30 diff --git a/conf/websocket.conf b/conf/websocket.conf index 9051f3b590c..91f7f7d4c23 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -71,6 +71,9 @@ numHttpServerThreads= # Number of connections per Broker in Pulsar Client used in WebSocket proxy webSocketConnectionsPerBroker= +# Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy +webSocketPulsarClientMemoryLimitInMB=0 + # Time in milliseconds that idle WebSocket session times out webSocketSessionIdleTimeoutMillis=30 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index d1f2e9b585f..f53eb7e183f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2892,6 +2892,13 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Number of connections per Broker in Pulsar Client used in WebSocket proxy" ) private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors(); + +@FieldContext( +category = CATEGORY_WEBSOCKET, +doc = "Memory limit in MBs for direct memory in Pulsar Client used in WebSocket proxy" +) +private int webSocketPulsarClientMemoryLimitInMB = 0; + @FieldContext( category = CATEGORY_WEBSOCKET, doc = "Time in milliseconds that idle WebSocket session times out" diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 66b2a0075ec..889f4431cc3 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -195,7 +195,8 @@ public class WebSocketService implements Closeable { private PulsarClient createClientInstance(ClusterData clusterData) throws IOException { ClientBuilder clientBuilder = PulsarClient.builder() // -.memoryLimit(0, SizeUnit.BYTES) + .memoryLimit(SizeUnit.MEGA_BYTES.toBytes(config.getWebSocketPulsarClientMemoryLimitInMB()), +SizeUnit.BYTES) .statsInterval(0, TimeUnit.SECONDS) // .enableTls(config.isTlsEnabled()) // .allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) // diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/a
(pulsar) branch branch-3.2 updated: [fix][broker] Disable system topic message deduplication (#22582)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new b6f464ea0a1 [fix][broker] Disable system topic message deduplication (#22582) b6f464ea0a1 is described below commit b6f464ea0a17786fb857aac5111dcc394cba8f56 Author: Qiang Zhao AuthorDate: Wed May 8 10:53:53 2024 +0800 [fix][broker] Disable system topic message deduplication (#22582) --- .../org/apache/pulsar/broker/service/Topic.java| 10 +++ .../service/persistent/MessageDeduplication.java | 6 +--- .../broker/service/persistent/PersistentTopic.java | 9 +++--- .../broker/service/persistent/SystemTopic.java | 16 +++ .../service/persistent/MessageDuplicationTest.java | 32 ++ 5 files changed, 64 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 1da8cfce4ee..c2eefcd18e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -213,6 +213,16 @@ public interface Topic { void checkCursorsToCacheEntries(); +/** + * Indicate if the current topic enabled server side deduplication. + * This is a dynamic configuration, user may update it by namespace/topic policies. + * + * @return whether enabled server side deduplication + */ +default boolean isDeduplicationEnabled() { +return false; +} + void checkDeduplicationSnapshot(); void checkMessageExpiry(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e508661364d..ab3b799093b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -217,7 +217,7 @@ public class MessageDeduplication { * returning a future to track the completion of the task */ public CompletableFuture checkStatus() { -boolean shouldBeEnabled = isDeduplicationEnabled(); +boolean shouldBeEnabled = topic.isDeduplicationEnabled(); synchronized (this) { if (status == Status.Recovering || status == Status.Removing) { // If there's already a transition happening, check later for status @@ -472,10 +472,6 @@ public class MessageDeduplication { }, null); } -private boolean isDeduplicationEnabled() { -return topic.getHierarchyTopicPolicies().getDeduplicationEnabled().get(); -} - /** * Topic will call this method whenever a producer connects. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e99bd1425f4..60eb700fc06 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2146,10 +2146,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return future; } -public boolean isDeduplicationEnabled() { -return messageDeduplication.isEnabled(); -} - @Override public int getNumberOfConsumers() { int count = 0; @@ -4080,6 +4076,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return ledger.isMigrated(); } +public boolean isDeduplicationEnabled() { +return getHierarchyTopicPolicies().getDeduplicationEnabled().get(); +} + public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) { return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID); } @@ -4104,4 +4104,5 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal public Optional getShadowSourceTopic() { return Optional.ofNullable(shadowSourceTopic); } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 720ae3c5189..f2cec2138a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -80,6 +80,22 @@ public class
(pulsar) branch branch-3.2 updated: [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes (#22501)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 2c92ae31722 [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes (#22501) 2c92ae31722 is described below commit 2c92ae317222ac3e434a497aa458792f88debe75 Author: Rui Fu AuthorDate: Wed May 1 13:18:05 2024 +0800 [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes (#22501) --- .../apache/pulsar/functions/instance/JavaInstanceRunnable.java| 8 +++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 21f125d3497..f1b9af00f9d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -283,13 +283,19 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); Thread currentThread = Thread.currentThread(); +ClassLoader clsLoader = currentThread.getContextClassLoader(); Consumer fatalHandler = throwable -> { this.deathException = throwable; currentThread.interrupt(); }; -return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider, +try { +Thread.currentThread().setContextClassLoader(functionClassLoader); +return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider, collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager, pulsarAdmin, clientBuilder, fatalHandler); +} finally { +Thread.currentThread().setContextClassLoader(clsLoader); +} } public interface AsyncResultConsumer {
(pulsar) branch branch-3.2 updated: [fix][test] Clear MockedPulsarServiceBaseTest fields to prevent test runtime memory leak (#22659)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new ebbdad1e6ab [fix][test] Clear MockedPulsarServiceBaseTest fields to prevent test runtime memory leak (#22659) ebbdad1e6ab is described below commit ebbdad1e6abfe0233ddc715a49f1facccd6991f8 Author: Lari Hotari AuthorDate: Mon May 6 21:48:47 2024 +0300 [fix][test] Clear MockedPulsarServiceBaseTest fields to prevent test runtime memory leak (#22659) --- .../org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java | 7 +++ 1 file changed, 7 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index bd08ced1e03..248bd0e720e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -275,15 +275,22 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { } if (brokerGateway != null) { brokerGateway.close(); +brokerGateway = null; } if (pulsarTestContext != null) { pulsarTestContext.close(); pulsarTestContext = null; } + resetConfig(); callCloseables(closeables); closeables.clear(); onCleanup(); + +// clear fields to avoid test runtime memory leak, pulsarTestContext already handles closing of these instances +pulsar = null; +mockZooKeeper = null; +mockZooKeeperGlobal = null; } protected void closeAdmin() {
(pulsar) branch branch-3.2 updated: [fix][sec] Upgrade Debezium oracle connector version to avoid CVE-2023-4586 (#22641)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 412a02a4157 [fix][sec] Upgrade Debezium oracle connector version to avoid CVE-2023-4586 (#22641) 412a02a4157 is described below commit 412a02a4157a1ce0f16f8a2c6e119913352cba80 Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com> AuthorDate: Sat May 4 02:00:28 2024 +0530 [fix][sec] Upgrade Debezium oracle connector version to avoid CVE-2023-4586 (#22641) --- pom.xml | 1 + pulsar-io/debezium/oracle/pom.xml | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c817c8e1858..bd231f92a4d 100644 --- a/pom.xml +++ b/pom.xml @@ -198,6 +198,7 @@ flexible messaging model and an intuitive client API. 1.2.4 8.12.1 1.9.7.Final +2.2.0.Final 42.5.0 8.0.30 diff --git a/pulsar-io/debezium/oracle/pom.xml b/pulsar-io/debezium/oracle/pom.xml index 1018d5f9573..214e9c15c3a 100644 --- a/pulsar-io/debezium/oracle/pom.xml +++ b/pulsar-io/debezium/oracle/pom.xml @@ -48,7 +48,8 @@ io.debezium debezium-connector-oracle - ${debezium.version} + ${debezium.oracle.version} + runtime
(pulsar) branch branch-3.2 updated: [fix][sec] Upgrade elasticsearch-java version to avoid CVE-2023-4043 (#22640)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new e5af8ef5ad3 [fix][sec] Upgrade elasticsearch-java version to avoid CVE-2023-4043 (#22640) e5af8ef5ad3 is described below commit e5af8ef5ad34c94b7e18a311055fc25e766ea646 Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com> AuthorDate: Sat May 4 02:51:48 2024 +0530 [fix][sec] Upgrade elasticsearch-java version to avoid CVE-2023-4043 (#22640) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 8c98cacfbb6..c817c8e1858 100644 --- a/pom.xml +++ b/pom.xml @@ -196,7 +196,7 @@ flexible messaging model and an intuitive client API. 3.3.5 2.4.10 1.2.4 -8.5.2 +8.12.1 1.9.7.Final 42.5.0 8.0.30
(pulsar) branch branch-3.2 updated: [fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 (#22633)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 40eb3914651 [fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 (#22633) 40eb3914651 is described below commit 40eb39146513c0f9f74142b5c12e9c53e74f318d Author: Nikhil Erigila <60037808+nikhilerigil...@users.noreply.github.com> AuthorDate: Thu May 2 18:58:02 2024 +0530 [fix][sec] Upgrade aws-sdk.version to avoid CVE-2024-21634 (#22633) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 8aa8bf36c98..8c98cacfbb6 100644 --- a/pom.xml +++ b/pom.xml @@ -182,7 +182,7 @@ flexible messaging model and an intuitive client API. 4.5.0 3.4.0 5.18.0 -1.12.262 +1.12.638 1.11.3 2.10.10 2.6.0
(pulsar) branch branch-3.2 updated: [fix] [client] Fix Consumer should return configured batch receive max messages (#22619)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new bc0c4a41b33 [fix] [client] Fix Consumer should return configured batch receive max messages (#22619) bc0c4a41b33 is described below commit bc0c4a41b33bae1344e19555bd5b8fd268997a5c Author: Rajan Dhabalia AuthorDate: Thu May 2 16:57:49 2024 -0700 [fix] [client] Fix Consumer should return configured batch receive max messages (#22619) --- .../client/api/ConsumerBatchReceiveTest.java | 8 +++--- .../client/api/SimpleProducerConsumerTest.java | 29 ++ .../pulsar/client/impl/ConsumerBuilderImpl.java| 4 +++ 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java index d54b1c99e3e..974d25aad64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java @@ -112,7 +112,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), true, 50, false }, // Number of message limitation exceed receiverQueue size and timeout limitation @@ -147,7 +147,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), false, 50, false }, // Number of message limitation exceed receiverQueue size and timeout limitation @@ -248,7 +248,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), true, 50, true }, // Number of message limitation exceed receiverQueue size and timeout limitation @@ -283,7 +283,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { // Number of message limitation exceed receiverQueue size { BatchReceivePolicy.builder() -.maxNumMessages(70) +.maxNumMessages(50) .build(), false, 50, true }, // Number of message limitation exceed receiverQueue size and timeout limitation diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 691f501777e..e9bb86fa33b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4825,6 +4825,35 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { admin.topics().delete(topic, false); } +/** + * It verifies that consumer receives configured number of messages into the batch. + * @throws Exception + */ +@Test +public void testBatchReceiveWithMaxBatchSize() throws Exception { +int maxBatchSize = 100; +final int internalQueueSize = 10; +final int maxBytes = 200; +final int timeOutInSeconds = 900; +final String topic = "persistent://my-property/my-ns/testBatchReceive"; +BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(maxBytes) +.maxNumMessages(maxBatchSize).timeout(timeOutInSeconds, TimeUnit.SECONDS).build(); +@Cleanup +Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) +.subscriptionName("my-subscriber-name") +.receiverQueueSize(internalQueueSize) +.batchReceivePolicy(batchReceivePolicy).subscribe(); +@Cleanup +Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + +final i
(pulsar) branch branch-3.2 updated: [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 892151bfcc2 [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573) 892151bfcc2 is described below commit 892151bfcc2b64b1ee0a9f05a182a488d1554ef5 Author: Yunze Xu AuthorDate: Fri Apr 26 21:30:15 2024 +0800 [fix][broker] Avoid being stuck when closing the broker with extensible load manager (#22573) --- .../org/apache/pulsar/broker/PulsarService.java| 3 + .../store/TableViewLoadDataStoreImpl.java | 6 +- .../pulsar/broker/service/BrokerService.java | 11 +++ .../extensions/ExtensibleLoadManagerCloseTest.java | 107 + 4 files changed, 122 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 499a981259f..bf266d44d83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -431,6 +431,9 @@ public class PulsarService implements AutoCloseable, ShutdownService { return closeFuture; } LOG.info("Closing PulsarService"); +if (brokerService != null) { +brokerService.unloadNamespaceBundlesGracefully(); +} state = State.Closing; // close the service in reverse order v.s. in which they are started diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index d916e917162..81cf33b4a55 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -161,12 +161,8 @@ public class TableViewLoadDataStoreImpl implements LoadDataStore { } private void validateProducer() { -if (producer == null || !producer.isConnected()) { +if (producer == null) { try { -if (producer != null) { -producer.close(); -} -producer = null; startProducer(); log.info("Restarted producer on {}", topic); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ea55a43c7f0..032d4dd9369 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -288,6 +288,7 @@ public class BrokerService implements Closeable { private Set brokerEntryPayloadProcessors; private final TopicEventsDispatcher topicEventsDispatcher = new TopicEventsDispatcher(); +private volatile boolean unloaded = false; public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception { this.pulsar = pulsar; @@ -869,9 +870,13 @@ public class BrokerService implements Closeable { } public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean closeWithoutWaitingClientDisconnect) { +if (unloaded) { +return; +} try { log.info("Unloading namespace-bundles..."); // make broker-node unavailable from the cluster +long disableBrokerStartTime = System.nanoTime(); if (pulsar.getLoadManager() != null && pulsar.getLoadManager().get() != null) { try { pulsar.getLoadManager().get().disableBroker(); @@ -880,6 +885,10 @@ public class BrokerService implements Closeable { // still continue and release bundle ownership as broker's registration node doesn't exist. } } +double disableBrokerTimeSeconds = +TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - disableBrokerStartTime)) +/ 1000.0; +log.info("Disable broker in load manager completed in {} seconds", disableBrokerTimeSeconds); // unload all namespace-bundles gracefully long closeTopicsStartTime = System.nanoTime(); @@ -909,6 +918,8 @@ public class BrokerService implements Closeable { } } catch (Exception e) {
(pulsar) branch branch-3.2 updated: [fix][io] Fix es index creation (#22654)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new b86ebaa05d6 [fix][io] Fix es index creation (#22654) b86ebaa05d6 is described below commit b86ebaa05d62a8c79e3f8f2bc91e72c40cc23e4d Author: Zixuan Liu AuthorDate: Mon May 6 20:35:51 2024 +0800 [fix][io] Fix es index creation (#22654) Signed-off-by: Zixuan Liu --- .../client/elastic/ElasticSearchJavaRestClient.java | 4 ++-- .../pulsar/io/elasticsearch/ElasticSearchSinkTests.java | 13 + 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index afda5ba0e74..133daa8cd6a 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -144,7 +144,7 @@ public class ElasticSearchJavaRestClient extends RestClient { public boolean deleteDocument(String index, String documentId) throws IOException { final DeleteRequest req = new DeleteRequest.Builder() -.index(config.getIndexName()) +.index(index) .id(documentId) .build(); @@ -156,7 +156,7 @@ public class ElasticSearchJavaRestClient extends RestClient { public boolean indexDocument(String index, String documentId, String documentSource) throws IOException { final Map mapped = objectMapper.readValue(documentSource, Map.class); final IndexRequest indexRequest = new IndexRequest.Builder<>() -.index(config.getIndexName()) +.index(index) .document(mapped) .id(documentId) .build(); diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java index 9a2cb4ab565..f1da6fd0c7e 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import co.elastic.clients.transport.ElasticsearchTransport; import com.fasterxml.jackson.core.JsonParseException; @@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericObject; @@ -152,6 +154,7 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase { }); when(mockRecord.getSchema()).thenAnswer((Answer>>) invocation -> kvSchema); +when(mockRecord.getEventTime()).thenAnswer(invocation -> Optional.of(System.currentTimeMillis())); } @AfterMethod(alwaysRun = true) @@ -209,6 +212,16 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase { verify(mockRecord, times(100)).ack(); } +@Test +public final void send1WithFormattedIndexTest() throws Exception { +map.put("indexName", "test-formatted-index-%{+-MM-dd}"); +sink.open(map, mockSinkContext); +send(1); +verify(mockRecord, times(1)).ack(); +String value = getHitIdAtIndex("test-formatted-index-*", 0); +assertTrue(StringUtils.isNotBlank(value)); +} + @Test public final void sendNoSchemaTest() throws Exception {
(pulsar) branch master updated (7e88463d9a5 -> 816755429a3)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 7e88463d9a5 [fix] Fix Reader can be stuck from transaction aborted messages. (#22610) add 816755429a3 [improve][test] Clear fields in AuthZTest classes at cleanup (#22661) No new revisions were added by this update. Summary of changes: .../org/apache/pulsar/broker/admin/AuthZTest.java | 15 ++ .../pulsar/broker/admin/NamespaceAuthZTest.java| 4 +++ .../apache/pulsar/broker/admin/TopicAuthZTest.java | 32 +- .../broker/admin/TopicPoliciesAuthZTest.java | 2 ++ .../admin/TransactionAndSchemaAuthZTest.java | 14 +++--- .../pulsar/security/MockedPulsarStandalone.java| 3 ++ 6 files changed, 41 insertions(+), 29 deletions(-)
(pulsar) branch master updated: [improve][test] Add policy authentication test for namespace API (#22593)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1bb9378b50a [improve][test] Add policy authentication test for namespace API (#22593) 1bb9378b50a is described below commit 1bb9378b50aa891834b64cd39f55ae0e32a055bb Author: Cong Zhao AuthorDate: Sun Apr 28 10:37:37 2024 +0800 [improve][test] Add policy authentication test for namespace API (#22593) --- .../pulsar/broker/admin/NamespaceAuthZTest.java| 1248 ++-- 1 file changed, 1140 insertions(+), 108 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index 5358295b785..ec6a122f7df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -20,9 +20,11 @@ package org.apache.pulsar.broker.admin; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.deleteNamespaceWithRetry; +import static org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import com.google.common.collect.Sets; import io.jsonwebtoken.Jwts; import java.io.File; import java.util.ArrayList; @@ -32,6 +34,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; @@ -44,17 +48,33 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; import org.apache.pulsar.packages.management.core.common.PackageMetadata; import org.apache.pulsar.security.MockedPulsarStandalone; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; @@ -72,7 +92,7 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { private AuthorizationService authorizationService; -private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); +private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); private static final String TENANT_ADMIN_TOKEN = Jwts.builder() .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); @@ -122,16 +142,46 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { superUserAdmin.namespaces().createNamespace("public/default"); } -private void setAuthorizationOperationChecker(String role, NamespaceOperation operation) { +private AtomicBoolean setAuthorizationOperationChecker(String role, NamespaceOperation operation) { +AtomicBoolean execFlag = new AtomicBoolean(false); Mockito.doAnswer(invocationOnMock -> { String role_ = invocat
(pulsar) branch branch-3.2 updated: [improve][admin] Check if the topic existed before the permission operations (#22547)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new adac20a72ef [improve][admin] Check if the topic existed before the permission operations (#22547) adac20a72ef is described below commit adac20a72efdc2b1d9b16464ebffb569c41014e9 Author: Jiwei Guo AuthorDate: Fri Apr 26 14:05:30 2024 +0800 [improve][admin] Check if the topic existed before the permission operations (#22547) --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 9 ++--- .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java| 1 + .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 12 .../apache/pulsar/broker/admin/PersistentTopicsTest.java | 10 -- .../org/apache/pulsar/broker/auth/AuthorizationTest.java | 14 +- .../client/api/AuthenticatedProducerConsumerTest.java | 4 +++- .../client/api/AuthorizationProducerConsumerTest.java | 2 ++ .../pulsar/websocket/proxy/ProxyAuthorizationTest.java | 8 +--- 8 files changed, 46 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index b0968f494ee..4b29452f98c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -207,6 +207,7 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture>> internalGetPermissionsOnTopic() { // This operation should be reading from zookeeper and it should be allowed without having admin privileges return validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName)); } @@ -258,9 +259,10 @@ public class PersistentTopicsBase extends AdminResource { Set actions) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) -.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 -> -grantPermissionsAsync(topicName, role, actions) -.thenAccept(unused -> asyncResponse.resume(Response.noContent().build() +.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) +.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions)) +.thenAccept(unused -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause); @@ -272,6 +274,7 @@ public class PersistentTopicsBase extends AdminResource { protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java index e89b4ff5e83..2dcb930fbe7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java @@ -120,6 +120,7 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest { .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) .authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN) .build(); +admin.topics().createNonPartitionedTopic(topicName); admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
(pulsar) branch master updated: [improve][admin] Check if the topic existed before the permission operations (#22547)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 69a600e86bb [improve][admin] Check if the topic existed before the permission operations (#22547) 69a600e86bb is described below commit 69a600e86bb5110a118d836125411e941b83764d Author: Jiwei Guo AuthorDate: Fri Apr 26 14:05:30 2024 +0800 [improve][admin] Check if the topic existed before the permission operations (#22547) --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 9 ++--- .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java | 1 + .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 12 .../apache/pulsar/broker/admin/PersistentTopicsTest.java| 10 -- .../org/apache/pulsar/broker/auth/AuthorizationTest.java| 13 - .../client/api/AuthenticatedProducerConsumerTest.java | 4 +++- .../client/api/AuthorizationProducerConsumerTest.java | 2 ++ .../pulsar/websocket/proxy/ProxyAuthorizationTest.java | 8 +--- 8 files changed, 45 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 63ea987bb07..682f41dcdb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -205,6 +205,7 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture>> internalGetPermissionsOnTopic() { // This operation should be reading from zookeeper and it should be allowed without having admin privileges return validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName)); } @@ -256,9 +257,10 @@ public class PersistentTopicsBase extends AdminResource { Set actions) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) -.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 -> -grantPermissionsAsync(topicName, role, actions) -.thenAccept(unused -> asyncResponse.resume(Response.noContent().build() +.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) +.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions)) +.thenAccept(unused -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause); @@ -270,6 +272,7 @@ public class PersistentTopicsBase extends AdminResource { protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) +.thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java index e89b4ff5e83..2dcb930fbe7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java @@ -120,6 +120,7 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest { .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) .authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN) .build(); +admin.topics().createNonPartitionedTopic(topicName); admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
(pulsar) branch master updated: [fix][admin] Fix namespace admin api exception response (#22587)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f25776d7fe6 [fix][admin] Fix namespace admin api exception response (#22587) f25776d7fe6 is described below commit f25776d7fe6812f11b17226995d989c5a2364920 Author: Cong Zhao AuthorDate: Fri Apr 26 09:18:27 2024 +0800 [fix][admin] Fix namespace admin api exception response (#22587) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 5 +- .../pulsar/broker/admin/NamespaceAuthZTest.java| 60 -- 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index bbadc7bb331..5f2dccc3e9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2019,7 +2019,7 @@ public abstract class NamespacesBase extends AdminResource { } protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){ -validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); +validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) { throw new RestException(Status.PRECONDITION_FAILED, @@ -2125,9 +2125,10 @@ public abstract class NamespacesBase extends AdminResource { f.complete(null); }) .exceptionally(t -> { +Throwable cause = FutureUtil.unwrapCompletionException(t); log.error("[{}] Failed to update offloadThresholdInSeconds configuration for namespace {}", clientAppId(), namespaceName, t); -f.completeExceptionally(new RestException(t)); +f.completeExceptionally(new RestException(cause)); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index d5a0468f340..5358295b785 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.admin; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.deleteNamespaceWithRetry; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -58,7 +59,6 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-admin") @@ -72,8 +72,6 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { private AuthorizationService authorizationService; -private AuthorizationService orignalAuthorizationService; - private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); private static final String TENANT_ADMIN_TOKEN = Jwts.builder() .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); @@ -100,6 +98,9 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) .build(); this.pulsarClient = super.getPulsarService().getClient(); +this.authorizationService = Mockito.spy(getPulsarService().getBrokerService().getAuthorizationService()); +FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", +authorizationService, true); } @@ -115,19 +116,9 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { close(); } -@BeforeMethod -public void before() throws IllegalAccessException { -orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); -authorizationService = Mockito.spy(orignalAuthorizationService); -FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", -authorizationService, true); -} - @AfterMethod -public
(pulsar) branch branch-3.0 updated: [improve][sec] Align some namespace level policy authorisation check (#21640)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7fdfbf46f9b [improve][sec] Align some namespace level policy authorisation check (#21640) 7fdfbf46f9b is described below commit 7fdfbf46f9b01e55cac270782b57008ec02eb5b2 Author: Qiang Zhao AuthorDate: Mon Dec 4 22:15:19 2023 +0800 [improve][sec] Align some namespace level policy authorisation check (#21640) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 30 +- .../apache/pulsar/broker/admin/v2/Namespaces.java | 3 ++- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 04ec944aab4..d3c5f681b6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1204,7 +1204,8 @@ public abstract class NamespacesBase extends AdminResource { protected CompletableFuture internalSetPublishRateAsync(PublishRate maxPublishMessageRate) { log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate); -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(), maxPublishMessageRate); log.info("[{}] Successfully updated the publish_max_message_rate for cluster on namespace {}", clientAppId(), namespaceName); @@ -1233,7 +1234,8 @@ public abstract class NamespacesBase extends AdminResource { protected CompletableFuture internalRemovePublishRateAsync() { log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), namespaceName, topicName); -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { if (policies.publishMaxMessageRate != null) { policies.publishMaxMessageRate.remove(pulsar().getConfiguration().getClusterName()); } @@ -1253,7 +1255,8 @@ public abstract class NamespacesBase extends AdminResource { @SuppressWarnings("deprecation") protected CompletableFuture internalSetTopicDispatchRateAsync(DispatchRateImpl dispatchRate) { log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate); -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); policies.clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); log.info("[{}] Successfully updated the dispatchRate for cluster on namespace {}", clientAppId(), @@ -1263,7 +1266,8 @@ public abstract class NamespacesBase extends AdminResource { } protected CompletableFuture internalDeleteTopicDispatchRateAsync() { -return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { +return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.WRITE) +.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.topicDispatchRate.remove(pulsar().getConfiguration().getClusterName()); policies.clusterDispatchRate.remove(pulsar().getConfiguration().getClusterName()); log.info("[{}] Successfully delete the dispatchRate for cluster on namespace {}", clientAppId(), @@ -1280,7 +1284,7 @@ public abstract class NamespacesBase extends AdminResource { } protected CompletableFuture internalSetSubscriptionDispatchRateAsync(DispatchRateImpl dispatchRate) { -return validateSuperUserAccessAsync() +return validateNamespacePolicyOperationAsync(namespa
(pulsar) branch master updated: [improve][test] Add topic policy test for topic API (#22546)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3a0f908e80d [improve][test] Add topic policy test for topic API (#22546) 3a0f908e80d is described below commit 3a0f908e80d0863920a1258362fd782e95fe8f17 Author: Jiwei Guo AuthorDate: Mon Apr 22 19:47:03 2024 +0800 [improve][test] Add topic policy test for topic API (#22546) --- .../org/apache/pulsar/broker/admin/AuthZTest.java | 113 ++ .../apache/pulsar/broker/admin/TopicAuthZTest.java | 1121 ++-- .../admin/TransactionAndSchemaAuthZTest.java | 359 +++ 3 files changed, 1270 insertions(+), 323 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java new file mode 100644 index 000..a710a03970d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import io.jsonwebtoken.Jwts; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.TopicOperation; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.mockito.Mockito.doReturn; + +public class AuthZTest extends MockedPulsarStandalone { + +protected PulsarAdmin superUserAdmin; + +protected PulsarAdmin tenantManagerAdmin; + +protected AuthorizationService authorizationService; + +protected AuthorizationService orignalAuthorizationService; + +protected static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); +protected static final String TENANT_ADMIN_TOKEN = Jwts.builder() +.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); + +@BeforeMethod(alwaysRun = true) +public void before() throws IllegalAccessException { +orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); +authorizationService = Mockito.spy(orignalAuthorizationService); +FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", +authorizationService, true); +} + +@AfterMethod(alwaysRun = true) +public void after() throws IllegalAccessException { +FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", +orignalAuthorizationService, true); +} + +protected AtomicBoolean setAuthorizationTopicOperationChecker(String role, Object operation) { +AtomicBoolean execFlag = new AtomicBoolean(false); +if (operation instanceof TopicOperation) { +Mockito.doAnswer(invocationOnMock -> { +String role_ = invocationOnMock.getArgument(2); +if (role.equals(role_)) { +TopicOperation operation_ = invocationOnMock.getArgument(1); +Assert.assertEquals(operation_, operation); +} +execFlag.set(true); +return invocationOnMock.callRealMethod(); + }).when(authorizationService).allowTopicOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), +Mockito.any(), Mockito.any()); +} else if (operation instanceof NamespaceOperation) { +doReturn(true) + .when(authorizationService).isValidOriginalPrincipal(Mockito.any(), Mockito.any(), Mockito.any()); +Mockito.doAnswer(invocationOnMock -> { +
(pulsar) branch master updated: [improve][admin] Align the auth and check it at the first place for topic related API (#22507)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8ca01cd42ed [improve][admin] Align the auth and check it at the first place for topic related API (#22507) 8ca01cd42ed is described below commit 8ca01cd42edfd4efd986f752f6f8538ea5bf4f94 Author: Jiwei Guo AuthorDate: Wed Apr 17 18:46:22 2024 +0800 [improve][admin] Align the auth and check it at the first place for topic related API (#22507) --- .../broker/admin/impl/PersistentTopicsBase.java| 419 ++--- .../pulsar/broker/admin/v2/PersistentTopics.java | 44 ++- .../apache/pulsar/broker/admin/TopicAuthZTest.java | 257 +++-- 3 files changed, 447 insertions(+), 273 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ab74b1e2bcc..1f8d0657190 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -128,8 +128,6 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.PolicyName; -import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; @@ -2727,14 +2725,14 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) { -CompletableFuture future; -if (topicName.isGlobal()) { -future = validateGlobalNamespaceOwnershipAsync(namespaceName); -} else { -future = CompletableFuture.completedFuture(null); -} - +CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES); return future.thenCompose(__ -> { +if (topicName.isGlobal()) { +return validateGlobalNamespaceOwnershipAsync(namespaceName); +} else { +return CompletableFuture.completedFuture(null); +} +}).thenCompose(__ -> { if (topicName.isPartitioned()) { return CompletableFuture.completedFuture(null); } else { @@ -2748,7 +2746,6 @@ public class PersistentTopicsBase extends AdminResource { }); } }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) -.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenCompose(topic -> { if (!(topic instanceof PersistentTopic)) { @@ -3158,65 +3155,56 @@ public class PersistentTopicsBase extends AdminResource { protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative) { -CompletableFuture ret; -// If the topic name is a partition name, no need to get partition topic metadata again -if (!topicName.isPartitioned()) { -ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false) -.thenCompose(topicMetadata -> { -if (topicMetadata.partitions > 0) { -log.warn("[{}] Not supported calculate backlog size operation on partitioned-topic {}", -clientAppId(), topicName); -asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, -"calculate backlog size is not allowed for partitioned-topic")); -} -return CompletableFuture.completedFuture(null); -}); -} else { -ret = CompletableFuture.completedFuture(null); -} -CompletableFuture future; -if (topicName.isGlobal()) { -future = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName)); -} else { -future = ret; -} -future.thenAccept(__ -> val
(pulsar) branch master updated: [improve][broker] Optimize gzip compression for /metrics endpoint by sharing/caching compressed result (#22521)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 94f6c7ccd2b [improve][broker] Optimize gzip compression for /metrics endpoint by sharing/caching compressed result (#22521) 94f6c7ccd2b is described below commit 94f6c7ccd2bf8bc261d45ab41f6c7f123359fa47 Author: Lari Hotari AuthorDate: Wed Apr 17 03:15:01 2024 -0700 [improve][broker] Optimize gzip compression for /metrics endpoint by sharing/caching compressed result (#22521) --- .../stats/prometheus/PrometheusMetricsServlet.java | 1 + .../apache/pulsar/broker/web/GzipHandlerUtil.java | 21 +++ .../pulsar/broker/web/GzipHandlerUtilTest.java | 36 + .../org/apache/pulsar/broker/PulsarService.java| 3 +- .../prometheus/PrometheusMetricsGenerator.java | 176 +++-- .../prometheus/PulsarPrometheusMetricsServlet.java | 28 +++- .../apache/pulsar/PrometheusMetricsTestUtil.java | 2 +- 7 files changed, 253 insertions(+), 14 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java index 8a41bed29d4..8685348174c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java @@ -39,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PrometheusMetricsServlet extends HttpServlet { +public static final String DEFAULT_METRICS_PATH = "/metrics"; private static final long serialVersionUID = 1L; static final int HTTP_STATUS_OK_200 = 200; static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java index 37c9c05e5d5..9e980cecb79 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java @@ -19,8 +19,10 @@ package org.apache.pulsar.broker.web; import java.util.List; +import org.eclipse.jetty.http.pathmap.PathSpecSet; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.eclipse.jetty.util.IncludeExclude; public class GzipHandlerUtil { public static Handler wrapWithGzipHandler(Handler innerHandler, List gzipCompressionExcludedPaths) { @@ -45,4 +47,23 @@ public class GzipHandlerUtil { && (gzipCompressionExcludedPaths.get(0).equals("^.*") || gzipCompressionExcludedPaths.get(0).equals("^.*$")); } + +/** + * Check if GZIP compression is enabled for the given endpoint. + * @param gzipCompressionExcludedPaths list of paths that should not be compressed + * @param endpoint the endpoint to check + * @return true if GZIP compression is enabled for the endpoint, false otherwise + */ +public static boolean isGzipCompressionEnabledForEndpoint(List gzipCompressionExcludedPaths, + String endpoint) { +if (gzipCompressionExcludedPaths == null || gzipCompressionExcludedPaths.isEmpty()) { +return true; +} +if (isGzipCompressionCompletelyDisabled(gzipCompressionExcludedPaths)) { +return false; +} +IncludeExclude paths = new IncludeExclude<>(PathSpecSet.class); +paths.exclude(gzipCompressionExcludedPaths.toArray(new String[0])); +return paths.test(endpoint); +} } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java new file mode 100644 index 000..d6958695dec --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + *
(pulsar) branch master updated: [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1dd82a0affd [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519) 1dd82a0affd is described below commit 1dd82a0affd6ec3686fa85d444c354e9ce12 Author: hanmz AuthorDate: Wed Apr 17 18:14:38 2024 +0800 [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519) --- .../src/main/java/org/apache/pulsar/broker/service/BrokerService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2687532693a..249008bad91 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -357,7 +357,9 @@ public class BrokerService implements Closeable { this.entryFilterProvider = new EntryFilterProvider(pulsar.getConfiguration()); pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges); - pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); +if (pulsar.getConfigurationMetadataStore() != pulsar.getLocalMetadataStore()) { + pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); +} this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder() .name("pulsar-inactivity-monitor")
(pulsar) branch master updated (70b401b1de9 -> ffdfc0c4e08)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 70b401b1de9 [improve][fn] Expose `RuntimeFlags` as CLI option for Pulsar Functions and Connectors (#22514) add ffdfc0c4e08 [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500) No new revisions were added by this update. Summary of changes: .../client/api/SimpleProducerConsumerTest.java | 66 +- 1 file changed, 38 insertions(+), 28 deletions(-)
(pulsar) branch master updated: [improve][fn] Expose `RuntimeFlags` as CLI option for Pulsar Functions and Connectors (#22514)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 70b401b1de9 [improve][fn] Expose `RuntimeFlags` as CLI option for Pulsar Functions and Connectors (#22514) 70b401b1de9 is described below commit 70b401b1de9df685283140cff1f83252abc27045 Author: Rui Fu AuthorDate: Tue Apr 16 19:53:29 2024 +0800 [improve][fn] Expose `RuntimeFlags` as CLI option for Pulsar Functions and Connectors (#22514) --- .../test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java| 4 +++- .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java| 7 +++ .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java| 6 ++ .../src/main/java/org/apache/pulsar/admin/cli/CmdSources.java | 6 ++ 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 4d906af9424..d3087b7fc87 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -165,7 +165,8 @@ public class CmdFunctionsTest { "--className", DummyFunction.class.getName(), "--dead-letter-topic", "test-dead-letter-topic", "--custom-runtime-options", "custom-runtime-options", -"--user-config", "{\"key\": [\"value1\", \"value2\"]}" +"--user-config", "{\"key\": [\"value1\", \"value2\"]}", +"--runtime-flags", "--add-opens java.base/java.lang=ALL-UNNAMED" }); CreateFunction creater = cmd.getCreater(); @@ -175,6 +176,7 @@ public class CmdFunctionsTest { assertEquals(Boolean.FALSE, creater.getAutoAck()); assertEquals("test-dead-letter-topic", creater.getDeadLetterTopic()); assertEquals("custom-runtime-options", creater.getCustomRuntimeOptions()); +assertEquals("--add-opens java.base/java.lang=ALL-UNNAMED", creater.getRuntimeFlags()); verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString()); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 15b8fca0761..5e80c168d92 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -377,6 +377,9 @@ public class CmdFunctions extends CmdBase { @Option(names = "--dead-letter-topic", description = "The topic where messages that are not processed successfully are sent to #Java") protected String deadLetterTopic; +@Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime" ++ " (for process & Kubernetes runtime only).") +protected String runtimeFlags; protected FunctionConfig functionConfig; protected String userCodeFile; @@ -676,6 +679,10 @@ public class CmdFunctions extends CmdBase { userCodeFile = functionConfig.getGo(); } +if (null != runtimeFlags) { +functionConfig.setRuntimeFlags(runtimeFlags); +} + // check if configs are valid validateFunctionConfigs(functionConfig); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index f3172a49b01..be1cd0af960 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -404,6 +404,9 @@ public class CmdSinks extends CmdBase { protected String transformFunctionConfig; @Option(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced") protected String logTopic; +@Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime" ++ " (for process & Kubernetes runtime only).") +protected String runtimeFlags; protected SinkConfig sinkConfig; @@ -602,6 +605,9 @@ public class CmdSinks extends CmdBase {
(pulsar) branch master updated: [improve][test] Add topic operation checker for topic API (#22468)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9d72e6bd847 [improve][test] Add topic operation checker for topic API (#22468) 9d72e6bd847 is described below commit 9d72e6bd847df85a7d18f1827274df96a446798f Author: Cong Zhao AuthorDate: Mon Apr 15 16:15:59 2024 +0800 [improve][test] Add topic operation checker for topic API (#22468) --- .../apache/pulsar/broker/admin/TopicAuthZTest.java | 156 ++--- 1 file changed, 135 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java index d09bc0a3ffd..e6ff0ce2bb4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java @@ -20,9 +20,18 @@ package org.apache.pulsar.broker.admin; import io.jsonwebtoken.Jwts; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import lombok.SneakyThrows; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -37,22 +46,21 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.security.MockedPulsarStandalone; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; @Test(groups = "broker-admin") public class TopicAuthZTest extends MockedPulsarStandalone { @@ -61,13 +69,17 @@ public class TopicAuthZTest extends MockedPulsarStandalone { private PulsarAdmin tenantManagerAdmin; +private AuthorizationService authorizationService; + +private AuthorizationService orignalAuthorizationService; + private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); private static final String TENANT_ADMIN_TOKEN = Jwts.builder() .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); @SneakyThrows @BeforeClass(alwaysRun = true) -public void before() { +public void setup() { configureTokenAuthentication(); configureDefaultAuthorization(); enableTransaction(); @@ -99,7 +111,7 @@ public class TopicAuthZTest extends MockedPulsarStandalone { @SneakyThrows @AfterClass(alwaysRun = true) -public void after() { +public void cleanup() { if (superUserAdmin != null) { superUserAdmin.close(); } @@ -109,6 +121,51 @@ public class TopicAuthZTest extends MockedPulsarStandalone { close(); } +@BeforeMethod +public void before() throws IllegalAccessException { +orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); +authorizationService = Mockito.spy(orignalAuthorizationService); +FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", +authorizationService, true); +} + +@AfterMethod +public void after() throws IllegalAccessException { +FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", +orignalAuthorizationService, true); +} + +private AtomicBoolean setAuthorizationTopicOperationChecker(String role, Object operation) { +AtomicBoolean execFlag = new AtomicBoolean(false); +
(pulsar) branch master updated: [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d9a43dd2160 [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489) d9a43dd2160 is described below commit d9a43dd21605930e16bb038095e36fceff3a4a40 Author: Baodi Shi AuthorDate: Mon Apr 15 13:55:34 2024 +0800 [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489) --- .../service/PersistentMessageFinderTest.java | 42 +++--- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 6965ac28068..0972c9098b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.service; -import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; @@ -383,7 +382,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { * * @throws Exception */ -@Test(groups = "flaky") +@Test void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers"; @@ -402,11 +401,15 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { for (int i = 0; i < totalEntries; i++) { ledger.addEntry(createMessageWrittenToLedger("msg" + i)); } +Awaitility.await().untilAsserted(() -> +assertEquals(ledger.getState(), ManagedLedgerImpl.State.LedgerOpened)); List ledgers = ledger.getLedgersInfoAsList(); LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1); - -assertEquals(ledgers.size(), totalEntries / entriesPerLedger); +// The `lastLedgerInfo` should be newly opened, and it does not contain any entries. +// Please refer to: https://github.com/apache/pulsar/pull/22034 +assertEquals(lastLedgerInfo.getEntries(), 0); +assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1); // this will make sure that all entries should be deleted Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds)); @@ -420,19 +423,13 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); -Position previousMarkDelete = null; -for (int i = 0; i < totalEntries; i++) { -monitor.expireMessages(1); -Position previousPos = previousMarkDelete; -retryStrategically( -(test) -> c1.getMarkDeletedPosition() != null && !c1.getMarkDeletedPosition().equals(previousPos), -5, 100); -previousMarkDelete = c1.getMarkDeletedPosition(); -} - -PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition(); -assertEquals(lastLedgerInfo.getLedgerId(), markDeletePosition.getLedgerId()); -assertEquals(lastLedgerInfo.getEntries() - 1, markDeletePosition.getEntryId()); +assertTrue(monitor.expireMessages(ttlSeconds)); +Awaitility.await().untilAsserted(() -> { +PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition(); +// The markDeletePosition points to the last entry of the previous ledger in lastLedgerInfo. +assertEquals(markDeletePosition.getLedgerId(), lastLedgerInfo.getLedgerId() - 1); +assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 1); +}); c1.close(); ledger.close(); @@ -440,20 +437,25 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { } -@Test(groups = "flaky") +@Test public void testIncorrectClientClock() throws Exception { final String ledgerAndCursorName = "testIncorrectClientClock"; int maxTTLSeconds = 1; +int entriesNum = 10; ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(
svn commit: r68514 - /release/pulsar/pulsar-client-cpp-3.4.0/
Author: technoboy Date: Mon Apr 15 01:28:45 2024 New Revision: 68514 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.4.0/
svn commit: r68517 - /release/pulsar/pulsar-client-cpp-3.1.1/
Author: technoboy Date: Mon Apr 15 01:29:42 2024 New Revision: 68517 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.1.1/
svn commit: r68516 - /release/pulsar/pulsar-client-cpp-3.1.0/
Author: technoboy Date: Mon Apr 15 01:29:27 2024 New Revision: 68516 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.1.0/
svn commit: r68515 - /release/pulsar/pulsar-client-cpp-3.4.1/
Author: technoboy Date: Mon Apr 15 01:28:58 2024 New Revision: 68515 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.4.1/
svn commit: r68513 - /release/pulsar/pulsar-client-go-0.11.0/
Author: technoboy Date: Mon Apr 15 01:28:14 2024 New Revision: 68513 Log: cleanup Removed: release/pulsar/pulsar-client-go-0.11.0/
svn commit: r68512 - /release/pulsar/pulsar-dotpulsar-3.1.1/
Author: technoboy Date: Mon Apr 15 01:27:45 2024 New Revision: 68512 Log: cleanup Removed: release/pulsar/pulsar-dotpulsar-3.1.1/
svn commit: r68511 - /release/pulsar/pulsar-dotpulsar-3.1.0/
Author: technoboy Date: Mon Apr 15 01:27:29 2024 New Revision: 68511 Log: cleanup Removed: release/pulsar/pulsar-dotpulsar-3.1.0/
svn commit: r68510 - /release/pulsar/pulsar-client-go-0.12.0/
Author: technoboy Date: Mon Apr 15 01:26:17 2024 New Revision: 68510 Log: cleanup Removed: release/pulsar/pulsar-client-go-0.12.0/
svn commit: r68509 - /release/pulsar/pulsar-client-reactive-0.5.2/
Author: technoboy Date: Mon Apr 15 01:25:44 2024 New Revision: 68509 Log: cleanup Removed: release/pulsar/pulsar-client-reactive-0.5.2/
svn commit: r68508 - /release/pulsar/pulsar-client-reactive-0.5.1/
Author: technoboy Date: Mon Apr 15 01:25:31 2024 New Revision: 68508 Log: cleanup Removed: release/pulsar/pulsar-client-reactive-0.5.1/
svn commit: r68507 - /release/pulsar/pulsar-client-reactive-0.5.0/
Author: technoboy Date: Mon Apr 15 01:25:17 2024 New Revision: 68507 Log: cleanup Removed: release/pulsar/pulsar-client-reactive-0.5.0/
svn commit: r68506 - /release/pulsar/pulsar-3.2.1/
Author: technoboy Date: Mon Apr 15 01:24:19 2024 New Revision: 68506 Log: cleanup Removed: release/pulsar/pulsar-3.2.1/
svn commit: r68505 - /release/pulsar/pulsar-3.2.0/
Author: technoboy Date: Mon Apr 15 01:23:56 2024 New Revision: 68505 Log: cleanup Removed: release/pulsar/pulsar-3.2.0/
svn commit: r68502 - /release/pulsar/pulsar-3.1.0/
Author: technoboy Date: Mon Apr 15 01:22:31 2024 New Revision: 68502 Log: cleanup Removed: release/pulsar/pulsar-3.1.0/
svn commit: r68504 - /release/pulsar/pulsar-3.1.2/
Author: technoboy Date: Mon Apr 15 01:23:19 2024 New Revision: 68504 Log: cleanup Removed: release/pulsar/pulsar-3.1.2/
svn commit: r68503 - /release/pulsar/pulsar-3.1.1/
Author: technoboy Date: Mon Apr 15 01:22:58 2024 New Revision: 68503 Log: cleanup Removed: release/pulsar/pulsar-3.1.1/
svn commit: r68501 - /release/pulsar/pulsar-3.0.3/
Author: technoboy Date: Mon Apr 15 01:22:04 2024 New Revision: 68501 Log: cleanup Removed: release/pulsar/pulsar-3.0.3/