On Tue, Oct 10, 2023 at 6:17 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
>   Oid txn_dbid, RepOriginId origin_id)
> {
> - return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
> - (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
> - ctx->fast_forward || FilterByOrigin(ctx, origin_id));
> + bool need_skip;
> +
> + need_skip = (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
> + (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
> + ctx->decoding_mode != DECODING_MODE_NORMAL ||
> + FilterByOrigin(ctx, origin_id));
> +
> + /* Set a flag if we are in the slient mode */
> + if (ctx->decoding_mode == DECODING_MODE_SILENT)
> + ctx->output_skipped = true;
> +
> + return need_skip;
>
> I think you need to set the new flag only when we are not skipping the
> transaction or in other words when we decide to process the
> transaction. Otherwise, how will you distinguish the case where the
> xact is already decoded and sent to client?
>

In the attached patch atop your v47*, I have changed it to show you
what I have in mind.

A few more comments:
=================
1.
+
+ /*
+ * Did the logical decoding context skip outputting any changes?
+ *
+ * This flag is used only when the context is in the silent mode.
+ */
+ bool output_skipped;
 } LogicalDecodingContext;

This doesn't seem to convey the meaning to the caller. How about
processing_required? BTW, I have made this change as well in the
patch.

2.
@@ -295,7 +295,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  */
  if (TransactionIdIsValid(xid))
  {
- if (!ctx->fast_forward)
+ if (ctx->decoding_mode != DECODING_MODE_FAST_FORWARD)
  ReorderBufferAddInvalidations(reorder, xid,
    buf->origptr,
    invals->nmsgs,
@@ -303,7 +303,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
  }
- else if ((!ctx->fast_forward))
+ else if (ctx->decoding_mode != DECODING_MODE_FAST_FORWARD)
  ReorderBufferImmediateInvalidation(ctx->reorder,
    invals->nmsgs,
    invals->msgs);

We don't need to execute the invalidations even in silent mode. Looking
at this and other changes in the patch related to silent mode, I wonder
whether we really need to introduce 'silent_mode'. Can't we simply set
processing_required when 'fast_forward' mode is true and then let the
caller decide whether it needs to further process the WAL?

--
With Regards,
Amit Kapila.

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 6de54153f7..f3c561d8ed 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -631,7 +631,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (ctx->decoding_mode == DECODING_MODE_SILENT &&
 		!message->transactional)
 	{
-		ctx->output_skipped = true;
+		ctx->processing_required = true;
 		return;
 	}
 
@@ -1294,8 +1294,6 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
  * 2) The transaction happened in another database.
  * 3) The output plugin is not interested in the origin.
  * 4) We are not in the normal decoding mode.
- *
- * Also, set output_skipped flag if we are in the slient mode.
  */
 static bool
 DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
@@ -1308,9 +1306,15 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 				 ctx->decoding_mode != DECODING_MODE_NORMAL ||
 				 FilterByOrigin(ctx, origin_id));
 
-	/* Set a flag if we are in the slient mode */
+	if (need_skip)
+		return true;
+
+	/*
+	 * We don't need to process the transaction in silent mode. Indicate the
+	 * same via LogicalDecodingContext, so that the caller can skip processing.
+	 */
 	if (ctx->decoding_mode == DECODING_MODE_SILENT)
-		ctx->output_skipped = true;
+		ctx->processing_required = true;
 
-	return need_skip;
+	return true;
 }
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 0f4b1c6323..e47f2ebd7c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -2030,7 +2030,7 @@ DecodingContextHasdecodedItems(LogicalDecodingContext *ctx,
 	InvalidateSystemCaches();
 
 	/* Loop until the end of WAL or some changes are processed */
-	while (!ctx->output_skipped && ctx->reader->EndRecPtr < end_of_wal)
+	while (!ctx->processing_required && ctx->reader->EndRecPtr < end_of_wal)
 	{
 		XLogRecord *record;
 		char	   *errm = NULL;
@@ -2046,5 +2046,5 @@ DecodingContextHasdecodedItems(LogicalDecodingContext *ctx,
 		CHECK_FOR_INTERRUPTS();
 	}
 
-	return ctx->output_skipped;
+	return ctx->processing_required;
 }
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index d0f9dda6c5..94cc631a5b 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -128,12 +128,8 @@ typedef struct LogicalDecodingContext
 	/* Are we processing the end LSN of a transaction? */
 	bool		end_xact;
 
-	/*
-	 * Did the logical decoding context skip outputting any changes?
-	 *
-	 * This flag is used only when the context is in the silent mode.
-	 */
-	bool		output_skipped;
+	/* Do we need to process any change in silent decoding mode? */
+	bool		processing_required;
 } LogicalDecodingContext;
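
For illustration only, the alternative raised in the mail (keep a single fast_forward mode, set processing_required only for a transaction that would otherwise have been decoded, and let the caller decide what to do next) might look roughly like the standalone sketch below. This is not PostgreSQL code: SketchDecodingContext, SketchTxn, sketch_txn_need_skip() and sketch_has_pending_changes() are hypothetical stand-ins, and the single "filtered" field is an assumed simplification of the snapshot, database and origin checks done in DecodeTXNNeedSkip().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for LogicalDecodingContext. */
typedef struct SketchDecodingContext
{
	bool		fast_forward;			/* decode without producing output */
	bool		processing_required;	/* a decodable transaction was seen */
} SketchDecodingContext;

/* Hypothetical stand-in for one transaction found while reading WAL. */
typedef struct SketchTxn
{
	/* Assumed to summarize the snapshot, database and origin checks. */
	bool		filtered;
} SketchTxn;

/*
 * Return true if the transaction must not be decoded.  The flag is set
 * only for a transaction that passed every filter, so a transaction that
 * would have been skipped anyway never marks the context.
 */
static bool
sketch_txn_need_skip(SketchDecodingContext *ctx, const SketchTxn *txn)
{
	assert(ctx != NULL && txn != NULL);

	if (txn->filtered)
		return true;

	if (ctx->fast_forward)
	{
		/* Would have been decoded in normal mode: tell the caller. */
		ctx->processing_required = true;
		return true;
	}

	return false;
}

/* Scan transactions until the end or until one requires processing. */
static bool
sketch_has_pending_changes(SketchDecodingContext *ctx,
						   const SketchTxn *txns, size_t ntxns)
{
	for (size_t i = 0; i < ntxns && !ctx->processing_required; i++)
		(void) sketch_txn_need_skip(ctx, &txns[i]);

	return ctx->processing_required;
}
```

With this shape, a caller in fast-forward mode only has to inspect processing_required after the scan; no separate silent mode is involved. Whether that is sufficient for the real patch is exactly the open question in the mail.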