From 5e6681da48ecf045b2227502de5581bf65cc6f8a Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 4 Dec 2020 11:15:04 +1100
Subject: [PATCH v29] Support 2PC txn - Subscription option.

This patch implements new SUBSCRIPTION option "two_phase".

Usage: CREATE SUBSCRIPTION ... WITH (two_phase = on)

Default is off.

Note: The tablesync worker slot always has two_phase disabled, regardless of the option.
---
 doc/src/sgml/ref/alter_subscription.sgml           |  5 +-
 doc/src/sgml/ref/create_subscription.sgml          | 13 ++++
 src/backend/catalog/pg_subscription.c              |  1 +
 src/backend/catalog/system_views.sql               |  2 +-
 src/backend/commands/subscriptioncmds.c            | 44 ++++++++++--
 .../libpqwalreceiver/libpqwalreceiver.c            |  4 ++
 src/backend/replication/logical/worker.c           |  2 +
 src/backend/replication/pgoutput/pgoutput.c        | 36 +++++++++-
 src/bin/pg_dump/pg_dump.c                          | 16 ++++-
 src/bin/pg_dump/pg_dump.h                          |  1 +
 src/bin/psql/describe.c                            | 10 +--
 src/include/catalog/catversion.h                   |  2 +-
 src/include/catalog/pg_subscription.h              |  3 +
 src/include/replication/logicalproto.h             |  4 ++
 src/include/replication/walreceiver.h              |  1 +
 src/test/regress/expected/subscription.out         | 79 ++++++++++++++--------
 src/test/regress/sql/subscription.sql              | 15 ++++
 src/test/subscription/t/020_twophase.pl            |  3 +-
 src/test/subscription/t/021_twophase_stream.pl     |  2 +-
 src/test/subscription/t/022_twophase_cascade.pl    |  6 +-
 .../subscription/t/023_twophase_cascade_stream.pl  |  4 +-
 21 files changed, 201 insertions(+), 52 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index db5e59f..dbe2a43 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -166,8 +166,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, and
-      <literal>streaming</literal>.
+      <literal>binary</literal>,
+      <literal>streaming</literal>, and
+      <literal>two_phase</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index e812bee..0d233f4e 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -239,6 +239,19 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+       <varlistentry>
+        <term><literal>two_phase</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether two-phase commit is enabled for this subscription.
+          The default is <literal>false</literal>.
+          When two-phase commit is enabled then the decoded transactions are sent
+          to the subscriber on the PREPARE TRANSACTION. When two-phase commit is not
+          enabled then PREPARE TRANSACTION and COMMIT/ROLLBACK PREPARED are not
+          decoded on the publisher.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ca78d39..886839e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -67,6 +67,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->enabled = subform->subenabled;
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
+	sub->twophase = subform->subtwophase;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b140c21..5f4e191 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1149,7 +1149,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subslotname, subpublications)
+GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subtwophase, subslotname, subpublications)
     ON pg_subscription TO public;
 
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 1696454..b0745d5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -64,7 +64,8 @@ parse_subscription_options(List *options,
 						   char **synchronous_commit,
 						   bool *refresh,
 						   bool *binary_given, bool *binary,
-						   bool *streaming_given, bool *streaming)
+						   bool *streaming_given, bool *streaming,
+						   bool *twophase_given, bool *twophase)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -105,6 +106,11 @@ parse_subscription_options(List *options,
 		*streaming_given = false;
 		*streaming = false;
 	}
+	if (twophase)
+	{
+		*twophase_given = false;
+		*twophase = false;
+	}
 
 	/* Parse options */
 	foreach(lc, options)
@@ -210,6 +216,15 @@ parse_subscription_options(List *options,
 			*streaming_given = true;
 			*streaming = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "two_phase") == 0 && twophase)
+		{
+			if (*twophase_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			*twophase_given = true;
+			*twophase = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -355,6 +370,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		copy_data;
 	bool		streaming;
 	bool		streaming_given;
+	bool		twophase;
+	bool		twophase_given;
 	char	   *synchronous_commit;
 	char	   *conninfo;
 	char	   *slotname;
@@ -379,7 +396,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 							   &synchronous_commit,
 							   NULL,	/* no "refresh" */
 							   &binary_given, &binary,
-							   &streaming_given, &streaming);
+							   &streaming_given, &streaming,
+							   &twophase_given, &twophase);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -447,6 +465,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
 	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
