From 63b859a9b0d1003b4466bf118ac7cd0c899cb248 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Mon, 2 Jan 2023 15:37:25 +0530
Subject: [PATCH v79 3/4] Add GUC stream_serialize_threshold and test
 serializing messages to disk.

---
 doc/src/sgml/config.sgml                           |  32 +++++
 .../replication/logical/applyparallelworker.c      |  12 ++
 src/backend/replication/logical/worker.c           |   9 ++
 src/backend/utils/misc/guc_tables.c                |  14 ++
 src/include/replication/worker_internal.h          |   4 +
 src/test/subscription/t/015_stream.pl              | 144 ++++++++++++++++++++-
 6 files changed, 212 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 77574e2..0f24410 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -11683,6 +11683,38 @@ LOG:  CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-stream-serialize-threshold" xreflabel="stream_serialize_threshold">
+      <term><varname>stream_serialize_threshold</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>stream_serialize_threshold</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Forces the leader apply worker to serialize messages to files after
+        sending the specified number of streaming chunks to the parallel
+        apply worker. Setting this to zero serializes all messages. A value
+        of <literal>-1</literal> (the default) disables this feature. This
+        parameter is intended for testing serialization to files with
+        <literal>streaming = parallel</literal>.
+       </para>
+
+       <para>
+        When the logical replication subscription's
+        <literal>streaming</literal> parameter is set to
+        <literal>parallel</literal>, the leader apply worker sends messages
+        to the parallel apply workers with a timeout. By default, the leader
+        apply worker serializes the remaining messages to files only if that
+        timeout is exceeded. If this parameter is set to a value other than
+        <literal>-1</literal>, messages are serialized to files even when
+        the timeout is not exceeded.
+       </para>
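+
+       <para>
+        For example, to serialize the messages of every streamed transaction
+        to files regardless of the timeout (the setting used by the tests
+        added in this patch):
+<programlisting>
+stream_serialize_threshold = 0
+</programlisting>
+       </para>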
+
+       <para>
+        This parameter can only be set in the <filename>postgresql.conf</filename>
+        file or on the server command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
     </variablelist>
   </sect1>
   <sect1 id="runtime-config-short">
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 4fa2b62..777926c 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -254,6 +254,9 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
 /* A list to maintain subtransactions, if any. */
 static List *subxactlist = NIL;
 
+/* GUC variable */
+int			stream_serialize_threshold = -1;
+
 static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
 static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
 static PartialFileSetState pa_get_fileset_state(void);
@@ -1187,6 +1190,15 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	Assert(!IsTransactionState());
 	Assert(!winfo->serialize_changes);
 
+	/*
+	 * Force serialization of messages to files if stream_serialize_threshold
+	 * has been reached: a threshold of zero serializes all messages, and any
+	 * other value serializes once more than that many streaming chunks have
+	 * been sent.  Reset the chunk counter before switching over.
+	 */
+	if (stream_serialize_threshold != -1 &&
+		(stream_serialize_threshold == 0 ||
+		 stream_serialize_threshold < parallel_stream_nchunks))
+	{
+		parallel_stream_nchunks = 0;
+		return false;
+	}
+
 /*
  * This timeout is a bit arbitrary but testing revealed that it is sufficient
  * to send the message unless the parallel apply worker is waiting on some
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c7be76d..5c8ce97 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -328,6 +328,12 @@ static TransactionId stream_xid = InvalidTransactionId;
 static uint32 parallel_stream_nchanges = 0;
 
 /*
+ * The number of streaming chunks sent by the leader apply worker during one
+ * streamed transaction.  This is used only when stream_serialize_threshold > 0.
+ */
+uint32		parallel_stream_nchunks = 0;
+
+/*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
  * Once we start skipping changes, we don't stop it until we skip all changes of
@@ -1521,6 +1527,9 @@ apply_handle_stream_start(StringInfo s)
 		case TRANS_LEADER_SEND_TO_PARALLEL:
 			Assert(winfo);
 
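+			/*
+			 * Count this streaming chunk; pa_send_data() compares the
+			 * counter against stream_serialize_threshold to decide when
+			 * to switch to serializing the messages to files.
+			 */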
+			if (stream_serialize_threshold > 0)
+				parallel_stream_nchunks++;
+
 			/*
 			 * Once we start serializing the changes, the parallel apply
 			 * worker will wait for the leader to release the stream lock
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 5025e80..15d3781 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -61,6 +61,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
+#include "replication/worker_internal.h"
 #include "storage/bufmgr.h"
 #include "storage/large_object.h"
 #include "storage/pg_shmem.h"
@@ -3015,6 +3016,19 @@ struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"stream_serialize_threshold", PGC_SIGHUP, DEVELOPER_OPTIONS,
+			gettext_noop("Forces the leader apply worker to serialize messages "
+						 "to files after sending specified amount of streaming "
+						 "chunks in streaming parallel mode."),
+			gettext_noop("A value of -1 disables this feature."),
+			GUC_NOT_IN_SAMPLE
+		},
+		&stream_serialize_threshold,
+		-1, -1, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE,
 			gettext_noop("Sets the amount of time to wait before forcing "
 						 "log file rotation."),
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 34e5006..7f6a9f9 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -225,6 +225,10 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
+extern PGDLLIMPORT int stream_serialize_threshold;
+
+extern PGDLLIMPORT uint32 parallel_stream_nchunks;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
index 91e8aa8..83d6956 100644
--- a/src/test/subscription/t/015_stream.pl
+++ b/src/test/subscription/t/015_stream.pl
@@ -133,13 +133,20 @@ sub test_streaming
 # Create publisher node
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
-$node_publisher->append_conf('postgresql.conf',
-	'logical_decoding_work_mem = 64kB');
+$node_publisher->append_conf(
+	'postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
 $node_publisher->start;
 
 # Create subscriber node
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf(
+	'postgresql.conf', qq(
+max_prepared_transactions = 10
+log_min_messages = debug1
+));
 $node_subscriber->start;
 
 # Create some preexisting content on publisher
@@ -170,7 +177,7 @@ my $appname = 'tap_sub';
 # Test using streaming mode 'on'
 ################################
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on, two_phase = on)"
 );
 
 # Wait for initial table sync to finish
@@ -312,6 +319,137 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
 is($result, qq(10000), 'data replicated to subscriber after dropping index');
 
+# Clean up test data from the environment.
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
+$node_publisher->wait_for_catchup($appname);
+
+# Test serializing messages to disk
+
+# Set stream_serialize_threshold to zero so that all messages will be
+# serialized to disk.
+$node_subscriber->safe_psql('postgres',
+	'ALTER SYSTEM SET stream_serialize_threshold = 0;');
+$node_subscriber->reload;
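+# (stream_serialize_threshold is PGC_SIGHUP, so a configuration reload is
+# enough for the new value to take effect.)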
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+# Serialize the COMMIT transaction.
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(5000),
+	'data replicated to subscriber by serializing messages to disk');
+
+# Clean up test data from the environment.
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
+$node_publisher->wait_for_catchup($appname);
+
+# Serialize the PREPARE transaction.
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
+	PREPARE TRANSACTION 'xact';
+	});
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Commit the prepared transaction on the publisher; it should then be
+# committed on the subscriber as well.
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'xact';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(5000),
+	'data replicated to subscriber by serializing messages to disk');
+
+# Clean up test data from the environment.
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
+$node_publisher->wait_for_catchup($appname);
+
+# Serialize the ABORT top-transaction.
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
+	ROLLBACK;
+	});
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is aborted on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(0),
+	'aborted transaction rolled back on subscriber after serializing messages to disk');
+
+# Clean up test data from the environment.
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
+$node_publisher->wait_for_catchup($appname);
+
+# Serialize the ABORT sub-transaction.
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
+	SAVEPOINT sp;
+	INSERT INTO test_tab_2 SELECT i FROM generate_series(5001, 10000) s(i);
+	ROLLBACK TO sp;
+	COMMIT;
+	});
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that only the sub-transaction was rolled back on the subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(5000),
+	'only sub-transaction changes rolled back on subscriber after serializing messages to disk');
+
 $node_subscriber->stop;
 $node_publisher->stop;
 
-- 
2.7.2.windows.1

