From e8954bd0f0aeccd911d6dffe9e39c1636634218a Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Mon, 5 Jun 2023 15:04:41 +0300
Subject: [PATCH v21 1/5] Refactor to split Apply and Tablesync Workers

Both apply and tablesync workers were using ApplyWorkerMain() as entry
point. As the name implies, ApplyWorkerMain() should be considered as
the main function for apply workers. Tablesync worker's path was hidden
and does not have enough in common to share the same main function with
apply worker.

Also, most of the code shared by both worker types is already combined
in LogicalRepApplyLoop(). There is no need to combine the rest in
ApplyWorkerMain() anymore.

This patch introduces TablesyncWorkerMain() as a new entry point for
tablesync workers. This aims to increase code readability and help to
the upcoming reuse tablesync worker improvements.

Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com
---
 src/backend/postmaster/bgworker.c             |   3 +
 .../replication/logical/applyparallelworker.c |   2 +-
 src/backend/replication/logical/launcher.c    |  32 +-
 src/backend/replication/logical/tablesync.c   |  98 ++++-
 src/backend/replication/logical/worker.c      | 386 ++++++++----------
 src/include/replication/logicalworker.h       |   1 +
 src/include/replication/worker_internal.h     |  11 +-
 7 files changed, 307 insertions(+), 226 deletions(-)

diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 5b4bd71694..505e38376c 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -131,6 +131,9 @@ static const struct
 	},
 	{
 		"ParallelApplyWorkerMain", ParallelApplyWorkerMain
+	},
+	{
+		"TablesyncWorkerMain", TablesyncWorkerMain
 	}
 };
 
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 6fb96148f4..1d4e83c4c1 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -942,7 +942,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
 		MyLogicalRepWorker->reply_time = 0;
 
-	InitializeApplyWorker();
+	InitializeLogRepWorker();
 
 	InitializingApplyWorker = false;
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 542af7d863..e231fa7f95 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -459,24 +459,30 @@ retry:
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
 	if (is_parallel_apply_worker)
+	{
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
-	else
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
-
-	if (OidIsValid(relid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication worker for subscription %u sync %u", subid, relid);
-	else if (is_parallel_apply_worker)
+				 "logical replication parallel apply worker for subscription %u",
+				 subid);
+		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+	}
+	else if (OidIsValid(relid))
+	{
+		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication parallel apply worker for subscription %u", subid);
+				 "logical replication tablesync worker for subscription %u sync %u",
+				 subid,
+				 relid);
+		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
+	}
 	else
+	{
+		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication apply worker for subscription %u", subid);
-
-	if (is_parallel_apply_worker)
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
-	else
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
+				 "logical replication apply worker for subscription %u",
+				 subid);
+		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+	}
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
 	bgw.bgw_notify_pid = MyProcPid;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d461654ab..729f48a3b5 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -104,17 +104,21 @@
 #include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
+#include "postmaster/interrupt.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "replication/slot.h"
 #include "replication/origin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/rls.h"
@@ -1241,7 +1245,7 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
  *
  * The returned slot name is palloc'ed in current memory context.
  */
-char *
+static char *
 LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 {
 	char	   *slotname;
@@ -1584,6 +1588,98 @@ FetchTableStates(bool *started_tx)
 	return has_subrels;
 }
 
