On 2026-Mar-18, Álvaro Herrera wrote:
> On 2026-03-17, Jeff Davis wrote:
>
> > Then we have to invent a deep copy for the Subscription, and we've
> > already seen that the FreeSubscription() method was not being
> > maintained properly.
>
> Maybe another possibility would be to use a separate memory context
> for each subscription, initially making it a child of the transaction
> context, and then reparenting it as appropriate.
I mean something like the patch below, applied on top of your 0003.
--
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
"I love the Postgres community. It's all about doing things _properly_. :-)"
(David Garamond)
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ca053c152cf..e52d7c08fcf 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -31,6 +31,7 @@
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
+#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/syscache.h"
@@ -79,6 +80,12 @@ GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
Form_pg_subscription subform;
Datum datum;
bool isnull;
+ MemoryContext cxt;
+ MemoryContext oldcxt;
+
+ cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
+ ALLOCSET_SMALL_SIZES);
+ oldcxt = MemoryContextSwitchTo(cxt);
tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
@@ -93,6 +100,7 @@ GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
subform = (Form_pg_subscription) GETSTRUCT(tup);
sub = palloc_object(Subscription);
+ sub->cxt = cxt;
sub->oid = subid;
sub->dbid = subform->subdbid;
sub->skiplsn = subform->subskiplsn;
@@ -180,6 +188,8 @@ GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
ReleaseSysCache(tup);
+ MemoryContextSwitchTo(oldcxt);
+
return sub;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a8256a54a97..a2278d131ae 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -481,7 +481,6 @@ static MemoryContext LogicalStreamingContext = NULL;
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL;
-static MemoryContext MySubscriptionCtx = NULL;
static bool MySubscriptionValid = false;
static List *on_commit_wakeup_workers_subids = NIL;
@@ -5044,8 +5043,6 @@ apply_worker_exit(void)
void
maybe_reread_subscription(void)
{
- MemoryContext oldctx;
- MemoryContext newctx;
Subscription *newsub;
bool started_tx = false;
@@ -5060,17 +5057,9 @@ maybe_reread_subscription(void)
started_tx = true;
}
- newctx = AllocSetContextCreate(ApplyContext,
- "Subscription Context",
- ALLOCSET_SMALL_SIZES);
-
- /*
- * GetSubscription() leaks a number of small allocations, so use a
- * subcontext for each call.
- */
- oldctx = MemoryContextSwitchTo(newctx);
-
newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
+ if (newsub)
+ MemoryContextSetParent(newsub->cxt, ApplyContext);
/*
* Exit if the subscription was removed. This normally should not happen
@@ -5160,12 +5149,9 @@ maybe_reread_subscription(void)
}
/* Clean old subscription info and switch to new one. */
- MemoryContextDelete(MySubscriptionCtx);
- MySubscriptionCtx = newctx;
+ MemoryContextDelete(MySubscription->cxt);
MySubscription = newsub;
- MemoryContextSwitchTo(oldctx);
-
/* Change synchronous commit according to the user's wishes */
SetConfigOption("synchronous_commit", MySubscription->synccommit,
PGC_BACKEND, PGC_S_OVERRIDE);
@@ -5789,8 +5775,6 @@ run_apply_worker(void)
void
InitializeLogRepWorker(void)
{
- MemoryContext oldctx;
-
/* Run as replica session replication role. */
SetConfigOption("session_replication_role", "replica",
PGC_SUSET, PGC_S_OVERRIDE);
@@ -5810,14 +5794,6 @@ InitializeLogRepWorker(void)
"ApplyContext",
ALLOCSET_DEFAULT_SIZES);
- /*
- * GetSubscription() leaks a number of small allocations, so use a
- * subcontext for each call.
- */
- MySubscriptionCtx = AllocSetContextCreate(ApplyContext,
- "Subscription Context",
- ALLOCSET_SMALL_SIZES);
-
StartTransactionCommand();
/*
@@ -5828,8 +5804,9 @@ InitializeLogRepWorker(void)
LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
AccessShareLock);
- oldctx = MemoryContextSwitchTo(MySubscriptionCtx);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
+ if (MySubscription)
+ MemoryContextSetParent(MySubscription->cxt, ApplyContext);
if (!MySubscription)
{
@@ -5845,7 +5822,6 @@ InitializeLogRepWorker(void)
}
MySubscriptionValid = true;
- MemoryContextSwitchTo(oldctx);
if (!MySubscription->enabled)
{
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 2f6f7b57698..a6a2ad1e49c 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -130,6 +130,8 @@ MAKE_SYSCACHE(SUBSCRIPTIONNAME, pg_subscription_subname_index, 4);
typedef struct Subscription
{
+ MemoryContext cxt; /* mem cxt containing this subscription */
+
Oid oid; /* Oid of the subscription */
Oid dbid; /* Oid of the database which subscription is
* in */