On Fri, 2026-03-13 at 16:48 -0400, Andres Freund wrote:
> I suspect the more scalable approach would be to create a dedicated
> memory
> context for each GetSubscription() call that's then torn down during
> invalidation.
Attached is a patch that takes that approach. It is posted as an alternative to the previous patch.
Regards,
Jeff Davis
From 98705f06a8dfb757c5db69f3b4f9ea2d73f21deb Mon Sep 17 00:00:00 2001
From: Jeff Davis <[email protected]>
Date: Fri, 13 Mar 2026 13:24:18 -0700
Subject: [PATCH v22] Temp context for maybe_reread_subscription().
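Move temp context from ForeignServerConnectionString() to
maybe_reread_subscription(), so that it prevents more
invalidation-related leaks.
Each Subscription is now loaded into its own memory context, a child of
ApplyContext, in both InitializeLogRepWorker() and
maybe_reread_subscription(). When the subscription is reread, the old
context is deleted, which also releases the small allocations that
GetSubscription() leaks. FreeSubscription() is no longer needed and is
removed.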
Suggested-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/xvdjrdqnpap3uq7owbaox3r7p5gf7sv62aaqf2ju3vb6yglatr%40kvvwhoudrlxq
---
src/backend/catalog/pg_subscription.c | 14 -----
src/backend/foreign/foreign.c | 67 +++++++-----------------
src/backend/replication/logical/worker.c | 30 +++++++++--
src/include/catalog/pg_subscription.h | 1 -
4 files changed, 43 insertions(+), 69 deletions(-)
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 3673d4f0bc1..ca053c152cf 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -216,20 +216,6 @@ CountDBSubscriptions(Oid dbid)
return nsubs;
}
-/*
- * Free memory allocated by subscription struct.
- */
-void
-FreeSubscription(Subscription *sub)
-{
- pfree(sub->name);
- pfree(sub->conninfo);
- if (sub->slotname)
- pfree(sub->slotname);
- list_free_deep(sub->publications);
- pfree(sub);
-}
-
/*
* Disable the given subscription.
*/
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index cf10a8c00f9..f437b447282 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -219,63 +219,32 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
/*
* Retrieve connection string from server's FDW.
+ *
+ * NB: leaks into CurrentMemoryContext.
*/
char *
ForeignServerConnectionString(Oid userid, Oid serverid)
{
- MemoryContext tempContext;
- MemoryContext oldcxt;
- char *result = NULL;
-
- /*
- * GetForeignServer, GetForeignDataWrapper, and the connection function
- * itself all leak memory into CurrentMemoryContext. Switch to a temporary
- * context for easy cleanup.
- */
- tempContext = AllocSetContextCreate(CurrentMemoryContext,
- "FDWConnectionContext",
- ALLOCSET_SMALL_SIZES);
-
- oldcxt = MemoryContextSwitchTo(tempContext);
-
- PG_TRY();
- {
- ForeignServer *server;
- ForeignDataWrapper *fdw;
- text *connection_text = NULL;
- Datum connection_datum;
-
- server = GetForeignServer(serverid);
- fdw = GetForeignDataWrapper(server->fdwid);
-
- if (!OidIsValid(fdw->fdwconnection))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("foreign data wrapper \"%s\" does not support subscription connections",
- fdw->fdwname),
- errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
-
-
- connection_datum = OidFunctionCall3(fdw->fdwconnection,
- ObjectIdGetDatum(userid),
- ObjectIdGetDatum(serverid),
- PointerGetDatum(NULL));
+ ForeignServer *server;
+ ForeignDataWrapper *fdw;
+ Datum connection_datum;
- connection_text = DatumGetTextPP(connection_datum);
+ server = GetForeignServer(serverid);
+ fdw = GetForeignDataWrapper(server->fdwid);
- MemoryContextSwitchTo(oldcxt);
- result = text_to_cstring((text *) connection_text);
- }
- PG_FINALLY();
- {
- /* no-op on success path */
- MemoryContextSwitchTo(oldcxt);
+ if (!OidIsValid(fdw->fdwconnection))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("foreign data wrapper \"%s\" does not support subscription connections",
+ fdw->fdwname),
+ errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
- MemoryContextDelete(tempContext);
- }
- PG_END_TRY();
+ connection_datum = OidFunctionCall3(fdw->fdwconnection,
+ ObjectIdGetDatum(userid),
+ ObjectIdGetDatum(serverid),
+ PointerGetDatum(NULL));
- return result;
+ return text_to_cstring(DatumGetTextPP(connection_datum));
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 033858752d9..e00da48ae39 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -479,6 +479,7 @@ static MemoryContext LogicalStreamingContext = NULL;
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL;
+MemoryContext MySubscriptionCtx = NULL;
static bool MySubscriptionValid = false;
static List *on_commit_wakeup_workers_subids = NIL;
@@ -5042,6 +5043,7 @@ void
maybe_reread_subscription(void)
{
MemoryContext oldctx;
+ MemoryContext newctx;
Subscription *newsub;
bool started_tx = false;
@@ -5056,8 +5058,15 @@ maybe_reread_subscription(void)
started_tx = true;
}
- /* Ensure allocations in permanent context. */
- oldctx = MemoryContextSwitchTo(ApplyContext);
+ newctx = AllocSetContextCreate(ApplyContext,
+ "Subscription Context",
+ ALLOCSET_SMALL_SIZES);
+
+ /*
+ * GetSubscription() leaks a number of small allocations, so use a
+ * subcontext for each call.
+ */
+ oldctx = MemoryContextSwitchTo(newctx);
newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
@@ -5149,7 +5158,8 @@ maybe_reread_subscription(void)
}
/* Clean old subscription info and switch to new one. */
- FreeSubscription(MySubscription);
+ MemoryContextDelete(MySubscriptionCtx);
+ MySubscriptionCtx = newctx;
MySubscription = newsub;
MemoryContextSwitchTo(oldctx);
@@ -5794,12 +5804,19 @@ InitializeLogRepWorker(void)
*/
SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
- /* Load the subscription into persistent memory context. */
ApplyContext = AllocSetContextCreate(TopMemoryContext,
"ApplyContext",
ALLOCSET_DEFAULT_SIZES);
+
+ /*
+ * GetSubscription() leaks a number of small allocations, so use a
+ * subcontext for each call.
+ */
+ MySubscriptionCtx = AllocSetContextCreate(ApplyContext,
+ "Subscription Context",
+ ALLOCSET_SMALL_SIZES);
+
StartTransactionCommand();
- oldctx = MemoryContextSwitchTo(ApplyContext);
/*
* Lock the subscription to prevent it from being concurrently dropped,
@@ -5808,7 +5825,10 @@ InitializeLogRepWorker(void)
*/
LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
AccessShareLock);
+
+ oldctx = MemoryContextSwitchTo(MySubscriptionCtx);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
+
if (!MySubscription)
{
ereport(LOG,
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0058d9387d7..2f6f7b57698 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -212,7 +212,6 @@ typedef struct Subscription
extern Subscription *GetSubscription(Oid subid, bool missing_ok,
bool aclcheck);
-extern void FreeSubscription(Subscription *sub);
extern void DisableSubscription(Oid subid);
extern int CountDBSubscriptions(Oid dbid);
--
2.43.0