On Fri, 2026-03-13 at 16:48 -0400, Andres Freund wrote:
> I suspect the more scalable approach would be to create a dedicated
> memory
> context for each GetSubscription() call that's then torn down during
> invalidation.

Attached. This is posted as an alternative to the previous patch.

Regards,
        Jeff Davis

From 98705f06a8dfb757c5db69f3b4f9ea2d73f21deb Mon Sep 17 00:00:00 2001
From: Jeff Davis <[email protected]>
Date: Fri, 13 Mar 2026 13:24:18 -0700
Subject: [PATCH v22] Temp context for maybe_reread_subscription().

Move temp context from ForeignServerConnectionString() to
maybe_reread_subscription(), so that it prevents more
invalidation-related leaks.

Suggested-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/xvdjrdqnpap3uq7owbaox3r7p5gf7sv62aaqf2ju3vb6yglatr%40kvvwhoudrlxq
---
 src/backend/catalog/pg_subscription.c    | 14 -----
 src/backend/foreign/foreign.c            | 67 +++++++-----------------
 src/backend/replication/logical/worker.c | 30 +++++++++--
 src/include/catalog/pg_subscription.h    |  1 -
 4 files changed, 43 insertions(+), 69 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 3673d4f0bc1..ca053c152cf 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -216,20 +216,6 @@ CountDBSubscriptions(Oid dbid)
 	return nsubs;
 }
 
-/*
- * Free memory allocated by subscription struct.
- */
-void
-FreeSubscription(Subscription *sub)
-{
-	pfree(sub->name);
-	pfree(sub->conninfo);
-	if (sub->slotname)
-		pfree(sub->slotname);
-	list_free_deep(sub->publications);
-	pfree(sub);
-}
-
 /*
  * Disable the given subscription.
  */
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index cf10a8c00f9..f437b447282 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -219,63 +219,32 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
 
 /*
  * Retrieve connection string from server's FDW.
+ *
+ * NB: leaks into CurrentMemoryContext.
  */
 char *
 ForeignServerConnectionString(Oid userid, Oid serverid)
 {
-	MemoryContext tempContext;
-	MemoryContext oldcxt;
-	char	   *result = NULL;
-
-	/*
-	 * GetForeignServer, GetForeignDataWrapper, and the connection function
-	 * itself all leak memory into CurrentMemoryContext. Switch to a temporary
-	 * context for easy cleanup.
-	 */
-	tempContext = AllocSetContextCreate(CurrentMemoryContext,
-										"FDWConnectionContext",
-										ALLOCSET_SMALL_SIZES);
-
-	oldcxt = MemoryContextSwitchTo(tempContext);
-
-	PG_TRY();
-	{
-		ForeignServer *server;
-		ForeignDataWrapper *fdw;
-		text	   *connection_text = NULL;
-		Datum		connection_datum;
-
-		server = GetForeignServer(serverid);
-		fdw = GetForeignDataWrapper(server->fdwid);
-
-		if (!OidIsValid(fdw->fdwconnection))
-			ereport(ERROR,
-					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					 errmsg("foreign data wrapper \"%s\" does not support subscription connections",
-							fdw->fdwname),
-					 errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
-
-
-		connection_datum = OidFunctionCall3(fdw->fdwconnection,
-											ObjectIdGetDatum(userid),
-											ObjectIdGetDatum(serverid),
-											PointerGetDatum(NULL));
+	ForeignServer *server;
+	ForeignDataWrapper *fdw;
+	Datum		connection_datum;
 
-		connection_text = DatumGetTextPP(connection_datum);
+	server = GetForeignServer(serverid);
+	fdw = GetForeignDataWrapper(server->fdwid);
 
-		MemoryContextSwitchTo(oldcxt);
-		result = text_to_cstring((text *) connection_text);
-	}
-	PG_FINALLY();
-	{
-		/* no-op on success path */
-		MemoryContextSwitchTo(oldcxt);
+	if (!OidIsValid(fdw->fdwconnection))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("foreign data wrapper \"%s\" does not support subscription connections",
+						fdw->fdwname),
+				 errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
 
-		MemoryContextDelete(tempContext);
-	}
-	PG_END_TRY();
+	connection_datum = OidFunctionCall3(fdw->fdwconnection,
+										ObjectIdGetDatum(userid),
+										ObjectIdGetDatum(serverid),
+										PointerGetDatum(NULL));
 
-	return result;
+	return text_to_cstring(DatumGetTextPP(connection_datum));
 }
 
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 033858752d9..e00da48ae39 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -479,6 +479,7 @@ static MemoryContext LogicalStreamingContext = NULL;
 WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 
 Subscription *MySubscription = NULL;
+MemoryContext MySubscriptionCtx = NULL;
 static bool MySubscriptionValid = false;
 
 static List *on_commit_wakeup_workers_subids = NIL;
@@ -5042,6 +5043,7 @@ void
 maybe_reread_subscription(void)
 {
 	MemoryContext oldctx;
+	MemoryContext newctx;
 	Subscription *newsub;
 	bool		started_tx = false;
 
@@ -5056,8 +5058,15 @@ maybe_reread_subscription(void)
 		started_tx = true;
 	}
 
-	/* Ensure allocations in permanent context. */
-	oldctx = MemoryContextSwitchTo(ApplyContext);
+	newctx = AllocSetContextCreate(ApplyContext,
+								   "Subscription Context",
+								   ALLOCSET_SMALL_SIZES);
+
+	/*
+	 * GetSubscription() leaks a number of small allocations, so use a
+	 * subcontext for each call.
+	 */
+	oldctx = MemoryContextSwitchTo(newctx);
 
 	newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
 
@@ -5149,7 +5158,8 @@ maybe_reread_subscription(void)
 	}
 
 	/* Clean old subscription info and switch to new one. */
-	FreeSubscription(MySubscription);
+	MemoryContextDelete(MySubscriptionCtx);
+	MySubscriptionCtx = newctx;
 	MySubscription = newsub;
 
 	MemoryContextSwitchTo(oldctx);
@@ -5794,12 +5804,19 @@ InitializeLogRepWorker(void)
 	 */
 	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
 
-	/* Load the subscription into persistent memory context. */
 	ApplyContext = AllocSetContextCreate(TopMemoryContext,
 										 "ApplyContext",
 										 ALLOCSET_DEFAULT_SIZES);
+
+	/*
+	 * GetSubscription() leaks a number of small allocations, so use a
+	 * subcontext for each call.
+	 */
+	MySubscriptionCtx = AllocSetContextCreate(ApplyContext,
+											  "Subscription Context",
+											  ALLOCSET_SMALL_SIZES);
+
 	StartTransactionCommand();
-	oldctx = MemoryContextSwitchTo(ApplyContext);
 
 	/*
 	 * Lock the subscription to prevent it from being concurrently dropped,
@@ -5808,7 +5825,10 @@ InitializeLogRepWorker(void)
 	 */
 	LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
 					 AccessShareLock);
+
+	oldctx = MemoryContextSwitchTo(MySubscriptionCtx);
 	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
+
 	if (!MySubscription)
 	{
 		ereport(LOG,
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0058d9387d7..2f6f7b57698 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -212,7 +212,6 @@ typedef struct Subscription
 
 extern Subscription *GetSubscription(Oid subid, bool missing_ok,
 									 bool aclcheck);
-extern void FreeSubscription(Subscription *sub);
 extern void DisableSubscription(Oid subid);
 
 extern int	CountDBSubscriptions(Oid dbid);
-- 
2.43.0

Reply via email to