From bc638e2220d5b82ea3b289f646617100a44adab6 Mon Sep 17 00:00:00 2001
From: rbagga <bangalorian@gmail.com>
Date: Thu, 28 Aug 2025 16:09:10 -0700
Subject: [PATCH] Implement WAL-based async notifications for improved
 throughput

- Added WAL logging for async notifications to improve scalability
- Implemented async resource manager for WAL-based notification handling
- Added new async descriptor files for pg_waldump support
- Updated makefiles and build configuration for new components
---
 src/backend/access/rmgrdesc/Makefile    |   1 +
 src/backend/access/rmgrdesc/asyncdesc.c |  58 +++
 src/backend/access/rmgrdesc/meson.build |   1 +
 src/backend/access/transam/rmgr.c       |   1 +
 src/backend/commands/async.c            | 568 ++++++++++++++++++------
 src/bin/pg_rewind/parsexlog.c           |   1 +
 src/bin/pg_waldump/asyncdesc.c          |   1 +
 src/bin/pg_waldump/rmgrdesc.c           |   1 +
 src/include/access/async_xlog.h         |  56 +++
 src/include/access/rmgrlist.h           |   1 +
 src/include/commands/async.h            |  19 +
 src/include/storage/proc.h              |   3 +
 12 files changed, 587 insertions(+), 124 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/asyncdesc.c
 create mode 120000 src/bin/pg_waldump/asyncdesc.c
 create mode 100644 src/include/access/async_xlog.h

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index cd95eec37f1..6e6e75b12bd 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,6 +9,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+	asyncdesc.o \
 	brindesc.o \
 	clogdesc.o \
 	committsdesc.o \
