From 2e9b2d343f69c9d80d3a37249283b8ba9632b1d5 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Tue, 12 Aug 2025 23:02:37 -0400
Subject: [PATCH v6] Improve initial slot synchronization in
 pg_sync_replication_slots()

During initial slot synchronization on a standby, the operation may fail if
required catalog rows or WAL files have been removed or are at risk of removal.
The slotsync worker handles this by creating a temporary slot for the initial
sync and retaining it even in case of failure. It keeps retrying until the slot
on the primary has advanced to a position where all the required data is also
available on the standby. However, pg_sync_replication_slots() had no such
protection mechanism.

The SQL API would fail immediately if synchronization requirements weren't
met. This could lead to permanent failure as the standby might continue
removing the still-required data.

To address this, we now make pg_sync_replication_slots() wait for the primary
slot to advance to a suitable position before completing synchronization and
before removing the temporary slot. Once the slot advances to a suitable
position, we retry synchronization. Additionally, if a promotion occurs on
the standby during this wait, the process exits gracefully and the
temporary slot is removed.
---
 doc/src/sgml/func/func-admin.sgml               |   4 +-
 doc/src/sgml/logicaldecoding.sgml               |  40 +--
 src/backend/replication/logical/slotsync.c      | 334 ++++++++++++++++++------
 src/backend/utils/activity/wait_event_names.txt |   2 +-
 4 files changed, 262 insertions(+), 118 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 446fdfe..3608610 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1478,9 +1478,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         standby server. Temporary synced slots, if any, cannot be used for
         logical decoding and must be dropped after promotion. See
         <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
-        Note that this function is primarily intended for testing and
-        debugging purposes and should be used with caution. Additionally,
-        this function cannot be executed if
+        Note that this function cannot be executed if
         <link linkend="guc-sync-replication-slots"><varname>
         sync_replication_slots</varname></link> is enabled and the slotsync
         worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 77c720c..6e4251a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -364,18 +364,23 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
    <sect2 id="logicaldecoding-replication-slots-synchronization">
     <title>Replication Slot Synchronization</title>
     <para>
-     The logical replication slots on the primary can be synchronized to
-     the hot standby by using the <literal>failover</literal> parameter of
+     The logical replication slots on the primary can be enabled for
+     synchronization to the hot standby by using the
+     <literal>failover</literal> parameter of
      <link linkend="pg-create-logical-replication-slot">
      <function>pg_create_logical_replication_slot</function></link>, or by
      using the <link linkend="sql-createsubscription-params-with-failover">
      <literal>failover</literal></link> option of
-     <command>CREATE SUBSCRIPTION</command> during slot creation.
-     Additionally, enabling <link linkend="guc-sync-replication-slots">
-     <varname>sync_replication_slots</varname></link> on the standby
-     is required. By enabling <link linkend="guc-sync-replication-slots">
-     <varname>sync_replication_slots</varname></link>
-     on the standby, the failover slots can be synchronized periodically in
+     <command>CREATE SUBSCRIPTION</command> during slot creation. After that,
+     synchronization can be performed either manually by calling
+     <link linkend="pg-sync-replication-slots">
+     <function>pg_sync_replication_slots</function></link>
+     on the standby, or automatically by enabling
+     <link linkend="guc-sync-replication-slots">
+     <varname>sync_replication_slots</varname></link> on the standby.
+     When <link linkend="guc-sync-replication-slots">
+     <varname>sync_replication_slots</varname></link> is enabled
+     on the standby, the failover slots are periodically synchronized by
      the slotsync worker. For the synchronization to work, it is mandatory to
      have a physical replication slot between the primary and the standby (i.e.,
      <link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
@@ -398,25 +403,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      receiving the WAL up to the latest flushed position on the primary server.
     </para>
 
-    <note>
-     <para>
-      While enabling <link linkend="guc-sync-replication-slots">
-      <varname>sync_replication_slots</varname></link> allows for automatic
-      periodic synchronization of failover slots, they can also be manually
-      synchronized using the <link linkend="pg-sync-replication-slots">
-      <function>pg_sync_replication_slots</function></link> function on the standby.
-      However, this function is primarily intended for testing and debugging and
-      should be used with caution. Unlike automatic synchronization, it does not
-      include cyclic retries, making it more prone to synchronization failures,
-      particularly during initial sync scenarios where the required WAL files
-      or catalog rows for the slot might have already been removed or are at risk
-      of being removed on the standby. In contrast, automatic synchronization
-      via <varname>sync_replication_slots</varname> provides continuous slot
-      updates, enabling seamless failover and supporting high availability.
-      Therefore, it is the recommended method for synchronizing slots.
-     </para>
-    </note>
-
     <para>
      When slot synchronization is configured as recommended,
      and the initial synchronization is performed either automatically or
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 3773844..09833aa 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -99,6 +99,8 @@ typedef struct SlotSyncCtxStruct
 	bool		syncing;
 	time_t		last_start_time;
 	slock_t		mutex;
+	/* used by pg_sync_replication_slots() API only */
+	bool		slot_not_persisted;
 } SlotSyncCtxStruct;
 
 static SlotSyncCtxStruct *SlotSyncCtx = NULL;
