From 7ebce29cea14c878c46baa15029bc382634dc986 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Wed, 30 Jul 2025 05:04:44 -0400
Subject: [PATCH v7] Fix a deadlock during ALTER SUBSCRIPTION... DROP
 PUBLICATION

When a user drops a publication from a subscription, this results in a
publication refresh on the subscriber, which will try to drop any pending
origins. Meanwhile, the apply worker could also be trying to clean up origins.
A deadlock can occur if the order in which locks are taken on
SubscriptionRelationId, SubscriptionRelRelationId and
ReplicationOriginRelationId is not the same in
process_syncing_tables_for_apply() and AlterSubscription_refresh().

The fix is to acquire locks on SubscriptionRelationId, SubscriptionRelRelationId
and ReplicationOriginRelationId, in that order, when dropping tracking origins.
---
 src/backend/catalog/pg_subscription.c       | 37 +++++++++++++++++++++++++----
 src/backend/replication/logical/tablesync.c | 27 ++++++++++++++++++---
 src/include/catalog/pg_subscription_rel.h   |  2 ++
 3 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d07f88c..d054327 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -273,8 +273,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
  * Update the state of a subscription table.
  */
 void
-UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-						   XLogRecPtr sublsn)
+UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state,
+							 XLogRecPtr sublsn, bool already_locked)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -282,9 +282,26 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	Datum		values[Natts_pg_subscription_rel];
 	bool		replaces[Natts_pg_subscription_rel];
 
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
-
-	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+	if (already_locked)
+	{
+#ifdef USE_ASSERT_CHECKING
+		LOCKTAG     tag;
+#endif
+
+		Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
+										  RowExclusiveLock, true));
+
+		rel = table_open(SubscriptionRelRelationId, NoLock);
+#ifdef USE_ASSERT_CHECKING
+		SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
+		Assert(LockHeldByMe(&tag, AccessShareLock));
+#endif
+	}
+	else
+	{
+		LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+		rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+	}
 
 	/* Try finding existing mapping. */
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
@@ -319,6 +336,16 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 }
 
 /*
+ * Update the state of a subscription table, taking the required locks itself.
+ */
+void
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn)
+{
+	UpdateSubscriptionRelStateEx(subid, relid, state, sublsn, false);
+}
+
+/*
  * Get state of subscription table.
  *
  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index ca88133..3fa921a 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -425,6 +425,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	ListCell   *lc;
 	bool		started_tx = false;
 	bool		should_exit = false;
+	Relation    rel = NULL;
 
 	Assert(!IsTransactionState());
 
@@ -492,7 +493,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * worker to remove the origin tracking as if there is any
 				 * error while dropping we won't restart it to drop the
 				 * origin. So passing missing_ok = true.
+				 *
+				 * Lock the subscription and origin in the same order as we
+				 * are doing during DDL commands to avoid deadlocks. See
+				 * AlterSubscription_refresh.
 				 */
+				LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
+								 0, AccessShareLock);
+				if (!rel)
+					rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
 				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
 												   rstate->relid,
 												   originname,
@@ -502,9 +512,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				/*
 				 * Update the state to READY only after the origin cleanup.
 				 */
-				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-										   rstate->relid, rstate->state,
-										   rstate->lsn);
+				UpdateSubscriptionRelStateEx(MyLogicalRepWorker->subid,
+											 rstate->relid, rstate->state,
+											 rstate->lsn, true);
 			}
 		}
 		else
@@ -555,7 +565,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						 * This is required to avoid any undetected deadlocks
 						 * due to any existing lock as deadlock detector won't
 						 * be able to detect the waits on the latch.
+						 *
+						 * Also close any tables prior to the commit.
 						 */
+						if (rel)
+						{
+							table_close(rel, NoLock);
+							rel = NULL;
+						}
 						CommitTransactionCommand();
 						pgstat_report_stat(false);
 					}
@@ -621,6 +638,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		}
 	}
 
+	/* Close table if opened */
+	if (rel)
+		table_close(rel, NoLock);
+
 	if (started_tx)
 	{
 		/*
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 60a2bcc..7817a30 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -84,6 +84,8 @@ extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
 									XLogRecPtr sublsn);
 extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 									   XLogRecPtr sublsn);
+extern void UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state,
+									   XLogRecPtr sublsn, bool already_locked);
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
-- 
1.8.3.1