diff --git a/src/backend/access/rmgrdesc/asyncdesc.c b/src/backend/access/rmgrdesc/asyncdesc.c
new file mode 100644
index 00000000000..b110457431f
--- /dev/null
+++ b/src/backend/access/rmgrdesc/asyncdesc.c
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * asyncdesc.c
+ *	  rmgr descriptor routines for commands/async.c
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/asyncdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/async_xlog.h"
+
+void
+async_desc(StringInfo buf, XLogReaderState *record)
+{
+	/* Append a human-readable summary of one Async rmgr record to buf. */
+	char	   *payload = XLogRecGetData(record);
+	xl_async_notify_data *nd = (xl_async_notify_data *) payload;
+	xl_async_notify_commit *nc = (xl_async_notify_commit *) payload;
+
+	switch (XLogRecGetInfo(record) & ~XLR_INFO_MASK)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			appendStringInfo(buf, "notify data: db %u xid %u pid %d notifications %u",
+							 nd->dbid, nd->xid, nd->srcPid, nd->nnotifications);
+			break;
+		case XLOG_ASYNC_NOTIFY_COMMIT:
+			appendStringInfo(buf, "notify commit: db %u xid %u notify_lsn %X/%X",
+							 nc->dbid, nc->xid, LSN_FORMAT_ARGS(nc->notify_lsn));
+			break;
+		default:				/* unknown subtype: describe nothing */
+			break;
+	}
+}
+
+const char *
+async_identify(uint8 info)
+{
+	/* Map the rmgr-specific info bits to a record name; NULL if unknown. */
+	const char *id = NULL;
+
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			id = "NOTIFY_DATA";
+			break;
+		case XLOG_ASYNC_NOTIFY_COMMIT:
+			id = "NOTIFY_COMMIT";
+			break;
+	}
+	return id;
+}
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index 96c98e800c2..38bef2e87f6 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -2,6 +2,7 @@
 
 # used by frontend programs like pg_waldump
 rmgr_desc_sources = files(
+  'asyncdesc.c',
   'brindesc.c',
   'clogdesc.c',
   'committsdesc.c',
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 1b7499726eb..f8c25e6597a 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -19,6 +19,7 @@
 
 /* includes needed for "access/rmgrlist.h" */
 /* IWYU pragma: begin_keep */
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..8520dbe8920 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -129,10 +129,16 @@
 #include <unistd.h>
 #include <signal.h>
 
+#include "access/async_xlog.h"
 #include "access/parallel.h"
 #include "access/slru.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "access/xlogrecovery.h"
 #include "catalog/pg_database.h"
 #include "commands/async.h"
 #include "common/hashfn.h"
@@ -151,6 +157,16 @@
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
 
+/* Aliases needed by the WAL-based notification code; AsyncQueueEntryEmptySize replaces the definition removed below */
+#define AsyncQueueEntryEmptySize ASYNC_QUEUE_ENTRY_SIZE
+#define SLRU_PAGE_SIZE BLCKSZ
+#define AsyncCtl NotifyCtl
+
+/* WAL record types; these duplicate the definitions in access/async_xlog.h and must stay identical */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00
+#define XLOG_ASYNC_NOTIFY_COMMIT	0x10
+
+
 
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL.  This
@@ -163,30 +179,13 @@
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
 /*
- * Struct representing an entry in the global notify queue
- *
- * This struct declaration has the maximal length, but in a real queue entry
- * the data area is only big enough for the actual channel and payload strings
- * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
- * entry size, if both channel and payload strings are empty (but note it
- * doesn't include alignment padding).
- *
- * The "length" field should always be rounded up to the next QUEUEALIGN
- * multiple so that all fields are properly aligned.
+ * NOTE: The AsyncQueueEntry structure is now defined in commands/async.h
+ * as a compact metadata-only structure for the new WAL-based notification system.
+ * The old variable-length structure with full notification content is no longer used.
  */
-typedef struct AsyncQueueEntry
-{
-	int			length;			/* total allocated length of entry */
-	Oid			dboid;			/* sender's database OID */
-	TransactionId xid;			/* sender's XID */
-	int32		srcPid;			/* sender's PID */
-	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
-} AsyncQueueEntry;
-
-/* Currently, no field of AsyncQueueEntry requires more than int alignment */
-#define QUEUEALIGN(len)		INTALIGN(len)
 
-#define AsyncQueueEntryEmptySize	(offsetof(AsyncQueueEntry, data) + 2)
+/* Queue alignment is still needed for SLRU page management */
+#define QUEUEALIGN(len)		INTALIGN(len)
 
 /*
  * Struct describing a queue position, and assorted macros for working with it
@@ -440,8 +439,6 @@ static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
 static bool asyncQueueIsFull(void);
 static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
-static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
-static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
 static void SignalBackends(void);
@@ -457,6 +454,8 @@ static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
+static void asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn);
+static void processNotificationFromWAL(XLogRecPtr notify_lsn);
 
 /*
  * Compute the difference between two queue page numbers.
@@ -890,65 +889,81 @@ PreCommit_Notify(void)
 		}
 	}
 
-	/* Queue any pending notifies (must happen after the above) */
+	/* Write notification data to WAL if we have any */
 	if (pendingNotifies)
 	{
-		ListCell   *nextNotify;
+		TransactionId currentXid;
+		ListCell   *l;
+		size_t		total_size = 0;
+		uint32		nnotifications = 0;
+		char	   *notifications_data;
+		char	   *ptr;
+		XLogRecPtr	notify_lsn;
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
 		 * GetCurrentTransactionId is cheap if we already have an XID, but not
-		 * so cheap if we don't, and we'd prefer not to do that work while
-		 * holding NotifyQueueLock.
+		 * so cheap if we don't.
 		 */
-		(void) GetCurrentTransactionId();
+		currentXid = GetCurrentTransactionId();
 
 		/*
-		 * Serialize writers by acquiring a special lock that we hold till
-		 * after commit.  This ensures that queue entries appear in commit
-		 * order, and in particular that there are never uncommitted queue
-		 * entries ahead of committed ones, so an uncommitted transaction
-		 * can't block delivery of deliverable notifications.
-		 *
-		 * We use a heavyweight lock so that it'll automatically be released
-		 * after either commit or abort.  This also allows deadlocks to be
-		 * detected, though really a deadlock shouldn't be possible here.
-		 *
-		 * The lock is on "database 0", which is pretty ugly but it doesn't
-		 * seem worth inventing a special locktag category just for this.
-		 * (Historical note: before PG 9.0, a similar lock on "database 0" was
-		 * used by the flatfiles mechanism.)
+		 * Step 1: Write notification data to WAL.
+		 * This can be done in parallel with other transactions since we're
+		 * not holding any global locks yet.
 		 */
-		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
-						 AccessExclusiveLock);
+		
+		/* First pass: calculate total size needed for serialization */
+		foreach(l, pendingNotifies->events)
+		{
+			Notification *n = (Notification *) lfirst(l);
+			
+			/* Size: 2 bytes for channel_len + 2 bytes for payload_len + strings */
+			total_size += 4 + n->channel_len + 1 + n->payload_len + 1;
+			nnotifications++;
+		}
+
+		/* Allocate buffer for notification data */
+		notifications_data = palloc(total_size);
+		ptr = notifications_data;
 
-		/* Now push the notifications into the queue */
-		nextNotify = list_head(pendingNotifies->events);
-		while (nextNotify != NULL)
+		/* Second pass: serialize all notifications */
+		foreach(l, pendingNotifies->events)
 		{
-			/*
-			 * Add the pending notifications to the queue.  We acquire and
-			 * release NotifyQueueLock once per page, which might be overkill
-			 * but it does allow readers to get in while we're doing this.
-			 *
-			 * A full queue is very uncommon and should really not happen,
-			 * given that we have so much space available in the SLRU pages.
-			 * Nevertheless we need to deal with this possibility. Note that
-			 * when we get here we are in the process of committing our
-			 * transaction, but we have not yet committed to clog, so at this
-			 * point in time we can still roll the transaction back.
-			 */
-			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-			asyncQueueFillWarning();
-			if (asyncQueueIsFull())
-				ereport(ERROR,
-						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-						 errmsg("too many notifications in the NOTIFY queue")));
-			nextNotify = asyncQueueAddEntries(nextNotify);
-			LWLockRelease(NotifyQueueLock);
+			Notification *n = (Notification *) lfirst(l);
+			char	   *channel = n->data;
+			char	   *payload = n->data + n->channel_len + 1;
+
+			/* Write channel length, payload length, channel, and payload */
+			memcpy(ptr, &n->channel_len, 2);
+			ptr += 2;
+			memcpy(ptr, &n->payload_len, 2);
+			ptr += 2;
+			memcpy(ptr, channel, n->channel_len + 1);
+			ptr += n->channel_len + 1;
+			memcpy(ptr, payload, n->payload_len + 1);
+			ptr += n->payload_len + 1;
 		}
 
