From 981c2a2d39307a42caf921e257d0a0058b104de8 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Thu, 25 Mar 2021 13:27:12 +1100
Subject: [PATCH v66] Change AllTablesyncsReady to return false when 0 tables.

---
 src/backend/replication/logical/tablesync.c | 65 ++++++++++++++---------------
 src/backend/replication/logical/worker.c    | 28 +++----------
 2 files changed, 38 insertions(+), 55 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 679b73f..a1c9949 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -119,7 +119,7 @@
 
 static bool table_states_valid = false;
 static List *table_states_not_ready = NIL;
-static void FetchTableStates(bool *started_tx);
+static int FetchTableStates(bool *started_tx);
 
 StringInfo	copybuf = NULL;
 
@@ -404,36 +404,18 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	 * When this happens, we restart the apply worker and (if the conditions
 	 * are still ok) then the two_phase tri-state will become properly
 	 * 'enabled' at that time.
+	 *
+	 * Note: If the subscription has no tables then leave the state as PENDING,
+	 * which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to work.
 	 */
-	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
+	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+		AllTablesyncsReady())
 	{
-		if (AllTablesyncsReady())
-		{
-			List *subrels = NIL;
-			bool become_two_phase_enabled = false;
-
-			if (!started_tx)
-			{
-				StartTransactionCommand();
-				started_tx = true;
-			}
-			subrels = GetSubscriptionRelations(MySubscription->oid);
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription \"%s\" will restart so two_phase can be enabled",
+						MySubscription->name)));
 
-			/*
-			 * If there are no tables then leave the state as PENDING, which
-			 * allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to work.
-			 */
-			become_two_phase_enabled = list_length(subrels) > 0;
-
-			if (become_two_phase_enabled)
-			{
-				ereport(LOG,
-						(errmsg("logical replication apply worker for subscription \"%s\" will restart so two_phase can be enabled",
-								MySubscription->name)));
-
-				proc_exit(0);
-			}
-		}
+		proc_exit(0);
 	}
 
 	/*
@@ -1161,15 +1143,20 @@ copy_table_done:
 
 /*
  * Common code to fetch the up-to-date sync state info into the static lists.
+ *
+ * Returns how many tables belong to the subscription.
  */
-static void
+static int
 FetchTableStates(bool *started_tx)
 {
+	static int n_subrels = 0;
+
 	*started_tx = false;
 
 	if (!table_states_valid)
 	{
 		MemoryContext oldctx;
+		List	   *subrels = NIL;
 		List	   *rstates;
 		ListCell   *lc;
 		SubscriptionRelState *rstate;
@@ -1184,6 +1171,10 @@ FetchTableStates(bool *started_tx)
 			*started_tx = true;
 		}
 
+		/* count all subscription tables. */
+		subrels = GetSubscriptionRelations(MySubscription->oid);
+		n_subrels = list_length(subrels);
+
 		/* Fetch all non-ready tables. */
 		rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
 
@@ -1199,10 +1190,14 @@ FetchTableStates(bool *started_tx)
 
 		table_states_valid = true;
 	}
+
+	return n_subrels;
 }
 
 /*
- * Are all tablesyncs READY?
+ * If the subscription has no tables then return false.
+ *
+ * Otherwise, are all tablesyncs READY?
  *
  * Note: This function is not suitable to be called from outside of apply or
  * tablesync workers because MySubscription needs to be already initialized.
@@ -1212,9 +1207,10 @@ AllTablesyncsReady(void)
 {
 	bool		found_busy = false;
 	bool		started_tx = false;
+	int			n_subrels = 0;
 
 	/* We need up-to-date sync state info for subscription tables here. */
-	FetchTableStates(&started_tx);
+	n_subrels = FetchTableStates(&started_tx);
 
 	found_busy = list_length(table_states_not_ready) > 0;
 
@@ -1224,8 +1220,11 @@ AllTablesyncsReady(void)
 		pgstat_report_stat(false);
 	}
 
-	/* When no tablesyncs are busy, then all are READY */
-	return !found_busy;
+	/*
+	 * When there are no tables, then return false.
+	 * When no tablesyncs are busy, then all are READY
+	 */
+	return n_subrels > 0 && !found_busy;
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 634fd92..2cbc70e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3381,33 +3381,17 @@ ApplyWorkerMain(Datum main_arg)
 
 	if (!am_tablesync_worker())
 	{
-		bool	become_two_phase_enabled = false;
-
 		/*
 		 * Even when the two_phase mode is requested by the user, it remains
 		 * as the tri-state PENDING until all tablesyncs have reached READY
 		 * state. Only then, can it become properly ENABLED.
+		 *
+		 * Note: If the subscription has no tables then leave the state as
+		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+		 * work.
 		 */
-		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
-		{
-			if (AllTablesyncsReady())
-			{
-				List *subrels = NIL;
-
-				StartTransactionCommand();
-				subrels = GetSubscriptionRelations(MySubscription->oid);
-				CommitTransactionCommand();
-
-				/*
-				 * If there are no tables then leave the state as PENDING,
-				 * otherwise ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
-				 * not allowed.
-				 */
-				become_two_phase_enabled = list_length(subrels) > 0;
-			}
-		}
-
-		if (become_two_phase_enabled)
+		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+			AllTablesyncsReady())
 		{
 			/* Start streaming with two_phase enabled */
 			options.proto.logical.twophase = true;
-- 
1.8.3.1

