From 344720c8c7b976e211239e551c244e66f6f7491c Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Tue, 4 Jul 2023 22:04:46 +0300
Subject: [PATCH v26 1/2] Reuse Tablesync Workers

Before this patch, tablesync workers were capable of syncing only one
table. For each table, a new sync worker was launched and that worker would
exit when done processing the table.

Now, tablesync workers are not limited to processing only one
table. When a worker is done with a table, it can move on to
processing another table in the same subscription.

If there is a table that needs to be synced, an available tablesync
worker picks up that table and syncs it. Each tablesync worker
continues to pick new tables to sync until there are no tables left
requiring synchronization. If no worker is available to process the
table, a new tablesync worker is launched, provided the number of
tablesync workers for the subscription does not exceed
max_sync_workers_per_subscription.

Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com
---
 src/backend/replication/logical/launcher.c  |   1 +
 src/backend/replication/logical/tablesync.c | 117 ++++++++++++++++++--
 src/backend/replication/logical/worker.c    |  49 ++++++--
 src/include/replication/worker_internal.h   |   2 +
 4 files changed, 151 insertions(+), 18 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7f95..25dd06b8af 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -440,6 +440,7 @@ retry:
 	worker->stream_fileset = NULL;
 	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
 	worker->parallel_apply = is_parallel_apply_worker;
+	worker->relsync_completed = false;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a775065..31d4a7db15 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -131,10 +131,12 @@ static StringInfo copybuf = NULL;
 
 /*
  * Exit routine for synchronization worker.
+ *
+ * If reuse_worker is false, at the conclusion of this function the worker
+ * process will exit.
  */
 static void
-pg_attribute_noreturn()
-finish_sync_worker(void)
+finish_sync_worker(bool reuse_worker)
 {
 	/*
 	 * Commit any outstanding transaction. This is the usual case, unless
@@ -146,21 +148,38 @@ finish_sync_worker(void)
 		pgstat_report_stat(true);
 	}
 
+	/*
+	 * Disconnect from the publisher; otherwise, reusing the sync worker can
+	 * fail due to exceeding max_wal_senders.
+	 */
+	if (LogRepWorkerWalRcvConn != NULL)
+	{
+		walrcv_disconnect(LogRepWorkerWalRcvConn);
+		LogRepWorkerWalRcvConn = NULL;
+	}
+
 	/* And flush all writes. */
 	XLogFlush(GetXLogWriteRecPtr());
 
 	StartTransactionCommand();
-	ereport(LOG,
-			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
-					MySubscription->name,
-					get_rel_name(MyLogicalRepWorker->relid))));
+	if (reuse_worker)
+		ereport(LOG,
+				errmsg("logical replication table synchronization worker for subscription \"%s\" will be reused to sync table \"%s\" with relid %u.",
+						MySubscription->name,
+						get_rel_name(MyLogicalRepWorker->relid),
+						MyLogicalRepWorker->relid));
+	else
+		ereport(LOG,
+				errmsg("logical replication table synchronization worker for subscription \"%s\" has finished",
+						MySubscription->name));
 	CommitTransactionCommand();
 
 	/* Find the leader apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 
 	/* Stop gracefully */
-	proc_exit(0);
+	if (!reuse_worker)
+		proc_exit(0);
 }
 
 /*
@@ -380,7 +399,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 */
 		replorigin_drop_by_name(originname, true, false);
 
-		finish_sync_worker();
+		/* Sync worker has completed synchronization of the current table. */
+		MyLogicalRepWorker->relsync_completed = true;
+
+		ereport(LOG,
+				errmsg("logical replication table synchronization for subscription \"%s\", relation \"%s\" with relid %u has finished",
+						MySubscription->name,
+						get_rel_name(MyLogicalRepWorker->relid),
+						MyLogicalRepWorker->relid));
+		CommitTransactionCommand();
 	}
 	else
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
@@ -1285,7 +1312,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		case SUBREL_STATE_SYNCDONE:
 		case SUBREL_STATE_READY:
 		case SUBREL_STATE_UNKNOWN:
-			finish_sync_worker();	/* doesn't return */
+			finish_sync_worker(false);	/* doesn't return */
 	}
 
 	/* Calculate the name of the tablesync slot. */
@@ -1643,6 +1670,8 @@ run_tablesync_worker()
 	char	   *slotname = NULL;
 	WalRcvStreamOptions options;
 
