Hello

Firstly -- this is top-notch detective work; kudos and thanks for the
patch and test cases.  (I verified that both tests fail without the code fix.)

Here's a v3.  I applied a lot of cosmetic changes ("makeup") in order to
understand what's going on.  I *think* I have a grasp on the original code
and your bugfix, though not a terribly firm one, I admit.

Some comments:

* you don't need to Assert that things are not NULL if you're
  immediately going to dereference them.  The Assert is there to make
  the code crash in case of a NULL pointer, but the subsequent
  dereference has the same effect, so the Assert is redundant.

* I think setting base_snapshot to NULL in ReorderBufferCleanupTXN is
  pointless, since the struct is going to be freed shortly afterwards.

* I rewrote many comments (both existing and some of the ones your patch
  adds), and added lots of comments where there were none.

* I'm a bit unsure about the comment atop ReorderBufferSetBaseSnapshot.
  Obviously, I'm going to remove the part within #if 0/#endif before
  pushing.  I don't understand why it says "Needs to be called before any
  changes are added with ReorderBufferQueueChange"; but if you add an
  assert to ReorderBufferQueueChange that the base snapshot is set, it
  crashes pretty quickly in the test_decoding tests.  (The supposedly
  bogus comment was there before your patch -- I'm not saying your
  comment addition is faulty.)

* I also noticed that we're doing subtxn cleanup one by one in both
  ReorderBufferAssignChild and ReorderBufferCommitChild, which means the
  top-level txn is looked up in the hash table over and over; that seems
  a bit wasteful.  Not this patch's problem to fix ...

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 1d601d8144..afcab930f7 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -50,7 +50,8 @@ regresscheck-install-force: | submake-regress 
submake-test_decoding temp-install
        $(pg_regress_installcheck) \
            $(REGRESSCHECKS)
 
-ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml
+ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \
+       oldest_xmin snapshot_transfer
 
 isolationcheck: | submake-isolation submake-test_decoding temp-install
        $(pg_isolation_regress_check) \
