From 09a9b666ee3df60b2662769e200643527dfc9435 Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Thu, 2 Jun 2022 17:39:37 +0300
Subject: [PATCH] Reuse Logical Replication Background worker

---
 src/backend/catalog/pg_subscription.c       |  59 ++++
 src/backend/commands/subscriptioncmds.c     | 164 ++++++----
 src/backend/replication/logical/launcher.c  |   3 +
 src/backend/replication/logical/tablesync.c | 118 +++++--
 src/backend/replication/logical/worker.c    | 338 ++++++++++++--------
 src/include/catalog/pg_subscription_rel.h   |   1 +
 src/include/replication/slot.h              |   3 +-
 src/include/replication/worker_internal.h   |  14 +
 8 files changed, 488 insertions(+), 212 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 8856ce3b50..81f8ab6cbf 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -635,3 +635,62 @@ GetSubscriptionNotReadyRelations(Oid subid)
 
 	return res;
 }
+
+/*
+ * Collect the relations of subscription 'subid' whose pg_subscription_rel
+ * state is still SUBREL_STATE_INIT.  The result is a List of
+ * SubscriptionRelState, palloc'ed in the current memory context.
+ */
+List *
+GetSubscriptionInitStateRelations(Oid subid)
+{
+	Relation	subrelcat;
+	ScanKeyData keys[2];
+	SysScanDesc sscan;
+	HeapTuple	tuple;
+	List	   *initrels = NIL;
+
+	subrelcat = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&keys[0],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	ScanKeyInit(&keys[1],
+				Anum_pg_subscription_rel_srsubstate,
+				BTEqualStrategyNumber, F_CHAREQ,
+				CharGetDatum(SUBREL_STATE_INIT));
+
+	sscan = systable_beginscan(subrelcat, InvalidOid, false, NULL,
+							   2, keys);
+
+	for (;;)
+	{
+		Form_pg_subscription_rel relform;
+		SubscriptionRelState *entry;
+		Datum		lsndatum;
+		bool		lsnisnull;
+
+		tuple = systable_getnext(sscan);
+		if (!HeapTupleIsValid(tuple))
+			break;
+
+		relform = (Form_pg_subscription_rel) GETSTRUCT(tuple);
+		lsndatum = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tuple,
+								   Anum_pg_subscription_rel_srsublsn, &lsnisnull);
+
+		entry = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
+		entry->relid = relform->srrelid;
+		entry->state = relform->srsubstate;
+		entry->lsn = lsnisnull ? InvalidXLogRecPtr : DatumGetLSN(lsndatum);
+
+		initrels = lappend(initrels, entry);
+	}
+
+	/* Done scanning: close the scan and the catalog relation. */
+	systable_endscan(sscan);
+	table_close(subrelcat, AccessShareLock);
+
+	return initrels;
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index e2852286a7..34f4c0cb06 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -765,6 +765,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	} SubRemoveRels;
 	SubRemoveRels *sub_remove_rels;
 	WalReceiverConn *wrconn;
+	List	   *sub_remove_slots = NIL;
+	LogicalRepWorker *worker;
 
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
@@ -887,7 +889,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
 				RemoveSubscriptionRel(sub->oid, relid);
 
-				logicalrep_worker_stop(sub->oid, relid);
+				/*
+				 * Find the logical replication sync worker, if one exists, and
+				 * store its slot number so the associated replication slot can be dropped later.
+				 */
+				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+				worker = logicalrep_worker_find(sub->oid, relid, false);
+				if (worker)
+				{
+					logicalrep_worker_stop(sub->oid, relid);
+					sub_remove_slots = lappend(sub_remove_slots, &worker->slot);
+				}
+				LWLockRelease(LogicalRepWorkerLock);
 
 				/*
 				 * For READY state, we would have already dropped the
@@ -921,31 +934,27 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		}
 
 		/*
-		 * Drop the tablesync slots associated with removed tables. This has
-		 * to be at the end because otherwise if there is an error while doing
+		 * Drop the replication slots associated with tablesync workers for removed tables.
+		 * This has to be at the end because otherwise if there is an error while doing
 		 * the database operations we won't be able to rollback dropped slots.
 		 */