-		/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
+		/* Write notification data to WAL */
+		notify_lsn = LogAsyncNotifyData(MyDatabaseId, currentXid, MyProcPid,
+										nnotifications, total_size,
+										notifications_data);
+
+		pfree(notifications_data);
+
+		/*
+		 * Step 2: Record the notification LSN in transaction state.
+		 * This will be included in the commit record later.
+		 */
+		MyProc->notifyCommitLsn = notify_lsn;
+
+		/*
+		 * Note: We don't add to the traditional SLRU queue here anymore.
+		 * Instead, AtCommit_Notify will add a compact entry to the queue
+		 * pointing to the WAL data after the transaction commits.
+		 * We also don't clear pendingNotifies here; AtCommit_Notify will.
+		 */
 	}
 }
 
@@ -1006,13 +1021,34 @@ AtCommit_Notify(void)
 		asyncQueueUnregister();
 
 	/*
-	 * Send signals to listening backends.  We need do this only if there are
-	 * pending notifies, which were previously added to the shared queue by
-	 * PreCommit_Notify().
+	 * Step 3: If we have notifications, add compact metadata to SLRU queue
+	 * and signal listeners. This happens after transaction commit so the
+	 * notification LSN in our commit record is now durable.
 	 */
-	if (pendingNotifies != NULL)
+	if (pendingNotifies != NULL && !XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		/*
+		 * Write commit record with reference to notification data.
+		 * This establishes the connection between commit and notifications.
+		 */
+		LogAsyncNotifyCommit(MyDatabaseId, GetCurrentTransactionId(), MyProc->notifyCommitLsn);
+
+		/*
+		 * Add compact entry to SLRU queue pointing to WAL data.
+		 * This is much faster than the old approach since we're only
+		 * writing metadata, not the full notification content.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		asyncQueueAddCompactEntry(MyDatabaseId, GetCurrentTransactionId(), MyProc->notifyCommitLsn);
+		LWLockRelease(NotifyQueueLock);
+
+		/* Signal listening backends to check the queue */
 		SignalBackends();
 
+		/* Clear the notification LSN now that we're done with it */
+		MyProc->notifyCommitLsn = InvalidXLogRecPtr;
+	}
+
 	/*
 	 * If it's time to try to advance the global tail pointer, do that.
 	 *
@@ -1319,21 +1355,11 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
 static void
 asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
 {
-	size_t		channellen = n->channel_len;
-	size_t		payloadlen = n->payload_len;
-	int			entryLength;
-
-	Assert(channellen < NAMEDATALEN);
-	Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
-
-	/* The terminators are already included in AsyncQueueEntryEmptySize */
-	entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
-	entryLength = QUEUEALIGN(entryLength);
-	qe->length = entryLength;
-	qe->dboid = MyDatabaseId;
+	/* Compact entry: only this backend's database OID and XID are recorded; n is not consulted */
+	qe->dbid = MyDatabaseId;
 	qe->xid = GetCurrentTransactionId();
