On Tue, Oct 10, 2023 at 6:17 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
>  DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
>     Oid txn_dbid, RepOriginId origin_id)
>  {
> - return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
> - (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
> - ctx->fast_forward || FilterByOrigin(ctx, origin_id));
> + bool need_skip;
> +
> + need_skip = (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
> + (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
> + ctx->decoding_mode != DECODING_MODE_NORMAL ||
> + FilterByOrigin(ctx, origin_id));
> +
> + /* Set a flag if we are in the slient mode */
> + if (ctx->decoding_mode == DECODING_MODE_SILENT)
> + ctx->output_skipped = true;
> +
> + return need_skip;
>
> I think you need to set the new flag only when we are not skipping the
> transaction or in other words when we decide to process the
> transaction. Otherwise, how will you distinguish the case where the
> xact is already decoded and sent to client?
>

In the attached patch atop your v47*, I have changed it to show you
what I have in mind.

A few more comments:
=================
1.
+
+ /*
+ * Did the logical decoding context skip outputting any changes?
+ *
+ * This flag is used only when the context is in the silent mode.
+ */
+ bool output_skipped;
 } LogicalDecodingContext;

This doesn't seem to convey the meaning to the caller. How about
processing_required? BTW, I have made this change as well in the
patch.

2.
@@ -295,7 +295,7 @@ xact_decode(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf)
*/
if (TransactionIdIsValid(xid))
{
- if (!ctx->fast_forward)
+ if (ctx->decoding_mode != DECODING_MODE_FAST_FORWARD)
ReorderBufferAddInvalidations(reorder, xid,
  buf->origptr,
  invals->nmsgs,
@@ -303,7 +303,7 @@ xact_decode(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf)
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
  buf->origptr);
}
- else if ((!ctx->fast_forward))
+ else if (ctx->decoding_mode != DECODING_MODE_FAST_FORWARD)
ReorderBufferImmediateInvalidation(ctx->reorder,
   invals->nmsgs,
   invals->msgs);

We don't to execute the invalidations even in silent mode. Looking at
this and other changes in the patch related to silent mode, I wonder
whether we really need to introduce 'silent_mode'. Can't we simply set
processing_required when 'fast_forward' mode is true and then let the
caller decide whether it needs to further process the WAL?

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index 6de54153f7..f3c561d8ed 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -631,7 +631,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
        if (ctx->decoding_mode == DECODING_MODE_SILENT &&
                !message->transactional)
        {
-               ctx->output_skipped = true;
+               ctx->processing_required = true;
                return;
        }
 
@@ -1294,8 +1294,6 @@ DecodeXLogTuple(char *data, Size len, 
ReorderBufferTupleBuf *tuple)
  * 2) The transaction happened in another database.
  * 3) The output plugin is not interested in the origin.
  * 4) We are not in the normal decoding mode.
- *
- * Also, set output_skipped flag if we are in the slient mode.
  */
 static bool
 DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
@@ -1308,9 +1306,15 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf,
                                 ctx->decoding_mode != DECODING_MODE_NORMAL ||
                                 FilterByOrigin(ctx, origin_id));
 
-       /* Set a flag if we are in the slient mode */
+       if (need_skip)
+               return true;
+
+       /*
+        * We don't need to process the transaction in silent mode. Indicate the
+        * same via LogicalDecodingContext, so that the caller can skip 
processing.
+        */
        if (ctx->decoding_mode == DECODING_MODE_SILENT)
-               ctx->output_skipped = true;
+               ctx->processing_required = true;
 
-       return need_skip;
+       return true;
 }
diff --git a/src/backend/replication/logical/logical.c 
b/src/backend/replication/logical/logical.c
index 0f4b1c6323..e47f2ebd7c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -2030,7 +2030,7 @@ DecodingContextHasdecodedItems(LogicalDecodingContext 
*ctx,
        InvalidateSystemCaches();
 
        /* Loop until the end of WAL or some changes are processed */
-       while (!ctx->output_skipped && ctx->reader->EndRecPtr < end_of_wal)
+       while (!ctx->processing_required && ctx->reader->EndRecPtr < end_of_wal)
        {
                XLogRecord *record;
                char       *errm = NULL;
@@ -2046,5 +2046,5 @@ DecodingContextHasdecodedItems(LogicalDecodingContext 
*ctx,
                CHECK_FOR_INTERRUPTS();
        }
 
-       return ctx->output_skipped;
+       return ctx->processing_required;
 }
diff --git a/src/include/replication/logical.h 
b/src/include/replication/logical.h
index d0f9dda6c5..94cc631a5b 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -128,12 +128,8 @@ typedef struct LogicalDecodingContext
        /* Are we processing the end LSN of a transaction? */
        bool            end_xact;
 
-       /*
-        * Did the logical decoding context skip outputting any changes?
-        *
-        * This flag is used only when the context is in the silent mode.
-        */
-       bool            output_skipped;
+       /* Do we need to process any change in silent decoding mode? */
+       bool            processing_required;
 } LogicalDecodingContext;
 
 

Reply via email to