-		for (off = 0; off < remove_rel_len; off++)
+		foreach(lc, sub_remove_slots)
 		{
-			if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
-				sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
-			{
-				char		syncslotname[NAMEDATALEN] = {0};
+			char		syncslotname[NAMEDATALEN] = {0};
 
-				/*
-				 * For READY/SYNCDONE states we know the tablesync slot has
-				 * already been dropped by the tablesync worker.
-				 *
-				 * For other states, there is no certainty, maybe the slot
-				 * does not exist yet. Also, if we fail after removing some of
-				 * the slots, next time, it will again try to drop already
-				 * dropped slots and fail. For these reasons, we allow
-				 * missing_ok = true for the drop.
-				 */
-				ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
-												syncslotname, sizeof(syncslotname));
-				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
-			}
+			int *slot_to_drop = (int *) palloc(sizeof(int));
+			memcpy(slot_to_drop, lfirst(lc), sizeof(int));
+
+			/*
+			 * There is no certainty, maybe the slot
+			 * does not exist yet. Also, if we fail after removing some of
+			 * the slots, next time, it will again try to drop already
+			 * dropped slots and fail. For these reasons, we allow
+			 * missing_ok = true for the drop.
+			 */
+			ReplicationSlotNameForTablesync(sub->oid, *slot_to_drop,
+											syncslotname, sizeof(syncslotname));
+			ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
 		}
 	}
 	PG_FINALLY();