-	qe->srcPid = MyProcPid;
-	memcpy(qe->data, n->data, channellen + payloadlen + 2);
+	/* No WAL location is known at this point, so the entry carries an invalid LSN */
+	qe->notify_lsn = InvalidXLogRecPtr;
 }
 
 /*
@@ -1405,7 +1431,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
 		offset = QUEUE_POS_OFFSET(queue_head);
 
 		/* Check whether the entry really fits on the current page */
-		if (offset + qe.length <= QUEUE_PAGESIZE)
+		if (offset + ASYNC_QUEUE_ENTRY_SIZE <= QUEUE_PAGESIZE)
 		{
 			/* OK, so advance nextNotify past this item */
 			nextNotify = lnext(pendingNotifies->events, nextNotify);
@@ -1414,22 +1440,21 @@ asyncQueueAddEntries(ListCell *nextNotify)
 		{
 			/*
 			 * Write a dummy entry to fill up the page. Actually readers will
-			 * only check dboid and since it won't match any reader's database
+			 * only check dbid and since it won't match any reader's database
 			 * OID, they will ignore this entry and move on.
 			 */
-			qe.length = QUEUE_PAGESIZE - offset;
-			qe.dboid = InvalidOid;
-			qe.data[0] = '\0';	/* empty channel */
-			qe.data[1] = '\0';	/* empty payload */
+			qe.dbid = InvalidOid;
+			qe.xid = InvalidTransactionId;
+			qe.notify_lsn = InvalidXLogRecPtr;
 		}
 
 		/* Now copy qe into the shared buffer page */
 		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
 			   &qe,
-			   qe.length);
+			   ASYNC_QUEUE_ENTRY_SIZE);
 
 		/* Advance queue_head appropriately, and detect if page is full */
