From 6ef2952862219a522b218eee7372bdc4ad4e9f5d Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Thu, 18 Mar 2021 07:22:07 -0400
Subject: [PATCH v62] Support 2PC txn - subscriber tests.

This patch adds the two-phase commit subscriber test code.
---
 src/test/subscription/t/020_twophase.pl         | 337 ++++++++++++++++++++++++
 src/test/subscription/t/021_twophase_cascade.pl | 280 ++++++++++++++++++++
 2 files changed, 617 insertions(+)
 create mode 100644 src/test/subscription/t/020_twophase.pl
 create mode 100644 src/test/subscription/t/021_twophase_cascade.pl

diff --git a/src/test/subscription/t/020_twophase.pl b/src/test/subscription/t/020_twophase.pl
new file mode 100644
index 0000000..b135997
--- /dev/null
+++ b/src/test/subscription/t/020_twophase.pl
@@ -0,0 +1,337 @@
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 20;
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->start;
+
+# Create some pre-existing content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full SELECT generate_series(1,10);
+	PREPARE TRANSACTION 'some_initial_data';
+	COMMIT PREPARED 'some_initial_data';");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE tab_full");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub
+	CONNECTION '$publisher_connstr application_name=$appname'
+	PUBLICATION tap_pub
+	WITH (two_phase = on)");
+
+# Wait for subscriber to finish initialization
+my $caughtup_query =
+	"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# Also wait for initial table sync to finish
+my $synced_query =
+	"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Also wait for two-phase to be enabled
+my $twophase_query =
+	"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_subscriber->poll_query_until('postgres', $twophase_query)
+  or die "Timed out while waiting for subscriber to enable twophase";
+
+###############################
+# check that 2PC gets replicated to subscriber
+# then COMMIT PREPARED
+###############################
+
+$node_publisher->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (11);
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# check that 2PC gets replicated to subscriber
+# then ROLLBACK PREPARED
+###############################
+
+$node_publisher->safe_psql('postgres',"
+	BEGIN;
+	INSERT INTO tab_full VALUES (12);
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets aborted on subscriber
+$node_publisher->safe_psql('postgres',
+	"ROLLBACK PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is aborted on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Check that ROLLBACK PREPARED is decoded properly on crash restart
+# (publisher and subscriber crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (12);
+    INSERT INTO tab_full VALUES (13);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# rollback post the restart
+$node_publisher->safe_psql('postgres',
+	"ROLLBACK PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are rolled back
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (12,13);");
+is($result, qq(0), 'Rows rolled back are not on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (publisher and subscriber crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (12);
+    INSERT INTO tab_full VALUES (13);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (12,13);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (subscriber only crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (14);
+    INSERT INTO tab_full VALUES (15);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (14,15);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (publisher only crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (16);
+    INSERT INTO tab_full VALUES (17);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_publisher->stop('immediate');
+$node_publisher->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (16,17);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Test nested transaction with 2PC
+###############################
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (21);
+	SAVEPOINT sp_inner;
+	INSERT INTO tab_full VALUES (22);
+	ROLLBACK TO SAVEPOINT sp_inner;
+	PREPARE TRANSACTION 'outer';
+	");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# COMMIT
+$node_publisher->safe_psql('postgres', "
+	COMMIT PREPARED 'outer';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check the tx state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';");
+is($result, qq(0), 'transaction is ended on subscriber');
+
+# check inserts are visible. 22 should be rolled back. 21 should be committed.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (21);");
+is($result, qq(1), 'Rows committed are on the subscriber');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (22);");
+is($result, qq(0), 'Rows rolled back are not on the subscriber');
+
+###############################
+# Test using empty GID
+###############################
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (51);
+	PREPARE TRANSACTION '';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = '';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# ROLLBACK
+$node_publisher->safe_psql('postgres',
+	"ROLLBACK PREPARED '';");
+
+# check that 2PC gets aborted on subscriber
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = '';");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# check all the cleanup
+###############################
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+	'check subscription relation status was dropped on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/test/subscription/t/021_twophase_cascade.pl b/src/test/subscription/t/021_twophase_cascade.pl
new file mode 100644
index 0000000..8e9dfdc
--- /dev/null
+++ b/src/test/subscription/t/021_twophase_cascade.pl
@@ -0,0 +1,280 @@
+# Test cascading logical replication of 2PC.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 29;
+
+###############################
+# Setup a cascade of pub/sub nodes.
+# node_A -> node_B -> node_C
+###############################
+
+# Initialize nodes
+# node_A
+my $node_A = get_new_node('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_A->start;
+# node_B
+my $node_B = get_new_node('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_B->start;
+# node_C
+my $node_C = get_new_node('node_C');
+$node_C->init(allows_streaming => 'logical');
+$node_C->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_C->start;
+
+# Create some pre-existing content on node_A
+$node_A->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_A->safe_psql('postgres', "
+	INSERT INTO tab_full SELECT generate_series(1,10);");
+
+# Create the same tables on node_B amd node_C
+$node_B->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_C->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
+my $appname_B = 'tap_sub_B';
+$node_B->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub_B
+	CONNECTION '$node_A_connstr application_name=$appname_B'
+	PUBLICATION tap_pub_A
+	WITH (two_phase = on)");
+
+# node_B (pub) -> node_C (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
+my $appname_C = 'tap_sub_C';
+$node_C->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub_C
+	CONNECTION '$node_B_connstr application_name=$appname_C'
+	PUBLICATION tap_pub_B
+	WITH (two_phase = on)");
+
+# Wait for subscribers to finish initialization
+my $caughtup_query_B =
+	"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname_B';";
+$node_A->poll_query_until('postgres', $caughtup_query_B)
+	or die "Timed out while waiting for subscriber B to catch up";
+my $caughtup_query_C =
+	"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname_C';";
+$node_B->poll_query_until('postgres', $caughtup_query_C)
+	or die "Timed out while waiting for subscriber C to catch up";
+
+# Also wait for two-phase to be enabled
+my $twophase_query =
+	"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_B->poll_query_until('postgres', $twophase_query)
+	or die "Timed out while waiting for subscriber to enable twophase";
+$node_C->poll_query_until('postgres', $twophase_query)
+	or die "Timed out while waiting for subscriber to enable twophase";
+
+is(1,1, "Cascade setup is complete");
+
+my $result;
+
+###############################
+# check that 2PC gets replicated to subscriber(s)
+# then COMMIT PREPARED
+###############################
+
+# 2PC PREPARE
+$node_A->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (11);
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+$node_A->poll_query_until('postgres', $caughtup_query_B)
+	or die "Timed out while waiting for subscriber B to catch up";
+$node_B->poll_query_until('postgres', $caughtup_query_C)
+	or die "Timed out while waiting for subscriber C to catch up";
+
+# check the tx state is prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab_full';");
+$node_A->poll_query_until('postgres', $caughtup_query_B)
+	or die "Timed out while waiting for subscriber B to catch up";
+$node_B->poll_query_until('postgres', $caughtup_query_C)
+	or die "Timed out while waiting for subscriber C to catch up";
+
+# check that transaction was committed on subscriber(s)
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber B');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber C');
+
+# check the tx state is ended on subscriber(s)
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(0), 'transaction is committed on subscriber B');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(0), 'transaction is committed on subscriber C');
+
+###############################
+# check that 2PC gets replicated to subscriber(s)
+# then ROLLBACK PREPARED
+###############################
+
+# 2PC PREPARE
+$node_A->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (12);
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+$node_A->poll_query_until('postgres', $caughtup_query_B)
+	or die "Timed out while waiting for subscriber B to catch up";
+$node_B->poll_query_until('postgres', $caughtup_query_C)
+	or die "Timed out while waiting for subscriber C to catch up";
+
+# check the tx state is prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC ROLLBACK
+$node_A->safe_psql('postgres',
+	"ROLLBACK PREPARED 'test_prepared_tab_full';");
+$node_A->poll_query_until('postgres', $caughtup_query_B)
+	or die "Timed out while waiting for subscriber B to catch up";
+$node_B->poll_query_until('postgres', $caughtup_query_C)
+	or die "Timed out while waiting for subscriber C to catch up";
+
+# check that transaction is aborted on subscriber(s)
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber B');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber C');
+
+# check the tx state is ended on subscriber(s)
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+###############################
+# Test nested transactions with 2PC
+###############################
+
+# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
+$node_A->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (21);
+	SAVEPOINT sp_inner;
+	INSERT INTO tab_full VALUES (22);
+	ROLLBACK TO SAVEPOINT sp_inner;
+	PREPARE TRANSACTION 'outer';
+	");
+$node_A->poll_query_until('postgres', $caughtup_query_B)
+	or die "Timed out while waiting for subscriber B to catch up";
+$node_B->poll_query_until('postgres', $caughtup_query_C)
+	or die "Timed out while waiting for subscriber C to catch up";
+
+# check the tx state prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "
+	COMMIT PREPARED 'outer';");
+$node_A->poll_query_until('postgres', $caughtup_query_B)
+	or die "Timed out while waiting for subscriber B to catch up";
+$node_B->poll_query_until('postgres', $caughtup_query_C)
+	or die "Timed out while waiting for subscriber C to catch up";
+
+# check the tx state is ended on subscriber
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+# check inserts are visible at subscriber(s).
+# 22 should be rolled back.
+# 21 should be committed.
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (21);");
+is($result, qq(1), 'Rows committed are present on subscriber B');
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (22);");
+is($result, qq(0), 'Rows rolled back are not present on subscriber B');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (21);");
+is($result, qq(1), 'Rows committed are present on subscriber C');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (22);");
+is($result, qq(0), 'Rows rolled back are not present on subscriber C');
+
+###############################
+# check all the cleanup
+###############################
+
+# cleanup the node_B => node_C pub/sub
+$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C");
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber node C');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber node C');
+$result = $node_C->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber node C');
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher node B');
+
+# cleanup the node_A => node_B pub/sub
+$node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B");
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber node B');
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber node B');
+$result = $node_B->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber node B');
+$result = $node_A->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher node A');
+
+# shutdown
+$node_C->stop('fast');
+$node_B->stop('fast');
+$node_A->stop('fast');
-- 
1.8.3.1