diff --git a/contrib/test_decoding/expected/oldest_xmin.out 
b/contrib/test_decoding/expected/oldest_xmin.out
new file mode 100644
index 0000000000..d09342c4be
--- /dev/null
+++ b/contrib/test_decoding/expected/oldest_xmin.out
@@ -0,0 +1,27 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit 
s0_checkpoint s0_get_changes s1_commit s0_vacuum s0_get_changes
+step s0_begin: BEGIN;
+step s0_getxid: SELECT txid_current() IS NULL;
+?column?       
+
+f              
+step s1_begin: BEGIN;
+step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3));
+step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos;
+step s0_commit: COMMIT;
+step s0_checkpoint: CHECKPOINT;
+step s0_get_changes: SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 
'skip-empty-xacts', '1');
+data           
+
+step s1_commit: COMMIT;
+step s0_vacuum: VACUUM FULL;
+step s0_get_changes: SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 
'skip-empty-xacts', '1');
+data           
+
+BEGIN          
+table public.harvest: INSERT: fruits[basket]:'(1,2,3)'
+COMMIT         
+?column?       
+
+stop           
diff --git a/contrib/test_decoding/expected/snapshot_transfer.out 
b/contrib/test_decoding/expected/snapshot_transfer.out
new file mode 100644
index 0000000000..87bed03f76
--- /dev/null
+++ b/contrib/test_decoding/expected/snapshot_transfer.out
@@ -0,0 +1,49 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_begin s0_begin_sub0 s0_log_assignment 
s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub0 s0_commit 
s0_get_changes
+step s0_begin: BEGIN;
+step s0_begin_sub0: SAVEPOINT s0;
+step s0_log_assignment: SELECT txid_current() IS NULL;
+?column?       
+
+f              
+step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0);
+step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int;
+step s0_insert: INSERT INTO harvest VALUES (1, 2, 3);
+step s0_end_sub0: RELEASE SAVEPOINT s0;
+step s0_commit: COMMIT;
+step s0_get_changes: SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 
'skip-empty-xacts', '1');
+data           
+
+BEGIN          
+table public.dummy: INSERT: i[integer]:0
+table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 
mangos[integer]:3
+COMMIT         
+?column?       
+
+stop           
+
+starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_begin_sub1 
s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub1 s0_end_sub0 
s0_commit s0_get_changes
+step s0_begin: BEGIN;
+step s0_begin_sub0: SAVEPOINT s0;
+step s0_log_assignment: SELECT txid_current() IS NULL;
+?column?       
+
+f              
+step s0_begin_sub1: SAVEPOINT s1;
+step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0);
+step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int;
+step s0_insert: INSERT INTO harvest VALUES (1, 2, 3);
+step s0_end_sub1: RELEASE SAVEPOINT s1;
+step s0_end_sub0: RELEASE SAVEPOINT s0;
+step s0_commit: COMMIT;
+step s0_get_changes: SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 
'skip-empty-xacts', '1');
+data           
+
+BEGIN          
+table public.dummy: INSERT: i[integer]:0
+table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 
mangos[integer]:3
+COMMIT         
+?column?       
+
+stop           
diff --git a/contrib/test_decoding/specs/oldest_xmin.spec 
b/contrib/test_decoding/specs/oldest_xmin.spec
new file mode 100644
index 0000000000..4f8af70aa2
--- /dev/null
+++ b/contrib/test_decoding/specs/oldest_xmin.spec
@@ -0,0 +1,37 @@
+# Test advancement of the slot's oldest xmin
+
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 
'test_decoding'); -- must be first write in xact
+    DROP TYPE IF EXISTS basket;
+    CREATE TYPE basket AS (apples integer, pears integer, mangos integer);
+    DROP TABLE IF EXISTS harvest;
+    CREATE TABLE harvest(fruits basket);
+}
+
+teardown
+{
+    DROP TABLE IF EXISTS harvest;
+    DROP TYPE IF EXISTS basket;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+step "s0_begin" { BEGIN; }
+step "s0_getxid" { SELECT txid_current() IS NULL; }
+step "s0_alter" { ALTER TYPE basket DROP ATTRIBUTE mangos; }
+step "s0_commit" { COMMIT; }
+step "s0_checkpoint" { CHECKPOINT; }
+step "s0_vacuum" { VACUUM FULL; }
+step "s0_get_changes" { SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 
'skip-empty-xacts', '1'); }
+
+session "s1"
+step "s1_begin" { BEGIN; }
+step "s1_insert" { INSERT INTO harvest VALUES ((1, 2, 3)); }
+step "s1_commit" { COMMIT; }
+
+# Checkpoint with following get_changes forces to advance xmin. ALTER of a
+# composite type is a rare form of DDL which allows T1 to see the tuple which
+# will be removed (xmax set) before T1 commits. That is, interlocking doesn't
+# forbid modifying catalog after someone read it (and didn't commit yet).
+permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" 
"s0_commit" "s0_checkpoint" "s0_get_changes" "s1_commit" "s0_vacuum" 
"s0_get_changes"
diff --git a/contrib/test_decoding/specs/snapshot_transfer.spec 
b/contrib/test_decoding/specs/snapshot_transfer.spec
new file mode 100644
index 0000000000..47db7fd90a
--- /dev/null
+++ b/contrib/test_decoding/specs/snapshot_transfer.spec
@@ -0,0 +1,42 @@
+# Test snapshot transfer from subxact to top-level and receival of later snaps.
+
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 
'test_decoding'); -- must be first write in xact
+    DROP TABLE IF EXISTS dummy;
+    CREATE TABLE dummy(i int);
+    DROP TABLE IF EXISTS harvest;
+    CREATE TABLE harvest(apples int, pears int);
+}
+
+teardown
+{
+    DROP TABLE IF EXISTS harvest;
+    DROP TABLE IF EXISTS dummy;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+step "s0_begin" { BEGIN; }
+step "s0_begin_sub0" { SAVEPOINT s0; }
+step "s0_log_assignment" { SELECT txid_current() IS NULL; }
+step "s0_begin_sub1" { SAVEPOINT s1; }
+step "s0_sub_get_base_snap" { INSERT INTO dummy VALUES (0); }
+step "s0_insert" { INSERT INTO harvest VALUES (1, 2, 3); }
+step "s0_end_sub0" { RELEASE SAVEPOINT s0; }
+step "s0_end_sub1" { RELEASE SAVEPOINT s1; }
+step "s0_insert2" { INSERT INTO harvest VALUES (1, 2, 3, 4); }
+step "s0_commit" { COMMIT; }
+step "s0_get_changes" { SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 
'skip-empty-xacts', '1'); }
+
+session "s1"
+step "s1_produce_new_snap" { ALTER TABLE harvest ADD COLUMN mangos int; }
+
+# start top-level without base snap, get base snap in subxact, then create new
+# snap and make sure it is queued.
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" 
"s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub0" 
"s0_commit" "s0_get_changes"
+
+# In previous test, we firstly associated subxact with xact and only then got
+# base snap; now nest one more subxact to get snap first and only then (at
+# commit) associate it with toplevel.
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_begin_sub1" 
"s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub1" 
"s0_end_sub0" "s0_commit" "s0_get_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c 
b/src/backend/replication/logical/reorderbuffer.c
index c1447a513b..e29c1fec15 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -165,6 +165,8 @@ static void ReorderBufferReturnTXN(ReorderBuffer *rb, 
ReorderBufferTXN *txn);
 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
                                          TransactionId xid, bool create, bool 