-		if (asyncQueueAdvance(&(queue_head), qe.length))
+		if (asyncQueueAdvance(&(queue_head), ASYNC_QUEUE_ENTRY_SIZE))
 		{
 			LWLock	   *lock;
 
@@ -2032,14 +2057,13 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
 
 		/*
-		 * Advance *current over this message, possibly to the next page. As
-		 * noted in the comments for asyncQueueReadAllNotifications, we must
-		 * do this before possibly failing while processing the message.
+		 * Advance *current over this compact entry. The new compact entries are
+		 * fixed-size, making this much simpler than the old variable-length entries.
 		 */
-		reachedEndOfPage = asyncQueueAdvance(current, qe->length);
+		reachedEndOfPage = asyncQueueAdvance(current, sizeof(AsyncQueueEntry));
 
 		/* Ignore messages destined for other databases */
-		if (qe->dboid == MyDatabaseId)
+		if (qe->dbid == MyDatabaseId)
 		{
 			if (XidInMVCCSnapshot(qe->xid, snapshot))
 			{
@@ -2047,20 +2071,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 				 * The source transaction is still in progress, so we can't
 				 * process this message yet.  Break out of the loop, but first
 				 * back up *current so we will reprocess the message next
-				 * time.  (Note: it is unlikely but not impossible for
-				 * TransactionIdDidCommit to fail, so we can't really avoid
-				 * this advance-then-back-up behavior when dealing with an
-				 * uncommitted message.)
-				 *
-				 * Note that we must test XidInMVCCSnapshot before we test
-				 * TransactionIdDidCommit, else we might return a message from
-				 * a transaction that is not yet visible to snapshots; compare
-				 * the comments at the head of heapam_visibility.c.
-				 *
-				 * Also, while our own xact won't be listed in the snapshot,
-				 * we need not check for TransactionIdIsCurrentTransactionId
-				 * because our transaction cannot (yet) have queued any
-				 * messages.
+				 * time.
 				 */
 				*current = thisentry;
 				reachedStop = true;
@@ -2068,16 +2079,12 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 			}
 			else if (TransactionIdDidCommit(qe->xid))
 			{
-				/* qe->data is the null-terminated channel name */
-				char	   *channel = qe->data;
-
-				if (IsListeningOn(channel))
-				{
-					/* payload follows channel name */
-					char	   *payload = qe->data + strlen(channel) + 1;
-
-					NotifyMyFrontEnd(channel, payload, qe->srcPid);
-				}
+				/*
+				 * Step 5: Read notification data from WAL using stored LSN.
+				 * The compact entry only contains metadata; actual notification
+				 * content is retrieved from WAL on demand.
+				 */
+				processNotificationFromWAL(qe->notify_lsn);
 			}
 			else
 			{
@@ -2097,6 +2104,228 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 	return reachedStop;
 }
 
+/*
+ * processNotificationFromWAL
+ *
+ * Read the NOTIFY_DATA record that notify_lsn refers to and forward each
+ * notification on a channel we listen on to our frontend.  Raises ERROR if
+ * the record cannot be located/read or is not a well-formed NOTIFY_DATA record.
+ */
+static void
+processNotificationFromWAL(XLogRecPtr notify_lsn)
+{
+	XLogReaderState *xlogreader;
+	xl_async_notify_data *xlrec;
+	XLogRecPtr	startptr;
+	char	   *ptr;
+	uint32		remaining;
+	int			srcPid;
+	char	   *errormsg;
+
+	/*
+	 * Use a temporary reader: this runs during normal notification
+	 * processing, not during recovery.
+	 */
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									NULL);
+	if (!xlogreader)
+		elog(ERROR, "failed to allocate XLog reader for notification data");
+
+	/*
+	 * Position the reader.  XLogFindNextRecord() yields the first record that
+	 * starts at or after the pointer it is given, so this selects the record
+	 * beginning at notify_lsn (or the next one if none starts exactly there).
+	 */
+	startptr = XLogFindNextRecord(xlogreader, notify_lsn - 1);
+	if (XLogRecPtrIsInvalid(startptr))
+		elog(ERROR, "could not locate WAL record preceding %X/%X",
+			 LSN_FORMAT_ARGS(notify_lsn));
+	XLogBeginRead(xlogreader, startptr);
+
+	/* Read the WAL record containing notification data */
+	if (XLogReadRecord(xlogreader, &errormsg) == NULL)
+		elog(ERROR, "failed to read notification data from WAL at %X/%X: %s",
+			 LSN_FORMAT_ARGS(notify_lsn), errormsg ? errormsg : "no error message");
+
+	/*
+	 * Never interpret some other record as notification data: the layout
+	 * parsed below is only valid for a NOTIFY_DATA record that is at least as
+	 * long as its fixed header (otherwise "remaining" would wrap around).
+	 */
+	if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID ||
+		(XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA ||
+		XLogRecGetDataLen(xlogreader) < SizeOfAsyncNotifyData)
+		elog(ERROR, "unexpected WAL record (rmgr %d, info %d, length %u) for notification data at %X/%X",
+			 XLogRecGetRmid(xlogreader), XLogRecGetInfo(xlogreader),
+			 XLogRecGetDataLen(xlogreader), LSN_FORMAT_ARGS(notify_lsn));
+
+	/* Extract the notification data from the WAL record */
+	xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader);
+	srcPid = xlrec->srcPid;
+	ptr = (char *) xlrec + SizeOfAsyncNotifyData;
+	remaining = XLogRecGetDataLen(xlogreader) - SizeOfAsyncNotifyData;
+
+	/*
+	 * Process each notification in the serialized data.
+	 * The format is: 2-byte channel_len, 2-byte payload_len,
+	 * null-terminated channel, null-terminated payload.
+	 */
+	for (uint32 i = 0; i < xlrec->nnotifications && remaining >= 4; i++)
+	{
+		uint16		channel_len;
+		uint16		payload_len;
+		uint32		body_len;
+		char	   *channel;
+		char	   *payload;
+
+		/* Read lengths */
+		memcpy(&channel_len, ptr, 2);
+		memcpy(&payload_len, ptr + 2, 2);
+		ptr += 4;
+		remaining -= 4;
+
+		/* Stop at a truncated or unterminated entry rather than overrun */
+		body_len = (uint32) channel_len + 1 + (uint32) payload_len + 1;
+		if (remaining < body_len ||
+			ptr[channel_len] != '\0' || ptr[body_len - 1] != '\0')
+			break;
+
+		/* Extract channel and payload strings */
+		channel = ptr;
+		payload = ptr + channel_len + 1;
+		ptr += body_len;
+		remaining -= body_len;
+
+		/* Deliver notification if we're listening on this channel */
+		if (IsListeningOn(channel))
+			NotifyMyFrontEnd(channel, payload, srcPid);
+	}
+
+	/* Clean up */
+	XLogReaderFree(xlogreader);
+}
+
+/*
+ * asyncQueueAddCompactEntry
+ *
+ * Append one fixed-size AsyncQueueEntry (dbid, xid, notify_lsn) at QUEUE_HEAD
+ * and advance the head; the notification content itself is not stored here.
+ * Takes and releases the SLRU bank lock internally.  The caller is expected
+ * to hold NotifyQueueLock exclusively.  No queue-full check is made here.
+ */
+static void
+asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn)
+{
+	AsyncQueueEntry entry;
+	QueuePosition queue_head;
+	int64		pageno;
+	int			offset;
+	int			slotno;
+	LWLock	   *banklock;
+
+	/*
+	 * Build the entry from the arguments only; it carries a reference
+	 * (notify_lsn) rather than any channel or payload text.
+	 */
+	entry.dbid = dbid;
+	entry.xid = xid;
+	entry.notify_lsn = notify_lsn;
+
+	/* Work on a local copy of the head; QUEUE_HEAD is updated at the end */
+	queue_head = QUEUE_HEAD;
+
+	/*
+	 * Lock the bank of the head page.  If the head is still at position
+	 * zero the page is zero-initialized, otherwise it is read in.
+	 */
+	pageno = QUEUE_POS_PAGE(queue_head);
+	banklock = SimpleLruGetBankLock(NotifyCtl, pageno);
+
+	LWLockAcquire(banklock, LW_EXCLUSIVE);
+
+	if (QUEUE_POS_IS_ZERO(queue_head))
+		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
+	else
+		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
+								   InvalidTransactionId);
+
+	/* Both branches below write into this page, so flag it dirty up front */
+	NotifyCtl->shared->page_dirty[slotno] = true;
+
+	offset = QUEUE_POS_OFFSET(queue_head);
+
+	/* Normal case: a whole entry still fits between offset and page end */
+	if (offset + sizeof(AsyncQueueEntry) <= QUEUE_PAGESIZE)
+	{
+		/* Copy the compact entry to the shared buffer */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &entry,
+			   sizeof(AsyncQueueEntry));
+
+		/* asyncQueueAdvance() returns true when the head moved to a new page */
+		if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry)))
+		{
+			/*
+			 * Zero the page the head now points at.  If it lives in another
+			 * bank, swap locks around the call and then take ours back.
+			 */
+			LWLock	   *nextlock;
+
+			pageno = QUEUE_POS_PAGE(queue_head);
+			nextlock = SimpleLruGetBankLock(NotifyCtl, pageno);
+			if (nextlock != banklock)
+			{
+				LWLockRelease(banklock);
+				LWLockAcquire(nextlock, LW_EXCLUSIVE);
+			}
+			SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
+			if (nextlock != banklock)
+			{
+				LWLockRelease(nextlock);
+				LWLockAcquire(banklock, LW_EXCLUSIVE);
+			}
+
+			/* Request a tail-advance attempt every QUEUE_CLEANUP_DELAY pages */
+			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
+				tryAdvanceTail = true;
+		}
+
+		/* Publish the new head position */
+		QUEUE_HEAD = queue_head;
+	}
+	else
+	{
+		/*
+		 * Entry doesn't fit: pad the page with an InvalidOid entry, advance to
+		 * the next page and retry there.  NOTE(review): unlike the branch above,
+		 * the new page is not zeroed here before the recursive call -- confirm.
+		 */
+		AsyncQueueEntry padding;
+
+		memset(&padding, 0, sizeof(padding));
+		padding.dbid = InvalidOid;  /* Mark as padding */
+
+		/* Fewer than sizeof(padding) bytes remain here, so this stays in bounds */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &padding,
+			   QUEUE_PAGESIZE - offset);
+
+		/* Consume the remainder of the page; the return value is ignored */
+		asyncQueueAdvance(&queue_head, QUEUE_PAGESIZE - offset);
+
+		/* Publish the head, drop our bank lock, then retry from the top */
+		QUEUE_HEAD = queue_head;
+		LWLockRelease(banklock);
+		asyncQueueAddCompactEntry(dbid, xid, notify_lsn);
+		return;
+	}
+
+	LWLockRelease(banklock);
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
@@ -2395,3 +2624,94 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * Write a WAL record containing async notification data
+ *
+ * Returns the START of the inserted record (ProcLastRecPtr), the position a
+ * WAL reader needs to find it; XLogInsert() itself returns the record's END.
+ */
+XLogRecPtr
+LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+				   uint32 nnotifications, Size data_len, char *data)
+{
+	xl_async_notify_data xlrec;
+
+	xlrec.dbid = dboid;
+	xlrec.xid = xid;
+	xlrec.srcPid = srcPid;
+	xlrec.nnotifications = nnotifications;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyData);
+	XLogRegisterData(data, data_len);
+
+	(void) XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_DATA);
+
+	/* An end pointer would make a later record lookup land on the next record */
+	return ProcLastRecPtr;
+}
+
+/*
+ * LogAsyncNotifyCommit
+ *
+ * Emit an XLOG_ASYNC_NOTIFY_COMMIT record carrying (dboid, xid, notify_lsn)
+ * and return whatever XLogInsert() returns for it.
+ */
+XLogRecPtr
+LogAsyncNotifyCommit(Oid dboid, TransactionId xid, XLogRecPtr notify_lsn)
+{
+	xl_async_notify_commit commit_rec;
+
+	/* Populate the fixed-size record body field by field */
+	commit_rec.notify_lsn = notify_lsn;
+	commit_rec.xid = xid;
+	commit_rec.dbid = dboid;
+
+	/* The record has no payload beyond the struct itself */
+	XLogBeginInsert();
+	XLogRegisterData((char *) &commit_rec, SizeOfAsyncNotifyCommit);
+
+	/* Hand the insert result straight back to the caller */
+	return XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_COMMIT);
+}
+
+
+
+/*
+ * async_redo
+ *
+ * Replay callback for the Async resource manager.  Both known record types
+ * are deliberate no-ops: nothing is added to the notification queue during
+ * replay.  Any other op code raises PANIC.
+ */
+void
+async_redo(XLogReaderState *record)
+{
+	uint8		opcode = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	/*
+	 * XLOG_ASYNC_NOTIFY_DATA: the notification contents are carried by the
+	 * WAL record itself, so there is no additional state to rebuild here.
+	 */
+	if (opcode == XLOG_ASYNC_NOTIFY_DATA)
+		return;
+
+	/*
+	 * XLOG_ASYNC_NOTIFY_COMMIT: replay does not insert a compact entry into
+	 * the SLRU queue.
+	 *
+	 * NOTE(review): this was judged unnecessary on the assumption that
+	 * recovery normally runs with no active listeners -- confirm that this
+	 * holds wherever these records can be replayed.
+	 */
+	if (opcode == XLOG_ASYNC_NOTIFY_COMMIT)
+		return;
+
+	/*
+	 * Any other op code is not one this resource manager writes.
+	 */
+	elog(PANIC, "async_redo: unknown op code %u", opcode);
+}
+
+
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 8f4b282c6b1..a2e536cc910 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -13,6 +13,7 @@
 
 #include <unistd.h>
 
