Ajin, Amit,
thank you both for thinking this through and even providing a patch.
The changes to the expected output of twophase.out match exactly what I
had prepared. And the switch in pg_logical_slot_get_changes is indeed
something I had not yet considered, either.
On 19.02.21 03:50, Ajin Cherian wrote:
> For this, I am planning to change the semantics such that
> two-phase-commit can only be specified while creating the slot using
> pg_create_logical_replication_slot()
> and not in pg_logical_slot_get_changes, thus preventing
> two-phase-commit flag from being toggled between restarts of the
> decoder. Let me know if anybody objects to this
> change, else I will update that in the next patch.
This sounds like a good plan to me, yes.
However, more generally speaking, I suspect you are overthinking this.
All of the complexity arises from the assumption that an output
plugin receiving and confirming a PREPARE may not be able to persist
that first phase of transaction application. As a result, you are trying
to somehow resurrect the transactional changes and the prepare at COMMIT
PREPARED time and decode them in a deferred way.
Instead, I'm arguing that a PREPARE is an atomic operation just like a
transaction's COMMIT. The decoder should always emit these in their order
of appearance in the WAL. For example, if the WAL contains PREPARE A,
COMMIT B, COMMIT PREPARED A, the decoder should always output these
events in exactly that order, and never as COMMIT B, PREPARE A, COMMIT
PREPARED A. The current expectation for twophase_snapshot violates this:
the COMMIT for `s1insert` there appears after the PREPARE of `s2p` in
the WAL, but gets decoded before it.
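To make the ordering requirement concrete, here is a minimal C sketch that checks whether a sequence of decoded transaction boundaries comes out in WAL order. `Lsn`, `DecodedEvent` and `emitted_in_wal_order()` are hypothetical stand-ins, not PostgreSQL's API, and the LSN values are made up for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for PostgreSQL's XLogRecPtr (a 64-bit WAL position). */
typedef uint64_t Lsn;

/* A transaction boundary as seen by an output plugin (illustrative only). */
typedef struct
{
    const char *label;          /* e.g. "PREPARE A" or "COMMIT B" */
    Lsn         lsn;            /* WAL position of the originating record */
} DecodedEvent;

/*
 * True if the events were emitted in their order of appearance in the WAL.
 * Distinct WAL records never share an LSN, so the LSNs must be strictly
 * increasing.
 */
static bool
emitted_in_wal_order(const DecodedEvent *events, size_t n)
{
    for (size_t i = 1; i < n; i++)
    {
        if (events[i].lsn <= events[i - 1].lsn)
            return false;
    }
    return true;
}

/* The WAL contains PREPARE A, COMMIT B, COMMIT PREPARED A (made-up LSNs). */
static const DecodedEvent expected_order[] = {
    {"PREPARE A", 0x100},
    {"COMMIT B", 0x200},
    {"COMMIT PREPARED A", 0x300},
};

/* The same events, but with PREPARE A deferred until after COMMIT B. */
static const DecodedEvent deferred_prepare[] = {
    {"COMMIT B", 0x200},
    {"PREPARE A", 0x100},
    {"COMMIT PREPARED A", 0x300},
};
```

The deferred variant fails the check because PREPARE A (0x100) is emitted after COMMIT B (0x200).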
The patch I'm attaching corrects this expectation in twophase_snapshot,
adds an explanatory diagram, and eliminates any danger of sending
PREPAREs at COMMIT PREPARED time, thereby preserving the ordering of
PREPAREs relative to COMMITs.
Given that the output plugin supports two-phase commit, I argue there must
be a good reason for it to set the start_decoding_at LSN to a point in
time after a PREPARE. To me, that means the output plugin (or its
downstream replica) has processed the PREPARE (and the downstream
replica did whatever it needed to do on its side to make the
transaction ready to be committed in a second phase).
(In the weird case of an output plugin that wants to enable two-phase
commit but does not really support it downstream, it's still possible
for it to hold back LSN confirmations for prepared-but-still-in-flight
transactions. However, I'm having a hard time justifying this use case.)
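For that corner case, a plugin could track the prepared transactions it has not yet seen resolved and never confirm past the oldest one. A minimal sketch; `TrackedPrepare` and `confirm_horizon()` are hypothetical names, not part of PostgreSQL's output plugin API, and how confirmations are actually reported is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* hypothetical stand-in for XLogRecPtr */

/* Hypothetical bookkeeping entry a plugin keeps per decoded PREPARE. */
typedef struct
{
    const char *gid;
    Lsn         prepare_lsn;
    bool        resolved;       /* COMMIT PREPARED or ROLLBACK PREPARED seen */
} TrackedPrepare;

/*
 * Returns an exclusive upper bound for LSN confirmations: the plugin may
 * confirm any LSN strictly below the result.  Each still unresolved
 * prepared transaction pins the bound at its PREPARE, so that a restarted
 * decoder starts early enough to decode that PREPARE again.  With nothing
 * in flight, everything up to and including received_lsn may be confirmed.
 */
static Lsn
confirm_horizon(const TrackedPrepare *prepares, size_t n, Lsn received_lsn)
{
    Lsn         horizon = received_lsn + 1;

    for (size_t i = 0; i < n; i++)
    {
        if (!prepares[i].resolved && prepares[i].prepare_lsn < horizon)
            horizon = prepares[i].prepare_lsn;
    }
    return horizon;
}
```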
With that line of thinking, the point in time (or in WAL) of the COMMIT
PREPARED does not matter at all to reason about the decoding of the
PREPARE operation. Instead, there are exactly two cases to consider:
a) the PREPARE happened before the start_decoding_at LSN and must not be
decoded. (But the effects of the PREPARE must then be included in the
initial synchronization. If that's not supported, the output plugin
should not enable two-phase commit.)
b) the PREPARE happens after the start_decoding_at LSN and must be
decoded. (It obviously was neither included in the initial
synchronization nor decoded by a previous instance of the decoder process.)
A PREPARE that lies before SNAPBUILD_CONSISTENT always falls under
case a), as start_decoding_at never lies before the consistent point,
and there we must not repeat the PREPARE anyway. And in case b),
where we need a consistent snapshot to decode the PREPARE, existing
provisions already guarantee that this is possible (or how would this
be different from a regular single-phase commit?).
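The two cases boil down to a single rule that treats a PREPARE exactly like a COMMIT. Below is a minimal C sketch of that rule; all types and names are hypothetical stand-ins, the LSNs are made up, and the `>=` comparison is meant to mirror, as far as I understand it, the `ptr < start_decoding_at` skip test in SnapBuildXactNeedsSkip().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint64_t Lsn;           /* hypothetical stand-in for XLogRecPtr */

typedef enum
{
    REC_COMMIT,
    REC_PREPARE,
    REC_COMMIT_PREPARED,
    REC_ROLLBACK_PREPARED
} RecKind;

typedef struct
{
    RecKind     kind;           /* deliberately not consulted below */
    Lsn         lsn;
    const char *text;           /* what the output plugin would emit */
} WalRecord;

/*
 * Case a) vs. case b): a transaction boundary record, be it a COMMIT, a
 * PREPARE or a COMMIT/ROLLBACK PREPARED, is decoded if and only if it lies
 * at or after start_decoding_at.  Where the matching COMMIT PREPARED lies
 * never matters for the PREPARE.
 */
static bool
record_is_decoded(const WalRecord *rec, Lsn start_decoding_at)
{
    return rec->lsn >= start_decoding_at;
}

/* Collects the text of all decoded records, in WAL order; returns count. */
static size_t
decode_from(const WalRecord *wal, size_t n, Lsn start_decoding_at,
            const char **out)
{
    size_t      n_out = 0;

    for (size_t i = 0; i < n; i++)
    {
        if (record_is_decoded(&wal[i], start_decoding_at))
            out[n_out++] = wal[i].text;
    }
    return n_out;
}

/*
 * Roughly the twophase_snapshot scenario, with made-up LSNs: s2p prepares
 * 'test1' before the slot's start_decoding_at (0x200 here), then s1insert
 * commits and s2cp commits the prepared transaction.
 */
static const WalRecord snapshot_wal[] = {
    {REC_PREPARE, 0x100, "PREPARE TRANSACTION 'test1'"},
    {REC_COMMIT, 0x300, "COMMIT (s1insert)"},
    {REC_COMMIT_PREPARED, 0x400, "COMMIT PREPARED 'test1'"},
};
```

With start_decoding_at at 0x200, only the COMMIT of s1insert and the COMMIT PREPARED are emitted, in that order, which is what the corrected twophase_snapshot expectation shows.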
Please let me know what you think and whether this approach is feasible
for you as well.
Regards
Markus
From ed03c463175733072edf8afb8d120a1285a3194f Mon Sep 17 00:00:00 2001
From: Markus Wanner <markus.wan...@enterprisedb.com>
Date: Tue, 9 Feb 2021 16:16:13 +0100
Subject: [PATCH] Preserve ordering of PREPAREs vs COMMITs in logical decoding
Decouple decoding of the prepare phase of a two-phase transaction from
the final commit (or rollback) of a two-phase transaction, so that
these behave more like atomic operations that preserve their ordering
in the WAL, and so that the transactional changes of a PREPARE are
never provided to the output plugin unnecessarily.
Correct test expectations to expect no duplication. Add a variant with
a ROLLBACK PREPARED to twophase_snapshot and illustrate the test case
with an explanatory diagram.
---
contrib/test_decoding/expected/twophase.out | 38 ++++---------
.../expected/twophase_snapshot.out | 40 ++++++++++++-
.../expected/twophase_stream.out | 28 +---------
.../specs/twophase_snapshot.spec | 56 ++++++++++++++++---
doc/src/sgml/logicaldecoding.sgml | 17 ++++--
.../replication/logical/reorderbuffer.c | 29 ----------
6 files changed, 112 insertions(+), 96 deletions(-)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5a62ab8bbc1..d0d805e5c0e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2695,35 +2695,6 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
/* add the gid in the txn */
txn->gid = pstrdup(gid);
- /*
- * It is possible that this transaction is not decoded at prepare time
- * either because by that time we didn't have a consistent snapshot or it
- * was decoded earlier but we have restarted. We can't distinguish between
- * those two cases so we send the prepare in both the cases and let
- * downstream decide whether to process or skip it. We don't need to
- * decode the xact for aborts if it is not done already.
- */
- if (!rbtxn_prepared(txn) && is_commit)
- {
- txn->txn_flags |= RBTXN_PREPARE;
-
- /*
- * The prepare info must have been updated in txn even if we skip
- * prepare.
- */
- Assert(txn->final_lsn != InvalidXLogRecPtr);
-
- /*
- * By this time the txn has the prepare record information and it is
- * important to use that so that downstream gets the accurate
- * information. If instead, we have passed commit information here
- * then downstream can behave as it has already replayed commit
- * prepared after the restart.
- */
- ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
- txn->commit_time, txn->origin_id, txn->origin_lsn);
- }
-
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index cf705ed9cda..15820883a43 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -821,11 +821,18 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
whenever the start of a prepared transaction has been decoded. The
<parameter>gid</parameter> field, which is part of the
<parameter>txn</parameter> parameter can be used in this callback to
- check if the plugin has already received this prepare in which case it
- can skip the remaining changes of the transaction. This can only happen
- if the user restarts the decoding after receiving the prepare for a
- transaction but before receiving the commit prepared say because of some
- error.
+ identify the transaction and later match it with invocations of
+ <function>commit_prepared_cb</function>
+ or <function>rollback_prepared_cb</function>.
+ </para>
+
+ <para>
+ Note that the start of a logical slot (by LSN) may fall in between
+ a <command>PREPARE</command> and its final <command>COMMIT
+ PREPARED</command> for the same transaction. Similarly, a decoder
+ may restart in between. Therefore, an output plugin needs
+ to be prepared to handle a final commit or rollback for a gid it has
+ never seen in its lifetime.
<programlisting>
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out
index f9f6bedd1cf..c51870f8dd1 100644
--- a/contrib/test_decoding/expected/twophase.out
+++ b/contrib/test_decoding/expected/twophase.out
@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
COMMIT PREPARED 'test_prepared#1';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
-----------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- table public.test_prepared1: INSERT: id[integer]:2
- PREPARE TRANSACTION 'test_prepared#1'
+ data
+-----------------------------------
COMMIT PREPARED 'test_prepared#1'
-(5 rows)
+(1 row)
-- Test that rollback of a prepared xact is decoded.
BEGIN;
@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
COMMIT PREPARED 'test_prepared#3';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
--------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
- PREPARE TRANSACTION 'test_prepared#3'
+ data
+-----------------------------------
COMMIT PREPARED 'test_prepared#3'
-(4 rows)
+(1 row)
-- make sure stuff still works
INSERT INTO test_prepared1 VALUES (6);
@@ -159,14 +152,10 @@ RESET statement_timeout;
COMMIT PREPARED 'test_prepared_lock';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
----------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
- table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
- PREPARE TRANSACTION 'test_prepared_lock'
+ data
+--------------------------------------
COMMIT PREPARED 'test_prepared_lock'
-(5 rows)
+(1 row)
-- Test savepoints and sub-xacts. Creating savepoints will create
-- sub-xacts implicitly.
@@ -189,13 +178,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
COMMIT PREPARED 'test_prepared_savepoint';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
-------------------------------------------------------------
- BEGIN
- table public.test_prepared_savepoint: INSERT: a[integer]:1
- PREPARE TRANSACTION 'test_prepared_savepoint'
+ data
+-------------------------------------------
COMMIT PREPARED 'test_prepared_savepoint'
-(4 rows)
+(1 row)
-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
BEGIN;
diff --git a/contrib/test_decoding/expected/twophase_snapshot.out b/contrib/test_decoding/expected/twophase_snapshot.out
index 14d93876462..0b51ead97f7 100644
--- a/contrib/test_decoding/expected/twophase_snapshot.out
+++ b/contrib/test_decoding/expected/twophase_snapshot.out
@@ -32,10 +32,44 @@ step s2cp: COMMIT PREPARED 'test1';
step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');
data
-BEGIN
-table public.do_write: INSERT: id[integer]:1
-PREPARE TRANSACTION 'test1'
COMMIT PREPARED 'test1'
?column?
stop
+
+starting permutation: s2b s2txid s1init s3b s3txid s2c s2b s2insert s2p s3c s1insert s1start s2rp s1start
+step s2b: BEGIN;
+step s2txid: SELECT pg_current_xact_id() IS NULL;
+?column?
+
+f
+step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
+step s3b: BEGIN;
+step s3txid: SELECT pg_current_xact_id() IS NULL;
+?column?
+
+f
+step s2c: COMMIT;
+step s2b: BEGIN;
+step s2insert: INSERT INTO do_write DEFAULT VALUES;
+step s2p: PREPARE TRANSACTION 'test1';
+step s3c: COMMIT;
+step s1init: <... completed>
+?column?
+
+init
+step s1insert: INSERT INTO do_write DEFAULT VALUES;
+step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');
+data
+
+BEGIN
+table public.do_write: INSERT: id[integer]:2
+COMMIT
+step s2rp: ROLLBACK PREPARED 'test1';
+step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');
+data
+
+ROLLBACK PREPARED 'test1'
+?column?
+
+stop
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index 3acc4acd365..d54e640b409 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
COMMIT PREPARED 'test1';
--should show the COMMIT PREPARED and the other changes in the transaction
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
- data
--------------------------------------------------------------
- BEGIN
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
- PREPARE TRANSACTION 'test1'
+ data
+-------------------------
COMMIT PREPARED 'test1'
-(23 rows)
+(1 row)
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
diff --git a/contrib/test_decoding/specs/twophase_snapshot.spec b/contrib/test_decoding/specs/twophase_snapshot.spec
index 3e700404e0e..f4df24e7353 100644
--- a/contrib/test_decoding/specs/twophase_snapshot.spec
+++ b/contrib/test_decoding/specs/twophase_snapshot.spec
@@ -28,6 +28,7 @@ step "s2c" { COMMIT; }
step "s2insert" { INSERT INTO do_write DEFAULT VALUES; }
step "s2p" { PREPARE TRANSACTION 'test1'; }
step "s2cp" { COMMIT PREPARED 'test1'; }
+step "s2rp" { ROLLBACK PREPARED 'test1'; }
session "s3"
@@ -37,17 +38,56 @@ step "s3b" { BEGIN; }
step "s3txid" { SELECT pg_current_xact_id() IS NULL; }
step "s3c" { COMMIT; }
-# Force building of a consistent snapshot between a PREPARE and COMMIT PREPARED
-# and ensure that the whole transaction is decoded at the time of COMMIT
-# PREPARED.
+# Force building of a consistent snapshot between a PREPARE and COMMIT
+# PREPARED, where the changes together with the prepare of the two-phase
+# transaction are written to the WAL before the consistent point is
+# reached, so that neither its changes nor its prepare may be decoded.
#
# 's1init' step will initialize the replication slot and cause logical decoding
# to wait in initial starting point till the in-progress transaction in s2 is
# committed. 's2c' step will cause logical decoding to go to initial consistent
# point and wait for in-progress transaction s3 to commit. 's3c' step will cause
-# logical decoding to find a consistent point while the transaction s2 is
-# prepared and not yet committed. This will cause the first s1start to skip
-# prepared transaction s2 as that will be before consistent point. The second
-# s1start will allow decoding of skipped prepare along with commit prepared done
-# as part of s2cp.
+# logical decoding to find a consistent point while the transaction in s2 is
+# prepared but not yet committed.
+#
+# In this case, the prepare of the two-phase transaction is considered
+# to have happened prior to the creation of the logical slot. A
+# replica or any kind of consumer that is synchronized up to this
+# specific point in time should already know about that prepared
+# transaction (by gid) and be ready to process a final "commit
+# prepared" or "rollback prepared" for it.
+#
+# The following diagram shows the timeline of events. The '|' (pipe)
+# stands for a transaction in progress that's neither committed nor
+# prepared, while '.' (dot) stands for a prepared, but still
+# uncommitted transaction.
+#
+#          s2b
+# s1init    |            <---- Start to state SNAPBUILD_BUILDING_SNAPSHOT with
+#    |      |                  one transaction in progress (by s2).
+#    |      |     s3b
+#    |      |      |           The commit in s2 allows s1 to enter
+#    |     s2c     |     <---- SNAPBUILD_FULL_SNAPSHOT with a different
+#    |             |           transaction still in progress (by s3).
+#    |     s2b     |
+#    |  s2insert   |           The two-phase transaction to test started in s2,
+#    |      |      |     <---- so s2 and s3 now both have a transaction in
+#    |     s2p     |           progress.
+#    |      .      |
+#    |      .     s3c          Immediately after the commit in s3, s1 is allowed
+#    o      .            <---- to proceed to SNAPBUILD_CONSISTENT and complete
+#           .                  the creation of the logical slot.
+#           .
+#           .                  minimal transaction by s1, the first that starts
+# s1insert  .            <---- after the slot creation has completed, while the
+#           .                  prepared transaction in s2 is still in progress
+# s1start   .
+#          s2cp                since the transaction in s2 started before
+# s1start                <---- the consistent point, none of its changes may be
+#                              decoded. The final commit prepared must still
+#                              be delivered.
+
permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2c" "s2b" "s2insert" "s2p" "s3c" "s1insert" "s1start" "s2cp" "s1start"
+
+# Equivalent test case with ROLLBACK PREPARED instead.
+permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2c" "s2b" "s2insert" "s2p" "s3c" "s1insert" "s1start" "s2rp" "s1start"
--
2.30.0