+	values[Anum_pg_subscription_subtwophase - 1] = BoolGetDatum(twophase);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (slotname)
@@ -720,6 +739,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				bool		binary;
 				bool		streaming_given;
 				bool		streaming;
+				bool		twophase_given;
+				bool		twophase;
 
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
@@ -730,7 +751,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 										   &synchronous_commit,
 										   NULL,	/* no "refresh" */
 										   &binary_given, &binary,
-										   &streaming_given, &streaming);
+										   &streaming_given, &streaming,
+										   &twophase_given, &twophase);
 
 				if (slotname_given)
 				{
@@ -769,6 +791,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					replaces[Anum_pg_subscription_substream - 1] = true;
 				}
 
+				if (twophase_given)
+				{
+					values[Anum_pg_subscription_subtwophase - 1] =
+						BoolGetDatum(twophase);
+					replaces[Anum_pg_subscription_subtwophase - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -787,7 +816,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 										   NULL,	/* no "synchronous_commit" */
 										   NULL,	/* no "refresh" */
 										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no streaming */
+										   NULL, NULL,	/* no "streaming" */
+										   NULL, NULL);	/* no "two_phase" */
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -832,7 +862,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 										   NULL,	/* no "synchronous_commit" */
 										   &refresh,
 										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+										   NULL, NULL,	/* no "streaming" */
+										   NULL, NULL);	/* no "two_phase" */
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
 				replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -875,7 +906,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 										   NULL,	/* no "synchronous_commit" */
 										   NULL,	/* no "refresh" */
 										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+										   NULL, NULL,	/* no "streaming" */
+										   NULL, NULL);	/* no "two_phase" */
 
 				AlterSubscription_refresh(sub, copy_data);
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 24f8b3e..1f404cd 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -429,6 +429,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 140000)
 			appendStringInfoString(&cmd, ", streaming 'on'");
 
+		if (options->proto.logical.twophase &&
+			PQserverVersion(conn->streamConn) >= 140000)
+			appendStringInfoString(&cmd, ", two_phase 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2416b85..7a6c594 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2702,6 +2702,7 @@ maybe_reread_subscription(void)
 		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
+		(!am_tablesync_worker() && newsub->twophase != MySubscription->twophase) ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
 		ereport(LOG,
@@ -3348,6 +3349,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.publication_names = MySubscription->publications;
 	options.proto.logical.binary = MySubscription->binary;
 	options.proto.logical.streaming = MySubscription->stream;
+	options.proto.logical.twophase = MySubscription->twophase && !am_tablesync_worker();
 
 	/* Start normal logical streaming replication. */
 	walrcv_startstreaming(wrconn, &options);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 71ac431..7651a62 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -172,13 +172,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
 						List **publication_names, bool *binary,
-						bool *enable_streaming)
+						bool *enable_streaming,
+						bool *enable_twophase)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
 	bool		streaming_given = false;
+	bool		twophase_given = false;
 
 	*binary = false;
 
@@ -246,6 +248,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 
 			*enable_streaming = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "two_phase") == 0)
+		{
+			if (twophase_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			twophase_given = true;
+
+			*enable_twophase = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -259,6 +271,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				 bool is_init)
 {
 	bool		enable_streaming = false;
+	bool		enable_twophase = false;
 	PGOutputData *data = palloc0(sizeof(PGOutputData));
 
 	/* Create our memory context for private allocations. */
@@ -283,7 +296,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 								&data->protocol_version,
 								&data->publication_names,
 								&data->binary,
-								&enable_streaming);
+								&enable_streaming,
+								&enable_twophase);
 
 		/* Check if we support requested protocol */
 		if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
@@ -324,6 +338,24 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		/* Also remember we're currently not streaming any transaction. */
 		in_streaming = false;
 
+		/*
+		 * Decide whether to enable two-phase commit. It is disabled by default, in
+		 * which case we just update the flag in decoding context. Otherwise
+		 * we only allow it with sufficient version of the protocol, and when
+		 * the output plugin supports it.
+		 */
+		if (!enable_twophase)
+			ctx->twophase = false;
+		else if (data->protocol_version < LOGICALREP_PROTO_2PC_VERSION_NUM)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
+							data->protocol_version, LOGICALREP_PROTO_2PC_VERSION_NUM)));
+		else if (!ctx->twophase)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("two-phase commit requested, but not supported by output plugin")));
+
 		/* Init publication state. */
 		data->publications = NIL;
 		publications_valid = false;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index dc1d41d..97f0dd5 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4221,6 +4221,7 @@ getSubscriptions(Archive *fout)
 	int			i_subname;
 	int			i_rolname;
 	int			i_substream;