+#include "access/async_xlog.h"
 #include "access/rmgr.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
diff --git a/src/bin/pg_waldump/asyncdesc.c b/src/bin/pg_waldump/asyncdesc.c
new file mode 120000
index 00000000000..0f6512e98ef
--- /dev/null
+++ b/src/bin/pg_waldump/asyncdesc.c
@@ -0,0 +1 @@
+../../../src/backend/access/rmgrdesc/asyncdesc.c
\ No newline at end of file
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index fac509ed134..b06c85bf0e7 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -8,6 +8,7 @@
 #define FRONTEND 1
 #include "postgres.h"
 
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
diff --git a/src/include/access/async_xlog.h b/src/include/access/async_xlog.h
new file mode 100644
index 00000000000..1214be82099
--- /dev/null
+++ b/src/include/access/async_xlog.h
@@ -0,0 +1,56 @@
+/*-------------------------------------------------------------------------
+ *
+ * async_xlog.h
+ *	  Async notification WAL definitions
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/async_xlog.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef ASYNC_XLOG_H
+#define ASYNC_XLOG_H
+
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+
+/*
+ * WAL record types for async notifications
+ */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00	/* notification data */
+#define XLOG_ASYNC_NOTIFY_COMMIT	0x10	/* commit with notify reference */
+
+/*
+ * Fixed header of an XLOG_ASYNC_NOTIFY_DATA record (written in PreCommit_Notify)
+ */
+typedef struct xl_async_notify_data
+{
+	Oid			dbid;			/* sending backend's database OID */
+	TransactionId xid;			/* sending transaction's XID */
+	int32		srcPid;			/* sending backend's PID, reported to listeners */
+	uint32		nnotifications;	/* count of serialized notifications that follow */
+	/* then, per notification: 2-byte channel_len, 2-byte payload_len, channel\0, payload\0 */
+} xl_async_notify_data;
+
+#define SizeOfAsyncNotifyData	(offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32))
+
+/*
+ * Body of an XLOG_ASYNC_NOTIFY_COMMIT record (written in AtCommit_Notify)
+ */
+typedef struct xl_async_notify_commit
+{
+	Oid			dbid;			/* sending backend's database OID */
+	TransactionId xid;			/* XID of the notifying transaction */
+	XLogRecPtr	notify_lsn;		/* LSN returned by LogAsyncNotifyData() for that xact */
+} xl_async_notify_commit;
+
+#define SizeOfAsyncNotifyCommit	(offsetof(xl_async_notify_commit, notify_lsn) + sizeof(XLogRecPtr))
+
+extern void async_redo(XLogReaderState *record);
+extern void async_desc(StringInfo buf, XLogReaderState *record);
+extern const char *async_identify(uint8 info);
+
+#endif							/* ASYNC_XLOG_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 8e7fc9db877..58293e05165 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_ASYNC_ID, "Async", async_redo, async_desc, async_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..1d204542840 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -14,11 +14,24 @@
 #define ASYNC_H
 
 #include <signal.h>
