On Sun, Apr 27, 2025 at 2:07 PM Masahiko Sawada wrote:
> 
> On Sat, Apr 26, 2025 at 5:01 AM Amit Kapila <amit.kapil...@gmail.com>
> wrote:
> >
> > On Sat, Apr 26, 2025 at 12:01 AM Masahiko Sawada
> <sawada.m...@gmail.com> wrote:
> > >
> > > On Fri, Apr 25, 2025 at 4:42 AM Amit Kapila <amit.kapil...@gmail.com>
> wrote:
> > > >
> > > > Can you think of any better ideas?
> > >
> > > No idea. Hmm, there seems no reasonable way to fix this issue for
> > > back branches. I consented to the view that these costs were
> > > something that we should have paid from the beginning.
> > >
> >
> > Right, I feel we should go with the simple change proposed by Hou-San
> > for now to fix the bug. If, in the future, we encounter any cases
> > where such optimizations can help for fast-forward mode, then we can
> > consider it. Does that sound reasonable to you?
> 
> Yes, agreed with this approach.

+1. Thanks for the discussion!

Here is the V2 patch (including both HEAD and back-branch versions)
which merged Amit's suggestions for the comments. It can pass regression
and pgindent check.

I also adjusted the commit message to mention the commit f49a80c4
as suggested by Amit.

Best Regards,
Hou zj

Attachment: v2-0001-HEAD-PG17-Fix-premature-xmin-advancement-during-fast-forwar.patch
Description: v2-0001-HEAD-PG17-Fix-premature-xmin-advancement-during-fast-forwar.patch

From 16ae28e3b3b4c10bd82ad679194325eb56aa3965 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.f...@cn.fujitsu.com>
Date: Sun, 27 Apr 2025 14:53:35 +0800
Subject: [PATCH v2] Fix premature xmin advancement during fast forward
 decoding

Currently, fast-forward decoding could prematurely advance catalog_xmin,
resulting in required catalog data being removed by vacuum.

Commit f49a80c4 fixed a similar issue where the logical slot's catalog_xmin was
advanced prematurely during non-fast-forward mode, which involved using the
minimum transaction ID remaining visible in the base snapshot when determining
candidates for catalog_xmin. However, this fix did not apply to fast-forward
mode, as a base snapshot is not built when decoding changes in this mode.
Consequently, catalog_xmin was directly advanced to the oldest running
transaction ID found in the latest running_xacts record.

To resolve this, the commit allows the base snapshot to be built during fast
forward decoding.
---
 .../test_decoding/expected/oldest_xmin.out    | 41 +++++++++++++++++++
 contrib/test_decoding/specs/oldest_xmin.spec  |  5 +++
 src/backend/replication/logical/decode.c      | 38 +++++++++++------
 3 files changed, 71 insertions(+), 13 deletions(-)

diff --git a/contrib/test_decoding/expected/oldest_xmin.out 
b/contrib/test_decoding/expected/oldest_xmin.out
index dd6053f9c1f..57268b38d33 100644
--- a/contrib/test_decoding/expected/oldest_xmin.out
+++ b/contrib/test_decoding/expected/oldest_xmin.out
@@ -38,3 +38,44 @@ COMMIT
 stop    
 (1 row)
 
+
+starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit 
s0_checkpoint s0_advance_slot s0_advance_slot s1_commit s0_vacuum s0_get_changes
+step s0_begin: BEGIN;
+step s0_getxid: SELECT pg_current_xact_id() IS NULL;
+?column?
+--------
+f       
+(1 row)
+
+step s1_begin: BEGIN;
+step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3));
+step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos;
+step s0_commit: COMMIT;
+step s0_checkpoint: CHECKPOINT;
+step s0_advance_slot: SELECT slot_name FROM 
pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn());
+slot_name     
+--------------
+isolation_slot
+(1 row)
+
+step s0_advance_slot: SELECT slot_name FROM 
pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn());
+slot_name     
+--------------
+isolation_slot
+(1 row)
+
+step s1_commit: COMMIT;
+step s0_vacuum: VACUUM pg_attribute;
+step s0_get_changes: SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 
'skip-empty-xacts', '1');
+data                                                  
+------------------------------------------------------
+BEGIN                                                 
+table public.harvest: INSERT: fruits[basket]:'(1,2,3)'
+COMMIT                                                
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/oldest_xmin.spec 
b/contrib/test_decoding/specs/oldest_xmin.spec
index 88bd30f5ff7..7f2fe3d7ed7 100644
--- a/contrib/test_decoding/specs/oldest_xmin.spec
+++ b/contrib/test_decoding/specs/oldest_xmin.spec
@@ -25,6 +25,7 @@ step "s0_commit" { COMMIT; }
 step "s0_checkpoint" { CHECKPOINT; }
 step "s0_vacuum" { VACUUM pg_attribute; }
 step "s0_get_changes" { SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 
'skip-empty-xacts', '1'); }
+step "s0_advance_slot" { SELECT slot_name FROM 
pg_replication_slot_advance('isolation_slot', pg_current_wal_lsn()); }
 
 session "s1"
 setup { SET synchronous_commit=on; }
