From 9b94a72da3cf78214b895bdf573c42c0f3ab75c9 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Mon, 17 Feb 2025 23:57:26 +0530
Subject: [PATCH v15] Add support for two-phase commit in pg_createsubscriber

This patch introduces the '--enable-two-phase' option to the
'pg_createsubscriber' utility, allowing users to enable two-phase commit for
all subscriptions during their creation.

Two-phase commit remains disabled unless this option is specified.

When two-phase commit is enabled, prepared transactions are sent to the
subscriber at the time of 'PREPARE TRANSACTION' and are processed as
two-phase transactions on the subscriber as well. When it is disabled,
prepared transactions are sent only at 'COMMIT PREPARED' time and are then
applied immediately by the subscriber.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     | 18 +++++-
 src/bin/pg_basebackup/pg_createsubscriber.c   | 63 ++++++++++++-------
 .../t/040_pg_createsubscriber.pl              | 11 ++++
 3 files changed, 68 insertions(+), 24 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index 26b8e64a4e0..ee3b6678f63 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -165,6 +165,19 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>-T</option></term>
+     <term><option>--enable-two-phase</option></term>
+     <listitem>
+      <para>
+       Enables <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
+       commit for the subscription.  When multiple databases are specified, this
+       option applies uniformly to all subscriptions created on those databases.
+       By default, two-phase commit is disabled.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-U <replaceable class="parameter">username</replaceable></option></term>
      <term><option>--subscriber-username=<replaceable class="parameter">username</replaceable></option></term>
@@ -300,7 +313,9 @@ PostgreSQL documentation
     greater than or equal to the number of specified databases.  The target
     server must have <xref linkend="guc-max-worker-processes"/> configured to a
     value greater than the number of specified databases.  The target server
-    must accept local connections.
+    must accept local connections.  If you are planning to use the
+    <option>--enable-two-phase</option> switch, you will also need to set
+    <xref linkend="guc-max-prepared-transactions"/> appropriately.
    </para>
 
    <para>
@@ -360,6 +375,7 @@ PostgreSQL documentation
    </para>
 
    <para>
+    Unless the <option>--enable-two-phase</option> switch is specified,
     <application>pg_createsubscriber</application> sets up logical
     replication with two-phase commit disabled.  This means that any
     prepared transactions will be replicated at the time
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 2d881d54f5b..d8ddcec3159 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -38,6 +38,7 @@ struct CreateSubscriberOptions
 	char	   *socket_dir;		/* directory for Unix-domain socket, if any */
 	char	   *sub_port;		/* subscriber port number */
 	const char *sub_username;	/* subscriber username */
+	bool		two_phase;		/* enable-two-phase option */
 	SimpleStringList database_names;	/* list of database names */
 	SimpleStringList pub_names; /* list of publication names */
 	SimpleStringList sub_names; /* list of subscription names */
@@ -75,18 +76,19 @@ static uint64 get_standby_sysid(const char *datadir);
 static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
 static bool server_is_in_recovery(PGconn *conn);
 static char *generate_object_name(PGconn *conn);
-static void check_publisher(const struct LogicalRepInfo *dbinfo);
-static char *setup_publisher(struct LogicalRepInfo *dbinfo);
+static void check_publisher(const struct LogicalRepInfo *dbinfo, bool two_phase);
+static char *setup_publisher(struct LogicalRepInfo *dbinfo, bool two_phase);
 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
-							 const char *consistent_lsn);
+							 const char *consistent_lsn, bool two_phase);
 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
 						   const char *lsn);
 static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
 										  const char *slotname);
 static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
 static char *create_logical_replication_slot(PGconn *conn,
-											 struct LogicalRepInfo *dbinfo);
+											 struct LogicalRepInfo *dbinfo,
+											 bool two_phase);
 static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
 								  const char *slot_name);
 static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
@@ -98,7 +100,9 @@ static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
-static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+static void create_subscription(PGconn *conn,
+								const struct LogicalRepInfo *dbinfo,
+								bool two_phase);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
 static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
@@ -227,6 +231,7 @@ usage(void)
 	printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
 	printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
 	printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
+	printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
 	printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
 	printf(_("  -v, --verbose                   output verbose messages\n"));
 	printf(_("      --config-file=FILENAME      use specified main server configuration\n"
@@ -479,9 +484,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 					 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
 					 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
 					 dbinfo[i].pubconninfo);
-		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
+		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
 					 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
-					 dbinfo[i].subconninfo);
+					 dbinfo[i].subconninfo,
+					 opt->two_phase ? "true" : "false");
 
 		if (num_pubs > 0)
 			pubcell = pubcell->next;
@@ -730,7 +736,7 @@ generate_object_name(PGconn *conn)
  * set_replication_progress).
  */
 static char *
-setup_publisher(struct LogicalRepInfo *dbinfo)
+setup_publisher(struct LogicalRepInfo *dbinfo, bool two_phase)
 {
 	char	   *lsn = NULL;
 
@@ -770,7 +776,7 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
 		/* Create replication slot on publisher */
 		if (lsn)
 			pg_free(lsn);
-		lsn = create_logical_replication_slot(conn, &dbinfo[i]);
+		lsn = create_logical_replication_slot(conn, &dbinfo[i], two_phase);
 		if (lsn != NULL || dry_run)
 			pg_log_info("create replication slot \"%s\" on publisher",
 						dbinfo[i].replslotname);
@@ -837,7 +843,7 @@ server_is_in_recovery(PGconn *conn)
  * XXX Does it not allow a synchronous replica?
  */
 static void
-check_publisher(const struct LogicalRepInfo *dbinfo)
+check_publisher(const struct LogicalRepInfo *dbinfo, bool two_phase)
 {
 	PGconn	   *conn;
 	PGresult   *res;
@@ -932,11 +938,12 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 		failed = true;
 	}
 
-	if (max_prepared_transactions != 0)
+	if (max_prepared_transactions != 0 && !two_phase)
 	{
 		pg_log_warning("two_phase option will not be enabled for replication slots");
 		pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled.  "
 							  "Prepared transactions will be replicated at COMMIT PREPARED.");
+		pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
 	}
 
 	pg_free(wal_level);