*is_new,
                                          XLogRecPtr lsn, bool create_as_top);
+static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
+                                                                 
ReorderBufferTXN *subtxn);
 
 static void AssertTXNLsnOrder(ReorderBuffer *rb);
 
@@ -271,6 +273,7 @@ ReorderBufferAllocate(void)
        buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
        dlist_init(&buffer->toplevel_by_lsn);
+       dlist_init(&buffer->txns_by_base_snapshot_lsn);
 
        /*
         * Ensure there's no stale data from prior uses of this slot, in case 
some
@@ -462,7 +465,6 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, 
bool create,
        bool            found;
 
        Assert(TransactionIdIsValid(xid));
-       Assert(!create || lsn != InvalidXLogRecPtr);
 
        /*
         * Check the one-entry lookup cache first
@@ -506,6 +508,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, 
bool create,
        {
                /* initialize the new entry, if creation was requested */
                Assert(ent != NULL);
+               Assert(lsn != InvalidXLogRecPtr);
 
                ent->txn = ReorderBufferGetTXN(rb);
                ent->txn->xid = xid;
@@ -543,9 +546,10 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId 
xid, XLogRecPtr lsn,
        ReorderBufferTXN *txn;
 
        txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+       Assert(txn->base_snapshot != NULL || txn->is_known_as_subxact);
 
        change->lsn = lsn;
-       Assert(InvalidXLogRecPtr != lsn);
+       Assert(!XLogRecPtrIsInvalid(lsn));
        dlist_push_tail(&txn->changes, &change->node);
        txn->nentries++;
        txn->nentries_mem++;
@@ -607,43 +611,80 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, 
TransactionId xid,
        }
 }
 
-
+/*
+ * AssertTXNLsnOrder
+ *             Verify LSN ordering of transaction lists in the reorderbuffer
+ *
+ * Other LSN-related invariants are checked too.
+ *
+ * No-op if assertions are not in use.
+ */
 static void
 AssertTXNLsnOrder(ReorderBuffer *rb)
 {
 #ifdef USE_ASSERT_CHECKING
        dlist_iter      iter;
        XLogRecPtr      prev_first_lsn = InvalidXLogRecPtr;
+       XLogRecPtr      prev_base_snap_lsn = InvalidXLogRecPtr;
 
        dlist_foreach(iter, &rb->toplevel_by_lsn)
        {
-               ReorderBufferTXN *cur_txn;
+               ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, 
node,
+                                                                               
                        iter.cur);
 
-               cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
+               /* start LSN must be set */
                Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
 
+               /* If there is an end LSN, it must be higher than start LSN */
                if (cur_txn->end_lsn != InvalidXLogRecPtr)
                        Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
 
+               /* Current initial LSN must be strictly higher than previous */
                if (prev_first_lsn != InvalidXLogRecPtr)
                        Assert(prev_first_lsn < cur_txn->first_lsn);
 
+               /* known-as-subtxn txns must not be listed */
                Assert(!cur_txn->is_known_as_subxact);
+
                prev_first_lsn = cur_txn->first_lsn;
        }
+
+       dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
+       {
+               ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
+                                                                               
                        base_snapshot_node,
+                                                                               
                        iter.cur);
+
+               /* base snapshot (and its LSN) must be set */
+               Assert(cur_txn->base_snapshot != NULL);
+               Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
+
+               /* current LSN must be strictly higher than previous */
+               if (prev_base_snap_lsn != InvalidXLogRecPtr)
+                       Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
+
+               /* known-as-subtxn txns must not be listed */
+               Assert(!cur_txn->is_known_as_subxact);
+
+               prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
+       }
 #endif
 }
 
