On Mon, Dec 5, 2022 at 9:21 AM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> On Mon, Dec 5, 2022 at 8:59 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > On Sun, Dec 4, 2022 at 5:14 PM houzj.f...@fujitsu.com
> > <houzj.f...@fujitsu.com> wrote:
> > >
> > > On Saturday, December 3, 2022 7:37 PM Amit Kapila
> > > <amit.kapil...@gmail.com> wrote:
> > > > Apart from the above, I have slightly adjusted the comments in the
> > > > attached. Do
> > > > let me know what you think of the attached.
> > >
> > > Thanks for updating the patch. It looks good to me.
> > >
> >
> > I feel the function name ReorderBufferLargestTopTXN() is slightly
> > misleading because it also checks some of the streaming properties
> > (like whether the TXN has partial changes and whether it contains any
> > streamable change). Shall we rename it to
> > ReorderBufferLargestStreamableTopTXN() or something like that?
>
> Yes that makes sense

I have done this change in the attached patch.

> > The other point to consider is whether we need to have a test case for
> > this patch. I think before this patch if the size of DDL changes in a
> > transaction exceeds logical_decoding_work_mem, the empty streams will
> > be output in the plugin but after this patch, there won't be any such
> > stream.

I tried this test, but I think generating 64k of data with just CID
messages will make the test case really big. I also tried using multiple
sessions, so that one session fills the reorder buffer but contains
partial changes and we therefore try to stream another transaction, but
it is not possible to consistently generate the partial change in an
automated test. I think we need something like this[1] so that we can
better control the streaming.

[1] https://www.postgresql.org/message-id/OSZPR01MB631042582805A8E8615BC413FD329%40OSZPR01MB6310.jpnprd01.prod.outlook.com

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

From d15b33d74952a78c4050ed7f1235bbb675643478 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akap...@postgresql.org>
Date: Sat, 3 Dec 2022 12:11:29 +0530
Subject: [PATCH v5] Avoid unnecessary streaming of transactions during logical
 replication.

After restart, we don't perform streaming of an in-progress transaction if
it was previously decoded and confirmed by client. To achieve that we were
comparing the END location of the WAL record being decoded with the WAL
location we have already decoded and confirmed by the client. While
decoding the commit record, to decide whether to process and send the
complete transaction, we compare its START location with the WAL location
we have already decoded and confirmed by the client. Now, if we need to
queue some change in the transaction while decoding the commit record
(e.g. snapshot), it is possible that we decide to stream the transaction
but later commit processing decides to skip it. In such a case, we would
needlessly send the changes and later when we decide to skip it, we will
send stream abort.

We also sometimes decide to stream the changes when we actually just need
to process them locally like a change for invalidations. This will lead us
to send empty streams. To avoid this, while queuing each change for
decoding, we remember whether the transaction has any change that actually
needs to be sent downstream and use that information later to decide
whether to stream the transaction or not.

Author: Dilip Kumar
Reviewed-by: Hou Zhijie, Ashutosh Bapat, Shi yu, Amit Kapila
Discussion: https://postgr.es/m/CAFiTN-tHK=7lzfrps8fbt2ksrojgqbzywcgxst2bm9-rjja...@mail.gmail.com
---
 src/backend/replication/logical/reorderbuffer.c | 51 ++++++++++++++++++-------
 src/include/replication/reorderbuffer.h         | 23 +++++++----
 2 files changed, 53 insertions(+), 21 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 31f7381..6e11056 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -695,9 +695,9 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
  * Record the partial change for the streaming of in-progress transactions. We
  * can stream only complete changes so if we have a partial change like toast
  * table insert or speculative insert then we mark such a 'txn' so that it
- * can't be streamed. We also ensure that if the changes in such a 'txn' are
- * above logical_decoding_work_mem threshold then we stream them as soon as we
- * have a complete change.
+ * can't be streamed. We also ensure that if the changes in such a 'txn' can
+ * be streamed and are above logical_decoding_work_mem threshold then we stream
+ * them as soon as we have a complete change.
  */
 static void
 ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
@@ -762,7 +762,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 */
 	if (ReorderBufferCanStartStreaming(rb) &&
 		!(rbtxn_has_partial_change(toptxn)) &&
-		rbtxn_is_serialized(txn))
+		rbtxn_is_serialized(txn) &&
+		rbtxn_has_streamable_change(toptxn))
 		ReorderBufferStreamTXN(rb, toptxn);
 }
 
@@ -793,6 +794,29 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 		return;
 	}
 
+	/*
+	 * The changes that are sent downstream are considered streamable. We
+	 * remember such transactions so that only those will later be considered
+	 * for streaming.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+		change->action == REORDER_BUFFER_CHANGE_DELETE ||
+		change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
+		change->action == REORDER_BUFFER_CHANGE_MESSAGE)
+	{
+		ReorderBufferTXN *toptxn;
+
+		/* get the top transaction */
+		if (txn->toptxn != NULL)
+			toptxn = txn->toptxn;
+		else
+			toptxn = txn;
+
+		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
+	}
+
 	change->lsn = lsn;
 	change->txn = txn;
 
@@ -2942,9 +2966,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
-	/* For streamed transactions notify the remote node about the abort. */
-	if (rbtxn_is_streamed(txn))
-		rb->stream_abort(rb, txn, lsn);
+	/* this transaction mustn't be streamed */
+	Assert(!rbtxn_is_streamed(txn));
 
 	/* cosmetic... */
 	txn->final_lsn = lsn;
@@ -3460,14 +3483,15 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
 }
 
 /*
- * Find the largest toplevel transaction to evict (by streaming).
+ * Find the largest streamable toplevel transaction to evict (by streaming).
  *
  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
  * should give us the same transaction (because we don't update memory account
  * for subtransaction with streaming, so it's always 0). But we can simply
  * iterate over the limited number of toplevel transactions that have a base
  * snapshot. There is no use of selecting a transaction that doesn't have base
- * snapshot because we don't decode such transactions.
+ * snapshot because we don't decode such transactions. Also we do not select
+ * the transaction which doesn't have any streamable change.
  *
  * Note that, we skip transactions that contains incomplete changes. There
  * is a scope of optimization here such that we can select the largest
@@ -3483,7 +3507,7 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
  * the subxact from where we streamed the last change.
  */
 static ReorderBufferTXN *
-ReorderBufferLargestTopTXN(ReorderBuffer *rb)
+ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
 {
 	dlist_iter	iter;
 	Size		largest_size = 0;
@@ -3502,7 +3526,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
 		Assert(txn->base_snapshot != NULL);
 
 		if ((largest == NULL || txn->total_size > largest_size) &&
-			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
+			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
+			rbtxn_has_streamable_change(txn))
 		{
 			largest = txn;
 			largest_size = txn->total_size;
@@ -3547,7 +3572,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 		 * memory by streaming, if possible. Otherwise, spill to disk.
 		 */
 		if (ReorderBufferCanStartStreaming(rb) &&
-			(txn = ReorderBufferLargestTopTXN(rb)) != NULL)
+			(txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
 		{
 			/* we know there has to be one, because the size is not zero */
 			Assert(txn && !txn->toptxn);
@@ -3919,7 +3944,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 	 * restarting.
 	 */
 	if (ReorderBufferCanStream(rb) &&
-		!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+		!SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
 		return true;
 
 	return false;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b23d8cc..c700b55 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -168,14 +168,15 @@ typedef struct ReorderBufferChange
 } ReorderBufferChange;
 
 /* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES 0x0001
-#define RBTXN_IS_SUBXACT 0x0002
-#define RBTXN_IS_SERIALIZED 0x0004
-#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
-#define RBTXN_IS_STREAMED 0x0010
-#define RBTXN_HAS_PARTIAL_CHANGE 0x0020
-#define RBTXN_PREPARE 0x0040
-#define RBTXN_SKIPPED_PREPARE 0x0080
+#define RBTXN_HAS_CATALOG_CHANGES 0x0001
+#define RBTXN_IS_SUBXACT 0x0002
+#define RBTXN_IS_SERIALIZED 0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
+#define RBTXN_IS_STREAMED 0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE 0x0020
+#define RBTXN_PREPARE 0x0040
+#define RBTXN_SKIPPED_PREPARE 0x0080
+#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -207,6 +208,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
 )
 
+/* Does this transaction contain streamable changes? */
+#define rbtxn_has_streamable_change(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *
-- 
1.8.3.1
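
For readers who want to see the idea outside the reorder buffer code, here
is a minimal standalone sketch of the "remember streamable changes, then
pick only streamable transactions" logic described in the commit message.
It is not PostgreSQL code: ToyTxn, ToyAction, toy_queue_change() and
toy_largest_streamable() are hypothetical names, the flag values merely
mirror the ones in the patch, and the change kinds are reduced to three.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the txn_flags bits used by the patch. */
#define TOY_HAS_PARTIAL_CHANGE		0x0020
#define TOY_HAS_STREAMABLE_CHANGE	0x0100

/* Hypothetical, reduced set of change kinds. */
typedef enum ToyAction
{
	TOY_INSERT,			/* sent downstream, so streamable */
	TOY_INVALIDATION,	/* processed locally only */
	TOY_COMMAND_ID		/* processed locally only */
} ToyAction;

/* Hypothetical toplevel transaction with only the fields the sketch needs. */
typedef struct ToyTxn
{
	const char *name;
	unsigned	flags;
	size_t		total_size;
} ToyTxn;

/* Queue a change: account its size and remember whether it is streamable. */
static void
toy_queue_change(ToyTxn *txn, ToyAction action, size_t size)
{
	assert(txn != NULL);

	if (action == TOY_INSERT)
		txn->flags |= TOY_HAS_STREAMABLE_CHANGE;

	txn->total_size += size;
}

/*
 * Return the largest transaction that has no partial change and has at
 * least one change to send downstream, or NULL if there is none.
 */
static ToyTxn *
toy_largest_streamable(ToyTxn *txns, size_t ntxns)
{
	ToyTxn	   *largest = NULL;

	for (size_t i = 0; i < ntxns; i++)
	{
		ToyTxn	   *txn = &txns[i];

		if ((largest == NULL || txn->total_size > largest->total_size) &&
			txn->total_size > 0 &&
			(txn->flags & TOY_HAS_PARTIAL_CHANGE) == 0 &&
			(txn->flags & TOY_HAS_STREAMABLE_CHANGE) != 0)
			largest = txn;
	}

	return largest;
}
```

With one transaction that queued only invalidation and command-id changes
and another that queued a single small insert, toy_largest_streamable()
returns the second one even though it is smaller. If no transaction has a
streamable change it returns NULL, which in this sketch corresponds to the
caller falling back to spilling to disk instead of sending an empty stream.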