From b4419ac9ea4bed647b0bc78eae537810364dee15 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Thu, 17 Aug 2023 20:09:00 +0530
Subject: [PATCH v28 4/4] Defect fixes

The following defects are fixed:
1) Setting table to ready immediately if there are no more incremental
   changes to be synced.
2) Recieve the incremental changes if applicable and setting relation
   state to ready without waiting.
3) Reuse the worker if the worker is free before trying to start a new
   table sync worker
---
 src/backend/replication/logical/launcher.c  |   1 +
 src/backend/replication/logical/tablesync.c | 113 ++++++++++++++------
 src/backend/replication/logical/worker.c    |   6 ++
 src/include/replication/worker_internal.h   |   3 +
 4 files changed, 91 insertions(+), 32 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 27fea8cd6f..7b208d22df 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -473,6 +473,7 @@ retry:
 	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
 	worker->parallel_apply = is_parallel_apply_worker;
 	worker->relsync_completed = false;
+	worker->recv_immediately = false;
 	worker->slot_number = slot;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c65f2a64d9..7d45178e2b 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -183,7 +183,8 @@ finish_sync_worker(bool reuse_worker)
  * CATCHUP state to SYNCDONE.
  */
 static bool
-wait_for_relation_state_change(Oid relid, char expected_state)
+wait_for_relation_state_change(Oid relid, char expected_state,
+							   bool check_equal, long timeout)
 {
 	char		state;
 
@@ -201,7 +202,10 @@ wait_for_relation_state_change(Oid relid, char expected_state)
 		if (state == SUBREL_STATE_UNKNOWN)
 			break;
 
-		if (state == expected_state)
+		if (check_equal && state == expected_state)
+			return true;
+
+		if (!check_equal && state != expected_state)
 			return true;
 
 		/* Check if the sync worker is still running and bail if not. */
@@ -214,7 +218,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
 
 		(void) WaitLatch(MyLatch,
 						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-						 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
+						 timeout, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
 
 		ResetLatch(MyLatch);
 	}
@@ -595,13 +599,62 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					}
 
 					wait_for_relation_state_change(rstate->relid,
-												   SUBREL_STATE_SYNCDONE);
+												   SUBREL_STATE_SYNCDONE, true, 1000L);
+
+					if (current_lsn >= rstate->lsn)
+					{
+						char		originname[NAMEDATALEN];
+
+						rstate->state = SUBREL_STATE_READY;
+						rstate->lsn = current_lsn;
+						if (!started_tx)
+						{
+							StartTransactionCommand();
+							started_tx = true;
+						}
+
+						/*
+						 * Remove the tablesync origin tracking if exists.
+						 *
+						 * There is a chance that the user is concurrently performing
+						 * refresh for the subscription where we remove the table
+						 * state and its origin or the tablesync worker would have
+						 * already removed this origin. We can't rely on tablesync
+						 * worker to remove the origin tracking as if there is any
+						 * error while dropping we won't restart it to drop the
+						 * origin. So passing missing_ok = true.
+						 */
+						ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
+														rstate->relid,
+														originname,
+														sizeof(originname));
+						replorigin_drop_by_name(originname, true, false);
+
+						/*
+						 * Update the state to READY only after the origin cleanup.
+						 */
+						UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+												rstate->relid, rstate->state,
+												rstate->lsn);
+					}
+					else
+					{
+						/*
+						 * There are some transaction data that need to be
+						 * synced for the table to reach READY state.
+						 */
+						MyLogicalRepWorker->recv_immediately = true;
+					}
 				}
 				else
 					LWLockRelease(LogicalRepWorkerLock);
 			}
 			else
 			{
+				TimestampTz now = GetCurrentTimestamp();
+				struct tablesync_start_time_mapping *hentry;
+				bool		found;
+
 				/*
 				 * If there is no sync worker for this table yet, count
 				 * running sync workers for this subscription, while we have
@@ -613,33 +666,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				/* Now safe to release the LWLock */
 				LWLockRelease(LogicalRepWorkerLock);
 
-				/*
-				 * If there are free sync worker slot(s), start a new sync
-				 * worker for the table.
-				 */
-				if (nsyncworkers < max_sync_workers_per_subscription)
-				{
-					TimestampTz now = GetCurrentTimestamp();
-					struct tablesync_start_time_mapping *hentry;
-					bool		found;
-
-					hentry = hash_search(last_start_times, &rstate->relid,
-										 HASH_ENTER, &found);
-
-					if (!found ||
-						TimestampDifferenceExceeds(hentry->last_start_time, now,
-												   wal_retrieve_retry_interval))
-					{
-						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-												 MySubscription->oid,
-												 MySubscription->name,
-												 MyLogicalRepWorker->userid,
-												 rstate->relid,
-												 DSM_HANDLE_INVALID);
-						hentry->last_start_time = now;
-					}
-				}
-				else
+				hentry = hash_search(last_start_times, &rstate->relid,
+										HASH_ENTER, &found);
+				if (!found ||
+					TimestampDifferenceExceeds(hentry->last_start_time, now,
+												wal_retrieve_retry_interval))
 				{
 					/*
 					 * We reached the max_sync_workers_per_subscription limit.
@@ -661,9 +692,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						{
 							logicalrep_worker_wakeup_ptr(syncworker);
 						}
+
+						LWLockRelease(LogicalRepWorkerLock);
+
+						wait_for_relation_state_change(rstate->relid,
+													   SUBREL_STATE_INIT, false, 10L);
+						hentry->last_start_time = now;
 					}
+					else if (nsyncworkers < max_sync_workers_per_subscription)
+					{
+						LWLockRelease(LogicalRepWorkerLock);
 
-					LWLockRelease(LogicalRepWorkerLock);
+						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+												MySubscription->oid,
+												MySubscription->name,
+												MyLogicalRepWorker->userid,
+												rstate->relid,
+												DSM_HANDLE_INVALID);
+						hentry->last_start_time = now;
+					}
+					else
+						LWLockRelease(LogicalRepWorkerLock);
 				}
 			}
 		}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d212f9d543..e3be91e5a4 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3682,6 +3682,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (endofstream)
 			break;
 
+		if (MyLogicalRepWorker->recv_immediately)
+		{
+			MyLogicalRepWorker->recv_immediately = false;
+			continue;
+		}
+
 		/*
 		 * Wait for more data or latch.  If we have unflushed transactions,
 		 * wake up after WalWriterDelay to see if they've been flushed yet (in
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 0cf2a69fd6..a8d498f5af 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -54,6 +54,9 @@ typedef struct LogicalRepWorker
 	/* Subscription id for the worker. */
 	Oid			subid;
 
+	/* Receive the incremental changes immediately without waiting. */
+	bool		recv_immediately;
+
 	/* Used for initial table synchronization. */
 	Oid			relid;
 	char		relstate;
-- 
2.34.1