+/*
+ * ReorderBufferGetOldestTXN
+ *             Return oldest transaction in reorderbuffer
+ */
 ReorderBufferTXN *
 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
 {
        ReorderBufferTXN *txn;
 
+       AssertTXNLsnOrder(rb);
+
        if (dlist_is_empty(&rb->toplevel_by_lsn))
                return NULL;
 
-       AssertTXNLsnOrder(rb);
-
        txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
 
        Assert(!txn->is_known_as_subxact);
@@ -651,12 +692,44 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb)
        return txn;
 }
 
+/*
+ * ReorderBufferGetOldestXmin
+ *             Return oldest Xmin in reorderbuffer
+ *
+ * Returns oldest possibly running Xid from the point of view of snapshots
+ * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
+ * there are none.
+ *
+ * Since snapshots are assigned monotonically, this equals the Xmin of the
+ * base snapshot with minimal base_snapshot_lsn.
+ */
+TransactionId
+ReorderBufferGetOldestXmin(ReorderBuffer *rb)
+{
+       ReorderBufferTXN *txn;
+
+       AssertTXNLsnOrder(rb);
+
+       if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
+               return InvalidTransactionId;
+
+       txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
+                                                        
&rb->txns_by_base_snapshot_lsn);
+       return txn->base_snapshot->xmin;
+}
+
 void
 ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
 {
        rb->current_restart_decoding_lsn = ptr;
 }
 
+/*
+ * ReorderBufferAssignChild
+ *
+ * Make note that we know that subxid is a subtransaction of xid, seen as of
+ * the given lsn.
+ */
 void
 ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
                                                 TransactionId subxid, 