@@ -40,3 +41,7 @@ step "s1_commit" { COMMIT; }
 # will be removed (xmax set) before T1 commits. That is, interlocking doesn't
 # forbid modifying catalog after someone read it (and didn't commit yet).
 permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" 
"s0_commit" "s0_checkpoint" "s0_get_changes" "s0_get_changes" "s1_commit" 
"s0_vacuum" "s0_get_changes"
+
+# Perform the same testing process as described above, but use advance_slot to
+# forces xmin advancement during fast forward decoding.
+permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" 
"s0_commit" "s0_checkpoint" "s0_advance_slot" "s0_advance_slot" "s1_commit" 
"s0_vacuum" "s0_get_changes"
diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index 297eb11b5a8..9c2f1f17c7f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -362,20 +362,24 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
 
        /*
         * If we don't have snapshot or we are just fast-forwarding, there is no
-        * point in decoding changes.
+        * point in decoding data changes. However, it's crucial to build the 
base
+        * snapshot during fast-forward mode (as is done in
+        * SnapBuildProcessChange()) because we require the snapshot's xmin when
+        * determining the candidate catalog_xmin for the replication slot. See
+        * SnapBuildProcessRunningXacts().
         */
-       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
-               ctx->fast_forward)
+       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
                return;
 
        switch (info)
        {
                case XLOG_HEAP2_MULTI_INSERT:
-                       if (!ctx->fast_forward &&
-                               SnapBuildProcessChange(builder, xid, 
buf->origptr))
+                       if (SnapBuildProcessChange(builder, xid, buf->origptr) 
&&
+                               !ctx->fast_forward)
                                DecodeMultiInsert(ctx, buf);
                        break;
                case XLOG_HEAP2_NEW_CID:
+                       if (!ctx->fast_forward)
                        {
                                xl_heap_new_cid *xlrec;
 
@@ -422,16 +426,20 @@ DecodeHeapOp(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
 
        /*
         * If we don't have snapshot or we are just fast-forwarding, there is no
-        * point in decoding data changes.
+        * point in decoding data changes. However, it's crucial to build the 
base
+        * snapshot during fast-forward mode (as is done in
+        * SnapBuildProcessChange()) because we require the snapshot's xmin when
+        * determining the candidate catalog_xmin for the replication slot. See
+        * SnapBuildProcessRunningXacts().
         */
-       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
-               ctx->fast_forward)
+       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
                return;
 
        switch (info)
        {
                case XLOG_HEAP_INSERT:
-                       if (SnapBuildProcessChange(builder, xid, buf->origptr))
+                       if (SnapBuildProcessChange(builder, xid, buf->origptr) 
&&
+                               !ctx->fast_forward)
                                DecodeInsert(ctx, buf);
                        break;
 
@@ -442,17 +450,20 @@ DecodeHeapOp(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
                         */
                case XLOG_HEAP_HOT_UPDATE:
                case XLOG_HEAP_UPDATE:
-                       if (SnapBuildProcessChange(builder, xid, buf->origptr))
+                       if (SnapBuildProcessChange(builder, xid, buf->origptr) 
&&
+                               !ctx->fast_forward)
                                DecodeUpdate(ctx, buf);
                        break;
 
                case XLOG_HEAP_DELETE:
-                       if (SnapBuildProcessChange(builder, xid, buf->origptr))
+                       if (SnapBuildProcessChange(builder, xid, buf->origptr) 
&&
+                               !ctx->fast_forward)
                                DecodeDelete(ctx, buf);
                        break;
 
                case XLOG_HEAP_TRUNCATE:
-                       if (SnapBuildProcessChange(builder, xid, buf->origptr))
+                       if (SnapBuildProcessChange(builder, xid, buf->origptr) 
&&
+                               !ctx->fast_forward)
                                DecodeTruncate(ctx, buf);
                        break;
 
@@ -480,7 +491,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
                        break;
 
                case XLOG_HEAP_CONFIRM:
-                       if (SnapBuildProcessChange(builder, xid, buf->origptr))
+                       if (SnapBuildProcessChange(builder, xid, buf->origptr) 
&&
+                               !ctx->fast_forward)
                                DecodeSpecConfirm(ctx, buf);
                        break;
 
-- 
2.30.0.windows.2

Reply via email to