+	MyLogicalRepWorker->relsync_completed = false;
+
 	start_table_sync(&origin_startpos, &slotname);
 
 	ReplicationOriginNameForLogicalRep(MySubscription->oid,
@@ -1665,12 +1694,78 @@ void
 TablesyncWorkerMain(Datum main_arg)
 {
 	int			worker_slot = DatumGetInt32(main_arg);
+	bool 		done = false;
 
 	SetupApplyOrSyncWorker(worker_slot);
 
-	run_tablesync_worker();
+	/*
+	 * Main loop of the tablesync worker. It keeps iterating until there is no
+	 * relation left to sync.
+	 */
+	for (;!done;)
+	{
+		run_tablesync_worker();
+
+		if (IsTransactionState())
+			CommitTransactionCommand();
+
+		if (MyLogicalRepWorker->relsync_completed)
+		{
+			List	   *rstates;
+			ListCell   *lc;
+
+			/*
+			 * This tablesync worker is 'done' unless another table that needs
+			 * syncing is found.
+			 */
+			done = true;
+
+			/* This transaction will be committed by finish_sync_worker. */
+			StartTransactionCommand();
+
+			/*
+			 * Check if there is any table that still needs to be synced. A
+			 * table that is not in SYNCDONE state and is not already picked up
+			 * by another worker is chosen, and this worker is reused for it.
+			 */
+			rstates = GetSubscriptionRelations(MySubscription->oid, true);
+
+			foreach(lc, rstates)
+			{
+				SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+
+				if (rstate->state == SUBREL_STATE_SYNCDONE)
+					continue;
+
+				/*
+				 * Take exclusive lock to prevent any other sync worker from
+				 * picking the same table.
+				 */
+				LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+				/*
+				 * Pick the table for the next run if it is not already picked
+				 * up by another worker.
+				 */
+				if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
+				{
+					/* Update worker state for the next table */
+					MyLogicalRepWorker->relid = rstate->relid;
+					MyLogicalRepWorker->relstate = rstate->state;
+					MyLogicalRepWorker->relstate_lsn = rstate->lsn;
+					LWLockRelease(LogicalRepWorkerLock);
+
+					/* Found a table for next iteration */
+					finish_sync_worker(true);
+					done = false;
+					break;
+				}
+				LWLockRelease(LogicalRepWorkerLock);
+			}
+		}
+	}
 
-	finish_sync_worker();
+	finish_sync_worker(false);
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a9f5fa7dfc..32087a00b9 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3621,12 +3621,30 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					MemoryContextReset(ApplyMessageContext);
 				}
 
+				if (am_tablesync_worker())
+				{
+					/*
+					 * apply_dispatch() may have gone into apply_handle_commit()
+					 * which can call process_syncing_tables_for_sync.
+					 *
+					 * process_syncing_tables_for_sync decides whether the sync of
+					 * the current table is completed. If it is completed,
+					 * streaming must be already ended. So, we can break the loop.
+					 */
+					if (MyLogicalRepWorker->relsync_completed)
+					{
+						endofstream = true;
+						break;
+					}
+				}
+
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 			}
 		}
 
 		/* confirm all writes so far */
-		send_feedback(last_received, false, false);
+		if (!MyLogicalRepWorker->relsync_completed)
+			send_feedback(last_received, false, false);
 
 		if (!in_remote_transaction && !in_streamed_transaction)
 		{
@@ -3640,6 +3658,18 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 			/* Process any table synchronization changes. */
 			process_syncing_tables(last_received);
+
+			if (am_tablesync_worker())
+			{
+				/*
+				 * If relsync_completed is true, this means that the tablesync
+				 * worker is done with synchronization. Streaming has already been
+				 * ended by process_syncing_tables_for_sync. We should move to the
+				 * next table if needed, or exit.
+				 */
+				if (MyLogicalRepWorker->relsync_completed)
+					endofstream = true;
+			}
 		}
 
 		/* Cleanup the memory. */
@@ -3742,8 +3772,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	error_context_stack = errcallback.previous;
 	apply_error_context_stack = error_context_stack;
 
-	/* All done */
-	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+	/*
+	 * End streaming here only for apply workers. Ending streaming for
+	 * tablesync workers is deferred until the worker exits its main loop.
+	 */
+	if (!am_tablesync_worker())
+		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
 }
 
 /*
@@ -4617,13 +4651,14 @@ InitializeLogRepWorker(void)
 
 	if (am_tablesync_worker())
 		ereport(LOG,
-				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+				errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" with relid %u has started",
 						MySubscription->name,
-						get_rel_name(MyLogicalRepWorker->relid))));
+						get_rel_name(MyLogicalRepWorker->relid),
+						MyLogicalRepWorker->relid));
 	else
 		ereport(LOG,
-				(errmsg("logical replication apply worker for subscription \"%s\" has started",
-						MySubscription->name)));
+				errmsg("logical replication apply worker for subscription \"%s\" has started",
+						MySubscription->name));
 
 	CommitTransactionCommand();
 }
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 672a7117c0..10bea1d533 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -56,6 +56,8 @@ typedef struct LogicalRepWorker
 	char		relstate;
 	XLogRecPtr	relstate_lsn;
 	slock_t		relmutex;
+	bool		relsync_completed; /* has tablesync finished syncing
+									* the assigned table? */
 
 	/*
 	 * Used to create the changes and subxact files for the streaming
-- 
2.25.1

