(pulsar) branch master updated: [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock (#22489)

2024-04-14 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d9a43dd2160 [fix][test] Flaky-test: 
testMessageExpiryWithTimestampNonRecoverableException and 
testIncorrectClientClock (#22489)
d9a43dd2160 is described below

commit d9a43dd21605930e16bb038095e36fceff3a4a40
Author: Baodi Shi 
AuthorDate: Mon Apr 15 13:55:34 2024 +0800

[fix][test] Flaky-test: 
testMessageExpiryWithTimestampNonRecoverableException and 
testIncorrectClientClock (#22489)
---
 .../service/PersistentMessageFinderTest.java   | 42 +++---
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 6965ac28068..0972c9098b5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
@@ -383,7 +382,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
  *
  * @throws Exception
  */
-@Test(groups = "flaky")
+@Test
 void testMessageExpiryWithTimestampNonRecoverableException() throws 
Exception {
 
 final String ledgerAndCursorName = 
"testPersistentMessageExpiryWithNonRecoverableLedgers";
@@ -402,11 +401,15 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 for (int i = 0; i < totalEntries; i++) {
 ledger.addEntry(createMessageWrittenToLedger("msg" + i));
 }
+Awaitility.await().untilAsserted(() ->
+assertEquals(ledger.getState(), 
ManagedLedgerImpl.State.LedgerOpened));
 
 List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
 LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
-
-assertEquals(ledgers.size(), totalEntries / entriesPerLedger);
+// The `lastLedgerInfo` should be newly opened, and it does not 
contain any entries. 
+// Please refer to: https://github.com/apache/pulsar/pull/22034
+assertEquals(lastLedgerInfo.getEntries(), 0);
+assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);
 
 // this will make sure that all entries should be deleted
 Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds));
@@ -420,19 +423,13 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
 
 PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
-Position previousMarkDelete = null;
-for (int i = 0; i < totalEntries; i++) {
-monitor.expireMessages(1);
-Position previousPos = previousMarkDelete;
-retryStrategically(
-(test) -> c1.getMarkDeletedPosition() != null && 
!c1.getMarkDeletedPosition().equals(previousPos),
-5, 100);
-previousMarkDelete = c1.getMarkDeletedPosition();
-}
-
-PositionImpl markDeletePosition = (PositionImpl) 
c1.getMarkDeletedPosition();
-assertEquals(lastLedgerInfo.getLedgerId(), 
markDeletePosition.getLedgerId());
-assertEquals(lastLedgerInfo.getEntries() - 1, 
markDeletePosition.getEntryId());
+assertTrue(monitor.expireMessages(ttlSeconds));
+Awaitility.await().untilAsserted(() -> {
+PositionImpl markDeletePosition = (PositionImpl) 
c1.getMarkDeletedPosition();
+// The markDeletePosition points to the last entry of the previous 
ledger in lastLedgerInfo.
+assertEquals(markDeletePosition.getLedgerId(), 
lastLedgerInfo.getLedgerId() - 1);
+assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 
1); 
+});
 
 c1.close();
 ledger.close();
@@ -440,20 +437,25 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 
 }
 
-@Test(groups = "flaky")
+@Test
 public void testIncorrectClientClock() throws Exception {
 final String ledgerAndCursorName = "testIncorrectClientClock";
 int maxTTLSeconds = 1;
+int entriesNum = 10;
 ManagedLedgerConfig config = new ManagedLedgerConfig();
 config.setMaxEntriesPerLedger(1);
 ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
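
The diff above swaps a fixed-count retry loop for `Awaitility.await().untilAsserted(...)`. As a rough illustration of that polling idea only (this is not Awaitility's implementation), here is a minimal JDK-only sketch; the class `PollingAssert` and its `untilAsserted` signature are hypothetical names introduced for this example.

```java
// Hypothetical JDK-only stand-in for the "retry until the assertion passes" idea.
// It is NOT Awaitility; it only illustrates the polling behaviour.
final class PollingAssert {
    private PollingAssert() {}

    /**
     * Re-runs the assertion until it stops throwing AssertionError,
     * or rethrows the last failure once the timeout has elapsed.
     */
    static void untilAsserted(Runnable assertion, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() - deadline >= 0) {
                    throw e; // give up and surface the last failure
                }
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", ie);
            }
        }
    }
}
```

Unlike a loop that retries a fixed number of times and then continues regardless, this style fails the test with the last assertion error if the condition never becomes true.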

Re: [PR] [fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableException and testIncorrectClientClock [pulsar]

2024-04-14 Thread via GitHub


Technoboy- merged PR #22489:
URL: https://github.com/apache/pulsar/pull/22489


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [fix][broker] Fix the rate limit policy loading for replicator after broker restart [pulsar]

2024-04-14 Thread via GitHub


nodece opened a new pull request, #22504:
URL: https://github.com/apache/pulsar/pull/22504

   ### Motivation
   
   When a topic is initialized and a replicator's cursor already exists, the 
topic creates the replicator and then loads the namespace and topic policies. 
Once the policies are loaded, they should be applied to the replicator.
   
   Currently, when the topic policies are empty, the namespace policies are not 
applied to the replicator after a broker restart, so the replicator's total 
traffic can exceed the configured rate limit.
   
   ### Modifications
   
   - Call 
`org.apache.pulsar.broker.service.persistent.PersistentTopic#onPoliciesUpdate` 
after the namespace policy is loaded
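
   The ordering described above can be sketched as follows. This is a heavily simplified, hypothetical model: `TopicSketch`, `ReplicatorSketch`, and their fields are not Pulsar classes, and the rule that a topic-level value takes precedence over the namespace value is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model; none of these types are Pulsar classes.
final class ReplicatorSketch {
    long dispatchRateLimit = -1; // -1 means "no limit applied yet"
}

final class TopicSketch {
    final List<ReplicatorSketch> replicators = new ArrayList<>();
    Long topicPolicyRate;      // null when no topic-level policy is set
    long namespacePolicyRate;  // value loaded from the namespace policies

    // Step 1: the replicator is created because its cursor already exists.
    void createReplicatorForExistingCursor() {
        replicators.add(new ReplicatorSketch());
    }

    // Step 2: policies are loaded, and the update hook is called right afterwards.
    void loadPolicies(long namespaceRate, Long topicRate) {
        this.namespacePolicyRate = namespaceRate;
        this.topicPolicyRate = topicRate;
        onPoliciesUpdate();
    }

    // Assumption: a topic-level value wins; otherwise fall back to the namespace value.
    void onPoliciesUpdate() {
        long effective = topicPolicyRate != null ? topicPolicyRate : namespacePolicyRate;
        for (ReplicatorSketch replicator : replicators) {
            replicator.dispatchRateLimit = effective;
        }
    }
}
```

   Without the `onPoliciesUpdate()` call in step 2, a replicator created in step 1 would keep its initial "no limit" value whenever the topic-level policy is empty.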
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 





Re: [PR] [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger [pulsar]

2024-04-14 Thread via GitHub


lhotari commented on code in PR #1:
URL: https://github.com/apache/pulsar/pull/1#discussion_r1565191680


##
pulsar-common/src/main/java/org/apache/pulsar/common/mutable/AtomicMutableBoolean.java:
##


Review Comment:
   Oh yes, there's another discussion about the same matter: 
https://github.com/apache/pulsar/pull/1/files/d881f5416b1b5ce2863238d74ace38889bb2f134#r1531505030
 . Using `Optional` instead of `null` works too. However, because the Pulsar 
code style favors minimizing allocations, we usually avoid `Optional` and 
use `null` as the empty value. 
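
   A minimal sketch of the two styles, for illustration only. `EmptyValueStyles` and both method names are hypothetical and do not come from the PR under review.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper contrasting the two ways to express "no flag present".
final class EmptyValueStyles {
    private EmptyValueStyles() {}

    // Optional style: absence is explicit in the type, at the cost of a wrapper object.
    static boolean timedOutOptional(Optional<AtomicBoolean> flag) {
        return flag.map(AtomicBoolean::get).orElse(false);
    }

    // null style: no wrapper is allocated, but every caller must remember the null check.
    static boolean timedOutNullable(AtomicBoolean flagOrNull) {
        return flagOrNull != null && flagOrNull.get();
    }
}
```

   Both methods return the same answers; the difference is whether the caller has to wrap the value before passing it.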






Re: [PR] [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger [pulsar]

2024-04-14 Thread via GitHub


lhotari commented on code in PR #1:
URL: https://github.com/apache/pulsar/pull/1#discussion_r1565190547


##
pulsar-common/src/main/java/org/apache/pulsar/common/mutable/AtomicMutableBoolean.java:
##


Review Comment:
   Now I noticed the comment about the ledger switch, requiring a new instance. 
 https://github.com/apache/pulsar/pull/1#discussion_r1531504225 . I guess 
it might not be possible to get rid of `volatile` for the field. However, even 
in that case, I would recommend using `AtomicBoolean` instead of adding this 
`AtomicMutableBoolean` class. It seems that a `null` value can be passed when 
there's no timeout in use? Wasn't the no-timeout case the reason to use 
MutableBoolean in the first place?
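
   To make the ledger-switch case concrete, here is a hypothetical sketch in which a `volatile` field holds a standard `AtomicBoolean` that is replaced on each switch. `LedgerTimeoutFlagSketch` and its methods are illustrative names, not the actual managed-ledger code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a per-ledger timeout flag; not the actual ManagedLedgerImpl code.
final class LedgerTimeoutFlagSketch {
    // volatile so that other threads see the replacement instance after a ledger switch.
    private volatile AtomicBoolean currentLedgerTimeoutTriggered = new AtomicBoolean(false);

    // An add operation captures the flag of the ledger it was created for.
    AtomicBoolean flagForNewOp() {
        return currentLedgerTimeoutTriggered;
    }

    void markTimeout() {
        currentLedgerTimeoutTriggered.set(true);
    }

    // Switching ledgers installs a fresh flag; operations of the old ledger keep the old one.
    void switchLedger() {
        currentLedgerTimeoutTriggered = new AtomicBoolean(false);
    }
}
```

   In this sketch the per-ledger instance requirement is met without introducing a new class: operations created before the switch still observe the timeout of their own ledger.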






Re: [PR] [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger [pulsar]

2024-04-14 Thread via GitHub


lhotari commented on code in PR #1:
URL: https://github.com/apache/pulsar/pull/1#discussion_r1565183246


##
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:
##
@@ -375,6 +398,13 @@ protected OpAddEntry newObject(Recycler.Handle<OpAddEntry> 
recyclerHandle) {
 }
 };
 
+public void recycleAfterClosed() {
+if (STATE_UPDATER.get(this) != State.CLOSED) {
+recycle();
+}
+closeFuture.thenAccept(ignore -> recycle());
+}

Review Comment:
   I think that there might be a better way to address this using reference 
counting. Another alternative is to skip recycling in cases where there's a 
potential issue. Recycling is not mandatory.
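
   One possible shape of the reference-counting idea, as a hypothetical sketch. `RefCountedOp` is an illustrative class and does not reflect how `OpAddEntry` is implemented.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of reference-counted recycling; OpAddEntry does not necessarily work this way.
final class RefCountedOp {
    private final AtomicInteger refCnt = new AtomicInteger(1); // the creator holds one reference
    private final Runnable recycler;

    RefCountedOp(Runnable recycler) {
        this.recycler = recycler;
    }

    // Each party that may still touch the object takes a reference first.
    void retain() {
        int current;
        do {
            current = refCnt.get();
            if (current <= 0) {
                throw new IllegalStateException("already recycled");
            }
        } while (!refCnt.compareAndSet(current, current + 1));
    }

    // The object is recycled only when the last holder releases it.
    boolean release() {
        int remaining = refCnt.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released too many times");
        }
        if (remaining == 0) {
            recycler.run();
            return true;
        }
        return false;
    }
}
```

   With this approach, a late callback and a close path can each hold a reference, and the recycle action runs exactly once, after both have released.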






Re: [PR] [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger [pulsar]

2024-04-14 Thread via GitHub


lhotari commented on code in PR #1:
URL: https://github.com/apache/pulsar/pull/1#discussion_r1565183550


##
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:
##
@@ -176,7 +186,7 @@ public void addComplete(int rc, final LedgerHandle lh, long 
entryId, Object ctx)
 if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, 
State.COMPLETED)) {
 log.warn("[{}] The add op is terminal legacy callback for entry 
{}-{} adding.", ml.getName(), lh.getId(),
 entryId);
-OpAddEntry.this.recycle();
+OpAddEntry.this.recycleAfterClosed();

Review Comment:
   Instead of adding `recycleAfterClosed`, let's just skip recycling in cases 
that cause problems. 






Re: [PR] [fix] [ml] Add entry fail due to race condition about add entry failed/timeout and switch ledger [pulsar]

2024-04-14 Thread via GitHub


lhotari commented on code in PR #1:
URL: https://github.com/apache/pulsar/pull/1#discussion_r1565180673


##
pulsar-common/src/main/java/org/apache/pulsar/common/mutable/AtomicMutableBoolean.java:
##


Review Comment:
   > The value of Java's AtomicBoolean is modified by volatile, it will reduce 
the performance.
   
   @poorbarcode Have you noticed that the field `currentLedgerTimeoutTriggered` 
is itself `volatile`? `private volatile MutableBoolean 
currentLedgerTimeoutTriggered;`. That will "reduce performance" each time the 
field is read. Using `private final AtomicBoolean currentLedgerTimeoutTriggered 
= new AtomicBoolean(false);` would prevent that. Therefore I think that we 
shouldn't add the `AtomicMutableBoolean` class.
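
   For comparison, a hypothetical sketch of the `final AtomicBoolean` variant. `FinalFlagSketch` and its method names are illustrative only.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: the field reference is final, and the flag is updated in place.
final class FinalFlagSketch {
    // Reading this final field is a plain read; only get()/set() on the
    // AtomicBoolean itself carry volatile semantics.
    private final AtomicBoolean currentLedgerTimeoutTriggered = new AtomicBoolean(false);

    void markTimeout() {
        currentLedgerTimeoutTriggered.set(true);
    }

    boolean timedOut() {
        return currentLedgerTimeoutTriggered.get();
    }

    // compareAndSet lets exactly one caller observe and clear the flag.
    boolean consumeTimeout() {
        return currentLedgerTimeoutTriggered.compareAndSet(true, false);
    }
}
```

   The standard class also provides atomic read-modify-write operations such as `compareAndSet`, which a hand-written mutable holder would have to reimplement.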






Re: [I] [pulsar] pulsar standalone to have embedded zookeeper [pulsar]

2024-04-14 Thread via GitHub


nareshv closed issue #22502: [pulsar] pulsar standalone to have embedded 
zookeeper
URL: https://github.com/apache/pulsar/issues/22502





[I] [pulsar] pulsar standalone to have embedded zookeeper [pulsar]

2024-04-14 Thread via GitHub


nareshv opened a new issue, #22502:
URL: https://github.com/apache/pulsar/issues/22502

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   We've been evaluating Pulsar standalone for edge/low-footprint use cases, and 
it seems to work pretty well. As mentioned in 
https://github.com/apache/pulsar/discussions/22438#discussioncomment-9018919 we 
would like to make Pulsar standalone more robust with an embedded ZooKeeper.
   
   ### Solution
   
   `PulsarStandalone.java` already supports multiple command-line parameters. 
Add a `--run-zookeeper` flag that starts ZooKeeper in the same JVM.
   
   ### Alternatives
   
   Running ZooKeeper as a separate process.
   --
   We have to rule out this option due to large fleet management needs. Managing 
a single JVM (Pulsar + ZooKeeper + BookKeeper) is much more deployment-friendly 
on tiny devices than managing multiple processes.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!





(pulsar) 01/02: Revert "[fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)"

2024-04-14 Thread yubiao

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f91b519f56622602dd2bd18ec3fa55748debd27f
Author: fengyubiao 
AuthorDate: Mon Apr 15 12:07:48 2024 +0800

Revert "[fix] [broker] Prevent long deduplication cursor backlog so that 
topic loading wouldn't timeout (#22479)"

This reverts commit 7cec82f99c2c729e0e9ac42f7176eadf4039ae1a.
---
 .../pulsar/broker/service/BrokerService.java   |   4 +-
 .../service/persistent/MessageDeduplication.java   |  18 +--
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../DeduplicationDisabledBrokerLevelTest.java  | 161 -
 4 files changed, 8 insertions(+), 177 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f19b3436f7b..d798d9e672e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -576,10 +576,8 @@ public class BrokerService implements Closeable {
 }
 
 protected void startDeduplicationSnapshotMonitor() {
-// We do not know whether users will enable deduplication on namespace 
level/topic level or not, so keep this
-// scheduled task runs.
 int interval = 
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-if (interval > 0) {
+if (interval > 0 && 
pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
 this.deduplicationSnapshotMonitor = 
OrderedScheduler.newSchedulerBuilder()
 .name("deduplication-snapshot-monitor")
 .numThreads(1)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e508661364d..802dd917961 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,14 +157,9 @@ public class MessageDeduplication {
 
 // Replay all the entries and apply all the sequence ids updates
 log.info("[{}] Replaying {} entries for deduplication", 
topic.getName(), managedCursor.getNumberOfEntries());
-CompletableFuture<Position> future = new CompletableFuture<>();
+CompletableFuture<Void> future = new CompletableFuture<>();
 replayCursor(future);
-return future.thenAccept(lastPosition -> {
-if (lastPosition != null && snapshotCounter >= snapshotInterval) {
-snapshotCounter = 0;
-takeSnapshot(lastPosition);
-}
-});
+return future;
 }
 
 /**
@@ -173,11 +168,11 @@ public class MessageDeduplication {
  *
  * @param future future to trigger when the replay is complete
  */
-private void replayCursor(CompletableFuture<Position> future) {
+private void replayCursor(CompletableFuture<Void> future) {
 managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
 @Override
 public void readEntriesComplete(List<Entry> entries, Object ctx) {
-Position lastPosition = null;
+
 for (Entry entry : entries) {
 ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
 MessageMetadata md = 
Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -187,8 +182,7 @@ public class MessageDeduplication {
 highestSequencedPushed.put(producerName, sequenceId);
 highestSequencedPersisted.put(producerName, sequenceId);
 producerRemoved(producerName);
-snapshotCounter++;
-lastPosition = entry.getPosition();
+
 entry.release();
 }
 
@@ -197,7 +191,7 @@ public class MessageDeduplication {
 pulsar.getExecutor().execute(() -> replayCursor(future));
 } else {
 // Done replaying
-future.complete(lastPosition);
+future.complete(null);
 }
 }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 29df6e78cd1..be1b873cc21 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -204,7 +204,7 @@ public class PersistentTopic 

(pulsar) 02/02: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

2024-04-14 Thread yubiao

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f5c853a4e8a8d1ebce3166aff12715a068cd03cb
Author: fengyubiao 
AuthorDate: Mon Apr 15 12:11:04 2024 +0800

[fix] [broker] Prevent long deduplication cursor backlog so that topic 
loading wouldn't timeout (#22479)

(cherry picked from commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c)
---
 .../pulsar/broker/service/BrokerService.java   |   4 +-
 .../service/persistent/MessageDeduplication.java   |  18 ++-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../DeduplicationDisabledBrokerLevelTest.java  | 163 +
 4 files changed, 179 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d798d9e672e..f19b3436f7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -576,8 +576,10 @@ public class BrokerService implements Closeable {
 }
 
 protected void startDeduplicationSnapshotMonitor() {
+// We do not know whether users will enable deduplication on namespace 
level/topic level or not, so keep this
+// scheduled task runs.
 int interval = 
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-if (interval > 0 && 
pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+if (interval > 0) {
 this.deduplicationSnapshotMonitor = 
OrderedScheduler.newSchedulerBuilder()
 .name("deduplication-snapshot-monitor")
 .numThreads(1)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 802dd917961..e508661364d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,9 +157,14 @@ public class MessageDeduplication {
 
 // Replay all the entries and apply all the sequence ids updates
 log.info("[{}] Replaying {} entries for deduplication", 
topic.getName(), managedCursor.getNumberOfEntries());
-CompletableFuture<Void> future = new CompletableFuture<>();
+CompletableFuture<Position> future = new CompletableFuture<>();
 replayCursor(future);
-return future;
+return future.thenAccept(lastPosition -> {
+if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+snapshotCounter = 0;
+takeSnapshot(lastPosition);
+}
+});
 }
 
 /**
@@ -168,11 +173,11 @@ public class MessageDeduplication {
  *
  * @param future future to trigger when the replay is complete
  */
-private void replayCursor(CompletableFuture<Void> future) {
+private void replayCursor(CompletableFuture<Position> future) {
 managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
 @Override
 public void readEntriesComplete(List<Entry> entries, Object ctx) {
-
+Position lastPosition = null;
 for (Entry entry : entries) {
 ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
 MessageMetadata md = 
Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -182,7 +187,8 @@ public class MessageDeduplication {
 highestSequencedPushed.put(producerName, sequenceId);
 highestSequencedPersisted.put(producerName, sequenceId);
 producerRemoved(producerName);
-
+snapshotCounter++;
+lastPosition = entry.getPosition();
 entry.release();
 }
 
@@ -191,7 +197,7 @@ public class MessageDeduplication {
 pulsar.getExecutor().execute(() -> replayCursor(future));
 } else {
 // Done replaying
-future.complete(null);
+future.complete(lastPosition);
 }
 }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index be1b873cc21..29df6e78cd1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -204,7 +204,7 @@ public class PersistentTopic 

(pulsar) branch branch-3.2 updated (7cec82f99c2 -> f5c853a4e8a)

2024-04-14 Thread yubiao

yubiao pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 7cec82f99c2 [fix] [broker] Prevent long deduplication cursor backlog 
so that topic loading wouldn't timeout (#22479)
 new f91b519f566 Revert "[fix] [broker] Prevent long deduplication cursor 
backlog so that topic loading wouldn't timeout (#22479)"
 new f5c853a4e8a [fix] [broker] Prevent long deduplication cursor backlog 
so that topic loading wouldn't timeout (#22479)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)



(pulsar) branch branch-2.10 updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

2024-04-14 Thread yubiao

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
 new 68554056bbc [fix] [broker] Prevent long deduplication cursor backlog 
so that topic loading wouldn't timeout (#22479)
68554056bbc is described below

commit 68554056bbc4a3095446430255e937ce88f48088
Author: fengyubiao 
AuthorDate: Mon Apr 15 00:13:49 2024 +0800

[fix] [broker] Prevent long deduplication cursor backlog so that topic 
loading wouldn't timeout (#22479)

(cherry picked from commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c)
---
 .../pulsar/broker/service/BrokerService.java   |   4 +-
 .../service/persistent/MessageDeduplication.java   |  18 ++-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../DeduplicationDisabledBrokerLevelTest.java  | 165 +
 4 files changed, 181 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 48b859bd11b..c53085347db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -533,8 +533,10 @@ public class BrokerService implements Closeable {
 }
 
 protected void startDeduplicationSnapshotMonitor() {
+// We do not know whether users will enable deduplication on namespace 
level/topic level or not, so keep this
+// scheduled task runs.
 int interval = 
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-if (interval > 0 && 
pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+if (interval > 0) {
 this.deduplicationSnapshotMonitor =
 Executors.newSingleThreadScheduledExecutor(new 
ExecutorProvider.ExtendedThreadFactory(
 "deduplication-snapshot-monitor"));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 4c3d8e1a467..c3299f23f9f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -153,9 +153,14 @@ public class MessageDeduplication {
 
 // Replay all the entries and apply all the sequence ids updates
 log.info("[{}] Replaying {} entries for deduplication", 
topic.getName(), managedCursor.getNumberOfEntries());
-CompletableFuture<Void> future = new CompletableFuture<>();
+CompletableFuture<Position> future = new CompletableFuture<>();
 replayCursor(future);
-return future;
+return future.thenAccept(lastPosition -> {
+if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+snapshotCounter = 0;
+takeSnapshot(lastPosition);
+}
+});
 }
 
 /**
@@ -164,11 +169,11 @@ public class MessageDeduplication {
  *
  * @param future future to trigger when the replay is complete
  */
-private void replayCursor(CompletableFuture<Void> future) {
+private void replayCursor(CompletableFuture<Position> future) {
 managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
 @Override
 public void readEntriesComplete(List<Entry> entries, Object ctx) {
-
+Position lastPosition = null;
 for (Entry entry : entries) {
 ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
 MessageMetadata md = 
Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -178,7 +183,8 @@ public class MessageDeduplication {
 highestSequencedPushed.put(producerName, sequenceId);
 highestSequencedPersisted.put(producerName, sequenceId);
 producerRemoved(producerName);
-
+snapshotCounter++;
+lastPosition = entry.getPosition();
 entry.release();
 }
 
@@ -187,7 +193,7 @@ public class MessageDeduplication {
 pulsar.getExecutor().execute(() -> replayCursor(future));
 } else {
 // Done replaying
-future.complete(null);
+future.complete(lastPosition);
 }
 }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 

(pulsar) branch branch-2.11 updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

2024-04-14 Thread yubiao

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
 new 97518ee893c [fix] [broker] Prevent long deduplication cursor backlog 
so that topic loading wouldn't timeout (#22479)
97518ee893c is described below

commit 97518ee893c8c4db6b75db2e1440570e813b08f8
Author: fengyubiao 
AuthorDate: Mon Apr 15 00:13:49 2024 +0800

[fix] [broker] Prevent long deduplication cursor backlog so that topic 
loading wouldn't timeout (#22479)

(cherry picked from commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c)
---
 .../pulsar/broker/service/BrokerService.java   |   4 +-
 .../service/persistent/MessageDeduplication.java   |  18 ++-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../DeduplicationDisabledBrokerLevelTest.java  | 163 +
 4 files changed, 179 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 3643903dd7c..b921c4b5dc6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -550,8 +550,10 @@ public class BrokerService implements Closeable {
 }
 
 protected void startDeduplicationSnapshotMonitor() {
+// We do not know whether users will enable deduplication on namespace 
level/topic level or not, so keep this
+// scheduled task runs.
 int interval = 
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-if (interval > 0 && 
pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+if (interval > 0) {
 this.deduplicationSnapshotMonitor =
 Executors.newSingleThreadScheduledExecutor(new 
ExecutorProvider.ExtendedThreadFactory(
 "deduplication-snapshot-monitor"));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index ba2600d9134..235ed0fcaa8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,9 +157,14 @@ public class MessageDeduplication {
 
 // Replay all the entries and apply all the sequence ids updates
 log.info("[{}] Replaying {} entries for deduplication", 
topic.getName(), managedCursor.getNumberOfEntries());
-CompletableFuture<Void> future = new CompletableFuture<>();
+CompletableFuture<Position> future = new CompletableFuture<>();
 replayCursor(future);
-return future;
+return future.thenAccept(lastPosition -> {
+if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+snapshotCounter = 0;
+takeSnapshot(lastPosition);
+}
+});
 }
 
 /**
@@ -168,11 +173,11 @@ public class MessageDeduplication {
  *
  * @param future future to trigger when the replay is complete
  */
-private void replayCursor(CompletableFuture<Void> future) {
+private void replayCursor(CompletableFuture<Position> future) {
 managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
 @Override
 public void readEntriesComplete(List<Entry> entries, Object ctx) {
-
+Position lastPosition = null;
 for (Entry entry : entries) {
 ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
 MessageMetadata md = 
Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -182,7 +187,8 @@ public class MessageDeduplication {
 highestSequencedPushed.put(producerName, sequenceId);
 highestSequencedPersisted.put(producerName, sequenceId);
 producerRemoved(producerName);
-
+snapshotCounter++;
+lastPosition = entry.getPosition();
 entry.release();
 }
 
@@ -191,7 +197,7 @@ public class MessageDeduplication {
 pulsar.getExecutor().execute(() -> replayCursor(future));
 } else {
 // Done replaying
-future.complete(null);
+future.complete(lastPosition);
 }
 }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 
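
The MessageDeduplication hunks above make the replay future carry the last replayed position, and take a snapshot once the replay finishes if enough entries were counted. A minimal sketch of that flow, assuming a hypothetical `ReplaySnapshotSketch` class in which positions are plain `Long` values and "taking a snapshot" only records the position (none of these names come from Pulsar):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for MessageDeduplication; not Pulsar's real class.
class ReplaySnapshotSketch {
    private final int snapshotInterval;  // assumed threshold, mirrors snapshotInterval in the diff
    private int snapshotCounter = 0;
    Long lastSnapshotPosition = null;    // where this sketch "took a snapshot", if it did

    ReplaySnapshotSketch(int snapshotInterval) {
        this.snapshotInterval = snapshotInterval;
    }

    // Replays the given positions, then snapshots if enough entries were replayed.
    CompletableFuture<Void> replay(List<Long> positions) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        Long lastPosition = null;
        for (Long position : positions) {
            snapshotCounter++;
            lastPosition = position;
        }
        // Completes with null when nothing was replayed.
        future.complete(lastPosition);
        return future.thenAccept(last -> {
            if (last != null && snapshotCounter >= snapshotInterval) {
                snapshotCounter = 0;
                lastSnapshotPosition = last;
            }
        });
    }

    public static void main(String[] args) {
        ReplaySnapshotSketch sketch = new ReplaySnapshotSketch(3);
        sketch.replay(List.of(10L, 11L, 12L, 13L)).join();
        System.out.println("snapshot at " + sketch.lastSnapshotPosition);
    }
}
```

Returning `future.thenAccept(...)` keeps the caller's view of the result as a `CompletableFuture<Void>` while the internal future carries the position.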

(pulsar) branch branch-3.0 updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

2024-04-14 Thread yubiao

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 0fbcbb24039 [fix] [broker] Prevent long deduplication cursor backlog 
so that topic loading wouldn't timeout (#22479)
0fbcbb24039 is described below

commit 0fbcbb240394b241e940225c2067ccf1ac3d3b32
Author: fengyubiao 
AuthorDate: Mon Apr 15 10:59:47 2024 +0800

[fix] [broker] Prevent long deduplication cursor backlog so that topic 
loading wouldn't timeout (#22479)

(cherry picked from commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c)
---
 .../pulsar/broker/service/BrokerService.java   |   4 +-
 .../service/persistent/MessageDeduplication.java   |  18 ++-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../DeduplicationDisabledBrokerLevelTest.java  | 163 +
 4 files changed, 179 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bfb289678d2..20de00370e1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -580,8 +580,10 @@ public class BrokerService implements Closeable {
 }
 
 protected void startDeduplicationSnapshotMonitor() {
+// We do not know whether users will enable deduplication on namespace 
level/topic level or not, so keep this
+// scheduled task runs.
 int interval = 
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-if (interval > 0 && 
pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+if (interval > 0) {
 this.deduplicationSnapshotMonitor = 
OrderedScheduler.newSchedulerBuilder()
 .name("deduplication-snapshot-monitor")
 .numThreads(1)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 802dd917961..e508661364d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,9 +157,14 @@ public class MessageDeduplication {
 
 // Replay all the entries and apply all the sequence ids updates
 log.info("[{}] Replaying {} entries for deduplication", 
topic.getName(), managedCursor.getNumberOfEntries());
-CompletableFuture<Void> future = new CompletableFuture<>();
+CompletableFuture<Position> future = new CompletableFuture<>();
 replayCursor(future);
-return future;
+return future.thenAccept(lastPosition -> {
+if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+snapshotCounter = 0;
+takeSnapshot(lastPosition);
+}
+});
 }
 
 /**
@@ -168,11 +173,11 @@ public class MessageDeduplication {
  *
  * @param future future to trigger when the replay is complete
  */
-private void replayCursor(CompletableFuture<Void> future) {
+private void replayCursor(CompletableFuture<Position> future) {
 managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
 @Override
 public void readEntriesComplete(List<Entry> entries, Object ctx) {
-
+Position lastPosition = null;
 for (Entry entry : entries) {
 ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
 MessageMetadata md = 
Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -182,7 +187,8 @@ public class MessageDeduplication {
 highestSequencedPushed.put(producerName, sequenceId);
 highestSequencedPersisted.put(producerName, sequenceId);
 producerRemoved(producerName);
-
+snapshotCounter++;
+lastPosition = entry.getPosition();
 entry.release();
 }
 
@@ -191,7 +197,7 @@ public class MessageDeduplication {
 pulsar.getExecutor().execute(() -> replayCursor(future));
 } else {
 // Done replaying
-future.complete(null);
+future.complete(lastPosition);
 }
 }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a6087490744..1363ca0945d 100644
--- 

(pulsar) branch branch-3.2 updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

2024-04-14 Thread yubiao

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 7cec82f99c2 [fix] [broker] Prevent long deduplication cursor backlog 
so that topic loading wouldn't timeout (#22479)
7cec82f99c2 is described below

commit 7cec82f99c2c729e0e9ac42f7176eadf4039ae1a
Author: fengyubiao 
AuthorDate: Mon Apr 15 00:13:49 2024 +0800

[fix] [broker] Prevent long deduplication cursor backlog so that topic 
loading wouldn't timeout (#22479)

(cherry picked from commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c)
---
 .../pulsar/broker/service/BrokerService.java   |   4 +-
 .../service/persistent/MessageDeduplication.java   |  18 ++-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../DeduplicationDisabledBrokerLevelTest.java  | 161 +
 4 files changed, 177 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d798d9e672e..f19b3436f7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -576,8 +576,10 @@ public class BrokerService implements Closeable {
 }
 
 protected void startDeduplicationSnapshotMonitor() {
+// We do not know whether users will enable deduplication on namespace 
level/topic level or not, so keep this
+// scheduled task runs.
 int interval = 
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-if (interval > 0 && 
pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+if (interval > 0) {
 this.deduplicationSnapshotMonitor = 
OrderedScheduler.newSchedulerBuilder()
 .name("deduplication-snapshot-monitor")
 .numThreads(1)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 802dd917961..e508661364d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,9 +157,14 @@ public class MessageDeduplication {
 
 // Replay all the entries and apply all the sequence ids updates
 log.info("[{}] Replaying {} entries for deduplication", 
topic.getName(), managedCursor.getNumberOfEntries());
-CompletableFuture<Void> future = new CompletableFuture<>();
+CompletableFuture<Position> future = new CompletableFuture<>();
 replayCursor(future);
-return future;
+return future.thenAccept(lastPosition -> {
+if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+snapshotCounter = 0;
+takeSnapshot(lastPosition);
+}
+});
 }
 
 /**
@@ -168,11 +173,11 @@ public class MessageDeduplication {
  *
  * @param future future to trigger when the replay is complete
  */
-private void replayCursor(CompletableFuture<Void> future) {
+private void replayCursor(CompletableFuture<Position> future) {
 managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
 @Override
 public void readEntriesComplete(List<Entry> entries, Object ctx) {
-
+Position lastPosition = null;
 for (Entry entry : entries) {
 ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
 MessageMetadata md = 
Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -182,7 +187,8 @@ public class MessageDeduplication {
 highestSequencedPushed.put(producerName, sequenceId);
 highestSequencedPersisted.put(producerName, sequenceId);
 producerRemoved(producerName);
-
+snapshotCounter++;
+lastPosition = entry.getPosition();
 entry.release();
 }
 
@@ -191,7 +197,7 @@ public class MessageDeduplication {
 pulsar.getExecutor().execute(() -> replayCursor(future));
 } else {
 // Done replaying
-future.complete(null);
+future.complete(lastPosition);
 }
 }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index be1b873cc21..29df6e78cd1 100644
--- 

(pulsar) branch branch-2.11 updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)

2024-04-14 Thread baodi

baodi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
 new e583dc86c3a [fix][txn]Handle exceptions in the transaction pending ack 
init (#21274)
e583dc86c3a is described below

commit e583dc86c3a999a33cdbd5ec058578d3e2a8fcd0
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Mon Apr 15 08:43:12 2024 +0800

[fix][txn]Handle exceptions in the transaction pending ack init (#21274)

Co-authored-by: Baodi Shi 
(cherry picked from commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b)
---
 .../apache/pulsar/broker/service/ServerCnx.java|  2 +-
 .../pendingack/impl/PendingAckHandleImpl.java  | 55 +--
 .../pulsar/broker/transaction/TransactionTest.java |  2 +-
 .../pendingack/PendingAckPersistentTest.java   | 82 ++
 4 files changed, 133 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d6b59e0b596..224a4489478 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1215,7 +1215,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 // back to client, only if not completed already.
 if 
(consumerFuture.completeExceptionally(exception)) {
 commandSender.sendErrorResponse(requestId,
-
BrokerServiceException.getClientErrorCode(exception),
+
BrokerServiceException.getClientErrorCode(exception.getCause()),
 exception.getCause().getMessage());
 }
 consumers.remove(consumerId, consumerFuture);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 5772efcce5c..33818336d91 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.Timer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -34,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -44,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -52,7 +57,9 @@ import 
org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
@@ -133,6 
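
The ServerCnx hunk above passes `exception.getCause()` rather than `exception` to the error-code lookup. The diff does not show what type `exception` has at that point. Assuming it is a wrapper such as `CompletionException`, the sketch below shows why mapping the wrapper itself loses the specific error. `CauseUnwrapSketch` and its `errorCode` method are hypothetical stand-ins, not Pulsar APIs:

```java
import java.util.concurrent.CompletableFuture;

class CauseUnwrapSketch {
    // Hypothetical stand-in for an error-code lookup keyed on the exception type.
    static String errorCode(Throwable t) {
        if (t instanceof IllegalStateException) {
            return "NotReady";
        }
        return "Unknown";
    }

    // Fails a source future and returns the exception a dependent stage observes.
    static Throwable failureSeenByCallback() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> dependent = source.thenApply(s -> s);
        source.completeExceptionally(new IllegalStateException("store not ready"));
        Throwable[] seen = new Throwable[1];
        // The dependent stage is already complete, so this callback runs immediately.
        dependent.exceptionally(ex -> {
            seen[0] = ex;
            return null;
        });
        return seen[0];
    }

    public static void main(String[] args) {
        Throwable ex = failureSeenByCallback();
        System.out.println(errorCode(ex));            // the wrapper maps to the generic code
        System.out.println(errorCode(ex.getCause())); // the cause maps to the specific code
    }
}
```

In this sketch the dependent stage sees a `CompletionException` whose cause is the original `IllegalStateException`, so only the unwrapped cause maps to the specific code.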

Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-14 Thread via GitHub


gaoran10 commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1565113797


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##
@@ -188,58 +270,135 @@ protected CompletableFuture 
isLocalTopicActive() {
 }, brokerService.executor());
 }
 
-protected synchronized CompletableFuture closeProducerAsync() {
-if (producer == null) {
-STATE_UPDATER.set(this, State.Stopped);
+/**
+ * This method only be used by {@link PersistentTopic#checkGC} now.
+ */
+public CompletableFuture disconnect(boolean failIfHasBacklog, 
boolean closeTheStartingProducer) {
+long backlog = getNumberOfEntriesInBacklog();
+if (failIfHasBacklog && backlog > 0) {
+CompletableFuture disconnectFuture = new 
CompletableFuture<>();
+disconnectFuture.completeExceptionally(new 
TopicBusyException("Cannot close a replicator with backlog"));
+if (log.isDebugEnabled()) {
+log.debug("[{}] Replicator disconnect failed since topic has 
backlog", replicatorId);
+}
+return disconnectFuture;
+}
+log.info("[{}] Disconnect replicator at position {} with backlog {}", 
replicatorId,
+getReplicatorReadPosition(), backlog);
+return closeProducerAsync(closeTheStartingProducer);
+}
+
+/**
+ * This method only be used by {@link PersistentTopic#checkGC} now.
+ */
+protected CompletableFuture closeProducerAsync(boolean 
closeTheStartingProducer) {
+Pair setDisconnectingRes = 
compareSetAndGetState(State.Started, State.Disconnecting);
+if (!setDisconnectingRes.getLeft()) {
+if (setDisconnectingRes.getRight() == State.Starting) {
+if (closeTheStartingProducer) {
+/**
+ * Delay retry(wait for the start producer task is finish).
+ * Note: If the producer always start fail, the start 
producer task will always retry until the
+ *   state changed to {@link State.Terminated}.
+ *   Nit: The better solution is creating a {@link 
CompletableFuture} to trace the in-progress
+ * creation and call 
"inProgressCreationFuture.thenApply(closeProducer())".
+ */
+long waitTimeMs = backOff.next();
+brokerService.executor().schedule(() -> 
closeProducerAsync(true),
+waitTimeMs, TimeUnit.MILLISECONDS);
+} else {
+log.info("[{}] Skip current producer closing since the 
previous producer has been closed,"
++ " and trying start a new one, state : 
{}",
+replicatorId, setDisconnectingRes.getRight());
+}
+} else if (setDisconnectingRes.getRight() == State.Disconnected
+|| setDisconnectingRes.getRight() == State.Disconnecting) {
+log.info("[{}] Skip current producer closing since other 
thread did closing, state : {}",
+replicatorId, setDisconnectingRes.getRight());
+} else if (setDisconnectingRes.getRight() == State.Terminating
+|| setDisconnectingRes.getRight() == State.Terminated) {
+log.info("[{}] Skip current producer closing since other 
thread is doing termination, state : {}",
+replicatorId, state);
+}
+log.info("[{}] Skip current termination since other thread is 
doing close producer or termination,"
++ " state : {}", replicatorId, state);
 return CompletableFuture.completedFuture(null);
 }
-CompletableFuture future = producer.closeAsync();
+
+// Close producer and update state.
+return doCloseProducerAsync(producer, () -> {
+Pair setDisconnectedRes = 
compareSetAndGetState(State.Disconnecting, State.Disconnected);
+if (setDisconnectedRes.getLeft()) {
+this.producer = null;
+// deactivate further read
+disableReplicatorRead();
+return;
+}
+if (setDisconnectedRes.getRight() == State.Terminating
+|| setDisconnectingRes.getRight() == State.Terminated) {
+log.info("[{}] Skip setting state to terminated because it was 
terminated, state : {}",
+replicatorId, state);
+} else {
+// Since only one task can call 
"doCloseProducerAsync(producer, action)", this scenario is not expected.
+// So print a warn log.
+log.warn("[{}] Other task has change the state to terminated. 
so skipped current one task."
++ " State is : {}",
+  
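
The quoted `closeProducerAsync` code branches on the result of `compareSetAndGetState`, which reports whether a state transition succeeded and which state was observed. A minimal sketch of that pattern, assuming an `AtomicReference`-backed state and a small `CasResult` holder in place of the `Pair` used in the excerpt (the class and its internals are hypothetical, not the code under review):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; names mirror the review excerpt but this is not Pulsar's AbstractReplicator.
class ReplicatorStateSketch {
    enum State { Starting, Started, Disconnecting, Disconnected, Terminating, Terminated }

    // Outcome of one transition attempt: whether it won, and the state seen when it did not.
    static final class CasResult {
        final boolean updated;
        final State observed;

        CasResult(boolean updated, State observed) {
            this.updated = updated;
            this.observed = observed;
        }
    }

    private final AtomicReference<State> state;

    ReplicatorStateSketch(State initial) {
        this.state = new AtomicReference<>(initial);
    }

    // Moves expect -> update atomically; on failure reports the state that blocked the move.
    CasResult compareSetAndGetState(State expect, State update) {
        if (state.compareAndSet(expect, update)) {
            return new CasResult(true, expect);
        }
        // The state may change again after this read, so callers should treat it as a hint.
        return new CasResult(false, state.get());
    }

    public static void main(String[] args) {
        ReplicatorStateSketch replicator = new ReplicatorStateSketch(State.Started);
        CasResult first = replicator.compareSetAndGetState(State.Started, State.Disconnecting);
        CasResult second = replicator.compareSetAndGetState(State.Started, State.Disconnecting);
        System.out.println(first.updated + " " + second.updated + " " + second.observed);
    }
}
```

Only one caller can win the `Started` to `Disconnecting` transition; every later caller sees `updated == false` and can decide from `observed` whether to skip, retry, or log.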

(pulsar) branch branch-3.0 updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)

2024-04-14 Thread baodi

baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 000ee6679e6 [fix][txn]Handle exceptions in the transaction pending ack 
init (#21274)
000ee6679e6 is described below

commit 000ee6679e6c1b0f1ecbc867bbe0ab3d0c542a55
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Mon Apr 15 08:43:12 2024 +0800

[fix][txn]Handle exceptions in the transaction pending ack init (#21274)

Co-authored-by: Baodi Shi 
(cherry picked from commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b)
---
 .../apache/pulsar/broker/service/ServerCnx.java|  2 +-
 .../pendingack/impl/PendingAckHandleImpl.java  | 54 --
 .../pulsar/broker/transaction/TransactionTest.java |  2 +-
 .../pendingack/PendingAckPersistentTest.java   | 82 ++
 4 files changed, 132 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2b2cbc5fac2..b47292268b4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1319,7 +1319,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 // Send error back to client, only if not 
completed already.
 if 
(consumerFuture.completeExceptionally(exception)) {
 commandSender.sendErrorResponse(requestId,
-
BrokerServiceException.getClientErrorCode(exception),
+
BrokerServiceException.getClientErrorCode(exception.getCause()),
 exception.getCause().getMessage());
 }
 consumers.remove(consumerId, consumerFuture);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 7dbe0385fd7..5ed271c6fd4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -22,6 +22,7 @@ import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.Timer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -45,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -53,7 +57,9 @@ import 
org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
@@ -134,6 +140,12 @@ public class PendingAckHandleImpl extends 

Re: [PR] [improve][test] Add topic operation checker for topic API [pulsar]

2024-04-14 Thread via GitHub


Technoboy- closed pull request #22468: [improve][test] Add topic operation 
checker for topic API
URL: https://github.com/apache/pulsar/pull/22468


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [Bug] We are leaving old releases in the ASF distribution area. Please cleanup. [pulsar]

2024-04-14 Thread via GitHub


Technoboy- commented on issue #22486:
URL: https://github.com/apache/pulsar/issues/22486#issuecomment-2054279703

   Cleaned up. Please take a look @dave2wave 





svn commit: r68514 - /release/pulsar/pulsar-client-cpp-3.4.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:28:45 2024
New Revision: 68514

Log:
cleanup

Removed:
release/pulsar/pulsar-client-cpp-3.4.0/



svn commit: r68517 - /release/pulsar/pulsar-client-cpp-3.1.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:29:42 2024
New Revision: 68517

Log:
cleanup

Removed:
release/pulsar/pulsar-client-cpp-3.1.1/



svn commit: r68516 - /release/pulsar/pulsar-client-cpp-3.1.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:29:27 2024
New Revision: 68516

Log:
cleanup

Removed:
release/pulsar/pulsar-client-cpp-3.1.0/



svn commit: r68515 - /release/pulsar/pulsar-client-cpp-3.4.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:28:58 2024
New Revision: 68515

Log:
cleanup

Removed:
release/pulsar/pulsar-client-cpp-3.4.1/



svn commit: r68513 - /release/pulsar/pulsar-client-go-0.11.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:28:14 2024
New Revision: 68513

Log:
cleanup

Removed:
release/pulsar/pulsar-client-go-0.11.0/



svn commit: r68512 - /release/pulsar/pulsar-dotpulsar-3.1.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:27:45 2024
New Revision: 68512

Log:
cleanup

Removed:
release/pulsar/pulsar-dotpulsar-3.1.1/



svn commit: r68511 - /release/pulsar/pulsar-dotpulsar-3.1.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:27:29 2024
New Revision: 68511

Log:
cleanup

Removed:
release/pulsar/pulsar-dotpulsar-3.1.0/



svn commit: r68510 - /release/pulsar/pulsar-client-go-0.12.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:26:17 2024
New Revision: 68510

Log:
cleanup

Removed:
release/pulsar/pulsar-client-go-0.12.0/



svn commit: r68509 - /release/pulsar/pulsar-client-reactive-0.5.2/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:25:44 2024
New Revision: 68509

Log:
cleanup

Removed:
release/pulsar/pulsar-client-reactive-0.5.2/



svn commit: r68508 - /release/pulsar/pulsar-client-reactive-0.5.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:25:31 2024
New Revision: 68508

Log:
cleanup

Removed:
release/pulsar/pulsar-client-reactive-0.5.1/



svn commit: r68507 - /release/pulsar/pulsar-client-reactive-0.5.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:25:17 2024
New Revision: 68507

Log:
cleanup

Removed:
release/pulsar/pulsar-client-reactive-0.5.0/



svn commit: r68506 - /release/pulsar/pulsar-3.2.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:24:19 2024
New Revision: 68506

Log:
cleanup

Removed:
release/pulsar/pulsar-3.2.1/



svn commit: r68505 - /release/pulsar/pulsar-3.2.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:23:56 2024
New Revision: 68505

Log:
cleanup

Removed:
release/pulsar/pulsar-3.2.0/



svn commit: r68502 - /release/pulsar/pulsar-3.1.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:22:31 2024
New Revision: 68502

Log:
cleanup

Removed:
release/pulsar/pulsar-3.1.0/



svn commit: r68504 - /release/pulsar/pulsar-3.1.2/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:23:19 2024
New Revision: 68504

Log:
cleanup

Removed:
release/pulsar/pulsar-3.1.2/



svn commit: r68503 - /release/pulsar/pulsar-3.1.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:22:58 2024
New Revision: 68503

Log:
cleanup

Removed:
release/pulsar/pulsar-3.1.1/



svn commit: r68501 - /release/pulsar/pulsar-3.0.3/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:22:04 2024
New Revision: 68501

Log:
cleanup

Removed:
release/pulsar/pulsar-3.0.3/



svn commit: r68500 - /release/pulsar/pulsar-3.0.2/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:21:51 2024
New Revision: 68500

Log:
cleanup

Removed:
release/pulsar/pulsar-3.0.2/



svn commit: r68499 - /release/pulsar/pulsar-3.0.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:21:23 2024
New Revision: 68499

Log:
cleanup

Removed:
release/pulsar/pulsar-3.0.1/



svn commit: r68498 - /release/pulsar/pulsar-3.0.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:21:08 2024
New Revision: 68498

Log:
cleanup

Removed:
release/pulsar/pulsar-3.0.0/



svn commit: r68496 - /release/pulsar/pulsar-2.11.3/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:20:06 2024
New Revision: 68496

Log:
cleanup

Removed:
release/pulsar/pulsar-2.11.3/



svn commit: r68497 - /release/pulsar/pulsar-2.9.4/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:20:37 2024
New Revision: 68497

Log:
cleanup

Removed:
release/pulsar/pulsar-2.9.4/



svn commit: r68493 - /release/pulsar/pulsar-2.11.0/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:19:17 2024
New Revision: 68493

Log:
cleanup

Removed:
release/pulsar/pulsar-2.11.0/



svn commit: r68494 - /release/pulsar/pulsar-2.11.1/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:19:42 2024
New Revision: 68494

Log:
cleanup

Removed:
release/pulsar/pulsar-2.11.1/



svn commit: r68495 - /release/pulsar/pulsar-2.11.2/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:19:53 2024
New Revision: 68495

Log:
cleanup

Removed:
release/pulsar/pulsar-2.11.2/



svn commit: r68490 - /release/pulsar/pulsar-2.10.3/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:18:19 2024
New Revision: 68490

Log:
cleanup

Removed:
release/pulsar/pulsar-2.10.3/



svn commit: r68492 - /release/pulsar/pulsar-2.10.5/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:18:56 2024
New Revision: 68492

Log:
cleanup

Removed:
release/pulsar/pulsar-2.10.5/



svn commit: r68491 - /release/pulsar/pulsar-2.10.4/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:18:39 2024
New Revision: 68491

Log:
cleanup

Removed:
release/pulsar/pulsar-2.10.4/



svn commit: r68489 - /release/pulsar/pulsar-2.10.2/

2024-04-14 Thread technoboy
Author: technoboy
Date: Mon Apr 15 01:17:31 2024
New Revision: 68489

Log:
cleanup

Removed:
release/pulsar/pulsar-2.10.2/



Re: [PR] [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes [pulsar]

2024-04-14 Thread via GitHub


freeznet closed pull request #22501: [fix][fn]make sure the classloader for 
ContextImpl is `functionClassLoader` in different runtimes
URL: https://github.com/apache/pulsar/pull/22501





[PR] [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes [pulsar]

2024-04-14 Thread via GitHub


freeznet opened a new pull request, #22501:
URL: https://github.com/apache/pulsar/pull/22501

   
   
   
   
   
   
   
   ### Motivation
   
   https://github.com/apache/pulsar/pull/20115 fixed the classloader in 
TopicSchema, but in the Kubernetes Runtime the classloader may differ from 
the one used by the other runtimes, which causes the SerDe to fail to 
initialize in the Kubernetes Runtime.
   
   ### Modifications
   
   - Make sure the classloader is always `functionClassLoader` when 
initializing the context.
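
The diff for this PR is not part of this message, so the following is only a generic sketch of one common way to pin a class loader around an initialization step, not the actual change. The class `ContextClassLoaderSketch`, the helper `withContextClassLoader`, and the stand-in `functionClassLoader` built in `main` are hypothetical names used for illustration; only `Thread#getContextClassLoader` and `Thread#setContextClassLoader` are real JDK calls.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public class ContextClassLoaderSketch {

    /**
     * Runs the action with the given loader installed as the thread context
     * class loader, then restores whatever loader was there before.
     * (Hypothetical helper, not part of Pulsar.)
     */
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Always restore, even if the action throws.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the function's class loader; an assumption for this sketch.
        ClassLoader functionClassLoader = new URLClassLoader(
                new URL[0], ContextClassLoaderSketch.class.getClassLoader());

        ClassLoader seenDuringInit = withContextClassLoader(
                functionClassLoader,
                () -> Thread.currentThread().getContextClassLoader());

        System.out.println("init saw function loader: "
                + (seenDuringInit == functionClassLoader));
    }
}
```

Restoring the previous loader in `finally` keeps the calling thread unaffected regardless of which runtime invoked the initialization.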
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/freeznet/pulsar/pull/11
   
   
   





Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-14 Thread via GitHub


gaoran10 commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1565026654


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##
@@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, 
PersistentTopic localTopic, Man
 }
 
 @Override
-protected void readEntries(Producer producer) {
-// Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting
+protected void setProducerAndTriggerReadEntries(Producer producer) 
{
+// Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting.
 cursor.rewind();
-
 cursor.cancelPendingReadRequest();
-HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-this.producer = (ProducerImpl) producer;
 
-if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
-log.info("[{}] Created replicator producer", replicatorId);
+/**
+ * 1. Try change state to {@link Started}.
+ * 2. Atoms modify multiple properties if change state success, to 
avoid another thread get a null value
+ *producer when the state is {@link Started}.
+ */
+Pair changeStateRes;
+changeStateRes = compareSetAndGetState(Starting, Started);
+if (changeStateRes.getLeft()) {
+this.producer = (ProducerImpl) producer;
+HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+// Trigger a new read.
+log.info("[{}] Created replicator producer, Replicator state: {}", 
replicatorId, state);
 backOff.reset();
-// activate cursor: so, entries can be cached
+// activate cursor: so, entries can be cached.
 this.cursor.setActive();
 // read entries
 readMoreEntries();
 } else {
-log.info(
-"[{}] Replicator was stopped while creating the producer."
-+ " Closing it. Replicator state: {}",
-replicatorId, STATE_UPDATER.get(this));
-STATE_UPDATER.set(this, State.Stopping);
-closeProducerAsync();
+if (changeStateRes.getRight() == Started) {
+// Since only one task can call 
"producerBuilder.createAsync()", this scenario is not expected.
+// So print a warn log.
+log.warn("[{}] Replicator was already started by another 
thread while creating the producer."
++ " Closing the producer newly created. Replicator 
state: {}", replicatorId, state);
+} else if (changeStateRes.getRight() == Terminating || 
changeStateRes.getRight() == Terminated) {
+log.info("[{}] Replicator was terminated, so close the 
producer. Replicator state: {}",

Review Comment:
   Got it.






Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-14 Thread via GitHub


gaoran10 commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1565025191


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##
@@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, 
PersistentTopic localTopic, Man
 }
 
 @Override
-protected void readEntries(Producer producer) {
-// Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting
+protected void setProducerAndTriggerReadEntries(Producer producer) 
{
+// Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting.
 cursor.rewind();
-
 cursor.cancelPendingReadRequest();
-HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-this.producer = (ProducerImpl) producer;

Review Comment:
   OK






(pulsar) branch branch-3.1 updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)

2024-04-14 Thread baodi
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new ce99a96bb5b [fix][txn]Handle exceptions in the transaction pending ack 
init (#21274)
ce99a96bb5b is described below

commit ce99a96bb5b088dd37dc2ecb97ef7bb9d3ae3deb
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Mon Apr 15 08:43:12 2024 +0800

[fix][txn]Handle exceptions in the transaction pending ack init (#21274)

Co-authored-by: Baodi Shi 
(cherry picked from commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b)
---
 .../apache/pulsar/broker/service/ServerCnx.java|  2 +-
 .../pendingack/impl/PendingAckHandleImpl.java  | 54 --
 .../pulsar/broker/transaction/TransactionTest.java |  2 +-
 .../pendingack/PendingAckPersistentTest.java   | 82 ++
 4 files changed, 132 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 29ba7cb866e..c5342dd3dff 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1311,7 +1311,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 // Send error back to client, only if not 
completed already.
 if 
(consumerFuture.completeExceptionally(exception)) {
 commandSender.sendErrorResponse(requestId,
-
BrokerServiceException.getClientErrorCode(exception),
+
BrokerServiceException.getClientErrorCode(exception.getCause()),
 exception.getCause().getMessage());
 }
 consumers.remove(consumerId, consumerFuture);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 7dbe0385fd7..5ed271c6fd4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -22,6 +22,7 @@ import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.Timer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -45,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -53,7 +57,9 @@ import 
org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
@@ -134,6 +140,12 @@ public class PendingAckHandleImpl extends 

(pulsar) branch branch-3.2 updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)

2024-04-14 Thread baodi
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new fdbcb6fe0da [fix][txn]Handle exceptions in the transaction pending ack 
init (#21274)
fdbcb6fe0da is described below

commit fdbcb6fe0da6967afe55248134b9d3699916f191
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Mon Apr 15 08:43:12 2024 +0800

[fix][txn]Handle exceptions in the transaction pending ack init (#21274)

Co-authored-by: Baodi Shi 
(cherry picked from commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b)
---
 .../apache/pulsar/broker/service/ServerCnx.java|  2 +-
 .../pendingack/impl/PendingAckHandleImpl.java  | 54 --
 .../pulsar/broker/transaction/TransactionTest.java |  2 +-
 .../pendingack/PendingAckPersistentTest.java   | 82 ++
 4 files changed, 132 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 569c5a2fb1e..40a0ee4a73c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1374,7 +1374,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 // Send error back to client, only if not 
completed already.
 if 
(consumerFuture.completeExceptionally(exception)) {
 commandSender.sendErrorResponse(requestId,
-
BrokerServiceException.getClientErrorCode(exception),
+
BrokerServiceException.getClientErrorCode(exception.getCause()),
 exception.getCause().getMessage());
 }
 consumers.remove(consumerId, consumerFuture);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 7dbe0385fd7..5ed271c6fd4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -22,6 +22,7 @@ import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.Timer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -45,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -53,7 +57,9 @@ import 
org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
@@ -134,6 +140,12 @@ public class PendingAckHandleImpl extends 

Re: [PR] [fix][test] Flaky-test: PersistentMessageFinderTest.testMessageExpiryWithTimestampNonRecoverableException [pulsar]

2024-04-14 Thread via GitHub


codelipenghui commented on PR #22489:
URL: https://github.com/apache/pulsar/pull/22489#issuecomment-2054249321

   /pulsarbot run-failure-checks





(pulsar) branch master updated: [fix][txn]Handle exceptions in the transaction pending ack init (#21274)

2024-04-14 Thread baodi
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 5d18ff7b70f [fix][txn]Handle exceptions in the transaction pending ack 
init (#21274)
5d18ff7b70f is described below

commit 5d18ff7b70f9de3b95d83f6a8fd4756b1c34567b
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Mon Apr 15 08:43:12 2024 +0800

[fix][txn]Handle exceptions in the transaction pending ack init (#21274)

Co-authored-by: Baodi Shi 
---
 .../apache/pulsar/broker/service/ServerCnx.java|  2 +-
 .../pendingack/impl/PendingAckHandleImpl.java  | 54 --
 .../pulsar/broker/transaction/TransactionTest.java |  2 +-
 .../pendingack/PendingAckPersistentTest.java   | 82 ++
 4 files changed, 132 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4ee6ac43465..a60f1d805ce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1376,7 +1376,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 // Send error back to client, only if not 
completed already.
 if 
(consumerFuture.completeExceptionally(exception)) {
 commandSender.sendErrorResponse(requestId,
-
BrokerServiceException.getClientErrorCode(exception),
+
BrokerServiceException.getClientErrorCode(exception.getCause()),
 exception.getCause().getMessage());
 }
 consumers.remove(consumerId, consumerFuture);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 7dbe0385fd7..5ed271c6fd4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -22,6 +22,7 @@ import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.Timer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,9 +36,11 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -45,6 +48,7 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -53,7 +57,9 @@ import 
org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
@@ -134,6 +140,12 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
 
 public final RecoverTimeRecord 

Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]

2024-04-14 Thread via GitHub


shibd merged PR #21274:
URL: https://github.com/apache/pulsar/pull/21274





Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-14 Thread via GitHub


poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564805152


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##
@@ -675,30 +692,6 @@ protected void checkReplicatedSubscriptionMarker(Position 
position, MessageImpl<
 }
 }
 
-@Override
-public CompletableFuture disconnect() {
-return disconnect(false);
-}
-
-@Override
-public synchronized CompletableFuture disconnect(boolean 
failIfHasBacklog) {
-final CompletableFuture future = new CompletableFuture<>();
-
-super.disconnect(failIfHasBacklog).thenRun(() -> {
-dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

Review Comment:
   Good catch! Fixed






Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-14 Thread via GitHub


poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564803078


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3870,6 +3879,27 @@ private void unfenceTopicToResume() {
 subscriptions.values().forEach(sub -> sub.resumeAfterFence());
 isFenced = false;
 isClosingOrDeleting = false;
+replicatorsResume();
+}
+
+private void replicatorsResume() {
+removeTerminatingReplicators(replicators);
+removeTerminatingReplicators(shadowReplicators);
+checkReplication();
+checkShadowReplication();
+}
+
+private static void 
removeTerminatingReplicators(ConcurrentOpenHashMap 
replicators) {

Review Comment:
   A static method is better here, both for performance and for testing.
   






Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-14 Thread via GitHub


poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564802782


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##
@@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, 
PersistentTopic localTopic, Man
 }
 
 @Override
-protected void readEntries(Producer producer) {
-// Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting
+protected void setProducerAndTriggerReadEntries(Producer producer) 
{
+// Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting.
 cursor.rewind();
-
 cursor.cancelPendingReadRequest();
-HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-this.producer = (ProducerImpl) producer;

Review Comment:
   > Why do we need to move these two lines?
   > It seems that if we don't set producer, the operation 
   > `doCloseProducerAsync(producer, () -> {});` can't close the producer.
   
   We did not remove these two lines; we only moved them into the branch that 
runs when `compareSetAndGetState(Starting, Started)` succeeds.
   
   > `doCloseProducerAsync(producer, () -> {});` can't close the producer.
   
   This branch uses the method parameter `producer`, not the field 
`this.producer`, so the close does work.
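
The distinction between the method parameter and the field can be sketched in isolation. This is a simplified model, not the Pulsar code: `ReplicatorStateSketch` and `FakeProducer` are hypothetical, and the helper here is assumed to return a success flag together with the state observed before the attempt, which is how the diff appears to use `compareSetAndGetState`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class ReplicatorStateSketch {
    enum State { Starting, Started, Terminating, Terminated }

    /** Hypothetical stand-in for a client producer. */
    static final class FakeProducer {
        volatile boolean closed;
        void close() { closed = true; }
    }

    final AtomicReference<State> state = new AtomicReference<>(State.Starting);
    volatile FakeProducer producer; // the field, i.e. "this.producer"

    /** Assumed semantics: (changed?, state observed before the attempt). */
    Map.Entry<Boolean, State> compareSetAndGetState(State expect, State update) {
        while (true) {
            State observed = state.get();
            if (observed != expect) {
                return new SimpleImmutableEntry<>(false, observed);
            }
            if (state.compareAndSet(expect, update)) {
                return new SimpleImmutableEntry<>(true, observed);
            }
        }
    }

    /** Called with a freshly created producer (the method parameter). */
    void onProducerCreated(FakeProducer created) {
        Map.Entry<Boolean, State> res =
                compareSetAndGetState(State.Starting, State.Started);
        if (res.getKey()) {
            // Publish the producer only after the state change succeeded.
            this.producer = created;
        } else {
            // The field was never assigned, so close the parameter directly.
            created.close();
        }
    }

    public static void main(String[] args) {
        ReplicatorStateSketch terminated = new ReplicatorStateSketch();
        terminated.state.set(State.Terminated);
        FakeProducer orphan = new FakeProducer();
        terminated.onProducerCreated(orphan);
        System.out.println("orphan closed: " + orphan.closed
                + ", field set: " + (terminated.producer != null));
    }
}
```

Because the failure branch closes the parameter rather than the field, the newly created producer is released even though `this.producer` was never assigned.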






Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-14 Thread via GitHub


poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564801185


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##
@@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, 
PersistentTopic localTopic, Man
 }
 
 @Override
-protected void readEntries(Producer producer) {
-// Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting
+protected void setProducerAndTriggerReadEntries(Producer producer) 
{
+// Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting.
 cursor.rewind();
-
 cursor.cancelPendingReadRequest();
-HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-this.producer = (ProducerImpl) producer;
 
-if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
-log.info("[{}] Created replicator producer", replicatorId);
+/**
+ * 1. Try change state to {@link Started}.
+ * 2. Atoms modify multiple properties if change state success, to 
avoid another thread get a null value
+ *producer when the state is {@link Started}.
+ */
+Pair changeStateRes;
+changeStateRes = compareSetAndGetState(Starting, Started);
+if (changeStateRes.getLeft()) {
+this.producer = (ProducerImpl) producer;
+HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+// Trigger a new read.
+log.info("[{}] Created replicator producer, Replicator state: {}", 
replicatorId, state);
 backOff.reset();
-// activate cursor: so, entries can be cached
+// activate cursor: so, entries can be cached.
 this.cursor.setActive();
 // read entries
 readMoreEntries();
 } else {
-log.info(
-"[{}] Replicator was stopped while creating the producer."
-+ " Closing it. Replicator state: {}",
-replicatorId, STATE_UPDATER.get(this));
-STATE_UPDATER.set(this, State.Stopping);
-closeProducerAsync();
+if (changeStateRes.getRight() == Started) {
+// Since only one task can call 
"producerBuilder.createAsync()", this scenario is not expected.
+// So print a warn log.
+log.warn("[{}] Replicator was already started by another 
thread while creating the producer."
++ " Closing the producer newly created. Replicator 
state: {}", replicatorId, state);
+} else if (changeStateRes.getRight() == Terminating || 
changeStateRes.getRight() == Terminated) {
+log.info("[{}] Replicator was terminated, so close the 
producer. Replicator state: {}",

Review Comment:
   > This is a little confusing, if the replicator was terminating or 
terminated, do we need to close the producer again?
   
   Yes, it is needed. The variable `producer` here is not the field 
`this.producer`; it is a newly created object, so we need to close it.






Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-14 Thread via GitHub


poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564800410


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##
@@ -188,58 +270,135 @@ protected CompletableFuture 
isLocalTopicActive() {
 }, brokerService.executor());
 }
 
-protected synchronized CompletableFuture closeProducerAsync() {
-if (producer == null) {
-STATE_UPDATER.set(this, State.Stopped);
+/**
+ * This method only be used by {@link PersistentTopic#checkGC} now.
+ */
+public CompletableFuture disconnect(boolean failIfHasBacklog, 
boolean closeTheStartingProducer) {
+long backlog = getNumberOfEntriesInBacklog();
+if (failIfHasBacklog && backlog > 0) {
+CompletableFuture disconnectFuture = new 
CompletableFuture<>();
+disconnectFuture.completeExceptionally(new 
TopicBusyException("Cannot close a replicator with backlog"));
+if (log.isDebugEnabled()) {
+log.debug("[{}] Replicator disconnect failed since topic has 
backlog", replicatorId);
+}
+return disconnectFuture;
+}
+log.info("[{}] Disconnect replicator at position {} with backlog {}", 
replicatorId,
+getReplicatorReadPosition(), backlog);
+return closeProducerAsync(closeTheStartingProducer);
+}
+
+/**
+ * This method only be used by {@link PersistentTopic#checkGC} now.
+ */
+protected CompletableFuture closeProducerAsync(boolean 
closeTheStartingProducer) {
+Pair setDisconnectingRes = 
compareSetAndGetState(State.Started, State.Disconnecting);
+if (!setDisconnectingRes.getLeft()) {
+if (setDisconnectingRes.getRight() == State.Starting) {
+if (closeTheStartingProducer) {
+/**
+ * Delay retry(wait for the start producer task is finish).
+ * Note: If the producer always start fail, the start 
producer task will always retry until the
+ *   state changed to {@link State.Terminated}.
+ *   Nit: The better solution is creating a {@link 
CompletableFuture} to trace the in-progress
+ * creation and call 
"inProgressCreationFuture.thenApply(closeProducer())".
+ */
+long waitTimeMs = backOff.next();
+brokerService.executor().schedule(() -> 
closeProducerAsync(true),
+waitTimeMs, TimeUnit.MILLISECONDS);
+} else {
+log.info("[{}] Skip current producer closing since the 
previous producer has been closed,"
++ " and trying start a new one, state : 
{}",
+replicatorId, setDisconnectingRes.getRight());
+}
+} else if (setDisconnectingRes.getRight() == State.Disconnected
+|| setDisconnectingRes.getRight() == State.Disconnecting) {
+log.info("[{}] Skip current producer closing since other 
thread did closing, state : {}",
+replicatorId, setDisconnectingRes.getRight());
+} else if (setDisconnectingRes.getRight() == State.Terminating
+|| setDisconnectingRes.getRight() == State.Terminated) {
+log.info("[{}] Skip current producer closing since other 
thread is doing termination, state : {}",
+replicatorId, state);
+}
+log.info("[{}] Skip current termination since other thread is 
doing close producer or termination,"
++ " state : {}", replicatorId, state);
 return CompletableFuture.completedFuture(null);
 }
-CompletableFuture future = producer.closeAsync();
+
+// Close producer and update state.
+return doCloseProducerAsync(producer, () -> {
+Pair setDisconnectedRes = 
compareSetAndGetState(State.Disconnecting, State.Disconnected);
+if (setDisconnectedRes.getLeft()) {
+this.producer = null;
+// deactivate further read
+disableReplicatorRead();
+return;
+}
+if (setDisconnectedRes.getRight() == State.Terminating
+|| setDisconnectingRes.getRight() == State.Terminated) {
+log.info("[{}] Skip setting state to terminated because it was 
terminated, state : {}",
+replicatorId, state);
+} else {
+// Since only one task can call 
"doCloseProducerAsync(producer, action)", this scenario is not expected.
+// So print a warn log.
+log.warn("[{}] Other task has change the state to terminated. 
so skipped current one task."
++ " State is : {}",
+   
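The hunk above branches on the result of `compareSetAndGetState(expect, update)`, whose left side says whether the transition happened and whose right side carries the state that was observed. The helper itself is not shown in this excerpt, so the following is only a minimal sketch of that pattern under assumptions: the class `ReplicatorStateSketch`, the `Transition` holder (standing in for the `Pair`), and `tryStart()` are hypothetical names, and the value returned on success (the state seen before the swap) is a guess rather than the PR's behavior.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the replicator's state handling; not Pulsar code.
class ReplicatorStateSketch {
    enum State { Disconnected, Starting, Started, Disconnecting, Terminating, Terminated }

    // Outcome of one transition attempt (plays the role of the Pair in the diff).
    static final class Transition {
        final boolean success;
        final State observed; // state seen at the moment of the attempt

        Transition(boolean success, State observed) {
            this.success = success;
            this.observed = observed;
        }
    }

    private final AtomicReference<State> state = new AtomicReference<>(State.Disconnected);

    // Atomically moves expect -> update and reports which state was actually seen.
    Transition compareSetAndGetState(State expect, State update) {
        State witness = state.compareAndExchange(expect, update);
        return new Transition(witness == expect, witness);
    }

    // Returns true only for the single caller that wins Disconnected -> Starting.
    boolean tryStart() {
        Transition t = compareSetAndGetState(State.Disconnected, State.Starting);
        if (!t.success) {
            // A caller can branch on t.observed here to log, skip, or schedule a retry.
            return false;
        }
        // A real replicator would create the producer asynchronously at this point.
        compareSetAndGetState(State.Starting, State.Started);
        return true;
    }

    State current() {
        return state.get();
    }

    public static void main(String[] args) {
        ReplicatorStateSketch r = new ReplicatorStateSketch();
        System.out.println(r.tryStart() + " " + r.current());
        System.out.println(r.tryStart() + " " + r.current());
    }
}
```

Returning the observed state together with the success flag lets a caller choose its log message or retry path from a single atomic read, instead of re-reading a field that may have changed in the meantime.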

Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-14 Thread via GitHub


poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564796734


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##
@@ -96,83 +123,138 @@ public AbstractReplicator(String localCluster, Topic 
localTopic, String remoteCl
 .sendTimeout(0, TimeUnit.SECONDS) //
 .maxPendingMessages(producerQueueSize) //
 .producerName(getProducerName());
-STATE_UPDATER.set(this, State.Stopped);
+STATE_UPDATER.set(this, State.Disconnected);
 }
 
 protected abstract String getProducerName();
 
-protected abstract void 
readEntries(org.apache.pulsar.client.api.Producer producer);
+protected abstract void 
setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer 
producer);
 
 protected abstract Position getReplicatorReadPosition();
 
-protected abstract long getNumberOfEntriesInBacklog();
+public abstract long getNumberOfEntriesInBacklog();
 
 protected abstract void disableReplicatorRead();
 
 public String getRemoteCluster() {
 return remoteCluster;
 }
 
-// This method needs to be synchronized with disconnects else if there is 
a disconnect followed by startProducer
-// the end result can be disconnect.
-public synchronized void startProducer() {
-if (STATE_UPDATER.get(this) == State.Stopping) {
-long waitTimeMs = backOff.next();
-if (log.isDebugEnabled()) {
-log.debug(
-"[{}] waiting for producer to close before attempting 
to reconnect, retrying in {} s",
-replicatorId, waitTimeMs / 1000.0);
-}
-// BackOff before retrying
-
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
-TimeUnit.MILLISECONDS);
-return;
-}
-State state = STATE_UPDATER.get(this);
-if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) 
{
-if (state == State.Started) {
-// Already running
+public void startProducer() {
+// Guarantee only one task call "producerBuilder.createAsync()".
+Pair setStartingRes = 
compareSetAndGetState(State.Disconnected, State.Starting);
+if (!setStartingRes.getLeft()) {
+if (setStartingRes.getRight() == State.Starting) {
+log.info("[{}] Skip the producer creation since other thread 
is doing starting, state : {}",
+replicatorId, state);
+} else if (setStartingRes.getRight() == State.Started) {
+// Since the method "startProducer" will be called even if it 
is started, only print debug-level log.
+if (log.isDebugEnabled()) {
+log.debug("[{}] Replicator was already running. state: 
{}", replicatorId, state);
+}
+} else if (setStartingRes.getRight() == State.Disconnecting) {
 if (log.isDebugEnabled()) {
-log.debug("[{}] Replicator was already running", 
replicatorId);
+log.debug("[{}] Rep.producer is closing, delay to 
retry(wait the producer close success)."
++ " state: {}", replicatorId, state);
 }
+delayStartProducerAfterDisconnected();
 } else {
-log.info("[{}] Replicator already being started. Replicator 
state: {}", replicatorId, state);
+/** {@link State.Terminating}, {@link State.Terminated}. **/
+log.info("[{}] Skip the producer creation since the replicator 
state is : {}", replicatorId, state);
 }
-
 return;
 }
 
 log.info("[{}] Starting replicator", replicatorId);
 producerBuilder.createAsync().thenAccept(producer -> {
-readEntries(producer);
+setProducerAndTriggerReadEntries(producer);
 }).exceptionally(ex -> {
-if (STATE_UPDATER.compareAndSet(this, State.Starting, 
State.Stopped)) {
+Pair setDisconnectedRes = 
compareSetAndGetState(State.Starting, State.Disconnected);
+if (setDisconnectedRes.getLeft()) {
 long waitTimeMs = backOff.next();
 log.warn("[{}] Failed to create remote producer ({}), retrying 
in {} s",
 replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
-
 // BackOff before retrying
-
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
-TimeUnit.MILLISECONDS);
+scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
 } else {
-log.warn("[{}] Failed to create remote producer. Replicator 
state: {}", replicatorId,
-

Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-14 Thread via GitHub


poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564796010


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##
@@ -96,83 +123,138 @@ public AbstractReplicator(String localCluster, Topic 
localTopic, String remoteCl
 .sendTimeout(0, TimeUnit.SECONDS) //
 .maxPendingMessages(producerQueueSize) //
 .producerName(getProducerName());
-STATE_UPDATER.set(this, State.Stopped);
+STATE_UPDATER.set(this, State.Disconnected);
 }
 
 protected abstract String getProducerName();
 
-protected abstract void 
readEntries(org.apache.pulsar.client.api.Producer producer);
+protected abstract void 
setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer 
producer);
 
 protected abstract Position getReplicatorReadPosition();
 
-protected abstract long getNumberOfEntriesInBacklog();
+public abstract long getNumberOfEntriesInBacklog();
 
 protected abstract void disableReplicatorRead();
 
 public String getRemoteCluster() {
 return remoteCluster;
 }
 
-// This method needs to be synchronized with disconnects else if there is 
a disconnect followed by startProducer
-// the end result can be disconnect.
-public synchronized void startProducer() {
-if (STATE_UPDATER.get(this) == State.Stopping) {
-long waitTimeMs = backOff.next();
-if (log.isDebugEnabled()) {
-log.debug(
-"[{}] waiting for producer to close before attempting 
to reconnect, retrying in {} s",
-replicatorId, waitTimeMs / 1000.0);
-}
-// BackOff before retrying
-
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
-TimeUnit.MILLISECONDS);
-return;
-}
-State state = STATE_UPDATER.get(this);
-if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) 
{
-if (state == State.Started) {
-// Already running
+public void startProducer() {
+// Guarantee only one task call "producerBuilder.createAsync()".
+Pair setStartingRes = 
compareSetAndGetState(State.Disconnected, State.Starting);
+if (!setStartingRes.getLeft()) {
+if (setStartingRes.getRight() == State.Starting) {
+log.info("[{}] Skip the producer creation since other thread 
is doing starting, state : {}",
+replicatorId, state);
+} else if (setStartingRes.getRight() == State.Started) {
+// Since the method "startProducer" will be called even if it 
is started, only print debug-level log.
+if (log.isDebugEnabled()) {
+log.debug("[{}] Replicator was already running. state: 
{}", replicatorId, state);
+}
+} else if (setStartingRes.getRight() == State.Disconnecting) {
 if (log.isDebugEnabled()) {
-log.debug("[{}] Replicator was already running", 
replicatorId);
+log.debug("[{}] Rep.producer is closing, delay to 
retry(wait the producer close success)."
++ " state: {}", replicatorId, state);
 }
+delayStartProducerAfterDisconnected();
 } else {
-log.info("[{}] Replicator already being started. Replicator 
state: {}", replicatorId, state);
+/** {@link State.Terminating}, {@link State.Terminated}. **/
+log.info("[{}] Skip the producer creation since the replicator 
state is : {}", replicatorId, state);
 }
-
 return;
 }
 
 log.info("[{}] Starting replicator", replicatorId);
 producerBuilder.createAsync().thenAccept(producer -> {
-readEntries(producer);
+setProducerAndTriggerReadEntries(producer);
 }).exceptionally(ex -> {
-if (STATE_UPDATER.compareAndSet(this, State.Starting, 
State.Stopped)) {
+Pair setDisconnectedRes = 
compareSetAndGetState(State.Starting, State.Disconnected);
+if (setDisconnectedRes.getLeft()) {
 long waitTimeMs = backOff.next();
 log.warn("[{}] Failed to create remote producer ({}), retrying 
in {} s",
 replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
-
 // BackOff before retrying
-
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
-TimeUnit.MILLISECONDS);
+scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
 } else {
-log.warn("[{}] Failed to create remote producer. Replicator 
state: {}", replicatorId,
-

(pulsar) branch master updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

2024-04-14 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 837f8bca7dd [fix] [broker] Prevent long deduplication cursor backlog 
so that topic loading wouldn't timeout (#22479)
837f8bca7dd is described below

commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c
Author: fengyubiao 
AuthorDate: Mon Apr 15 00:13:49 2024 +0800

[fix] [broker] Prevent long deduplication cursor backlog so that topic 
loading wouldn't timeout (#22479)
---
 .../pulsar/broker/service/BrokerService.java   |   4 +-
 .../service/persistent/MessageDeduplication.java   |  18 ++-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../DeduplicationDisabledBrokerLevelTest.java  | 161 +
 4 files changed, 177 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b4d0f38b4a4..2687532693a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -631,8 +631,10 @@ public class BrokerService implements Closeable {
 }
 
 protected void startDeduplicationSnapshotMonitor() {
+// We do not know whether users will enable deduplication on namespace 
level/topic level or not, so keep this
+// scheduled task runs.
 int interval = 
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-if (interval > 0 && 
pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+if (interval > 0) {
 this.deduplicationSnapshotMonitor = 
OrderedScheduler.newSchedulerBuilder()
 .name("deduplication-snapshot-monitor")
 .numThreads(1)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 802dd917961..e508661364d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,9 +157,14 @@ public class MessageDeduplication {
 
 // Replay all the entries and apply all the sequence ids updates
 log.info("[{}] Replaying {} entries for deduplication", 
topic.getName(), managedCursor.getNumberOfEntries());
-CompletableFuture future = new CompletableFuture<>();
+CompletableFuture future = new CompletableFuture<>();
 replayCursor(future);
-return future;
+return future.thenAccept(lastPosition -> {
+if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+snapshotCounter = 0;
+takeSnapshot(lastPosition);
+}
+});
 }
 
 /**
@@ -168,11 +173,11 @@ public class MessageDeduplication {
  *
  * @param future future to trigger when the replay is complete
  */
-private void replayCursor(CompletableFuture future) {
+private void replayCursor(CompletableFuture future) {
 managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
 @Override
 public void readEntriesComplete(List entries, Object ctx) {
-
+Position lastPosition = null;
 for (Entry entry : entries) {
 ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
 MessageMetadata md = 
Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -182,7 +187,8 @@ public class MessageDeduplication {
 highestSequencedPushed.put(producerName, sequenceId);
 highestSequencedPersisted.put(producerName, sequenceId);
 producerRemoved(producerName);
-
+snapshotCounter++;
+lastPosition = entry.getPosition();
 entry.release();
 }
 
@@ -191,7 +197,7 @@ public class MessageDeduplication {
 pulsar.getExecutor().execute(() -> replayCursor(future));
 } else {
 // Done replaying
-future.complete(null);
+future.complete(lastPosition);
 }
 }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3c9ab04d79a..e4441969101 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
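The `MessageDeduplication` hunk above changes the replay future so that it carries the last replayed position, and takes a snapshot once the replay has counted at least `snapshotInterval` entries. A minimal sketch of that flow follows, under assumptions: `ReplaySnapshotSketch` is a hypothetical class, positions are plain `Long` values rather than Pulsar `Position` objects, and "taking a snapshot" is reduced to recording the position.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of "replay, then snapshot if enough entries were replayed".
class ReplaySnapshotSketch {
    private final int snapshotInterval;
    private int snapshotCounter = 0;
    Long lastSnapshotPosition = null; // stands in for takeSnapshot(position)

    ReplaySnapshotSketch(int snapshotInterval) {
        this.snapshotInterval = snapshotInterval;
    }

    // Replays all entries and completes with the last position, or null if there were none.
    private CompletableFuture<Long> replay(List<Long> positions) {
        Long last = null;
        for (Long p : positions) {
            snapshotCounter++; // one replayed entry
            last = p;
        }
        return CompletableFuture.completedFuture(last);
    }

    // After the replay, snapshot only if the counter reached the interval.
    CompletableFuture<Void> recover(List<Long> positions) {
        return replay(positions).thenAccept(lastPosition -> {
            if (lastPosition != null && snapshotCounter >= snapshotInterval) {
                snapshotCounter = 0;
                lastSnapshotPosition = lastPosition;
            }
        });
    }

    public static void main(String[] args) {
        ReplaySnapshotSketch s = new ReplaySnapshotSketch(3);
        s.recover(Arrays.asList(10L, 11L, 12L, 13L)).join();
        System.out.println("snapshot at " + s.lastSnapshotPosition);
    }
}
```

Snapshotting right after a long replay means the next load of the topic starts from the recorded position instead of replaying the same backlog again, which is the timeout the commit title refers to.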

Re: [PR] [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout [pulsar]

2024-04-14 Thread via GitHub


poorbarcode merged PR #22479:
URL: https://github.com/apache/pulsar/pull/22479


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [Bug] [branch-3.0] The test SimpleProducerConsumerTest.testAccessAvroSchemaMetadata always fails [pulsar]

2024-04-14 Thread via GitHub


Denovo1998 commented on issue #22470:
URL: https://github.com/apache/pulsar/issues/22470#issuecomment-2054102214

   @poorbarcode @Technoboy-  PTAL!



[PR] [fix][test] SchemaMap in AutoConsumeSchema has been reused [pulsar]

2024-04-14 Thread via GitHub


Denovo1998 opened a new pull request, #22500:
URL: https://github.com/apache/pulsar/pull/22500

   
   
   
   
   Fixes #22470
   
   
   
   ### Motivation
   
   
   Related to #17887. The first execution of the test method does not clean up 
the `pulsarClient`, so the AutoConsumeSchema is reused, and the schemaMap in 
that first AutoConsumeSchema has already been set to AvroSchema.
   
https://github.com/apache/pulsar/blob/982d94d15465ff5fcea192e9a8b6b25d2d1c55b6/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java#L149-L169
   
https://github.com/apache/pulsar/blob/982d94d15465ff5fcea192e9a8b6b25d2d1c55b6/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java#L202-L206
   
   The second execution of the test method then also uses the AvroSchema 
instead of the JsonSchema, because the schema is not fetched again.
   
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java#L130-L136
   
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java#L328-L338
   
   ### Modifications
   
   
   1. Close the `pulsarClient` after an exception is thrown.
   2. Check whether the `pulsarClient` is null before starting, and create 
a new one if it is.
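
The two modifications can be pictured with a small model. This is a sketch under assumptions, not the test code from the PR: `FakeClient` is a hypothetical stand-in for a client whose consumer schema caches the first schema it resolves, and `runOnce` simulates one execution of the test method, optionally with the exception path.

```java
// Hypothetical model of "close on exception, recreate when null"; not Pulsar code.
class ClientLifecycleSketch {
    // Caches the first schema it resolves, the way a reused schema instance would.
    static final class FakeClient implements AutoCloseable {
        private String cachedSchema;
        private boolean closed;

        String resolveSchema(String topicSchema) {
            if (cachedSchema == null) {
                cachedSchema = topicSchema; // fetched only once
            }
            return cachedSchema;
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    private FakeClient client;

    // Modification 2: create a new client if none is available.
    FakeClient clientForRun() {
        if (client == null) {
            client = new FakeClient();
        }
        return client;
    }

    // Simulates one test execution and returns the schema that was used.
    String runOnce(String topicSchema, boolean throwAfterResolve) {
        FakeClient c = clientForRun();
        String resolved = c.resolveSchema(topicSchema);
        try {
            if (throwAfterResolve) {
                throw new IllegalStateException("simulated test exception");
            }
        } catch (IllegalStateException e) {
            // Modification 1: close the client and drop the reference.
            c.close();
            client = null;
        }
        return resolved;
    }

    public static void main(String[] args) {
        ClientLifecycleSketch fixed = new ClientLifecycleSketch();
        System.out.println(fixed.runOnce("AVRO", true));
        System.out.println(fixed.runOnce("JSON", false));
    }
}
```

In this model, a second run after a cleaned-up first run resolves its own schema, while two runs that share one client keep the schema cached by the first.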
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   



(pulsar) 01/01: fix unit test.

2024-04-14 Thread baodi
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch remove_failed_sub
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c5ffbf9111e11ed2c98b6268b60bfb354117339b
Author: Baodi Shi 
AuthorDate: Sun Apr 14 20:52:01 2024 +0800

fix unit test.
---
 .../test/java/org/apache/pulsar/broker/transaction/TransactionTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index ddfa82f5288..e45924e8bb4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1517,7 +1517,7 @@ public class TransactionTest extends TransactionTestBase {
 fail("Expect failure by PendingAckHandle closed, but success");
 } catch (ExecutionException executionException){
 Throwable t = executionException.getCause();
-Assert.assertTrue(t instanceof 
BrokerServiceException.ServiceUnitNotReadyException);
+Assert.assertTrue(t instanceof BrokerServiceException);
 }
 }
 



(pulsar) branch remove_failed_sub deleted (was c5ffbf9111e)

2024-04-14 Thread baodi
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a change to branch remove_failed_sub
in repository https://gitbox.apache.org/repos/asf/pulsar.git


 was c5ffbf9111e fix unit test.

This change permanently discards the following revisions:

 discard c5ffbf9111e fix unit test.



(pulsar) branch remove_failed_sub created (now c5ffbf9111e)

2024-04-14 Thread baodi
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a change to branch remove_failed_sub
in repository https://gitbox.apache.org/repos/asf/pulsar.git


  at c5ffbf9111e fix unit test.

This branch includes the following new commits:

 new c5ffbf9111e fix unit test.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.