XLogRecPtr lsn)
@@ -669,32 +742,107 @@ ReorderBufferAssignChild(ReorderBuffer *rb, 
TransactionId xid,
        txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
        subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
 
-       if (new_sub)
-       {
-               /*
-                * we assign subtransactions to top level transaction even if 
we don't
-                * have data for it yet, assignment records frequently 
reference xids
-                * that have not yet produced any records. Knowing those aren't 
top
-                * level xids allows us to make processing cheaper in some 
places.
-                */
-               dlist_push_tail(&txn->subtxns, &subtxn->node);
-               txn->nsubtxns++;
-       }
-       else if (!subtxn->is_known_as_subxact)
-       {
-               subtxn->is_known_as_subxact = true;
-               Assert(subtxn->nsubtxns == 0);
+       if (new_top && !new_sub)
+               elog(ERROR, "subtransaction logged without previous top-level 
txn record");
 
-               /* remove from lsn order list of top-level transactions */
-               dlist_delete(&subtxn->node);
-
-               /* add to toplevel transaction */
-               dlist_push_tail(&txn->subtxns, &subtxn->node);
-               txn->nsubtxns++;
-       }
-       else if (new_top)
+       if (!new_sub)
        {
-               elog(ERROR, "existing subxact assigned to unknown toplevel 
xact");
+               if (subtxn->is_known_as_subxact)
+               {
+                       /* already associated, nothing to do */
+                       return;
+               }
+               else
+               {
+                       /*
+                        * We already saw this transaction, but initially added 
it to the list
+                        * of top-level txns.  Now that we know it's not 
top-level, remove
+                        * it from there.
+                        */
+                       dlist_delete(&subtxn->node);
+               }
+       }
+
+       subtxn->is_known_as_subxact = true;
+       subtxn->toplevel_xid = xid;
+       Assert(subtxn->nsubtxns == 0);
+
+       /* add to subtransaction list */
+       dlist_push_tail(&txn->subtxns, &subtxn->node);
+       txn->nsubtxns++;
+
+       /* Possibly transfer the subtxn's snapshot to its top-level txn. */
+       ReorderBufferTransferSnapToParent(txn, subtxn);
+
+       /* Verify LSN-ordering invariant */
+       AssertTXNLsnOrder(rb);
+}
+
+/*
+ * ReorderBufferTransferSnapToParent
+ *             Transfer base snapshot from subtxn to top-level txn, if needed
+ *
+ * This is done if the top-level txn doesn't have a base snapshot, or if the
+ * subtxn's base snapshot has an earlier LSN than the top-level txn's base
+ * snapshot's LSN.  This can happen if there are no changes in the toplevel
+ * txn but there are some in the subtxn, or the first change in subtxn has
+ * earlier LSN than first change in the top-level txn and we learned about
+ * their kinship only now.
+ *
+ * The subtransaction's snapshot is cleared regardless of the transfer
+ * happening, since it's not needed anymore in either case.
+ *
+ * We do this as soon as we become aware of their kinship, to avoid queueing
+ * extra snapshots to txns known-as-subtxns -- only top-level txns will
+ * receive further snapshots.
+ */
+static void
+ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
+                                                                 
ReorderBufferTXN *subtxn)
+{
+       Assert(subtxn->toplevel_xid == txn->xid);
+
+       if (subtxn->base_snapshot != NULL)
+       {
+               if (txn->base_snapshot == NULL ||
+                       subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
+               {
+                       /*
+                        * If the toplevel transaction already has a base 
snapshot but
+                        * it's newer than the subxact's, purge it.
+                        */
+                       if (txn->base_snapshot != NULL)
+                       {
+                               SnapBuildSnapDecRefcount(txn->base_snapshot);
+                               dlist_delete(&txn->base_snapshot_node);
+                       }
+
+                       /*
+                        * The snapshot is now the top transaction's; transfer 
it, and
+                        * adjust the list position of the top transaction in 
the list by
+                        * moving it to where the subtransaction is.
+                        */
+                       txn->base_snapshot = subtxn->base_snapshot;
+                       txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
+                       dlist_insert_before(&subtxn->base_snapshot_node,
+                                                               
&txn->base_snapshot_node);
+
+                       /*
+                        * The subtransaction doesn't have a snapshot anymore 
(so it
+                        * mustn't be in the list.)
+                        */
+                       subtxn->base_snapshot = NULL;
+                       subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
+                       dlist_delete(&subtxn->base_snapshot_node);
+               }
+               else
+               {
+                       /* Base snap of toplevel is fine, so subxact's is not 
needed */
+                       SnapBuildSnapDecRefcount(subtxn->base_snapshot);
+                       dlist_delete(&subtxn->base_snapshot_node);
+                       subtxn->base_snapshot = NULL;
+                       subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
+               }
        }
 }
 
@@ -707,7 +855,6 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId 
xid,
                                                 TransactionId subxid, 
XLogRecPtr commit_lsn,
                                                 XLogRecPtr end_lsn)
 {
-       ReorderBufferTXN *txn;
        ReorderBufferTXN *subtxn;
 
        subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
@@ -719,42 +866,14 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId 
xid,
        if (!subtxn)
                return;
 
-       txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
-
-       if (txn == NULL)
-               elog(ERROR, "subxact logged without previous toplevel record");
-
-       /*
-        * Pass our base snapshot to the parent transaction if it doesn't have
-        * one, or ours is older. That can happen if there are no changes in the
-        * toplevel transaction but in one of the child transactions. This 
allows
-        * the parent to simply use its base snapshot initially.
-        */
-       if (subtxn->base_snapshot != NULL &&
-               (txn->base_snapshot == NULL ||
-                txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
-       {
-               txn->base_snapshot = subtxn->base_snapshot;
-               txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
-               subtxn->base_snapshot = NULL;
-               subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
-       }
-
        subtxn->final_lsn = commit_lsn;
        subtxn->end_lsn = end_lsn;
 
-       if (!subtxn->is_known_as_subxact)
-       {
-               subtxn->is_known_as_subxact = true;
-               Assert(subtxn->nsubtxns == 0);
-
-               /* remove from lsn order list of top-level transactions */
-               dlist_delete(&subtxn->node);
-
-               /* add to subtransaction list */
-               dlist_push_tail(&txn->subtxns, &subtxn->node);
-               txn->nsubtxns++;
-       }
+       /*
+        * Assign this subxact as a child of the toplevel xact (no-op if already
+        * done.)
+        */
+       ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
 }
 
 
@@ -1078,11 +1197,16 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, 
ReorderBufferTXN *txn)
                ReorderBufferReturnChange(rb, change);
        }
 
+       /*
+        * If base snap is set, clean it up and remove txn from corresponding
+        * containing list
+        */
        if (txn->base_snapshot != NULL)
        {
                SnapBuildSnapDecRefcount(txn->base_snapshot);
                txn->base_snapshot = NULL;
                txn->base_snapshot_lsn = InvalidXLogRecPtr;
+               dlist_delete(&txn->base_snapshot_node);
        }
 
        /*
@@ -1257,17 +1381,17 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
 }
 
 /*
- * Perform the replay of a transaction and it's non-aborted subtransactions.
+ * Perform the replay of a transaction and its non-aborted subtransactions.
  *
  * Subtransactions previously have to be processed by
  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
  * transaction with ReorderBufferAssignChild.
  *
- * We currently can only decode a transaction's contents in when their commit
- * record is read because that's currently the only place where we know about
- * cache invalidations. Thus, once a toplevel commit is read, we iterate over
- * the top and subtransactions (using a k-way merge) and replay the changes in
- * lsn order.
+ * We currently can only decode a transaction's contents when its commit
+ * record is read because that's the only place where we know about cache
+ * invalidations. Thus, once a toplevel commit is read, we iterate over the top
+ * and subtransactions (using a k-way merge) and replay the changes in lsn
+ * order.
  */
 void
 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
@@ -1295,10 +1419,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId 
xid,
        txn->origin_lsn = origin_lsn;
 
        /*
-        * If this transaction didn't have any real changes in our database, 
it's
-        * OK not to have a snapshot. Note that ReorderBufferCommitChild will 
have
-        * transferred its snapshot to this transaction if it had one and the
-        * toplevel tx didn't.
+        * If this transaction has no snapshot, it didn't make any changes to 
the
+        * database, so there's nothing to decode.  Note that
+        * ReorderBufferCommitChild will have transferred any snapshots from
+        * subtransactions if there were any.
         */
        if (txn->base_snapshot == NULL)
        {
@@ -1861,12 +1985,19 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, 
TransactionId xid,
 }
 
 /*
- * Setup the base snapshot of a transaction. The base snapshot is the snapshot
- * that is used to decode all changes until either this transaction modifies
- * the catalog or another catalog modifying transaction commits.
+ * Set up the transaction's base snapshot.
  *
  * Needs to be called before any changes are added with
  * ReorderBufferQueueChange().
+#if 0
+ * The base snapshot is the snapshot
+ * that is used to decode all changes until either this transaction modifies
+ * the catalog or another catalog modifying transaction commits.
+ * If we know that xid is a subtransaction, we just make sure that its
+ * top-level transaction has a base snapshot.
+ * as its base or queued in
+ * its change queue.
+#endif
  */
 void
 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
@@ -1875,12 +2006,23 @@ ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, 
TransactionId xid,
        ReorderBufferTXN *txn;
        bool            is_new;
 
+       AssertArg(snap != NULL);
+
+       /*
+        * Fetch the transaction to operate on.  If we know it's a 
subtransaction,
+        * operate on its top-level transaction instead.
+        */
        txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
+       if (txn->is_known_as_subxact)
+               txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
+                                                                       NULL, 
InvalidXLogRecPtr, false);
        Assert(txn->base_snapshot == NULL);