+#include "access/xlogreader.h"
 
 extern PGDLLIMPORT bool Trace_notify;
 extern PGDLLIMPORT int max_notify_queue_pages;
 extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
+/*
+ * Fixed-size SLRU queue entry: identifies a notifying xact and where its data is in WAL
+ */
+typedef struct AsyncQueueEntry
+{
+	Oid			dbid;			/* sender's database OID; InvalidOid marks a padding entry */
+	TransactionId	xid;			/* sender's XID, checked for visibility/commit by readers */
+	XLogRecPtr	notify_lsn;		/* WAL location handed to processNotificationFromWAL() */
+} AsyncQueueEntry;
+
+#define ASYNC_QUEUE_ENTRY_SIZE	sizeof(AsyncQueueEntry)
+
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
 
@@ -46,4 +59,10 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* WAL-based notification functions */
+extern XLogRecPtr LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+									 uint32 nnotifications, Size data_len, char *data);
+extern XLogRecPtr LogAsyncNotifyCommit(Oid dboid, TransactionId xid, XLogRecPtr notify_lsn);
+extern void async_redo(XLogReaderState *record);
+
 #endif							/* ASYNC_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c6f5ebceefd..71459fe5529 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -301,6 +301,9 @@ struct PGPROC
 
 	uint32		wait_event_info;	/* proc's wait information */
 
+	/* Support for async notifications */
+	XLogRecPtr	notifyCommitLsn;	/* LSN of notification data for current xact */
+
 	/* Support for group transaction status update. */
 	bool		clogGroupMember;	/* true, if member of clog group */
 	pg_atomic_uint32 clogGroupNext; /* next clog group member */
-- 
2.47.1

