On Mon, Oct 20, 2025, at 00:10, Joel Jacobson wrote:
> Attachments:
> * 0001-optimize_listen_notify-v20.patch
> * 0002-optimize_listen_notify-v20.patch
> * 0002-optimize_listen_notify-v20-alt1.txt
> * 0002-optimize_listen_notify-v20-alt3.txt
> * 0002-optimize_listen_notify-v20-alt2.txt
Attaching a new alt1 version that fixes a mistake: it used max(pos,
advisoryPos) for the lag calculation, which is wrong. In alt1, the backend
itself updates its 'pos' when it wakes up, and 'pos' is what
asyncQueueAdvanceTail looks at, so the lag must be computed from 'pos'.
/Joel
From 493f05130febbd8c4bc0bc2533e22ec6ddf6d5f5 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 08:08:05 +0200
Subject: [PATCH] Implements idea #1: advisoryPos
---
src/backend/commands/async.c | 67 ++++++++++++++++++++++++++++++++----
1 file changed, 61 insertions(+), 6 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4e6556fb8d1..4a8a6f5bf1b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -264,6 +264,11 @@ typedef struct QueuePosition
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+ (asyncQueuePagePrecedes((x).page, (y).page) || \
+ ((x).page == (y).page && (x).offset < (y).offset))
+
/*
* Parameter determining how often we try to advance the tail pointer:
* we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
@@ -286,6 +291,7 @@ typedef struct QueueBackendStatus
Oid dboid; /* backend's database
OID, or InvalidOid */
ProcNumber nextListener; /* id of next listener, or
INVALID_PROC_NUMBER */
QueuePosition pos; /* backend has read queue up to
here */
+ QueuePosition advisoryPos; /* backend could skip queue to here */
bool wakeupPending; /* signal sent but not yet processed */
} QueueBackendStatus;
@@ -347,6 +353,7 @@ static dshash_table *channelHash = NULL;
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i)
(asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i)
(asyncQueueControl->backend[i].advisoryPos)
#define QUEUE_BACKEND_WAKEUP_PENDING(i)
(asyncQueueControl->backend[i].wakeupPending)
/*
@@ -674,6 +681,7 @@ AsyncShmemInit(void)
QUEUE_BACKEND_DBOID(i) = InvalidOid;
QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
}
}
@@ -1312,6 +1320,7 @@ Exec_ListenPreCommit(void)
prevListener = i;
}
QUEUE_BACKEND_POS(MyProcNumber) = max;
+ QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) = max;
QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
/* Insert backend into list of listeners at correct position */
@@ -2031,9 +2040,13 @@ SignalBackends(void)
* Even though we may take and release NotifyQueueLock multiple times
* while writing, the heavyweight lock guarantees this region contains
* only our messages. Therefore, any backend still positioned at the
- * queue head from before our write can be safely advanced to the
current
+ * queue head from before our write can be advised to skip to the
current
* queue head without waking it.
*
+ * We use the advisoryPos field rather than directly modifying pos.
+ * The backend controls its own pos field and will check advisoryPos
+ * when it's safe to do so.
+ *
* False-positive possibility: if a backend was previously signaled but
* hasn't yet awoken, we'll skip advancing it (because wakeupPending is
* true). This is safe - the backend will advance its pointer when it
@@ -2048,6 +2061,7 @@ SignalBackends(void)
i = QUEUE_NEXT_LISTENER(i))
{
QueuePosition pos;
+ QueuePosition advisoryPos;
int64 lag;
int32 pid;
@@ -2055,13 +2069,22 @@ SignalBackends(void)
continue;
pos = QUEUE_BACKEND_POS(i);
+ advisoryPos = QUEUE_BACKEND_ADVISORY_POS(i);
- /* Direct advancement for idle backends at the old head */
+ /*
+ * Direct advancement for idle backends at the old head.
+ *
+ * We check advisoryPos rather than pos to allow accumulating
advances
+ * from multiple consecutive notifying backends. If we checked
pos,
+ * only the first notifier could advance idle backends;
subsequent
+ * notifiers would find pos unchanged (since the backend hasn't
woken
+ * up yet) and fail to advance further.
+ */
if (pendingNotifies != NULL &&
- QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+ QUEUE_POS_EQUAL(advisoryPos, queueHeadBeforeWrite))
{
- QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
- pos = queueHeadAfterWrite;
+ QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
+ advisoryPos = queueHeadAfterWrite;
}
/* Signal backends that have fallen too far behind */
@@ -2302,6 +2325,7 @@ static void
asyncQueueReadAllNotifications(void)
{
volatile QueuePosition pos;
+ QueuePosition advisoryPos;
QueuePosition head;
Snapshot snapshot;
@@ -2319,11 +2343,35 @@ asyncQueueReadAllNotifications(void)
QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
pos = QUEUE_BACKEND_POS(MyProcNumber);
head = QUEUE_HEAD;
+
+ /*
+ * Check if another backend has set an advisory position for us.
+ * If so, and if we haven't yet read past that point, we can safely
+ * adopt the advisory position and skip the intervening notifications.
+ */
+ advisoryPos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+
+ if (!QUEUE_POS_EQUAL(advisoryPos, pos) &&
+ QUEUE_POS_PRECEDES(pos, advisoryPos))
+ {
+ pos = advisoryPos;
+ QUEUE_BACKEND_POS(MyProcNumber) = pos;
+ }
+
LWLockRelease(NotifyQueueLock);
if (QUEUE_POS_EQUAL(pos, head))
{
- /* Nothing to do, we have read all notifications already. */
+ /*
+ * Nothing to do, we have read all notifications already.
+ * Before returning, update advisoryPos if it has fallen behind
our
+ * current position, since we're bypassing the PG_FINALLY block
that
+ * would normally do this.
+ */
+ LWLockAcquire(NotifyQueueLock, LW_SHARED);
+ QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) =
+ QUEUE_POS_MAX(pos,
QUEUE_BACKEND_ADVISORY_POS(MyProcNumber));
+ LWLockRelease(NotifyQueueLock);
return;
}
@@ -2440,6 +2488,13 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyProcNumber) = pos;
+ /*
+ * Advance advisoryPos to our current position if it has fallen
behind,
+ * but preserve any newer advisory position that may have been
set by
+ * another backend while we were processing notifications.
+ */
+ QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) =
+ QUEUE_POS_MAX(pos,
QUEUE_BACKEND_ADVISORY_POS(MyProcNumber));
LWLockRelease(NotifyQueueLock);
}
PG_END_TRY();
--
2.50.1