+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if it's required.
+ *
+ * Allocate the slot name in long-lived context on return. Note that we don't
+ * handle FATAL errors which are probably because of system resource error and
+ * are not repeatable.
+ */
+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+{
+	char	   *syncslotname = NULL;
+
+	Assert(am_tablesync_worker());
+
+	PG_TRY();
+	{
+		/* Call initial sync. */
+		syncslotname = LogicalRepSyncTableStart(origin_startpos);
+	}
+	PG_CATCH();
+	{
+		if (MySubscription->disableonerr)
+			DisableSubscriptionAndExit();
+		else
+		{
+			/*
+			 * Report the worker failed during table synchronization. Abort
+			 * the current transaction so that the stats message is sent in an
+			 * idle state.
+			 */
+			AbortOutOfAnyTransaction();
+			pgstat_report_subscription_error(MySubscription->oid, false);
+
+			PG_RE_THROW();
+		}
+	}
+	PG_END_TRY();
+
+	/* allocate slot name in long-lived context */
+	*myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
+	pfree(syncslotname);
+}
+
+/*
+ * Runs the tablesync worker.
+ *
+ * It starts syncing tables. After a successful sync, sets streaming options
+ * and starts streaming to catchup.
+ */
+static void
+run_tablesync_worker(WalRcvStreamOptions *options,
+					 char *slotname,
+					 char *originname,
+					 int originname_size,
+					 XLogRecPtr *origin_startpos)
+{
+	start_table_sync(origin_startpos, &slotname);
+
+	ReplicationOriginNameForLogicalRep(MySubscription->oid,
+									   MyLogicalRepWorker->relid,
+									   originname,
+									   originname_size);
+
+	set_apply_error_context_origin(originname);
+
+	set_stream_options(options, slotname, origin_startpos);
+
+	walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+
+	/* Start applying changes to catchup. */
+	start_apply(*origin_startpos);
+}
+
+/* Logical Replication Tablesync worker entry point */
+void
+TablesyncWorkerMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+	char		originname[NAMEDATALEN];
+	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
+	char	   *myslotname = NULL;
+	WalRcvStreamOptions options;
+
+	StartLogRepWorker(worker_slot);
+
+	run_tablesync_worker(&options, myslotname, originname,
+						 sizeof(originname), &origin_startpos);
+
+	finish_sync_worker();
+}
+
 /*
  * If the subscription has no tables then return false.
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dd353fd1cb..3b8976f717 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -395,8 +395,6 @@ static void stream_close_file(void);
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
-static void DisableSubscriptionAndExit(void);
-
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
@@ -4313,6 +4311,71 @@ stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
 	stream_stop_internal(xid);
 }
 
+/*
+ * Sets streaming options including replication slot name and origin start
+ * position. Workers need these options for logical replication.
+ */
+void
+set_stream_options(WalRcvStreamOptions *options,
+				   char *slotname,
+				   XLogRecPtr *origin_startpos)
+{
+	int			server_version;
+
+	options->logical = true;
+	options->startpoint = *origin_startpos;
+	options->slotname = slotname;
+
+	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
+	options->proto.logical.proto_version =
+		server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
+		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
+		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
+		LOGICALREP_PROTO_VERSION_NUM;
+
+	options->proto.logical.publication_names = MySubscription->publications;
+	options->proto.logical.binary = MySubscription->binary;
+
+	/*
+	 * Assign the appropriate option value for streaming option according to
+	 * the 'streaming' mode and the publisher's ability to support that mode.
+	 */
+	if (server_version >= 160000 &&
+		MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
+	{
+		options->proto.logical.streaming_str = "parallel";
+		MyLogicalRepWorker->parallel_apply = true;
+	}
+	else if (server_version >= 140000 &&
+			 MySubscription->stream != LOGICALREP_STREAM_OFF)
+	{
+		options->proto.logical.streaming_str = "on";
+		MyLogicalRepWorker->parallel_apply = false;
+	}
+	else
+	{
+		options->proto.logical.streaming_str = NULL;
+		MyLogicalRepWorker->parallel_apply = false;
+	}
+
+	options->proto.logical.twophase = false;
+	options->proto.logical.origin = pstrdup(MySubscription->origin);
+
+	/*
+	 * Even when the two_phase mode is requested by the user, it remains as
+	 * the tri-state PENDING until all tablesyncs have reached READY state.
+	 * Only then, can it become ENABLED.
+	 *
+	 * Note: If the subscription has no tables then leave the state as
+	 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+	 * work.
+	 */
+	if (!am_tablesync_worker() &&
+		MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+		AllTablesyncsReady())
+		options->proto.logical.twophase = true;
+}
+
 /*
  * Cleanup the memory for subxacts and reset the related variables.
  */
@@ -4347,24 +4410,18 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
 }
 
 /*
- * Execute the initial sync with error handling. Disable the subscription,
- * if it's required.
+ * Common function to run the apply loop with error handling. Disable the
+ * subscription, if necessary.
  *
- * Allocate the slot name in long-lived context on return. Note that we don't
- * handle FATAL errors which are probably because of system resource error and
- * are not repeatable.
+ * Note that we don't handle FATAL errors which are probably because
+ * of system resource error and are not repeatable.
  */
