On Mon, Nov 28, 2022 at 3:19 PM Dilip Kumar <dilipbal...@gmail.com> wrote: > > On Mon, Nov 28, 2022 at 1:46 PM shiy.f...@fujitsu.com > <shiy.f...@fujitsu.com> wrote: > > > > Thanks for your patch. > > > > I saw that the patch added a check when selecting largest transaction, but > > in > > addition to ReorderBufferCheckMemoryLimit(), the transaction can also be > > streamed in ReorderBufferProcessPartialChange(). Should we add the check in > > this function, too? > > > > diff --git a/src/backend/replication/logical/reorderbuffer.c > > b/src/backend/replication/logical/reorderbuffer.c > > index 9a58c4bfb9..108737b02f 100644 > > --- a/src/backend/replication/logical/reorderbuffer.c > > +++ b/src/backend/replication/logical/reorderbuffer.c > > @@ -768,7 +768,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, > > ReorderBufferTXN *txn, > > */ > > if (ReorderBufferCanStartStreaming(rb) && > > !(rbtxn_has_partial_change(toptxn)) && > > - rbtxn_is_serialized(txn)) > > + rbtxn_is_serialized(txn) && > > + rbtxn_has_streamable_change(txn)) > > ReorderBufferStreamTXN(rb, toptxn); > > } > > You are right we need this in ReorderBufferProcessPartialChange() as > well. I will fix this in the next version.
Fixed this in the attached patch. -- Regards, Dilip Kumar EnterpriseDB: http://www.enterprisedb.com
From d00e15b37dbfa0152d32800e3d4dd0982962b6f8 Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilip.ku...@enterprisedb.com> Date: Fri, 25 Nov 2022 13:11:44 +0530 Subject: [PATCH v3] Fix thinko in when to stream a transaction During DecodeCommit(), we use ReadRecPtr to check whether to skip a transaction, whereas in ReorderBufferCanStartStreaming() we use EndRecPtr to check whether to stream it. Generally this does not create a problem, but if the commit record itself adds some changes to the transaction (e.g. a snapshot) and start_decoding_at lies between ReadRecPtr and EndRecPtr, then streaming will decide to stream the transaction whereas DecodeCommit() will decide to skip it. To handle this case, ReorderBufferForget() calls stream_abort() in order to abort any streamed changes. Ideally, if we are planning to skip the transaction we should never stream it, and hence there is no need to stream-abort such a transaction when it is skipped. Along with that, we also skip the transaction if the transaction's dbid is not the same as the slot's dbid or if it is filtered by origin id. So in corner cases it is possible that we might stream the transaction but later it will be skipped in DecodeCommit(). To fix that, do not select any transaction for streaming unless it has a streamable change; if it has a streamable change, then we can safely select it for streaming, as it will not be skipped by DecodeCommit(). 
--- src/backend/replication/logical/reorderbuffer.c | 39 ++++++++++++++++++++----- src/include/replication/reorderbuffer.h | 23 ++++++++++----- 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 31f7381..a581ab5 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -753,7 +753,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* * Stream the transaction if it is serialized before and the changes are - * now complete in the top-level transaction. + * now complete in the top-level transaction and it has a streamable change * * The reason for doing the streaming of such a transaction as soon as we * get the complete change for it is that previously it would have reached @@ -762,7 +762,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, */ if (ReorderBufferCanStartStreaming(rb) && !(rbtxn_has_partial_change(toptxn)) && - rbtxn_is_serialized(txn)) + rbtxn_is_serialized(txn) && + rbtxn_has_streamable_change(txn)) ReorderBufferStreamTXN(rb, toptxn); } @@ -793,6 +794,30 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, return; } + /* + * If there are any streamable changes getting queued then get the top + * transaction and mark it has streamable change. This is required for + * streaming in-progress transactions, the in-progress transaction will + * not be selected for streaming unless it has at least one streamable + * change. 
+ */ + if (change->action == REORDER_BUFFER_CHANGE_INSERT || + change->action == REORDER_BUFFER_CHANGE_UPDATE || + change->action == REORDER_BUFFER_CHANGE_DELETE || + change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT || + change->action == REORDER_BUFFER_CHANGE_TRUNCATE) + { + ReorderBufferTXN *toptxn; + + /* get the top transaction */ + if (txn->toptxn != NULL) + toptxn = txn->toptxn; + else + toptxn = txn; + + toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE; + } + change->lsn = lsn; change->txn = txn; @@ -2942,9 +2967,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) if (txn == NULL) return; - /* For streamed transactions notify the remote node about the abort. */ - if (rbtxn_is_streamed(txn)) - rb->stream_abort(rb, txn, lsn); + /* the transaction which is being skipped shouldn't have been streamed */ + Assert(!rbtxn_is_streamed(txn)); /* cosmetic... */ txn->final_lsn = lsn; @@ -3502,7 +3526,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb) Assert(txn->base_snapshot != NULL); if ((largest == NULL || txn->total_size > largest_size) && - (txn->total_size > 0) && !(rbtxn_has_partial_change(txn))) + (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) && + rbtxn_has_streamable_change(txn)) { largest = txn; largest_size = txn->total_size; @@ -3919,7 +3944,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb) * restarting. 
*/ if (ReorderBufferCanStream(rb) && - !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr)) + !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr)) return true; return false; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index b23d8cc..9766c9f 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -168,14 +168,15 @@ typedef struct ReorderBufferChange } ReorderBufferChange; /* ReorderBufferTXN txn_flags */ -#define RBTXN_HAS_CATALOG_CHANGES 0x0001 -#define RBTXN_IS_SUBXACT 0x0002 -#define RBTXN_IS_SERIALIZED 0x0004 -#define RBTXN_IS_SERIALIZED_CLEAR 0x0008 -#define RBTXN_IS_STREAMED 0x0010 -#define RBTXN_HAS_PARTIAL_CHANGE 0x0020 -#define RBTXN_PREPARE 0x0040 -#define RBTXN_SKIPPED_PREPARE 0x0080 +#define RBTXN_HAS_CATALOG_CHANGES 0x0001 +#define RBTXN_IS_SUBXACT 0x0002 +#define RBTXN_IS_SERIALIZED 0x0004 +#define RBTXN_IS_SERIALIZED_CLEAR 0x0008 +#define RBTXN_IS_STREAMED 0x0010 +#define RBTXN_HAS_PARTIAL_CHANGE 0x0020 +#define RBTXN_PREPARE 0x0040 +#define RBTXN_SKIPPED_PREPARE 0x0080 +#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -207,6 +208,12 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \ ) +/* Has this transaction contains streamable change? */ +#define rbtxn_has_streamable_change(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \ +) + /* * Has this transaction been streamed to downstream? * -- 1.8.3.1