+	int			i_subtwophase;
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
@@ -4264,9 +4265,14 @@ getSubscriptions(Archive *fout)
 		appendPQExpBufferStr(query, " false AS subbinary,\n");
 
 	if (fout->remoteVersion >= 140000)
-		appendPQExpBufferStr(query, " s.substream\n");
+		appendPQExpBufferStr(query, " s.substream,\n");
 	else
-		appendPQExpBufferStr(query, " false AS substream\n");
+		appendPQExpBufferStr(query, " false AS substream,\n");
+
+	if (fout->remoteVersion >= 140000)
+		appendPQExpBufferStr(query, " s.subtwophase\n");
+	else
+		appendPQExpBufferStr(query, " false AS subtwophase\n");
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
@@ -4287,6 +4293,7 @@ getSubscriptions(Archive *fout)
 	i_subpublications = PQfnumber(res, "subpublications");
 	i_subbinary = PQfnumber(res, "subbinary");
 	i_substream = PQfnumber(res, "substream");
+	i_subtwophase = PQfnumber(res, "subtwophase");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4312,6 +4319,8 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_subbinary));
 		subinfo[i].substream =
 			pg_strdup(PQgetvalue(res, i, i_substream));
+		subinfo[i].subtwophase =
+			pg_strdup(PQgetvalue(res, i, i_subtwophase));
 
 		if (strlen(subinfo[i].rolname) == 0)
 			pg_log_warning("owner of subscription \"%s\" appears to be invalid",
@@ -4380,6 +4389,9 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->substream, "f") != 0)
 		appendPQExpBufferStr(query, ", streaming = on");
 
+	if (strcmp(subinfo->subtwophase, "f") != 0)
+		appendPQExpBufferStr(query, ", two_phase = on");
+	
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 317bb83..22e4e6c 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -629,6 +629,7 @@ typedef struct _SubscriptionInfo
 	char	   *subslotname;
 	char	   *subbinary;
 	char	   *substream;
+	char	   *subtwophase;
 	char	   *subsynccommit;
 	char	   *subpublications;
 } SubscriptionInfo;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 14150d0..47306a2 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5997,7 +5997,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false};