-static void
-start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+void
+start_apply(XLogRecPtr origin_startpos)
 {
-	char	   *syncslotname = NULL;
-
-	Assert(am_tablesync_worker());
-
 	PG_TRY();
 	{
-		/* Call initial sync. */
-		syncslotname = LogicalRepSyncTableStart(origin_startpos);
+		LogicalRepApplyLoop(origin_startpos);
 	}
 	PG_CATCH();
 	{
@@ -4373,65 +4430,117 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
 		else
 		{
 			/*
-			 * Report the worker failed during table synchronization. Abort
-			 * the current transaction so that the stats message is sent in an
+			 * Report the worker failed while applying changes. Abort the
+			 * current transaction so that the stats message is sent in an
 			 * idle state.
 			 */
 			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, false);
+			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
 
 			PG_RE_THROW();
 		}
 	}
 	PG_END_TRY();
-
-	/* allocate slot name in long-lived context */
-	*myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
-	pfree(syncslotname);
 }
 
 /*
- * Run the apply loop with error handling. Disable the subscription,
- * if necessary.
+ * Runs the leader apply worker.
  *
- * Note that we don't handle FATAL errors which are probably because
- * of system resource error and are not repeatable.
+ * It sets up replication origin, streaming options and then starts streaming.
  */
 static void
-start_apply(XLogRecPtr origin_startpos)
+run_apply_worker(WalRcvStreamOptions *options,
+				 char *slotname,
+				 char *originname,
+				 int originname_size,
+				 XLogRecPtr *origin_startpos)
 {
-	PG_TRY();
+	RepOriginId originid;
+	TimeLineID	startpointTLI;
+	char	   *err;
+	bool		must_use_password;
+
+	slotname = MySubscription->slotname;
+
+	/*
+	 * This shouldn't happen if the subscription is enabled, but guard
+	 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
+	 * crash if slot is NULL.)
+	 */
+	if (!slotname)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("subscription has no replication slot set")));
+
+	/* Setup replication origin tracking. */
+	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+									   originname, originname_size);
+	StartTransactionCommand();
+	originid = replorigin_by_name(originname, true);
+	if (!OidIsValid(originid))
+		originid = replorigin_create(originname);
+	replorigin_session_setup(originid, 0);
+	replorigin_session_origin = originid;
+	*origin_startpos = replorigin_session_get_progress(false);
+
+	/* Is the use of a password mandatory? */
+	must_use_password = MySubscription->passwordrequired &&
+		!superuser_arg(MySubscription->owner);
+
+	/* Note that the superuser_arg call can access the DB */
+	CommitTransactionCommand();
+
+	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+											must_use_password,
+											MySubscription->name, &err);
+
+	if (LogRepWorkerWalRcvConn == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+					errmsg("could not connect to the publisher: %s", err)));
+
+	/*
+	 * We don't really use the output identify_system for anything but it
+	 * does some initializations on the upstream so let's still call it.
+	 */
+	(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+	set_apply_error_context_origin(originname);
+
+	set_stream_options(options, slotname, origin_startpos);
+
+	walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+
+	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+		AllTablesyncsReady())
 	{
-		LogicalRepApplyLoop(origin_startpos);
+		StartTransactionCommand();
+		UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+		MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+		CommitTransactionCommand();
 	}
-	PG_CATCH();
-	{
-		if (MySubscription->disableonerr)
-			DisableSubscriptionAndExit();
-		else
-		{
-			/*
-			 * Report the worker failed while applying changes. Abort the
-			 * current transaction so that the stats message is sent in an
-			 * idle state.
-			 */
-			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
 
-			PG_RE_THROW();
-		}
-	}
-	PG_END_TRY();
+	ereport(DEBUG1,
+			(errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
+							 MySubscription->name,
+							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+							 "?")));
+
+	/* Run the main loop. */
+	start_apply(*origin_startpos);
 }
 
 /*
- * Common initialization for leader apply worker and parallel apply worker.
+ * Common initialization for leader apply worker, parallel apply worker and
+ * tablesync worker.
  *
  * Initialize the database connection, in-memory subscription and necessary
  * config options.
  */
 void