@@ -1139,7 +1146,8 @@ check_and_drop_existing_subscriptions(PGconn *conn,
  * replication setup.
  */
 static void
-setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
+setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn,
+				 bool two_phase)
 {
 	for (int i = 0; i < num_dbs; i++)
 	{
@@ -1163,7 +1171,7 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 		 */
 		drop_publication(conn, &dbinfo[i]);
 
-		create_subscription(conn, &dbinfo[i]);
+		create_subscription(conn, &dbinfo[i], two_phase);
 
 		/* Set the replication progress to the correct LSN */
 		set_replication_progress(conn, &dbinfo[i], consistent_lsn);
@@ -1311,7 +1319,8 @@ drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
  * result set that contains the LSN.
  */
 static char *
-create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
+create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
+								bool two_phase)
 {
 	PQExpBuffer str = createPQExpBuffer();
 	PGresult   *res = NULL;
@@ -1327,8 +1336,9 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
 
 	appendPQExpBuffer(str,
-					  "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
-					  slot_name_esc);
+					  "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
+					  slot_name_esc,
+					  two_phase ? "true" : "false");
 
 	PQfreemem(slot_name_esc);
 
@@ -1678,7 +1688,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
  * initial location.
  */
 static void
-create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
+create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo,
+					bool two_phase)
 {
 	PQExpBuffer str = createPQExpBuffer();
 	PGresult   *res;
@@ -1700,8 +1711,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
 	appendPQExpBuffer(str,
 					  "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
 					  "WITH (create_slot = false, enabled = false, "
-					  "slot_name = %s, copy_data = false)",
-					  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
+					  "slot_name = %s, copy_data = false, two_phase = %s)",
+					  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
+					  two_phase ? "true" : "false");
 
 	PQfreemem(pubname_esc);
 	PQfreemem(subname_esc);
@@ -1873,6 +1885,7 @@ main(int argc, char **argv)
 		{"publisher-server", required_argument, NULL, 'P'},
 		{"socketdir", required_argument, NULL, 's'},
 		{"recovery-timeout", required_argument, NULL, 't'},
+		{"enable-two-phase", no_argument, NULL, 'T'},
 		{"subscriber-username", required_argument, NULL, 'U'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"version", no_argument, NULL, 'V'},
@@ -1928,6 +1941,7 @@ main(int argc, char **argv)
 	opt.socket_dir = NULL;
 	opt.sub_port = DEFAULT_SUB_PORT;
 	opt.sub_username = NULL;
+	opt.two_phase = false;
 	opt.database_names = (SimpleStringList)
 	{
 		0
@@ -1950,7 +1964,7 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -1987,6 +2001,9 @@ main(int argc, char **argv)
 			case 't':
 				opt.recovery_timeout = atoi(optarg);
 				break;
+			case 'T':
+				opt.two_phase = true;
+				break;
 			case 'U':
 				opt.sub_username = pg_strdup(optarg);
 				break;
@@ -2195,7 +2212,7 @@ main(int argc, char **argv)
 	check_subscriber(dbinfo);
 
 	/* Check if the primary server is ready for logical replication */
-	check_publisher(dbinfo);
+	check_publisher(dbinfo, opt.two_phase);
 
 	/*
 	 * Stop the target server. The recovery process requires that the server
@@ -2208,7 +2225,7 @@ main(int argc, char **argv)
 	stop_standby_server(subscriber_dir);
 
 	/* Create the required objects for each database on publisher */
-	consistent_lsn = setup_publisher(dbinfo);
+	consistent_lsn = setup_publisher(dbinfo, opt.two_phase);
 
 	/* Write the required recovery parameters */
 	setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
@@ -2230,7 +2247,7 @@ main(int argc, char **argv)
 	 * point to the LSN reported by setup_publisher().  It also cleans up
 	 * publications created by this tool and replication to the standby.
 	 */
-	setup_subscriber(dbinfo, consistent_lsn);
+	setup_subscriber(dbinfo, consistent_lsn, opt.two_phase);
 
 	/* Remove primary_slot_name if it exists on primary */
 	drop_primary_replication_slot(dbinfo, primary_slot_name);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index c8dbdb7e9b7..c35fa108ce3 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -373,6 +373,7 @@ command_ok(
 
 # Run pg_createsubscriber on node S.  --verbose is used twice
 # to show more information.
+# In passing, also test the --enable-two-phase option
 command_ok(
 	[
 		'pg_createsubscriber',
@@ -388,6 +389,7 @@ command_ok(
 		'--replication-slot' => 'replslot2',
 		'--database' => $db1,
 		'--database' => $db2,
+		'--enable-two-phase'
 	],
 	'run pg_createsubscriber on node S');
 
@@ -406,6 +408,15 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 # Start subscriber
 $node_s->start;
 
+# Verify that all subtwophase states are pending or enabled,
+# i.e. there are no subscriptions where subtwophase is disabled ('d')
+is( $node_s->safe_psql(
+		'postgres',
+		"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate = 'd'"
+	),
+	't',
+	'subscriptions are created with the two-phase option enabled');
+
 # Confirm the pre-existing subscription has been removed
 $result = $node_s->safe_psql(
 	'postgres', qq(
-- 
2.41.0.windows.3