+	false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6023,13 +6023,15 @@ describeSubscriptions(const char *pattern, bool verbose)
 
 	if (verbose)
 	{
-		/* Binary mode and streaming are only supported in v14 and higher */
+		/* Binary mode and streaming and Two phase commit are only supported in v14 and higher */
 		if (pset.sversion >= 140000)
 			appendPQExpBuffer(&buf,
 							  ", subbinary AS \"%s\"\n"
-							  ", substream AS \"%s\"\n",
+							  ", substream AS \"%s\"\n"
+							  ", subtwophase AS \"%s\"\n",
 							  gettext_noop("Binary"),
-							  gettext_noop("Streaming"));
+							  gettext_noop("Streaming"),
+							  gettext_noop("Two phase commit"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index a16cc38..bc1d8fd 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	202011251
+#define CATALOG_VERSION_NO	202011271
 
 #endif
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 3fa02af..e07eed0 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -53,6 +53,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		substream;		/* Stream in-progress transactions. */
 
+	bool		subtwophase;	/* Decode 2PC PREPARE? */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -90,6 +92,7 @@ typedef struct Subscription
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
+	bool		twophase;		/* Decode 2PC PREPARE? */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c04d872..4b0afac 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -28,10 +28,14 @@
  *
  * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with
  * support for streaming large transactions.
+ *
+ * LOGICALREP_PROTO_2PC_VERSION_NUM is the minimum protocol version with
+ * support for two-phase commit PREPARE decoding.
  */
 #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
 #define LOGICALREP_PROTO_VERSION_NUM 1
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
+#define LOGICALREP_PROTO_2PC_VERSION_NUM 2
 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
 
 /*
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 1b05b39..f96c891 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -179,6 +179,7 @@ typedef struct
 			List	   *publication_names;	/* String list of publications */
 			bool		binary; /* Ask publisher to use binary */
 			bool		streaming;	/* Streaming of large transactions */
+			bool		twophase;	/* Enable 2PC decoding of PREPARE */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 2fa9bce..23d876e 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                            List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -91,10 +91,10 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                                List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | off                | dbname=regress_doesnotexist2
+                                                                          List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | f                | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
@@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                  List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | local              | dbname=regress_doesnotexist2
+                                                                            List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | f                | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
@@ -162,19 +162,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                            List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | off                | dbname=regress_doesnotexist
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                            List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -185,19 +185,42 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                            List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | off                | dbname=regress_doesnotexist
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                            List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | f                | off                | dbname=regress_doesnotexist
+(1 row)
+
+DROP SUBSCRIPTION regress_testsub;
+-- fail - two_phase must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo);
+ERROR:  two_phase requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | t                | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+\dRs+
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 14fa0b2..2a0b366 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -147,6 +147,21 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail - two_phase must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+
+\dRs+
+
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/020_twophase.pl b/src/test/subscription/t/020_twophase.pl
index 9c1d681..a680c1a 100644
--- a/src/test/subscription/t/020_twophase.pl
+++ b/src/test/subscription/t/020_twophase.pl
@@ -47,7 +47,8 @@ my $appname = 'tap_sub';
 $node_subscriber->safe_psql('postgres',	"
 	CREATE SUBSCRIPTION tap_sub
 	CONNECTION '$publisher_connstr application_name=$appname'
-	PUBLICATION tap_pub");
+	PUBLICATION tap_pub
+	WITH (two_phase = on)");
 
 # Wait for subscriber to finish initialization
 my $caughtup_query =
diff --git a/src/test/subscription/t/021_twophase_stream.pl b/src/test/subscription/t/021_twophase_stream.pl
index 9ec1e31..a2d4824 100644
--- a/src/test/subscription/t/021_twophase_stream.pl
+++ b/src/test/subscription/t/021_twophase_stream.pl
@@ -41,7 +41,7 @@ $node_subscriber->safe_psql('postgres', "
 	CREATE SUBSCRIPTION tap_sub
 	CONNECTION '$publisher_connstr application_name=$appname'
 	PUBLICATION tap_pub
-	WITH (streaming = on)");
+	WITH (streaming = on, two_phase = on)");
 
 # Wait for subscriber to finish initialization
 my $caughtup_query =
diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl
index 0f95530..9fb461b 100644
--- a/src/test/subscription/t/022_twophase_cascade.pl
+++ b/src/test/subscription/t/022_twophase_cascade.pl
@@ -54,7 +54,8 @@ my $appname_B = 'tap_sub_B';
 $node_B->safe_psql('postgres',	"
 	CREATE SUBSCRIPTION tap_sub_B
 	CONNECTION '$node_A_connstr application_name=$appname_B'
-	PUBLICATION tap_pub_A");
+	PUBLICATION tap_pub_A
+	WITH (two_phase = on)");
 
 # node_B (pub) -> node_C (sub)
 my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
@@ -66,7 +67,8 @@ my $appname_C = 'tap_sub_C';
 $node_C->safe_psql('postgres',	"
 	CREATE SUBSCRIPTION tap_sub_C
 	CONNECTION '$node_B_connstr application_name=$appname_C'
-	PUBLICATION tap_pub_B");
+	PUBLICATION tap_pub_B
+	WITH (two_phase = on)");
 
 # Wait for subscribers to finish initialization
 my $caughtup_query_B =
diff --git a/src/test/subscription/t/023_twophase_cascade_stream.pl b/src/test/subscription/t/023_twophase_cascade_stream.pl
index 3c6470d..ffba03f 100644
--- a/src/test/subscription/t/023_twophase_cascade_stream.pl
+++ b/src/test/subscription/t/023_twophase_cascade_stream.pl
@@ -56,7 +56,7 @@ $node_B->safe_psql('postgres',	"
 	CREATE SUBSCRIPTION tap_sub_B
 	CONNECTION '$node_A_connstr application_name=$appname_B'
 	PUBLICATION tap_pub_A
-	WITH (streaming = on)");
+	WITH (streaming = on, two_phase = on)");
 
 # node_B (pub) -> node_C (sub)
 my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
@@ -69,7 +69,7 @@ $node_C->safe_psql('postgres',	"
 	CREATE SUBSCRIPTION tap_sub_C
 	CONNECTION '$node_B_connstr application_name=$appname_C'
 	PUBLICATION tap_pub_B
-	WITH (streaming = on)");
+	WITH (streaming = on, two_phase = on)");
 
 # Wait for subscribers to finish initialization
 my $caughtup_query_B =
-- 
1.8.3.1