-InitializeApplyWorker(void)
+InitializeLogRepWorker(void)
 {
 	MemoryContext oldctx;
 
@@ -4493,7 +4602,7 @@ InitializeApplyWorker(void)
 
 	if (am_tablesync_worker())
 		ereport(LOG,
-				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+				(errmsg("logical replication worker for subscription \"%s\", table \"%s\" has started",
 						MySubscription->name,
 						get_rel_name(MyLogicalRepWorker->relid))));
 	else
@@ -4504,19 +4613,10 @@ InitializeApplyWorker(void)
 	CommitTransactionCommand();
 }
 
-/* Logical Replication Apply worker entry point */
+/* Common function to start the leader apply or tablesync worker. */
 void
-ApplyWorkerMain(Datum main_arg)
+StartLogRepWorker(int worker_slot)
 {
-	int			worker_slot = DatumGetInt32(main_arg);
-	char		originname[NAMEDATALEN];
-	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
-	char	   *myslotname = NULL;
-	WalRcvStreamOptions options;
-	int			server_version;
-
-	InitializingApplyWorker = true;
-
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
 
@@ -4537,79 +4637,12 @@ ApplyWorkerMain(Datum main_arg)
 	/* Load the libpq-specific functions */
 	load_file("libpqwalreceiver", false);
 
-	InitializeApplyWorker();
-
-	InitializingApplyWorker = false;
+	InitializeLogRepWorker();
 
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
-	if (am_tablesync_worker())
-	{
-		start_table_sync(&origin_startpos, &myslotname);
-
-		ReplicationOriginNameForLogicalRep(MySubscription->oid,
-										   MyLogicalRepWorker->relid,
-										   originname,
-										   sizeof(originname));
-		set_apply_error_context_origin(originname);
-	}
-	else
-	{
-		/* This is the leader apply worker */
-		RepOriginId originid;
-		TimeLineID	startpointTLI;
-		char	   *err;
-		bool		must_use_password;
-
-		myslotname = MySubscription->slotname;
-
-		/*
-		 * This shouldn't happen if the subscription is enabled, but guard
-		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
-		 * crash if slot is NULL.)
-		 */
-		if (!myslotname)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("subscription has no replication slot set")));
-
-		/* Setup replication origin tracking. */
-		StartTransactionCommand();
-		ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
-										   originname, sizeof(originname));
-		originid = replorigin_by_name(originname, true);
-		if (!OidIsValid(originid))
-			originid = replorigin_create(originname);
-		replorigin_session_setup(originid, 0);
-		replorigin_session_origin = originid;
-		origin_startpos = replorigin_session_get_progress(false);
-
-		/* Is the use of a password mandatory? */
-		must_use_password = MySubscription->passwordrequired &&
-			!superuser_arg(MySubscription->owner);
-
-		/* Note that the superuser_arg call can access the DB */
-		CommitTransactionCommand();
-
-		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
-												must_use_password,
-												MySubscription->name, &err);
-		if (LogRepWorkerWalRcvConn == NULL)
-			ereport(ERROR,
-					(errcode(ERRCODE_CONNECTION_FAILURE),
-					 errmsg("could not connect to the publisher: %s", err)));
-
-		/*
-		 * We don't really use the output identify_system for anything but it
-		 * does some initializations on the upstream so let's still call it.
-		 */
-		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
-
-		set_apply_error_context_origin(originname);
-	}
-
 	/*
 	 * Setup callback for syscache so that we know when something changes in
 	 * the subscription relation state.
@@ -4617,91 +4650,26 @@ ApplyWorkerMain(Datum main_arg)
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
 								  invalidate_syncing_table_states,
 								  (Datum) 0);
+}
 
-	/* Build logical replication streaming options. */
-	options.logical = true;
-	options.startpoint = origin_startpos;
-	options.slotname = myslotname;
-
-	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
-	options.proto.logical.proto_version =
-		server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
-		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
-		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
-		LOGICALREP_PROTO_VERSION_NUM;
-
-	options.proto.logical.publication_names = MySubscription->publications;
-	options.proto.logical.binary = MySubscription->binary;
-
-	/*
-	 * Assign the appropriate option value for streaming option according to
-	 * the 'streaming' mode and the publisher's ability to support that mode.
-	 */
-	if (server_version >= 160000 &&
-		MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
-	{
-		options.proto.logical.streaming_str = "parallel";
-		MyLogicalRepWorker->parallel_apply = true;
-	}
-	else if (server_version >= 140000 &&
-			 MySubscription->stream != LOGICALREP_STREAM_OFF)
-	{
-		options.proto.logical.streaming_str = "on";
-		MyLogicalRepWorker->parallel_apply = false;
-	}
-	else
-	{
-		options.proto.logical.streaming_str = NULL;
-		MyLogicalRepWorker->parallel_apply = false;
-	}
-
-	options.proto.logical.twophase = false;
-	options.proto.logical.origin = pstrdup(MySubscription->origin);
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+	char		originname[NAMEDATALEN];
+	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
+	char	   *myslotname = NULL;
+	WalRcvStreamOptions options;
 
-	if (!am_tablesync_worker())
-	{
-		/*
-		 * Even when the two_phase mode is requested by the user, it remains
-		 * as the tri-state PENDING until all tablesyncs have reached READY
-		 * state. Only then, can it become ENABLED.
-		 *
-		 * Note: If the subscription has no tables then leave the state as
-		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
-		 * work.
-		 */
-		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
-			AllTablesyncsReady())
-		{
-			/* Start streaming with two_phase enabled */
-			options.proto.logical.twophase = true;
-			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+	InitializingApplyWorker = true;
 
-			StartTransactionCommand();
-			UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
-			MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
-			CommitTransactionCommand();
-		}
-		else
-		{
-			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
-		}
+	StartLogRepWorker(worker_slot);
 
-		ereport(DEBUG1,
-				(errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
-								 MySubscription->name,
-								 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
-								 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
-								 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
-								 "?")));
-	}
-	else
-	{
-		/* Start normal logical streaming replication. */
-		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
-	}
+	InitializingApplyWorker = false;
 