@@ -113,6 +115,7 @@ bool		sync_replication_slots = false;
  */
 #define MIN_SLOTSYNC_WORKER_NAPTIME_MS  200
 #define MAX_SLOTSYNC_WORKER_NAPTIME_MS  30000	/* 30s */
+#define SLOTSYNC_API_NAPTIME_MS         2000	/* 2s */
 
 static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
 
@@ -146,6 +149,7 @@ typedef struct RemoteSlot
 	ReplicationSlotInvalidationCause invalidated;
 } RemoteSlot;
 
+static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn);
 static void slotsync_failure_callback(int code, Datum arg);
 static void update_synced_slots_inactive_since(void);
 
@@ -211,13 +215,13 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 		 * impact the users, so we used DEBUG1 level to log the message.
 		 */
 		ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
-				errmsg("could not synchronize replication slot \"%s\"",
-					   remote_slot->name),
-				errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
-						  LSN_FORMAT_ARGS(remote_slot->restart_lsn),
-						  remote_slot->catalog_xmin,
-						  LSN_FORMAT_ARGS(slot->data.restart_lsn),
-						  slot->data.catalog_xmin));
+			errmsg("Replication slot \"%s\" is not sync ready; will keep retrying",
+				   remote_slot->name),
+			errdetail("Attempting Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
+			LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+			remote_slot->catalog_xmin,
+			LSN_FORMAT_ARGS(slot->data.restart_lsn),
+			slot->data.catalog_xmin));
 
 		if (remote_slot_precedes)
 			*remote_slot_precedes = true;
@@ -558,7 +562,8 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
  * false.
  */
 static bool
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(WalReceiverConn * wrconn,
+	RemoteSlot * remote_slot, Oid remote_dbid)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
 	bool		found_consistent_snapshot = false;
@@ -575,13 +580,18 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	if (remote_slot_precedes)
 	{
 		/*
-		 * The remote slot didn't catch up to locally reserved position.
+		 * The remote slot didn't catch up to locally reserved
+		 * position.
 		 *
-		 * We do not drop the slot because the restart_lsn can be ahead of the
-		 * current location when recreating the slot in the next cycle. It may
-		 * take more time to create such a slot. Therefore, we keep this slot
-		 * and attempt the synchronization in the next cycle.
+		 * We do not drop the slot because the restart_lsn can be
+		 * ahead of the current location when recreating the slot in
+		 * the next cycle. It may take more time to create such a
+		 * slot. Therefore, we keep this slot and attempt the
+		 * synchronization in the next cycle. Update flag, so that
+		 * API logic can retry.
 		 */
+		SlotSyncCtx->slot_not_persisted = true;
+
 		return false;
 	}
 
@@ -596,11 +606,17 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 				errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
 						  LSN_FORMAT_ARGS(slot->data.restart_lsn)));
 
+		/* update flag, so that we retry */
+		SlotSyncCtx->slot_not_persisted = true;
+
 		return false;
 	}
 
 	ReplicationSlotPersist();
 
+	/* slot has been persisted, no need to retry */
+	SlotSyncCtx->slot_not_persisted = false;
+
 	ereport(LOG,
 			errmsg("newly created replication slot \"%s\" is sync-ready now",
 				   remote_slot->name));