@@ -1530,39 +1539,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	PG_TRY();
 	{
-		foreach(lc, rstates)
-		{
-			SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
-			Oid			relid = rstate->relid;
-
-			/* Only cleanup resources of tablesync workers */
-			if (!OidIsValid(relid))
-				continue;
-
-			/*
-			 * Drop the tablesync slots associated with removed tables.
-			 *
-			 * For SYNCDONE/READY states, the tablesync slot is known to have
-			 * already been dropped by the tablesync worker.
-			 *
-			 * For other states, there is no certainty, maybe the slot does
-			 * not exist yet. Also, if we fail after removing some of the
-			 * slots, next time, it will again try to drop already dropped
-			 * slots and fail. For these reasons, we allow missing_ok = true
-			 * for the drop.
-			 */
-			if (rstate->state != SUBREL_STATE_SYNCDONE)
-			{
-				char		syncslotname[NAMEDATALEN] = {0};
+		List *slots = NULL;
 
-				ReplicationSlotNameForTablesync(subid, relid, syncslotname,
-												sizeof(syncslotname));
-				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
-			}
+		
+		slots = GetReplicationSlotNamesBySubId(wrconn, subid, true);
+		foreach(lc, slots)
+		{
+			char *syncslotname = (char *) lfirst(lc);
+			ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
 		}
 
-		list_free(rstates);
-
 		/*
 		 * If there is a slot associated with the subscription, then drop the
 		 * replication slot at the publisher.
@@ -1591,6 +1577,69 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	table_close(rel, NoLock);
 }
 
+/*
+ * GetReplicationSlotNamesBySubId
+ *
+ * Query the publisher over 'wrconn' for every slot in pg_replication_slots
+ * whose name matches the tablesync pattern "pg_<subid>_sync_%".  Returns a
+ * List of palloc'd C strings (NIL if none); raises ERROR if the query does
+ * not yield a tuple result.  'missing_ok' is currently not consulted.
+ */
+List *
+GetReplicationSlotNamesBySubId(WalReceiverConn *wrconn, Oid subid, bool missing_ok)
+{
+	StringInfoData cmd;
+	Oid			slotRow[1] = {NAMEOID};
+	List	   *slotnames = NIL;
+
+	Assert(wrconn);
+	load_file("libpqwalreceiver", false);
+
+	/* Oid is unsigned, so it must be formatted with %u rather than %i. */
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "SELECT slot_name"
+					 " FROM pg_replication_slots"
+					 " WHERE slot_name LIKE 'pg_%u_sync_%%';",
+					 subid);
+	PG_TRY();
+	{
+		WalRcvExecResult *res;
+		TupleTableSlot *slot;
+
+		res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("could not receive list of tablesync replication slots from the publisher: %s", res->err)));
+
+		/* Copy each returned slot name out of the tuplestore. */
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			Datum		d;
+			bool		isnull;
+
+			d = slot_getattr(slot, 1, &isnull);
+			Assert(!isnull);
+
+			/* pstrdup: the name must outlive the tuple slot and the result. */
+			slotnames = lappend(slotnames, pstrdup(NameStr(*DatumGetName(d))));
+			ExecClearTuple(slot);
+		}
+		ExecDropSingleTupleTableSlot(slot);
+
+		walrcv_clear_result(res);
+	}
+	PG_FINALLY();
+	{
+		pfree(cmd.data);
+	}
+	PG_END_TRY();
+
+	return slotnames;
+}
+
 /*
  * Drop the replication slot at the publisher node using the replication
  * connection.
@@ -1832,6 +1881,7 @@ static void
 ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
 {
 	ListCell   *lc;
+	LogicalRepWorker *worker;
 
 	foreach(lc, rstates)
 	{
@@ -1842,15 +1892,21 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
 		if (!OidIsValid(relid))
 			continue;
 
+		/* Check if there is a sync worker for the relation */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		worker = logicalrep_worker_find(subid, relid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+		
 		/*
 		 * Caller needs to ensure that relstate doesn't change underneath us.
 		 * See DropSubscription where we get the relstates.
 		 */
-		if (rstate->state != SUBREL_STATE_SYNCDONE)
+		if (worker &&
+			rstate->state != SUBREL_STATE_SYNCDONE)
 		{
 			char		syncslotname[NAMEDATALEN] = {0};
 
-			ReplicationSlotNameForTablesync(subid, relid, syncslotname,
+			ReplicationSlotNameForTablesync(subid, worker->slot, syncslotname,
 											sizeof(syncslotname));
 			elog(WARNING, "could not drop tablesync replication slot \"%s\"",
 				 syncslotname);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 2bdab53e19..918d8137c0 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -370,7 +370,9 @@ retry:
 	/* Prepare the worker slot. */
 	worker->launch_time = now;
 	worker->in_use = true;
+	worker->is_first_run = true;
 	worker->generation++;
+	worker->slot = slot;
 	worker->proc = NULL;
 	worker->dbid = dbid;
 	worker->userid = userid;
@@ -378,6 +380,7 @@ retry:
 	worker->relid = relid;
 	worker->relstate = SUBREL_STATE_UNKNOWN;
 	worker->relstate_lsn = InvalidXLogRecPtr;
+	worker->move_to_next_rel = false;
 	worker->stream_fileset = NULL;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 670c6fcada..c82a203fc5 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -126,12 +126,8 @@ static bool FetchTableStates(bool *started_tx);
 
 static StringInfo copybuf = NULL;
 
-/*
- * Exit routine for synchronization worker.
- */
 static void
-pg_attribute_noreturn()
-finish_sync_worker(void)
+clean_sync_worker(void)
 {
 	/*
 	 * Commit any outstanding transaction. This is the usual case, unless
@@ -143,18 +139,27 @@ finish_sync_worker(void)
 		pgstat_report_stat(true);
 	}
 
-	/* And flush all writes. */
-	XLogFlush(GetXLogWriteRecPtr());
-
-	StartTransactionCommand();
-	ereport(LOG,
-			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
-					MySubscription->name,
-					get_rel_name(MyLogicalRepWorker->relid))));
-	CommitTransactionCommand();
+	/* Disconnect from the publisher.
+	 * Otherwise reused sync workers can exceed the publisher's max_wal_senders.
+	 */
+	walrcv_disconnect(LogRepWorkerWalRcvConn);
+	LogRepWorkerWalRcvConn = NULL;
 
 	/* Find the main apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+}
+
+/*
+ * Exit routine for synchronization worker.
+ */
+static void
+pg_attribute_noreturn()
+finish_sync_worker(void)
+{
+	clean_sync_worker();
+	
+	/* And flush all writes. */
+	XLogFlush(GetXLogWriteRecPtr());
 
 	/* Stop gracefully */
 	proc_exit(0);
@@ -180,7 +185,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
 		LogicalRepWorker *worker;
 		XLogRecPtr	statelsn;
 
-		CHECK_FOR_INTERRUPTS();
+		CHECK_FOR_INTERRUPTS();		
 
 		InvalidateCatalogSnapshot();
 		state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
@@ -284,6 +289,10 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
+	List	   *rstates;
+	SubscriptionRelState *rstate;
+	ListCell   *lc;
+
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
@@ -323,18 +332,64 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 * able to rollback dropped slot.
 		 */
 		ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
-										MyLogicalRepWorker->relid,
+										MyLogicalRepWorker->slot,
 										syncslotname,
 										sizeof(syncslotname));
 
 		/*
-		 * It is important to give an error if we are unable to drop the slot,
-		 * otherwise, it won't be dropped till the corresponding subscription
-		 * is dropped. So passing missing_ok = false.
+		 * Check if any table whose relation state is still INIT. 
+		 * If a table in INIT state is found, the worker will not be finished,
+		 * it will be reused instead.
 		 */
-		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
+		rstates = GetSubscriptionInitStateRelations(MySubscription->oid);
+		
+		foreach (lc, rstates)
+		{
+			rstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
+			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
 
-		finish_sync_worker();
+			/* 
+			 * Pick the table for the next run
+			 * if there is not another worker already picked that table.
+			 */
+			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+			if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
+			{
+				ereport(LOG,
+						(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+								MySubscription->name,
+								get_rel_name(MyLogicalRepWorker->relid))));
+
+				/* Update worker state for the next table */
+				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+				MyLogicalRepWorker->is_first_run = false;
+				MyLogicalRepWorker->relid = rstate->relid;
+				MyLogicalRepWorker->relstate = rstate->state;
+				MyLogicalRepWorker->relstate_lsn = rstate->lsn;
+				MyLogicalRepWorker->move_to_next_rel = true;
+				SpinLockRelease(&MyLogicalRepWorker->relmutex);
+				LWLockRelease(LogicalRepWorkerLock);
+				break;
+			}
+			LWLockRelease(LogicalRepWorkerLock);
+		}
+
+		/* Cleanup before next run or ending the worker. */
+		if(!MyLogicalRepWorker->move_to_next_rel)
+		{
+		   /*
+			* It is important to give an error if we are unable to drop the slot,
+			* otherwise, it won't be dropped till the corresponding subscription
+			* is dropped. So passing missing_ok = false.
+			*/
+			ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
+
+			finish_sync_worker();
+		}
+		else
+		{
+			clean_sync_worker();
+		}
 	}
 	else
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
@@ -1152,11 +1207,11 @@ copy_table(Relation rel)
  * had changed.
  */
 void
-ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
+ReplicationSlotNameForTablesync(Oid suboid, int slot,
 								char *syncslotname, int szslot)
 {
-	snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
-			 relid, GetSystemIdentifier());
+	snprintf(syncslotname, szslot, "pg_%u_sync_%i_" UINT64_FORMAT, suboid,
+			 slot, GetSystemIdentifier());
 }
 
 /*
@@ -1219,7 +1274,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	/* Calculate the name of the tablesync slot. */
 	slotname = (char *) palloc(NAMEDATALEN);
 	ReplicationSlotNameForTablesync(MySubscription->oid,
-									MyLogicalRepWorker->relid,
+									MyLogicalRepWorker->slot,
 									slotname,
 									NAMEDATALEN);
 
@@ -1356,11 +1411,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * drop subscription happens which would complete without removing this
 	 * slot leading to a dangling slot on the server.
 	 */
-	HOLD_INTERRUPTS();
-	walrcv_create_slot(LogRepWorkerWalRcvConn,
-					   slotname, false /* permanent */ , false /* two_phase */ ,
-					   CRS_USE_SNAPSHOT, origin_startpos);
-	RESUME_INTERRUPTS();
+	if (MyLogicalRepWorker->is_first_run)
+	{
+		HOLD_INTERRUPTS();
+		walrcv_create_slot(LogRepWorkerWalRcvConn,
+						slotname, false /* permanent */ , false /* two_phase */ ,
+						CRS_USE_SNAPSHOT, origin_startpos);
+		RESUME_INTERRUPTS();
+	}
 
 	/*
 	 * Setup replication origin tracking. The purpose of doing this before the
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 38e3b1c1b3..31ed8ed3d0 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -315,6 +315,7 @@ static void stream_cleanup_files(Oid subid, TransactionId xid);
 static void stream_open_file(Oid subid, TransactionId xid, bool first);
 static void stream_write_change(char action, StringInfo s);
 static void stream_close_file(void);
+static void stream_build_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos);
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
@@ -2814,6 +2815,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 			/* Process any table synchronization changes. */
 			process_syncing_tables(last_received);