-	/* Run the main loop. */
-	start_apply(origin_startpos);
+	run_apply_worker(&options, myslotname, originname,
+					 sizeof(originname), &origin_startpos);
 
 	proc_exit(0);
 }
@@ -4710,7 +4678,7 @@ ApplyWorkerMain(Datum main_arg)
  * After error recovery, disable the subscription in a new transaction
  * and exit cleanly.
  */
-static void
+void
 DisableSubscriptionAndExit(void)
 {
 	/*
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 39588da79f..bbd71d0b42 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -18,6 +18,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
 extern void ParallelApplyWorkerMain(Datum main_arg);
+extern void TablesyncWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 extern bool IsLogicalParallelApplyWorker(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 343e781896..9012af38cd 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -19,6 +19,7 @@
 #include "datatype/timestamp.h"
 #include "miscadmin.h"
 #include "replication/logicalrelation.h"
+#include "replication/walreceiver.h"
 #include "storage/buffile.h"
 #include "storage/fileset.h"
 #include "storage/lock.h"
@@ -243,7 +244,6 @@ extern int	logicalrep_sync_worker_count(Oid subid);
 
 extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 											   char *originname, Size szoriginname);
-extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 
 extern bool AllTablesyncsReady(void);
 extern void UpdateTwoPhaseState(Oid suboid, char new_state);
@@ -265,7 +265,7 @@ extern void maybe_reread_subscription(void);
 
 extern void stream_cleanup_files(Oid subid, TransactionId xid);
 
-extern void InitializeApplyWorker(void);
+extern void InitializeLogRepWorker(void);
 
 extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
 
@@ -304,6 +304,13 @@ extern void pa_decr_and_wait_stream_block(void);
 
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
+extern void set_stream_options(WalRcvStreamOptions *options,
+							   char *slotname,
+							   XLogRecPtr *origin_startpos);
+
+extern void start_apply(XLogRecPtr origin_startpos);
+extern void DisableSubscriptionAndExit(void);
+extern void StartLogRepWorker(int worker_slot);
 
 #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
 
-- 
2.25.1