@@ -622,7 +638,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
  * Returns TRUE if the local slot is updated.
  */
 static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(WalReceiverConn * wrconn, RemoteSlot * remote_slot,
+					Oid remote_dbid)
 {
 	ReplicationSlot *slot;
 	XLogRecPtr	latestFlushPtr;
@@ -715,7 +732,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		/* Slot not ready yet, let's attempt to make it sync-ready now. */
 		if (slot->data.persistency == RS_TEMPORARY)
 		{
-			slot_updated = update_and_persist_local_synced_slot(remote_slot,
+			slot_updated = update_and_persist_local_synced_slot(wrconn,
+																remote_slot,
 																remote_dbid);
 		}
 
@@ -785,7 +803,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
 
-		update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+		update_and_persist_local_synced_slot(wrconn, remote_slot, remote_dbid);
 
 		slot_updated = true;
 	}
@@ -796,46 +814,87 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 }
 
 /*
- * Synchronize slots.
+ * Fetch or refresh remote slots.
  *
- * Gets the failover logical slots info from the primary server and updates
- * the slots locally. Creates the slots if not present on the standby.
+ * If remote_slot_list is NIL, fetches all failover logical slots from the
+ * primary server. If remote_slot_list is provided, refreshes only those
+ * specific slots with current values from the primary server.
  *
- * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ * NOTE: Caller must ensure a transaction is active before calling this
+ * function.
+ *
+ * Parameters:
+ *   wrconn - Connection to the primary server
+ *   remote_slot_list - List of RemoteSlot structures to refresh, or NIL to
+ *                      fetch all failover slots
+ *
+ * Returns a list of RemoteSlot structures. If refreshing and the query fails,
+ * returns the original list. Slots that no longer exist on the primary will
+ * be removed from the list.
  */
-static bool
-synchronize_slots(WalReceiverConn *wrconn)
+static List *
+fetch_or_refresh_remote_slots(WalReceiverConn *wrconn, List *remote_slot_list)
 {
 #define SLOTSYNC_COLUMN_COUNT 10
 	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
 	LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
-
 	WalRcvExecResult *res;
 	TupleTableSlot *tupslot;
-	List	   *remote_slot_list = NIL;
-	bool		some_slot_updated = false;
-	bool		started_tx = false;
-	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
-		" restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
-		" database, invalidation_reason"
-		" FROM pg_catalog.pg_replication_slots"
-		" WHERE failover and NOT temporary";
-
-	/* The syscache access in walrcv_exec() needs a transaction env. */
-	if (!IsTransactionState())
+	List	   *updated_slot_list = NIL;
+	StringInfoData query;
+	ListCell   *lc;
+	bool		is_refresh = (remote_slot_list != NIL);
+	bool		first_slot = true;
+
+	/* Build the query based on whether we're fetching all or refreshing specific slots */
+	initStringInfo(&query);
+	appendStringInfoString(&query,
+						   "SELECT slot_name, plugin, confirmed_flush_lsn,"
+						   " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
+						   " database, invalidation_reason"
+						   " FROM pg_catalog.pg_replication_slots"
+						   " WHERE failover and NOT temporary");
+
+	if (is_refresh)
 	{
-		StartTransactionCommand();
-		started_tx = true;
+		/* Add IN clause for specific slot names */
+		appendStringInfoString(&query, " AND slot_name IN (");
+
+		foreach(lc, remote_slot_list)
+		{
+			RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc);
+
+			if (!first_slot)
+				appendStringInfoString(&query, ", ");
+
+			appendStringInfo(&query, "'%s'", remote_slot->name);
+			first_slot = false;
+		}
+		appendStringInfoString(&query, ")");
 	}
 
 	/* Execute the query */
-	res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
+	res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow);
 	if (res->status != WALRCV_OK_TUPLES)
-		ereport(ERROR,
-				errmsg("could not fetch failover logical slots info from the primary server: %s",
-					   res->err));
+	{
+		if (is_refresh)
+		{
+			ereport(WARNING,
+					errmsg("could not fetch updated failover logical slots info"
+						   " from the primary server: %s",
+						   res->err));
+			pfree(query.data);
+			return remote_slot_list; /* Return original list on refresh failure */
+		}
+		else
+		{
+			ereport(ERROR,
+					errmsg("could not fetch failover logical slots info from the primary server: %s",
+						   res->err));
+		}
+	}
 
