[pulsar] branch branch-2.11 updated: [cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 644a2c5f8d2 [cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729) 644a2c5f8d2 is described below commit 644a2c5f8d20d6889d7f27c8fbe07852ea00f5bd Author: lifepuzzlefun AuthorDate: Mon Dec 12 14:18:15 2022 +0800 [cleanup][broker] Simplify extract entryMetadata code in filterEntriesForConsumer (#18729) ### Motivation The original logic for extracting entry metadata is based on `Optional.map.orElseGet`, which can be simplified with an `if` condition and also performs better on the hot code path. ### Modifications 1. Replace the Optional code with an explicit null check. 2. Remove the duplicate hasChunk check logic in `PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers` --- .../broker/service/AbstractBaseDispatcher.java | 29 ++ .../PersistentDispatcherMultipleConsumers.java | 5 +--- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 677b3a84a4c..1157ae65558 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import lombok.extern.slf4j.Slf4j; @@ -47,6 +46,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; import
org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; +import org.checkerframework.checker.nullness.qual.Nullable; @Slf4j public abstract class AbstractBaseDispatcher extends EntryFilterSupport implements Dispatcher { @@ -94,22 +94,25 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen public int filterEntriesForConsumer(List entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) { -return filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor, +return filterEntriesForConsumer(null, 0, entries, batchSizes, +sendMessageInfo, indexesAcks, cursor, isReplayRead, consumer); } /** * Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry. * - * @param optMetadataArray the optional message metadata array + * @param metadataArray the optional message metadata array. need check if null pass. 
* @param startOffset the index in `optMetadataArray` of the first Entry's message metadata * * @see AbstractBaseDispatcher#filterEntriesForConsumer(List, EntryBatchSizes, SendMessageInfo, * EntryBatchIndexesAcks, ManagedCursor, boolean, Consumer) */ -public int filterEntriesForConsumer(Optional optMetadataArray, int startOffset, - List entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo, - EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) { +public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, int startOffset, +List entries, EntryBatchSizes batchSizes, +SendMessageInfo sendMessageInfo, +EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, +boolean isReplayRead, Consumer consumer) { int totalMessages = 0; long totalBytes = 0; int totalChunkedMessages = 0; @@ -124,11 +127,15 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen } ByteBuf metadataAndPayload = entry.getDataBuffer(); final int metadataIndex = i + startOffset; -final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex]) -.orElseGet(() -> (entry instanceof EntryAndMetadata) -? ((EntryAndMetadata) entry).getMetadata() -: Commands.peekAndCopyMessageMetadata(metadataAndPayload,
[pulsar] branch branch-2.11 updated: [fix][broker] Fix PulsarRegistrationClient and ZkRegistrationClient not aware rack info problem. (#18672)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new d802a6aded1 [fix][broker] Fix PulsarRegistrationClient and ZkRegistrationClient not aware rack info problem. (#18672) d802a6aded1 is described below commit d802a6aded12f41facd09d5e0f18bd3e701b2d24 Author: Yan Zhao AuthorDate: Wed Feb 1 10:00:59 2023 +0800 [fix][broker] Fix PulsarRegistrationClient and ZkRegistrationClient not aware rack info problem. (#18672) --- .../rackawareness/BookieRackAffinityMapping.java | 26 + .../BookieRackAffinityMappingTest.java | 127 + .../bookkeeper/PulsarRegistrationClient.java | 18 ++- 3 files changed, 161 insertions(+), 10 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index 53e1a683c55..3654b7b54c3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -19,6 +19,7 @@ package org.apache.pulsar.bookie.rackawareness; import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METADATA_STORE_SCHEME; +import java.lang.reflect.Field; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; @@ -28,8 +29,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.DefaultBookieAddressResolver; import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy; import org.apache.bookkeeper.client.RackChangeNotifier; +import org.apache.bookkeeper.discover.RegistrationClient; import 
org.apache.bookkeeper.meta.exceptions.Code; import org.apache.bookkeeper.meta.exceptions.MetadataException; import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping; @@ -118,6 +121,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() .orElseGet(BookiesRackConfiguration::new); updateRacksWithHost(racksWithHost); +watchAvailableBookies(); for (Map bookieMapping : racksWithHost.values()) { for (String address : bookieMapping.keySet()) { bookieAddressListLastTime.add(BookieId.parse(address)); @@ -132,6 +136,28 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping } } +private void watchAvailableBookies() { +BookieAddressResolver bookieAddressResolver = getBookieAddressResolver(); +if (bookieAddressResolver instanceof DefaultBookieAddressResolver) { +try { +Field field = DefaultBookieAddressResolver.class.getDeclaredField("registrationClient"); +field.setAccessible(true); +RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver); +registrationClient.watchWritableBookies(versioned -> { +try { +racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() +.orElseGet(BookiesRackConfiguration::new); +updateRacksWithHost(racksWithHost); +} catch (InterruptedException | ExecutionException e) { +LOG.error("Failed to update rack info. ", e); +} +}); +} catch (NoSuchFieldException | IllegalAccessException e) { +LOG.error("Failed watch available bookies.", e); +} +} +} + private synchronized void updateRacksWithHost(BookiesRackConfiguration racks) { // In config z-node, the bookies are added in the `ip:port` notation, while BK will ask // for just the IP/hostname when trying to get the rack for a bookie. 
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java index 8cbd0ebe9ca..ac97db71e89 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java @@ -18,21 +18,45 @@ */ package org.apache.pulsar.bookie.rackawareness; +import static
[pulsar] branch branch-2.11 updated: [fix][broker] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates (#18133)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new b35ced440ad [fix][broker] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates (#18133) b35ced440ad is described below commit b35ced440ada04fefc5b6aa98b0fb705c5802e27 Author: Enrico Olivelli AuthorDate: Thu Oct 20 22:19:56 2022 +0200 [fix][broker] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates (#18133) * [fix] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates * remove debug --- .../bookkeeper/PulsarRegistrationClient.java | 61 ++- .../bookkeeper/PulsarRegistrationManager.java | 21 +++- .../bookkeeper/PulsarRegistrationClientTest.java | 118 +++-- 3 files changed, 163 insertions(+), 37 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index f314c0efaf0..da7b1ae5fdb 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -97,14 +96,9 @@ public class PulsarRegistrationClient implements RegistrationClient { @Override public CompletableFuture>> getAllBookies() { -CompletableFuture>> wb = getWritableBookies(); -CompletableFuture>> rb = getReadOnlyBookies(); 
-return wb.thenCombine(rb, (rw, ro) -> { -Set res = new HashSet<>(); -res.addAll(rw.getValue()); -res.addAll(ro.getValue()); -return new Versioned<>(res, Version.NEW); -}); +// this method is meant to return all the known bookies, even the bookies +// that are not in a running state +return getChildren(bookieAllRegistrationPath); } @Override @@ -116,10 +110,9 @@ public class PulsarRegistrationClient implements RegistrationClient { return store.getChildren(path) .thenComposeAsync(children -> { Set bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children); -Set bookies = convertToBookieAddresses(children); -List>> bookieInfoUpdated = -new ArrayList<>(bookies.size()); -for (BookieId id : bookies) { +List> bookieInfoUpdated = +new ArrayList<>(bookieIds.size()); +for (BookieId id : bookieIds) { // update the cache for new bookies if (!bookieServiceInfoCache.containsKey(id)) { bookieInfoUpdated.add(readBookieServiceInfoAsync(id)); @@ -160,26 +153,42 @@ public class PulsarRegistrationClient implements RegistrationClient { readOnlyBookiesWatchers.remove(registrationListener); } -private void updatedBookies(Notification n) { -if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) { - -if (n.getType() == NotificationType.Deleted) { -BookieId bookieId = stripBookieIdFromPath(n.getPath()); +private void handleDeletedBookieNode(Notification n) { +if (n.getType() == NotificationType.Deleted) { +BookieId bookieId = stripBookieIdFromPath(n.getPath()); +if (bookieId != null) { log.info("Bookie {} disappeared", bookieId); -if (bookieId != null) { -bookieServiceInfoCache.remove(bookieId); -} +bookieServiceInfoCache.remove(bookieId); } +} +} +private void handleUpdatedBookieNode(Notification n) { +BookieId bookieId = stripBookieIdFromPath(n.getPath()); +if (bookieId != null) { +log.info("Bookie {} info updated", bookieId); +readBookieServiceInfoAsync(bookieId); +} +} + +private void updatedBookies(Notification n) { +if (n.getType() == 
NotificationType.Created || n.getType() == NotificationType.Deleted) { if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) { getReadOnlyBookies().thenAccept(bookies -> {
[pulsar] branch branch-2.11 updated: [fix][client] For exclusive subscriptions, if two consumers are created repeatedly, the second consumer will block (#18633)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 8599c4e1a5a [fix][client] For exclusive subscriptions, if two consumers are created repeatedly, the second consumer will block (#18633) 8599c4e1a5a is described below commit 8599c4e1a5a0edca5b8d530a879614ab5756609b Author: LinChen <1572139...@qq.com> AuthorDate: Tue Dec 6 09:54:03 2022 +0800 [fix][client] For exclusive subscriptions, if two consumers are created repeatedly, the second consumer will block (#18633) Co-authored-by: lordcheng10 --- .../pulsar/client/impl/TopicsConsumerImplTest.java | 30 ++ .../client/impl/MultiTopicsConsumerImpl.java | 11 ++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index d597d72ad81..e5ae39695fb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -69,6 +69,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.testng.Assert.assertEquals; @@ -539,6 +540,35 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { }).get(); } +@Test(timeOut = 3) +public void testExclusiveSubscribe() throws Exception { +final String topicName = "persistent://tenant1/namespace1/testTopicNameValid"; +TenantInfoImpl tenantInfo = createDefaultTenantInfo(); +admin.tenants().createTenant("tenant1", tenantInfo); 
+admin.namespaces().createNamespace("tenant1/namespace1"); +admin.topics().createPartitionedTopic(topicName, 3); + +Consumer consumer1 = pulsarClient.newConsumer() +.topic(topicName) +.subscriptionName("subscriptionName") +.subscriptionType(SubscriptionType.Exclusive) +.subscribe(); + +try { +pulsarClient.newConsumer() +.topics(IntStream.range(0, 3).mapToObj(i -> topicName + "-partition-" + i) +.collect(Collectors.toList())) +.subscriptionName("subscriptionName") +.subscriptionType(SubscriptionType.Exclusive) +.subscribe(); +fail("should fail"); +} catch (PulsarClientException e) { +String errorLog = e.getMessage(); +assertTrue(errorLog.contains("Exclusive consumer is already connected")); +} +consumer1.close(); +} + @Test public void testSubscribeUnsubscribeSingleTopic() throws Exception { String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index b6513b3e7b1..89190376092 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1154,7 +1154,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage()); client.externalExecutorProvider().getExecutor().execute(() -> { AtomicInteger toCloseNum = new AtomicInteger(0); -consumers.values().stream().filter(consumer1 -> { +List filterConsumers = consumers.values().stream().filter(consumer1 -> { String consumerTopicName = consumer1.getTopic(); if (TopicName.get(consumerTopicName).getPartitionedTopicName() .equals(TopicName.get(topicName).getPartitionedTopicName())) { @@ -1163,7 +1163,14 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { } else { return false; } 
-}).collect(Collectors.toList()).forEach(consumer2 -> { +}).collect(Collectors.toList()); + +if (filterConsumers.isEmpty()) { +subscribeFuture.completeExceptionally(error); +return; +} + +filterConsumers.forEach(consumer2 -> { consumer2.closeAsync().whenComplete((r, ex) -> {
[pulsar] branch branch-2.11 updated: [improve][broker] Make Consumer#equals more effective (#18662)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 31308c9944d [improve][broker] Make Consumer#equals more effective (#18662) 31308c9944d is described below commit 31308c9944d19766f1dfe52534481932623c3b67 Author: 萧易客 AuthorDate: Tue Nov 29 11:59:13 2022 +0800 [improve][broker] Make Consumer#equals more effective (#18662) --- .../src/main/java/org/apache/pulsar/broker/service/Consumer.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 8dfed3e36e3..5b0b37efdef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -895,7 +895,7 @@ public class Consumer { public boolean equals(Object obj) { if (obj instanceof Consumer) { Consumer other = (Consumer) obj; -return Objects.equals(cnx.clientAddress(), other.cnx.clientAddress()) && consumerId == other.consumerId; +return consumerId == other.consumerId && Objects.equals(cnx.clientAddress(), other.cnx.clientAddress()); } return false; }
[pulsar] branch branch-2.11 updated: [Improve][Auth]Update authentication failed metrics report (#17787)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 5c47e42308a [Improve][Auth]Update authentication failed metrics report (#17787) 5c47e42308a is described below commit 5c47e42308ab81a489e322c42649c40d45c21687 Author: Guangning E AuthorDate: Mon Sep 26 13:02:08 2022 +0800 [Improve][Auth]Update authentication failed metrics report (#17787) --- .../pulsar/broker/authentication/AuthenticationProviderList.java | 8 1 file changed, 8 insertions(+) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java index c7a0387b667..f921a6f1e09 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java @@ -28,6 +28,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics; import org.apache.pulsar.common.api.AuthData; /** @@ -58,8 +59,15 @@ public class AuthenticationProviderList implements AuthenticationProvider { } if (null == authenticationException) { +AuthenticationMetrics.authenticateFailure( +AuthenticationProviderList.class.getSimpleName(), +"authentication-provider-list", "Authentication required"); throw new AuthenticationException("Authentication required"); } else { + AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(), +"authentication-provider-list", +authenticationException.getMessage() 
!= null +? authenticationException.getMessage() : "Authentication required"); throw authenticationException; }
[pulsar] branch branch-2.11 updated: [fix][cli] Check numMessages after incrementing counter (#17826)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 636559e2493 [fix][cli] Check numMessages after incrementing counter (#17826) 636559e2493 is described below commit 636559e2493ab3d1fb91bc6dab0d83793a3876df Author: Andras Beni AuthorDate: Thu Sep 29 11:07:22 2022 +0200 [fix][cli] Check numMessages after incrementing counter (#17826) --- .../pulsar/testclient/PerformanceReader.java | 11 +++ .../pulsar/tests/integration/cli/PerfToolTest.java | 34 +- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java index 5245f634d1b..be42bbf8a05 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java @@ -154,17 +154,18 @@ public class PerformanceReader { PerfClientUtils.exit(0); } } -if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= arguments.numMessages) { -log.info("- DONE (reached the maximum number: [{}] of consumption) --", -arguments.numMessages); -PerfClientUtils.exit(0); -} messagesReceived.increment(); bytesReceived.add(msg.getData().length); totalMessagesReceived.increment(); totalBytesReceived.add(msg.getData().length); +if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= arguments.numMessages) { +log.info("- DONE (reached the maximum number: [{}] of consumption) --", +arguments.numMessages); +PerfClientUtils.exit(0); +} + if (limiter != null) { limiter.acquire(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java index 55af57d3b52..f87d11531dc 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java @@ -28,26 +28,24 @@ import org.testng.Assert; import org.testng.annotations.Test; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.IntStream; public class PerfToolTest extends TopicMessagingBase { private static final int MESSAGE_COUNT = 50; @Test -private void testProduce() throws Exception { +public void testProduce() throws Exception { String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT; final String topicName = getNonPartitionedTopic("testProduce", true); // Using the ZK container as it is separate from brokers, so its environment resembles real world usage more ZKContainer clientToolContainer = pulsarCluster.getZooKeeper(); -ContainerExecResult produceResult = produceWithPerfTool(clientToolContainer, serviceUrl, topicName); +ContainerExecResult produceResult = produceWithPerfTool(clientToolContainer, serviceUrl, topicName, MESSAGE_COUNT); checkOutputForLogs(produceResult,"PerformanceProducer - Aggregated throughput stats", "PerformanceProducer - Aggregated latency stats"); } @Test -private void testConsume() throws Exception { +public void testConsume() throws Exception { String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT; final String topicName = getNonPartitionedTopic("testConsume", true); // Using the ZK container as it is separate from brokers, so its environment resembles real world usage more @@ -57,8 +55,19 @@ public class PerfToolTest extends TopicMessagingBase { "PerformanceConsumer - Aggregated latency stats"); } 
-private ContainerExecResult produceWithPerfTool(ChaosContainer container, String url, String topic) throws Exception { -ContainerExecResult result = container.execCmd("bin/pulsar-perf", "produce", "-u", url, "-m", String.valueOf(MESSAGE_COUNT), topic); +@Test +public void testRead() throws Exception { +String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT; +final
[pulsar] branch branch-2.11 updated: [fix][cli] Quit PerformanceConsumer after receiving numMessages messages (#17750)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new b17ec925e89 [fix][cli] Quit PerformanceConsumer after receiving numMessages messages (#17750) b17ec925e89 is described below commit b17ec925e8992bf6a4af6ead72963256b3c2eb8d Author: Andras Beni AuthorDate: Tue Sep 27 10:46:52 2022 +0200 [fix][cli] Quit PerformanceConsumer after receiving numMessages messages (#17750) --- .../pulsar/testclient/PerformanceConsumer.java | 6 ++ .../pulsar/tests/integration/cli/PerfToolTest.java | 91 ++ .../integration/src/test/resources/pulsar-cli.xml | 1 + 3 files changed, 98 insertions(+) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 3ac281db6fb..b7fe3ae03f0 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -314,6 +314,12 @@ public class PerformanceConsumer { totalMessagesReceived.increment(); totalBytesReceived.add(msg.size()); +if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= arguments.numMessages) { +log.info("--- DONE ---"); +PerfClientUtils.exit(0); +thread.interrupt(); +} + if (limiter != null) { limiter.acquire(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java new file mode 100644 index 000..55af57d3b52 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.cli; + +import static org.testng.Assert.fail; +import org.apache.pulsar.tests.integration.containers.ChaosContainer; +import org.apache.pulsar.tests.integration.containers.PulsarContainer; +import org.apache.pulsar.tests.integration.containers.ZKContainer; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class PerfToolTest extends TopicMessagingBase { + +private static final int MESSAGE_COUNT = 50; + +@Test +private void testProduce() throws Exception { +String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT; +final String topicName = getNonPartitionedTopic("testProduce", true); +// Using the ZK container as it is separate from brokers, so its environment resembles real world usage more +ZKContainer clientToolContainer = pulsarCluster.getZooKeeper(); +ContainerExecResult produceResult = produceWithPerfTool(clientToolContainer, 
serviceUrl, topicName); +checkOutputForLogs(produceResult,"PerformanceProducer - Aggregated throughput stats", +"PerformanceProducer - Aggregated latency stats"); +} + +@Test +private void testConsume() throws Exception { +String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT; +final String topicName = getNonPartitionedTopic("testConsume", true); +// Using the ZK container as it is separate from brokers, so its environment resembles real world usage more +ZKContainer clientToolContainer = pulsarCluster.getZooKeeper(); +
[pulsar] branch master updated (d7c4e373ac8 -> 9ab9d0170a0)
This is an automated email from the ASF dual-hosted git repository. bogong pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from d7c4e373ac8 [improve][client] AuthenticationAthenz supports Copper Argos (#19445) add 9ab9d0170a0 [improve][client] PIP-218: Consumer batchReceive() single partition every receive (#18316) No new revisions were added by this update. Summary of changes: .../client/api/ConsumerBatchReceiveTest.java | 81 +- .../pulsar/client/api/BatchReceivePolicy.java | 30 +++- .../apache/pulsar/client/impl/ConsumerBase.java| 16 + 3 files changed, 123 insertions(+), 4 deletions(-)
[pulsar] branch branch-2.11 updated: [fix] Avoid redelivering duplicated messages when batching is enabled (#18486)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new e3f7da5f5bb [fix] Avoid redelivering duplicated messages when batching is enabled (#18486) e3f7da5f5bb is described below commit e3f7da5f5bb73615e05f49682aa635f8ac84e321 Author: Yunze Xu AuthorDate: Mon Nov 21 17:43:01 2022 +0800 [fix] Avoid redelivering duplicated messages when batching is enabled (#18486) https://github.com/apache/pulsar/pull/18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. 
- [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` PR in forked repository: https://github.com/BewareMyPower/pulsar/pull/8 --- .../pulsar/client/api/ConsumerRedeliveryTest.java | 70 +- .../impl/ConsumerDedupPermitsUpdateTest.java | 21 ++- .../PersistentAcknowledgmentsGroupingTracker.java | 32 -- .../pulsar/client/impl/LastCumulativeAckTest.java | 20 +++ 4 files changed, 121 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index 95e343acfef..070a621f612 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -25,11 +25,13 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; - +import java.util.stream.Collectors; +import java.util.stream.IntStream; import lombok.Cleanup; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +71,19 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase { return new Object[][] { { true }, { false } }; } +@DataProvider(name = "batchedMessageAck") +public Object[][] batchedMessageAck() { +// When batch index ack is disabled (by default), only after all single messages were sent would the pending +// ACK be added into the ACK tracker. 
+return new Object[][] { +// numAcked, batchSize, ack type +{ 3, 5, CommandAck.AckType.Individual }, +{ 5, 5, CommandAck.AckType.Individual }, +{ 3, 5, CommandAck.AckType.Cumulative }, +{ 5, 5, CommandAck.AckType.Cumulative } +}; +} + /** * It verifies that redelivered messages are sorted based on the ledger-ids. * @@ -301,4 +316,57 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase { consumer.close(); producer.close(); } + +@Test(timeOut = 3, dataProvider = "batchedMessageAck") +public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType ackType) throws Exception { +String topic = "persistent://my-property/my-ns/test-ack-not-sent-" ++ numAcked + "-" + batchSize + "-" + ackType.getValue(); +@Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) +.topic(topic) +.subscriptionName("sub") +.enableBatchIndexAcknowledgment(false) +.acknowledgmentGroupTime(1, TimeUnit.HOURS) // ACK won't be sent +.subscribe(); +@Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) +.topic(topic) +.enableBatching(true) +
[GitHub] [pulsar] congbobo184 merged pull request #18316: [improve][client] PIP-218: Consumer batchReceive() single partition every receive
congbobo184 merged PR #18316: URL: https://github.com/apache/pulsar/pull/18316 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [fix][io] ElasticSearch sink: align null fields behaviour (#18577)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 1e07511052f [fix][io] ElasticSearch sink: align null fields behaviour (#18577) 1e07511052f is described below commit 1e07511052fc01bddda697815154da0ad7ae1a19 Author: Nicolò Boschi AuthorDate: Wed Nov 23 14:04:37 2022 +0100 [fix][io] ElasticSearch sink: align null fields behaviour (#18577) --- pom.xml| 6 ++-- .../elastic/ElasticSearchJavaRestClient.java | 12 .../io/elasticsearch/ElasticSearchClientTests.java | 35 +++--- .../io/elasticsearch/ElasticSearchTestBase.java| 4 +-- .../io/sinks/ElasticSearch7SinkTester.java | 7 - .../io/sinks/ElasticSearch8SinkTester.java | 7 - .../integration/io/sinks/OpenSearchSinkTester.java | 6 +++- 7 files changed, 60 insertions(+), 17 deletions(-) diff --git a/pom.xml b/pom.xml index 0424ecca72f..f4fb4fc0f8a 100644 --- a/pom.xml +++ b/pom.xml @@ -170,8 +170,8 @@ flexible messaging model and an intuitive client API. 3.3.3 2.4.7 1.2.4 -8.1.0 -334 +8.5.2 +363 2.13 2.13.10 1.7.2.Final @@ -231,7 +231,7 @@ flexible messaging model and an intuitive client API. 
2.0.6 -1.17.2 +1.17.6 2.2 diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index bd1efcb971b..258d9eb1cc4 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -36,7 +36,9 @@ import co.elastic.clients.elasticsearch.indices.RefreshRequest; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.rest_client.RestClientTransport; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Map; @@ -53,8 +55,9 @@ import org.elasticsearch.client.RestClientBuilder; public class ElasticSearchJavaRestClient extends RestClient { private final ElasticsearchClient client; -private final ObjectMapper objectMapper = new ObjectMapper(); - +private final ObjectMapper objectMapper = new ObjectMapper() +.configure(SerializationFeature.INDENT_OUTPUT, false) +.setSerializationInclusion(JsonInclude.Include.ALWAYS); private BulkProcessor bulkProcessor; private ElasticsearchTransport transport; @@ -87,8 +90,7 @@ public class ElasticSearchJavaRestClient extends RestClient { log.warn("Node host={} failed", node.getHost()); } }); -transport = new RestClientTransport(builder.build(), -new JacksonJsonpMapper()); +transport = new RestClientTransport(builder.build(), new JacksonJsonpMapper(objectMapper)); client = new ElasticsearchClient(transport); if 
(elasticSearchConfig.isBulkEnabled()) { bulkProcessor = new ElasticBulkProcessor(elasticSearchConfig, client, bulkProcessorListener); @@ -117,7 +119,7 @@ public class ElasticSearchJavaRestClient extends RestClient { .build(); try { final CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); -if ((createIndexResponse.acknowledged() != null && createIndexResponse.acknowledged()) +if ((createIndexResponse.acknowledged()) && createIndexResponse.shardsAcknowledged()) { return true; } diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java index 8a453dfe451..d0cde9cf674 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java @@ -29,6 +29,7 @@ import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import eu.rekawek.toxiproxy.model.ToxicDirection; import java.io.IOException; +import java.util.Map; import java.util.Optional;
[GitHub] [pulsar] Technoboy- commented on pull request #17582: [improve][test] Remove WhiteBox for ElasticSearchSinkTests
Technoboy- commented on PR #17582: URL: https://github.com/apache/pulsar/pull/17582#issuecomment-1422169023 Cherry-picked to branch-2.11, as #18577 relies on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [improve][test] Remove WhiteBox for ElasticSearchSinkTests (#17582)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new aecb04da75b [improve][test] Remove WhiteBox for ElasticSearchSinkTests (#17582) aecb04da75b is described below commit aecb04da75bac39582f29ef53baf4ec46071b82a Author: tison AuthorDate: Mon Sep 12 17:34:22 2022 +0800 [improve][test] Remove WhiteBox for ElasticSearchSinkTests (#17582) Master Issue: #16912 ### Documentation - [ ] `doc-required` (Your PR needs to update docs and you will update later) - [x] `doc-not-needed` (Please explain why) - [ ] `doc` (Your PR contains doc changes) - [ ] `doc-complete` (Docs have been already added) --- .../io/elasticsearch/ElasticSearchClient.java | 5 +++ .../pulsar/io/elasticsearch/ElasticSearchSink.java | 5 +++ .../elastic/ElasticSearchJavaRestClient.java | 35 +++- .../opensearch/OpenSearchHighLevelRestClient.java | 10 ++ .../io/elasticsearch/ElasticSearchSinkTests.java | 37 ++ testmocks/pom.xml | 5 --- 6 files changed, 57 insertions(+), 40 deletions(-) diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java index 67b04529a77..e46795e2b88 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java @@ -256,6 +256,11 @@ public class ElasticSearchClient implements AutoCloseable { } } +@VisibleForTesting +void setClient(RestClient client) { +this.client = client; +} + private void checkNotFailed() throws Exception { if (irrecoverableError.get() != null) { throw irrecoverableError.get(); diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java index 295cbc1da63..70c861825d0 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java @@ -102,6 +102,11 @@ public class ElasticSearchSink implements Sink { } } +@VisibleForTesting +void setElasticsearchClient(ElasticSearchClient elasticsearchClient) { +this.elasticsearchClient = elasticsearchClient; +} + @Override public void write(Record record) throws Exception { if (!elasticsearchClient.isFailed()) { diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index 2166df22731..bd1efcb971b 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -40,6 +40,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Map; +import java.util.Objects; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpHost; import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig; @@ -52,9 +53,20 @@ import org.elasticsearch.client.RestClientBuilder; public class ElasticSearchJavaRestClient extends RestClient { private final ElasticsearchClient client; -private final ElasticsearchTransport transport; private final ObjectMapper objectMapper = new ObjectMapper(); -private final BulkProcessor bulkProcessor; + +private 
BulkProcessor bulkProcessor; +private ElasticsearchTransport transport; + +@VisibleForTesting +public void setBulkProcessor(BulkProcessor bulkProcessor) { +this.bulkProcessor = bulkProcessor; +} + +@VisibleForTesting +public void setTransport(ElasticsearchTransport transport) { +this.transport = transport; +} public ElasticSearchJavaRestClient(ElasticSearchConfig elasticSearchConfig, BulkProcessor.Listener bulkProcessorListener) { @@ -77,7 +89,7 @@ public class ElasticSearchJavaRestClient extends RestClient { }); transport = new RestClientTransport(builder.build(), new
[pulsar] branch branch-2.11 updated: [fix][client] Fix failover/exclusive consumer with batch cumulate ack issue. (#18454)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 95270fbeaa5 [fix][client] Fix failover/exclusive consumer with batch cumulate ack issue. (#18454) 95270fbeaa5 is described below commit 95270fbeaa5463c284a00165ca632d259885f48c Author: Jiwei Guo AuthorDate: Tue Nov 15 11:37:16 2022 +0800 [fix][client] Fix failover/exclusive consumer with batch cumulate ack issue. (#18454) --- .../impl/ConsumerDedupPermitsUpdateTest.java | 21 +- .../pulsar/client/impl/NegativeAcksTest.java | 85 +- .../apache/pulsar/client/impl/ConsumerImpl.java| 9 ++- .../client/impl/MultiTopicsConsumerImpl.java | 6 +- 4 files changed, 109 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java index 4c9922acbec..ceb7d7fd484 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java @@ -116,10 +116,23 @@ public class ConsumerDedupPermitsUpdateTest extends ProducerConsumerBase { } producer.flush(); -for (int i = 0; i < 30; i++) { -Message msg = consumer.receive(); -assertEquals(msg.getValue(), "new-message-" + i); -consumer.acknowledge(msg); +if (batchingEnabled) { +for (int i = 0; i < 30; i++) { +Message msg = consumer.receive(); +assertEquals(msg.getValue(), "hello-" + i); +consumer.acknowledge(msg); +} +for (int i = 0; i < 30; i++) { +Message msg = consumer.receive(); +assertEquals(msg.getValue(), "new-message-" + i); +consumer.acknowledge(msg); +} +} else { +for (int i = 0; i < 30; i++) { +Message msg = consumer.receive(); +assertEquals(msg.getValue(), "new-message-" + i); 
+consumer.acknowledge(msg); +} } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index e7cc8dfd314..30aefd90b80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; @@ -370,10 +371,10 @@ public class NegativeAcksTest extends ProducerConsumerBase { @Cleanup MultiTopicsConsumerImpl consumer = (MultiTopicsConsumerImpl) pulsarClient.newConsumer(Schema.INT32) -.topic(topic) -.subscriptionName("sub") -.receiverQueueSize(receiverQueueSize) -.subscribe(); +.topic(topic) +.subscriptionName("sub") +.receiverQueueSize(receiverQueueSize) +.subscribe(); ExecutorService internalPinnedExecutor = WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); @@ -410,4 +411,80 @@ public class NegativeAcksTest extends ProducerConsumerBase { consumer.close(); admin.topics().deletePartitionedTopic("persistent://public/default/" + topic); } + +@Test +public void testFailoverConsumerBatchCumulateAck() throws Exception { +final String topic = BrokerTestUtil.newUniqueName("my-topic"); +admin.topics().createPartitionedTopic(topic, 2); + +@Cleanup +Consumer consumer = pulsarClient.newConsumer(Schema.INT32) +.topic(topic) +.subscriptionName("sub") +.subscriptionType(SubscriptionType.Failover) +.enableBatchIndexAcknowledgment(true) +.acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS) +.receiverQueueSize(10) +.subscribe(); + +@Cleanup +Producer producer = pulsarClient.newProducer(Schema.INT32) +.topic(topic) +.batchingMaxMessages(10) +.batchingMaxPublishDelay(3, TimeUnit.SECONDS) 
+.blockIfQueueFull(true) +.create(); + +int count = 0; +Set datas = new HashSet<>(); +
[GitHub] [pulsar] congbobo184 commented on issue #19416: [Bug] Turn off allowAutoTopicCreation, then turn on transactionCoordinatorEnabled, unable to send, consume messages
congbobo184 commented on issue #19416: URL: https://github.com/apache/pulsar/issues/19416#issuecomment-1422162758 Starting two clusters on the same zk may cause some problems. It's better to check the startup log to locate the problem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [improve][client] Change the get lastMessageId to debug level (#18421)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 6bacf53bd36 [improve][client] Change the get lastMessageId to debug level (#18421) 6bacf53bd36 is described below commit 6bacf53bd3655072af99a2f9914cd7e41acd7c2a Author: Yong Zhang AuthorDate: Mon Nov 14 08:48:21 2022 +0800 [improve][client] Change the get lastMessageId to debug level (#18421) --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 10 +++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5e84a30e867..84d64fa05db 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2409,7 +2409,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle long requestId = client.newRequestId(); ByteBuf getLastIdCmd = Commands.newGetLastMessageId(consumerId, requestId); -log.info("[{}][{}] Get topic last message Id", topic, subscription); +if (log.isDebugEnabled()) { +log.debug("[{}][{}] Get topic last message Id", topic, subscription); +} cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept(cmd -> { MessageIdData lastMessageId = cmd.getLastMessageId(); @@ -2418,8 +2420,10 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle markDeletePosition = new MessageIdImpl(cmd.getConsumerMarkDeletePosition().getLedgerId(), cmd.getConsumerMarkDeletePosition().getEntryId(), -1); } -log.info("[{}][{}] Successfully getLastMessageId {}:{}", -topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId()); +if (log.isDebugEnabled()) { +log.debug("[{}][{}] 
Successfully getLastMessageId {}:{}", +topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId()); +} MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0 ? new MessageIdImpl(lastMessageId.getLedgerId(),
[pulsar] branch branch-2.11 updated: [fix][client] Fix possible npe (#18406)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 698633e38d7 [fix][client] Fix possible npe (#18406) 698633e38d7 is described below commit 698633e38d737aa926c41ed7d76a1de3e68a962f Author: crossoverJie AuthorDate: Tue Nov 15 17:58:11 2022 +0800 [fix][client] Fix possible npe (#18406) --- .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index da00807584e..6c458a0316f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1270,7 +1270,10 @@ public class ClientCnx extends PulsarHandler { // if there is no request that is timed out then exit the loop break; } -request = requestTimeoutQueue.poll(); +if (!requestTimeoutQueue.remove(request)) { +// the request has been removed by another thread +continue; +} TimedCompletableFuture requestFuture = pendingRequests.get(request.requestId); if (requestFuture != null && !requestFuture.hasGotResponse()) {
[pulsar] branch branch-2.11 updated: [fix][client] Fixes batch_size not checked in MessageId#fromByteArrayWithTopic (#18405)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 69106c8ec47 [fix][client] Fixes batch_size not checked in MessageId#fromByteArrayWithTopic (#18405) 69106c8ec47 is described below commit 69106c8ec47761adcfb77235308276e52c6fda4b Author: Yunze Xu AuthorDate: Mon Nov 14 17:27:10 2022 +0800 [fix][client] Fixes batch_size not checked in MessageId#fromByteArrayWithTopic (#18405) Fixes https://github.com/apache/pulsar/issues/18395 ### Motivation The old version Pulsar clients might not set the `batch_size` field in a batched message id, it will cause `MessageId#fromByteArrayWithTopic`, which only checks the `batch_index` field, fail with IllegalStateException. ### Modifications Check if the `batch_size` field exists in `fromByteArrayWithTopic`. If it doesn't exist, create the `BatchMessageIdImpl` instance with the default batch size (0) and the acker (disabled). Move `MessageIdSerializationTest` to the `pulsar-client` module and add the `testBatchSizeNotSet` to verify the change works. 
--- .../java/org/apache/pulsar/client/impl/MessageIdImpl.java | 10 -- .../pulsar/client/impl}/MessageIdSerializationTest.java | 15 --- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 25f7f93c239..f5050f51a1b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -155,8 +155,14 @@ public class MessageIdImpl implements MessageId { MessageId messageId; if (idData.hasBatchIndex()) { -messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), -idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize())); +if (idData.hasBatchSize()) { +messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), +idData.getBatchIndex(), idData.getBatchSize(), +BatchMessageAcker.newAcker(idData.getBatchSize())); +} else { +messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), +idData.getBatchIndex(), 0, BatchMessageAckerDisabled.INSTANCE); +} } else { messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java similarity index 75% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java rename to pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java index 295c7803372..b1cfad15128 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java +++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java @@ -16,15 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.service; +package org.apache.pulsar.client.impl; import static org.testng.Assert.assertEquals; import java.io.IOException; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.impl.MessageIdImpl; import org.testng.annotations.Test; -@Test(groups = "broker") public class MessageIdSerializationTest { @Test @@ -32,6 +30,7 @@ public class MessageIdSerializationTest { MessageId id = new MessageIdImpl(1, 2, 3); byte[] serializedId = id.toByteArray(); assertEquals(MessageId.fromByteArray(serializedId), id); +assertEquals(MessageId.fromByteArrayWithTopic(serializedId, "my-topic"), id); } @Test @@ -39,6 +38,16 @@ public class MessageIdSerializationTest { MessageId id = new MessageIdImpl(1, 2, -1); byte[] serializedId = id.toByteArray(); assertEquals(MessageId.fromByteArray(serializedId), id); +assertEquals(MessageId.fromByteArrayWithTopic(serializedId, "my-topic"), id); +} + +@Test +public void testBatchSizeNotSet() throws Exception { +MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1, +BatchMessageAckerDisabled.INSTANCE); +byte[] serialized =
[GitHub] [pulsar] congbobo184 commented on pull request #19451: [fixbug] [broker] filter messages in pending ack state.
congbobo184 commented on PR #19451: URL: https://github.com/apache/pulsar/pull/19451#issuecomment-1422156738 > 1. i find that only in `Exclusive` mode can i recur the problem, it may be have something to do with different dispatcher. In shared mode the test code can not recur the issue. In exclusive mode, messages are redelivered from the cursor's markDelete position. > 4. and, i do not understand what you say in [[feature][txn] Fix individual ack batch message with txn abort redevlier duplicate messages #14327](https://github.com/apache/pulsar/pull/14327), that is the risk of message lost. Messages are dispatched in order; if messages can be filtered, consumers using cumulative ack with txn will lose the messages that have been filtered. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar-client-go] nodece opened a new pull request, #951: Improve test script
nodece opened a new pull request, #951: URL: https://github.com/apache/pulsar-client-go/pull/951 ### Motivation Test CI ### Modifications -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [fix][client] Set authentication when using loadConf in client and admin client (#18358)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new d4b87dd61ee [fix][client] Set authentication when using loadConf in client and admin client (#18358) d4b87dd61ee is described below commit d4b87dd61ee01ca47b0982974697a8fc07de4f09 Author: Chris Bono AuthorDate: Sat Feb 4 14:03:23 2023 -0600 [fix][client] Set authentication when using loadConf in client and admin client (#18358) --- .../admin/internal/PulsarAdminBuilderImpl.java | 20 +++ .../client/admin/internal/PulsarAdminImpl.java | 17 +- .../admin/internal/PulsarAdminBuilderImplTest.java | 187 +++-- .../client/admin/internal/PulsarAdminImplTest.java | 55 ++ .../pulsar/client/impl/ClientBuilderImpl.java | 19 +++ .../pulsar/client/impl/PulsarClientImpl.java | 16 -- .../client/impl/conf/ClientConfigurationData.java | 11 +- .../pulsar/client/impl/ClientBuilderImplTest.java | 169 +++ .../client/impl/auth/AuthenticationTokenTest.java | 5 +- .../impl/conf/ClientConfigurationDataTest.java | 22 +-- 10 files changed, 461 insertions(+), 60 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java index a263bc34c8b..be72d43a61d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.admin.internal; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import 
org.apache.pulsar.client.api.Authentication; @@ -57,6 +58,7 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { @Override public PulsarAdminBuilder loadConf(Map config) { conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class); +setAuthenticationFromPropsIfAvailable(conf); return this; } @@ -86,6 +88,24 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { return this; } +private void setAuthenticationFromPropsIfAvailable(ClientConfigurationData clientConfig) { +String authPluginClass = clientConfig.getAuthPluginClassName(); +String authParams = clientConfig.getAuthParams(); +Map authParamMap = clientConfig.getAuthParamMap(); +if (StringUtils.isBlank(authPluginClass) || (StringUtils.isBlank(authParams) && authParamMap == null)) { +return; +} +try { +if (StringUtils.isNotBlank(authParams)) { +authentication(authPluginClass, authParams); +} else if (authParamMap != null) { +authentication(authPluginClass, authParamMap); +} +} catch (UnsupportedAuthenticationException ex) { +throw new RuntimeException("Failed to create authentication: " + ex.getMessage(), ex); +} +} + @Override public PulsarAdminBuilder tlsKeyFilePath(String tlsKeyFilePath) { conf.setTlsKeyFilePath(tlsKeyFilePath); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index 19d647ab80e..50b0d3e2b31 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -110,12 +110,9 @@ public class PulsarAdminImpl implements PulsarAdmin { this.clientConfigData = clientConfigData; this.auth = clientConfigData != null ? 
clientConfigData.getAuthentication() : new AuthenticationDisabled(); -LOG.debug("created: serviceUrl={}, authMethodName={}", serviceUrl, -auth != null ? auth.getAuthMethodName() : null); +LOG.debug("created: serviceUrl={}, authMethodName={}", serviceUrl, auth.getAuthMethodName()); -if (auth != null) { -auth.start(); -} +this.auth.start(); if (clientConfigData != null && StringUtils.isBlank(clientConfigData.getServiceUrl())) { clientConfigData.setServiceUrl(serviceUrl); @@ -191,7 +188,7 @@ public class PulsarAdminImpl implements PulsarAdmin { * This client object can be used to perform many subsquent API calls * * @param
[GitHub] [pulsar] Technoboy- commented on pull request #17375: [improve][admin] PulsarAdminBuilderImpl overrides timeout properties passed through config map
Technoboy- commented on PR #17375: URL: https://github.com/apache/pulsar/pull/17375#issuecomment-1422153810 > @Technoboy- Is there any specific reason that this bug fix isn't cherry-picked to maintenance branches? Ah, it's not a bug fix. I have cherry-picked it to branch-2.11, because there are some modifications based on this patch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [improve][admin] PulsarAdminBuilderImpl overrides timeout properties passed through config map (#17375)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new d4cce14a22d [improve][admin] PulsarAdminBuilderImpl overrides timeout properties passed through config map (#17375) d4cce14a22d is described below commit d4cce14a22dda0dd25728ecdfd9a5b5fcc5a2bfa Author: Jiwei Guo AuthorDate: Fri Nov 18 12:09:15 2022 +0800 [improve][admin] PulsarAdminBuilderImpl overrides timeout properties passed through config map (#17375) --- .../pulsar/broker/admin/AdminApiTlsAuthTest.java | 2 +- .../admin/internal/PulsarAdminBuilderImpl.java | 25 +++--- .../client/admin/internal/PulsarAdminImpl.java | 54 +- .../admin/internal/PulsarAdminBuilderImplTest.java | 22 + .../pulsar/admin/cli/PulsarAdminToolTest.java | 15 +- .../client/impl/conf/ClientConfigurationData.java | 12 + 6 files changed, 55 insertions(+), 75 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java index 392f92cf41d..fb3f0cf07d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java @@ -484,7 +484,7 @@ public class AdminApiTlsAuthTest extends MockedPulsarServiceBaseTest { .allowTlsInsecureConnection(false) .enableTlsHostnameVerification(false) .serviceHttpUrl(brokerUrlTls.toString()) -.autoCertRefreshTime(1, TimeUnit.SECONDS) +.autoCertRefreshTime(autoCertRefreshTimeSec, TimeUnit.SECONDS) .authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls", String.format("tlsCertFile:%s,tlsKeyFile:%s", getTLSFile(adminUser + ".cert"), keyFile)) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java index 7e7922be6ca..a263bc34c8b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java @@ -33,21 +33,12 @@ import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { protected ClientConfigurationData conf; -private int connectTimeout = PulsarAdminImpl.DEFAULT_CONNECT_TIMEOUT_SECONDS; -private int readTimeout = PulsarAdminImpl.DEFAULT_READ_TIMEOUT_SECONDS; -private int requestTimeout = PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS; -private int autoCertRefreshTime = PulsarAdminImpl.DEFAULT_CERT_REFRESH_SECONDS; -private TimeUnit connectTimeoutUnit = TimeUnit.SECONDS; -private TimeUnit readTimeoutUnit = TimeUnit.SECONDS; -private TimeUnit requestTimeoutUnit = TimeUnit.SECONDS; -private TimeUnit autoCertRefreshTimeUnit = TimeUnit.SECONDS; + private ClassLoader clientBuilderClassLoader = null; @Override public PulsarAdmin build() throws PulsarClientException { -return new PulsarAdminImpl(conf.getServiceUrl(), conf, connectTimeout, connectTimeoutUnit, readTimeout, -readTimeoutUnit, requestTimeout, requestTimeoutUnit, autoCertRefreshTime, -autoCertRefreshTimeUnit, clientBuilderClassLoader); +return new PulsarAdminImpl(conf.getServiceUrl(), conf, clientBuilderClassLoader); } public PulsarAdminBuilderImpl() { @@ -187,29 +178,25 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { @Override public PulsarAdminBuilder connectionTimeout(int connectionTimeout, TimeUnit connectionTimeoutUnit) { -this.connectTimeout = connectionTimeout; -this.connectTimeoutUnit = connectionTimeoutUnit; +this.conf.setConnectionTimeoutMs((int) connectionTimeoutUnit.toMillis(connectionTimeout)); return this; } @Override public 
PulsarAdminBuilder readTimeout(int readTimeout, TimeUnit readTimeoutUnit) { -this.readTimeout = readTimeout; -this.readTimeoutUnit = readTimeoutUnit; +this.conf.setReadTimeoutMs((int) readTimeoutUnit.toMillis(readTimeout)); return this; } @Override public PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit requestTimeoutUnit) { -this.requestTimeout = requestTimeout; -this.requestTimeoutUnit = requestTimeoutUnit; +
[GitHub] [pulsar] tianshimoyi commented on issue #19416: [Bug] Turn off allowAutoTopicCreation, then turn on transactionCoordinatorEnabled, unable to send, consume messages
tianshimoyi commented on issue #19416: URL: https://github.com/apache/pulsar/issues/19416#issuecomment-1422150478 @congbobo184 Yes, all broker nodes are unavailable, and restarting all broker nodes will not work. Do you know why this is? Or how to fix this problem? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs (#17833)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new d2e719ac33c [refactor][java] Unify the acknowledge process for batch and non-batch message IDs (#17833) d2e719ac33c is described below commit d2e719ac33c6984a872873a7878eb5c0b7d06160 Author: Yunze Xu AuthorDate: Thu Oct 6 21:05:10 2022 +0800 [refactor][java] Unify the acknowledge process for batch and non-batch message IDs (#17833) --- .../client/impl/ConsumerAckResponseTest.java | 100 .../apache/pulsar/client/impl/ConsumerAckTest.java | 256 + .../pulsar/client/impl/BatchMessageIdImpl.java | 6 + .../PersistentAcknowledgmentsGroupingTracker.java | 139 +-- 4 files changed, 333 insertions(+), 168 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java deleted file mode 100644 index f86bbabdd88..000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.impl; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.testng.Assert.fail; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import lombok.Cleanup; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConsumerBase; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.transaction.TransactionImpl; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -@Test(groups = "broker-impl") -public class ConsumerAckResponseTest extends ProducerConsumerBase { - -private TransactionImpl transaction; - -@BeforeClass(alwaysRun = true) -public void setup() throws Exception { -super.internalSetup(); -super.producerBaseSetup(); -transaction = mock(TransactionImpl.class); -doReturn(1L).when(transaction).getTxnIdLeastBits(); -doReturn(1L).when(transaction).getTxnIdMostBits(); -doReturn(TransactionImpl.State.OPEN).when(transaction).getState(); -CompletableFuture completableFuture = CompletableFuture.completedFuture(null); -doNothing().when(transaction).registerAckOp(any()); -doReturn(true).when(transaction).checkIfOpen(any()); - doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any()); - -Thread.sleep(1000 * 3); -} - -@AfterClass(alwaysRun = true) -public void cleanup() throws Exception { -super.internalCleanup(); -} - -@Test -public void testAckResponse() throws 
PulsarClientException, InterruptedException { -String topic = "testAckResponse"; -@Cleanup -Producer producer = pulsarClient.newProducer(Schema.INT32) -.topic(topic) -.enableBatching(false) -.create(); -@Cleanup -ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) -.topic(topic) -.subscriptionName("sub") -.subscriptionType(SubscriptionType.Shared) -.ackTimeout(1, TimeUnit.SECONDS) -.subscribe(); -producer.send(1); -producer.send(2); -try { -consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), transaction).get(); -fail(); -} catch (ExecutionException e) { -
[pulsar] branch branch-2.11 updated: [fix][broker] In the trimDeletedEntries method, release the removed entry (#18305)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 903a71947f3 [fix][broker] In the trimDeletedEntries method, release the removed entry (#18305) 903a71947f3 is described below commit 903a71947f342ac2ffa83f6fa0f411e48a8b4b92 Author: LinChen <1572139...@qq.com> AuthorDate: Thu Nov 3 20:51:18 2022 +0800 [fix][broker] In the trimDeletedEntries method, release the removed entry (#18305) --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 -- .../org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 5 + 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 514dd24fa4d..c6f0dcdaafc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3265,8 +3265,14 @@ public class ManagedCursorImpl implements ManagedCursor { @Override public void trimDeletedEntries(List entries) { -entries.removeIf(entry -> ((PositionImpl) entry.getPosition()).compareTo(markDeletePosition) <= 0 -|| individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId())); +entries.removeIf(entry -> { +boolean isDeleted = ((PositionImpl) entry.getPosition()).compareTo(markDeletePosition) <= 0 +|| individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()); +if (isDeleted) { +entry.release(); +} +return isDeleted; +}); } private ManagedCursorImpl cursorImpl() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 5b8476d244a..cdc493b51ab 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -2668,6 +2668,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertEquals(entries.size(), 1); assertEquals(entries.get(0).getPosition(), PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 7)); + +assertEquals(entry1.refCnt(), 0); +assertEquals(entry2.refCnt(), 0); +assertEquals(entry3.refCnt(), 0); +assertEquals(entry4.refCnt(), 0); } @Test(timeOut = 2)
[pulsar] branch branch-2.11 updated: Make BookieId work with PulsarRegistrationDriver (second take) (#17922)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 39696c29e58 Make BookieId work with PulsarRegistrationDriver (second take) (#17922) 39696c29e58 is described below commit 39696c29e587af615c0fe9658cf09b0edb8207ea Author: Enrico Olivelli AuthorDate: Sat Oct 15 18:24:20 2022 +0200 Make BookieId work with PulsarRegistrationDriver (second take) (#17922) * Make BookieId work with PulsarRegistrationDriver (#17762) * Make BookieId work with PulsarRegistrationDriver * Switch to MetadataCache * checkstyle * Do not execute lookup on MetadataCache in the getBookieServiceInfo caller thread --- .../bookkeeper/BookieServiceInfoSerde.java | 55 +- .../bookkeeper/PulsarRegistrationClient.java | 119 - .../bookkeeper/PulsarRegistrationClientTest.java | 62 +++ 3 files changed, 230 insertions(+), 6 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java index 78a33179e76..b7e3024b637 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.discover.BookieServiceInfo; +import org.apache.bookkeeper.discover.BookieServiceInfoUtils; import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.Stat; @@ -63,7 +64,57 @@ public class BookieServiceInfoSerde implements MetadataSerde } @Override -public BookieServiceInfo deserialize(String path, 
byte[] content, Stat stat) throws IOException { -return null; +public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException { +// see https://github.com/apache/bookkeeper/blob/ +// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ +// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311 +String bookieId = extractBookiedIdFromPath(path); +if (bookieServiceInfo == null || bookieServiceInfo.length == 0) { +return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId); +} + +BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo); +BookieServiceInfo bsi = new BookieServiceInfo(); +List endpoints = builder.getEndpointsList().stream() +.map(e -> { +BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); +endpoint.setId(e.getId()); +endpoint.setPort(e.getPort()); +endpoint.setHost(e.getHost()); +endpoint.setProtocol(e.getProtocol()); +endpoint.setAuth(e.getAuthList()); +endpoint.setExtensions(e.getExtensionsList()); +return endpoint; +}) +.collect(Collectors.toList()); + +bsi.setEndpoints(endpoints); +bsi.setProperties(builder.getPropertiesMap()); + +return bsi; + +} + +/** + * Extract the BookieId + * The path should look like /ledgers/available/bookieId + * or /ledgers/available/readonly/bookieId. + * But the prefix depends on the configuration. 
+ * @param path + * @return the bookieId + */ +private static String extractBookiedIdFromPath(String path) throws IOException { +// https://github.com/apache/bookkeeper/blob/ +// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ +// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258 +if (path == null) { +path = ""; +} +int last = path.lastIndexOf("/"); +if (last >= 0) { +return path.substring(last + 1); +} else { +throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node"); +} } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index 52b50e3ea4b..f314c0efaf0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++
[pulsar] branch branch-2.11 updated: Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 6d0b9e718ac Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907) 6d0b9e718ac is described below commit 6d0b9e718ac542253b0e4a2d8d9db0441376f1ef Author: Matteo Merli AuthorDate: Sat Oct 1 08:06:13 2022 -0700 Allow to configure and disable the size of lookahead for detecting fixed delays in messages (#17907) --- conf/broker.conf| 6 ++ .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java| 6 ++ 2 files changed, 12 insertions(+) diff --git a/conf/broker.conf b/conf/broker.conf index 5a0150e34ea..f4d406ef509 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -551,6 +551,12 @@ isDelayedDeliveryDeliverAtTimeStrict=false # fixed delays in messages in a different way. delayedDeliveryFixedDelayDetectionLookahead=5 +# Size of the lookahead window to use when detecting if all the messages in the topic +# have a fixed delay. +# Default is 50,000. Setting the lookahead window to 0 will disable the logic to handle +# fixed delays in messages in a different way. +delayedDeliveryFixedDelayDetectionLookahead=5 + # Whether to enable acknowledge of batch local index. 
acknowledgmentAtBatchIndexLevelEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1383e33af9a..0d8cb31b662 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -356,6 +356,12 @@ public class ServiceConfiguration implements PulsarConfiguration { + "logic to handle fixed delays in messages in a different way.") private long delayedDeliveryFixedDelayDetectionLookahead = 50_000; +@FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead window to use " ++ "when detecting if all the messages in the topic have a fixed delay. " ++ "Default is 50,000. Setting the lookahead window to 0 will disable the " ++ "logic to handle fixed delays in messages in a different way.") +private long delayedDeliveryFixedDelayDetectionLookahead = 50_000; + @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index") private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
[pulsar] branch branch-2.11 updated: [fix][admin] returns 4xx error when pulsar-worker-service is disabled and trying to access it (#17901)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 345c25c64d2 [fix][admin] returns 4xx error when pulsar-worker-service is disabled and trying to access it (#17901) 345c25c64d2 is described below commit 345c25c64d21226ce1eb093cfeb8275cda77aec8 Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com> AuthorDate: Wed Nov 2 17:40:19 2022 -0700 [fix][admin] returns 4xx error when pulsar-worker-service is disabled and trying to access it (#17901) --- .../apache/pulsar/broker/admin/AdminResource.java | 9 .../pulsar/broker/admin/impl/FunctionsBase.java| 2 +- .../apache/pulsar/broker/admin/impl/SinksBase.java | 2 +- .../pulsar/broker/admin/impl/SourcesBase.java | 2 +- .../apache/pulsar/broker/admin/v2/Functions.java | 2 +- .../org/apache/pulsar/broker/admin/v2/Worker.java | 4 +- .../apache/pulsar/broker/admin/v2/WorkerStats.java | 2 +- .../apache/pulsar/broker/PulsarServiceTest.java| 57 -- 8 files changed, 69 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 79002bbdc6a..d8c6b3882c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -65,6 +65,7 @@ import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; @@ -280,6 +281,14 @@ public abstract class AdminResource extends PulsarWebResource { } } +protected WorkerService validateAndGetWorkerService() { +try { +return pulsar().getWorkerService(); +} catch (UnsupportedOperationException e) { +throw new RestException(Status.CONFLICT, e.getMessage()); +} +} + protected Policies getNamespacePolicies(NamespaceName namespaceName) { try { Policies policies = namespaceResources().getPolicies(namespaceName) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index f1c4c105de6..3e26215c32e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -55,7 +55,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam; public class FunctionsBase extends AdminResource { Functions functions() { -return pulsar().getWorkerService().getFunctions(); +return validateAndGetWorkerService().getFunctions(); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java index d45016454f5..3f97545010c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java @@ -50,7 +50,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam; public class SinksBase extends AdminResource { Sinks sinks() { -return pulsar().getWorkerService().getSinks(); +return validateAndGetWorkerService().getSinks(); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java index 
b4ba332c312..5d020290a31 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java @@ -50,7 +50,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam; public class SourcesBase extends AdminResource { Sources sources() { -return pulsar().getWorkerService().getSources(); +return validateAndGetWorkerService().getSources(); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java index a83ea8cd514..1164e371bd7 100644 ---
[pulsar] branch branch-2.11 updated: [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when Ser… (#18219)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 761465015eb [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when Ser… (#18219) 761465015eb is described below commit 761465015eb1b68eb636cae1a92ca8cee78a7db2 Author: ZhangJian He AuthorDate: Thu Oct 27 23:01:37 2022 +0800 [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when Ser… (#18219) ### Motivation https://github.com/apache/pulsar/blob/b061c6ac5833c21e483368febebd0d30679a35e1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L748-L774 The `pendingLookupRequestSemaphore` will leak when handleError. There are `LookUpRequestSemaphore` not released when removing it from `pendingRequests` related PR: #17856 ### Modifications We can't easily release the semaphore in `handleError`, because there are not only `LookUpRequest`. 
So release the semaphore when LookupException ### Verifying this change Add unit test case to cover this change ### Documentation - [ ] `doc-required` (Your PR needs to update docs and you will update later) - [x] `doc-not-needed` bug fixs, no need doc - [ ] `doc` (Your PR contains doc changes) - [ ] `doc-complete` (Docs have been already added) --- .../org/apache/pulsar/client/impl/ClientCnx.java | 3 +- .../apache/pulsar/client/impl/ClientCnxTest.java | 43 ++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 14a33cd3203..da00807584e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -784,7 +784,8 @@ public class ClientCnx extends PulsarHandler { if (pendingLookupRequestSemaphore.tryAcquire()) { future.whenComplete((lookupDataResult, throwable) -> { -if (throwable instanceof ConnectException) { +if (throwable instanceof ConnectException +|| throwable instanceof PulsarClientException.LookupException) { pendingLookupRequestSemaphore.release(); } }); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index 63aa7b7048b..7c1524fafed 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -120,6 +120,49 @@ public class ClientCnxTest { eventLoop.shutdownGracefully(); } +@Test +public void testPendingLookupRequestSemaphoreServiceNotReady() throws Exception { +EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout")); +ClientConfigurationData conf = new ClientConfigurationData(); 
+conf.setOperationTimeoutMs(10_000); +conf.setKeepAliveIntervalSeconds(0); +ClientCnx cnx = new ClientCnx(conf, eventLoop); + +ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); +Channel channel = mock(Channel.class); +when(ctx.channel()).thenReturn(channel); +ChannelFuture listenerFuture = mock(ChannelFuture.class); +when(listenerFuture.addListener(any())).thenReturn(listenerFuture); +when(ctx.writeAndFlush(any())).thenReturn(listenerFuture); +cnx.channelActive(ctx); +cnx.state = ClientCnx.State.Ready; +CountDownLatch countDownLatch = new CountDownLatch(1); +CompletableFuture completableFuture = new CompletableFuture<>(); +new Thread(() -> { +try { +Thread.sleep(1_000); +CompletableFuture future = +cnx.newLookup(null, 123); +countDownLatch.countDown(); +future.get(); +} catch (Exception e) { +completableFuture.complete(e); +} +}).start(); +countDownLatch.await(); +CommandError commandError = new CommandError(); +commandError.setRequestId(123L); +commandError.setError(ServerError.ServiceNotReady); +commandError.setMessage("Service not ready"); +cnx.handleError(commandError); +assertTrue(completableFuture.get().getCause() instanceof PulsarClientException.LookupException); +// wait for subsequent calls over +Awaitility.await().untilAsserted(() -> { +
[pulsar] branch branch-2.11 updated: [fix][broker] Fix the order of resource close in the InMemoryDelayedDeliveryTracker (#18000)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 7c3dcecd72f [fix][broker] Fix the order of resource close in the InMemoryDelayedDeliveryTracker (#18000) 7c3dcecd72f is described below commit 7c3dcecd72f6ed7636196e3a9b06fb36c81d8728 Author: Cong Zhao AuthorDate: Thu Oct 13 12:20:12 2022 +0800 [fix][broker] Fix the order of resource close in the InMemoryDelayedDeliveryTracker (#18000) --- .../delayed/InMemoryDelayedDeliveryTracker.java| 9 ++-- .../delayed/InMemoryDeliveryTrackerTest.java | 49 +++--- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index bb3edef5074..38cc3c4f6e0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -33,7 +33,7 @@ import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; @Slf4j public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask { -private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue(); +protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue(); private final PersistentDispatcherMultipleConsumers dispatcher; @@ -41,7 +41,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T private final Timer timer; // Current timeout or null if not set -private Timeout timeout; +protected Timeout timeout; // Timestamp at which the timeout is currently set private long currentTimeoutTarget; @@ -264,7 +264,7 @@ public class 
InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T if (log.isDebugEnabled()) { log.debug("[{}] Timer triggered", dispatcher.getName()); } -if (timeout.isCancelled()) { +if (timeout == null || timeout.isCancelled()) { return; } @@ -278,10 +278,11 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T @Override public void close() { -priorityQueue.close(); if (timeout != null) { timeout.cancel(); +timeout = null; } +priorityQueue.close(); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index 1ff47a4ca50..11b681d80a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -28,13 +28,13 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; - +import io.netty.util.concurrent.DefaultThreadFactory; import java.time.Clock; import java.util.Collections; import java.util.NavigableMap; @@ -42,10 +42,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - -import io.netty.util.concurrent.DefaultThreadFactory; import lombok.Cleanup; - import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.awaitility.Awaitility; @@ -433,4 +430,46 @@ public class InMemoryDeliveryTrackerTest { 
assertFalse(tracker.shouldPauseAllDeliveries()); } +@Test +public void testClose() throws Exception { +Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), +1, TimeUnit.MILLISECONDS); + +PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + +AtomicLong clockTime = new AtomicLong(); +Clock clock = mock(Clock.class); +when(clock.millis()).then(x -> clockTime.get()); + +final Exception[] exceptions = new Exception[1]; + +InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, +
[pulsar] branch branch-2.11 updated: [fix][meta] fix getChildren in MemoryMetadataStore and EtcdMetadataStore (#18172)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 20016f8fb60 [fix][meta] fix getChildren in MemoryMetadataStore and EtcdMetadataStore (#18172) 20016f8fb60 is described below commit 20016f8fb60007b87ddaca373a5152ca7620e7d6 Author: Cong Zhao AuthorDate: Tue Oct 25 22:05:39 2022 +0800 [fix][meta] fix getChildren in MemoryMetadataStore and EtcdMetadataStore (#18172) --- .../pulsar/metadata/impl/EtcdMetadataStore.java| 5 ++-- .../metadata/impl/LocalMemoryMetadataStore.java| 2 +- .../apache/pulsar/metadata/MetadataStoreTest.java | 29 ++ .../impl/LocalMemoryMetadataStoreTest.java | 4 +-- 4 files changed, 35 insertions(+), 5 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java index d6d39b50cd7..4b73e22e8fd 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java @@ -388,11 +388,12 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { case GET_CHILDREN: { OpGetChildren getChildren = op.asGetChildren(); GetResponse gr = txnResponse.getGetResponses().get(getIdx++); -String basePath = getChildren.getPath() + "/"; +String basePath = +getChildren.getPath().equals("/") ? 
"/" : getChildren.getPath() + "/"; Set children = gr.getKvs().stream() .map(kv -> kv.getKey().toString(StandardCharsets.UTF_8)) -.map(p -> p.replace(basePath, "")) +.map(p -> p.replaceFirst(basePath, "")) // Only return first-level children .map(k -> k.split("/", 2)[0]) .collect(Collectors.toCollection(TreeSet::new)); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index 6fa04dfe680..c91b22f2da6 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -131,7 +131,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M Set children = new TreeSet<>(); map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> { -String relativePath = key.replace(firstKey, ""); +String relativePath = key.replaceFirst(firstKey, ""); // Only return first-level children String child = relativePath.split("/", 2)[0]; diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index b8a091fef25..9e0c6887d94 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -500,4 +501,32 @@ public class MetadataStoreTest extends BaseMetadataStoreTest { assertTrue(f1.isCompletedExceptionally() && !f2.isCompletedExceptionally() || ! 
f1.isCompletedExceptionally() && f2.isCompletedExceptionally()); } + +@Test(dataProvider = "impl") +public void testGetChildren(String provider, Supplier urlSupplier) throws Exception { +@Cleanup +MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); + +store.put("/a/a-1", "value1".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); +store.put("/a/a-2", "value1".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); +store.put("/b/c/b/1", "value1".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); + +List subPaths = store.getChildren("/").get(); +Set expectedSet = "ZooKeeper".equals(provider) ? Set.of("a", "b", "zookeeper") : Set.of("a", "b"); +for (String subPath : subPaths) { +
[pulsar] branch branch-2.11 updated: [fix][broker]Fix mutex never released when trimming (#17911)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new db2237f869e [fix][broker]Fix mutex never released when trimming (#17911) db2237f869e is described below commit db2237f869e9e0842c4c78fc9d24211c3093a35e Author: feynmanlin <315157...@qq.com> AuthorDate: Tue Oct 11 10:31:41 2022 +0800 [fix][broker]Fix mutex never released when trimming (#17911) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 +-- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 34 ++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ba0f4f8559f..1fbdee5fa21 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2574,7 +2574,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return; } -advanceCursorsIfNecessary(ledgersToDelete); +try { +advanceCursorsIfNecessary(ledgersToDelete); +} catch (LedgerNotExistException e) { +log.info("First non deleted Ledger is not found, stop trimming"); +metadataMutex.unlock(); +trimmerMutex.unlock(); +return; +} PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; // Update metadata @@ -2647,7 +2654,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped * entries and the stats are reported correctly. 
*/ -private void advanceCursorsIfNecessary(List ledgersToDelete) { +@VisibleForTesting +void advanceCursorsIfNecessary(List ledgersToDelete) throws LedgerNotExistException { if (ledgersToDelete.isEmpty()) { return; } @@ -2655,7 +2663,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return // incorrect results -long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId()); +Long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId()); +if (firstNonDeletedLedger == null) { +throw new LedgerNotExistException("First non deleted Ledger is not found"); +} PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1); cursors.forEach(cursor -> { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 7adf5f9abb4..b5fc3d65cfd 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -22,9 +22,11 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -3548,6 +3550,38 @@ 
public class ManagedLedgerTest extends MockedBookKeeperTestCase { ledger.close(); } +@Test +public void testLockReleaseWhenTrimLedger() throws Exception { +ManagedLedgerConfig config = new ManagedLedgerConfig(); +config.setMaxEntriesPerLedger(1); + +ManagedLedgerImpl ledger = spy((ManagedLedgerImpl)factory.open("testLockReleaseWhenTrimLedger", config)); +doThrow(new ManagedLedgerException.LedgerNotExistException("First non deleted Ledger is not found")) +.when(ledger).advanceCursorsIfNecessary(any()); +final int entries = 10; +ManagedCursor cursor = ledger.openCursor("test-cursor" + UUID.randomUUID()); +for (int i = 0; i < entries; i++) {
[pulsar] branch branch-2.11 updated: [fix][broker]unify time unit at dropping the backlog on a topic (#17957)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 627b96f5e1d [fix][broker]unify time unit at dropping the backlog on a topic (#17957) 627b96f5e1d is described below commit 627b96f5e1dc950d218572c132c5e591bbfe13e4 Author: Qiang Huang AuthorDate: Mon Oct 10 11:04:08 2022 +0800 [fix][broker]unify time unit at dropping the backlog on a topic (#17957) --- .../pulsar/broker/service/BacklogQuotaManager.java | 2 +- .../broker/service/BacklogQuotaManagerTest.java| 54 ++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 915f9e7c6b9..2744469ea8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -226,7 +226,7 @@ public class BacklogQuotaManager { } // Timestamp only > 0 if ledger has been closed if (ledgerInfo.getTimestamp() > 0 -&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) { +&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) { // skip whole ledger for the slowest cursor PositionImpl nextPosition = PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 4f16ebd0bec..5de9a3298fa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ 
-532,6 +532,60 @@ public class BacklogQuotaManagerTest { client.close(); } +@Test(timeOut = 6) +public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws Exception { +assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), +new HashMap<>()); +admin.namespaces().setBacklogQuota("prop/ns-quota", +BacklogQuota.builder() +.limitTime(5) // set limit time as 5 seconds + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) +.build(), BacklogQuota.BacklogQuotaType.message_age); +PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) +.build(); + +final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID(); +final String subName1 = "c1"; +final String subName2 = "c2"; +int numMsgs = 5; + +Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); +Consumer consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); +org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); +byte[] content = new byte[1024]; +for (int i = 0; i < numMsgs; i++) { +producer.send(content); +consumer1.receive(); +consumer2.receive(); +} + +TopicStats stats = getTopicStats(topic1); +assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5); +assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5); + +// Sleep 5000 mills for first 5 messages. +Thread.sleep(5000L); +numMsgs = 9; +for (int i = 0; i < numMsgs; i++) { +producer.send(content); +consumer1.receive(); +consumer2.receive(); +} + +// The first 5 messages are expired after sleeping 2000 more mills. +Thread.sleep(2000L); +rolloverStats(); + +TopicStats stats2 = getTopicStats(topic1); +// The first 5 messages should be expired due to limit time is 5 seconds, and the last 9 message should not. 
+Awaitility.await().untilAsserted(() -> { + assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 9); + assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), 9); +}); +client.close(); +} + + @Test public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception {
[GitHub] [pulsar] tjiuming commented on a diff in pull request #18886: [fix][broker] Fix PendingAckHandleImpl when replay failed.
tjiuming commented on code in PR #18886: URL: https://github.com/apache/pulsar/pull/18886#discussion_r1099741514 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -283,12 +283,22 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS // ignore it for now and let the message dedup logic to take care of it } else { final String subscriptionName = Codec.decode(cursor.getName()); -subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor, +PersistentSubscription subscription = createPersistentSubscription(subscriptionName, cursor, PersistentSubscription.isCursorFromReplicatedSubscription(cursor), -cursor.getCursorProperties())); -// subscription-cursor gets activated by default: deactivate as there is no active subscription right -// now -subscriptions.get(subscriptionName).deactivateCursor(); +cursor.getCursorProperties()); +subscriptions.put(subscriptionName, subscription); +subscription.getInitializeFuture() +.exceptionally(t -> { +log.warn("PersistentSubscription [{}] pendingAckHandleImpl relay failed " ++ "when initialize topic [{}].", subscriptionName, topic, t); +if (subscriptions.remove(subscriptionName, subscription)) { +subscription.retryClose(); Review Comment: I think it may happen. ``` subscription.getInitializeFuture() .exceptionally(t -> { // ignore... }) ``` will be executed after the topic is created. If a consumer connects to the topic and then unsubscribes from it before that callback runs, `subscriptions.remove(...)` may return false. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [improve][connector] JDBC sink: allow any jdbc driver (#17951)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 65aaae4523f [improve][connector] JDBC sink: allow any jdbc driver (#17951) 65aaae4523f is described below commit 65aaae4523fc3e4f54197a1c3b510a54cdc80afa Author: Nicolò Boschi AuthorDate: Tue Oct 11 09:52:59 2022 +0200 [improve][connector] JDBC sink: allow any jdbc driver (#17951) --- .../apache/pulsar/io/jdbc/JdbcAbstractSink.java| 2 - .../org/apache/pulsar/io/jdbc/JdbcDriverType.java | 62 -- .../java/org/apache/pulsar/io/jdbc/JdbcUtils.java | 10 3 files changed, 74 deletions(-) diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index dbf27407ca1..4b993bc6a0b 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -89,8 +89,6 @@ public abstract class JdbcAbstractSink implements Sink { properties.setProperty("password", password); } - - Class.forName(JdbcUtils.getDriverClassName(jdbcSinkConfig.getJdbcUrl())); connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties); connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions()); log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit()); diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcDriverType.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcDriverType.java deleted file mode 100644 index 1a4710622b7..000 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcDriverType.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.jdbc; - -import lombok.Getter; - -@Getter -public enum JdbcDriverType { - -CLICKHOUSE("jdbc:clickhouse:", "ru.yandex.clickhouse.ClickHouseDriver"), -DB2("jdbc:db2:", "com.ibm.db2.jcc.DB2Driver"), -DERBY_CLIENT("jdbc:derby://", "org.apache.derby.jdbc.ClientDriver"), -DERBY_EMBEDDED("jdbc:derby:", "org.apache.derby.jdbc.EmbeddedDriver"), -FIREBIRD("jdbc:firebird:", "org.firebirdsql.jdbc.FBDriver"), -FIREBIRD_SQL("jdbc:firebirdsql:", "org.firebirdsql.jdbc.FBDriver"), -H2("jdbc:h2:", "org.h2.Driver"), -HSQL("jdbc:hsqldb:", "org.hsqldb.jdbcDriver"), -INFORMIX("jdbc:informix-sqli:", "com.informix.jdbc.IfxDriver"), -JTDS("jdbc:jtds:", "net.sourceforge.jtds.jdbc.Driver"), -MARIADB("jdbc:mariadb:", "org.mariadb.jdbc.Driver"), -MYSQL("jdbc:mysql:", "com.mysql.cj.jdbc.Driver"), -MYSQL_GOOGLE("jdbc:google:", "com.mysql.jdbc.GoogleDriver"), -ORACLE("jdbc:oracle", "oracle.jdbc.OracleDriver"), -POSTGRESQL("jdbc:postgresql:", "org.postgresql.Driver"), -REDSHIFT("jdbc:redshift:", "com.amazon.redshift.jdbc42.Driver"), -SAPHANA("jdbc:sap:", "com.sap.db.jdbc.Driver"), -SNOWFLAKE("jdbc:snowflake:", "net.snowflake.client.jdbc.SnowflakeDriver"), -SQLDROID("jdbc:sqldroid:", "org.sqldroid.SQLDroidDriver"), -SQLLITE("jdbc:sqlite:", 
"org.sqlite.JDBC"), -SQLSERVER("jdbc:sqlserver:", "com.microsoft.sqlserver.jdbc.SQLServerDriver"), -SYBASE("jdbc:sybase:", "com.sybase.jdbc4.jdbc.SybDriver"), -TEST_CONTAINERS("jdbc:tc:", "org.testcontainers.jdbc.ContainerDatabaseDriver"), -OPENMLDB("jdbc:openmldb:", "com._4paradigm.openmldb.jdbc.SQLDriver"); - -JdbcDriverType(String prefix, String driverClass) { -this.prefix = prefix; -this.driverClass = driverClass; -} - -private final String prefix; -private final String driverClass; - -public boolean matches(String url) { -return url.startsWith(prefix); -} -} diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
[pulsar] branch branch-2.11 updated: [fix][broker] Fix incorrect bundle split count metric (#17970)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 302efd1e637 [fix][broker] Fix incorrect bundle split count metric (#17970) 302efd1e637 is described below commit 302efd1e637af3109aeb8c2badff9f1515259a8f Author: 萧易客 AuthorDate: Tue Oct 11 10:37:23 2022 +0800 [fix][broker] Fix incorrect bundle split count metric (#17970) --- .../pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java | 10 ++ site2/docs/reference-metrics.md| 2 +- .../website/versioned_docs/version-2.10.1/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.6.0/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.6.1/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.6.2/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.6.3/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.6.4/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.7.0/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.7.1/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.7.2/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.7.3/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.7.4/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.8.0/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.8.1/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.8.2/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.8.3/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.9.0/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.9.1/reference-metrics.md | 2 +- .../website/versioned_docs/version-2.9.3/reference-metrics.md | 2 +- 20 files changed, 25 insertions(+), 23 deletions(-) diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 3b0834027df..2f63580e31e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -755,6 +755,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { synchronized (bundleSplitStrategy) { final Set bundlesToBeSplit = bundleSplitStrategy.findBundlesToSplit(loadData, pulsar); NamespaceBundleFactory namespaceBundleFactory = pulsar.getNamespaceService().getNamespaceBundleFactory(); +int splitCount = 0; for (String bundleName : bundlesToBeSplit) { try { final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName); @@ -776,13 +777,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager { pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, unloadSplitBundles, null); +splitCount++; log.info("Successfully split namespace bundle {}", bundleName); } catch (Exception e) { log.error("Failed to split namespace bundle {}", bundleName, e); } } -updateBundleSplitMetrics(bundlesToBeSplit); +updateBundleSplitMetrics(splitCount); } } @@ -790,10 +792,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager { /** * As leader broker, update bundle split metrics. 
* - * @param bundlesToBeSplit + * @param bundlesSplit the number of bundles splits */ -private void updateBundleSplitMetrics(Set bundlesToBeSplit) { -bundleSplitCount += bundlesToBeSplit.size(); +private void updateBundleSplitMetrics(int bundlesSplit) { +bundleSplitCount += bundlesSplit; List metrics = Lists.newArrayList(); Map dimensions = new HashMap<>(); diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md index 20567826244..662a6c4a394 100644 --- a/site2/docs/reference-metrics.md +++ b/site2/docs/reference-metrics.md @@ -397,7 +397,7 @@ All the bundleUnloading metrics are labelled with the following labels: | Name | Type| Description | |---|-|| -| pulsar_lb_bundles_split_total | Counter | bundle split count in this bundle splitting check interval | +| pulsar_lb_bundles_split_total | Counter | The
[pulsar] branch branch-2.11 updated: [fix][function] Fix invalid metric type `gauge ` (#18129)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new f7087f0dbd1 [fix][function] Fix invalid metric type `gauge ` (#18129) f7087f0dbd1 is described below commit f7087f0dbd168d779aa01fa00c875096b15cfb8f Author: Cong Zhao AuthorDate: Mon Oct 24 13:11:24 2022 +0800 [fix][function] Fix invalid metric type `gauge ` (#18129) --- .../java/org/apache/pulsar/functions/worker/WorkerStatsManager.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java index c9b7c1da8db..4c7d28cc70d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java @@ -329,7 +329,8 @@ public class WorkerStatsManager { stream.write("# TYPE "); stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX); stream.write(metricName); -stream.write(" gauge \n"); +stream.write(" gauge"); +stream.write("\n"); stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX); stream.write(metricName);
[GitHub] [pulsar] congbobo184 commented on issue #19416: [Bug] Turn off allowAutoTopicCreation, then turn on transactionCoordinatorEnabled, unable to send, consume messages
congbobo184 commented on issue #19416: URL: https://github.com/apache/pulsar/issues/19416#issuecomment-1422119424 It seems like the brokers are all down, so the load balancer can't find an available broker. Can you test the following: 1. create a new topic 2. send messages without a transaction. What is the result in this case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] nodece commented on a diff in pull request #19390: [fix][authentication] Store the original auth when using anonymous role
nodece commented on code in PR #19390: URL: https://github.com/apache/pulsar/pull/19390#discussion_r1099738894 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ## @@ -971,6 +973,7 @@ protected void handleConnect(CommandConnect connect) { authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole() Review Comment: Sounds good, we need to improve the proxy codebase to find a workaround, this is going to take some time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] thetumbled commented on pull request #19451: [fixbug] [broker] filter messages in pending ack state.
thetumbled commented on PR #19451: URL: https://github.com/apache/pulsar/pull/19451#issuecomment-1422114703 > There have some problem: > > 1. use Exclusive, why not use cumulative ack? > 2. only check the postion, how batch message can be redeliver? > > I don't think it's necessary to fix it, at least for now 1. I find that I can reproduce the problem only in `Exclusive` mode; it may have something to do with the different dispatcher. In shared mode the test code cannot reproduce the issue. 2. Batch messages in pending ack state are filtered by your PR #14327. 3. According to my understanding, PR #14327 tries to filter batch messages in pending ack state. For example, a producer produces a batch message consisting of two messages, msg1 and msg2. The consumer uses txn1 to ack msg1 and txn2 to ack msg2, and then the consumer aborts txn2. At this time, the bitset held in PendingAckHandleImpl is 01 and the bitset held in the cursor is 11. So when the broker redelivers messages to the consumer, triggered by the transaction abort, the broker will send a batch message with bitset 11, so the consumer receives msg1 and msg2 again, but msg1 should be filtered. 4. Also, I do not understand what you said in #14327 about the risk of message loss. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar-site] tjiuming commented on pull request #399: [doc][feat] Added doc to deploy a pulsar cluster on IBM cloud services
tjiuming commented on PR #399: URL: https://github.com/apache/pulsar-site/pull/399#issuecomment-1422106647 LGTM, but I'm not particularly familiar with Helm charts. Could @mattisonchao help review? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [fix][broker] Update the log print content of createSubscriptions (#18024)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new b2515241f6d [fix][broker] Update the log print content of createSubscriptions (#18024) b2515241f6d is described below commit b2515241f6de0576f36273e72d703d82ffca13b1 Author: HuangZeGui AuthorDate: Fri Oct 14 09:20:38 2022 +0800 [fix][broker] Update the log print content of createSubscriptions (#18024) Co-authored-by: huangzegui --- .../java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 7a8b047071a..c175b22c91f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4401,7 +4401,7 @@ public class PersistentTopicsBase extends AdminResource { }); FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> { -log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName); +log.info("[{}] Successfully created subscriptions on new partitions {}", clientAppId(), topicName); result.complete(null); }).exceptionally(ex -> { log.warn("[{}] Failed to create subscriptions on new partitions for {}",
[pulsar] branch branch-2.11 updated: [improve][broker] Remove locallyAcquiredLock when removeOwnership (#18197)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new d6e30b87cd3 [improve][broker] Remove locallyAcquiredLock when removeOwnership (#18197) d6e30b87cd3 is described below commit d6e30b87cd37f329136a6b1613427ca3715839d3 Author: Lei Zhiyuan AuthorDate: Thu Oct 27 14:43:32 2022 +0800 [improve][broker] Remove locallyAcquiredLock when removeOwnership (#18197) --- .../org/apache/pulsar/broker/namespace/OwnershipCache.java | 13 - .../apache/pulsar/broker/namespace/OwnershipCacheTest.java | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 67e986b804c..7fd573187f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.AsyncCacheLoader; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.Lists; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; import java.util.List; import java.util.Map; @@ -213,7 +214,7 @@ public class OwnershipCache { * */ public CompletableFuture removeOwnership(NamespaceBundle bundle) { -ResourceLock lock = locallyAcquiredLocks.get(bundle); +ResourceLock lock = locallyAcquiredLocks.remove(bundle); if (lock == null) { // We don't own the specified bundle anymore return CompletableFuture.completedFuture(null); @@ -330,6 +331,16 @@ public class OwnershipCache { this.ownedBundlesCache.synchronous().invalidateAll(); } +public void 
invalidateLocalOwnerCache(NamespaceBundle namespaceBundle) { +this.ownedBundlesCache.synchronous().invalidate(namespaceBundle); +} + +@VisibleForTesting +public Map> getLocallyAcquiredLocks() { +return locallyAcquiredLocks; +} + + public synchronized boolean refreshSelfOwnerInfo() { this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java index 6168c61bb20..afe4d4c8b5b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java @@ -367,6 +367,7 @@ public class OwnershipCacheTest { Awaitility.await().untilAsserted(() -> { assertTrue(cache.getOwnedBundles().isEmpty()); assertFalse(store.exists(ServiceUnitUtils.path(bundle)).join()); +assertNull(cache.getLocallyAcquiredLocks().get(bundle)); }); }
[pulsar] branch branch-2.11 updated: [improve][broker] Support setting forceDeleteTenantAllowed dynamically (#18192)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new a8d8dbf130f [improve][broker] Support setting forceDeleteTenantAllowed dynamically (#18192) a8d8dbf130f is described below commit a8d8dbf130fc8be785273d6985ec781bfe682d67 Author: Jiwei Guo AuthorDate: Thu Oct 27 09:49:37 2022 +0800 [improve][broker] Support setting forceDeleteTenantAllowed dynamically (#18192) --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 1 + .../org/apache/pulsar/broker/service/BrokerServiceTest.java | 12 2 files changed, 13 insertions(+) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index b79b557e582..1383e33af9a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -621,6 +621,7 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_POLICIES, +dynamic = true, doc = "Allow forced deletion of tenants. Default is false." 
) private boolean forceDeleteTenantAllowed = false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index e13e9b201f7..73f322ec3a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1510,4 +1510,16 @@ public class BrokerServiceTest extends BrokerTestBase { assertTrue(conf.isForceDeleteNamespaceAllowed()); }); } + +@Test +public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception { +cleanup(); +conf.setForceDeleteTenantAllowed(false); +setup(); +admin.brokers() +.updateDynamicConfiguration("forceDeleteTenantAllowed", "true"); +Awaitility.await().untilAsserted(()->{ +assertTrue(conf.isForceDeleteTenantAllowed()); +}); +} }
[pulsar] branch branch-2.11 updated: [improve][broker] Support setting `ForceDeleteNamespaceAllowed` dynamically (#18181)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new bf8b494c286 [improve][broker] Support setting `ForceDeleteNamespaceAllowed` dynamically (#18181) bf8b494c286 is described below commit bf8b494c28659b7d8f58fb4a8e81847bef25b106 Author: Jiwei Guo AuthorDate: Tue Oct 25 22:07:13 2022 +0800 [improve][broker] Support setting `ForceDeleteNamespaceAllowed` dynamically (#18181) --- .../apache/pulsar/broker/ServiceConfiguration.java | 1 + .../pulsar/broker/service/BrokerServiceTest.java | 36 ++ 2 files changed, 37 insertions(+) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 7e9534bc9bc..b79b557e582 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -627,6 +627,7 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_POLICIES, +dynamic = true, doc = "Allow forced deletion of namespaces. Default is false." 
) private boolean forceDeleteNamespaceAllowed = false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 1fb63470456..e13e9b201f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -88,6 +88,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConnectionPool; @@ -98,6 +99,7 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; @@ -1474,4 +1476,38 @@ public class BrokerServiceTest extends BrokerTestBase { assertTrue(brokerService.isSystemTopic("persistent://" + heartbeatNamespaceV1.toString() + "/healthcheck")); assertTrue(brokerService.isSystemTopic(heartbeatNamespaceV2.toString() + "/healthcheck")); } + +@Test +public void testGetTopic() throws Exception { +final String ns = "prop/ns-test"; +admin.namespaces().createNamespace(ns, 2); +final String topicName = ns + "/topic-1"; + admin.topics().createNonPartitionedTopic(String.format("persistent://%s", topicName)); +Producer producer1 = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); +producer1.close(); +PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName.toString(), false).get().get(); +persistentTopic.close().join(); +List topics = new ArrayList<>(pulsar.getBrokerService().getTopics().keys()); +topics.removeIf(item -> item.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)); +Assert.assertEquals(topics.size(), 0); +@Cleanup +Consumer consumer = pulsarClient.newConsumer(Schema.STRING) +.topic(topicName) +.subscriptionName("sub-1") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) +.subscriptionType(SubscriptionType.Shared) +.subscribe(); +} + +@Test +public void testDynamicConfigurationsForceDeleteNamespaceAllowed() throws Exception { +cleanup(); +conf.setForceDeleteNamespaceAllowed(false); +setup(); +admin.brokers() +.updateDynamicConfiguration("forceDeleteNamespaceAllowed", "true"); +Awaitility.await().untilAsserted(()->{ +assertTrue(conf.isForceDeleteNamespaceAllowed()); +}); +} }
[pulsar] branch branch-2.11 updated: [fix][client] Fix IllegalThreadStateException when using newThread in ExecutorProvider.ExtendedThreadFactory
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 89252e6a7ea [fix][client] Fix IllegalThreadStateException when using newThread in ExecutorProvider.ExtendedThreadFactory 89252e6a7ea is described below commit 89252e6a7ea96bfd0d3ce29c94496f0fca1a7899 Author: Cong Zhao AuthorDate: Tue Nov 1 12:47:42 2022 +0800 [fix][client] Fix IllegalThreadStateException when using newThread in ExecutorProvider.ExtendedThreadFactory --- .../main/java/org/apache/pulsar/client/util/ExecutorProvider.java| 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java index 548c0a2abc4..90e1a59ed4d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java @@ -42,7 +42,7 @@ public class ExecutorProvider { public static class ExtendedThreadFactory extends DefaultThreadFactory { @Getter -private Thread thread; +private volatile Thread thread; public ExtendedThreadFactory(String poolName) { super(poolName, false); } @@ -52,9 +52,10 @@ public class ExecutorProvider { @Override public Thread newThread(Runnable r) { -thread = super.newThread(r); +Thread thread = super.newThread(r); thread.setUncaughtExceptionHandler((t, e) -> log.error("Thread {} got uncaught Exception", t.getName(), e)); +this.thread = thread; return thread; } }
[pulsar] branch branch-2.11 updated: [improve][broker] Add UncaughtExceptionHandler for every thread pool (#18211)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new bfaf39c4dd0 [improve][broker] Add UncaughtExceptionHandler for every thread pool (#18211) bfaf39c4dd0 is described below commit bfaf39c4dd0ac495edab3c948b372305c2567762 Author: feynmanlin <315157...@qq.com> AuthorDate: Thu Oct 27 11:29:53 2022 +0800 [improve][broker] Add UncaughtExceptionHandler for every thread pool (#18211) --- .../org/apache/pulsar/broker/PulsarService.java| 11 .../broker/TransactionMetadataStoreService.java| 4 +-- .../loadbalance/impl/ModularLoadManagerImpl.java | 7 +++-- .../loadbalance/impl/SimpleLoadManagerImpl.java| 4 +-- .../pulsar/broker/service/BrokerService.java | 33 -- .../metrics/PrometheusMetricsProvider.java | 5 ++-- .../pulsar/broker/tools/LoadReportCommand.java | 4 ++- .../pulsar/client/impl/AutoClusterFailover.java| 4 +-- .../client/impl/ControlledClusterFailover.java | 4 +-- .../pulsar/client/impl/PulsarClientImpl.java | 5 ++-- .../pulsar/client/util/ExecutorProvider.java | 8 -- .../pulsar/functions/instance/InstanceCache.java | 4 +-- .../worker/ClusterServiceCoordinator.java | 13 + 13 files changed, 65 insertions(+), 41 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6cb761d7761..2cb48ed1549 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -314,13 +314,13 @@ public class PulsarService implements AutoCloseable, ShutdownService { this.config = config; this.processTerminator = processTerminator; this.loadManagerExecutor = Executors -.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager")); 
+.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager")); this.workerConfig = workerConfig; this.functionWorkerService = functionWorkerService; this.executor = Executors.newScheduledThreadPool(config.getNumExecutorThreadPoolSize(), -new DefaultThreadFactory("pulsar")); +new ExecutorProvider.ExtendedThreadFactory("pulsar")); this.cacheExecutor = Executors.newScheduledThreadPool(config.getNumCacheExecutorThreadPoolSize(), -new DefaultThreadFactory("zk-cache-callback")); +new ExecutorProvider.ExtendedThreadFactory("zk-cache-callback")); if (config.isTransactionCoordinatorEnabled()) { this.transactionExecutorProvider = new ExecutorProvider(this.getConfiguration() @@ -613,7 +613,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private CompletableFuture addTimeoutHandling(CompletableFuture future) { ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor( -new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown")); +new ExecutorProvider.ExtendedThreadFactory(getClass().getSimpleName() + "-shutdown")); FutureUtil.addTimeoutHandling(future, Duration.ofMillis(Math.max(1L, getConfiguration().getBrokerShutdownTimeoutMs())), shutdownExecutor, () -> FutureUtil.createTimeoutException("Timeout in close", getClass(), "close")); @@ -1423,7 +1423,8 @@ public class PulsarService implements AutoCloseable, ShutdownService { protected synchronized ScheduledExecutorService getCompactorExecutor() { if (this.compactorExecutor == null) { -compactorExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("compaction")); +compactorExecutor = Executors.newSingleThreadScheduledExecutor( +new ExecutorProvider.ExtendedThreadFactory("compaction")); } return this.compactorExecutor; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index a15039b3ff8..fb56e8e6857 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -24,7 +24,6 @@ import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTI import com.google.common.annotations.VisibleForTesting; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; -import
[pulsar] branch branch-2.11 updated: [fix][client] Support LocalDateTime Conversion (#18334)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 80fd119333d [fix][client] Support LocalDateTime Conversion (#18334) 80fd119333d is described below commit 80fd119333da1a4a89d18baf2f33a36eb7bac313 Author: Cong Zhao AuthorDate: Wed Nov 9 17:02:07 2022 +0800 [fix][client] Support LocalDateTime Conversion (#18334) * Support LocalDateTime Conversion * move `TimestampMicrosConversion` to correct line --- .../pulsar/client/impl/schema/AvroSchema.java | 4 +++- .../pulsar/client/impl/schema/AvroSchemaTest.java | 21 + 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java index d2ea9cd4a9f..592ffb5291f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java @@ -118,6 +118,8 @@ public class AvroSchema extends AvroBaseStructSchema { reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion()); reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion()); +reflectData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); +reflectData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion()); if (jsr310ConversionEnabled) { // The conversion that is registered first is higher priority than the registered later. 
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); @@ -128,8 +130,8 @@ public class AvroSchema extends AvroBaseStructSchema { } catch (ClassNotFoundException e) { // Skip if have not provide joda-time dependency. } -reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); } +reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); reflectData.addLogicalTypeConversion(new Conversions.UUIDConversion()); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java index 2a5040d7815..ed2c8597ded 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java @@ -32,6 +32,7 @@ import io.netty.buffer.ByteBufAllocator; import java.math.BigDecimal; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import java.time.temporal.ChronoUnit; import java.util.Arrays; @@ -549,4 +550,24 @@ public class AvroSchemaTest { Assert.assertNotEquals(Instant.class, decodeWithJsonNoClassLoader.getValue().getClass()); } +@Data +@AllArgsConstructor +@NoArgsConstructor +private static class LocalDateTimePojo { +LocalDateTime value; +} + +@Test +public void testLocalDateTime() { +SchemaDefinition schemaDefinition = + SchemaDefinition.builder().withPojo(LocalDateTimePojo.class) +.withJSR310ConversionEnabled(true).build(); + +AvroSchema avroSchema = AvroSchema.of(schemaDefinition); +LocalDateTime now = LocalDateTime.now(); +byte[] bytes = avroSchema.encode(new LocalDateTimePojo(now)); + +LocalDateTimePojo pojo = avroSchema.decode(bytes); +assertEquals(pojo.getValue().truncatedTo(ChronoUnit.MILLIS), now.truncatedTo(ChronoUnit.MILLIS)); +} }
[pulsar] branch branch-2.11 updated: [fix][client] Fix exception when calling loadConf on a ConsumerBuilder that has a KeySharedPolicy (#18345)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new ad61b984c09 [fix][client] Fix exception when calling loadConf on a ConsumerBuilder that has a KeySharedPolicy (#18345) ad61b984c09 is described below commit ad61b984c096795b981234bca13def3f3ee935af Author: Christophe Bornet AuthorDate: Wed Nov 16 01:45:24 2022 +0100 [fix][client] Fix exception when calling loadConf on a ConsumerBuilder that has a KeySharedPolicy (#18345) --- .../impl/conf/ConsumerConfigurationData.java | 1 + .../client/impl/ConsumerBuilderImplTest.java | 232 + 2 files changed, 233 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 040db866008..46568ec8314 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -166,6 +166,7 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { private boolean resetIncludeHead = false; +@JsonIgnore private transient KeySharedPolicy keySharedPolicy; private boolean batchIndexAckEnabled = false; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index cc190186351..fdb372ae390 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -22,10 +22,15 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import 
static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -34,17 +39,30 @@ import java.util.regex.Pattern; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.KeySharedPolicy; +import org.apache.pulsar.client.api.MessageCrypto; +import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessagePayloadProcessor; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.RedeliveryBackoff; +import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import static 
org.testng.Assert.fail; /** @@ -370,4 +388,218 @@ public class ConsumerBuilderImplTest { assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue(); assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1); } + +@Test +public void testLoadConf() throws Exception { +ConsumerBuilderImpl consumerBuilder = createConsumerBuilder(); + +String jsonConf = ("{\n" ++ "'topicNames' : [ 'new-topic' ],\n" ++ "'topicsPattern' : 'new-topics-pattern',\n" ++ "'subscriptionName' : 'new-subscription',\n" ++ "'subscriptionType' : 'Key_Shared',\n" ++ "'subscriptionProperties' : {\n" ++ " 'new-sub-prop' : 'new-sub-prop-value'\n" ++ "},\n" ++ "'subscriptionMode' :
[GitHub] [pulsar] Technoboy- commented on pull request #18107: [improve][client] Support MAX_ACK_GROUP_SIZE configurable
Technoboy- commented on PR #18107: URL: https://github.com/apache/pulsar/pull/18107#issuecomment-1422091889 #18345 relies on this, so cherry-picked to branch-2.11 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [improve][client] Support MAX_ACK_GROUP_SIZE configurable (#18107)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new e0aa601f946 [improve][client] Support MAX_ACK_GROUP_SIZE configurable (#18107) e0aa601f946 is described below commit e0aa601f946722ea062fa375b669c6daa0c11ef0 Author: LinChen <1572139...@qq.com> AuthorDate: Fri Oct 21 09:35:36 2022 +0800 [improve][client] Support MAX_ACK_GROUP_SIZE configurable (#18107) The current MAX_ACK_GROUP_SIZE is fixed at 1000, increase the configuration acknowledgementsGroupSize, support MAX_ACK_GROUP_SIZE configurable: https://github.com/apache/pulsar/blob/afcdbf0e2b5fb905e1f82f0220436f8f9ec0c742/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L63-L64 1.Add configuration acknowledgementsGroupSize; --- .../pulsar/client/impl/TopicsConsumerImplTest.java | 59 ++ .../apache/pulsar/client/api/ConsumerBuilder.java | 11 .../pulsar/client/impl/ConsumerBuilderImpl.java| 7 +++ .../PersistentAcknowledgmentsGroupingTracker.java | 15 +++--- .../impl/conf/ConsumerConfigurationData.java | 15 ++ .../client/impl/ConsumerBuilderImplTest.java | 14 + 6 files changed, 114 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 8d068d65114..d597d72ad81 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -22,8 +22,12 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.netty.util.Timeout; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.Position; +import 
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -52,6 +56,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.Set; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -163,6 +168,60 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { consumer.close(); } +@Test +public void testMaxAcknowledgmentGroupSize() throws Exception { +final String namespace = "use/ns-abc"; +final String topicName = "persistent://" + namespace + "/topic1"; +TenantInfoImpl tenantInfo = createDefaultTenantInfo(); +admin.tenants().createTenant("use", tenantInfo); +admin.namespaces().createNamespace(namespace, Set.of("test")); +int acknowledgmentGroupSize = 6; + +Producer producer = pulsarClient.newProducer() +.topic(topicName) +.enableBatching(false) +.messageRoutingMode(MessageRoutingMode.SinglePartition) +.create(); +Consumer consumer = pulsarClient.newConsumer().topic(topicName) +.subscriptionName("my-sub") +.acknowledgmentGroupTime(1, TimeUnit.SECONDS) +.maxAcknowledgmentGroupSize(acknowledgmentGroupSize) +.subscribe(); + +PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); +ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger(); +ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.getCursors().iterator().next(); + +for (int i = 0; i < 10; i++) { +String message = "my-message-" + i; +producer.send(message.getBytes()); +} + +MessageIdImpl ackMessageId = new MessageIdImpl(-1, -1, -1); 
+for (int i = 0; i < 10; i++) { +Message msg = consumer.receive(5, TimeUnit.SECONDS); +if (msg != null) { +MessageId messageId = msg.getMessageId(); +consumer.acknowledge(msg); +// When the acknowledgmentGroupSize message is confirmed, send ack will be triggered +if (i == (acknowledgmentGroupSize - 1)) { +ackMessageId = (MessageIdImpl) messageId; +} +} +} + +Awaitility.await().until(() -> cursor.getMarkDeletedPosition().getLedgerId() != -1); +Position
[GitHub] [pulsar] tianshimoyi commented on issue #19416: [Bug] Turn off allowAutoTopicCreation, then turn on transactionCoordinatorEnabled, unable to send, consume messages
tianshimoyi commented on issue #19416: URL: https://github.com/apache/pulsar/issues/19416#issuecomment-1422088816 2023-02-08T06:34:12,402+ [pulsar-2-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - 0 brokers being considered for assignment of public/default/0x_0x1000 2023-02-08T06:34:12,402+ [pulsar-2-1] WARN org.apache.pulsar.broker.namespace.NamespaceService - No broker is available for public/default/0x_0x1000 2023-02-08T06:34:12,402+ [pulsar-2-1] WARN org.apache.pulsar.broker.namespace.NamespaceService - Load manager didn't return any available broker. Returning empty result to lookup. NamespaceBundle[public/default/0x_0x1000] 2023-02-08T06:34:12,403+ [pulsar-io-6-4] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /10.0.35.166:36022 2023-02-08T06:34:15,597+ [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40676 2023-02-08T06:34:15,598+ [pulsar-io-6-2] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40680 2023-02-08T06:34:15,599+ [pulsar-io-6-3] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40682 2023-02-08T06:34:15,599+ [pulsar-io-6-4] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40684 2023-02-08T06:34:15,600+ [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40688 2023-02-08T06:34:15,600+ [pulsar-io-6-2] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40690 2023-02-08T06:34:15,601+ [pulsar-io-6-3] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40692 2023-02-08T06:34:15,601+ [pulsar-io-6-4] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40696 2023-02-08T06:34:15,602+ [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40698 2023-02-08T06:34:15,602+ [pulsar-io-6-2] 
INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40700 2023-02-08T06:34:15,602+ [pulsar-io-6-3] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40702 2023-02-08T06:34:15,603+ [pulsar-io-6-4] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40706 2023-02-08T06:34:15,603+ [pulsar-io-6-1] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40708 2023-02-08T06:34:15,604+ [pulsar-io-6-2] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40710 2023-02-08T06:34:15,605+ [pulsar-io-6-3] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40712 2023-02-08T06:34:15,605+ [pulsar-io-6-4] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.49.22:40714 2023-02-08T06:34:15,611+ [pulsar-2-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - 0 brokers being considered for assignment of pulsar/system/0x5000_0x6000 2023-02-08T06:34:15,611+ [pulsar-2-1] WARN org.apache.pulsar.broker.namespace.NamespaceService - No broker is available for pulsar/system/0x5000_0x6000 2023-02-08T06:34:15,611+ [pulsar-2-1] WARN org.apache.pulsar.broker.namespace.NamespaceService - Load manager didn't return any available broker. Returning empty result to lookup. NamespaceBundle[pulsar/system/0x5000_0x6000] 2023-02-08T06:34:15,612+ [pulsar-2-2] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - 0 brokers being considered for assignment of pulsar/system/0x8000_0x9000 2023-02-08T06:34:15,612+ [pulsar-2-2] WARN org.apache.pulsar.broker.namespace.NamespaceService - No broker is available for pulsar/system/0x8000_0x9000 2023-02-08T06:34:15,612+ [pulsar-2-2] WARN org.apache.pulsar.broker.namespace.NamespaceService - Load manager didn't return any available broker. Returning empty result to lookup. 
NamespaceBundle[pulsar/system/0x8000_0x9000] 2023-02-08T06:34:15,612+ [pulsar-2-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - 0 brokers being considered for assignment of pulsar/system/0xf000_0x 2023-02-08T06:34:15,612+ [pulsar-2-1] WARN org.apache.pulsar.broker.namespace.NamespaceService - No broker is available for pulsar/system/0xf000_0x 2023-02-08T06:34:15,612+ [pulsar-2-1] WARN org.apache.pulsar.broker.namespace.NamespaceService - Load manager didn't return any available broker. Returning empty result to lookup. NamespaceBundle[pulsar/system/0xf000_0x] 2023-02-08T06:34:15,612+ [pulsar-2-2] INFO
[pulsar] branch branch-2.11 updated: [improve][java-client]Add init capacity for messages in BatchMessageContainerImpl (#17822)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 9c166b74f85 [improve][java-client]Add init capacity for messages in BatchMessageContainerImpl (#17822) 9c166b74f85 is described below commit 9c166b74f85d7a438c859972582e4031bbe4dd24 Author: Xiaoyu Hou AuthorDate: Fri Sep 30 10:33:37 2022 +0800 [improve][java-client]Add init capacity for messages in BatchMessageContainerImpl (#17822) --- .../client/impl/AbstractBatchMessageContainer.java | 8 +++ .../client/impl/BatchMessageContainerImpl.java | 5 +- .../client/impl/BatchMessageContainerImplTest.java | 59 ++ 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java index 73f1e6d0889..9b4d1b7d683 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -46,10 +47,12 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta protected long currentTxnidLeastBits = -1L; protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024; +protected static final int INITIAL_MESSAGES_NUM = 32; // This will be the largest size for a batch sent from this particular producer. 
This is used as a baseline to // allocate a new buffer that can hold the entire batch without needing costly reallocations protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE; +protected int maxMessagesNum = INITIAL_MESSAGES_NUM; @Override public boolean haveEnoughSpace(MessageImpl msg) { @@ -71,6 +74,11 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta return numMessagesInBatch; } +@VisibleForTesting +public int getMaxMessagesNum() { +return maxMessagesNum; +} + @Override public long getCurrentBatchSize() { return currentBatchSizeBytes; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 2d91aafb7e3..4e5b8b87edb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -58,7 +58,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { @Setter private long highestSequenceId = -1L; private ByteBuf batchedMessageMetadataAndPayload; -private List> messages = new ArrayList<>(); +private List> messages = new ArrayList<>(maxMessagesNum); protected SendCallback previousCallback = null; // keep track of callbacks for individual messages being published in a batch protected SendCallback firstCallback; @@ -169,12 +169,13 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { // Update the current max batch size using the uncompressed size, which is what we need in any case to // accumulate the batch content maxBatchSize = Math.max(maxBatchSize, uncompressedSize); +maxMessagesNum = Math.max(maxMessagesNum, numMessagesInBatch); return compressedPayload; } @Override public void clear() { -messages = new ArrayList<>(); +messages = new ArrayList<>(maxMessagesNum); firstCallback = null; previousCallback = null; 
messageMetadata.clear(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java index 1e640301f89..373a6c0fed0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java @@ -19,14 +19,18 @@ package org.apache.pulsar.client.impl; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBufAllocator;
[pulsar] branch branch-2.11 updated: fix comments for exposeManagedLedgerMetricsInPrometheus field (#17792)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 23ade06288c fix comments for exposeManagedLedgerMetricsInPrometheus field (#17792) 23ade06288c is described below commit 23ade06288c2c14b59157503114546ce6941679d Author: HuangZeGui AuthorDate: Sat Sep 24 23:14:40 2022 +0800 fix comments for exposeManagedLedgerMetricsInPrometheus field (#17792) --- conf/broker.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index 35a821016ad..5a0150e34ea 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1378,7 +1378,7 @@ metricsBufferResponse=false # Enable producer level metrics. default is false exposeProducerLevelMetricsInPrometheus=false -# Enable managed ledger metrics (aggregated by namespace). default is false +# Enable managed ledger metrics (aggregated by namespace). default is true exposeManagedLedgerMetricsInPrometheus=true # Enable cursor level metrics. default is false
[pulsar] branch branch-2.11 updated: Issue 17588: Allow deletion of a namespace that was left in deleted status (#17592)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new d83f4e14737 Issue 17588: Allow deletion of a namespace that was left in deleted status (#17592) d83f4e14737 is described below commit d83f4e14737288f8ed31ebc3ca5e7ba0310d7adb Author: Enrico Olivelli AuthorDate: Wed Sep 14 08:42:02 2022 +0200 Issue 17588: Allow deletion of a namespace that was left in deleted status (#17592) --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 2 +- .../java/org/apache/pulsar/broker/web/PulsarWebResource.java | 7 ++- .../java/org/apache/pulsar/broker/admin/NamespacesTest.java | 12 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 4b8d02da002..467262c8003 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1224,7 +1224,7 @@ public class NamespaceService implements AutoCloseable { public CompletableFuture> getListOfNonPersistentTopics(NamespaceName namespaceName) { -return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName) +return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName, true) .thenCompose(peerClusterData -> { // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request // should redirect to the peer-cluster diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 85605e201a0..910db719a97 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -855,6 +855,11 @@ public abstract class PulsarWebResource { public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace) { +return checkLocalOrGetPeerReplicationCluster(pulsarService, namespace, false); +} +public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, + NamespaceName namespace, + boolean allowDeletedNamespace) { if (!namespace.isGlobal() || NamespaceService.isHeartbeatNamespace(namespace)) { return CompletableFuture.completedFuture(null); } @@ -866,7 +871,7 @@ public abstract class PulsarWebResource { .getPoliciesAsync(namespace).thenAccept(policiesResult -> { if (policiesResult.isPresent()) { Policies policies = policiesResult.get(); -if (policies.deleted) { +if (!allowDeletedNamespace && policies.deleted) { String msg = String.format("Namespace %s is deleted", namespace.toString()); log.warn(msg); validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index dc4df8330cc..16dfe5bc9a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1192,6 +1192,18 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { topicList = admin.topics().getList(namespace); assertTrue(topicList.isEmpty()); +// simulate a partially deleted namespace, we should be able to recover +pulsar.getPulsarResources().getNamespaceResources() +.setPolicies(NamespaceName.get(namespace), old -> { +old.deleted = true; +return old; +}); +admin.namespaces().deleteNamespace(namespace, 
true); + +admin.namespaces().createNamespace(namespace, 100); +topicList = admin.topics().getList(namespace); +assertTrue(topicList.isEmpty()); + // reset back to false pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); }
[pulsar] branch branch-2.11 updated: [fix][tiered-storage] Don't cleanup data when offload met Metastore exception (#17512)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 32f0e35b20d [fix][tiered-storage] Don't cleanup data when offload met Metastore exception (#17512) 32f0e35b20d is described below commit 32f0e35b20df19f5c70307b88e76410a72ba32e2 Author: Yong Zhang AuthorDate: Wed Sep 14 10:41:13 2022 +0800 [fix][tiered-storage] Don't cleanup data when offload met Metastore exception (#17512) * [fix][tiered-storage] Don't cleanup data when offload met BadVersion --- *Motivation* There are two ways the offloaded data cleanup can be triggered. One is hitting an offload conflict exception; the other is completeLedgerInfoForOffloaded reaching the max retry count and throwing ZooKeeper exceptions. We retry the ZooKeeper operation on a connection-loss exception. We should be careful about this exception, because we may lose data if the metadata update actually succeeded. When a MetaStore exception happens, we cannot be sure whether the metadata update failed or not. Because we retry on connection loss, it is possible to get a BadVersion or other exception after retrying. So we don't clean up the data if this happens. 
*Modification* - don't delete data if has meta store exception * log error when skip deleting * improve logs --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 ++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 5530fa4950b..ba0f4f8559f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3019,8 +3019,21 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { scheduledExecutor, name) .whenComplete((ignore2, exception) -> { if (exception != null) { -log.error("[{}] Failed to offload data for the ledgerId {}", +Throwable e = FutureUtil.unwrapCompletionException(exception); +if (e instanceof MetaStoreException) { +// When a MetaStore exception happens, we can not make sure the metadata +// update is failed or not. Because we have a retry on the connection loss, +// it is possible to get a BadVersion or other exception after retrying. +// So we don't clean up the data if it has metadata operation exception. +log.error("[{}] Failed to update offloaded metadata for the ledgerId {}, " ++ "the offloaded data will not be cleaned up", name, ledgerId, exception); +return; +} else { +log.error("[{}] Failed to offload data for the ledgerId {}, " ++ "clean up the offloaded data", +name, ledgerId, exception); +} cleanupOffloaded( ledgerId, uuid, driverName, driverMetadata,
[GitHub] [pulsar] congbobo184 commented on issue #19416: [Bug] Turn off allowAutoTopicCreation, then turn on transactionCoordinatorEnabled, unable to send, consume messages
congbobo184 commented on issue #19416: URL: https://github.com/apache/pulsar/issues/19416#issuecomment-1422085632 Do you have the full log? This picture alone is not enough to analyze the problem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [fix][broker] Remove timestamp from broker metrics (#17419)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 62006ba7484 [fix][broker] Remove timestamp from broker metrics (#17419) 62006ba7484 is described below commit 62006ba74842a3d9f10b68e162bfe0c1c4007174 Author: Michael Marshall AuthorDate: Tue Sep 6 20:02:20 2022 -0700 [fix][broker] Remove timestamp from broker metrics (#17419) ### Motivation When a Pulsar topic is unloaded from a broker, certain metrics related to that topic will appear to remain active for the broker for 5 minutes. This is confusing for troubleshooting because it makes the topic appear to be owned by multiple brokers for a short period of time. See below for a way to reproduce this behavior. In order to solve this "zombie" metric problem, I propose we remove the timestamps that get exported with each Prometheus metric served by the broker. ### Analysis Since we introduced Prometheus metrics in #294, we have exported a timestamp along with most metrics. This is an optional, valid part of the spec defined [here](https://prometheus.io/docs/instrumenting/exposition_formats/#comments-help-text-and-type-information). However, after our adoption of Prometheus metrics, the Prometheus project released version 2.0 with a significant improvement to its concept of staleness. In short, before 2.0, a metric that was in the last scrape but not the [...] This presentation https://www.youtube.com/watch?v=GcTzd2CLH7I and slide deck https://promcon.io/2017-munich/slides/staleness-in-prometheus-2-0.pdf document the feature in detail. This blog post was also helpful: https://www.robustperception.io/staleness-and-promql/. Additional motivation comes from mailing list threads like this one https://groups.google.com/g/prometheus-users/c/8OFAwp1OEcY. 
It says: > Note, however, that adding timestamps is an extremely niche use case. Most of the users who think the need it should actually not do it. > > The main usecases within that tiny niche are federation and mirroring the data from another monitoring system. The Prometheus Go client also indicates a similar motivation: https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#NewMetricWithTimestamp. The OpenMetrics project also recommends against exporting timestamps: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exposing-timestamps. As such, I think we are not a niche use case, and we should not add timestamps to our metrics. ### Reproducing the problem 1. Run any 2.x version of Prometheus (I used 2.31.0) along with the following scrape config: ```yaml - job_name: broker honor_timestamps: true scrape_interval: 30s scrape_timeout: 10s metrics_path: /metrics scheme: http follow_redirects: true static_configs: - targets: ["localhost:8080"] ``` 2. Start pulsar standalone on the same machine. I used a recently compiled version of master. 3. Publish messages to a topic. 4. Observe `pulsar_in_messages_total` metric for the topic in the prometheus UI (localhost:9090) 5. Stop the producer. 6. Unload the topic from the broker. 7. Optionally, `curl` the metrics endpoint to verify that the topic’s `pulsar_in_messages_total` metric is no longer reported. 8. Watch the metrics get reported in prometheus for 5 additional minutes. When you set `honor_timestamps: false`, the metric stops getting reported right after the topic is unloaded, which is the desired behavior. ### Modifications * Remove all timestamps from metrics * Fix affected tests and test files (some of those tests were in the proxy and the function worker, but no code was changed for those modules) ### Verifying this change This change is accompanied by updated tests. 
### Does this pull request potentially affect one of the following parts: This is technically a breaking change to the metrics, though I would consider it a bug fix at this point. I will discuss it on the mailing list to ensure it gets proper visibility. Given how frequently Pulsar changes which metrics are exposed between each scrape, I think this is an important fix that should be cherry picked to older release branches. Technically, we can avoid cherry picking this change if we advise users to set `honor_timestamps: false`. However, I think it is better to just remove them. ### Documentation - [x] `doc-not-needed` --- .../stats/prometheus/PrometheusMetricStreams.java | 2 +- .../prometheus/PrometheusMetricsGenerator.java | 3 +-
[GitHub] [pulsar] Technoboy- commented on pull request #15558: [fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865)
Technoboy- commented on PR #15558: URL: https://github.com/apache/pulsar/pull/15558#issuecomment-1422084899 #17419 relies on this, so cherry-picked to branch-2.11 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#15558)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 9cab1ade94e [fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#15558) 9cab1ade94e is described below commit 9cab1ade94e15ff91e536c0f961d7d9842534323 Author: Mark Silcox <63227862+marksil...@users.noreply.github.com> AuthorDate: Fri Sep 2 09:42:11 2022 +0100 [fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#15558) Co-authored-by: Dave Maughan --- .../stats/prometheus/AggregatedNamespaceStats.java | 2 +- .../stats/prometheus/NamespaceStatsAggregator.java | 380 ++-- .../stats/prometheus/PrometheusMetricStreams.java | 75 +++ .../prometheus/PrometheusMetricsGenerator.java | 28 +- .../pulsar/broker/stats/prometheus/TopicStats.java | 646 ++--- .../stats/prometheus/TransactionAggregator.java| 327 +-- .../metrics/PrometheusTextFormatUtil.java | 32 - .../pulsar/broker/stats/PrometheusMetricsTest.java | 59 ++ .../prometheus/PrometheusMetricStreamsTest.java| 85 +++ .../pulsar/common/util/SimpleTextOutputStream.java | 13 +- .../instance/stats/PrometheusTextFormat.java | 5 + .../functions/worker/WorkerStatsManager.java | 5 + 12 files changed, 900 insertions(+), 757 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index 0d76c44c27a..7e50da57c25 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -106,7 +106,7 @@ public class AggregatedNamespaceStats { stats.replicationStats.forEach((n, as) -> { 
AggregatedReplicationStats replStats = -replicationStats.computeIfAbsent(n, k -> new AggregatedReplicationStats()); +replicationStats.computeIfAbsent(n, k -> new AggregatedReplicationStats()); replStats.msgRateIn += as.msgRateIn; replStats.msgRateOut += as.msgRateOut; replStats.msgThroughputIn += as.msgThroughputIn; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index de90bd3aa53..a278fba7b29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -19,8 +19,11 @@ package org.apache.pulsar.broker.stats.prometheus; import io.netty.util.concurrent.FastThreadLocal; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -35,7 +38,6 @@ import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; -import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.CompactorMXBean; @@ -43,67 +45,70 @@ import org.apache.pulsar.compaction.CompactorMXBean; @Slf4j public class NamespaceStatsAggregator { -private static FastThreadLocal localNamespaceStats = -new FastThreadLocal() { +private static final FastThreadLocal localNamespaceStats = +new 
FastThreadLocal<>() { @Override -protected AggregatedNamespaceStats initialValue() throws Exception { +protected AggregatedNamespaceStats initialValue() { return new AggregatedNamespaceStats(); } }; -private static FastThreadLocal localTopicStats = new FastThreadLocal() { +private static final FastThreadLocal localTopicStats = new FastThreadLocal<>() { @Override -protected TopicStats initialValue() throws Exception { +protected TopicStats initialValue() { return new TopicStats(); } }; public static void generate(PulsarService pulsar,
[GitHub] [pulsar] tianshimoyi commented on issue #19416: [Bug] Turn off allowAutoTopicCreation, then turn on transactionCoordinatorEnabled, unable to send, consume messages
tianshimoyi commented on issue #19416: URL: https://github.com/apache/pulsar/issues/19416#issuecomment-1422083274 @congbobo184 ![截屏2023-02-08 下午2 29 47](https://user-images.githubusercontent.com/68532999/217452023-cc01b1a9-6066-4298-b9a8-9068206da0fe.png) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [monitor][txn] Add metrics for transaction (#15140)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new afc7f2f5a5a [monitor][txn] Add metrics for transaction (#15140) afc7f2f5a5a is described below commit afc7f2f5a5aec611d68e228fda115c40e40f3302 Author: Tao Jiuming <95597048+tjium...@users.noreply.github.com> AuthorDate: Fri Aug 12 09:19:09 2022 +0800 [monitor][txn] Add metrics for transaction (#15140) --- .../broker/service/persistent/PersistentTopic.java | 4 + .../stats/prometheus/AggregatedNamespaceStats.java | 8 ++ .../stats/prometheus/NamespaceStatsAggregator.java | 6 + .../pulsar/broker/stats/prometheus/TopicStats.java | 15 +++ .../transaction/buffer/TransactionBuffer.java | 8 ++ .../buffer/TransactionBufferClientStats.java | 70 +++ .../buffer/impl/InMemTransactionBuffer.java| 24 .../buffer/impl/TopicTransactionBuffer.java| 21 .../buffer/impl/TransactionBufferClientImpl.java | 55 - .../impl/TransactionBufferClientStatsImpl.java | 130 + .../buffer/impl/TransactionBufferDisable.java | 15 +++ .../pendingack/PendingAckHandleStats.java | 34 ++ .../pendingack/impl/PendingAckHandleImpl.java | 15 ++- .../pendingack/impl/PendingAckHandleStatsImpl.java | 117 +++ .../pulsar/broker/transaction/TransactionTest.java | 41 +++ .../buffer/TransactionBufferClientTest.java| 55 + .../pendingack/PendingAckPersistentTest.java | 77 .../common/policies/data/stats/TopicStatsImpl.java | 7 ++ 18 files changed, 694 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f8c9871a88a..f8f5d5d744f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1912,6 +1912,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue(); stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue(); stats.publishRateLimitedTimes = publishRateLimitedTimes; +TransactionBuffer txnBuffer = getTransactionBuffer(); +stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount(); +stats.abortedTxnCount = txnBuffer.getAbortedTxnCount(); +stats.committedTxnCount = txnBuffer.getCommittedTxnCount(); subscriptions.forEach((name, subscription) -> { SubscriptionStatsImpl subStats = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index 93631025623..0d76c44c27a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -43,6 +43,10 @@ public class AggregatedNamespaceStats { public long msgBacklog; public long msgDelayed; +public long ongoingTxnCount; +public long abortedTxnCount; +public long committedTxnCount; + long backlogQuotaLimit; long backlogQuotaLimitTime; @@ -79,6 +83,10 @@ public class AggregatedNamespaceStats { msgOutCounter += stats.msgOutCounter; delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage; +this.ongoingTxnCount += stats.ongoingTxnCount; +this.abortedTxnCount += stats.abortedTxnCount; +this.committedTxnCount += stats.committedTxnCount; + managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize; managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize; managedLedgerStats.backlogSize += stats.managedLedgerStats.backlogSize; diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 7bcd1f150be..de90bd3aa53 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -182,6 +182,9 @@ public class NamespaceStatsAggregator { stats.averageMsgSize = tStatus.averageMsgSize; stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
[GitHub] [pulsar] congbobo184 commented on issue #19416: [Bug] Turn off allowAutoTopicCreation, then turn on transactionCoordinatorEnabled, unable to send, consume messages
congbobo184 commented on issue #19416: URL: https://github.com/apache/pulsar/issues/19416#issuecomment-1422080068 @tianshimoyi do you have a broker log that you can share with me? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-2.11 updated: [fix][broker] fix can not revoke permission after update topic partition (#17393)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 0655fe9a6c7 [fix][broker] fix can not revoke permission after update topic partition (#17393) 0655fe9a6c7 is described below commit 0655fe9a6c72496da591ba838f98b13c45957a8d Author: ken <1647023...@qq.com> AuthorDate: Mon Sep 5 10:59:08 2022 +0800 [fix][broker] fix can not revoke permission after update topic partition (#17393) * add unittest for revoke permission of topic after update topic partition * fix revoke permission of partition Co-authored-by: fanjianye --- .../broker/admin/impl/PersistentTopicsBase.java| 14 +- .../server/ProxyWithJwtAuthorizationTest.java | 22 ++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 26b208e2193..7a8b047071a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -344,7 +344,7 @@ public class PersistentTopicsBase extends AdminResource { }); } -private CompletableFuture revokePermissionsAsync(String topicUri, String role) { +private CompletableFuture revokePermissionsAsync(String topicUri, String role, boolean force) { return namespaceResources().getPoliciesAsync(namespaceName).thenCompose( policiesOptional -> { Policies policies = policiesOptional.orElseThrow(() -> @@ -353,8 +353,12 @@ public class PersistentTopicsBase extends AdminResource { || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) { log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", clientAppId(), 
role, topicUri); -return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, -"Permissions are not set at the topic level")); +if (force) { +return CompletableFuture.completedFuture(null); +} else { +return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, +"Permissions are not set at the topic level")); +} } // Write the new policies to metadata store return namespaceResources().setPoliciesAsync(namespaceName, p -> { @@ -380,10 +384,10 @@ public class PersistentTopicsBase extends AdminResource { for (int i = 0; i < numPartitions; i++) { TopicName topicNamePartition = topicName.getPartition(i); future = future.thenComposeAsync(unused -> - revokePermissionsAsync(topicNamePartition.toString(), role)); + revokePermissionsAsync(topicNamePartition.toString(), role, true)); } } -return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role)) +return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role, false)) .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())); })) ).exceptionally(ex -> { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java index 9e5e1891554..206b90a6909 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java @@ -216,6 +216,8 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { * 2. Update the topic partition number to 4. * 3. Use new producer/consumer with client role to process the topic. * 4. Broker should authorize producer/consumer normally. + * 5. revoke produce/consumer permission of topic + * 6. 
new producer/consumer should not be authorized * */ @Test @@ -297,6 +299,26 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { Assert.assertEquals(messageSet,
[pulsar] branch branch-2.11 updated: ManagedLedger: move to FENCED state in case of BadVersionException (#17736)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new d0d142d2346 ManagedLedger: move to FENCED state in case of BadVersionException (#17736) d0d142d2346 is described below commit d0d142d23464c1ec8f42817d754236a3d07a597d Author: Enrico Olivelli AuthorDate: Thu Sep 22 15:50:12 2022 +0200 ManagedLedger: move to FENCED state in case of BadVersionException (#17736) --- .../bookkeeper/mledger/ManagedLedgerException.java | 4 ++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 54 ++--- .../mledger/impl/ManagedLedgerErrorsTest.java | 70 ++ .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 48 +++ .../broker/service/persistent/PersistentTopic.java | 42 ++--- .../broker/service/BrokerBkEnsemblesTests.java | 2 - 6 files changed, 189 insertions(+), 31 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 347a380d7eb..0dc820ec46d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -80,6 +80,10 @@ public class ManagedLedgerException extends Exception { super(new Exception("Attempted to use a fenced managed ledger")); } +public ManagedLedgerFencedException(String message) { +super(message); +} + public ManagedLedgerFencedException(Exception e) { super(e); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 230bc98e43b..5530fa4950b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -431,6 +431,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void operationFailed(MetaStoreException e) { +handleBadVersion(e); if (e instanceof MetadataNotFoundException) { callback.initializeFailed(new ManagedLedgerNotFoundException(e)); } else { @@ -479,6 +480,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void operationFailed(MetaStoreException e) { +handleBadVersion(e); callback.initializeFailed(new ManagedLedgerException(e)); } }; @@ -1020,6 +1022,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void operationFailed(MetaStoreException e) { +handleBadVersion(e); callback.deleteCursorFailed(e, ctx); } @@ -1310,6 +1313,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void operationFailed(MetaStoreException e) { log.error("[{}] Failed to terminate managed ledger: {}", name, e.getMessage()); +handleBadVersion(e); callback.terminateFailed(new ManagedLedgerException(e), ctx); } }); @@ -1394,6 +1398,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { public synchronized void asyncClose(final CloseCallback callback, final Object ctx) { State state = STATE_UPDATER.get(this); if (state == State.Fenced) { +cancelScheduledTasks(); factory.close(this); callback.closeFailed(new ManagedLedgerFencedException(), ctx); return; @@ -1517,6 +1522,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void operationFailed(MetaStoreException e) { log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage()); +handleBadVersion(e); mbean.startDataLedgerDeleteOp(); bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> { mbean.endDataLedgerDeleteOp(); @@ -1525,14 +1531,12 @@ public class ManagedLedgerImpl 
implements ManagedLedger, CreateCallback { BKException.getMessage(rc1)); } }, null); -
[pulsar] branch branch-2.11 updated: [fix][broker] Fix executeWithRetry result is null (#17694)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 9fcf111a7d3 [fix][broker] Fix executeWithRetry result is null (#17694) 9fcf111a7d3 is described below commit 9fcf111a7d3ce5474a23193bf23be7ac295edfac Author: Cong Zhao AuthorDate: Tue Oct 11 10:14:58 2022 +0800 [fix][broker] Fix executeWithRetry result is null (#17694) --- .../pulsar/metadata/cache/impl/MetadataCacheImpl.java| 4 ++-- .../org/apache/pulsar/metadata/MetadataCacheTest.java| 16 +++- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index 2cbe9a6dc19..f58530bde31 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -294,12 +294,12 @@ public class MetadataCacheImpl implements MetadataCache, Consumer executeWithRetry(Supplier> op, String key) { CompletableFuture result = new CompletableFuture<>(); -op.get().thenAccept(r -> result.complete(r)).exceptionally((ex) -> { +op.get().thenAccept(result::complete).exceptionally((ex) -> { if (ex.getCause() instanceof BadVersionException) { // if resource is updated by other than metadata-cache then metadata-cache will get bad-version // exception. so, try to invalidate the cache and try one more time. 
objCache.synchronous().invalidate(key); -op.get().thenAccept((c) -> result.complete(null)).exceptionally((ex1) -> { +op.get().thenAccept(result::complete).exceptionally((ex1) -> { result.completeExceptionally(ex1.getCause()); return null; }); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index be6a03d0eac..43af3ad757e 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -491,15 +491,21 @@ public class MetadataCacheTest extends BaseMetadataStoreTest { MyClass value1 = new MyClass("a", 1); objCache1.create(key1, value1).join(); -objCache1.get(key1).join(); +assertEquals(objCache1.get(key1).join().get().b, 1); -objCache2.readModifyUpdate(key1, v -> { +CompletableFuture future1 = objCache1.readModifyUpdate(key1, v -> { return new MyClass(v.a, v.b + 1); -}).join(); +}); -objCache1.readModifyUpdate(key1, v -> { +CompletableFuture future2 = objCache2.readModifyUpdate(key1, v -> { return new MyClass(v.a, v.b + 1); -}).join(); +}); + +MyClass myClass1 = future1.join(); +assertEquals(myClass1.b, 2); + +MyClass myClass2 = future2.join(); +assertEquals(myClass2.b, 3); } @Test(dataProvider = "impl")
[pulsar] branch branch-2.11 updated: [fix][metrics]wrong metrics text generated when label_cluster specified (#17704)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 0d536d2d512 [fix][metrics]wrong metrics text generated when label_cluster specified (#17704) 0d536d2d512 is described below commit 0d536d2d512104cf882c361496761a4a94c3e00f Author: fengyubiao AuthorDate: Tue Sep 27 10:43:38 2022 +0800 [fix][metrics]wrong metrics text generated when label_cluster specified (#17704) * [fix][metrics]wrong metrics text generated when label_cluster specified * improve logic branch * mark test group --- .../PrometheusMetricsGeneratorUtils.java | 13 ++- .../PrometheusMetricsGeneratorUtilsTest.java | 102 + .../broker/stats/prometheus/package-info.java | 19 3 files changed, 131 insertions(+), 3 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java index 3960ac43ae3..5651c5e588f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java @@ -26,6 +26,7 @@ import java.io.OutputStream; import java.util.Enumeration; import java.util.List; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.common.util.SimpleTextOutputStream; /** @@ -65,16 +66,22 @@ public class PrometheusMetricsGeneratorUtils { for (int i = 0; i < metricFamily.samples.size(); i++) { Collector.MetricFamilySamples.Sample sample = metricFamily.samples.get(i); stream.write(sample.name); +stream.write("{"); if (!sample.labelNames.contains("cluster")) { 
-stream.write("{cluster=\"").write(cluster).write('"'); +stream.write("cluster=\"").write(cluster).write('"'); +// If label is empty, should not append ','. +if (!CollectionUtils.isEmpty(sample.labelNames)){ +stream.write(","); +} } for (int j = 0; j < sample.labelNames.size(); j++) { String labelValue = sample.labelValues.get(j); if (labelValue != null) { labelValue = labelValue.replace("\"", "\\\""); } - -stream.write(","); +if (j > 0) { +stream.write(","); +} stream.write(sample.labelNames.get(j)); stream.write("=\""); stream.write(labelValue); diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtilsTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtilsTest.java new file mode 100644 index 000..9bbfa5d7714 --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtilsTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.stats.prometheus; + +import static org.testng.Assert.*; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Counter; +import java.io.ByteArrayOutputStream; +import java.util.Collections; +import java.util.UUID; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class PrometheusMetricsGeneratorUtilsTest { + +private static final String LABEL_NAME_CLUSTER = "cluster"; + +@Test +public void testGenerateSystemMetricsWithSpecifyCluster() throws Exception { +String
[GitHub] [pulsar] codecov-commenter commented on pull request #19458: [fix][client] Shade com.fasterxml.jackson.datatype.* to prevent ClassNotFoundException
codecov-commenter commented on PR #19458: URL: https://github.com/apache/pulsar/pull/19458#issuecomment-1422058002 # [Codecov](https://codecov.io/gh/apache/pulsar/pull/19458?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) Report > Merging [#19458](https://codecov.io/gh/apache/pulsar/pull/19458?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) (c4a7116) into [master](https://codecov.io/gh/apache/pulsar/commit/016e7f0af997788d0514ca64e2c5c1bd9f506863?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) (016e7f0) will **increase** coverage by `38.57%`. > The diff coverage is `58.33%`. [![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/19458/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/19458?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) ```diff @@ Coverage Diff @@ ## master #19458 +/- ## = + Coverage 24.75% 63.33% +38.57% - Complexity 28826094+25806 = Files 1579 1832 +253 Lines121841 134145+12304 Branches 1330414760 +1456 = + Hits 3016484963+54799 + Misses8724041455-45785 - Partials 4437 7727 +3290 ``` | Flag | Coverage Δ | | |---|---|---| | inttests | `24.87% <50.00%> (+0.11%)` | :arrow_up: | | systests | `25.55% <58.33%> (?)` | | | unittests | `60.68% <58.33%> (?)` | | Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more. 
| [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/19458?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation) | Coverage Δ | | |---|---|---| | [...va/org/apache/pulsar/broker/service/ServerCnx.java](https://codecov.io/gh/apache/pulsar/pull/19458?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1NlcnZlckNueC5qYXZh) | `56.70% <58.33%> (+25.04%)` | :arrow_up: | | [...in/java/org/apache/pulsar/common/api/AuthData.java](https://codecov.io/gh/apache/pulsar/pull/19458?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9jb21tb24vYXBpL0F1dGhEYXRhLmphdmE=) | `71.42% <0.00%> (ø)` | | | [.../apache/pulsar/broker/namespace/LookupOptions.java](https://codecov.io/gh/apache/pulsar/pull/19458?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9uYW1lc3BhY2UvTG9va3VwT3B0aW9ucy5qYXZh) | `87.50% <0.00%> (ø)` | | | [.../apache/pulsar/common/naming/SystemTopicNames.java](https://codecov.io/gh/apache/pulsar/pull/19458?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi9uYW1pbmcvU3lzdGVtVG9waWNOYW1lcy5qYXZh) | `55.55% <0.00%> (ø)` | | | 
[...apache/pulsar/common/util/SafeCollectionUtils.java](https://codecov.io/gh/apache/pulsar/pull/19458?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi91dGlsL1NhZmVDb2xsZWN0aW9uVXRpbHMuamF2YQ==) | `0.00% <0.00%> (ø)` | | | [...pache/pulsar/common/configuration/BindAddress.java](https://codecov.io/gh/apache/pulsar/pull/19458?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9jb21tb24vY29uZmlndXJhdGlvbi9CaW5kQWRkcmVzcy5qYXZh) | `22.22% <0.00%> (ø)` | | |
[pulsar] branch branch-2.11 updated: [fix][broker] Fix Npe thrown by splitBundle (#17370)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 1cbbe01a6e6 [fix][broker] Fix Npe thrown by splitBundle (#17370) 1cbbe01a6e6 is described below commit 1cbbe01a6e669e0a04b4623c9caa01251b01327d Author: gaozhangmin AuthorDate: Thu Sep 1 20:27:09 2022 +0800 [fix][broker] Fix Npe thrown by splitBundle (#17370) --- .../pulsar/broker/admin/impl/NamespacesBase.java| 13 +++-- .../broker/namespace/NamespaceServiceTest.java | 21 + 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 60d171a9819..c2ce36d49ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1211,6 +1211,10 @@ public abstract class NamespacesBase extends AdminResource { .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenCompose(policies->{ String bundleRange = getBundleRange(bundleName); +if (bundleRange == null) { +throw new RestException(Status.NOT_FOUND, +String.format("Bundle range %s not found", bundleName)); +} return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, authoritative, false) .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, @@ -1269,13 +1273,18 @@ public abstract class NamespacesBase extends AdminResource { } private String getBundleRange(String bundleName) { +NamespaceBundle nsBundle; if (BundleType.LARGEST.toString().equals(bundleName)) { -return findLargestBundleWithTopics(namespaceName).getBundleRange(); +nsBundle = findLargestBundleWithTopics(namespaceName); } else if 
(BundleType.HOT.toString().equals(bundleName)) { -return findHotBundle(namespaceName).getBundleRange(); +nsBundle = findHotBundle(namespaceName); } else { return bundleName; } +if (nsBundle == null) { +return null; +} +return nsBundle.getBundleRange(); } private NamespaceBundle findLargestBundleWithTopics(NamespaceName namespaceName) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index c3dde95f59d..2b982d2e059 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -60,6 +60,7 @@ import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; @@ -598,6 +599,26 @@ public class NamespaceServiceTest extends BrokerTestBase { } } +public void testSplitBUndleWithNoBundle() throws Exception { +conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); +restartBroker(); +String namespace = "prop/test/ns-abc2"; + +BundlesData bundleData = BundlesData.builder().numBundles(10).build(); +admin.namespaces().createNamespace(namespace, bundleData); + +NamespaceService namespaceService = pulsar.getNamespaceService(); +NamespaceName nsname = NamespaceName.get(namespace); +NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); + +try { +admin.namespaces().splitNamespaceBundle(namespace, Policies.BundleType.HOT.toString(), false, null); +fail("should have failed."); +} catch (Exception ex) { +Assert.assertEquals(404, 
((PulsarAdminException) ex).getStatusCode()); +Assert.assertEquals("Bundle range HOT not found", ex.getMessage()); +} +} /** * Test bundle split with hot bundle which is serving highest load. *
[pulsar] branch branch-2.11 updated: Fix flakyness of testAutoSchemaFunction by creating the subscription before the function. (#17353)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 949465bd183 Fix flakyness of testAutoSchemaFunction by creating the subscription before the function. (#17353) 949465bd183 is described below commit 949465bd1835bda93d35d01ee4604082c71e5048 Author: Christophe Bornet AuthorDate: Wed Aug 31 10:53:09 2022 +0200 Fix flakyness of testAutoSchemaFunction by creating the subscription before the function. (#17353) Otherwise if the function creates the subscription, there is no schema on the topic and because of a bug a compatibility check fails on the new schema and the new consumer can't be created --- .../tests/integration/functions/PulsarFunctionsTest.java | 14 +++--- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 2b837b55f49..908d95784d6 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -809,6 +809,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String outputSchemaType, SubscriptionInitialPosition subscriptionInitialPosition) throws Exception { +if (StringUtils.isNotEmpty(inputTopicName)) { +ensureSubscriptionCreated( +inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema); +} + CommandGenerator generator; log.info("--- INPUT TOPIC: '{}', customSchemaInputs: {}", inputTopicName, customSchemaInputs); if (inputTopicName.endsWith(".*")) { @@ -854,11 +859,6 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase { ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd( commands); assertTrue(result.getStdout().contains("Created successfully")); - -if (StringUtils.isNotEmpty(inputTopicName)) { -ensureSubscriptionCreated( -inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema); -} } private void updateFunctionParallelism(String functionName, int parallelism) throws Exception { @@ -1536,6 +1536,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String logTopicName, String functionName, Schema schema) throws Exception { +ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema); + CommandGenerator generator; log.info("--- INPUT TOPIC: '{}'", inputTopicName); if (inputTopicName.endsWith(".*")) { @@ -1556,8 +1558,6 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd( commands); assertTrue(result.getStdout().contains("Created successfully")); - -ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema); } private void publishAndConsumeMessages(String inputTopic,
[pulsar] branch branch-2.11 updated: [refactor][java] Improve docs and code quality about KeyValueSchema usages (#17256)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new ecd7d0d5046 [refactor][java] Improve docs and code quality about KeyValueSchema usages (#17256) ecd7d0d5046 is described below commit ecd7d0d50463b530f8dc1c520a8bb81b67cafbf5 Author: Yunze Xu AuthorDate: Wed Nov 16 09:31:36 2022 +0800 [refactor][java] Improve docs and code quality about KeyValueSchema usages (#17256) --- .../java/org/apache/pulsar/client/api/Schema.java | 6 +- .../PulsarClientImplementationBinding.java | 2 - .../PulsarClientImplementationBindingImpl.java | 4 - .../client/impl/TypedMessageBuilderImpl.java | 98 +- 4 files changed, 62 insertions(+), 48 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index e1cbb629757..74c06dae025 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -384,10 +384,12 @@ public interface Schema extends Cloneable { } /** - * Key Value Schema using passed in key and value schemas. + * Key Value Schema using passed in key and value schemas with {@link KeyValueEncodingType#INLINE} encoding type. 
+ * + * @see Schema#KeyValue(Schema, Schema, KeyValueEncodingType) */ static Schema> KeyValue(Schema key, Schema value) { -return DefaultImplementation.getDefaultImplementation().newKeyValueSchema(key, value); +return KeyValue(key, value, KeyValueEncodingType.INLINE); } /** diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java index da85d3645be..e7c00ac2128 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java @@ -131,8 +131,6 @@ public interface PulsarClientImplementationBinding { Schema> newKeyValueBytesSchema(); - Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema); - Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema, KeyValueEncodingType keyValueEncodingType); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java index eb555112c28..30c4284d43c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java @@ -242,10 +242,6 @@ public final class PulsarClientImplementationBindingImpl implements PulsarClient return KeyValueSchemaImpl.kvBytes(); } -public Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema) { -return KeyValueSchemaImpl.of(keySchema, valueSchema); -} - public Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema, KeyValueEncodingType keyValueEncodingType) { return KeyValueSchemaImpl.of(keySchema, valueSchema, keyValueEncodingType); diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 32091439691..f41ff4bc778 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -25,6 +25,7 @@ import java.util.Base64; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Message; @@ -32,7 +33,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; +import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.api.proto.MessageMetadata; import
[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #19390: [fix][authentication] Store the original auth when using anonymous role
michaeljmarshall commented on code in PR #19390: URL: https://github.com/apache/pulsar/pull/19390#discussion_r1099690410 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ## @@ -971,6 +973,7 @@ protected void handleConnect(CommandConnect connect) { authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole() Review Comment: > We have a case in which we don't want to check the proxy authentication in the broker, only check the original authentication. Would you have any idea? That context helps a lot, thanks! Is it possible to add a configuration to the proxy so that it forwards the original authentication data as the `authData` part of the `Connect` command? The broker wouldn't necessarily know the connection is from the proxy, but I think this should be fine. I think this discussion probably relates to https://github.com/apache/pulsar/issues/19332. I have been thinking that our authentication state integration between proxy and broker is very complicated and doesn't appear to be working in all cases (see also https://github.com/apache/pulsar/issues/19291). It seems like we could always drop the proxy auth data in cases where the authentication data can be forwarded. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] michaeljmarshall commented on pull request #19455: [improve][broker] Require authRole is proxyRole to set originalPrincipal
michaeljmarshall commented on PR #19455: URL: https://github.com/apache/pulsar/pull/19455#issuecomment-1422037977 I added a commit to expand the scope of this PR so that both admin and binary endpoints have the same requirements. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] michaeljmarshall commented on pull request #19455: [improve][broker] Require authRole is proxyRole to set originalPrincipal
michaeljmarshall commented on PR #19455: URL: https://github.com/apache/pulsar/pull/19455#issuecomment-1422037686 > I think we should add a parameter in the config file, and when this parameter is enabled, we perform the strict checks. My primary concern with making this requirement configurable is that it is prone to error. For me, the justification comes here: https://github.com/apache/pulsar/blob/d7c4e373ac8cb60f234c9c231e5dce5bf7c9b50e/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java#L344-L353 A proxy that forwards an admin call without being configured as a `proxyRole` will only be authorized based on the role supplied by the proxy. Since these `proxyRoles` are often also `superUsers`, this is extremely problematic and easy to misconfigure, especially because everything will "work" when the proxy's auth role is a super user. However, it will only work because the proxy is overprovisioned, and the misconfiguration could lead to elevated permissions for the client. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] nodece commented on pull request #19455: [improve][broker] Require authRole is proxyRole to set originalPrincipal
nodece commented on PR #19455: URL: https://github.com/apache/pulsar/pull/19455#issuecomment-1422000748 Good work! > This is technically a breaking change in that upgrading existing proxies will not work if the `proxyRoles` is not correctly configured in the `broker.conf`. I think we should add a parameter in the config file, and when this parameter is enabled, we perform the strict checks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19440: [improve][broker] Implemented ExtensibleLoadManagerWrapper.getLoadBalancingMetrics()
heesung-sn commented on code in PR #19440: URL: https://github.com/apache/pulsar/pull/19440#discussion_r1099646027 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java: ## @@ -187,4 +191,35 @@ public String toString(ServiceConfiguration conf) { ); } +public List toMetrics(String advertisedBrokerAddress) { +var metrics = new ArrayList(); +var dimensions = new HashMap(); +dimensions.put("metric", "loadBalancing"); +dimensions.put("broker", advertisedBrokerAddress); Review Comment: Yes, I believe those extra k8s labels can be automatically added later when pushing to Prometheus. Pulsar brokers do not add them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19440: [improve][broker] Implemented ExtensibleLoadManagerWrapper.getLoadBalancingMetrics()
heesung-sn commented on code in PR #19440: URL: https://github.com/apache/pulsar/pull/19440#discussion_r1099646027 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java: ## @@ -187,4 +191,35 @@ public String toString(ServiceConfiguration conf) { ); } +public List toMetrics(String advertisedBrokerAddress) { +var metrics = new ArrayList(); +var dimensions = new HashMap(); +dimensions.put("metric", "loadBalancing"); +dimensions.put("broker", advertisedBrokerAddress); Review Comment: Yes, I believe those extra k8s labels can be automatically added later when pushing to Prometheus. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] nodece commented on a diff in pull request #19390: [fix][authentication] Store the original auth when using anonymous role
nodece commented on code in PR #19390: URL: https://github.com/apache/pulsar/pull/19390#discussion_r1099643951 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ## @@ -971,6 +973,7 @@ protected void handleConnect(CommandConnect connect) { authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole() Review Comment: Your point is valid; we also recommend enabling authentication everywhere. We have a case in which we don't want to check the proxy authentication in the broker, only check the original authentication. Would you have any idea? Right now, we set up `anonymousUserRole`, but we cannot get the original authentication, because the broker jumps directly to the `completeConnect` method, which misses storing the original authentication. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] massakam opened a new pull request, #19458: [fix][client] Shade com.fasterxml.jackson.datatype.* to prevent ClassNotFoundException
massakam opened a new pull request, #19458: URL: https://github.com/apache/pulsar/pull/19458 ### Motivation I tried to use the latest Java client with the authentication plugin, but it threw the following exception: ``` Exception in thread "main" org.apache.pulsar.client.api.PulsarClientException$UnsupportedAuthenticationException: java.lang.NoClassDefFoundError: org/apache/pulsar/shade/com/fasterxml/jackson/datatype/jdk8/Jdk8Module at org.apache.pulsar.client.api.AuthenticationFactory.create(AuthenticationFactory.java:110) at org.apache.pulsar.client.impl.ClientBuilderImpl.authentication(ClientBuilderImpl.java:138) at SampleConsumer.main(SampleConsumer.java:42) Caused by: java.lang.NoClassDefFoundError: org/apache/pulsar/shade/com/fasterxml/jackson/datatype/jdk8/Jdk8Module at org.apache.pulsar.client.impl.AuthenticationUtil.(AuthenticationUtil.java:35) at org.apache.pulsar.client.impl.PulsarClientImplementationBindingImpl.createAuthentication(PulsarClientImplementationBindingImpl.java:132) at org.apache.pulsar.client.api.AuthenticationFactory.create(AuthenticationFactory.java:108) ... 2 more Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.shade.com.fasterxml.jackson.datatype.jdk8.Jdk8Module at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) ... 5 more ``` This is because the `com.fasterxml.jackson.datatype.*` classes are not shaded in `pulsar-client`. ### Modifications Fixed shade settings to use wildcards to specify Jackson classes to include. This also shades the `com.fasterxml.jackson.datatype.*` classes. ### Verifying this change - [ ] Make sure that the change passes the CI checks. ### Documentation - [ ] `doc` - [ ] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch master updated: [improve][client] AuthenticationAthenz supports Copper Argos (#19445)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d7c4e373ac8 [improve][client] AuthenticationAthenz supports Copper Argos (#19445) d7c4e373ac8 is described below commit d7c4e373ac8cb60f234c9c231e5dce5bf7c9b50e Author: Masahiro Sakamoto AuthorDate: Wed Feb 8 13:15:40 2023 +0900 [improve][client] AuthenticationAthenz supports Copper Argos (#19445) --- .../client/impl/auth/AuthenticationAthenz.java | 102 - .../client/impl/auth/AuthenticationAthenzTest.java | 70 +- .../src/test/resources/copper_argos_ca.crt | 20 .../src/test/resources/copper_argos_ca.key | 27 ++ .../src/test/resources/copper_argos_client.crt | 24 + .../src/test/resources/copper_argos_client.key | 27 ++ 6 files changed, 243 insertions(+), 27 deletions(-) diff --git a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java index ec1dcf602ad..84d81c5d943 100644 --- a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java +++ b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java @@ -18,9 +18,13 @@ */ package org.apache.pulsar.client.impl.auth; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import com.google.common.io.CharStreams; +import com.oath.auth.KeyRefresher; +import com.oath.auth.KeyRefresherException; +import com.oath.auth.Utils; import com.yahoo.athenz.auth.ServiceIdentityProvider; import com.yahoo.athenz.auth.impl.SimpleServiceIdentityProvider; import com.yahoo.athenz.auth.util.Crypto; @@ -33,12 +37,15 @@ import 
java.io.InputStreamReader; import java.net.URISyntaxException; import java.net.URLConnection; import java.nio.charset.Charset; +import java.nio.file.Path; +import java.nio.file.Paths; import java.security.PrivateKey; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.net.ssl.SSLContext; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport; @@ -53,13 +60,17 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati private static final String APPLICATION_X_PEM_FILE = "application/x-pem-file"; +private transient KeyRefresher keyRefresher = null; private transient ZTSClient ztsClient = null; -private String ztsUrl; +private String ztsUrl = null; private String tenantDomain; private String tenantService; private String providerDomain; -private PrivateKey privateKey; +private PrivateKey privateKey = null; private String keyId = "0"; +private String privateKeyPath = null; +private String x509CertChainPath = null; +private String caCertPath = null; private String roleHeader = null; // If auto prefetching is enabled, application will not complete until the static method // ZTSClient.cancelPrefetch() is called. 
@@ -70,7 +81,8 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati // athenz will only give this token if it's at least valid for 2hrs private static final int minValidity = 2 * 60 * 60; private static final int maxValidity = 24 * 60 * 60; // token has upto 24 hours validity -private static final int cacheDurationInHour = 1; // we will cache role token for an hour then ask athenz lib again +private static final int cacheDurationInMinutes = 90; // role token is cached for 90 minutes +private static final int retryFrequencyInMillis = 60 * 60 * 1000; // key refresher scans files every hour private final ReadWriteLock cachedRoleTokenLock = new ReentrantReadWriteLock(); @@ -116,17 +128,14 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati if (roleToken == null) { return false; } -// Ensure we refresh the Athenz role token every hour to avoid using an expired +// Ensure we refresh the Athenz role token every 90 minutes to avoid using an expired // role token -return (System.nanoTime() - cachedRoleTokenTimestamp) < TimeUnit.HOURS.toNanos(cacheDurationInHour); +return (System.nanoTime() - cachedRoleTokenTimestamp) <
[GitHub] [pulsar] merlimat merged pull request #19445: [improve][client] AuthenticationAthenz supports Copper Argos
merlimat merged PR #19445: URL: https://github.com/apache/pulsar/pull/19445 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18273: [feat][txn] implement the SnapshotSegmentAbortedTxnProcessor
poorbarcode commented on code in PR #18273: URL: https://github.com/apache/pulsar/pull/18273#discussion_r1099632191 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java: ## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl; +import org.apache.commons.collections4.map.LinkedMap; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.systopic.SystemTopicClient; +import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import 
org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.FutureUtil; + +@Slf4j +public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcessor { + +/** + * Stored the unsealed aborted transaction IDs Whose size is always less than the snapshotSegmentCapacity. + * It will be persistent as a snapshot segment when its size reach the configured capacity. + */ +private LinkedList unsealedTxnIds; + +/** + * The map is used to clear the aborted transaction IDs persistent in the expired ledger. + * + * The key PositionImpl {@link PositionImpl} is the persistent position of + * the latest transaction of a segment. + * The value TxnID {@link TxnID} is the latest Transaction ID in a segment. + * + * + * + * If the position is expired, the processor can get the according latest + * transaction ID in this map. And then the processor can clear all the + * transaction IDs in the aborts {@link SnapshotSegmentAbortedTxnProcessorImpl#aborts} + * that lower than the transaction ID. + * And then the processor can delete the segments persistently according to + * the positions. + * + */ +private final LinkedMap segmentIndex = new LinkedMap<>(); + +/** + * This map is used to check whether a transaction is an aborted transaction. + * + * The transaction IDs is appended in order, so the processor can delete expired + * transaction IDs according to the latest expired
[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #19390: [fix][authentication] Store the original auth when using anonymous role
michaeljmarshall commented on code in PR #19390: URL: https://github.com/apache/pulsar/pull/19390#discussion_r1099631924 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ## @@ -971,6 +973,7 @@ protected void handleConnect(CommandConnect connect) { authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole() Review Comment: > This means that the proxy cannot use the anonymous role to connect to the broker Yes, this is the design I am proposing we move towards. That is why I said this earlier in this PR: > I propose that we consider a proxy connecting as the anonymous role as a misconfiguration. > Usually, the proxy always enable the authentication feature, but we also should consider a case that the proxy doesn't enable the authentication feature. In this case, the broker wouldn't enable authentication either. In the admin API, we only look for the original principal when the `authRole` is in the `proxyRoles` set. Given that the proxy has to have at least as much permission as the `originalPrincipal`, I do not see what the point of authentication would be if the anonymous role is also a proxy role. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19440: [improve][broker] Implemented ExtensibleLoadManagerWrapper.getLoadBalancingMetrics()
gaoran10 commented on code in PR #19440: URL: https://github.com/apache/pulsar/pull/19440#discussion_r1099629495 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java: ## @@ -187,4 +191,35 @@ public String toString(ServiceConfiguration conf) { ); } +public List toMetrics(String advertisedBrokerAddress) { +var metrics = new ArrayList(); +var dimensions = new HashMap(); +dimensions.put("metric", "loadBalancing"); +dimensions.put("broker", advertisedBrokerAddress); Review Comment: Sorry, it seems that we can add some extra labels for the metric when using Prometheus to collect metrics, such as Kubernetes pod name, etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19440: [improve][broker] Implemented ExtensibleLoadManagerWrapper.getLoadBalancingMetrics()
gaoran10 commented on code in PR #19440: URL: https://github.com/apache/pulsar/pull/19440#discussion_r1099629495 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java: ## @@ -187,4 +191,35 @@ public String toString(ServiceConfiguration conf) { ); } +public List toMetrics(String advertisedBrokerAddress) { +var metrics = new ArrayList(); +var dimensions = new HashMap(); +dimensions.put("metric", "loadBalancing"); +dimensions.put("broker", advertisedBrokerAddress); Review Comment: Sorry, I mean when we use Prometheus to collect metrics we can add some extra labels for the metric, such as Kubernetes pod name, etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] gaoran10 commented on a diff in pull request #19440: [improve][broker] Implemented ExtensibleLoadManagerWrapper.getLoadBalancingMetrics()
gaoran10 commented on code in PR #19440: URL: https://github.com/apache/pulsar/pull/19440#discussion_r1099628678 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java: ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.loadbalance.extensions.models; + +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Topics; +import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.pulsar.common.stats.Metrics; + +/** + * Defines the information required for a service unit split(e.g. bundle split). + */ +public class SplitCounter { + +long splitCount = 0; + +final Map> breakdownCounters; + +public SplitCounter() { +breakdownCounters = Map.of( +Success, Map.of( +Topics, new MutableLong(), Review Comment: Ok, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #18273: [feat][txn] implement the SnapshotSegmentAbortedTxnProcessor
liangyepianzhou commented on code in PR #18273: URL: https://github.com/apache/pulsar/pull/18273#discussion_r1099622315 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java: ## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl; +import org.apache.commons.collections4.map.LinkedMap; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.systopic.SystemTopicClient; +import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import 
org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.FutureUtil; + +@Slf4j +public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcessor { + +/** + * Stored the unsealed aborted transaction IDs Whose size is always less than the snapshotSegmentCapacity. + * It will be persistent as a snapshot segment when its size reach the configured capacity. + */ +private LinkedList unsealedTxnIds; + +/** + * The map is used to clear the aborted transaction IDs persistent in the expired ledger. + * + * The key PositionImpl {@link PositionImpl} is the persistent position of + * the latest transaction of a segment. + * The value TxnID {@link TxnID} is the latest Transaction ID in a segment. + * + * + * + * If the position is expired, the processor can get the according latest + * transaction ID in this map. And then the processor can clear all the + * transaction IDs in the aborts {@link SnapshotSegmentAbortedTxnProcessorImpl#aborts} + * that lower than the transaction ID. + * And then the processor can delete the segments persistently according to + * the positions. + * + */ +private final LinkedMap segmentIndex = new LinkedMap<>(); + +/** + * This map is used to check whether a transaction is an aborted transaction. + * + * The transaction IDs is appended in order, so the processor can delete expired + * transaction IDs according to the latest
[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #18273: [feat][txn] implement the SnapshotSegmentAbortedTxnProcessor
liangyepianzhou commented on code in PR #18273: URL: https://github.com/apache/pulsar/pull/18273#discussion_r1099621420 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java: ## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl; +import org.apache.commons.collections4.map.LinkedMap; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.systopic.SystemTopicClient; +import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import 
org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.FutureUtil; + +@Slf4j +public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcessor { + +/** + * Stored the unsealed aborted transaction IDs Whose size is always less than the snapshotSegmentCapacity. + * It will be persistent as a snapshot segment when its size reach the configured capacity. + */ +private LinkedList unsealedTxnIds; + +/** + * The map is used to clear the aborted transaction IDs persistent in the expired ledger. + * + * The key PositionImpl {@link PositionImpl} is the persistent position of + * the latest transaction of a segment. + * The value TxnID {@link TxnID} is the latest Transaction ID in a segment. + * + * + * + * If the position is expired, the processor can get the according latest + * transaction ID in this map. And then the processor can clear all the + * transaction IDs in the aborts {@link SnapshotSegmentAbortedTxnProcessorImpl#aborts} + * that lower than the transaction ID. + * And then the processor can delete the segments persistently according to + * the positions. + * + */ +private final LinkedMap segmentIndex = new LinkedMap<>(); + +/** + * This map is used to check whether a transaction is an aborted transaction. + * + * The transaction IDs is appended in order, so the processor can delete expired + * transaction IDs according to the latest
[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #18273: [feat][txn] implement the SnapshotSegmentAbortedTxnProcessor
liangyepianzhou commented on code in PR #18273: URL: https://github.com/apache/pulsar/pull/18273#discussion_r1099620734 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java: ## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl; +import org.apache.commons.collections4.map.LinkedMap; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.systopic.SystemTopicClient; +import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import 
org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.FutureUtil; + +@Slf4j +public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcessor { + +/** + * Stored the unsealed aborted transaction IDs Whose size is always less than the snapshotSegmentCapacity. + * It will be persistent as a snapshot segment when its size reach the configured capacity. + */ +private LinkedList unsealedTxnIds; + +/** + * The map is used to clear the aborted transaction IDs persistent in the expired ledger. + * + * The key PositionImpl {@link PositionImpl} is the persistent position of + * the latest transaction of a segment. + * The value TxnID {@link TxnID} is the latest Transaction ID in a segment. + * + * + * + * If the position is expired, the processor can get the according latest + * transaction ID in this map. And then the processor can clear all the + * transaction IDs in the aborts {@link SnapshotSegmentAbortedTxnProcessorImpl#aborts} + * that lower than the transaction ID. + * And then the processor can delete the segments persistently according to + * the positions. + * + */ +private final LinkedMap segmentIndex = new LinkedMap<>(); + +/** + * This map is used to check whether a transaction is an aborted transaction. + * + * The transaction IDs is appended in order, so the processor can delete expired + * transaction IDs according to the latest
[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #18273: [feat][txn] implement the SnapshotSegmentAbortedTxnProcessor
liangyepianzhou commented on code in PR #18273: URL: https://github.com/apache/pulsar/pull/18273#discussion_r1099618974 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java: ## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl; +import org.apache.commons.collections4.map.LinkedMap; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.systopic.SystemTopicClient; +import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import 
org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.FutureUtil; + +@Slf4j +public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcessor { + +/** + * Stored the unsealed aborted transaction IDs Whose size is always less than the snapshotSegmentCapacity. + * It will be persistent as a snapshot segment when its size reach the configured capacity. + */ +private LinkedList unsealedTxnIds; + +/** + * The map is used to clear the aborted transaction IDs persistent in the expired ledger. + * + * The key PositionImpl {@link PositionImpl} is the persistent position of + * the latest transaction of a segment. + * The value TxnID {@link TxnID} is the latest Transaction ID in a segment. + * + * + * + * If the position is expired, the processor can get the according latest + * transaction ID in this map. And then the processor can clear all the + * transaction IDs in the aborts {@link SnapshotSegmentAbortedTxnProcessorImpl#aborts} + * that lower than the transaction ID. + * And then the processor can delete the segments persistently according to + * the positions. + * + */ +private final LinkedMap segmentIndex = new LinkedMap<>(); + +/** + * This map is used to check whether a transaction is an aborted transaction. + * + * The transaction IDs is appended in order, so the processor can delete expired + * transaction IDs according to the latest
[GitHub] [pulsar] nodece commented on a diff in pull request #19390: [fix][authentication] Store the original auth when using anonymous role
nodece commented on code in PR #19390: URL: https://github.com/apache/pulsar/pull/19390#discussion_r1099617354 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ## @@ -971,6 +973,7 @@ protected void handleConnect(CommandConnect connect) { authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole() .orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured")); +checkAndStoreOriginalAuthDataForwardedByProxy(connect); Review Comment: Right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] nodece commented on a diff in pull request #19390: [fix][authentication] Store the original auth when using anonymous role
nodece commented on code in PR #19390: URL: https://github.com/apache/pulsar/pull/19390#discussion_r1099617354 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ## @@ -971,6 +973,7 @@ protected void handleConnect(CommandConnect connect) { authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole() .orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured")); +checkAndStoreOriginalAuthDataForwardedByProxy(connect); Review Comment: All right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] nodece commented on a diff in pull request #19390: [fix][authentication] Store the original auth when using anonymous role
nodece commented on code in PR #19390: URL: https://github.com/apache/pulsar/pull/19390#discussion_r1099616630 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ## @@ -971,6 +973,7 @@ protected void handleConnect(CommandConnect connect) { authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole() Review Comment: This means that the proxy cannot use the anonymous role to connect to the broker; I don't think this is correct. Usually the proxy enables the authentication feature, but we should also consider the case where the proxy doesn't enable the authentication feature. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #18273: [feat][txn] implement the SnapshotSegmentAbortedTxnProcessor
liangyepianzhou commented on code in PR #18273: URL: https://github.com/apache/pulsar/pull/18273#discussion_r1099615573 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java: ## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl; +import org.apache.commons.collections4.map.LinkedMap; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.systopic.SystemTopicClient; +import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import 
org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.FutureUtil; + +@Slf4j +public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcessor { + +/** + * Stored the unsealed aborted transaction IDs Whose size is always less than the snapshotSegmentCapacity. + * It will be persistent as a snapshot segment when its size reach the configured capacity. + */ +private LinkedList unsealedTxnIds; + +/** + * The map is used to clear the aborted transaction IDs persistent in the expired ledger. + * + * The key PositionImpl {@link PositionImpl} is the persistent position of + * the latest transaction of a segment. + * The value TxnID {@link TxnID} is the latest Transaction ID in a segment. + * + * + * + * If the position is expired, the processor can get the according latest + * transaction ID in this map. And then the processor can clear all the + * transaction IDs in the aborts {@link SnapshotSegmentAbortedTxnProcessorImpl#aborts} + * that lower than the transaction ID. + * And then the processor can delete the segments persistently according to + * the positions. + * + */ +private final LinkedMap segmentIndex = new LinkedMap<>(); + +/** + * This map is used to check whether a transaction is an aborted transaction. + * + * The transaction IDs is appended in order, so the processor can delete expired + * transaction IDs according to the latest
[GitHub] [pulsar] poorbarcode commented on pull request #19444: [fix] [ml] topic load fail by ledger lost
poorbarcode commented on PR #19444: URL: https://github.com/apache/pulsar/pull/19444#issuecomment-1421941307 /pulsarbot rerun-failure-checks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] tianshimoyi commented on issue #19416: [Bug] Turn off allowAutoTopicCreation, then turn on transactionCoordinatorEnabled, unable to send, consume messages
tianshimoyi commented on issue #19416: URL: https://github.com/apache/pulsar/issues/19416#issuecomment-1421934706 @congbobo184 I set up a new pulsar cluster. The two clusters share a zk cluster. There is no problem with the new pulsar cluster. The configuration is the same. What causes the above problems? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #18273: [feat][txn] implement the SnapshotSegmentAbortedTxnProcessor
liangyepianzhou commented on code in PR #18273: URL: https://github.com/apache/pulsar/pull/18273#discussion_r1099611429 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java: ## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl; +import org.apache.commons.collections4.map.LinkedMap; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.systopic.SystemTopicClient; +import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import 
org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.FutureUtil; + +@Slf4j +public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcessor { + +/** + * Stored the unsealed aborted transaction IDs Whose size is always less than the snapshotSegmentCapacity. + * It will be persistent as a snapshot segment when its size reach the configured capacity. + */ +private LinkedList unsealedTxnIds; + +/** + * The map is used to clear the aborted transaction IDs persistent in the expired ledger. + * + * The key PositionImpl {@link PositionImpl} is the persistent position of + * the latest transaction of a segment. + * The value TxnID {@link TxnID} is the latest Transaction ID in a segment. + * + * + * + * If the position is expired, the processor can get the according latest + * transaction ID in this map. And then the processor can clear all the + * transaction IDs in the aborts {@link SnapshotSegmentAbortedTxnProcessorImpl#aborts} + * that lower than the transaction ID. + * And then the processor can delete the segments persistently according to + * the positions. + * + */ +private final LinkedMap segmentIndex = new LinkedMap<>(); + +/** + * This map is used to check whether a transaction is an aborted transaction. + * + * The transaction IDs is appended in order, so the processor can delete expired + * transaction IDs according to the latest
[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #18273: [feat][txn] implement the SnapshotSegmentAbortedTxnProcessor
liangyepianzhou commented on code in PR #18273: URL: https://github.com/apache/pulsar/pull/18273#discussion_r1099609851 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java: ## @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl; +import org.apache.commons.collections4.map.LinkedMap; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.systopic.SystemTopicClient; +import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment; +import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import 
org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.FutureUtil; + +@Slf4j +public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcessor { + +/** + * Stores the unsealed aborted transaction IDs, whose size is always less than the snapshotSegmentCapacity. + * They will be persisted as a snapshot segment when the size reaches the configured capacity. + */ +private LinkedList unsealedTxnIds; + +/** + * The map is used to clear the aborted transaction IDs persisted in the expired ledger. + * + * The key PositionImpl {@link PositionImpl} is the persistent position of + * the latest transaction of a segment. + * The value TxnID {@link TxnID} is the latest Transaction ID in a segment. + * + * + * + * If the position is expired, the processor can get the corresponding latest + * transaction ID from this map. The processor can then clear all the + * transaction IDs in the aborts {@link SnapshotSegmentAbortedTxnProcessorImpl#aborts} + * that are lower than that transaction ID. + * After that, the processor can delete the segments persistently according to + * the positions. + * + */ +private final LinkedMap segmentIndex = new LinkedMap<>(); + +/** + * This map is used to check whether a transaction is an aborted transaction. + * + * The transaction IDs are appended in order, so the processor can delete expired + * transaction IDs according to the latest