From c3989215cd3c1d1efa790e96ad9fa537a6b8da4e Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Thu, 17 Aug 2023 14:09:17 +0530
Subject: [PATCH v28 3/4] apply worker assigns tables

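Let the apply worker assign tables to idle tablesync workers.

Previously, a tablesync worker that had finished its table searched for
the next table to sync by itself.  Now the worker waits after finishing,
and the apply worker hands it the next relation once the
max_sync_workers_per_subscription limit has been reached.  When no table
is left to sync, the apply worker resets relid of the sync workers to
InvalidOid so that they exit.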
---
 src/backend/replication/logical/launcher.c  |  32 ++++++
 src/backend/replication/logical/tablesync.c | 110 +++++++++++---------
 src/include/replication/worker_internal.h   |   1 +
 3 files changed, 93 insertions(+), 50 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index b19437f9d0..27fea8cd6f 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -297,6 +297,38 @@ logicalrep_workers_find(Oid subid, bool only_running)
 	return res;
 }
 
+/*
+ * Find a worker of the given subscription whose relstate is SYNCDONE.
+ */
+LogicalRepWorker *
+logicalrep_worker_find_syncdone(Oid subid, bool only_running)
+{
+	int			i;
+	LogicalRepWorker *res = NULL;
+
+	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+	/* Return the first matching slot; res stays NULL if nothing matches. */
+	for (i = 0; i < max_logical_replication_workers; i++)
+	{
+		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+		/* Skip parallel apply workers. */
+		if (isParallelApplyWorker(w))
+			continue;
+
+		if (w->in_use && w->subid == subid &&
+			w->relstate == SUBREL_STATE_SYNCDONE &&
+			(!only_running || w->proc))
+		{
+			res = w;
+			break;
+		}
+	}
+
+	return res;
+}
+
 /*
  * Start new logical replication background worker, if possible.
  *
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index ec2f67d879..c65f2a64d9 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -241,6 +241,12 @@ wait_for_worker_state_change(char expected_state)
 
 		CHECK_FOR_INTERRUPTS();
 
+		/* No table needs sync anymore. Apply worker wants this sync worker to exit. */
+		if (!OidIsValid(MyLogicalRepWorker->relid))
+		{
+			return false;
+		}
+
 		/*
 		 * Done if already in correct state.  (We assume this fetch is atomic
 		 * enough to not give a misleading answer if we do it with no lock.)
@@ -463,8 +469,28 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	 */
 	else if (table_states_not_ready == NIL && last_start_times)
 	{
+		List       *workers;
+
 		hash_destroy(last_start_times);
 		last_start_times = NULL;
+
+		/* Let all sync workers exit */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		workers = logicalrep_workers_find(MyLogicalRepWorker->subid, false);
+		foreach(lc, workers)
+		{
+			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+
+			if (OidIsValid(w->relid))
+			{
+					SpinLockAcquire(&w->relmutex);
+					w->relid = InvalidOid;
+					SpinLockRelease(&w->relmutex);
+					logicalrep_worker_wakeup_ptr(w);
+			}
+		}
+
+		LWLockRelease(LogicalRepWorkerLock);
 	}
 
 	/*
@@ -613,6 +639,32 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						hentry->last_start_time = now;
 					}
 				}
+				else
+				{
+					/*
+					 * We reached the max_sync_workers_per_subscription limit.
+					 * Check if there is an existing sync worker waiting for
+					 * new table to sync.
+					 */
+					LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+					syncworker = logicalrep_worker_find_syncdone(MyLogicalRepWorker->subid, false);
+					if (syncworker)
+					{
+						SpinLockAcquire(&syncworker->relmutex);
+						syncworker->relid = rstate->relid;
+						syncworker->relstate = rstate->state;
+						syncworker->relstate_lsn = rstate->lsn;
+						SpinLockRelease(&syncworker->relmutex);
+
+						if (syncworker->proc)
+						{
+							logicalrep_worker_wakeup_ptr(syncworker);
+						}
+					}
+
+					LWLockRelease(LogicalRepWorkerLock);
+				}
 			}
 		}
 	}
@@ -1731,59 +1783,17 @@ TablesyncWorkerMain(Datum main_arg)
 		if (IsTransactionState())
 			CommitTransactionCommand();
 
+		finish_sync_worker(true);
+
 		if (MyLogicalRepWorker->relsync_completed)
 		{
-			List	   *rstates;
-			ListCell   *lc;
-
-			/*
-			 * This tablesync worker is 'done' unless another table that needs
-			 * syncing is found.
-			 */
-			done = true;
-
-			/* This transaction will be committed by finish_sync_worker. */
-			StartTransactionCommand();
-
-			/*
-			 * Check if there is any table whose relation state is still INIT.
-			 * If a table in INIT state is found, the worker will not be
-			 * finished, it will be reused instead.
-			 */
-			rstates = GetSubscriptionRelations(MySubscription->oid, true);
-
-			foreach(lc, rstates)
-			{
-				SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
-
-				if (rstate->state == SUBREL_STATE_SYNCDONE)
-					continue;
-
-				/*
-				 * Take exclusive lock to prevent any other sync worker from
-				 * picking the same table.
-				 */
-				LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
-
-				/*
-				 * Pick the table for the next run if it is not already picked
-				 * up by another worker.
-				 */
-				if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
-				{
-					/* Update worker state for the next table */
-					MyLogicalRepWorker->relid = rstate->relid;
-					MyLogicalRepWorker->relstate = rstate->state;
-					MyLogicalRepWorker->relstate_lsn = rstate->lsn;
-					LWLockRelease(LogicalRepWorkerLock);
+			/* Wait for the apply worker to assign a new table in INIT state. */
+			wait_for_worker_state_change(SUBREL_STATE_INIT);
+		}
 
-					/* Found a table for next iteration */
-					finish_sync_worker(true);
-					done = false;
-					break;
-				}
-				LWLockRelease(LogicalRepWorkerLock);
-			}
+		if (!OidIsValid(MyLogicalRepWorker->relid))
+		{
+			break;
 		}
 	}
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 47fa7fbd55..0cf2a69fd6 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -237,6 +237,7 @@ extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
+extern LogicalRepWorker *logicalrep_worker_find_syncdone(Oid subid, bool only_running);
 extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
 									 dsm_handle subworker_dsm);
-- 
2.34.1