+			if (MyLogicalRepWorker->move_to_next_rel)
+			{
+				endofstream = true;
+			}
 		}
 
 		/* Cleanup the memory. */
@@ -2915,8 +2920,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
 
-	/* All done */
-	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+	/* 
+	 * If it's moving to next relation, this is a sync worker.
+	 * Sync workers end the streaming during process_syncing_tables_for_sync.
+	 * Calling endstreaming twice causes "no COPY in progress" errors.
+	 */
+	if (!MyLogicalRepWorker->move_to_next_rel)
+	{
+		/* All done */
+		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+	}
 }
 
 /*
@@ -3457,6 +3470,34 @@ stream_write_change(char action, StringInfo s)
 	BufFileWrite(stream_fd, &s->data[s->cursor], len);
 }
 
+/*
+ * stream_build_options
+ *		Fill in the options used to start logical replication streaming.
+ *
+ * The caller supplies slot name and start position; two_phase is left off.
+ */
+static void
+stream_build_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
+{
+	int			pubversion = walrcv_server_version(LogRepWorkerWalRcvConn);
+
+	options->logical = true;
+	options->slotname = slotname;
+	options->startpoint = *origin_startpos;
+
+	if (pubversion >= 150000)
+		options->proto.logical.proto_version = LOGICALREP_PROTO_TWOPHASE_VERSION_NUM;
+	else if (pubversion >= 140000)
+		options->proto.logical.proto_version = LOGICALREP_PROTO_STREAM_VERSION_NUM;
+	else
+		options->proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
+
+	options->proto.logical.twophase = false;
+	options->proto.logical.streaming = MySubscription->stream;
+	options->proto.logical.binary = MySubscription->binary;
+	options->proto.logical.publication_names = MySubscription->publications;
+}
+
 /*
  * Cleanup the memory for subxacts and reset the related variables.
  */
@@ -3568,6 +3609,136 @@ start_apply(XLogRecPtr origin_startpos)
 	PG_END_TRY();
 }
 
+/*
+ * Run one table synchronization pass for MyLogicalRepWorker->relid.
+ * Calls start_table_sync(), stores the tablesync origin name in the error
+ * callback argument, then builds the streaming options and starts streaming.
+ */
+static void
+run_tablesync_worker(WalRcvStreamOptions *options, 
+					 char *slotname,
+					 char *originname,
+					 int originame_size,
+					 XLogRecPtr *origin_startpos)
+{
+	/* Clear the reuse flag that may still be set from the previous table. */
+    MyLogicalRepWorker->move_to_next_rel = false;
+
+    start_table_sync(origin_startpos, &slotname);	/* receives &slotname; presumably sets it -- TODO confirm */
+
+    /*
+     * Build the tablesync origin name and keep a copy in ApplyContext for
+     * the error context message.
+     */
+    ReplicationOriginNameForTablesync(MySubscription->oid,
+                                        MyLogicalRepWorker->relid,
+                                        originname,
+                                        originame_size);
+    apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+                                                                originname);
+    
+    stream_build_options(options, slotname, origin_startpos);
+
+    /* Start normal logical streaming replication. */
+	walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+}
+
+/*
+ * Run the main apply worker's startup sequence: set up the "pg_<subid>"
+ * replication origin and return its progress in *origin_startpos, connect to
+ * the publisher, build the streaming options and start streaming.
+ */
+static void
+run_apply_worker(WalRcvStreamOptions *options,
+				 char *slotname,
+				 char *originname,
+				 int originname_size,
+				 XLogRecPtr *origin_startpos)
+{
+    RepOriginId originid;
+    TimeLineID	startpointTLI;
+    char	   *err;
+
+    slotname = MySubscription->slotname;	/* incoming value is ignored; the subscription's slot is always used */
+
+    /*
+	 * This shouldn't happen if the subscription is enabled, but guard
+	 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
+	 * crash if slot is NULL.)
+	 */
+    if (!slotname)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                    errmsg("subscription has no replication slot set")));
+
+    /* Setup replication origin tracking: find or create "pg_<subid>" and read its progress. */
+    StartTransactionCommand();
+    snprintf(originname, originname_size, "pg_%u", MySubscription->oid);
+    originid = replorigin_by_name(originname, true);
+    if (!OidIsValid(originid))
+        originid = replorigin_create(originname);
+    replorigin_session_setup(originid);
+    replorigin_session_origin = originid;
+    *origin_startpos = replorigin_session_get_progress(false);
+    CommitTransactionCommand();
+
+    LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+                                            MySubscription->name, &err);
+    if (LogRepWorkerWalRcvConn == NULL)
+        ereport(ERROR,
+                (errcode(ERRCODE_CONNECTION_FAILURE),
+                    errmsg("could not connect to the publisher: %s", err)));
+
+    /*
+	 * We don't really use the output identify_system for anything but it
+	 * does some initializations on the upstream so let's still call it.
+	 */
+    (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+    /*
+	 * Allocate the origin name in long-lived context for error context
+	 * message.
+	 */
+    apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+                                                                originname);
+
+    stream_build_options(options, slotname, origin_startpos);
+
+    /*
+     * Even when the two_phase mode is requested by the user, it remains
+     * as the tri-state PENDING until all tablesyncs have reached READY
+     * state. Only then, can it become ENABLED.
+     *
+     * Note: If the subscription has no tables then leave the state as
+     * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+     * work.
+     */
+    if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+        AllTablesyncsReady())
+    {
+        /* Start streaming with two_phase enabled */
+        options->proto.logical.twophase = true;
+        walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+
+        StartTransactionCommand();
+        UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+        MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+        CommitTransactionCommand();
+    }
+    else
+    {
+        walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+    }
+
+    ereport(DEBUG1,
+            (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s",
+                    MySubscription->name,
+                    MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+                    MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+                    MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+                    "?")));
+}
+
 /* Logical Replication Apply worker entry point */
 void
 ApplyWorkerMain(Datum main_arg)