-       Assert(snap != NULL);
 
        txn->base_snapshot = snap;
        txn->base_snapshot_lsn = lsn;
+       dlist_push_tail(&rb->txns_by_base_snapshot_lsn, 
&txn->base_snapshot_node);
+
+       AssertTXNLsnOrder(rb);
 }
 
 /*
@@ -1999,25 +2141,26 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, 
TransactionId xid)
 }
 
 /*
- * Have we already added the first snapshot?
+ * ReorderBufferXidHasBaseSnapshot
+ *             Have we already set the base snapshot for the given txn/subtxn?
  */
 bool
 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
 {
        ReorderBufferTXN *txn;
 
-       txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-                                                               false);
+       txn = ReorderBufferTXNByXid(rb, xid, false,
+                                                               NULL, 
InvalidXLogRecPtr, false);
 
        /* transaction isn't known yet, ergo no snapshot */
        if (txn == NULL)
                return false;
 
-       /*
-        * TODO: It would be a nice improvement if we would check the toplevel
-        * transaction in subtransactions, but we'd need to keep track of a bit
-        * more state.
-        */
+       /* a known subtxn? operate on top-level txn instead */
+       if (txn->is_known_as_subxact)
+               txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
+                                                                       NULL, 
InvalidXLogRecPtr, false);
+
        return txn->base_snapshot != NULL;
 }
 