-	/* Construct the remote_slot tuple and synchronize each slot locally */
+	/* Process the slot information */
 	tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
 	{
@@ -853,8 +912,8 @@ synchronize_slots(WalReceiverConn *wrconn)
 		Assert(!isnull);
 
 		/*
-		 * It is possible to get null values for LSN and Xmin if slot is
-		 * invalidated on the primary server, so handle accordingly.
+		 * Handle possible null values for LSN and Xmin if slot is
+		 * invalidated on the primary server.
 		 */
 		d = slot_getattr(tupslot, ++col, &isnull);
 		remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
@@ -890,15 +949,8 @@ synchronize_slots(WalReceiverConn *wrconn)
 		Assert(col == SLOTSYNC_COLUMN_COUNT);
 
 		/*
-		 * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
-		 * slot is valid, that means we have fetched the remote_slot in its
-		 * RS_EPHEMERAL state. In such a case, don't sync it; we can always
-		 * sync it in the next sync cycle when the remote_slot is persisted
-		 * and has valid lsn(s) and xmin values.
-		 *
-		 * XXX: In future, if we plan to expose 'slot->data.persistency' in
-		 * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
-		 * slots in the first place.
+		 * Apply ephemeral slot filtering. Skip slots that are in RS_EPHEMERAL
+		 * state (invalid LSNs/xmin but not explicitly invalidated).
 		 */
 		if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
 			 XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
@@ -906,12 +958,42 @@ synchronize_slots(WalReceiverConn *wrconn)
 			remote_slot->invalidated == RS_INVAL_NONE)
 			pfree(remote_slot);
 		else
-			/* Create list of remote slots */
-			remote_slot_list = lappend(remote_slot_list, remote_slot);
+			/* Add to updated list */
+			updated_slot_list = lappend(updated_slot_list, remote_slot);
 
 		ExecClearTuple(tupslot);
 	}
 
+	walrcv_clear_result(res);
+	pfree(query.data);
+
+	/* If refreshing, free the original list structures */
+	if (is_refresh)
+	{
+		foreach(lc, remote_slot_list)
+		{
+			RemoteSlot *old_slot = (RemoteSlot *) lfirst(lc);
+			pfree(old_slot);
+		}
+		list_free(remote_slot_list);
+	}
+
+	return updated_slot_list;
+}
+
+/*
+ * Synchronize slots.
+ *
+ * Takes a list of remote slots and synchronizes them locally. Creates the
+ * slots if not present on the standby and updates existing ones.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ */
+static bool
+synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list)
+{
+	bool		some_slot_updated = false;
+
 	/* Drop local slots that no longer need to be synced. */
 	drop_local_obsolete_slots(remote_slot_list);
 
@@ -927,19 +1009,12 @@ synchronize_slots(WalReceiverConn *wrconn)
 		 */
 		LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 
-		some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+		some_slot_updated |= synchronize_one_slot(wrconn, remote_slot,
+								remote_dbid);
 
 		UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 	}
 
-	/* We are done, free remote_slot_list elements */
-	list_free_deep(remote_slot_list);
-
-	walrcv_clear_result(res);
-
-	if (started_tx)
-		CommitTransactionCommand();
-
 	return some_slot_updated;
 }
 
@@ -1131,7 +1206,7 @@ slotsync_reread_config(void)
 	bool		conninfo_changed;
 	bool		primary_slotname_changed;
 
-	Assert(sync_replication_slots);
+	Assert(!AmLogicalSlotSyncWorkerProcess() || sync_replication_slots);
 
 	ConfigReloadPending = false;
 	ProcessConfigFile(PGC_SIGHUP);
@@ -1252,31 +1327,38 @@ slotsync_worker_onexit(int code, Datum arg)
  * sync-cycles is reset to the minimum (200ms).
  */
 static void
