From 5fe8f54ad16c3b8da0bf926293d7c4f1ce3fec8d Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Fri, 20 Jan 2023 16:59:37 +0800
Subject: [PATCH] Extend the logical_replication_mode to test the stream
 parallel mode

Use the use the existing developer option logical_replication_mode to test the
parallel apply of large transaction on subscriber.

When set to 'buffered', the leader sends changes to parallel apply workers via
shared memory queue. When set to 'immediate', the leader serializes all changes
to files and notifies the parallel apply workers to read and apply them at the
end of the transaction.

---
 doc/src/sgml/config.sgml                      | 12 ++++
 .../replication/logical/applyparallelworker.c | 12 ++--
 src/test/subscription/t/015_stream.pl         | 27 ++++++++
 .../t/018_stream_subxact_abort.pl             | 65 ++++++++++++++++++-
 .../subscription/t/023_twophase_stream.pl     | 45 ++++++++++++-
 5 files changed, 154 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 275dcd55ba..87df627751 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -11708,6 +11708,18 @@ LOG:  CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
         or serialize changes when <varname>logical_decoding_work_mem</varname>
         is reached.
        </para>
+
+       <para>
+        On the subscriber side, if <literal>streaming</literal> option is set
+        to <literal>parallel</literal>, this parameter also allows the leader
+        apply worker to send changes to the shared memory queue or to serialize
+        changes. When set to <literal>buffered</literal>, the leader sends
+        changes to parallel apply workers via shared memory queue. When set to
+        <literal>immediate</literal>, the leader serializes all changes to
+        files and notifies the parallel apply workers to read and apply them at
+        the end of the transaction.
+       </para>
+
        <para>
         This parameter is intended to be used to test logical decoding and
         replication of large transactions for which otherwise we need to
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 3579e704fe..39e8f2d36d 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -1149,6 +1149,9 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	Assert(!IsTransactionState());
 	Assert(!winfo->serialize_changes);
 
+	if (logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE)
+		return false;
+
 /*
  * This timeout is a bit arbitrary but testing revealed that it is sufficient
  * to send the message unless the parallel apply worker is waiting on some
@@ -1187,12 +1190,7 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 			startTime = GetCurrentTimestamp();
 		else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
 											SHM_SEND_TIMEOUT_MS))
-		{
-			ereport(LOG,
-					(errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
-							winfo->shared->xid)));
 			return false;
-		}
 	}
 }
 
@@ -1206,6 +1204,10 @@ void
 pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
 							   bool stream_locked)
 {
+	ereport(LOG,
+			(errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
+					winfo->shared->xid)));
+
 	/*
 	 * The parallel apply worker could be stuck for some reason (say waiting
 	 * on some lock by other backend), so stop trying to send data directly to
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
index 91e8aa8c0a..93086052f6 100644
--- a/src/test/subscription/t/015_stream.pl
+++ b/src/test/subscription/t/015_stream.pl
@@ -312,6 +312,33 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
 is($result, qq(10000), 'data replicated to subscriber after dropping index');
 
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of transaction.
+$node_subscriber->append_conf('postgresql.conf',
+	'logical_replication_mode = immediate');
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(15000), 'all changes are replayed from file');
+
 $node_subscriber->stop;
 $node_publisher->stop;
 
diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl
index 814daf4d2f..1fa0ae0b05 100644
--- a/src/test/subscription/t/018_stream_subxact_abort.pl
+++ b/src/test/subscription/t/018_stream_subxact_abort.pl
@@ -143,15 +143,17 @@ $node_publisher->safe_psql('postgres',
 	"CREATE TABLE test_tab (a int primary key, b varchar)");
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
 
 # Setup structure on subscriber
 $node_subscriber->safe_psql('postgres',
 	"CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
 
 # Setup logical replication
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 $node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");
 
 my $appname = 'tap_sub';
 
@@ -198,6 +200,67 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1});
 
 test_streaming($node_publisher, $node_subscriber, $appname, 1);
 
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of transaction.
+$node_subscriber->append_conf('postgresql.conf',
+	'logical_replication_mode = immediate');
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+my $offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab_2 values(1);
+	ROLLBACK;
+	});
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is aborted on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(0), 'all changes are replayed from file');
+
+# Clean up test data from the environment.
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
+$node_publisher->wait_for_catchup($appname);
+
+# Serialize the ABORT sub-transaction.
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab_2 values(1);
+	SAVEPOINT sp;
+	INSERT INTO test_tab_2 values(1);
+	ROLLBACK TO sp;
+	COMMIT;
+	});
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that only sub-transaction is aborted on subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(1), 'all changes are replayed from file');
+
 $node_subscriber->stop;
 $node_publisher->stop;
 
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
index 497245a209..e1bf63b97c 100644
--- a/src/test/subscription/t/023_twophase_stream.pl
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -319,16 +319,18 @@ $node_publisher->safe_psql('postgres',
 	"CREATE TABLE test_tab (a int primary key, b varchar)");
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
 
 # Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
 $node_subscriber->safe_psql('postgres',
 	"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
 );
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
 
 # Setup logical replication (streaming = on)
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 $node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");
 
 my $appname = 'tap_sub';
 
@@ -384,6 +386,47 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1});
 
 test_streaming($node_publisher, $node_subscriber, $appname, 1);
 
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of transaction.
+$node_subscriber->append_conf('postgresql.conf',
+	'logical_replication_mode = immediate');
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+my $offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab_2 values(1);
+	PREPARE TRANSACTION 'xact';
+	});
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'xact';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(1), 'all changes are replayed from file');
+
 ###############################
 # check all the cleanup
 ###############################
-- 
2.28.0.windows.1

