Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-05-07 Thread via GitHub


dao-jun merged PR #22610:
URL: https://github.com/apache/pulsar/pull/22610


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-05-06 Thread via GitHub


dao-jun commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2095648740

   @lhotari PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-05-04 Thread via GitHub


dao-jun commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2094575724

   > Regarding [#22572 
(comment)](https://github.com/apache/pulsar/pull/22572#discussion_r1582980339), 
to further reduce code duplication, start 
`internalAsyncReverseFindPositionOneByOne` method with this part
   > 
   > ```
   > if (!ledger.isValidPosition(previousPosition)) {
   > future.complete(previousPosition);
   > return;
   > }
   > ```
   > 
   > and then this logic can be removed from `asyncGetLastValidPosition` and 
the `readEntryComplete` callback.
   
   addressed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-05-04 Thread via GitHub


dao-jun commented on code in PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#discussion_r1590204833


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##
@@ -112,17 +109,7 @@
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.*;

Review Comment:
   addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-05-03 Thread via GitHub


lhotari commented on code in PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#discussion_r1588783677


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##
@@ -112,17 +109,7 @@
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.*;

Review Comment:
   The code style doesn't seem to match [the Pulsar code 
style](https://pulsar.apache.org/contribute/setup-ide/#configure-code-style).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-05-02 Thread via GitHub


codecov-commenter commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2091952874

   ## 
[Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22610?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 Report
   Attention: Patch coverage is `90.0%` with `1 lines` in your changes are 
missing coverage. Please review.
   > Project coverage is 72.71%. Comparing base 
[(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 to head 
[(`59d227f`)](https://app.codecov.io/gh/apache/pulsar/pull/22610?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   > Report is 218 commits behind head on master.
   
   Additional details and impacted files
   
   
   [![Impacted file tree 
graph](https://app.codecov.io/gh/apache/pulsar/pull/22610/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22610?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   ```diff
   @@ Coverage Diff  @@
   ## master   #22610  +/-   ##
   
   - Coverage 73.57%   72.71%   -0.86% 
   + Complexity3262432532  -92 
   
 Files  1877 1887  +10 
 Lines139502   141004+1502 
 Branches  1529915477 +178 
   
   - Hits 102638   102534 -104 
   - Misses2890830603+1695 
   + Partials   7956 7867  -89 
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/pulsar/pull/22610/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[inttests](https://app.codecov.io/gh/apache/pulsar/pull/22610/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `27.40% <40.00%> (+2.82%)` | :arrow_up: |
   | 
[systests](https://app.codecov.io/gh/apache/pulsar/pull/22610/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `24.84% <40.00%> (+0.51%)` | :arrow_up: |
   | 
[unittests](https://app.codecov.io/gh/apache/pulsar/pull/22610/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `71.46% <90.00%> (-1.39%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   | 
[Files](https://app.codecov.io/gh/apache/pulsar/pull/22610?dropdown=coverage=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[...ookkeeper/mledger/util/ManagedLedgerImplUtils.java](https://app.codecov.io/gh/apache/pulsar/pull/22610?src=pr=tree=managed-ledger%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbookkeeper%2Fmledger%2Futil%2FManagedLedgerImplUtils.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci91dGlsL01hbmFnZWRMZWRnZXJJbXBsVXRpbHMuamF2YQ==)
 | `68.18% <100.00%> (ø)` | |
   | 
[...sar/broker/service/persistent/PersistentTopic.java](https://app.codecov.io/gh/apache/pulsar/pull/22610?src=pr=tree=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fpersistent%2FPersistentTopic.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFRvcGljLmphdmE=)
 | `79.06% <88.88%> (+0.60%)` | :arrow_up: |
   
   ... and [342 files with indirect coverage 
changes](https://app.codecov.io/gh/apache/pulsar/pull/22610/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-29 Thread via GitHub


shibd commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2082536013

   > @codelipenghui @shibd @coderzc It looks like Shared/key_Shared Consumer 
also could be stuck from delayed delivery messages, WDYT? Do we need to handle 
it?
   
   I prefer case by case fix it and adding enough unit tests to cover every 
case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-29 Thread via GitHub


dao-jun commented on code in PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#discussion_r1582756633


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3647,7 +3647,14 @@ public CompletableFuture 
getLastDispatchablePosition() {
 return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
 MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
 // If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
-return !Markers.isServerOnlyMarker(md);
+if (Markers.isServerOnlyMarker(md)) {
+return false;
+} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
+// Filter-out transaction aborted messages.
+TxnID txnID = new TxnID(md.getTxnidMostBits(), 
md.getTxnidLeastBits());
+return !isTxnAborted(txnID, (PositionImpl) 
entry.getPosition());

Review Comment:
   @thetumbled @liangyepianzhou It's a further improvement, I'm going to read 
more entries from ledger(not one by one) to optimize it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-29 Thread via GitHub


liangyepianzhou commented on code in PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#discussion_r1582747773


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3647,7 +3647,14 @@ public CompletableFuture 
getLastDispatchablePosition() {
 return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
 MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
 // If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
-return !Markers.isServerOnlyMarker(md);
+if (Markers.isServerOnlyMarker(md)) {
+return false;
+} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
+// Filter-out transaction aborted messages.
+TxnID txnID = new TxnID(md.getTxnidMostBits(), 
md.getTxnidLeastBits());
+return !isTxnAborted(txnID, (PositionImpl) 
entry.getPosition());

Review Comment:
   >@coderzc Yes, But we are unable to do more optimizations, TransactionBuffer 
didn't record aborted messages' position.
   
   But we can try to record somethings(maybe a map for ongoing transaction id 
-> max position) in the TransactionBuffer to resolve this issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-29 Thread via GitHub


liangyepianzhou commented on code in PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#discussion_r1582747773


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3647,7 +3647,14 @@ public CompletableFuture 
getLastDispatchablePosition() {
 return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
 MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
 // If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
-return !Markers.isServerOnlyMarker(md);
+if (Markers.isServerOnlyMarker(md)) {
+return false;
+} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
+// Filter-out transaction aborted messages.
+TxnID txnID = new TxnID(md.getTxnidMostBits(), 
md.getTxnidLeastBits());
+return !isTxnAborted(txnID, (PositionImpl) 
entry.getPosition());

Review Comment:
   >@coderzc Yes, But we are unable to do more optimizations, TransactionBuffer 
didn't record aborted messages' position.
   
   But we can try to record somethings in the TransactionBuffer to resolve this 
issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-29 Thread via GitHub


thetumbled commented on code in PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#discussion_r1582730866


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3647,7 +3647,14 @@ public CompletableFuture 
getLastDispatchablePosition() {
 return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
 MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
 // If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
-return !Markers.isServerOnlyMarker(md);
+if (Markers.isServerOnlyMarker(md)) {
+return false;
+} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
+// Filter-out transaction aborted messages.
+TxnID txnID = new TxnID(md.getTxnidMostBits(), 
md.getTxnidLeastBits());
+return !isTxnAborted(txnID, (PositionImpl) 
entry.getPosition());

Review Comment:
   Same in https://github.com/apache/pulsar/pull/21951.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-29 Thread via GitHub


dao-jun commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2081965882

   @codelipenghui @shibd @coderzc It looks like Shared/key_Shared Consumer also 
could be stuck from delayed delivery messages, WDYT? Do we need to handle it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-28 Thread via GitHub


dao-jun commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2081887272

   @coderzc PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-28 Thread via GitHub


dao-jun commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2081870869

   @coderzc Oh, I understand you, if the maxReadPosition is also an aborted 
message, the reader can be stuck too. It makes sense, I'll improve test to 
cover the case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-28 Thread via GitHub


coderzc commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2081864440

   > > It seems we don't need `maxReadPosition.compareTo((PositionImpl) 
getLastPosition()) != 0` judgment condition, since `maxReadPosition` also may 
not be a valid position for transaction topic event if `maxReadPosition < 
lastPosition`.
   > 
   > I don't understand, why?
   
   @dao-jun `maxReadPosition` is just the position in front of the ongoing 
transaction. It can only ensure that this is not a pending transaction 
position. but it's not necessarily a normal message.
   
   See: 
https://github.com/apache/pulsar/blob/264722f1da9ab806c9a79196c091bfe4d03b3090/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java#L447-L459


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-28 Thread via GitHub


dao-jun commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2081860890

   > It seems we don't need `maxReadPosition.compareTo((PositionImpl) 
getLastPosition()) != 0` judgment condition, since `maxReadPosition` also may 
not be a valid position for transaction topic event if `maxReadPosition < 
lastPosition`.
   
   I don't understand, why?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-28 Thread via GitHub


dao-jun commented on code in PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#discussion_r1582520899


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3647,7 +3647,14 @@ public CompletableFuture 
getLastDispatchablePosition() {
 return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
 MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
 // If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
-return !Markers.isServerOnlyMarker(md);
+if (Markers.isServerOnlyMarker(md)) {
+return false;
+} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
+// Filter-out transaction aborted messages.
+TxnID txnID = new TxnID(md.getTxnidMostBits(), 
md.getTxnidLeastBits());
+return !isTxnAborted(txnID, (PositionImpl) 
entry.getPosition());

Review Comment:
   @coderzc Yes, But we are unable to do more optimizations, TransactionBuffer 
didn't record aborted messages' position.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-28 Thread via GitHub


coderzc commented on code in PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#discussion_r1582519352


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3647,7 +3647,14 @@ public CompletableFuture 
getLastDispatchablePosition() {
 return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
 MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
 // If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
-return !Markers.isServerOnlyMarker(md);
+if (Markers.isServerOnlyMarker(md)) {
+return false;
+} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
+// Filter-out transaction aborted messages.
+TxnID txnID = new TxnID(md.getTxnidMostBits(), 
md.getTxnidLeastBits());
+return !isTxnAborted(txnID, (PositionImpl) 
entry.getPosition());

Review Comment:
   This will search forward one by one until a valid position is found, this 
performance is very poor if the transaction has a lot of abort messages. Need 
more reviewer advice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-28 Thread via GitHub


coderzc commented on code in PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#discussion_r1582519352


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3647,7 +3647,14 @@ public CompletableFuture 
getLastDispatchablePosition() {
 return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
 MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
 // If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
-return !Markers.isServerOnlyMarker(md);
+if (Markers.isServerOnlyMarker(md)) {
+return false;
+} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
+// Filter-out transaction aborted messages.
+TxnID txnID = new TxnID(md.getTxnidMostBits(), 
md.getTxnidLeastBits());
+return !isTxnAborted(txnID, (PositionImpl) 
entry.getPosition());

Review Comment:
   This will search forward one by one until a valid position is found, this 
performance very low if the transaction has a lot of abort messages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-28 Thread via GitHub


dao-jun commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2081756364

   @shibd Hi baodi, I've addressed your comment, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-28 Thread via GitHub


shibd commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2081749388

   hi, @tjiuming Thanks for your PR.
   
   I thinks we can change this test to cover all related transactions get 
lastmessage id case.
   
   
https://github.com/apache/pulsar/blob/a761b97b733142b1ade525e1d1c06785e98face1/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java#L264
   
   Change to:
   
   ```java
   @Test
   public void testGetLastMessageIdsWithOngoingTransactions() throws 
Exception {
   // 1. Prepare environment
   String topic = "persistent://" + NAMESPACE1 + 
"/testGetLastMessageIdsWithOngoingTransactions";
   String subName = "my-subscription";
   @Cleanup
   Producer producer = pulsarClient.newProducer()
   .topic(topic)
   .create();
   Consumer consumer = pulsarClient.newConsumer()
   .topic(topic)
   .subscriptionName(subName)
   .subscribe();
   
   // 2. Test last max read position can be required correctly.
   // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
   MessageIdImpl expectedLastMessageID = null;
   for (int i = 0; i < 3; i++) {
   expectedLastMessageID = (MessageIdImpl) 
producer.newMessage().send();
   }
   assertMessageId(consumer, expectedLastMessageID);
   // 2.2 Case2: send 2 ongoing transactional messages and 2 original 
messages.
   // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5.
   Transaction txn1 = pulsarClient.newTransaction()
   .withTransactionTimeout(5, TimeUnit.HOURS)
   .build()
   .get();
   Transaction txn2 = pulsarClient.newTransaction()
   .withTransactionTimeout(5, TimeUnit.HOURS)
   .build()
   .get();
   
   // |1:0|1:1|1:2|txn1:1:3|
   producer.newMessage(txn1).send();
   
   // |1:0|1:1|1:2|txn1:1:3|1:4|
   MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) 
producer.newMessage().send();
   
   // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|
   producer.newMessage(txn2).send();
   
   // 2.2.1 Last message ID will not change when txn1 and txn2 do not 
end.
   assertMessageId(consumer, expectedLastMessageID);
   
   // 2.2.2 Last message ID will update to 1:4 when txn1 committed.
   // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|
   txn1.commit().get(5, TimeUnit.SECONDS);
   assertMessageId(consumer, expectedLastMessageID1);
   
   // 2.2.3 Last message ID will still to 1:4 when txn2 aborted.
   // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7|
   txn2.abort().get(5, TimeUnit.SECONDS);
   assertMessageId(consumer, expectedLastMessageID1);
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [fix] Fix Reader can be stuck from transaction aborted messages. [pulsar]

2024-04-28 Thread via GitHub


dao-jun opened a new pull request, #22610:
URL: https://github.com/apache/pulsar/pull/22610

   ### Motivation
   
   Fix Reader can be stuck from transaction aborted messages.
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org