-wait_for_slot_activity(bool some_slot_updated)
+wait_for_slot_activity(bool some_slot_updated, bool called_from_api)
 {
-	int			rc;
+	int		rc;
+	int		wait_time;
 
-	if (!some_slot_updated)
-	{
+	if (called_from_api) {
 		/*
-		 * No slots were updated, so double the sleep time, but not beyond the
-		 * maximum allowable value.
+		 * When called from pg_sync_replication_slots, use a fixed 2
+		 * second wait time.
 		 */
-		sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
-	}
-	else
-	{
-		/*
-		 * Some slots were updated since the last sleep, so reset the sleep
-		 * time.
-		 */
-		sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
+		wait_time = SLOTSYNC_API_NAPTIME_MS;
+	} else {
+		if (!some_slot_updated) {
+			/*
+			 * No slots were updated, so double the sleep time,
+			 * but not beyond the maximum allowable value.
+			 */
+			sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
+		} else {
+			/*
+			 * Some slots were updated since the last sleep, so
+			 * reset the sleep time.
+			 */
+			sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
+		}
+		wait_time = sleep_ms;
 	}
 
 	rc = WaitLatch(MyLatch,
 				   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-				   sleep_ms,
-				   WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
+				   wait_time,
+				   WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
 
 	if (rc & WL_LATCH_SET)
 		ResetLatch(MyLatch);
@@ -1505,12 +1587,28 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
 	for (;;)
 	{
 		bool		some_slot_updated = false;
+		List	       *remote_slots;
+		bool		started_tx = false;
 
 		ProcessSlotSyncInterrupts(wrconn);
 
-		some_slot_updated = synchronize_slots(wrconn);
+		/*
+		 * The syscache access in fetch_or_refresh_remote_slots() needs a
+		 * transaction env.
+		 */
+		if (!IsTransactionState()) {
+			StartTransactionCommand();
+			started_tx = true;
+		}
+
+		remote_slots = fetch_or_refresh_remote_slots(wrconn, NIL);
+		some_slot_updated = synchronize_slots(wrconn, remote_slots);
+		list_free_deep(remote_slots);
 
-		wait_for_slot_activity(some_slot_updated);
+		if (started_tx)
+			CommitTransactionCommand();
+
+		wait_for_slot_activity(some_slot_updated, false);
 	}
 
 	/*
@@ -1736,19 +1834,81 @@ slotsync_failure_callback(int code, Datum arg)
 }
 
 /*
- * Synchronize the failover enabled replication slots using the specified
- * primary server connection.
+ * Synchronize failover enabled replication slots using the specified primary
+ * server connection.
+ *
+ * Repeatedly fetches and updates replication slot information from the
+ * primary until all slots are at least "sync ready", waiting 2 seconds
+ * between retries. Exits early if promotion is triggered.
  */
 void
-SyncReplicationSlots(WalReceiverConn *wrconn)
+SyncReplicationSlots(WalReceiverConn * wrconn)
 {
 	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 	{
+		List	       *remote_slots;
+		bool		started_tx = false;
+
 		check_and_set_sync_info(InvalidPid);
 
 		validate_remote_info(wrconn);
 
-		synchronize_slots(wrconn);
+		/*
+		 * The syscache access in fetch_or_refresh_remote_slots() needs a
+		 * transaction env.
+		 */
+		if (!IsTransactionState()) {
+			StartTransactionCommand();
+			started_tx = true;
+		}
+
+		remote_slots = fetch_or_refresh_remote_slots(wrconn, NIL);
+
+		/* Retry until all slots are at least sync ready */
+		for (;;)
+		{
+			bool		some_slot_updated = false;
+
+			/*
+			 * Refresh the remote slot data. We keep using the original slot
+			 * list, even if some slots are already sync ready, so that all
+			 * slots are updated with the latest status from the primary.
+			 */
+			remote_slots = fetch_or_refresh_remote_slots(wrconn, remote_slots);
+
+			/* Attempt to synchronize slots */
+			some_slot_updated = synchronize_slots(wrconn, remote_slots);
+
+			/* Done if all slots are at least sync ready */
+			if (!SlotSyncCtx->slot_not_persisted)
+				break;
+			else
+			{
+				/* wait for 2 seconds before retrying */
+				wait_for_slot_activity(some_slot_updated, true);
+
+				/*
+				 * If we've been promoted, then no point
+				 * continuing.
+				 */
+				if (SlotSyncCtx->stopSignaled)
+				{
+					ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("exiting from slot synchronization as"
+								" promotion is triggered")));
+					break;
+				}
+
+				/* Handle any termination request if any */
+				ProcessSlotSyncInterrupts(wrconn);
+			}
+		}
+
+		list_free_deep(remote_slots);
+
+		if (started_tx)
+			CommitTransactionCommand();
 
 		/* Cleanup the synced temporary slots */
 		ReplicationSlotCleanup(true);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 0be307d..3497f0f 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -62,8 +62,8 @@ LOGICAL_APPLY_MAIN	"Waiting in main loop of logical replication apply process."
 LOGICAL_LAUNCHER_MAIN	"Waiting in main loop of logical replication launcher process."
 LOGICAL_PARALLEL_APPLY_MAIN	"Waiting in main loop of logical replication parallel apply process."
 RECOVERY_WAL_STREAM	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
-REPLICATION_SLOTSYNC_MAIN	"Waiting in main loop of slot sync worker."
 REPLICATION_SLOTSYNC_SHUTDOWN	"Waiting for slot sync worker to shut down."
+REPLICATION_SLOTSYNC_PRIMARY_CATCHUP	"Waiting for the primary to catch up."
 SYSLOGGER_MAIN	"Waiting in main loop of syslogger process."
 WAL_RECEIVER_MAIN	"Waiting in main loop of WAL receiver process."
 WAL_SENDER_MAIN	"Waiting in main loop of WAL sender process."
-- 
1.8.3.1

