(pulsar) branch master updated: [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489)
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new d9a43dd2160 [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489)
d9a43dd2160 is described below

commit d9a43dd21605930e16bb038095e36fceff3a4a40
Author: Baodi Shi
AuthorDate: Mon Apr 15 13:55:34 2024 +0800

    [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489)
---
 .../service/PersistentMessageFinderTest.java | 42 +++---
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 6965ac28068..0972c9098b5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
@@ -383,7 +382,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
      *
      * @throws Exception
      */
-    @Test(groups = "flaky")
+    @Test
     void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
 
         final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers";
@@ -402,11 +401,15 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
         for (int i = 0; i < totalEntries; i++) {
             ledger.addEntry(createMessageWrittenToLedger("msg" + i));
         }
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(ledger.getState(), ManagedLedgerImpl.State.LedgerOpened));
 
         List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
         LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
-
-        assertEquals(ledgers.size(), totalEntries / entriesPerLedger);
+        // The `lastLedgerInfo` should be newly opened, and it does not contain any entries.
+        // Please refer to: https://github.com/apache/pulsar/pull/22034
+        assertEquals(lastLedgerInfo.getEntries(), 0);
+        assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);
 
         // this will make sure that all entries should be deleted
         Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds));
@@ -420,19 +423,13 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
         when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
 
         PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
-        Position previousMarkDelete = null;
-        for (int i = 0; i < totalEntries; i++) {
-            monitor.expireMessages(1);
-            Position previousPos = previousMarkDelete;
-            retryStrategically(
-                    (test) -> c1.getMarkDeletedPosition() != null && !c1.getMarkDeletedPosition().equals(previousPos),
-                    5, 100);
-            previousMarkDelete = c1.getMarkDeletedPosition();
-        }
-
-        PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition();
-        assertEquals(lastLedgerInfo.getLedgerId(), markDeletePosition.getLedgerId());
-        assertEquals(lastLedgerInfo.getEntries() - 1, markDeletePosition.getEntryId());
+        assertTrue(monitor.expireMessages(ttlSeconds));
+        Awaitility.await().untilAsserted(() -> {
+            PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition();
+            // The markDeletePosition points to the last entry of the previous ledger in lastLedgerInfo.
+            assertEquals(markDeletePosition.getLedgerId(), lastLedgerInfo.getLedgerId() - 1);
+            assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 1);
+        });
 
         c1.close();
         ledger.close();
@@ -440,20 +437,25 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
 
     }
 
-    @Test(groups = "flaky")
+    @Test
     public void testIncorrectClientClock() throws Exception {
         final String ledgerAndCursorName = "testIncorrectClientClock";
         int maxTTLSeconds = 1;
+        int entriesNum = 10;
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(1);
         ManagedLedgerImpl ledger = (ManagedLedgerImpl)
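The core of this fix is replacing a fixed retry budget (`retryStrategically(..., 5, 100)`, presumably at most 5 attempts 100 ms apart per expiry round) with `Awaitility.await().untilAsserted(...)`, which keeps re-running the assertions until they pass or a timeout expires. The sketch below is a minimal, dependency-free stand-in for that polling pattern; `AwaitUntilAssertedSketch` and its timeout and poll-interval parameters are hypothetical and are not Awaitility's API or defaults.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitUntilAssertedSketch {

    /**
     * Hypothetical stand-in for Awaitility's await().untilAsserted(...): re-runs the
     * assertion until it stops throwing AssertionError or the timeout elapses.
     */
    static void awaitUntilAsserted(Runnable assertion, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() - deadline >= 0) {
                    throw e; // timed out: surface the most recent failure
                }
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", ie);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulates a mark-delete position that another thread advances asynchronously.
        AtomicInteger markDeletedEntryId = new AtomicInteger(-1);
        Thread expiry = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            markDeletedEntryId.set(9);
        });
        expiry.start();

        awaitUntilAsserted(() -> {
            if (markDeletedEntryId.get() != 9) {
                throw new AssertionError("not advanced yet: " + markDeletedEntryId.get());
            }
        }, Duration.ofSeconds(10), Duration.ofMillis(50));
        System.out.println("mark-delete position reached");
        expiry.join();
    }
}
```

Because the assertion is retried rather than checked once after a fixed wait, the test no longer depends on the expiry monitor finishing inside a small window; it fails only if the condition still does not hold when the timeout expires.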
Re: [PR] [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock [pulsar]
Technoboy- merged PR #22489:
URL: https://github.com/apache/pulsar/pull/22489

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
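The new assertions in #22489 rest on a small piece of arithmetic: once a ledger reaches `maxEntriesPerLedger`, a new, empty ledger is opened (the behavior the test comment attributes to apache/pulsar#22034). Writing an exact multiple of `entriesPerLedger` therefore leaves `totalEntries / entriesPerLedger + 1` ledgers, and the last written entry sits at `(lastLedgerId - 1, entriesPerLedger - 1)`. The toy model below checks that arithmetic under stated assumptions: ledger ids are consecutive (as the test's `getLedgerId() - 1` assertion implies), and `ToyLedger` and the sample values are hypothetical, not the test's.

```java
import java.util.ArrayList;
import java.util.List;

public class LedgerRolloverSketch {

    /** Hypothetical toy ledger: an id plus an entry count. */
    static final class ToyLedger {
        final long id;
        int entries;

        ToyLedger(long id) {
            this.id = id;
        }
    }

    /**
     * Writes totalEntries entries. As soon as the current ledger is full, a new empty
     * ledger with the next consecutive id is opened (assumed rollover behavior).
     */
    static List<ToyLedger> write(int totalEntries, int maxEntriesPerLedger, long firstLedgerId) {
        List<ToyLedger> ledgers = new ArrayList<>();
        ToyLedger current = new ToyLedger(firstLedgerId);
        ledgers.add(current);
        for (int i = 0; i < totalEntries; i++) {
            current.entries++;
            if (current.entries == maxEntriesPerLedger) {
                current = new ToyLedger(current.id + 1);
                ledgers.add(current);
            }
        }
        return ledgers;
    }

    /** Returns {ledgerId, entryId} of the last written entry, or null if nothing was written. */
    static long[] lastWrittenPosition(List<ToyLedger> ledgers) {
        for (int i = ledgers.size() - 1; i >= 0; i--) {
            ToyLedger ledger = ledgers.get(i);
            if (ledger.entries > 0) {
                return new long[] {ledger.id, ledger.entries - 1};
            }
        }
        return null;
    }

    public static void main(String[] args) {
        int totalEntries = 10;    // illustrative values only
        int entriesPerLedger = 2;
        List<ToyLedger> ledgers = write(totalEntries, entriesPerLedger, 3);
        ToyLedger last = ledgers.get(ledgers.size() - 1);
        long[] pos = lastWrittenPosition(ledgers);
        // 10 / 2 + 1 = 6 ledgers; the last one (id 8) is empty; the last entry is 7:1
        System.out.println(ledgers.size() + " ledgers, last ledger " + last.id
                + " has " + last.entries + " entries, last entry at " + pos[0] + ":" + pos[1]);
    }
}
```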
[PR] [fix][broker] Fix the rate limit policy loading for replicator after broker restart [pulsar]
nodece opened a new pull request, #22504:
URL: https://github.com/apache/pulsar/pull/22504

### Motivation

When a topic is initialized and a replicator cursor exists, the topic creates the replicator first and only then loads the namespace and topic policies. Once the policies are loaded, they should be applied to the replicator.

After a broker restart, if the topic policies are empty, the namespace policies are never applied to the replicator, so the namespace-level rate limit is not enforced and the total replicated traffic can exceed it.

### Modifications

- Call `org.apache.pulsar.broker.service.persistent.PersistentTopic#onPoliciesUpdate` after the namespace policy is loaded

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
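The ordering problem in the Motivation can be illustrated with a simplified model: the replicator exists before any policy is loaded, so the code path that loads the namespace policies has to push them to it, with a topic-level value taking precedence when present. The classes below are hypothetical stand-ins, not Pulsar's `PersistentTopic` or replicator API; `onPoliciesUpdate` here only mirrors the name of the method the PR calls, and `null` marks an absent policy.

```java
public class ReplicatorPolicySketch {

    /** Hypothetical replicator holding a rate limit; -1 means "not limited". */
    static final class ToyReplicator {
        private long rateLimit = -1;

        void updateRateLimit(long limit) {
            this.rateLimit = limit;
        }

        long rateLimit() {
            return rateLimit;
        }
    }

    /** Hypothetical topic: the replicator is created before any policy is known. */
    static final class ToyTopic {
        final ToyReplicator replicator = new ToyReplicator();
        Long topicLevelRate;     // null: no topic-level policy
        Long namespaceLevelRate; // null: no namespace-level policy

        /** Called once the namespace policies have been loaded. */
        void onNamespacePoliciesLoaded(Long namespaceRate) {
            this.namespaceLevelRate = namespaceRate;
            // The fix, in spirit: push the freshly loaded policies to existing replicators
            // instead of relying on a topic-level policy update that may never arrive.
            onPoliciesUpdate();
        }

        void onPoliciesUpdate() {
            // The topic-level policy wins; otherwise fall back to the namespace-level one.
            Long effective = topicLevelRate != null ? topicLevelRate : namespaceLevelRate;
            replicator.updateRateLimit(effective != null ? effective : -1L);
        }
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();        // e.g. reloaded after a broker restart
        topic.onNamespacePoliciesLoaded(1000L); // topic policies are empty
        System.out.println("replicator rate limit: " + topic.replicator.rateLimit());
    }
}
```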
Re: [PR] [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger [pulsar]
lhotari commented on code in PR #1:
URL: https://github.com/apache/pulsar/pull/1#discussion_r1565191680

##
pulsar-common/src/main/java/org/apache/pulsar/common/mutable/AtomicMutableBoolean.java:
##

Review Comment:
Oh yes, there's another discussion about the same matter: https://github.com/apache/pulsar/pull/1/files/d881f5416b1b5ce2863238d74ace38889bb2f134#r1531505030. Using `Optional` instead of `null` works too. However, Pulsar's code style minimizes allocations, so we usually avoid `Optional` and use `null` as the empty value.
Re: [PR] [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger [pulsar]
lhotari commented on code in PR #1:
URL: https://github.com/apache/pulsar/pull/1#discussion_r1565190547

##
pulsar-common/src/main/java/org/apache/pulsar/common/mutable/AtomicMutableBoolean.java:
##

Review Comment:
Now I noticed the comment about the ledger switch requiring a new instance: https://github.com/apache/pulsar/pull/1#discussion_r1531504225. In that case, I guess it might not be possible to get rid of `volatile` on the field. Even so, I would recommend using `AtomicBoolean` instead of adding this `AtomicMutableBoolean` class. It also seems that a `null` value can be passed when no timeout is in use. Wasn't the no-timeout case the reason to use `MutableBoolean` in the first place?
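The constraint mentioned above, that a ledger switch requires a new flag instance, can be met with a plain JDK `AtomicBoolean`: each add operation keeps a reference to the flag of the ledger it was issued on, so a late timeout on the old ledger cannot leak into operations on the new one. This is a hypothetical sketch; `PerLedgerFlagSketch`, `ToyAddOp`, and the method names are illustrative and are not the managed-ledger code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class PerLedgerFlagSketch {

    // Replaced on every ledger switch, so the field itself stays volatile;
    // the flag is a plain JDK AtomicBoolean rather than a custom class.
    private volatile AtomicBoolean currentLedgerTimeoutTriggered = new AtomicBoolean(false);

    /** Hypothetical add operation that captures the flag of the ledger it was issued on. */
    final class ToyAddOp {
        private final AtomicBoolean ledgerTimeoutTriggered = currentLedgerTimeoutTriggered;

        void onTimeout() {
            ledgerTimeoutTriggered.set(true);
        }

        boolean ledgerTimedOut() {
            return ledgerTimeoutTriggered.get();
        }
    }

    ToyAddOp newAddOp() {
        return new ToyAddOp();
    }

    void switchLedger() {
        // A fresh instance: callbacks still holding the old flag cannot affect the new ledger.
        currentLedgerTimeoutTriggered = new AtomicBoolean(false);
    }

    public static void main(String[] args) {
        PerLedgerFlagSketch ml = new PerLedgerFlagSketch();
        ToyAddOp oldOp = ml.newAddOp();
        ml.switchLedger();
        ToyAddOp newOp = ml.newAddOp();
        oldOp.onTimeout(); // a late timeout on the previous ledger
        System.out.println("old: " + oldOp.ledgerTimedOut() + ", new: " + newOp.ledgerTimedOut());
    }
}
```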
Re: [PR] [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger [pulsar]
lhotari commented on code in PR #1:
URL: https://github.com/apache/pulsar/pull/1#discussion_r1565183246

##
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:
##
@@ -375,6 +398,13 @@ protected OpAddEntry newObject(Recycler.Handle<OpAddEntry> recyclerHandle) {
         }
     };
 
+    public void recycleAfterClosed() {
+        if (STATE_UPDATER.get(this) != State.CLOSED) {
+            recycle();
+        }
+        closeFuture.thenAccept(ignore -> recycle());
+    }

Review Comment:
I think that there might be a better way to address this using reference counting. Another alternative is to skip recycling in cases where there's a potential issue. Recycling is not mandatory.
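As a rough illustration of the reference-counting alternative suggested above, a pooled object can be returned to its pool only when the last holder releases it, so a late callback cannot recycle an instance that someone else is still using. This sketch is hypothetical: `ToyOp` is not `OpAddEntry`, and `recycle()` only counts calls where real code would reset the object and hand it back to a Netty `Recycler`.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountedRecycleSketch {

    /** Hypothetical pooled operation that is recycled only when its last reference is released. */
    static final class ToyOp {
        private final AtomicInteger refCnt = new AtomicInteger(1); // the creator holds one reference
        private final AtomicInteger timesRecycled = new AtomicInteger();

        /** Taken by anyone (e.g. a pending callback) that may still touch the op later. */
        void retain() {
            refCnt.incrementAndGet();
        }

        void release() {
            int remaining = refCnt.decrementAndGet();
            if (remaining == 0) {
                recycle();
            } else if (remaining < 0) {
                throw new IllegalStateException("released more times than retained");
            }
        }

        int timesRecycled() {
            return timesRecycled.get();
        }

        private void recycle() {
            // Real code would reset fields and return the object to its pool here.
            timesRecycled.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        ToyOp op = new ToyOp();
        op.retain();  // a late callback still holds the op
        op.release(); // the normal completion path is done
        op.release(); // the late callback finishes; only now is the op recycled
        System.out.println("recycled " + op.timesRecycled() + " time(s)");
    }
}
```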
Re: [PR] [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger [pulsar]
lhotari commented on code in PR #1:
URL: https://github.com/apache/pulsar/pull/1#discussion_r1565183550

##
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:
##
@@ -176,7 +186,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
         if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, State.COMPLETED)) {
             log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(),
                     lh.getId(), entryId);
-            OpAddEntry.this.recycle();
+            OpAddEntry.this.recycleAfterClosed();

Review Comment:
Instead of adding `recycleAfterClosed`, let's just skip recycling in cases that cause problems.
Re: [PR] [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger [pulsar]
lhotari commented on code in PR #1:
URL: https://github.com/apache/pulsar/pull/1#discussion_r1565180673

##
pulsar-common/src/main/java/org/apache/pulsar/common/mutable/AtomicMutableBoolean.java:
##

Review Comment:
> The value of Java's AtomicBoolean is modified by volatile, it will reduce the performance.

@poorbarcode Have you noticed that the field `currentLedgerTimeoutTriggered` is itself `volatile`? `private volatile MutableBoolean currentLedgerTimeoutTriggered;`. That will "reduce performance" each time the field is read. Using `private final AtomicBoolean currentLedgerTimeoutTriggered = new AtomicBoolean(false);` would prevent that. Therefore, I think we shouldn't add the `AtomicMutableBoolean` class.
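The alternative proposed in this comment, a `final` field holding a single `AtomicBoolean`, might look like the sketch below. Loading a `final` reference needs no `volatile` read, and `AtomicBoolean`'s own `get`/`set`/`compareAndSet` provide visibility of the value. Resetting the flag in place on a ledger switch is an assumption of this sketch; unlike a fresh per-ledger instance, it does not isolate stale callbacks from the new ledger. The class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class FinalAtomicFlagSketch {

    // final: the reference is fixed at construction; visibility of the value comes
    // from AtomicBoolean's own get/set/compareAndSet.
    private final AtomicBoolean currentLedgerTimeoutTriggered = new AtomicBoolean(false);

    /** Returns true only for the first caller that flags the timeout. */
    boolean triggerTimeoutOnce() {
        return currentLedgerTimeoutTriggered.compareAndSet(false, true);
    }

    boolean isTimeoutTriggered() {
        return currentLedgerTimeoutTriggered.get();
    }

    /** Assumed reset-in-place on ledger switch, instead of allocating a new instance. */
    void onLedgerSwitched() {
        currentLedgerTimeoutTriggered.set(false);
    }

    public static void main(String[] args) {
        FinalAtomicFlagSketch ml = new FinalAtomicFlagSketch();
        System.out.println(ml.triggerTimeoutOnce()); // the first trigger wins the CAS
        System.out.println(ml.triggerTimeoutOnce()); // a second trigger loses it
        ml.onLedgerSwitched();
        System.out.println(ml.isTimeoutTriggered());
    }
}
```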
Re: [I] [pulsar] pulsar standalone to have embedded zookeeper [pulsar]
nareshv closed issue #22502: [pulsar] pulsar standalone to have embedded zookeeper
URL: https://github.com/apache/pulsar/issues/22502
[I] [pulsar] pulsar standalone to have embedded zookeeper [pulsar]
nareshv opened a new issue, #22502:
URL: https://github.com/apache/pulsar/issues/22502

### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Motivation

We've been evaluating Pulsar standalone for edge/low-footprint use cases, and it seems to work pretty well. As mentioned in https://github.com/apache/pulsar/discussions/22438#discussioncomment-9018919, we would like to make Pulsar standalone robust with an embedded ZooKeeper.

### Solution

`PulsarStandalone.java` supports multiple command-line parameters. Add a `--run-zookeeper` flag to start ZooKeeper in the same JVM.

### Alternatives

Running ZooKeeper as a separate process. We have to rule this option out because of large fleet-management needs: a single JVM (Pulsar + ZooKeeper + BookKeeper) is much more deployment-friendly on tiny devices than multiple processes.

### Anything else?

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
(pulsar) 01/02: Revert "[fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)"
yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f91b519f56622602dd2bd18ec3fa55748debd27f
Author: fengyubiao
AuthorDate: Mon Apr 15 12:07:48 2024 +0800

    Revert "[fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)"

    This reverts commit 7cec82f99c2c729e0e9ac42f7176eadf4039ae1a.
---
 .../pulsar/broker/service/BrokerService.java | 4 +-
 .../service/persistent/MessageDeduplication.java | 18 +--
 .../broker/service/persistent/PersistentTopic.java | 2 +-
 .../DeduplicationDisabledBrokerLevelTest.java | 161 -
 4 files changed, 8 insertions(+), 177 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f19b3436f7b..d798d9e672e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -576,10 +576,8 @@ public class BrokerService implements Closeable {
     }
 
     protected void startDeduplicationSnapshotMonitor() {
-        // We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this
-        // scheduled task runs.
         int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-        if (interval > 0) {
+        if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
             this.deduplicationSnapshotMonitor = OrderedScheduler.newSchedulerBuilder()
                     .name("deduplication-snapshot-monitor")
                     .numThreads(1)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e508661364d..802dd917961 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,14 +157,9 @@ public class MessageDeduplication {
         // Replay all the entries and apply all the sequence ids updates
         log.info("[{}] Replaying {} entries for deduplication", topic.getName(),
                 managedCursor.getNumberOfEntries());
-        CompletableFuture<Position> future = new CompletableFuture<>();
+        CompletableFuture<Void> future = new CompletableFuture<>();
         replayCursor(future);
-        return future.thenAccept(lastPosition -> {
-            if (lastPosition != null && snapshotCounter >= snapshotInterval) {
-                snapshotCounter = 0;
-                takeSnapshot(lastPosition);
-            }
-        });
+        return future;
     }
 
     /**
@@ -173,11 +168,11 @@ public class MessageDeduplication {
      *
      * @param future future to trigger when the replay is complete
      */
-    private void replayCursor(CompletableFuture<Position> future) {
+    private void replayCursor(CompletableFuture<Void> future) {
         managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
             @Override
             public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                Position lastPosition = null;
+
                 for (Entry entry : entries) {
                     ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
                     MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -187,8 +182,7 @@ public class MessageDeduplication {
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
                     producerRemoved(producerName);
-                    snapshotCounter++;
-                    lastPosition = entry.getPosition();
+
                     entry.release();
                 }
 
@@ -197,7 +191,7 @@ public class MessageDeduplication {
                     pulsar.getExecutor().execute(() -> replayCursor(future));
                 } else {
                     // Done replaying
-                    future.complete(lastPosition);
+                    future.complete(null);
                 }
             }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 29df6e78cd1..be1b873cc21 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -204,7 +204,7 @@ public class PersistentTopic
(pulsar) 02/02: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f5c853a4e8a8d1ebce3166aff12715a068cd03cb
Author: fengyubiao
AuthorDate: Mon Apr 15 12:11:04 2024 +0800

    [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

    (cherry picked from commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c)
---
 .../pulsar/broker/service/BrokerService.java | 4 +-
 .../service/persistent/MessageDeduplication.java | 18 ++-
 .../broker/service/persistent/PersistentTopic.java | 2 +-
 .../DeduplicationDisabledBrokerLevelTest.java | 163 +
 4 files changed, 179 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d798d9e672e..f19b3436f7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -576,8 +576,10 @@ public class BrokerService implements Closeable {
     }
 
     protected void startDeduplicationSnapshotMonitor() {
+        // We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this
+        // scheduled task runs.
         int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-        if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+        if (interval > 0) {
             this.deduplicationSnapshotMonitor = OrderedScheduler.newSchedulerBuilder()
                     .name("deduplication-snapshot-monitor")
                     .numThreads(1)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 802dd917961..e508661364d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,9 +157,14 @@ public class MessageDeduplication {
         // Replay all the entries and apply all the sequence ids updates
         log.info("[{}] Replaying {} entries for deduplication", topic.getName(),
                 managedCursor.getNumberOfEntries());
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        CompletableFuture<Position> future = new CompletableFuture<>();
         replayCursor(future);
-        return future;
+        return future.thenAccept(lastPosition -> {
+            if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+                snapshotCounter = 0;
+                takeSnapshot(lastPosition);
+            }
+        });
     }
 
     /**
@@ -168,11 +173,11 @@ public class MessageDeduplication {
      *
      * @param future future to trigger when the replay is complete
      */
-    private void replayCursor(CompletableFuture<Void> future) {
+    private void replayCursor(CompletableFuture<Position> future) {
         managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
             @Override
             public void readEntriesComplete(List<Entry> entries, Object ctx) {
-
+                Position lastPosition = null;
                 for (Entry entry : entries) {
                     ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
                     MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -182,7 +187,8 @@ public class MessageDeduplication {
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
                     producerRemoved(producerName);
-
+                    snapshotCounter++;
+                    lastPosition = entry.getPosition();
                     entry.release();
                 }
 
@@ -191,7 +197,7 @@ public class MessageDeduplication {
                     pulsar.getExecutor().execute(() -> replayCursor(future));
                 } else {
                     // Done replaying
-                    future.complete(null);
+                    future.complete(lastPosition);
                 }
             }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index be1b873cc21..29df6e78cd1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -204,7 +204,7 @@ public class PersistentTopic
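The re-applied `MessageDeduplication` change can be summarized as: count the entries replayed during recovery, complete the replay future with the last replayed position, and take a snapshot right away if at least `snapshotInterval` entries were replayed, so the deduplication cursor does not keep a long backlog that every topic load must replay. The model below mirrors that control flow with a `List<Long>` standing in for the cursor backlog and a synchronous batch read; `DedupReplaySketch` and its fields are hypothetical, and the real `takeSnapshot` persists the deduplication state rather than just recording a position in a field.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DedupReplaySketch {

    private static final int BATCH_SIZE = 100; // the broker reads 100 entries per asyncReadEntries call

    private final List<Long> backlog;   // hypothetical stand-in for entries behind the dedup cursor
    private final int snapshotInterval; // minimum replayed entries before a snapshot is taken
    private int readIndex = 0;
    private int snapshotCounter = 0;
    Long lastSnapshotPosition = null;   // null: no snapshot taken yet

    DedupReplaySketch(List<Long> backlog, int snapshotInterval) {
        this.backlog = backlog;
        this.snapshotInterval = snapshotInterval;
    }

    CompletableFuture<Void> recover() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        replay(future);
        // After the replay, snapshot immediately if enough entries were replayed,
        // so the next load does not have to replay the same backlog again.
        return future.thenAccept(lastPosition -> {
            if (lastPosition != null && snapshotCounter >= snapshotInterval) {
                snapshotCounter = 0;
                takeSnapshot(lastPosition);
            }
        });
    }

    private void replay(CompletableFuture<Long> future) {
        Long lastPosition = null;
        int end = Math.min(readIndex + BATCH_SIZE, backlog.size());
        for (; readIndex < end; readIndex++) {
            snapshotCounter++;
            lastPosition = backlog.get(readIndex);
        }
        if (readIndex < backlog.size()) {
            replay(future); // the broker re-submits this to an executor instead of recursing
        } else {
            future.complete(lastPosition); // position of the last entry in the final batch
        }
    }

    private void takeSnapshot(long position) {
        lastSnapshotPosition = position;
    }
}
```

With the previous `future.complete(null)` version, no position was available once replay finished, so a long backlog was only snapshotted later by other code paths, if at all.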
(pulsar) branch branch-3.2 updated (7cec82f99c2 -> f5c853a4e8a)
yubiao pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

    from 7cec82f99c2 [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
     new f91b519f566 Revert "[fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)"
     new f5c853a4e8a [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
(pulsar) branch branch-2.10 updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 68554056bbc [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
68554056bbc is described below

commit 68554056bbc4a3095446430255e937ce88f48088
Author: fengyubiao
AuthorDate: Mon Apr 15 00:13:49 2024 +0800

    [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

    (cherry picked from commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c)
---
 .../pulsar/broker/service/BrokerService.java | 4 +-
 .../service/persistent/MessageDeduplication.java | 18 ++-
 .../broker/service/persistent/PersistentTopic.java | 2 +-
 .../DeduplicationDisabledBrokerLevelTest.java | 165 +
 4 files changed, 181 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 48b859bd11b..c53085347db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -533,8 +533,10 @@ public class BrokerService implements Closeable {
     }
 
     protected void startDeduplicationSnapshotMonitor() {
+        // We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this
+        // scheduled task runs.
         int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-        if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+        if (interval > 0) {
             this.deduplicationSnapshotMonitor =
                     Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory(
                             "deduplication-snapshot-monitor"));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 4c3d8e1a467..c3299f23f9f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -153,9 +153,14 @@ public class MessageDeduplication {
         // Replay all the entries and apply all the sequence ids updates
         log.info("[{}] Replaying {} entries for deduplication", topic.getName(),
                 managedCursor.getNumberOfEntries());
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        CompletableFuture<Position> future = new CompletableFuture<>();
         replayCursor(future);
-        return future;
+        return future.thenAccept(lastPosition -> {
+            if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+                snapshotCounter = 0;
+                takeSnapshot(lastPosition);
+            }
+        });
     }
 
     /**
@@ -164,11 +169,11 @@ public class MessageDeduplication {
      *
      * @param future future to trigger when the replay is complete
      */
-    private void replayCursor(CompletableFuture<Void> future) {
+    private void replayCursor(CompletableFuture<Position> future) {
         managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
             @Override
             public void readEntriesComplete(List<Entry> entries, Object ctx) {
-
+                Position lastPosition = null;
                 for (Entry entry : entries) {
                     ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
                     MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -178,7 +183,8 @@ public class MessageDeduplication {
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
                     producerRemoved(producerName);
-
+                    snapshotCounter++;
+                    lastPosition = entry.getPosition();
                     entry.release();
                 }
 
@@ -187,7 +193,7 @@ public class MessageDeduplication {
                     pulsar.getExecutor().execute(() -> replayCursor(future));
                 } else {
                     // Done replaying
-                    future.complete(null);
+                    future.complete(lastPosition);
                 }
             }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index
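The `BrokerService` part of #22479 changes only the start condition of the snapshot monitor: it now runs whenever the snapshot frequency is positive, because deduplication can be enabled at the namespace or topic level even when it is disabled broker-wide. Below is a hypothetical sketch of the before and after conditions, plus a single-threaded scheduler loosely modeled on the branch-2.10 code; `checkTopics` is a placeholder for whatever the monitor does on each tick.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SnapshotMonitorSketch {

    /** Condition before #22479: the monitor only ran when deduplication was enabled broker-wide. */
    static boolean shouldStartBefore(int intervalSeconds, boolean brokerDeduplicationEnabled) {
        return intervalSeconds > 0 && brokerDeduplicationEnabled;
    }

    /** Condition after #22479: namespace- or topic-level deduplication also needs snapshots. */
    static boolean shouldStartAfter(int intervalSeconds) {
        return intervalSeconds > 0;
    }

    /** Hypothetical single-threaded scheduler; checkTopics is a placeholder task. */
    static ScheduledExecutorService start(int intervalSeconds, Runnable checkTopics) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "deduplication-snapshot-monitor");
            t.setDaemon(true); // do not keep the JVM alive just for this sketch
            return t;
        });
        executor.scheduleAtFixedRate(checkTopics, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
        return executor;
    }

    public static void main(String[] args) {
        int interval = 120;                  // illustrative snapshot frequency in seconds
        boolean brokerLevelEnabled = false;  // deduplication enabled only on some namespaces
        System.out.println("before: " + shouldStartBefore(interval, brokerLevelEnabled)
                + ", after: " + shouldStartAfter(interval));
    }
}
```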
(pulsar) branch branch-2.11 updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 97518ee893c [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
97518ee893c is described below

commit 97518ee893c8c4db6b75db2e1440570e813b08f8
Author: fengyubiao
AuthorDate: Mon Apr 15 00:13:49 2024 +0800

    [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

    (cherry picked from commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c)
---
 .../pulsar/broker/service/BrokerService.java | 4 +-
 .../service/persistent/MessageDeduplication.java | 18 ++-
 .../broker/service/persistent/PersistentTopic.java | 2 +-
 .../DeduplicationDisabledBrokerLevelTest.java | 163 +
 4 files changed, 179 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 3643903dd7c..b921c4b5dc6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -550,8 +550,10 @@ public class BrokerService implements Closeable {
     }
 
     protected void startDeduplicationSnapshotMonitor() {
+        // We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this
+        // scheduled task runs.
         int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-        if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+        if (interval > 0) {
             this.deduplicationSnapshotMonitor =
                     Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory(
                             "deduplication-snapshot-monitor"));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index ba2600d9134..235ed0fcaa8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,9 +157,14 @@ public class MessageDeduplication {
         // Replay all the entries and apply all the sequence ids updates
         log.info("[{}] Replaying {} entries for deduplication", topic.getName(),
                 managedCursor.getNumberOfEntries());
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        CompletableFuture<Position> future = new CompletableFuture<>();
         replayCursor(future);
-        return future;
+        return future.thenAccept(lastPosition -> {
+            if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+                snapshotCounter = 0;
+                takeSnapshot(lastPosition);
+            }
+        });
     }
 
     /**
@@ -168,11 +173,11 @@ public class MessageDeduplication {
      *
      * @param future future to trigger when the replay is complete
      */
-    private void replayCursor(CompletableFuture<Void> future) {
+    private void replayCursor(CompletableFuture<Position> future) {
         managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
             @Override
             public void readEntriesComplete(List<Entry> entries, Object ctx) {
-
+                Position lastPosition = null;
                 for (Entry entry : entries) {
                     ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
                     MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -182,7 +187,8 @@ public class MessageDeduplication {
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
                     producerRemoved(producerName);
-
+                    snapshotCounter++;
+                    lastPosition = entry.getPosition();
                     entry.release();
                 }
 
@@ -191,7 +197,7 @@ public class MessageDeduplication {
                     pulsar.getExecutor().execute(() -> replayCursor(future));
                 } else {
                     // Done replaying
-                    future.complete(null);
+                    future.complete(lastPosition);
                 }
             }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index
(pulsar) branch branch-3.0 updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0fbcbb24039 [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
0fbcbb24039 is described below

commit 0fbcbb240394b241e940225c2067ccf1ac3d3b32
Author: fengyubiao
AuthorDate: Mon Apr 15 10:59:47 2024 +0800

    [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

    (cherry picked from commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c)
---
 .../pulsar/broker/service/BrokerService.java       |   4 +-
 .../service/persistent/MessageDeduplication.java   |  18 ++-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../DeduplicationDisabledBrokerLevelTest.java      | 163 +
 4 files changed, 179 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bfb289678d2..20de00370e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -580,8 +580,10 @@ public class BrokerService implements Closeable {
     }

     protected void startDeduplicationSnapshotMonitor() {
+        // We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this
+        // scheduled task runs.
         int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-        if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+        if (interval > 0) {
             this.deduplicationSnapshotMonitor = OrderedScheduler.newSchedulerBuilder()
                     .name("deduplication-snapshot-monitor")
                     .numThreads(1)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 802dd917961..e508661364d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,9 +157,14 @@ public class MessageDeduplication {
         // Replay all the entries and apply all the sequence ids updates
         log.info("[{}] Replaying {} entries for deduplication", topic.getName(),
                 managedCursor.getNumberOfEntries());
-        CompletableFuture future = new CompletableFuture<>();
+        CompletableFuture future = new CompletableFuture<>();
         replayCursor(future);
-        return future;
+        return future.thenAccept(lastPosition -> {
+            if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+                snapshotCounter = 0;
+                takeSnapshot(lastPosition);
+            }
+        });
     }

     /**
@@ -168,11 +173,11 @@ public class MessageDeduplication {
      *
      * @param future future to trigger when the replay is complete
      */
-    private void replayCursor(CompletableFuture future) {
+    private void replayCursor(CompletableFuture future) {
         managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
             @Override
             public void readEntriesComplete(List entries, Object ctx) {
-
+                Position lastPosition = null;
                 for (Entry entry : entries) {
                     ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
                     MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -182,7 +187,8 @@ public class MessageDeduplication {
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
                     producerRemoved(producerName);
-
+                    snapshotCounter++;
+                    lastPosition = entry.getPosition();
                     entry.release();
                 }

@@ -191,7 +197,7 @@ public class MessageDeduplication {
                     pulsar.getExecutor().execute(() -> replayCursor(future));
                 } else {
                     // Done replaying
-                    future.complete(null);
+                    future.complete(lastPosition);
                 }
             }

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a6087490744..1363ca0945d 100644
---
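The `BrokerService` hunk drops the `isBrokerDeduplicationEnabled()` condition, so the snapshot monitor starts whenever `getBrokerDeduplicationSnapshotFrequencyInSeconds()` is positive; as the added comment explains, deduplication may still be enabled at the namespace or topic level. A hedged sketch of that gating with a plain `ScheduledExecutorService` (these branches use `OrderedScheduler` or `Executors.newSingleThreadScheduledExecutor`); the per-topic `dedupEnabled` map and the counting `checkTopics` task are hypothetical stand-ins for the broker's per-topic snapshot check.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SnapshotMonitorSketch {
    // Hypothetical per-topic flag: deduplication may be on for a topic or namespace
    // even when the broker-level setting is off.
    final Map<String, Boolean> dedupEnabled = new ConcurrentHashMap<>();
    final AtomicInteger snapshotsTaken = new AtomicInteger();
    private ScheduledExecutorService monitor;

    void start(int intervalSeconds) {
        // Only the interval gates the monitor; the broker-level flag is no longer consulted.
        if (intervalSeconds > 0) {
            monitor = Executors.newSingleThreadScheduledExecutor();
            monitor.scheduleAtFixedRate(this::checkTopics, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
        }
    }

    // Each tick decides per topic whether a snapshot is needed.
    void checkTopics() {
        dedupEnabled.forEach((topic, enabled) -> {
            if (enabled) {
                snapshotsTaken.incrementAndGet();   // stand-in for taking a dedup snapshot
            }
        });
    }

    boolean isRunning() {
        return monitor != null && !monitor.isShutdown();
    }

    void stop() {
        if (monitor != null) {
            monitor.shutdownNow();
        }
    }
}
```

Keeping the periodic task cheap when no topic has deduplication enabled is what makes it acceptable to run it unconditionally.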
(pulsar) branch branch-3.2 updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 7cec82f99c2 [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
7cec82f99c2 is described below

commit 7cec82f99c2c729e0e9ac42f7176eadf4039ae1a
Author: fengyubiao
AuthorDate: Mon Apr 15 00:13:49 2024 +0800

    [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

    (cherry picked from commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c)
---
 .../pulsar/broker/service/BrokerService.java       |   4 +-
 .../service/persistent/MessageDeduplication.java   |  18 ++-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../DeduplicationDisabledBrokerLevelTest.java      | 161 +
 4 files changed, 177 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d798d9e672e..f19b3436f7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -576,8 +576,10 @@ public class BrokerService implements Closeable {
     }

     protected void startDeduplicationSnapshotMonitor() {
+        // We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this
+        // scheduled task runs.
         int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-        if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+        if (interval > 0) {
             this.deduplicationSnapshotMonitor = OrderedScheduler.newSchedulerBuilder()
                     .name("deduplication-snapshot-monitor")
                     .numThreads(1)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 802dd917961..e508661364d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,9 +157,14 @@ public class MessageDeduplication {
         // Replay all the entries and apply all the sequence ids updates
         log.info("[{}] Replaying {} entries for deduplication", topic.getName(),
                 managedCursor.getNumberOfEntries());
-        CompletableFuture future = new CompletableFuture<>();
+        CompletableFuture future = new CompletableFuture<>();
         replayCursor(future);
-        return future;
+        return future.thenAccept(lastPosition -> {
+            if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+                snapshotCounter = 0;
+                takeSnapshot(lastPosition);
+            }
+        });
     }

     /**
@@ -168,11 +173,11 @@ public class MessageDeduplication {
      *
      * @param future future to trigger when the replay is complete
      */
-    private void replayCursor(CompletableFuture future) {
+    private void replayCursor(CompletableFuture future) {
         managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
             @Override
             public void readEntriesComplete(List entries, Object ctx) {
-
+                Position lastPosition = null;
                 for (Entry entry : entries) {
                     ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
                     MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -182,7 +187,8 @@ public class MessageDeduplication {
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
                     producerRemoved(producerName);
-
+                    snapshotCounter++;
+                    lastPosition = entry.getPosition();
                     entry.release();
                 }

@@ -191,7 +197,7 @@ public class MessageDeduplication {
                     pulsar.getExecutor().execute(() -> replayCursor(future));
                 } else {
                     // Done replaying
-                    future.complete(null);
+                    future.complete(lastPosition);
                 }
             }

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index be1b873cc21..29df6e78cd1 100644
---
(pulsar) branch branch-2.11 updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new e583dc86c3a [fix][txn]Handle exceptions in the transaction pending ack init (#21274)
e583dc86c3a is described below

commit e583dc86c3a999a33cdbd5ec058578d3e2a8fcd0
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Mon Apr 15 08:43:12 2024 +0800

    [fix][txn]Handle exceptions in the transaction pending ack init (#21274)

    Co-authored-by: Baodi Shi
    (cherry picked from commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../pendingack/impl/PendingAckHandleImpl.java      | 55 +--
 .../pulsar/broker/transaction/TransactionTest.java |  2 +-
 .../pendingack/PendingAckPersistentTest.java       | 82 ++
 4 files changed, 133 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d6b59e0b596..224a4489478 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1215,7 +1215,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         // back to client, only if not completed already.
                         if (consumerFuture.completeExceptionally(exception)) {
                             commandSender.sendErrorResponse(requestId,
-                                    BrokerServiceException.getClientErrorCode(exception),
+                                    BrokerServiceException.getClientErrorCode(exception.getCause()),
                                     exception.getCause().getMessage());
                         }
                         consumers.remove(consumerId, consumerFuture);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 5772efcce5c..33818336d91 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.Timer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -34,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -44,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -52,7 +57,9 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
@@ -133,6
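In the `ServerCnx` hunk, the error code is now derived from `exception.getCause()`, matching the error message, which already used `exception.getCause().getMessage()`. A plausible reason (an assumption, since the hunk does not show where `exception` comes from) is that failures observed in `CompletableFuture` callbacks often arrive wrapped in a `CompletionException`, so mapping the wrapper loses the specific error. The sketch demonstrates that wrapping; `errorCodeFor` is a hypothetical mapping, not `BrokerServiceException.getClientErrorCode`.

```java
import java.util.concurrent.CompletableFuture;

public class UnwrapCauseSketch {
    // Hypothetical error mapping standing in for BrokerServiceException.getClientErrorCode.
    static String errorCodeFor(Throwable t) {
        if (t instanceof IllegalStateException) {
            return "ServiceNotReady";
        }
        return "UnknownError";
    }

    // Returns {code derived from the wrapper, code derived from its cause}.
    static String[] classify() {
        CompletableFuture<Void> init = CompletableFuture.runAsync(() -> {
            throw new IllegalStateException("pending ack handle init failed");
        });
        String[] codes = new String[2];
        init.exceptionally(ex -> {
            // ex is a CompletionException wrapping the IllegalStateException thrown above.
            codes[0] = errorCodeFor(ex);
            codes[1] = errorCodeFor(ex.getCause());
            return null;
        }).join();
        return codes;
    }

    public static void main(String[] args) {
        String[] codes = classify();
        System.out.println(codes[0] + " " + codes[1]); // UnknownError ServiceNotReady
    }
}
```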
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
gaoran10 commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1565113797


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##
@@ -188,58 +270,135 @@ protected CompletableFuture isLocalTopicActive() {
             }, brokerService.executor());
     }

-    protected synchronized CompletableFuture closeProducerAsync() {
-        if (producer == null) {
-            STATE_UPDATER.set(this, State.Stopped);
+    /**
+     * This method only be used by {@link PersistentTopic#checkGC} now.
+     */
+    public CompletableFuture disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) {
+        long backlog = getNumberOfEntriesInBacklog();
+        if (failIfHasBacklog && backlog > 0) {
+            CompletableFuture disconnectFuture = new CompletableFuture<>();
+            disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog"));
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId);
+            }
+            return disconnectFuture;
+        }
+        log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId,
+                getReplicatorReadPosition(), backlog);
+        return closeProducerAsync(closeTheStartingProducer);
+    }
+
+    /**
+     * This method only be used by {@link PersistentTopic#checkGC} now.
+     */
+    protected CompletableFuture closeProducerAsync(boolean closeTheStartingProducer) {
+        Pair setDisconnectingRes = compareSetAndGetState(State.Started, State.Disconnecting);
+        if (!setDisconnectingRes.getLeft()) {
+            if (setDisconnectingRes.getRight() == State.Starting) {
+                if (closeTheStartingProducer) {
+                    /**
+                     * Delay retry(wait for the start producer task is finish).
+                     * Note: If the producer always start fail, the start producer task will always retry until the
+                     * state changed to {@link State.Terminated}.
+                     * Nit: The better solution is creating a {@link CompletableFuture} to trace the in-progress
+                     * creation and call "inProgressCreationFuture.thenApply(closeProducer())".
+                     */
+                    long waitTimeMs = backOff.next();
+                    brokerService.executor().schedule(() -> closeProducerAsync(true),
+                            waitTimeMs, TimeUnit.MILLISECONDS);
+                } else {
+                    log.info("[{}] Skip current producer closing since the previous producer has been closed,"
+                            + " and trying start a new one, state : {}",
+                            replicatorId, setDisconnectingRes.getRight());
+                }
+            } else if (setDisconnectingRes.getRight() == State.Disconnected
+                    || setDisconnectingRes.getRight() == State.Disconnecting) {
+                log.info("[{}] Skip current producer closing since other thread did closing, state : {}",
+                        replicatorId, setDisconnectingRes.getRight());
+            } else if (setDisconnectingRes.getRight() == State.Terminating
+                    || setDisconnectingRes.getRight() == State.Terminated) {
+                log.info("[{}] Skip current producer closing since other thread is doing termination, state : {}",
+                        replicatorId, state);
+            }
+            log.info("[{}] Skip current termination since other thread is doing close producer or termination,"
+                    + " state : {}", replicatorId, state);
             return CompletableFuture.completedFuture(null);
         }
-        CompletableFuture future = producer.closeAsync();
+
+        // Close producer and update state.
+        return doCloseProducerAsync(producer, () -> {
+            Pair setDisconnectedRes = compareSetAndGetState(State.Disconnecting, State.Disconnected);
+            if (setDisconnectedRes.getLeft()) {
+                this.producer = null;
+                // deactivate further read
+                disableReplicatorRead();
+                return;
+            }
+            if (setDisconnectedRes.getRight() == State.Terminating
+                    || setDisconnectingRes.getRight() == State.Terminated) {
+                log.info("[{}] Skip setting state to terminated because it was terminated, state : {}",
+                        replicatorId, state);
+            } else {
+                // Since only one task can call "doCloseProducerAsync(producer, action)", this scenario is not expected.
+                // So print a warn log.
+                log.warn("[{}] Other task has change the state to terminated. so skipped current one task."
+                        + " State is : {}",
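The new `closeProducerAsync` relies on `compareSetAndGetState`, which appears to return both whether the transition succeeded (`getLeft()`) and the state actually observed (`getRight()`), so a caller that loses the race can decide whether to retry, skip, or log. A hedged sketch of such a helper over an `AtomicReference`: the enum values mirror the states named in the diff, `Map.Entry` stands in for the commons-lang3 `Pair`, returning the new state on success is a choice of this sketch, and the backoff retry for the `Starting` case is left out.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class ReplicatorStateSketch {
    // States named in the diff; the real enum may contain more values.
    enum State { Starting, Started, Disconnecting, Disconnected, Terminating, Terminated }

    private final AtomicReference<State> state = new AtomicReference<>(State.Starting);

    State current() {
        return state.get();
    }

    // Hypothetical analogue of compareSetAndGetState: (succeeded?, state observed or set).
    Map.Entry<Boolean, State> compareSetAndGetState(State expect, State update) {
        while (true) {
            State observed = state.get();
            if (observed != expect) {
                return new AbstractMap.SimpleImmutableEntry<>(false, observed);
            }
            if (state.compareAndSet(expect, update)) {
                return new AbstractMap.SimpleImmutableEntry<>(true, update);
            }
            // The state changed between get() and compareAndSet(); re-read it.
        }
    }

    // Only the caller that wins Started -> Disconnecting closes the producer.
    String closeProducer() {
        Map.Entry<Boolean, State> res = compareSetAndGetState(State.Started, State.Disconnecting);
        if (!res.getKey()) {
            return "skipped in " + res.getValue();   // the diff schedules a retry when Starting
        }
        // ... close the producer here, then publish the final state ...
        compareSetAndGetState(State.Disconnecting, State.Disconnected);
        return "closed";
    }

    public static void main(String[] args) {
        ReplicatorStateSketch r = new ReplicatorStateSketch();
        System.out.println(r.closeProducer());                                               // skipped in Starting
        System.out.println(r.compareSetAndGetState(State.Starting, State.Started).getKey()); // true
        System.out.println(r.closeProducer());                                               // closed
        System.out.println(r.closeProducer());                                               // skipped in Disconnected
    }
}
```

Returning the observed state alongside the result avoids a second, racy `get()` after a failed transition.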
(pulsar) branch branch-3.0 updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 000ee6679e6 [fix][txn]Handle exceptions in the transaction pending ack init (#21274)
000ee6679e6 is described below

commit 000ee6679e6c1b0f1ecbc867bbe0ab3d0c542a55
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Mon Apr 15 08:43:12 2024 +0800

    [fix][txn]Handle exceptions in the transaction pending ack init (#21274)

    Co-authored-by: Baodi Shi
    (cherry picked from commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../pendingack/impl/PendingAckHandleImpl.java      | 54 --
 .../pulsar/broker/transaction/TransactionTest.java |  2 +-
 .../pendingack/PendingAckPersistentTest.java       | 82 ++
 4 files changed, 132 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2b2cbc5fac2..b47292268b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1319,7 +1319,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         // Send error back to client, only if not completed already.
                         if (consumerFuture.completeExceptionally(exception)) {
                             commandSender.sendErrorResponse(requestId,
-                                    BrokerServiceException.getClientErrorCode(exception),
+                                    BrokerServiceException.getClientErrorCode(exception.getCause()),
                                     exception.getCause().getMessage());
                         }
                         consumers.remove(consumerId, consumerFuture);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 7dbe0385fd7..5ed271c6fd4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -22,6 +22,7 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.Timer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -45,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -53,7 +57,9 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
@@ -134,6 +140,12 @@ public class PendingAckHandleImpl extends
Re: [PR] [improve][test] Add topic operation checker for topic API [pulsar]
Technoboy- closed pull request #22468: [improve][test] Add topic operation checker for topic API URL: https://github.com/apache/pulsar/pull/22468 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Bug] We are leaving old releases in the ASF distribution area. Please cleanup. [pulsar]
Technoboy- commented on issue #22486: URL: https://github.com/apache/pulsar/issues/22486#issuecomment-2054279703 Cleaned up. Please take a look @dave2wave
svn commit: r68514 - /release/pulsar/pulsar-client-cpp-3.4.0/
Author: technoboy Date: Mon Apr 15 01:28:45 2024 New Revision: 68514 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.4.0/
svn commit: r68517 - /release/pulsar/pulsar-client-cpp-3.1.1/
Author: technoboy Date: Mon Apr 15 01:29:42 2024 New Revision: 68517 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.1.1/
svn commit: r68516 - /release/pulsar/pulsar-client-cpp-3.1.0/
Author: technoboy Date: Mon Apr 15 01:29:27 2024 New Revision: 68516 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.1.0/
svn commit: r68515 - /release/pulsar/pulsar-client-cpp-3.4.1/
Author: technoboy Date: Mon Apr 15 01:28:58 2024 New Revision: 68515 Log: cleanup Removed: release/pulsar/pulsar-client-cpp-3.4.1/
svn commit: r68513 - /release/pulsar/pulsar-client-go-0.11.0/
Author: technoboy Date: Mon Apr 15 01:28:14 2024 New Revision: 68513 Log: cleanup Removed: release/pulsar/pulsar-client-go-0.11.0/
svn commit: r68512 - /release/pulsar/pulsar-dotpulsar-3.1.1/
Author: technoboy Date: Mon Apr 15 01:27:45 2024 New Revision: 68512 Log: cleanup Removed: release/pulsar/pulsar-dotpulsar-3.1.1/
svn commit: r68511 - /release/pulsar/pulsar-dotpulsar-3.1.0/
Author: technoboy Date: Mon Apr 15 01:27:29 2024 New Revision: 68511 Log: cleanup Removed: release/pulsar/pulsar-dotpulsar-3.1.0/
svn commit: r68510 - /release/pulsar/pulsar-client-go-0.12.0/
Author: technoboy Date: Mon Apr 15 01:26:17 2024 New Revision: 68510 Log: cleanup Removed: release/pulsar/pulsar-client-go-0.12.0/
svn commit: r68509 - /release/pulsar/pulsar-client-reactive-0.5.2/
Author: technoboy Date: Mon Apr 15 01:25:44 2024 New Revision: 68509 Log: cleanup Removed: release/pulsar/pulsar-client-reactive-0.5.2/
svn commit: r68508 - /release/pulsar/pulsar-client-reactive-0.5.1/
Author: technoboy Date: Mon Apr 15 01:25:31 2024 New Revision: 68508 Log: cleanup Removed: release/pulsar/pulsar-client-reactive-0.5.1/
svn commit: r68507 - /release/pulsar/pulsar-client-reactive-0.5.0/
Author: technoboy Date: Mon Apr 15 01:25:17 2024 New Revision: 68507 Log: cleanup Removed: release/pulsar/pulsar-client-reactive-0.5.0/
svn commit: r68506 - /release/pulsar/pulsar-3.2.1/
Author: technoboy Date: Mon Apr 15 01:24:19 2024 New Revision: 68506 Log: cleanup Removed: release/pulsar/pulsar-3.2.1/
svn commit: r68505 - /release/pulsar/pulsar-3.2.0/
Author: technoboy Date: Mon Apr 15 01:23:56 2024 New Revision: 68505 Log: cleanup Removed: release/pulsar/pulsar-3.2.0/
svn commit: r68502 - /release/pulsar/pulsar-3.1.0/
Author: technoboy Date: Mon Apr 15 01:22:31 2024 New Revision: 68502 Log: cleanup Removed: release/pulsar/pulsar-3.1.0/
svn commit: r68504 - /release/pulsar/pulsar-3.1.2/
Author: technoboy Date: Mon Apr 15 01:23:19 2024 New Revision: 68504 Log: cleanup Removed: release/pulsar/pulsar-3.1.2/
svn commit: r68503 - /release/pulsar/pulsar-3.1.1/
Author: technoboy Date: Mon Apr 15 01:22:58 2024 New Revision: 68503 Log: cleanup Removed: release/pulsar/pulsar-3.1.1/
svn commit: r68501 - /release/pulsar/pulsar-3.0.3/
Author: technoboy Date: Mon Apr 15 01:22:04 2024 New Revision: 68501 Log: cleanup Removed: release/pulsar/pulsar-3.0.3/
svn commit: r68500 - /release/pulsar/pulsar-3.0.2/
Author: technoboy Date: Mon Apr 15 01:21:51 2024 New Revision: 68500 Log: cleanup Removed: release/pulsar/pulsar-3.0.2/
svn commit: r68499 - /release/pulsar/pulsar-3.0.1/
Author: technoboy Date: Mon Apr 15 01:21:23 2024 New Revision: 68499 Log: cleanup Removed: release/pulsar/pulsar-3.0.1/
svn commit: r68498 - /release/pulsar/pulsar-3.0.0/
Author: technoboy Date: Mon Apr 15 01:21:08 2024 New Revision: 68498 Log: cleanup Removed: release/pulsar/pulsar-3.0.0/
svn commit: r68496 - /release/pulsar/pulsar-2.11.3/
Author: technoboy Date: Mon Apr 15 01:20:06 2024 New Revision: 68496 Log: cleanup Removed: release/pulsar/pulsar-2.11.3/
svn commit: r68497 - /release/pulsar/pulsar-2.9.4/
Author: technoboy Date: Mon Apr 15 01:20:37 2024 New Revision: 68497 Log: cleanup Removed: release/pulsar/pulsar-2.9.4/
svn commit: r68493 - /release/pulsar/pulsar-2.11.0/
Author: technoboy Date: Mon Apr 15 01:19:17 2024 New Revision: 68493 Log: cleanup Removed: release/pulsar/pulsar-2.11.0/
svn commit: r68494 - /release/pulsar/pulsar-2.11.1/
Author: technoboy Date: Mon Apr 15 01:19:42 2024 New Revision: 68494 Log: cleanup Removed: release/pulsar/pulsar-2.11.1/
svn commit: r68495 - /release/pulsar/pulsar-2.11.2/
Author: technoboy Date: Mon Apr 15 01:19:53 2024 New Revision: 68495 Log: cleanup Removed: release/pulsar/pulsar-2.11.2/
svn commit: r68490 - /release/pulsar/pulsar-2.10.3/
Author: technoboy Date: Mon Apr 15 01:18:19 2024 New Revision: 68490 Log: cleanup Removed: release/pulsar/pulsar-2.10.3/
svn commit: r68492 - /release/pulsar/pulsar-2.10.5/
Author: technoboy Date: Mon Apr 15 01:18:56 2024 New Revision: 68492 Log: cleanup Removed: release/pulsar/pulsar-2.10.5/
svn commit: r68491 - /release/pulsar/pulsar-2.10.4/
Author: technoboy Date: Mon Apr 15 01:18:39 2024 New Revision: 68491 Log: cleanup Removed: release/pulsar/pulsar-2.10.4/
svn commit: r68489 - /release/pulsar/pulsar-2.10.2/
Author: technoboy Date: Mon Apr 15 01:17:31 2024 New Revision: 68489 Log: cleanup Removed: release/pulsar/pulsar-2.10.2/
Re: [PR] [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes [pulsar]
freeznet closed pull request #22501: [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes URL: https://github.com/apache/pulsar/pull/22501
[PR] [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes [pulsar]
freeznet opened a new pull request, #22501:
URL: https://github.com/apache/pulsar/pull/22501

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

### Motivation

https://github.com/apache/pulsar/pull/20115 fixes the classloader in TopicSchema, but in the Kubernetes Runtime the classloader may differ from the one used by the other runtimes, which causes the SerDe to fail to initialize in the Kubernetes Runtime.

### Modifications

- Make sure the classloader is always `functionClassLoader` when initializing the context.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository: https://github.com/freeznet/pulsar/pull/11
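The PR description does not include the change itself. One common Java pattern for making sure code initializes under a specific classloader is to install that loader as the thread context classloader for the duration of the call and restore the previous one in a `finally` block. The sketch shows only that generic pattern; whether #22501 takes this approach is not stated, and the `functionClassLoader` below is a hypothetical empty `URLClassLoader`.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public class ContextClassLoaderSketch {
    // Runs `action` with `loader` as the thread context classloader, then restores the previous one.
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for the function's classloader.
        try (URLClassLoader functionClassLoader =
                     new URLClassLoader(new URL[0], ContextClassLoaderSketch.class.getClassLoader())) {
            ClassLoader seen = withContextClassLoader(functionClassLoader,
                    () -> Thread.currentThread().getContextClassLoader());
            System.out.println(seen == functionClassLoader);                                  // true
            System.out.println(Thread.currentThread().getContextClassLoader() != functionClassLoader); // true
        }
    }
}
```

Restoring in `finally` keeps a failed initialization from leaking the function's classloader into whatever the thread runs next.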
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
gaoran10 commented on code in PR #21946: URL: https://github.com/apache/pulsar/pull/21946#discussion_r1565026654 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java: ## @@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man } @Override -protected void readEntries(Producer producer) { -// Rewind the cursor to be sure to read again all non-acked messages sent while restarting +protected void setProducerAndTriggerReadEntries(Producer producer) { +// Rewind the cursor to be sure to read again all non-acked messages sent while restarting. cursor.rewind(); - cursor.cancelPendingReadRequest(); -HAVE_PENDING_READ_UPDATER.set(this, FALSE); -this.producer = (ProducerImpl) producer; -if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { -log.info("[{}] Created replicator producer", replicatorId); +/** + * 1. Try change state to {@link Started}. + * 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value + *producer when the state is {@link Started}. + */ +Pair changeStateRes; +changeStateRes = compareSetAndGetState(Starting, Started); +if (changeStateRes.getLeft()) { +this.producer = (ProducerImpl) producer; +HAVE_PENDING_READ_UPDATER.set(this, FALSE); +// Trigger a new read. +log.info("[{}] Created replicator producer, Replicator state: {}", replicatorId, state); backOff.reset(); -// activate cursor: so, entries can be cached +// activate cursor: so, entries can be cached. this.cursor.setActive(); // read entries readMoreEntries(); } else { -log.info( -"[{}] Replicator was stopped while creating the producer." -+ " Closing it. Replicator state: {}", -replicatorId, STATE_UPDATER.get(this)); -STATE_UPDATER.set(this, State.Stopping); -closeProducerAsync(); +if (changeStateRes.getRight() == Started) { +// Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. +// So print a warn log. 
+log.warn("[{}] Replicator was already started by another thread while creating the producer." ++ " Closing the producer newly created. Replicator state: {}", replicatorId, state); +} else if (changeStateRes.getRight() == Terminating || changeStateRes.getRight() == Terminated) { +log.info("[{}] Replicator was terminated, so close the producer. Replicator state: {}", Review Comment: Got it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
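In this hunk, `compareSetAndGetState(expect, update)` returns a pair: whether the transition happened, and which state was observed. That second value is what lets the `else` branch tell the unexpected "already Started" case apart from "Terminating/Terminated". The helper's body is not shown in these emails. Below is a minimal sketch of such a helper. It assumes a volatile `state` field driven by an `AtomicReferenceFieldUpdater` (as the name `STATE_UPDATER` suggests), and it uses a small record in place of commons-lang3 `Pair`. The class and record names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// States named after those appearing in the PR #21946 diffs.
enum ReplicatorState { Disconnected, Starting, Started, Disconnecting, Terminating, Terminated }

// Stand-in for Pair<Boolean, State>: did the CAS succeed, and which state was seen.
record StateChange(boolean changed, ReplicatorState observed) {}

class ReplicatorStateMachine {
    private volatile ReplicatorState state = ReplicatorState.Disconnected;

    private static final AtomicReferenceFieldUpdater<ReplicatorStateMachine, ReplicatorState> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(ReplicatorStateMachine.class, ReplicatorState.class, "state");

    /**
     * Hypothetical helper: atomically moves expect -> update. On failure it reports the
     * state that blocked the transition, so the caller can branch on it.
     */
    StateChange compareSetAndGetState(ReplicatorState expect, ReplicatorState update) {
        while (true) {
            ReplicatorState current = STATE_UPDATER.get(this);
            if (current != expect) {
                return new StateChange(false, current);
            }
            if (STATE_UPDATER.compareAndSet(this, expect, update)) {
                return new StateChange(true, update);
            }
            // Another thread changed the state between get() and compareAndSet(): re-read.
        }
    }

    ReplicatorState getState() {
        return state;
    }
}
```

A caller that loses the race inspects `observed()`. In the hunk above, this is how a `Started` result (logged as a warning) is distinguished from `Terminating`/`Terminated` (logged at info level).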
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
gaoran10 commented on code in PR #21946: URL: https://github.com/apache/pulsar/pull/21946#discussion_r1565025191 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java: ## @@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man } @Override -protected void readEntries(Producer producer) { -// Rewind the cursor to be sure to read again all non-acked messages sent while restarting +protected void setProducerAndTriggerReadEntries(Producer producer) { +// Rewind the cursor to be sure to read again all non-acked messages sent while restarting. cursor.rewind(); - cursor.cancelPendingReadRequest(); -HAVE_PENDING_READ_UPDATER.set(this, FALSE); -this.producer = (ProducerImpl) producer; Review Comment: OK
(pulsar) branch branch-3.1 updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)
This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new ce99a96bb5b [fix][txn]Handle exceptions in the transaction pending ack init (#21274) ce99a96bb5b is described below commit ce99a96bb5b088dd37dc2ecb97ef7bb9d3ae3deb Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com> AuthorDate: Mon Apr 15 08:43:12 2024 +0800 [fix][txn]Handle exceptions in the transaction pending ack init (#21274) Co-authored-by: Baodi Shi (cherry picked from commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b) --- .../apache/pulsar/broker/service/ServerCnx.java| 2 +- .../pendingack/impl/PendingAckHandleImpl.java | 54 -- .../pulsar/broker/transaction/TransactionTest.java | 2 +- .../pendingack/PendingAckPersistentTest.java | 82 ++ 4 files changed, 132 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 29ba7cb866e..c5342dd3dff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1311,7 +1311,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { // Send error back to client, only if not completed already. 
if (consumerFuture.completeExceptionally(exception)) { commandSender.sendErrorResponse(requestId, - BrokerServiceException.getClientErrorCode(exception), + BrokerServiceException.getClientErrorCode(exception.getCause()), exception.getCause().getMessage()); } consumers.remove(consumerId, consumerFuture); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 7dbe0385fd7..5ed271c6fd4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -22,6 +22,7 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap; import com.google.common.annotations.VisibleForTesting; +import io.netty.util.Timer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -35,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -45,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; import 
org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.Consumer; @@ -53,7 +57,9 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; @@ -134,6 +140,12 @@ public class PendingAckHandleImpl extends
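The one-line `ServerCnx` change passes `exception.getCause()` instead of `exception` to `BrokerServiceException.getClientErrorCode`. This matches the `exception.getCause().getMessage()` that the same call already uses for the message text. A plausible reason is that the exception reaching this handler is a `CompletionException` wrapper, and a type-based mapping cannot recognize the wrapper. That is an assumption, since the surrounding code is not shown. The sketch below illustrates the effect with a hypothetical `toClientErrorCode` mapping and a hypothetical exception class, not Pulsar's own.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical client-facing codes; Pulsar's real mapping is BrokerServiceException.getClientErrorCode.
enum ClientErrorCode { UnknownError, ServiceNotReady }

class PendingAckNotReadyException extends RuntimeException {
    PendingAckNotReadyException(String message) {
        super(message);
    }
}

class ErrorCodeMapping {
    // Type-based mapping: anything it does not recognize falls back to UnknownError.
    static ClientErrorCode toClientErrorCode(Throwable t) {
        return (t instanceof PendingAckNotReadyException)
                ? ClientErrorCode.ServiceNotReady
                : ClientErrorCode.UnknownError;
    }

    // Returns the Throwable an exceptionally(...) callback receives when an upstream stage failed.
    static Throwable failureSeenByCallback() {
        CompletableFuture<String> init = new CompletableFuture<>();
        init.completeExceptionally(new PendingAckNotReadyException("pending ack init failed"));
        Throwable[] seen = new Throwable[1];
        init.thenApply(String::length) // the dependent stage wraps the failure in a CompletionException
            .exceptionally(ex -> {
                seen[0] = ex;
                return -1;
            })
            .join();
        return seen[0];
    }
}
```

Mapping the wrapper yields the generic code. Mapping its cause yields the specific one, which is also the exception whose message the client already receives.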
(pulsar) branch branch-3.2 updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)
This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new fdbcb6fe0da [fix][txn]Handle exceptions in the transaction pending ack init (#21274) fdbcb6fe0da is described below commit fdbcb6fe0da6967afe55248134b9d3699916f191 Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com> AuthorDate: Mon Apr 15 08:43:12 2024 +0800 [fix][txn]Handle exceptions in the transaction pending ack init (#21274) Co-authored-by: Baodi Shi (cherry picked from commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b) --- .../apache/pulsar/broker/service/ServerCnx.java| 2 +- .../pendingack/impl/PendingAckHandleImpl.java | 54 -- .../pulsar/broker/transaction/TransactionTest.java | 2 +- .../pendingack/PendingAckPersistentTest.java | 82 ++ 4 files changed, 132 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 569c5a2fb1e..40a0ee4a73c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1374,7 +1374,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { // Send error back to client, only if not completed already. 
if (consumerFuture.completeExceptionally(exception)) { commandSender.sendErrorResponse(requestId, - BrokerServiceException.getClientErrorCode(exception), + BrokerServiceException.getClientErrorCode(exception.getCause()), exception.getCause().getMessage()); } consumers.remove(consumerId, consumerFuture); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 7dbe0385fd7..5ed271c6fd4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -22,6 +22,7 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap; import com.google.common.annotations.VisibleForTesting; +import io.netty.util.Timer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -35,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -45,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; import 
org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.Consumer; @@ -53,7 +57,9 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; @@ -134,6 +140,12 @@ public class PendingAckHandleImpl extends
Re: [PR] [fix][test] Flaky-test: PersistentMessageFinderTest.testMessageExpiryWithTimestampNonRecoverableException [pulsar]
codelipenghui commented on PR #22489: URL: https://github.com/apache/pulsar/pull/22489#issuecomment-2054249321 /pulsarbot run-failure-checks
(pulsar) branch master updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)
This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5d18ff7b70f [fix][txn]Handle exceptions in the transaction pending ack init (#21274) 5d18ff7b70f is described below commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com> AuthorDate: Mon Apr 15 08:43:12 2024 +0800 [fix][txn]Handle exceptions in the transaction pending ack init (#21274) Co-authored-by: Baodi Shi --- .../apache/pulsar/broker/service/ServerCnx.java| 2 +- .../pendingack/impl/PendingAckHandleImpl.java | 54 -- .../pulsar/broker/transaction/TransactionTest.java | 2 +- .../pendingack/PendingAckPersistentTest.java | 82 ++ 4 files changed, 132 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4ee6ac43465..a60f1d805ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1376,7 +1376,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { // Send error back to client, only if not completed already. 
if (consumerFuture.completeExceptionally(exception)) { commandSender.sendErrorResponse(requestId, - BrokerServiceException.getClientErrorCode(exception), + BrokerServiceException.getClientErrorCode(exception.getCause()), exception.getCause().getMessage()); } consumers.remove(consumerId, consumerFuture); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 7dbe0385fd7..5ed271c6fd4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -22,6 +22,7 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap; import com.google.common.annotations.VisibleForTesting; +import io.netty.util.Timer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -35,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -45,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; import 
org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.Consumer; @@ -53,7 +57,9 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; @@ -134,6 +140,12 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi public final RecoverTimeRecord
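The visible part of this diff only shows the new imports in `PendingAckHandleImpl` (`Backoff`, `Timer`, `TimeUnit`, `ManagedLedgerException`, `PulsarClientException`). The hunk that uses them is cut off. What follows is therefore a hypothetical sketch of the general pattern those imports hint at, not the code from #21274: retrying a failed asynchronous init with exponential backoff when the failure looks transient, and failing fast otherwise. `BackoffInitRetry` and its parameters are invented for illustration, and `ScheduledExecutorService` stands in for Netty's `Timer`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical helper: retries a failing async init with exponential backoff. */
class BackoffInitRetry {
    private final ScheduledExecutorService scheduler;
    private final long initialDelayMs;
    private final long maxDelayMs;
    private final int maxAttempts;

    BackoffInitRetry(ScheduledExecutorService scheduler, long initialDelayMs, long maxDelayMs, int maxAttempts) {
        this.scheduler = scheduler;
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.maxAttempts = maxAttempts;
    }

    <T> CompletableFuture<T> run(Supplier<CompletableFuture<T>> init, Predicate<Throwable> retryable) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(init, retryable, result, 1, initialDelayMs);
        return result;
    }

    private <T> void attempt(Supplier<CompletableFuture<T>> init, Predicate<Throwable> retryable,
                             CompletableFuture<T> result, int attemptNo, long delayMs) {
        init.get().whenComplete((value, ex) -> {
            if (ex == null) {
                result.complete(value);
                return;
            }
            // Unwrap CompletionException so the predicate sees the real cause.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
            if (!retryable.test(cause) || attemptNo >= maxAttempts) {
                result.completeExceptionally(cause); // non-retryable, or out of attempts: fail the caller
                return;
            }
            long nextDelayMs = Math.min(delayMs * 2, maxDelayMs);
            scheduler.schedule(() -> attempt(init, retryable, result, attemptNo + 1, nextDelayMs),
                    delayMs, TimeUnit.MILLISECONDS);
        });
    }
}
```

Under this approach, the caller decides which failures are retryable, for example a transient ledger error versus a misconfiguration. Each retry doubles the delay up to `maxDelayMs`.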
Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]
shibd merged PR #21274: URL: https://github.com/apache/pulsar/pull/21274
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode commented on code in PR #21946: URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564805152 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java: ## @@ -675,30 +692,6 @@ protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl< } } -@Override -public CompletableFuture disconnect() { -return disconnect(false); -} - -@Override -public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) { -final CompletableFuture future = new CompletableFuture<>(); - -super.disconnect(failIfHasBacklog).thenRun(() -> { -dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); Review Comment: Good catch! Fixed
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode commented on code in PR #21946: URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564803078 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3870,6 +3879,27 @@ private void unfenceTopicToResume() { subscriptions.values().forEach(sub -> sub.resumeAfterFence()); isFenced = false; isClosingOrDeleting = false; +replicatorsResume(); +} + +private void replicatorsResume() { +removeTerminatingReplicators(replicators); +removeTerminatingReplicators(shadowReplicators); +checkReplication(); +checkShadowReplication(); +} + +private static void removeTerminatingReplicators(ConcurrentOpenHashMap replicators) { Review Comment: For performance and testability, a static method is better.
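The body of `removeTerminatingReplicators` is cut off in this email. Below is a sketch of what such a helper might do. It assumes each replicator exposes its state and that entries in `Terminating` or `Terminated` should be dropped so that `checkReplication()` can recreate them. The sketch also illustrates the reviewer's point: a static method that only touches its argument can be tested with a plain map. `java.util.concurrent.ConcurrentHashMap` stands in for Pulsar's `ConcurrentOpenHashMap`. `HasReplicatorState` and `ReplicatorCleanup` are hypothetical, and the map keys are assumed to be remote cluster names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

enum ReplicatorLifecycle { Disconnected, Starting, Started, Disconnecting, Terminating, Terminated }

// Hypothetical view of a replicator: only its state matters here.
@FunctionalInterface
interface HasReplicatorState {
    ReplicatorLifecycle state();
}

final class ReplicatorCleanup {
    private ReplicatorCleanup() {
    }

    /** Removes replicators that are terminating or terminated; returns true if anything was removed. */
    static boolean removeTerminatingReplicators(Map<String, ? extends HasReplicatorState> replicators) {
        return replicators.values().removeIf(r ->
                r.state() == ReplicatorLifecycle.Terminating || r.state() == ReplicatorLifecycle.Terminated);
    }
}
```

Because the method reads no instance fields, a test can pass in a map of lambdas. No topic, broker, or mock is required.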
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode commented on code in PR #21946: URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564802782 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java: ## @@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man } @Override -protected void readEntries(Producer producer) { -// Rewind the cursor to be sure to read again all non-acked messages sent while restarting +protected void setProducerAndTriggerReadEntries(Producer producer) { +// Rewind the cursor to be sure to read again all non-acked messages sent while restarting. cursor.rewind(); - cursor.cancelPendingReadRequest(); -HAVE_PENDING_READ_UPDATER.set(this, FALSE); -this.producer = (ProducerImpl) producer; Review Comment: > Why do we need to move these two lines? It seems that if we don't set producer, the operation doCloseProducerAsync(producer, () -> {}); can't close the producer. We did not remove these two lines; we only moved them into the branch where `compareSetAndGetState(Starting, Started)` succeeds (`changeStateRes.getLeft()` is true). > `doCloseProducerAsync(producer, () -> {});` can't close the producer. The branch that makes this call uses the method parameter `producer`, not `this.producer`, so the close does work.
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode commented on code in PR #21946: URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564801185 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java: ## @@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man } @Override -protected void readEntries(Producer producer) { -// Rewind the cursor to be sure to read again all non-acked messages sent while restarting +protected void setProducerAndTriggerReadEntries(Producer producer) { +// Rewind the cursor to be sure to read again all non-acked messages sent while restarting. cursor.rewind(); - cursor.cancelPendingReadRequest(); -HAVE_PENDING_READ_UPDATER.set(this, FALSE); -this.producer = (ProducerImpl) producer; -if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { -log.info("[{}] Created replicator producer", replicatorId); +/** + * 1. Try change state to {@link Started}. + * 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value + *producer when the state is {@link Started}. + */ +Pair changeStateRes; +changeStateRes = compareSetAndGetState(Starting, Started); +if (changeStateRes.getLeft()) { +this.producer = (ProducerImpl) producer; +HAVE_PENDING_READ_UPDATER.set(this, FALSE); +// Trigger a new read. +log.info("[{}] Created replicator producer, Replicator state: {}", replicatorId, state); backOff.reset(); -// activate cursor: so, entries can be cached +// activate cursor: so, entries can be cached. this.cursor.setActive(); // read entries readMoreEntries(); } else { -log.info( -"[{}] Replicator was stopped while creating the producer." -+ " Closing it. Replicator state: {}", -replicatorId, STATE_UPDATER.get(this)); -STATE_UPDATER.set(this, State.Stopping); -closeProducerAsync(); +if (changeStateRes.getRight() == Started) { +// Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. 
+// So print a warn log. +log.warn("[{}] Replicator was already started by another thread while creating the producer." ++ " Closing the producer newly created. Replicator state: {}", replicatorId, state); +} else if (changeStateRes.getRight() == Terminating || changeStateRes.getRight() == Terminated) { +log.info("[{}] Replicator was terminated, so close the producer. Replicator state: {}", Review Comment: > This is a little confusing, if the replicator was terminating or terminated, do we need to close the producer again? Yes, it is needed: the variable `producer` here is not `this.producer` but a newly created object, so we need to close it.
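The two review threads above, on moving the two lines and on closing the producer after termination, turn on the same detail. The `else` branch closes the method parameter `producer`, which the replicator never stored in `this.producer`. Below is a reduced sketch of that hand-off. `RemoteProducer` and `ProducerHandoff` are hypothetical classes, and `AtomicReference` is used instead of the PR's `STATE_UPDATER`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical producer that only records whether it was closed.
class RemoteProducer {
    private volatile boolean closed;

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

class ProducerHandoff {
    enum State { Disconnected, Starting, Started, Disconnecting, Terminating, Terminated }

    final AtomicReference<State> state = new AtomicReference<>(State.Starting);
    // Assigned only after winning Starting -> Started; a losing call never touches this field.
    volatile RemoteProducer producer;

    void setProducerAndTriggerReadEntries(RemoteProducer newProducer) {
        if (state.compareAndSet(State.Starting, State.Started)) {
            this.producer = newProducer;
            // The real method would also reset the backoff, activate the cursor and read entries here.
        } else {
            // Lost the race (for example, the replicator is Terminating). this.producer was never set
            // to newProducer, so the parameter itself must be closed or the remote producer leaks.
            newProducer.close();
        }
    }
}
```

Closing `this.producer` in the losing branch would be wrong in two ways. It would close nothing, or it would close a producer that some other owner is still using, while the newly created producer would stay open.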
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode commented on code in PR #21946: URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564800410 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java: ## @@ -188,58 +270,135 @@ protected CompletableFuture isLocalTopicActive() { }, brokerService.executor()); } -protected synchronized CompletableFuture closeProducerAsync() { -if (producer == null) { -STATE_UPDATER.set(this, State.Stopped); +/** + * This method only be used by {@link PersistentTopic#checkGC} now. + */ +public CompletableFuture disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) { +long backlog = getNumberOfEntriesInBacklog(); +if (failIfHasBacklog && backlog > 0) { +CompletableFuture disconnectFuture = new CompletableFuture<>(); +disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog")); +if (log.isDebugEnabled()) { +log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId); +} +return disconnectFuture; +} +log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, +getReplicatorReadPosition(), backlog); +return closeProducerAsync(closeTheStartingProducer); +} + +/** + * This method only be used by {@link PersistentTopic#checkGC} now. + */ +protected CompletableFuture closeProducerAsync(boolean closeTheStartingProducer) { +Pair setDisconnectingRes = compareSetAndGetState(State.Started, State.Disconnecting); +if (!setDisconnectingRes.getLeft()) { +if (setDisconnectingRes.getRight() == State.Starting) { +if (closeTheStartingProducer) { +/** + * Delay retry(wait for the start producer task is finish). + * Note: If the producer always start fail, the start producer task will always retry until the + * state changed to {@link State.Terminated}. + * Nit: The better solution is creating a {@link CompletableFuture} to trace the in-progress + * creation and call "inProgressCreationFuture.thenApply(closeProducer())". 
+ */ +long waitTimeMs = backOff.next(); +brokerService.executor().schedule(() -> closeProducerAsync(true), +waitTimeMs, TimeUnit.MILLISECONDS); +} else { +log.info("[{}] Skip current producer closing since the previous producer has been closed," ++ " and trying start a new one, state : {}", +replicatorId, setDisconnectingRes.getRight()); +} +} else if (setDisconnectingRes.getRight() == State.Disconnected +|| setDisconnectingRes.getRight() == State.Disconnecting) { +log.info("[{}] Skip current producer closing since other thread did closing, state : {}", +replicatorId, setDisconnectingRes.getRight()); +} else if (setDisconnectingRes.getRight() == State.Terminating +|| setDisconnectingRes.getRight() == State.Terminated) { +log.info("[{}] Skip current producer closing since other thread is doing termination, state : {}", +replicatorId, state); +} +log.info("[{}] Skip current termination since other thread is doing close producer or termination," ++ " state : {}", replicatorId, state); return CompletableFuture.completedFuture(null); } -CompletableFuture future = producer.closeAsync(); + +// Close producer and update state. +return doCloseProducerAsync(producer, () -> { +Pair setDisconnectedRes = compareSetAndGetState(State.Disconnecting, State.Disconnected); +if (setDisconnectedRes.getLeft()) { +this.producer = null; +// deactivate further read +disableReplicatorRead(); +return; +} +if (setDisconnectedRes.getRight() == State.Terminating +|| setDisconnectingRes.getRight() == State.Terminated) { +log.info("[{}] Skip setting state to terminated because it was terminated, state : {}", +replicatorId, state); +} else { +// Since only one task can call "doCloseProducerAsync(producer, action)", this scenario is not expected. +// So print a warn log. +log.warn("[{}] Other task has change the state to terminated. so skipped current one task." ++ " State is : {}", +
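The Javadoc in this hunk says that re-scheduling `closeProducerAsync(true)` with `backOff.next()` while the state is `Starting` is a workaround. It suggests that a better approach would be to keep a `CompletableFuture` for the in-progress creation and chain the close onto it. That suggestion is not what the PR implements. Below is a minimal sketch of the suggested alternative, using hypothetical `ClosableRemoteProducer` and `CreationTracker` types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical producer whose close is asynchronous.
class ClosableRemoteProducer {
    volatile boolean closed;

    CompletableFuture<Void> closeAsync() {
        closed = true;
        return CompletableFuture.completedFuture(null);
    }
}

class CreationTracker {
    // Holds the creation in flight (or the last one), instead of only a Starting flag.
    private final AtomicReference<CompletableFuture<ClosableRemoteProducer>> inProgressCreation =
            new AtomicReference<>();

    /** Registers a creation unless one is already tracked; returns the tracked future. */
    CompletableFuture<ClosableRemoteProducer> track(CompletableFuture<ClosableRemoteProducer> creation) {
        return inProgressCreation.compareAndSet(null, creation) ? creation : inProgressCreation.get();
    }

    /** Closes whatever the tracked creation produces, without polling on a backoff timer. */
    CompletableFuture<Void> closeProducerAsync() {
        CompletableFuture<ClosableRemoteProducer> creation = inProgressCreation.get();
        if (creation == null) {
            return CompletableFuture.completedFuture(null);
        }
        // If creation failed there is nothing to close, so map the failure to a null producer.
        return creation.handle((producer, ex) -> ex == null ? producer : null)
                .thenCompose(producer -> producer == null
                        ? CompletableFuture.<Void>completedFuture(null)
                        : producer.closeAsync());
    }
}
```

The returned future completes only after the producer is actually closed. In contrast, the PR's version returns an already completed future while the retry is still pending.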
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode commented on code in PR #21946: URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564796734 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java: ## @@ -96,83 +123,138 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl .sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // .producerName(getProducerName()); -STATE_UPDATER.set(this, State.Stopped); +STATE_UPDATER.set(this, State.Disconnected); } protected abstract String getProducerName(); -protected abstract void readEntries(org.apache.pulsar.client.api.Producer producer); +protected abstract void setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer producer); protected abstract Position getReplicatorReadPosition(); -protected abstract long getNumberOfEntriesInBacklog(); +public abstract long getNumberOfEntriesInBacklog(); protected abstract void disableReplicatorRead(); public String getRemoteCluster() { return remoteCluster; } -// This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer -// the end result can be disconnect. -public synchronized void startProducer() { -if (STATE_UPDATER.get(this) == State.Stopping) { -long waitTimeMs = backOff.next(); -if (log.isDebugEnabled()) { -log.debug( -"[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", -replicatorId, waitTimeMs / 1000.0); -} -// BackOff before retrying - brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs, -TimeUnit.MILLISECONDS); -return; -} -State state = STATE_UPDATER.get(this); -if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) { -if (state == State.Started) { -// Already running +public void startProducer() { +// Guarantee only one task call "producerBuilder.createAsync()". 
+Pair setStartingRes = compareSetAndGetState(State.Disconnected, State.Starting); +if (!setStartingRes.getLeft()) { +if (setStartingRes.getRight() == State.Starting) { +log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}", +replicatorId, state); +} else if (setStartingRes.getRight() == State.Started) { +// Since the method "startProducer" will be called even if it is started, only print debug-level log. +if (log.isDebugEnabled()) { +log.debug("[{}] Replicator was already running. state: {}", replicatorId, state); +} +} else if (setStartingRes.getRight() == State.Disconnecting) { if (log.isDebugEnabled()) { -log.debug("[{}] Replicator was already running", replicatorId); +log.debug("[{}] Rep.producer is closing, delay to retry(wait the producer close success)." ++ " state: {}", replicatorId, state); } +delayStartProducerAfterDisconnected(); } else { -log.info("[{}] Replicator already being started. Replicator state: {}", replicatorId, state); +/** {@link State.Terminating}, {@link State.Terminated}. **/ +log.info("[{}] Skip the producer creation since the replicator state is : {}", replicatorId, state); } - return; } log.info("[{}] Starting replicator", replicatorId); producerBuilder.createAsync().thenAccept(producer -> { -readEntries(producer); +setProducerAndTriggerReadEntries(producer); }).exceptionally(ex -> { -if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { +Pair setDisconnectedRes = compareSetAndGetState(State.Starting, State.Disconnected); +if (setDisconnectedRes.getLeft()) { long waitTimeMs = backOff.next(); log.warn("[{}] Failed to create remote producer ({}), retrying in {} s", replicatorId, ex.getMessage(), waitTimeMs / 1000.0); - // BackOff before retrying - brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs, -TimeUnit.MILLISECONDS); +scheduleCheckTopicActiveAndStartProducer(waitTimeMs); } else { -log.warn("[{}] Failed to create remote producer. 
Replicator state: {}", replicatorId, -
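The new `startProducer()` drops `synchronized`. It attempts a single transition from `Disconnected` to `Starting`, then uses the returned `Pair` (success flag plus the observed state) to decide whether to skip, delay, or give up. The helper's body is not part of this excerpt. The Java sketch below is an assumption, not Pulsar's code: it shows one lock-free way such a helper could work. A hypothetical `CasResult` class replaces `Pair`, and returned strings replace the real logging and scheduling.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical sketch: the diff calls compareSetAndGetState(...) but its body is not shown,
// so this is one possible lock-free implementation, not the one in AbstractReplicator.
class ReplicatorStateSketch {
    enum State { Disconnected, Starting, Started, Disconnecting, Terminating, Terminated }

    // Outcome of one transition attempt: whether it succeeded, and which state was observed.
    static final class CasResult {
        final boolean success;
        final State observed;

        CasResult(boolean success, State observed) {
            this.success = success;
            this.observed = observed;
        }
    }

    private static final AtomicReferenceFieldUpdater<ReplicatorStateSketch, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(ReplicatorStateSketch.class, State.class, "state");

    private volatile State state = State.Disconnected;

    // Atomically moves from `expect` to `update`. On failure it returns the state that
    // blocked the transition, so callers can branch on it without a second, racy read.
    CasResult compareSetAndGetState(State expect, State update) {
        while (true) {
            State current = STATE_UPDATER.get(this);
            if (current != expect) {
                return new CasResult(false, current);
            }
            if (STATE_UPDATER.compareAndSet(this, expect, update)) {
                return new CasResult(true, expect);
            }
            // The state changed between the read and the CAS: re-read and retry.
        }
    }

    // Mirrors the branching of the new startProducer(): only the caller that wins the
    // Disconnected -> Starting transition goes on to create the producer.
    String startProducer() {
        CasResult res = compareSetAndGetState(State.Disconnected, State.Starting);
        if (res.success) {
            return "create producer";
        } else if (res.observed == State.Starting) {
            return "skip: another thread is starting";
        } else if (res.observed == State.Started) {
            return "skip: already running";
        } else if (res.observed == State.Disconnecting) {
            return "retry after the producer is closed";
        }
        return "skip: " + res.observed; // Terminating or Terminated
    }
}
```

Losing callers return immediately based on the state they observed, which is what the "Guarantee only one task call producerBuilder.createAsync()" comment in the diff relies on.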
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode commented on code in PR #21946: URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564796010 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
(pulsar) branch master updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 837f8bca7dd [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479) 837f8bca7dd is described below commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c Author: fengyubiao AuthorDate: Mon Apr 15 00:13:49 2024 +0800 [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479) --- .../pulsar/broker/service/BrokerService.java | 4 +- .../service/persistent/MessageDeduplication.java | 18 ++- .../broker/service/persistent/PersistentTopic.java | 2 +- .../DeduplicationDisabledBrokerLevelTest.java | 161 + 4 files changed, 177 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b4d0f38b4a4..2687532693a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -631,8 +631,10 @@ public class BrokerService implements Closeable { } protected void startDeduplicationSnapshotMonitor() { +// We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this +// scheduled task runs. 
int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds(); -if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) { +if (interval > 0) { this.deduplicationSnapshotMonitor = OrderedScheduler.newSchedulerBuilder() .name("deduplication-snapshot-monitor") .numThreads(1) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 802dd917961..e508661364d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -157,9 +157,14 @@ public class MessageDeduplication { // Replay all the entries and apply all the sequence ids updates log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries()); -CompletableFuture future = new CompletableFuture<>(); +CompletableFuture future = new CompletableFuture<>(); replayCursor(future); -return future; +return future.thenAccept(lastPosition -> { +if (lastPosition != null && snapshotCounter >= snapshotInterval) { +snapshotCounter = 0; +takeSnapshot(lastPosition); +} +}); } /** @@ -168,11 +173,11 @@ public class MessageDeduplication { * * @param future future to trigger when the replay is complete */ -private void replayCursor(CompletableFuture future) { +private void replayCursor(CompletableFuture future) { managedCursor.asyncReadEntries(100, new ReadEntriesCallback() { @Override public void readEntriesComplete(List entries, Object ctx) { - +Position lastPosition = null; for (Entry entry : entries) { ByteBuf messageMetadataAndPayload = entry.getDataBuffer(); MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload); @@ -182,7 +187,8 @@ public class MessageDeduplication { 
highestSequencedPushed.put(producerName, sequenceId); highestSequencedPersisted.put(producerName, sequenceId); producerRemoved(producerName); - +snapshotCounter++; +lastPosition = entry.getPosition(); entry.release(); } @@ -191,7 +197,7 @@ public class MessageDeduplication { pulsar.getExecutor().execute(() -> replayCursor(future)); } else { // Done replaying -future.complete(null); +future.complete(lastPosition); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3c9ab04d79a..e4441969101 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++
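In the commit above, `replayCursor` now counts every replayed entry in `snapshotCounter` and completes the future with the last replayed position. Once replay finishes, a snapshot is taken at that position if the counter has reached `snapshotInterval`. The commit also keeps the snapshot monitor running whenever `brokerDeduplicationSnapshotFrequencyInSeconds > 0`, even if broker-level deduplication is disabled. Together, these changes keep the deduplication cursor from building up a long backlog that would otherwise have to be replayed when the topic is next loaded. The sketch below models only the replay-then-snapshot control flow on an in-memory list. `DedupReplaySketch`, `replayAndMaybeSnapshot`, and the use of `Long` positions are hypothetical stand-ins for the managed cursor and `Position`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory model of the replay in MessageDeduplication: a List<Long> of
// positions stands in for the managed cursor, and `snapshots` records takeSnapshot() calls.
class DedupReplaySketch {
    private final List<Long> backlog;
    private final int batchSize;
    private final int snapshotInterval;
    private int readIndex = 0;
    int snapshotCounter = 0;
    final List<Long> snapshots = new ArrayList<>();

    DedupReplaySketch(List<Long> backlog, int batchSize, int snapshotInterval) {
        this.backlog = backlog;
        this.batchSize = batchSize;
        this.snapshotInterval = snapshotInterval;
    }

    // Reads one batch and counts each replayed entry. When the backlog is exhausted, the
    // future completes with the last position of the final batch (null if it was empty).
    private void replayCursor(CompletableFuture<Long> future) {
        Long lastPosition = null;
        int end = Math.min(readIndex + batchSize, backlog.size());
        for (; readIndex < end; readIndex++) {
            snapshotCounter++;
            lastPosition = backlog.get(readIndex);
        }
        if (readIndex < backlog.size()) {
            // The broker submits the next read to an executor; plain recursion suffices here.
            replayCursor(future);
        } else {
            future.complete(lastPosition);
        }
    }

    // Replays everything, then snapshots at the last position if enough entries were seen.
    CompletableFuture<Void> replayAndMaybeSnapshot() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        replayCursor(future);
        return future.thenAccept(lastPosition -> {
            if (lastPosition != null && snapshotCounter >= snapshotInterval) {
                snapshotCounter = 0;
                snapshots.add(lastPosition); // stands in for takeSnapshot(lastPosition)
            }
        });
    }
}
```

As in the diff, a replay shorter than `snapshotInterval` finishes without taking a new snapshot. The counter simply carries over.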
Re: [PR] [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout [pulsar]
poorbarcode merged PR #22479: URL: https://github.com/apache/pulsar/pull/22479 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Bug] [branch-3.0] The test SimpleProducerConsumerTest.testAccessAvroSchemaMetadata always fails [pulsar]
Denovo1998 commented on issue #22470: URL: https://github.com/apache/pulsar/issues/22470#issuecomment-2054102214 @poorbarcode @Technoboy- PTAL!
[PR] [fix][test] SchemaMap in AutoConsumeSchema has been reused [pulsar]
Denovo1998 opened a new pull request, #22500: URL: https://github.com/apache/pulsar/pull/22500 Fixes #22470 ### Motivation Related to #17887. After the test method runs once, the `pulsarClient` is not cleaned up, so its `AutoConsumeSchema` is reused. During the first run, the `schemaMap` in that `AutoConsumeSchema` is set to an `AvroSchema`: https://github.com/apache/pulsar/blob/982d94d15465ff5fcea192e9a8b6b25d2d1c55b6/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java#L149-L169 https://github.com/apache/pulsar/blob/982d94d15465ff5fcea192e9a8b6b25d2d1c55b6/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java#L202-L206 The second run of the test method therefore also uses the `AvroSchema` instead of the `JsonSchema`, because the schema is not fetched again: https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java#L130-L136 https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java#L328-L338 ### Modifications 1. Close the `pulsarClient` after the exception is thrown. 2. Before starting, check whether `pulsarClient` is null, and create a new one if it is. ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*.
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository:
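The failure mode described in this PR can be modeled without a broker. A client that resolves a topic's schema once and caches it keeps returning the first run's schema for as long as that client is reused. The sketch below is hypothetical: `FakeClient` and its string-valued schema cache are not Pulsar APIs, and the cache only plays the role of `schemaMap`. It applies the two listed modifications behind an `applyFix` flag, so the stale and fixed behaviours can be compared directly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the test fix; FakeClient is not a Pulsar class. Its schema cache
// plays the role of the schemaMap inside AutoConsumeSchema.
class ClientReuseSketch {
    static final class FakeClient implements AutoCloseable {
        private final Map<String, String> schemaCache = new HashMap<>();
        boolean closed;

        // Only the first lookup for a topic consults the "broker"; later lookups hit the cache.
        String resolveSchema(String topic, String brokerSchema) {
            return schemaCache.computeIfAbsent(topic, t -> brokerSchema);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    private final boolean applyFix;
    private FakeClient client;

    ClientReuseSketch(boolean applyFix) {
        this.applyFix = applyFix;
        this.client = new FakeClient();
    }

    // One run of a parameterized test; `fail` simulates the test body throwing.
    String runOnce(String topic, String brokerSchema, boolean fail) {
        if (client == null) {
            client = new FakeClient(); // modification 2: recreate the client if it was discarded
        }
        try {
            String resolved = client.resolveSchema(topic, brokerSchema);
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
            return resolved;
        } catch (IllegalStateException e) {
            if (applyFix) {
                client.close(); // modification 1: close the client after the exception
                client = null;
            }
            return "failed";
        }
    }
}
```

Without the fix, a second run that expects `JSON` still receives the cached `AVRO` value. With the fix, it gets a fresh client and resolves `JSON`.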
(pulsar) 01/01: fix unit test.
This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch remove_failed_sub in repository https://gitbox.apache.org/repos/asf/pulsar.git commit c5ffbf9111e11ed2c98b6268b60bfb354117339b Author: Baodi Shi AuthorDate: Sun Apr 14 20:52:01 2024 +0800 fix unit test. --- .../test/java/org/apache/pulsar/broker/transaction/TransactionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index ddfa82f5288..e45924e8bb4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1517,7 +1517,7 @@ public class TransactionTest extends TransactionTestBase { fail("Expect failure by PendingAckHandle closed, but success"); } catch (ExecutionException executionException){ Throwable t = executionException.getCause(); -Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException); +Assert.assertTrue(t instanceof BrokerServiceException); } }
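This commit broadens the assertion on the unwrapped cause from `BrokerServiceException.ServiceUnitNotReadyException` to `BrokerServiceException`, so any subclass of `BrokerServiceException` now satisfies it. The commit message says only "fix unit test", so the reason for the change is not stated. The sketch below illustrates the effect. It uses a hypothetical two-level exception hierarchy (not the Pulsar classes) and the same `ExecutionException.getCause()` unwrapping as the test.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical hierarchy shaped like BrokerServiceException and its nested subclasses.
class CauseCheckSketch {
    static class BrokerServiceException extends Exception {
        BrokerServiceException(String msg) { super(msg); }
    }

    static class ServiceUnitNotReadyException extends BrokerServiceException {
        ServiceUnitNotReadyException(String msg) { super(msg); }
    }

    // Stands in for any other BrokerServiceException subclass.
    static class OtherBrokerException extends BrokerServiceException {
        OtherBrokerException(String msg) { super(msg); }
    }

    // Waits on the future and returns the cause of its failure, as the test's catch block does.
    static Throwable failureCause(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

A failure caused by `OtherBrokerException` passes the broadened `instanceof BrokerServiceException` check but would have failed the original, narrower one.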
(pulsar) branch remove_failed_sub deleted (was c5ffbf9111e)
This is an automated email from the ASF dual-hosted git repository. baodi pushed a change to branch remove_failed_sub in repository https://gitbox.apache.org/repos/asf/pulsar.git was c5ffbf9111e fix unit test. This change permanently discards the following revisions: discard c5ffbf9111e fix unit test.
(pulsar) branch remove_failed_sub created (now c5ffbf9111e)
This is an automated email from the ASF dual-hosted git repository. baodi pushed a change to branch remove_failed_sub in repository https://gitbox.apache.org/repos/asf/pulsar.git at c5ffbf9111e fix unit test. This branch includes the following new commits: new c5ffbf9111e fix unit test. The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.