@@ -3578,7 +3749,6 @@ ApplyWorkerMain(Datum main_arg)
 	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
 	char	   *myslotname = NULL;
 	WalRcvStreamOptions options;
-	int			server_version;
 
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
@@ -3669,141 +3839,55 @@ ApplyWorkerMain(Datum main_arg)
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
-	if (am_tablesync_worker())
-	{
-		start_table_sync(&origin_startpos, &myslotname);
-
-		/*
-		 * Allocate the origin name in long-lived context for error context
-		 * message.
-		 */
-		ReplicationOriginNameForTablesync(MySubscription->oid,
-										  MyLogicalRepWorker->relid,
-										  originname,
-										  sizeof(originname));
-		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
-																   originname);
-	}
-	else
-	{
-		/* This is main apply worker */
-		RepOriginId originid;
-		TimeLineID	startpointTLI;
-		char	   *err;
-
-		myslotname = MySubscription->slotname;
-
-		/*
-		 * This shouldn't happen if the subscription is enabled, but guard
-		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
-		 * crash if slot is NULL.)
-		 */
-		if (!myslotname)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("subscription has no replication slot set")));
-
-		/* Setup replication origin tracking. */
-		StartTransactionCommand();
-		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
-		originid = replorigin_by_name(originname, true);
-		if (!OidIsValid(originid))
-			originid = replorigin_create(originname);
-		replorigin_session_setup(originid);
-		replorigin_session_origin = originid;
-		origin_startpos = replorigin_session_get_progress(false);
-		CommitTransactionCommand();
-
-		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
-												MySubscription->name, &err);
-		if (LogRepWorkerWalRcvConn == NULL)
-			ereport(ERROR,
-					(errcode(ERRCODE_CONNECTION_FAILURE),
-					 errmsg("could not connect to the publisher: %s", err)));
-
-		/*
-		 * We don't really use the output identify_system for anything but it
-		 * does some initializations on the upstream so let's still call it.
-		 */
-		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
-
-		/*
-		 * Allocate the origin name in long-lived context for error context
-		 * message.
-		 */
-		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
-																   originname);
-	}
-
 	/*
-	 * Setup callback for syscache so that we know when something changes in
-	 * the subscription relation state.
-	 */
+	* Setup callback for syscache so that we know when something changes in
+	* the subscription relation state.
+	* Do this outside the loop to avoid exceeding MAX_SYSCACHE_CALLBACKS
+	*/
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
-								  invalidate_syncing_table_states,
-								  (Datum) 0);
-
-	/* Build logical replication streaming options. */
-	options.logical = true;
-	options.startpoint = origin_startpos;
-	options.slotname = myslotname;
+								invalidate_syncing_table_states,
+								(Datum) 0);
 
