Dear hackers,

The attached patch set contains a proper commit message, which briefly
describes the background and the handling. For PG13, the same commit message
is used for 0001; 0002 is still rough.

I renamed the backpatches to .txt to keep cfbot happy.

Thanks to Hou for working on it.

Best regards,
Hayato Kuroda
FUJITSU LIMITED


From 09c4c9482290a126f716482132813c9b0c09abc8 Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal....@gmail.com>
Date: Fri, 23 Aug 2024 14:02:20 +0530
Subject: [PATCH v20_REL_13 1/2] Fix data loss in logical replication

Previously, logical replication could lose data if one user modified a
publication to add a table while another user concurrently modified that table
and committed after the publication modification transaction. The issue
arose while decoding the transactions that modified the table: if the initial
catalog cache was built using a snapshot taken before the publication DDL
executed, all subsequent changes to that table were decoded with an outdated
catalog cache, which caused them to be filtered out of replication. This
happened because invalidation messages were present only in the publication
modification transaction, which was decoded before these subsequent changes.

This issue is not limited to publication DDLs; similar problems can occur with
ALTER TYPE statements executed concurrently with DMLs, causing changes to be
decoded using outdated type information.

To address this, this commit ensures that invalidation messages from
catalog-modifying transactions are distributed to all concurrent in-progress
transactions. This forces the necessary catalog cache rebuild when decoding
new changes, similar to how historic catalog snapshots are distributed (see
SnapBuildDistributeNewCatalogSnapshot()).
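The distribution step can be sketched as follows. This is a minimal,
hypothetical C model: ToyTxn, toy_add_invals() and toy_distribute_invals() are
illustrative names, not PostgreSQL APIs, and an int stands in for
SharedInvalidationMessage. Each transaction's invalidations are appended to a
growable array, as the repalloc() branch of ReorderBufferAddInvalidations()
does, and the committing transaction's messages are copied into every other
in-progress transaction that already has a base snapshot, mirroring the checks
in SnapBuildDistributeSnapshotAndInval().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef int ToyInvalMsg;    /* stand-in for SharedInvalidationMessage */

typedef struct
{
    uint32_t     xid;
    int          has_base_snapshot;
    uint32_t     ninvals;
    ToyInvalMsg *invals;    /* growable array, NULL while empty */
} ToyTxn;

/* Append nmsgs messages to txn, growing its array as needed. */
static void
toy_add_invals(ToyTxn *txn, const ToyInvalMsg *msgs, uint32_t nmsgs)
{
    ToyInvalMsg *grown;

    assert(nmsgs > 0);
    grown = realloc(txn->invals, sizeof(ToyInvalMsg) * (txn->ninvals + nmsgs));
    if (grown == NULL)
        abort();
    memcpy(grown + txn->ninvals, msgs, sizeof(ToyInvalMsg) * nmsgs);
    txn->invals = grown;
    txn->ninvals += nmsgs;
}

/*
 * Copy the committed transaction's invalidations into every other
 * in-progress transaction that already has a base snapshot.
 */
static void
toy_distribute_invals(ToyTxn *txns, size_t ntxns, const ToyTxn *committed)
{
    for (size_t i = 0; i < ntxns; i++)
    {
        ToyTxn *txn = &txns[i];

        if (!txn->has_base_snapshot)
            continue;   /* its future snapshot already sees the change */
        if (txn->xid == committed->xid)
            continue;   /* the committer runs its own invalidations at the end */
        if (committed->ninvals > 0)
            toy_add_invals(txn, committed->invals, committed->ninvals);
    }
}
```

Transactions without a base snapshot are skipped because, as the patch's
comment explains, the snapshot they eventually build already reflects the
committed catalog changes.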

This change introduces some performance regression, observed primarily when
publication DDL statements that modify published tables are executed
frequently. This is an expected trade-off due to the overhead of distributing
invalidations and rebuilding caches. The regression is minor or nearly
nonexistent when DDLs do not affect published tables or occur infrequently,
making this a worthwhile cost for resolving a longstanding data loss issue.

An alternative approach considered was to take a strong lock on each affected
table during publication modification. However, this would only address issues
related to publication DDLs, and it would require locking every relation in the
database for publications created as FOR ALL TABLES, which is impractical. Thus,
this commit chooses to distribute invalidation messages as outlined above.
---
 contrib/test_decoding/Makefile                |  2 +-
 .../expected/invalidation_distrubution.out    | 20 ++++++
 .../specs/invalidation_distrubution.spec      | 32 ++++++++++
 .../replication/logical/reorderbuffer.c       | 64 ++++++++++++++++---
 src/backend/replication/logical/snapbuild.c   | 63 ++++++++++++++----
 src/include/replication/reorderbuffer.h       |  4 ++
 6 files changed, 164 insertions(+), 21 deletions(-)
 create mode 100644 contrib/test_decoding/expected/invalidation_distrubution.out
 create mode 100644 contrib/test_decoding/specs/invalidation_distrubution.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 735b7e7653c..f122dc3a82d 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
        spill slot truncate
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
       oldest_xmin snapshot_transfer subxact_without_top catalog_change_snapshot \
-       skip_snapshot_restore
+       skip_snapshot_restore invalidation_distrubution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out
new file mode 100644
index 00000000000..eb70eda9042
--- /dev/null
+++ b/contrib/test_decoding/expected/invalidation_distrubution.out
@@ -0,0 +1,20 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec
new file mode 100644
index 00000000000..ca051fc1e85
--- /dev/null
+++ b/contrib/test_decoding/specs/invalidation_distrubution.spec
@@ -0,0 +1,32 @@
+# Test that catalog cache invalidation messages are distributed to ongoing
+# transactions, ensuring they can access the updated catalog content after
+# processing these messages.
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+    CREATE TABLE tbl1(val1 integer, val2 integer);
+    CREATE PUBLICATION pub;
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    DROP PUBLICATION pub;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 56c25e3a6da..fa9413fa2a0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2264,20 +2264,45 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
                                                          SharedInvalidationMessage *msgs)
 {
        ReorderBufferTXN *txn;
+       MemoryContext oldcontext;
 
        txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
-       if (txn->ninvalidations != 0)
-               elog(ERROR, "only ever add one set of invalidations");
+       oldcontext = MemoryContextSwitchTo(rb->context);
+
+       /*
+        * Collect all the invalidations under the top transaction, if available,
+        * so that we can execute them all together.
+        */
+       if (txn->toplevel_xid)
+       {
+               txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, true, NULL, lsn,
+                                                                       true);
+       }
 
        Assert(nmsgs > 0);
 
-       txn->ninvalidations = nmsgs;
-       txn->invalidations = (SharedInvalidationMessage *)
-               MemoryContextAlloc(rb->context,
-                                                  sizeof(SharedInvalidationMessage) * nmsgs);
-       memcpy(txn->invalidations, msgs,
-                  sizeof(SharedInvalidationMessage) * nmsgs);
+       /* Accumulate invalidations. */
+       if (txn->ninvalidations == 0)
+       {
+               txn->ninvalidations = nmsgs;
+               txn->invalidations = (SharedInvalidationMessage *)
+                       palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+               memcpy(txn->invalidations, msgs,
+                          sizeof(SharedInvalidationMessage) * nmsgs);
+       }
+       else
+       {
+               txn->invalidations = (SharedInvalidationMessage *)
+                       repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
+                                        (txn->ninvalidations + nmsgs));
+
+               memcpy(txn->invalidations + txn->ninvalidations, msgs,
+                          nmsgs * sizeof(SharedInvalidationMessage));
+               txn->ninvalidations += nmsgs;
+       }
+
+       MemoryContextSwitchTo(oldcontext);
 }
 
 /*
@@ -3895,3 +3920,26 @@ restart:
                *cmax = ent->cmax;
        return true;
 }
+
+/*
+ * Get the invalidation messages of the specified transaction.
+ *
+ * Returns the number of messages, and sets *msgs to point to the array of
+ * messages.
+ */
+uint32
+ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
+                                                         SharedInvalidationMessage **msgs)
+{
+       ReorderBufferTXN *txn;
+
+       txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+                                                               false);
+
+       if (txn == NULL)
+               return 0;
+
+       *msgs = txn->invalidations;
+
+       return txn->ninvalidations;
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7546de96763..3bda41c5251 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -292,7 +292,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 /* xlog reading helper functions for SnapBuildProcessRunningXacts */
 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
@@ -861,15 +861,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
  */
 static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
        dlist_iter      txn_i;
        ReorderBufferTXN *txn;
@@ -877,7 +877,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
        /*
         * Iterate through all toplevel transactions. This can include
         * subtransactions which we just don't yet know to be that, but that's
-        * fine, they will just get an unnecessary snapshot queued.
+        * fine, they will just get an unnecessary snapshot and invalidations
+        * queued.
         */
        dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
        {
@@ -890,6 +891,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                 * transaction which in turn implies we don't yet need a snapshot at
                 * all. We'll add a snapshot when the first change gets queued.
                 *
+                * Similarly, we don't need to add invalidations to a transaction whose
+                * base snapshot is not yet set. Once a base snapshot is built, it will
+                * include the xids of committed transactions that have modified the
+                * catalog, thus reflecting the new catalog contents. The existing
+                * catalog cache will have already been invalidated after processing
+                * the invalidations in the transaction that modified catalogs,
+                * ensuring that a fresh cache is constructed during decoding.
+                *
                 * NB: This works correctly even for subtransactions because
                 * ReorderBufferAssignChild() takes care to transfer the base snapshot
                 * to the top-level transaction, and while iterating the changequeue
@@ -898,7 +907,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
                        continue;
 
-               elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
+               elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
                         txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
 
                /*
@@ -908,6 +917,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                SnapBuildSnapIncRefcount(builder->snapshot);
                ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
                                                                 builder->snapshot);
+
+               /*
+                * Add invalidation messages to the reorder buffer of in-progress
+                * transactions except the current committed transaction, for which we
+                * will execute invalidations at the end.
+                *
+                * It is required, otherwise, we will end up using the stale catcache
+                * contents built by the current transaction even after its decoding,
+                * which should have been invalidated due to concurrent catalog
+                * changing transaction.
+                */
+               if (txn->xid != xid)
+               {
+                       uint32 ninvalidations;
+                       SharedInvalidationMessage *msgs = NULL;
+
+                       ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
+                                                                                                                  xid, &msgs);
+
+                       if (ninvalidations > 0)
+                       {
+                               Assert(msgs != NULL);
+
+                               ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+                                                                                         ninvalidations, msgs);
+                       }
+               }
        }
 }
 
@@ -1186,8 +1222,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
                /* refcount of the snapshot builder for the new snapshot */
                SnapBuildSnapIncRefcount(builder->snapshot);
 
-               /* add a new catalog snapshot to all currently running transactions */
-               SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+               /*
+                * Add a new catalog snapshot and invalidation messages to all
+                * currently running transactions.
+                */
+               SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
        }
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5347597e92b..545cee891ed 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -463,6 +463,10 @@ TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
 void           ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
+uint32         ReorderBufferGetInvalidations(ReorderBuffer *rb,
+                                                                                 TransactionId xid,
+                                                                                 SharedInvalidationMessage **msgs);
+
 void           StartupReorderBuffer(void);
 
 #endif
-- 
2.43.5

From 23bb871f617e7085b5ea869e2a83dcb6d38c91aa Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Date: Mon, 17 Mar 2025 11:25:49 +0900
Subject: [PATCH v20_REL_13 2/2] Backpatch introducing invalidation messages in
 ReorderBufferChangeType

---
 .../replication/logical/reorderbuffer.c       | 72 +++++++++++++++++--
 src/include/replication/reorderbuffer.h       | 16 ++++-
 2 files changed, 80 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index fa9413fa2a0..697b45675a6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -220,7 +220,7 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
                                                                           ReorderBufferIterTXNState *state);
-static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);
 
 /*
  * ---------------------------------------
@@ -484,6 +484,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
                        break;
+               case REORDER_BUFFER_CHANGE_INVALIDATION:
+                       if (change->data.inval.invalidations)
+                               pfree(change->data.inval.invalidations);
+                       change->data.inval.invalidations = NULL;
+                       break;
        }
 
        pfree(change);
@@ -1883,7 +1888,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                                                 * see new catalog contents, so execute all
                                                 * invalidations.
                                                 */
-                                               ReorderBufferExecuteInvalidations(rb, txn);
+                                               ReorderBufferExecuteInvalidations(txn->ninvalidations,
+                                                                                                                 txn->invalidations);
                                        }
 
                                        break;
@@ -1891,6 +1897,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
                                        elog(ERROR, "tuplecid value in changequeue");
                                        break;
+                               case REORDER_BUFFER_CHANGE_INVALIDATION:
+                                       /* Execute the invalidation messages locally */
+                                       ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
+                                                                                                         change->data.inval.invalidations);
                        }
                }
 
@@ -1921,7 +1931,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                AbortCurrentTransaction();
 
                /* make sure there's no cache pollution */
-               ReorderBufferExecuteInvalidations(rb, txn);
+               ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
 
                if (using_subtxn)
                        RollbackAndReleaseCurrentSubTransaction();
@@ -1947,7 +1957,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                AbortCurrentTransaction();
 
                /* make sure there's no cache pollution */
-               ReorderBufferExecuteInvalidations(rb, txn);
+               ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
 
                if (using_subtxn)
                        RollbackAndReleaseCurrentSubTransaction();
@@ -2265,6 +2275,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 {
        ReorderBufferTXN *txn;
        MemoryContext oldcontext;
+       ReorderBufferChange *change;
 
        txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
@@ -2302,6 +2313,16 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
                txn->ninvalidations += nmsgs;
        }
 
+       change = ReorderBufferGetChange(rb);
+       change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+       change->data.inval.ninvalidations = nmsgs;
+       change->data.inval.invalidations = (SharedInvalidationMessage *)
+               palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+       memcpy(change->data.inval.invalidations, msgs,
+                  sizeof(SharedInvalidationMessage) * nmsgs);
+
+       ReorderBufferQueueChange(rb, xid, lsn, change);
+
        MemoryContextSwitchTo(oldcontext);
 }
 
@@ -2310,12 +2331,12 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
  * in the changestream but we don't know which those are.
  */
 static void
-ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
 {
        int                     i;
 
-       for (i = 0; i < txn->ninvalidations; i++)
-               LocalExecuteInvalidationMessage(&txn->invalidations[i]);
+       for (i = 0; i < nmsgs; i++)
+               LocalExecuteInvalidationMessage(&msgs[i]);
 }
 
 /*
@@ -2725,6 +2746,24 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
                        /* ReorderBufferChange contains everything important */
                        break;
+               case REORDER_BUFFER_CHANGE_INVALIDATION:
+               {
+                       char       *data;
+                       Size            inval_size = sizeof(SharedInvalidationMessage) *
+                               change->data.inval.ninvalidations;
+
+                       sz += inval_size;
+
+                       ReorderBufferSerializeReserve(rb, sz);
+                       data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+                       /* might have been reallocated above */
+                       ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+                       memcpy(data, change->data.inval.invalidations, inval_size);
+                       data += inval_size;
+
+                       break;
+               }
        }
 
        ondisk->size = sz;
@@ -2833,6 +2872,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
                        /* ReorderBufferChange contains everything important */
                        break;
+               case REORDER_BUFFER_CHANGE_INVALIDATION:
+               {
+                       sz += sizeof(SharedInvalidationMessage) *
+                               change->data.inval.ninvalidations;
+                       break;
+               }
        }
 
        return sz;
@@ -3120,6 +3165,19 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
                        break;
+               case REORDER_BUFFER_CHANGE_INVALIDATION:
+               {
+                       Size            inval_size = sizeof(SharedInvalidationMessage) *
+                               change->data.inval.ninvalidations;
+
+                       change->data.inval.invalidations =
+                               MemoryContextAlloc(rb->context, inval_size);
+
+                       /* read the message */
+                       memcpy(change->data.inval.invalidations, data, inval_size);
+
+                       break;
+               }
        }
 
        dlist_push_tail(&txn->changes, &change->node);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 545cee891ed..dff58a2fd8f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -63,7 +63,8 @@ enum ReorderBufferChangeType
        REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
        REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
        REORDER_BUFFER_CHANGE_TRUNCATE,
-       REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
+       REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
+       REORDER_BUFFER_CHANGE_INVALIDATION
 };
 
 /* forward declaration */
@@ -150,6 +151,13 @@ typedef struct ReorderBufferChange
                        CommandId       cmax;
                        CommandId       combocid;
                }                       tuplecid;
+
+               /* Invalidation. */
+               struct
+               {
+                       uint32          ninvalidations; /* Number of messages */
+                       SharedInvalidationMessage *invalidations;       /* invalidation message */
+               }                       inval;
        }                       data;
 
        /*
@@ -467,6 +475,12 @@ uint32             ReorderBufferGetInvalidations(ReorderBuffer *rb,
                                                                                  TransactionId xid,
                                                                                  SharedInvalidationMessage **msgs);
 
+void           ReorderBufferAddInvalidationsForDistribute(ReorderBuffer *,
+                                                                                                          TransactionId,
+                                                                                                          XLogRecPtr lsn,
+                                                                                                          Size nmsgs,
+                                                                                                          SharedInvalidationMessage *msgs);
+
 void           StartupReorderBuffer(void);
 
 #endif
-- 
2.43.5

From 0c64fcb70e09530de659fbd15d7280bbd44547e7 Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal....@gmail.com>
Date: Fri, 23 Aug 2024 14:02:20 +0530
Subject: [PATCH v20_REL_14] Fix data loss in logical replication

Previously, logical replication could lose data if one user modified a
publication to add a table while another user concurrently modified that table
and committed after the publication modification transaction. The issue
arose while decoding the transactions that modified the table: if the initial
catalog cache was built using a snapshot taken before the publication DDL
executed, all subsequent changes to that table were decoded with an outdated
catalog cache, which caused them to be filtered out of replication. This
happened because invalidation messages were present only in the publication
modification transaction, which was decoded before these subsequent changes.

This issue is not limited to publication DDLs; similar problems can occur with
ALTER TYPE statements executed concurrently with DMLs, causing changes to be
decoded using outdated type information.

To address this, this commit ensures that invalidation messages from
catalog-modifying transactions are distributed to all concurrent in-progress
transactions. This forces the necessary catalog cache rebuild when decoding
new changes, similar to how historic catalog snapshots are distributed (see
SnapBuildDistributeNewCatalogSnapshot()).

This change introduces some performance regression, observed primarily when
publication DDL statements that modify published tables are executed
frequently. This is an expected trade-off due to the overhead of distributing
invalidations and rebuilding caches. The regression is minor or nearly
nonexistent when DDLs do not affect published tables or occur infrequently,
making this a worthwhile cost for resolving a longstanding data loss issue.

An alternative approach considered was to take a strong lock on each affected
table during publication modification. However, this would only address issues
related to publication DDLs, and it would require locking every relation in the
database for publications created as FOR ALL TABLES, which is impractical. Thus,
this commit chooses to distribute invalidation messages as outlined above.
---
 contrib/test_decoding/Makefile                |  3 +-
 .../expected/invalidation_distrubution.out    | 20 ++++++
 .../specs/invalidation_distrubution.spec      | 32 +++++++++
 .../replication/logical/reorderbuffer.c       | 23 +++++++
 src/backend/replication/logical/snapbuild.c   | 67 +++++++++++++++----
 src/include/replication/reorderbuffer.h       |  4 ++
 6 files changed, 134 insertions(+), 15 deletions(-)
 create mode 100644 contrib/test_decoding/expected/invalidation_distrubution.out
 create mode 100644 contrib/test_decoding/specs/invalidation_distrubution.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 82ba3f7df11..8b0b8cc3acf 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
        spill slot truncate stream stats twophase twophase_stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
        oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
-       twophase_snapshot catalog_change_snapshot skip_snapshot_restore
+       twophase_snapshot catalog_change_snapshot skip_snapshot_restore \
+       invalidation_distrubution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out
new file mode 100644
index 00000000000..c701e290bb9
--- /dev/null
+++ b/contrib/test_decoding/expected/invalidation_distrubution.out
@@ -0,0 +1,20 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '2', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec
new file mode 100644
index 00000000000..b8b14e333a1
--- /dev/null
+++ b/contrib/test_decoding/specs/invalidation_distrubution.spec
@@ -0,0 +1,32 @@
+# Test that catalog cache invalidation messages are distributed to ongoing
+# transactions, ensuring they can access the updated catalog content after
+# processing these messages.
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+    CREATE TABLE tbl1(val1 integer, val2 integer);
+    CREATE PUBLICATION pub;
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    DROP PUBLICATION pub;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '2', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
+
+# Expect exactly one insert change: 73 is LOGICAL_REP_MSG_INSERT ('I').
+permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 64d9baa7982..a52b51a5e7d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -5196,3 +5196,26 @@ restart:
                *cmax = ent->cmax;
        return true;
 }
+
+/*
+ * Get the invalidation messages of the specified transaction.
+ *
+ * Returns the number of messages and sets *msgs to point to the transaction's
+ * array of messages.  Returns 0, leaving *msgs untouched, if xid is unknown.
+ */
+uint32
+ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
+                              SharedInvalidationMessage **msgs)
+{
+       ReorderBufferTXN *txn;
+
+       txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+                                                               false);
+
+       if (txn == NULL)
+               return 0;
+
+       *msgs = txn->invalidations;
+
+       return txn->ninvalidations;
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 9d8700147ca..8abd669c51e 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -290,7 +290,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 /* xlog reading helper functions for SnapBuildProcessRunningXacts */
 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
@@ -843,15 +843,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
  */
 static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
        dlist_iter      txn_i;
        ReorderBufferTXN *txn;
@@ -859,7 +859,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
        /*
         * Iterate through all toplevel transactions. This can include
         * subtransactions which we just don't yet know to be that, but that's
-        * fine, they will just get an unnecessary snapshot queued.
+        * fine, they will just get an unnecessary snapshot and invalidations
+        * queued.
         */
        dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
        {
@@ -872,6 +873,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                 * transaction which in turn implies we don't yet need a snapshot at
                 * all. We'll add a snapshot when the first change gets queued.
                 *
+                * Similarly, we don't need to add invalidations to a transaction whose
+                * base snapshot is not yet set. Once a base snapshot is built, it will
+                * include the xids of committed transactions that have modified the
+                * catalog, thus reflecting the new catalog contents. The existing
+                * catalog cache will have already been invalidated after processing
+                * the invalidations in the transaction that modified catalogs,
+                * ensuring that a fresh cache is constructed during decoding.
+                *
                 * NB: This works correctly even for subtransactions because
                 * ReorderBufferAssignChild() takes care to transfer the base snapshot
                 * to the top-level transaction, and while iterating the changequeue
@@ -881,13 +890,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                        continue;
 
                /*
-                * We don't need to add snapshot to prepared transactions as they
-                * should not see the new catalog contents.
+                * We don't need to add snapshot or invalidations to prepared
+                * transactions as they should not see the new catalog contents.
                 */
                if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
                        continue;
 
-               elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
+               elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
                         txn->xid, LSN_FORMAT_ARGS(lsn));
 
                /*
@@ -897,6 +906,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                SnapBuildSnapIncRefcount(builder->snapshot);
                ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
                                                                 
builder->snapshot);
+
+               /*
+                * Add invalidation messages to the reorder buffer of in-progress
+                * transactions, except the transaction that is committing now, for
+                * which we will execute the invalidations at the end of its decoding.
+                *
+                * This is required; otherwise, we would keep using stale catcache
+                * contents built while decoding an in-progress transaction even
+                * after its decoding, although they should have been invalidated by
+                * the concurrent catalog-changing transaction.
+                */
+               if (txn->xid != xid)
+               {
+                       uint32 ninvalidations;
+                       SharedInvalidationMessage *msgs = NULL;
+
+                       ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
+                                                                      xid, &msgs);
+
+                       if (ninvalidations > 0)
+                       {
+                               Assert(msgs != NULL);
+
+                               ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+                                                             ninvalidations, msgs);
+                       }
+               }
        }
 }
 
@@ -1175,8 +1211,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
                /* refcount of the snapshot builder for the new snapshot */
                SnapBuildSnapIncRefcount(builder->snapshot);
 
-               /* add a new catalog snapshot to all currently running transactions */
-               SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+               /*
+                * Add a new catalog snapshot and invalidation messages to all
+                * currently running transactions.
+                */
+               SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
        }
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index ba257d81b51..d399975e8a5 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -676,6 +676,10 @@ TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
 void           ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
+uint32         ReorderBufferGetInvalidations(ReorderBuffer *rb,
+                                             TransactionId xid,
+                                             SharedInvalidationMessage **msgs);
+
 void           StartupReorderBuffer(void);
 
 #endif
-- 
2.43.5

From 23a6ffa776bcae1125a6aef70703652b2850f908 Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal....@gmail.com>
Date: Fri, 23 Aug 2024 14:02:20 +0530
Subject: [PATCH v20_REL_15] Fix data loss in logical replication

Previously, logical replication could lose data if one user modified a
publication to add a table while another user concurrently modified that table
and committed later than the publication modification transaction. The issue
arose during the decoding of transactions modifying the table: if the initial
catalog cache was built using a snapshot taken before the publication DDL
execution, all subsequent changes to that table were decoded with an outdated
catalog cache, which caused them to be filtered from replication. This happened
because invalidation messages were only present in the publication modification
transaction, which was decoded before these subsequent changes.

This issue is not limited to publication DDLs; similar problems can occur with
ALTER TYPE statements executed concurrently with DMLs, leading to incorrect
decoding under outdated type contexts.

To address this, the commit improves logical decoding by ensuring that
invalidation messages from catalog-modifying transactions are distributed to
all concurrent in-progress transactions. This allows the necessary rebuild of
the catalog cache when decoding new changes, similar to how historic catalog
snapshots are distributed (see SnapBuildDistributeNewCatalogSnapshot(), which
this commit renames to SnapBuildDistributeSnapshotAndInval()).

This change introduces some performance regression, observed primarily when
publication DDL statements that modify published tables are executed
frequently. This is an expected trade-off due to the overhead of distributing
invalidations and rebuilding caches. The regression is minor or nearly
nonexistent when DDLs do not affect published tables or occur infrequently,
making this a worthwhile cost to resolve a longstanding data loss issue.

An alternative approach considered was to take a strong lock on each affected
table during publication modification. However, this would only address issues
related to publication DDLs and would require locking every relation in the
database for publications created as FOR ALL TABLES, which is impractical.
Thus, this commit chooses to distribute invalidation messages as outlined above.
---
 contrib/test_decoding/Makefile                |  2 +-
 .../expected/invalidation_distrubution.out    | 20 ++++++
 .../specs/invalidation_distrubution.spec      | 32 +++++++++
 .../replication/logical/reorderbuffer.c       | 23 +++++++
 src/backend/replication/logical/snapbuild.c   | 67 +++++++++++++++----
 src/include/replication/reorderbuffer.h       |  4 ++
 6 files changed, 133 insertions(+), 15 deletions(-)
 create mode 100644 contrib/test_decoding/expected/invalidation_distrubution.out
 create mode 100644 contrib/test_decoding/specs/invalidation_distrubution.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a4ba1a509ae..eef70770674 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
        oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
        twophase_snapshot slot_creation_error catalog_change_snapshot \
-       skip_snapshot_restore
+       skip_snapshot_restore invalidation_distrubution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out
new file mode 100644
index 00000000000..24190ebe570
--- /dev/null
+++ b/contrib/test_decoding/expected/invalidation_distrubution.out
@@ -0,0 +1,20 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '3', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec
new file mode 100644
index 00000000000..f63aba3ce96
--- /dev/null
+++ b/contrib/test_decoding/specs/invalidation_distrubution.spec
@@ -0,0 +1,32 @@
+# Test that catalog cache invalidation messages are distributed to ongoing
+# transactions, ensuring they can access the updated catalog content after
+# processing these messages.
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+    CREATE TABLE tbl1(val1 integer, val2 integer);
+    CREATE PUBLICATION pub;
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    DROP PUBLICATION pub;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '3', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
+
+# Expect exactly one insert change: 73 is LOGICAL_REP_MSG_INSERT ('I').
+permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index d5cde20a1c9..835180e12f6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -5200,3 +5200,26 @@ restart:
                *cmax = ent->cmax;
        return true;
 }
+
+/*
+ * Get the invalidation messages of the specified transaction.
+ *
+ * Returns the number of messages and sets *msgs to point to the transaction's
+ * array of messages.  Returns 0, leaving *msgs untouched, if xid is unknown.
+ */
+uint32
+ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
+                              SharedInvalidationMessage **msgs)
+{
+       ReorderBufferTXN *txn;
+
+       txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+                                                               false);
+
+       if (txn == NULL)
+               return 0;
+
+       *msgs = txn->invalidations;
+
+       return txn->ninvalidations;
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index cc1f2a9f154..0b303f9a235 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -290,7 +290,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 /* xlog reading helper functions for SnapBuildProcessRunningXacts */
 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
@@ -852,15 +852,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
  */
 static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
        dlist_iter      txn_i;
        ReorderBufferTXN *txn;
@@ -868,7 +868,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
        /*
         * Iterate through all toplevel transactions. This can include
         * subtransactions which we just don't yet know to be that, but that's
-        * fine, they will just get an unnecessary snapshot queued.
+        * fine, they will just get an unnecessary snapshot and invalidations
+        * queued.
         */
        dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
        {
@@ -881,6 +882,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                 * transaction which in turn implies we don't yet need a snapshot at
                 * all. We'll add a snapshot when the first change gets queued.
                 *
+                * Similarly, we don't need to add invalidations to a transaction whose
+                * base snapshot is not yet set. Once a base snapshot is built, it will
+                * include the xids of committed transactions that have modified the
+                * catalog, thus reflecting the new catalog contents. The existing
+                * catalog cache will have already been invalidated after processing
+                * the invalidations in the transaction that modified catalogs,
+                * ensuring that a fresh cache is constructed during decoding.
+                *
                 * NB: This works correctly even for subtransactions because
                 * ReorderBufferAssignChild() takes care to transfer the base snapshot
                 * to the top-level transaction, and while iterating the changequeue
@@ -890,13 +899,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                        continue;
 
                /*
-                * We don't need to add snapshot to prepared transactions as they
-                * should not see the new catalog contents.
+                * We don't need to add snapshot or invalidations to prepared
+                * transactions as they should not see the new catalog contents.
                 */
                if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
                        continue;
 
-               elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
+               elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
                         txn->xid, LSN_FORMAT_ARGS(lsn));
 
                /*
@@ -906,6 +915,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                SnapBuildSnapIncRefcount(builder->snapshot);
                ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
                                                                 
builder->snapshot);
+
+               /*
+                * Add invalidation messages to the reorder buffer of in-progress
+                * transactions, except the transaction that is committing now, for
+                * which we will execute the invalidations at the end of its decoding.
+                *
+                * This is required; otherwise, we would keep using stale catcache
+                * contents built while decoding an in-progress transaction even
+                * after its decoding, although they should have been invalidated by
+                * the concurrent catalog-changing transaction.
+                */
+               if (txn->xid != xid)
+               {
+                       uint32 ninvalidations;
+                       SharedInvalidationMessage *msgs = NULL;
+
+                       ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
+                                                                      xid, &msgs);
+
+                       if (ninvalidations > 0)
+                       {
+                               Assert(msgs != NULL);
+
+                               ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+                                                             ninvalidations, msgs);
+                       }
+               }
        }
 }
 
@@ -1184,8 +1220,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
                /* refcount of the snapshot builder for the new snapshot */
                SnapBuildSnapIncRefcount(builder->snapshot);
 
-               /* add a new catalog snapshot to all currently running transactions */
-               SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+               /*
+                * Add a new catalog snapshot and invalidation messages to all
+                * currently running transactions.
+                */
+               SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
        }
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4a01f877e5d..402bb7a2728 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -680,6 +680,10 @@ extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
 extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
+extern uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb,
+                                            TransactionId xid,
+                                            SharedInvalidationMessage **msgs);
+
 extern void StartupReorderBuffer(void);
 
 #endif
-- 
2.43.5

From febbb2e89f4c7ecc74ae9b7bc6194be19f0376ca Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal....@gmail.com>
Date: Fri, 23 Aug 2024 14:02:20 +0530
Subject: [PATCH v20_REL_16] Fix data loss in logical replication

Previously, logical replication could lose data if one user modified a
publication to add a table while another user concurrently modified that table
and committed later than the publication modification transaction. The issue
arose during the decoding of transactions modifying the table: if the initial
catalog cache was built using a snapshot taken before the publication DDL
execution, all subsequent changes to that table were decoded with an outdated
catalog cache, which caused them to be filtered from replication. This happened
because invalidation messages were only present in the publication modification
transaction, which was decoded before these subsequent changes.

This issue is not limited to publication DDLs; similar problems can occur with
ALTER TYPE statements executed concurrently with DMLs, leading to incorrect
decoding under outdated type contexts.

To address this, the commit improves logical decoding by ensuring that
invalidation messages from catalog-modifying transactions are distributed to
all concurrent in-progress transactions. This allows the necessary rebuild of
the catalog cache when decoding new changes, similar to how historic catalog
snapshots are distributed (see SnapBuildDistributeNewCatalogSnapshot(), which
this commit renames to SnapBuildDistributeSnapshotAndInval()).

This change introduces some performance regression, observed primarily when
publication DDL statements that modify published tables are executed
frequently. This is an expected trade-off due to the overhead of distributing
invalidations and rebuilding caches. The regression is minor or nearly
nonexistent when DDLs do not affect published tables or occur infrequently,
making this a worthwhile cost to resolve a longstanding data loss issue.

An alternative approach considered was to take a strong lock on each affected
table during publication modification. However, this would only address issues
related to publication DDLs and would require locking every relation in the
database for publications created as FOR ALL TABLES, which is impractical.
Thus, this commit chooses to distribute invalidation messages as outlined above.
---
 contrib/test_decoding/Makefile                |  2 +-
 .../expected/invalidation_distrubution.out    | 20 ++++++
 contrib/test_decoding/meson.build             |  1 +
 .../specs/invalidation_distrubution.spec      | 32 +++++++++
 .../replication/logical/reorderbuffer.c       | 23 +++++++
 src/backend/replication/logical/snapbuild.c   | 67 +++++++++++++++----
 src/include/replication/reorderbuffer.h       |  4 ++
 7 files changed, 134 insertions(+), 15 deletions(-)
 create mode 100644 contrib/test_decoding/expected/invalidation_distrubution.out
 create mode 100644 contrib/test_decoding/specs/invalidation_distrubution.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a4ba1a509ae..eef70770674 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
        oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
        twophase_snapshot slot_creation_error catalog_change_snapshot \
-       skip_snapshot_restore
+       skip_snapshot_restore invalidation_distrubution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out
new file mode 100644
index 00000000000..ad0a944cbf3
--- /dev/null
+++ b/contrib/test_decoding/expected/invalidation_distrubution.out
@@ -0,0 +1,20 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 2dd3ede41bf..273d26643c0 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -63,6 +63,7 @@ tests += {
       'twophase_snapshot',
       'slot_creation_error',
       'skip_snapshot_restore',
+      'invalidation_distrubution',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec
new file mode 100644
index 00000000000..decbed627e3
--- /dev/null
+++ b/contrib/test_decoding/specs/invalidation_distrubution.spec
@@ -0,0 +1,32 @@
+# Test that catalog cache invalidation messages are distributed to ongoing
+# transactions, ensuring they can access the updated catalog content after
+# processing these messages.
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+    CREATE TABLE tbl1(val1 integer, val2 integer);
+    CREATE PUBLICATION pub;
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    DROP PUBLICATION pub;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
+
+# Expect exactly one insert change: 73 is LOGICAL_REP_MSG_INSERT ('I').
+permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 930549948af..fa04e829cc9 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -5283,3 +5283,26 @@ restart:
                *cmax = ent->cmax;
        return true;
 }
+
+/*
+ * Get the invalidation messages of the specified transaction.
+ *
+ * Returns the number of messages and sets *msgs to point to the transaction's
+ * array of messages.  Returns 0, leaving *msgs untouched, if xid is unknown.
+ */
+uint32
+ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
+                              SharedInvalidationMessage **msgs)
+{
+       ReorderBufferTXN *txn;
+
+       txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+                                                               false);
+
+       if (txn == NULL)
+               return 0;
+
+       *msgs = txn->invalidations;
+
+       return txn->ninvalidations;
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 3ed2f79dd06..d48ebb8337b 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -288,7 +288,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
                                                  uint32 xinfo);
@@ -845,15 +845,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
  */
 static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
        dlist_iter      txn_i;
        ReorderBufferTXN *txn;
@@ -861,7 +861,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
        /*
         * Iterate through all toplevel transactions. This can include
         * subtransactions which we just don't yet know to be that, but that's
-        * fine, they will just get an unnecessary snapshot queued.
+        * fine, they will just get an unnecessary snapshot and invalidations
+        * queued.
         */
        dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
        {
@@ -874,6 +875,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                 * transaction which in turn implies we don't yet need a snapshot at
                 * all. We'll add a snapshot when the first change gets queued.
                 *
+                * Similarly, we don't need to add invalidations to a transaction whose
+                * base snapshot is not yet set. Once a base snapshot is built, it will
+                * include the xids of committed transactions that have modified the
+                * catalog, thus reflecting the new catalog contents. The existing
+                * catalog cache will have already been invalidated after processing
+                * the invalidations in the transaction that modified catalogs,
+                * ensuring that a fresh cache is constructed during decoding.
+                *
                 * NB: This works correctly even for subtransactions because
                 * ReorderBufferAssignChild() takes care to transfer the base snapshot
                 * to the top-level transaction, and while iterating the changequeue
@@ -883,13 +892,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                        continue;
 
                /*
-                * We don't need to add snapshot to prepared transactions as they
-                * should not see the new catalog contents.
+                * We don't need to add snapshot or invalidations to prepared
+                * transactions as they should not see the new catalog contents.
                 */
                if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
                        continue;
 
-               elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
+               elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
                         txn->xid, LSN_FORMAT_ARGS(lsn));
 
                /*
@@ -899,6 +908,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                SnapBuildSnapIncRefcount(builder->snapshot);
               ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
                                        builder->snapshot);
+
+               /*
+                * Add invalidation messages to the reorder buffer of in-progress
+                * transactions except the one that just committed, whose own
+                * invalidations are executed at the end of its decoding.
+                *
+                * This is required; otherwise, an in-progress transaction would keep
+                * using catcache contents built from the old catalog state even after
+                * decoding past the commit of the concurrent catalog-changing
+                * transaction, although those contents should have been invalidated.
+                */
+               if (txn->xid != xid)
+               {
+                       uint32 ninvalidations;
+                       SharedInvalidationMessage *msgs = NULL;
+
+                       ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
+                                                                      xid, &msgs);
+
+                       if (ninvalidations > 0)
+                       {
+                               Assert(msgs != NULL);
+
+                               ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+                                                             ninvalidations, msgs);
+                       }
+               }
        }
 }
 
@@ -1170,8 +1206,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
                /* refcount of the snapshot builder for the new snapshot */
                SnapBuildSnapIncRefcount(builder->snapshot);
 
-               /* add a new catalog snapshot to all currently running transactions */
-               SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+               /*
+                * Add a new catalog snapshot and invalidation messages to all
+                * currently running transactions.
+                */
+               SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
        }
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3cb03168de2..8a32bbe28df 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -748,6 +748,10 @@ extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
 
 extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
 
+extern uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb,
+                                            TransactionId xid,
+                                            SharedInvalidationMessage **msgs);
+
 extern void StartupReorderBuffer(void);
 
 #endif
-- 
2.43.5

From baf14b0c37827f301ad93e3903fee9048be97b52 Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal....@gmail.com>
Date: Fri, 23 Aug 2024 14:02:20 +0530
Subject: [PATCH v20_REL_17] Fix data loss in logical replication

Previously, logical replication could lose data if one user modified a
publication to add a table while another user concurrently modified that table
and committed later than the publication modification transaction. The issue
arose during the decoding of transactions modifying the table: if the initial
catalog cache was built using a snapshot taken before the publication DDL
execution, all subsequent changes to that table were decoded with an outdated
catalog cache, which caused them to be filtered from replication. This happened
because invalidation messages were only present in the publication modification
transaction, which was decoded before these subsequent changes.

This issue is not limited to publication DDLs; similar problems can occur with
ALTER TYPE statements executed concurrently with DMLs, leading to changes being
decoded with outdated type information.

To address this, this commit improves logical decoding by ensuring that
invalidation messages from catalog-modifying transactions are distributed to
all concurrent in-progress transactions. This allows the necessary rebuild of
the catalog cache when decoding new changes, similar to how historic catalog
snapshots are distributed (see SnapBuildDistributeNewCatalogSnapshot(), which
this commit renames to SnapBuildDistributeSnapshotAndInval()).
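As a rough illustration of why the distribution matters, the toy model below
replays the isolation test's transactions in commit order. It is a sketch
under simplifying assumptions, not PostgreSQL code: CacheEntry,
decode_insert() and replay() are hypothetical names, a single cache entry
stands in for the real per-relation caches, and the catalog is reduced to a
version number (0 before ALTER PUBLICATION commits, 1 after).

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical toy model, not PostgreSQL code.  Catalog version 0: tbl1 is
 * not in the publication; version 1: ALTER PUBLICATION ... ADD TABLE tbl1
 * has committed.
 */
static bool
catalog_says_published(int snapshot_version)
{
    assert(snapshot_version >= 0);
    return snapshot_version >= 1;
}

/* One cached "is tbl1 published?" answer that outlives each decoded txn. */
typedef struct CacheEntry
{
    bool        valid;
    bool        published;
} CacheEntry;

static CacheEntry cache;

static void
invalidate_cache(void)
{
    cache.valid = false;
}

/* Decode one INSERT into tbl1; returns 1 if it is sent, 0 if filtered. */
static int
decode_insert(int snapshot_version)
{
    if (!cache.valid)
    {
        /* (Re)build the entry from the catalog visible to this snapshot. */
        cache.published = catalog_says_published(snapshot_version);
        cache.valid = true;
    }
    return cache.published ? 1 : 0;
}

/* Replay the test's transactions in commit order; returns INSERTs sent. */
static int
replay(bool distribute_invals)
{
    int         sent = 0;

    cache.valid = false;

    /* T0: autocommit INSERT, committed before the DDL. */
    sent += decode_insert(0);

    /* A: ALTER PUBLICATION; its own invalidations run at the end of its decoding. */
    invalidate_cache();

    /* B: began before A committed, so its INSERT is decoded with the old catalog. */
    sent += decode_insert(0);

    /* A's commit point inside B's change stream: with the fix, B replays A's invalidations. */
    if (distribute_invals)
        invalidate_cache();

    /* C: autocommit INSERT after B; its snapshot shows the new catalog. */
    sent += decode_insert(1);

    return sent;
}
```

In this model replay(false) returns 0, because the entry built while decoding
B is never invalidated and filters C's INSERT, while replay(true) returns 1,
which is the single INSERT the invalidation_distrubution test expects.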

Following this change, some performance regression is observed, primarily
during frequent execution of publication DDL statements that modify published
tables. This is an expected trade-off due to cache rebuild and distribution
overhead. The regression is minor or nearly nonexistent when DDLs do not affect
published tables or occur infrequently, making this a worthwhile cost to
resolve a longstanding data loss issue.

An alternative approach considered was to take a strong lock on each affected
table during publication modification. However, this would only address issues
related to publication DDLs and would require locking every relation in the
database for publications created as FOR ALL TABLES, which is impractical. Thus,
this commit chooses to distribute invalidation messages as outlined above.
---
 contrib/test_decoding/Makefile                |  2 +-
 .../expected/invalidation_distrubution.out    | 20 ++++++
 contrib/test_decoding/meson.build             |  1 +
 .../specs/invalidation_distrubution.spec      | 32 +++++++++
 .../replication/logical/reorderbuffer.c       | 23 +++++++
 src/backend/replication/logical/snapbuild.c   | 67 +++++++++++++++----
 src/include/replication/reorderbuffer.h       |  4 ++
 7 files changed, 134 insertions(+), 15 deletions(-)
 create mode 100644 contrib/test_decoding/expected/invalidation_distrubution.out
 create mode 100644 contrib/test_decoding/specs/invalidation_distrubution.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a4ba1a509ae..eef70770674 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
        oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
        twophase_snapshot slot_creation_error catalog_change_snapshot \
-       skip_snapshot_restore
+       skip_snapshot_restore invalidation_distrubution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out
new file mode 100644
index 00000000000..ad0a944cbf3
--- /dev/null
+++ b/contrib/test_decoding/expected/invalidation_distrubution.out
@@ -0,0 +1,20 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index f643dc81a2c..b31c433681d 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -63,6 +63,7 @@ tests += {
       'twophase_snapshot',
       'slot_creation_error',
       'skip_snapshot_restore',
+      'invalidation_distrubution',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec
new file mode 100644
index 00000000000..decbed627e3
--- /dev/null
+++ b/contrib/test_decoding/specs/invalidation_distrubution.spec
@@ -0,0 +1,32 @@
+# Test that catalog cache invalidation messages are distributed to ongoing
+# transactions, ensuring they can access the updated catalog content after
+# processing these messages.
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+    CREATE TABLE tbl1(val1 integer, val2 integer);
+    CREATE PUBLICATION pub;
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    DROP PUBLICATION pub;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
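The s2_get_binary_changes step counts pgoutput messages whose first byte is
73, which is ASCII 'I', the type byte of an INSERT message
(LOGICAL_REP_MSG_INSERT). A minimal sketch of the same filter in C follows;
count_insert_messages() and sample_stream are hypothetical, and the stream
models only the leading type byte of each message, not real pgoutput payloads.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Count messages whose first byte is 'I' (73 in ASCII), mirroring the
 * test's "WHERE get_byte(data, 0) = 73" filter.
 */
static size_t
count_insert_messages(const unsigned char *const *msgs, size_t nmsgs)
{
    size_t      n = 0;

    assert(msgs != NULL || nmsgs == 0);
    for (size_t i = 0; i < nmsgs; i++)
    {
        if (msgs[i][0] == 'I')
            n++;
    }
    return n;
}

/*
 * Hypothetical stream: only the type byte of each message is modelled
 * ('B' begin, 'R' relation, 'I' insert, 'C' commit).
 */
static const unsigned char *const sample_stream[] = {
    (const unsigned char *) "B",
    (const unsigned char *) "R",
    (const unsigned char *) "I",
    (const unsigned char *) "C",
};
```

Applied to sample_stream this counts one message, the shape of result the
expected output file records for the permutation.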
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9c742e96eb3..03eb005c39d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -5337,3 +5337,26 @@ restart:
                *cmax = ent->cmax;
        return true;
 }
+
+/*
+ * Get the invalidation messages of the specified transaction.
+ *
+ * Returns the number of messages and sets *msgs to point to the transaction's
+ * array of messages. Returns 0 if the transaction is not found.
+ */
+uint32
+ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
+                              SharedInvalidationMessage **msgs)
+{
+       ReorderBufferTXN *txn;
+
+       txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+                                                               false);
+
+       if (txn == NULL)
+               return 0;
+
+       *msgs = txn->invalidations;
+
+       return txn->ninvalidations;
+}
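ReorderBufferGetInvalidations() returns the transaction's message count and
hands back a pointer to its existing array rather than a copy; for an unknown
xid it returns 0 without writing *msgs, which is why the caller in snapbuild.c
initializes msgs to NULL. A standalone sketch of that contract, using
hypothetical stand-in types (ToyInval, ToyTXN) and helpers
(toy_txn_by_xid(), toy_get_invalidations()) instead of the real reorder-buffer
structures:

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for SharedInvalidationMessage. */
typedef struct ToyInval
{
    int         id;
} ToyInval;

/* Hypothetical stand-in for the ReorderBufferTXN fields used here. */
typedef struct ToyTXN
{
    uint32_t    xid;
    uint32_t    ninvalidations;
    ToyInval   *invalidations;  /* array owned by the transaction */
} ToyTXN;

/* Linear search standing in for ReorderBufferTXNByXid() with create = false. */
static ToyTXN *
toy_txn_by_xid(ToyTXN *txns, size_t ntxns, uint32_t xid)
{
    for (size_t i = 0; i < ntxns; i++)
    {
        if (txns[i].xid == xid)
            return &txns[i];
    }
    return NULL;
}

/* Same contract as ReorderBufferGetInvalidations(). */
static uint32_t
toy_get_invalidations(ToyTXN *txns, size_t ntxns, uint32_t xid,
                      ToyInval **msgs)
{
    ToyTXN     *txn = toy_txn_by_xid(txns, ntxns, xid);

    if (txn == NULL)
        return 0;               /* unknown xid: *msgs is not written */

    *msgs = txn->invalidations; /* no copy: points into the txn's array */
    return txn->ninvalidations;
}
```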
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index ae676145e60..110e0b0a044 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -300,7 +300,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
                                                  uint32 xinfo);
@@ -859,15 +859,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
  */
 static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
        dlist_iter      txn_i;
        ReorderBufferTXN *txn;
@@ -875,7 +875,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
        /*
         * Iterate through all toplevel transactions. This can include
         * subtransactions which we just don't yet know to be that, but that's
-        * fine, they will just get an unnecessary snapshot queued.
+        * fine, they will just get an unnecessary snapshot and invalidations
+        * queued.
         */
        dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
        {
@@ -888,6 +889,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                 * transaction which in turn implies we don't yet need a snapshot at
                 * all. We'll add a snapshot when the first change gets queued.
                 *
+                * Similarly, we don't need to add invalidations to a transaction whose
+                * base snapshot is not yet set. Once a base snapshot is built, it will
+                * include the xids of committed transactions that have modified the
+                * catalog, thus reflecting the new catalog contents. The existing
+                * catalog cache will have already been invalidated after processing
+                * the invalidations in the transaction that modified catalogs,
+                * ensuring that a fresh cache is constructed during decoding.
+                *
                 * NB: This works correctly even for subtransactions because
                 * ReorderBufferAssignChild() takes care to transfer the base snapshot
                 * to the top-level transaction, and while iterating the changequeue
@@ -897,13 +906,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                        continue;
 
                /*
-                * We don't need to add snapshot to prepared transactions as they
-                * should not see the new catalog contents.
+                * We don't need to add snapshot or invalidations to prepared
+                * transactions as they should not see the new catalog contents.
                 */
                if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
                        continue;
 
-               elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
+               elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
                         txn->xid, LSN_FORMAT_ARGS(lsn));
 
                /*
@@ -913,6 +922,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                SnapBuildSnapIncRefcount(builder->snapshot);
               ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
                                        builder->snapshot);
+
+               /*
+                * Add invalidation messages to the reorder buffer of in-progress
+                * transactions except the one that just committed, whose own
+                * invalidations are executed at the end of its decoding.
+                *
+                * This is required; otherwise, an in-progress transaction would keep
+                * using catcache contents built from the old catalog state even after
+                * decoding past the commit of the concurrent catalog-changing
+                * transaction, although those contents should have been invalidated.
+                */
+               if (txn->xid != xid)
+               {
+                       uint32 ninvalidations;
+                       SharedInvalidationMessage *msgs = NULL;
+
+                       ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
+                                                                      xid, &msgs);
+
+                       if (ninvalidations > 0)
+                       {
+                               Assert(msgs != NULL);
+
+                               ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+                                                             ninvalidations, msgs);
+                       }
+               }
        }
 }
 
@@ -1184,8 +1220,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
                /* refcount of the snapshot builder for the new snapshot */
                SnapBuildSnapIncRefcount(builder->snapshot);
 
-               /* add a new catalog snapshot to all currently running transactions */
-               SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+               /*
+                * Add a new catalog snapshot and invalidation messages to all
+                * currently running transactions.
+                */
+               SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
        }
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7de50462dcf..4c56f219fd8 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -729,6 +729,10 @@ extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
 
 extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
 
+extern uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb,
+                                            TransactionId xid,
+                                            SharedInvalidationMessage **msgs);
+
 extern void StartupReorderBuffer(void);
 
 #endif
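For reviewers, the per-transaction decisions made by the loop in
SnapBuildDistributeSnapshotAndInval() can be condensed into one decision
function. This is a simplified sketch under stated assumptions, not the
patch's code: ToyTxn, DistAction and distribution_action() are hypothetical,
and snapshot refcounting and the actual queueing through
ReorderBufferAddSnapshot() and ReorderBufferAddInvalidations() are left out.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical summary of what the loop inspects for each toplevel txn. */
typedef struct ToyTxn
{
    uint32_t    xid;
    bool        has_base_snapshot;  /* false: no changes queued yet */
    bool        prepared;           /* rbtxn_prepared() || rbtxn_skip_prepared() */
} ToyTxn;

typedef enum DistAction
{
    ACT_SKIP,                   /* nothing is queued */
    ACT_SNAPSHOT_ONLY,          /* new snapshot, no extra invalidations */
    ACT_SNAPSHOT_AND_INVALS     /* new snapshot plus the committer's invalidations */
} DistAction;

static DistAction
distribution_action(const ToyTxn *txn, uint32_t committing_xid,
                    uint32_t ninvalidations)
{
    /* Its base snapshot, built later, will already see the new catalog. */
    if (!txn->has_base_snapshot)
        return ACT_SKIP;

    /* Prepared transactions must not see the new catalog contents. */
    if (txn->prepared)
        return ACT_SKIP;

    /*
     * The committer executes its own invalidations at the end of its
     * decoding; with no messages there is nothing to distribute either.
     */
    if (txn->xid == committing_xid || ninvalidations == 0)
        return ACT_SNAPSHOT_ONLY;

    return ACT_SNAPSHOT_AND_INVALS;
}
```

The checks follow the order used in the patch: the base-snapshot and prepared
checks skip a transaction entirely, and the xid comparison only decides
whether invalidations are queued in addition to the snapshot.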
-- 
2.43.5

Attachment: v20-0001-Fix-data-loss-in-logical-replication.patch