On Mon, Nov 28, 2022 at 3:19 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> On Mon, Nov 28, 2022 at 1:46 PM shiy.f...@fujitsu.com
> <shiy.f...@fujitsu.com> wrote:
> >
> > Thanks for your patch.
> >
> > I saw that the patch added a check when selecting largest transaction, but 
> > in
> > addition to ReorderBufferCheckMemoryLimit(), the transaction can also be
> > streamed in ReorderBufferProcessPartialChange(). Should we add the check in
> > this function, too?
> >
> > diff --git a/src/backend/replication/logical/reorderbuffer.c 
> > b/src/backend/replication/logical/reorderbuffer.c
> > index 9a58c4bfb9..108737b02f 100644
> > --- a/src/backend/replication/logical/reorderbuffer.c
> > +++ b/src/backend/replication/logical/reorderbuffer.c
> > @@ -768,7 +768,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, 
> > ReorderBufferTXN *txn,
> >          */
> >         if (ReorderBufferCanStartStreaming(rb) &&
> >                 !(rbtxn_has_partial_change(toptxn)) &&
> > -               rbtxn_is_serialized(txn))
> > +               rbtxn_is_serialized(txn) &&
> > +               rbtxn_has_streamable_change(txn))
> >                 ReorderBufferStreamTXN(rb, toptxn);
> >  }
>
> You are right we need this in ReorderBufferProcessPartialChange() as
> well.  I will fix this in the next version.

Fixed this in the attached patch.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From d00e15b37dbfa0152d32800e3d4dd0982962b6f8 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.ku...@enterprisedb.com>
Date: Fri, 25 Nov 2022 13:11:44 +0530
Subject: [PATCH v3] Fix thinko in when to stream a transaction

During DecodeCommit(), we use ReadRecPtr to decide whether to skip a
transaction, whereas ReorderBufferCanStartStreaming() uses EndRecPtr to
decide whether to stream it.  Generally this does not create a problem, but
if the commit record itself adds some changes to the transaction (e.g. a
snapshot) and start_decoding_at lies between ReadRecPtr and EndRecPtr, then
streaming will decide to stream the transaction whereas DecodeCommit() will
decide to skip it.  To handle this case, ReorderBufferForget() calls
stream_abort() in order to abort any streamed changes.  Ideally, if we are
planning to skip a transaction we should never stream it, and hence there
should be no need to stream-abort such a transaction when it is skipped.

Along with that, we also skip the transaction if its dbid is not the same as
the slot's dbid or if it is filtered by origin id.  So in corner cases it is
possible that we stream the transaction but it is later skipped in
DecodeCommit().

To fix that, do not select a transaction for streaming unless it has at
least one streamable change; if it does have a streamable change, we can
safely select it for streaming as it will not be skipped by DecodeCommit().
---
 src/backend/replication/logical/reorderbuffer.c | 39 ++++++++++++++++++++-----
 src/include/replication/reorderbuffer.h         | 23 ++++++++++-----
 2 files changed, 47 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 31f7381..a581ab5 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -753,7 +753,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 	/*
 	 * Stream the transaction if it is serialized before and the changes are
-	 * now complete in the top-level transaction.
+	 * now complete in the top-level transaction and it has a streamable change.
 	 *
 	 * The reason for doing the streaming of such a transaction as soon as we
 	 * get the complete change for it is that previously it would have reached
@@ -762,7 +762,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 */
 	if (ReorderBufferCanStartStreaming(rb) &&
 		!(rbtxn_has_partial_change(toptxn)) &&
-		rbtxn_is_serialized(txn))
+		rbtxn_is_serialized(txn) &&
+		rbtxn_has_streamable_change(txn))
 		ReorderBufferStreamTXN(rb, toptxn);
 }
 
@@ -793,6 +794,30 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 		return;
 	}
 
+	/*
+	 * If a streamable change is getting queued, then get the top-level
+	 * transaction and mark it as having a streamable change.  This is
+	 * required for streaming in-progress transactions: an in-progress
+	 * transaction will not be selected for streaming unless it has at least
+	 * one streamable change.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+		change->action == REORDER_BUFFER_CHANGE_DELETE ||
+		change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_TRUNCATE)
+	{
+		ReorderBufferTXN *toptxn;
+
+		/* get the top transaction */
+		if (txn->toptxn != NULL)
+			toptxn = txn->toptxn;
+		else
+			toptxn = txn;
+
+		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
+	}
+
 	change->lsn = lsn;
 	change->txn = txn;
 
@@ -2942,9 +2967,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
-	/* For streamed transactions notify the remote node about the abort. */
-	if (rbtxn_is_streamed(txn))
-		rb->stream_abort(rb, txn, lsn);
+	/* the transaction which is being skipped shouldn't have been streamed */
+	Assert(!rbtxn_is_streamed(txn));
 
 	/* cosmetic... */
 	txn->final_lsn = lsn;
@@ -3502,7 +3526,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
 		Assert(txn->base_snapshot != NULL);
 
 		if ((largest == NULL || txn->total_size > largest_size) &&
-			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
+			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
+			rbtxn_has_streamable_change(txn))
 		{
 			largest = txn;
 			largest_size = txn->total_size;
@@ -3919,7 +3944,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 	 * restarting.
 	 */
 	if (ReorderBufferCanStream(rb) &&
-		!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+		!SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
 		return true;
 
 	return false;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b23d8cc..9766c9f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -168,14 +168,15 @@ typedef struct ReorderBufferChange
 } ReorderBufferChange;
 
 /* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES 0x0001
-#define RBTXN_IS_SUBXACT          0x0002
-#define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
-#define RBTXN_IS_STREAMED         0x0010
-#define RBTXN_HAS_PARTIAL_CHANGE  0x0020
-#define RBTXN_PREPARE             0x0040
-#define RBTXN_SKIPPED_PREPARE	  0x0080
+#define RBTXN_HAS_CATALOG_CHANGES 	0x0001
+#define RBTXN_IS_SUBXACT          	0x0002
+#define RBTXN_IS_SERIALIZED       	0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR 	0x0008
+#define RBTXN_IS_STREAMED         	0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE  	0x0020
+#define RBTXN_PREPARE             	0x0040
+#define RBTXN_SKIPPED_PREPARE	  	0x0080
+#define RBTXN_HAS_STREAMABLE_CHANGE	0x0100
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -207,6 +208,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
 )
 
+/* Does this transaction contain a streamable change? */
+#define rbtxn_has_streamable_change(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *
-- 
1.8.3.1

Reply via email to