-	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
-	options.proto.logical.proto_version =
-		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
-		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
-		LOGICALREP_PROTO_VERSION_NUM;
+	/*
+	 * The loop where worker does its job.
+	 * It loops until the worker is not reused. 
+	 */
+	while (MyLogicalRepWorker->is_first_run || 
+			MyLogicalRepWorker->move_to_next_rel)
+	{
+		if (am_tablesync_worker())
+			{
+				/* 
+				* This is a tablesync worker. 
+				* Start syncing tables before starting the apply loop.  
+				*/
+				run_tablesync_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos);
+			}
+			else
+			{
+				/* This is main apply worker */
+				run_apply_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos);
+			}
 
-	options.proto.logical.publication_names = MySubscription->publications;
-	options.proto.logical.binary = MySubscription->binary;
-	options.proto.logical.streaming = MySubscription->stream;
-	options.proto.logical.twophase = false;
+		/* Run the main loop. */
+		start_apply(origin_startpos);
 
-	if (!am_tablesync_worker())
-	{
-		/*
-		 * Even when the two_phase mode is requested by the user, it remains
-		 * as the tri-state PENDING until all tablesyncs have reached READY
-		 * state. Only then, can it become ENABLED.
-		 *
-		 * Note: If the subscription has no tables then leave the state as
-		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
-		 * work.
-		 */
-		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
-			AllTablesyncsReady())
+		if (MyLogicalRepWorker->move_to_next_rel)
 		{
-			/* Start streaming with two_phase enabled */
-			options.proto.logical.twophase = true;
-			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+			/* Reset the current replication origin session.
+			* Since we'll use the same process for another relation, it needs to be reset
+			* and will be created again later while syncing the new relation.
+			*/
+			replorigin_session_origin = InvalidRepOriginId;
+			replorigin_session_reset();
 
 			StartTransactionCommand();
-			UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
-			MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+			ereport(LOG,
+					(errmsg("logical replication table synchronization worker for subscription \"%s\" has moved to sync table \"%s\".",
+							MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
 			CommitTransactionCommand();
 		}
-		else
-		{
-			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
-		}
-
-		ereport(DEBUG1,
-				(errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s",
-						MySubscription->name,
-						MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
-						MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
-						MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
-						"?")));
 	}
-	else
-	{
-		/* Start normal logical streaming replication. */
-		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
-	}
-
-	/* Run the main loop. */
-	start_apply(origin_startpos);
-
 	proc_exit(0);
 }
 
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9df99c3418..21a773ad56 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -90,5 +90,6 @@ extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 extern bool HasSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionNotReadyRelations(Oid subid);
+extern List *GetSubscriptionInitStateRelations(Oid subid);
 
 #endif							/* PG_SUBSCRIPTION_REL_H */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8c9f3321d5..57c4215cfe 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -217,8 +217,9 @@ extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
-extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
+extern void ReplicationSlotNameForTablesync(Oid suboid, int slot, char *syncslotname, int szslot);
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
+extern List* GetReplicationSlotNamesBySubId(WalReceiverConn *wrconn, Oid subid, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 901845abc2..db4e96be80 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -30,9 +30,18 @@ typedef struct LogicalRepWorker
 	/* Indicates if this slot is used or free. */
 	bool		in_use;
 
+	/* 
+	 * Indicates if worker is running for the first time
+	 * or in reuse
+	 */
+	bool		is_first_run;
+
 	/* Increased every time the slot is taken by new worker. */
 	uint16		generation;
 
+	/* The slot that this worker is using */
+	int		slot;
+
 	/* Pointer to proc array. NULL if not running. */
 	PGPROC	   *proc;
 
@@ -51,6 +60,11 @@ typedef struct LogicalRepWorker
 	XLogRecPtr	relstate_lsn;
 	slock_t		relmutex;
 
+	/* 
+	 * Used to indicate whether sync worker will be reused for another relation
+	 */
+	bool		move_to_next_rel;
+	
 	/*
 	 * Used to create the changes and subxact files for the streaming
 	 * transactions.  Upon the arrival of the first streaming transaction, the
-- 
2.25.1