diff --git a/src/backend/replication/logical/snapbuild.c 
b/src/backend/replication/logical/snapbuild.c
index 2c4a1bab4b..ecf51d6b89 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -830,9 +830,9 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, 
XLogRecPtr lsn)
                 * all. We'll add a snapshot when the first change gets queued.
                 *
                 * NB: This works correctly even for subtransactions because
-                * ReorderBufferCommitChild() takes care to pass the parent the 
base
-                * snapshot, and while iterating the changequeue we'll get the 
change
-                * from the subtxn.
+                * ReorderBufferAssignChild() takes care to transfer the base 
snapshot
+                * to the top-level transaction, and while iterating the 
changequeue
+                * we'll get the change from the subtxn.
                 */
                if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, 
txn->xid))
                        continue;
@@ -1094,6 +1094,7 @@ void
 SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, 
xl_running_xacts *running)
 {
        ReorderBufferTXN *txn;
+       TransactionId oldest_xmin;
 
        /*
         * If we're not consistent yet, inspect the record to see whether it
@@ -1132,9 +1133,17 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, 
XLogRecPtr lsn, xl_running_xact
 
        /*
         * Increase shared memory limits, so vacuum can work on tuples we
-        * prevented from being pruned till now.
+        * prevented from being pruned till now. We ask reorderbuffer which
+        * minimal xid might still be running in snapshots of xacts currently
+        * being reordered. If rb doesn't have any xacts with snapshots, we need
+        * to care only about xids which will be considered as running by
+        * snapshots we will produce later, and min of them can't be less than
+        * oldest running xid in the record we are reading.
         */
-       LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);
+       oldest_xmin = ReorderBufferGetOldestXmin(builder->reorder);
+       if (oldest_xmin == InvalidTransactionId)
+               oldest_xmin = running->oldestRunningXid;
+       LogicalIncreaseXminForSlot(lsn, oldest_xmin);
 
        /*
         * Also tell the slot where we can restart decoding from. We don't want 
to
diff --git a/src/include/replication/reorderbuffer.h 
b/src/include/replication/reorderbuffer.h
index 1c7982958e..f8a295bddc 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -160,10 +160,9 @@ typedef struct ReorderBufferTXN
        /* did the TX have catalog changes */
        bool            has_catalog_changes;
 
-       /*
-        * Do we know this is a subxact?
-        */
+       /* Do we know this is a subxact?  Xid of top-level txn if so */
        bool            is_known_as_subxact;
+       TransactionId toplevel_xid;
 
        /*
         * LSN of the first data carrying, WAL record with knowledge about this
@@ -209,10 +208,13 @@ typedef struct ReorderBufferTXN
        TimestampTz commit_time;
 
        /*
-        * Base snapshot or NULL.
+        * The base snapshot is used to decode all changes until either this
+        * transaction modifies the catalog, or another catalog-modifying
+        * transaction commits.
         */
        Snapshot        base_snapshot;
        XLogRecPtr      base_snapshot_lsn;
+       dlist_node      base_snapshot_node;     /* link in 
txns_by_base_snapshot_lsn */
 
        /*
         * How many ReorderBufferChange's do we have in this txn.
@@ -279,7 +281,7 @@ typedef struct ReorderBufferTXN
         * Position in one of three lists:
         * * list of subtransactions if we are *known* to be subxact
         * * list of toplevel xacts (can be an as-yet unknown subxact)
-        * * list of preallocated ReorderBufferTXNs
+        * * list of preallocated ReorderBufferTXNs (if unused)
         * ---
         */
        dlist_node      node;
@@ -338,6 +340,15 @@ struct ReorderBuffer
        dlist_head      toplevel_by_lsn;
 
        /*
+        * Transactions and subtransactions that have a base snapshot, ordered 
by
+        * LSN of the record which caused us to first obtain the base snapshot.
+        * This is not the same as toplevel_by_lsn, because we only set the base
+        * snapshot on the first logical-decoding-relevant record (eg. heap
+        * writes), whereas the initial LSN could be set by other operations.
+        */
+       dlist_head      txns_by_base_snapshot_lsn;
+
+       /*
         * one-entry sized cache for by_txn. Very frequently the same txn gets
         * looked up over and over again.
         */
@@ -422,6 +433,7 @@ bool                
ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool           ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId 
xid);
 
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
+TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
 void           ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 

Reply via email to