Hi,

Please find attached a POC patch that changes the WAL sender and receiver
so that, during physical replication, WAL records can be sent to standbys
before they are flushed to disk on the primary. This is intended to improve
replication latency by reducing the amount of WAL read from disk.
For large transactions, this approach ensures that the bulk of the
transaction’s WAL records has already been sent to the standby before the
flush occurs on the primary. As a result, the flushes on the primary and
the standby happen closer together, reducing replication lag.

Observations from the benchmark:
1. The patch improves TPS by ~13% in the sync replication setup (115 vs.
102 TPS). In repeated runs, the TPS increase ranges from 5% to 13%.
2. The WAL sender reads significantly less WAL from disk (~40MB vs. ~79MB),
indicating more efficient use of WAL buffers and reduced disk I/O.

Following are some details of the implementation:

1. The primary does not wait for a flush before starting to send data, so
it is likely to send smaller chunks of data. To prevent network overload,
changes are made to avoid sending excessively small packets.
2. The sender includes its current flush pointer in the replication
protocol messages, so the standby knows up to which point WAL has been
safely flushed on the primary.
3. The logic ensures that standbys do not apply transactions that have not
been flushed on the primary, by updating the flushedUpto position on the
standby only up to the flushPtr received from the primary.
4. WAL records received from the primary are written and can be flushed to
disk on the standby, but are only marked as flushed up to the flushPtr
reported by the primary.
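
To make points 2 to 4 concrete, here is a minimal standalone sketch of the
receiver side. It is not code from the patch: the struct, the helper names
(WalDataHeader, parse_wal_data_header, advertised_flush) and the uint64_t
stand-in for XLogRecPtr are assumptions for illustration. The field order
(dataStart, walEnd, flush pointer, sendTime) follows the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's XLogRecPtr (a 64-bit WAL position). */
typedef uint64_t XLogRecPtr;

/* Four 64-bit fields follow the one-byte message type. */
#define WALDATA_HDR_LEN (4 * sizeof(int64_t))

/* Hypothetical view of the extended WALData header. */
typedef struct WalDataHeader
{
    XLogRecPtr  data_start;     /* start of the WAL data in this message */
    XLogRecPtr  wal_end;        /* end of the WAL data in this message */
    XLogRecPtr  sender_flush;   /* new field: flush pointer on the sender */
    int64_t     send_time;      /* sender timestamp */
} WalDataHeader;

/* Read one 64-bit integer in network (big-endian) byte order. */
static uint64_t
read_be64(const unsigned char *p)
{
    uint64_t    v = 0;

    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

/* Parse the header; returns false if the message is too short. */
static bool
parse_wal_data_header(const unsigned char *buf, size_t len,
                      WalDataHeader *hdr)
{
    if (len < WALDATA_HDR_LEN)
        return false;

    hdr->data_start = read_be64(buf);
    hdr->wal_end = read_be64(buf + 8);
    hdr->sender_flush = read_be64(buf + 16);
    hdr->send_time = (int64_t) read_be64(buf + 24);
    return true;
}

/*
 * The position the standby may expose as flushedUpto: never beyond what
 * the sender reports as flushed, even if more has been fsync'd locally.
 */
static XLogRecPtr
advertised_flush(XLogRecPtr local_flush, XLogRecPtr sender_flush)
{
    return local_flush > sender_flush ? sender_flush : local_flush;
}
```

With this shape, a standby that has locally flushed up to position 100
while the primary reports 80 would advertise 80, which is the clamping
behaviour described in point 3.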

Benchmark details are as follows:
Synchronous replication with remote write enabled.
Two Azure VMs: Central India (primary), Central US (standby).
OS: Ubuntu 24.04, VM size D4s (4 vCPUs, 16 GiB RAM).

With the patch
TPS: 115
WAL read from disk by WAL sender: ~40MB (read bytes from pg_stat_io)
WAL generated during the test: 772705760 bytes

Without the patch
TPS: 102
WAL read from disk by WAL sender: ~79MB (read bytes from pg_stat_io)
WAL generated during the test: 760060792 bytes
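
For reference, the headline percentages follow directly from the numbers
above. The small helpers below are only an illustration of that arithmetic
(the function names are made up for this example):

```c
#include <assert.h>

/* Percentage increase of `value` over `baseline`. */
static double
percent_increase(double baseline, double value)
{
    return (value - baseline) / baseline * 100.0;
}

/* Percentage reduction from `baseline` down to `value`. */
static double
percent_reduction(double baseline, double value)
{
    return (baseline - value) / baseline * 100.0;
}
```

percent_increase(102, 115) gives roughly 12.7 (the ~13% TPS gain), and
percent_reduction(79, 40) gives roughly 49, i.e. the WAL sender read about
half as much WAL from disk in this run.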

Commit hash: b1187266e0

pgbench -c 32 -j 4 postgres -T 300 -f wal_test.sql

wal_test.sql (each transaction generates ~36KB of WAL):
\set delta random(1, 500)
BEGIN;
INSERT INTO wal_bloat_:delta (data)
SELECT repeat('x', 8000)
FROM generate_series(1, 80);
COMMIT;

TODO:
1. Ensure there is a robust mechanism on the receiver that prevents WAL
records not yet flushed on the primary from being applied on the standby
under any circumstances.
2. Receiving smaller chunks of WAL on the standby can lead to more frequent
disk writes. Using WAL buffers on the standby could mitigate this; evaluate
the performance impact of doing so.
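
As a rough illustration of TODO item 2, the sketch below accumulates small
incoming chunks in memory and issues one simulated write per full buffer.
Everything here is hypothetical (the StandbyWalBuffer type, the 8192-byte
threshold, the function names); it is not how the walreceiver works today
and is not part of the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define STANDBY_BUF_SIZE 8192   /* hypothetical threshold: one WAL page */

typedef struct StandbyWalBuffer
{
    char    data[STANDBY_BUF_SIZE];
    size_t  used;           /* bytes currently buffered */
    size_t  disk_writes;    /* number of simulated write calls */
    size_t  bytes_written;  /* total bytes handed to the simulated write */
} StandbyWalBuffer;

/* Write out whatever is buffered; a real receiver would call write(). */
static void
buffer_flush(StandbyWalBuffer *b)
{
    if (b->used == 0)
        return;
    b->disk_writes++;
    b->bytes_written += b->used;
    b->used = 0;
}

/* Append a received chunk, writing only when the buffer fills up. */
static void
buffer_append(StandbyWalBuffer *b, const char *chunk, size_t len)
{
    while (len > 0)
    {
        size_t  space = STANDBY_BUF_SIZE - b->used;
        size_t  n = len < space ? len : space;

        memcpy(b->data + b->used, chunk, n);
        b->used += n;
        chunk += n;
        len -= n;

        if (b->used == STANDBY_BUF_SIZE)
            buffer_flush(b);
    }
}
```

With 512-byte chunks, this turns sixteen receives into one write. Whether
that is a net win would still need to be measured, since buffered WAL must
be written out before the standby can report it as written.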

A similar idea was proposed here:
Proposal: Allow walsenders to send WAL directly from wal_buffers to replicas
<https://www.postgresql.org/message-id/flat/CALj2ACXCSM%2BsTR%3D5NNRtmSQr3g1Vnr-yR91azzkZCaCJ7u4d4w%40mail.gmail.com>
The idea was also discussed here recently:
https://www.postgresql.org/message-id/fa2e932eeff472250e2dbacb49d8c43ad282fea9.camel%40j-davis.com

Kindly let me know your thoughts.

Thank you,
Rahila Syed
From 54dab841e02a8c00f4d14c5955bacc6309082f52 Mon Sep 17 00:00:00 2001
From: Rahila Syed <[email protected]>
Date: Wed, 9 Jul 2025 15:35:20 +0530
Subject: [PATCH] Changes for sending of WAL records before flush

This patch adds all the WAL sender side changes required
to send unflushed WAL records to the standby.
To ensure that transactions not yet flushed on the primary
are not applied on the standby, the flushedUpto position
on the standby is updated only up to the flushPtr on the primary.

Co-authored-by: Melih Mutlu <[email protected]>
Co-authored-by: Rahila Syed <[email protected]>
---
 src/backend/access/transam/xlog.c       |  16 +++-
 src/backend/access/transam/xloginsert.c |   4 +
 src/backend/replication/walreceiver.c   |  30 ++++--
 src/backend/replication/walsender.c     | 122 ++++++++++++++++++------
 src/bin/pg_basebackup/receivelog.c      |   1 +
 src/include/access/xlog.h               |   2 +
 6 files changed, 136 insertions(+), 39 deletions(-)

diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index 0baf0ac6160..f203ac442cb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -704,7 +704,6 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr 
*StartPos,
                                                                          
XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
                                                          XLogRecPtr *PrevPtr);
-static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -923,6 +922,9 @@ XLogInsertRecord(XLogRecData *rdata,
                                                        class == 
WALINSERT_SPECIAL_SWITCH, rdata,
                                                        StartPos, EndPos, 
insertTLI);
 
+               if (StartPos - StartPos % XLOG_BLCKSZ + XLOG_BLCKSZ < EndPos)
+                       WalSndWakeupRequest();
+
                /*
                 * Unless record is flagged as not important, update LSN of last
                 * important record in the current slot. When holding all 
locks, just
@@ -1503,7 +1505,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
  * uninitialized page), and the inserter might need to evict an old WAL buffer
  * to make room for a new one, which in turn requires WALWriteLock.
  */
-static XLogRecPtr
+XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
        uint64          bytepos;
@@ -6522,6 +6524,16 @@ GetInsertRecPtr(void)
        return recptr;
 }
 
+XLogRecPtr
+GetLogInsertRecPtr(void)
+{
+       XLogRecPtr      recptr;
+
+       recptr = pg_atomic_read_membarrier_u64(&XLogCtl->logInsertResult);
+
+       return recptr;
+}
+
 /*
  * GetFlushRecPtr -- Returns the current flush position, ie, the last WAL
  * position known to be fsync'd to disk. This should only be used on a
diff --git a/src/backend/access/transam/xloginsert.c 
b/src/backend/access/transam/xloginsert.c
index c7571429e8e..9c189104946 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -36,6 +36,7 @@
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "replication/origin.h"
+#include "replication/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
 #include "utils/memutils.h"
@@ -526,6 +527,9 @@ XLogInsert(RmgrId rmid, uint8 info)
 
        XLogResetInsertion();
 
+       /* Wake up Walsender and let it know that we inserted new WAL */
+       WalSndWakeupProcessRequests(true, !RecoveryInProgress());
+
        return EndPos;
 }
 
diff --git a/src/backend/replication/walreceiver.c 
b/src/backend/replication/walreceiver.c
index 7361ffc9dcf..335146745a4 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -110,6 +110,7 @@ static struct
 {
        XLogRecPtr      Write;                  /* last byte + 1 written out in 
the standby */
        XLogRecPtr      Flush;                  /* last byte + 1 flushed in the 
standby */
+       XLogRecPtr      SenderFlush;    /* last byte + 1 flushed in the sender 
*/
 }                      LogstreamResult;
 
 /*
@@ -137,7 +138,7 @@ static void WalRcvWaitForStartPosition(XLogRecPtr 
*startpoint, TimeLineID *start
 static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
                                                                 TimeLineID 
tli);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
+static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, 
XLogRecPtr flushedupto,
                                                        TimeLineID tli);
 static void XLogWalRcvFlush(bool dying, TimeLineID tli);
 static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
@@ -821,6 +822,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size 
len, TimeLineID tli)
        int                     hdrlen;
        XLogRecPtr      dataStart;
        XLogRecPtr      walEnd;
+       XLogRecPtr      flushedWal;
        TimestampTz sendTime;
        bool            replyRequested;
 
@@ -830,7 +832,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size 
len, TimeLineID tli)
                        {
                                StringInfoData incoming_message;
 
-                               hdrlen = sizeof(int64) + sizeof(int64) + 
sizeof(int64);
+                               hdrlen = sizeof(int64) + sizeof(int64) + 
sizeof(int64) + sizeof(int64);
                                if (len < hdrlen)
                                        ereport(ERROR,
                                                        
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -842,12 +844,13 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size 
len, TimeLineID tli)
                                /* read the fields */
                                dataStart = pq_getmsgint64(&incoming_message);
                                walEnd = pq_getmsgint64(&incoming_message);
+                               flushedWal = pq_getmsgint64(&incoming_message);
                                sendTime = pq_getmsgint64(&incoming_message);
                                ProcessWalSndrMessage(walEnd, sendTime);
 
                                buf += hdrlen;
                                len -= hdrlen;
-                               XLogWalRcvWrite(buf, len, dataStart, tli);
+                               XLogWalRcvWrite(buf, len, dataStart, 
flushedWal, tli);
                                break;
                        }
                case PqReplMsg_Keepalive:
@@ -887,7 +890,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size 
len, TimeLineID tli)
  * Write XLOG data to disk.
  */
 static void
-XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
+XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, XLogRecPtr 
flushedupto, TimeLineID tli)
 {
        int                     startoff;
        int                     byteswritten;
@@ -960,6 +963,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, 
TimeLineID tli)
                buf += byteswritten;
 
                LogstreamResult.Write = recptr;
+               LogstreamResult.SenderFlush = flushedupto;
        }
 
        /* Update shared-memory status */
@@ -986,20 +990,32 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 {
        Assert(tli != 0);
 
-       if (LogstreamResult.Flush < LogstreamResult.Write)
+       /*
+        * The WAL records can be flushed on the standby once the flush pointer
+        * on the primary is ahead of the flush pointer on the standby. At a
+        * given point in time, some WAL records that have not been flushed to
+        * disk on the primary may get flushed on the standby, but that WAL
+        * won't be applied on the standby until it is flushed on the primary.
+        */
+       if ((LogstreamResult.Flush < LogstreamResult.Write) &&
+               (LogstreamResult.Flush < LogstreamResult.SenderFlush))
        {
                WalRcvData *walrcv = WalRcv;
+               XLogRecPtr      flush_ptr;
 
                issue_xlog_fsync(recvFile, recvSegNo, tli);
 
                LogstreamResult.Flush = LogstreamResult.Write;
 
+               flush_ptr = LogstreamResult.Flush > LogstreamResult.SenderFlush 
? LogstreamResult.SenderFlush :
+                       LogstreamResult.Flush;
+
                /* Update shared-memory status */
                SpinLockAcquire(&walrcv->mutex);
-               if (walrcv->flushedUpto < LogstreamResult.Flush)
+               if (walrcv->flushedUpto < flush_ptr)
                {
                        walrcv->latestChunkStart = walrcv->flushedUpto;
-                       walrcv->flushedUpto = LogstreamResult.Flush;
+                       walrcv->flushedUpto = flush_ptr;
                        walrcv->receivedTLI = tli;
                }
                SpinLockRelease(&walrcv->mutex);
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index 59822f22b8d..71800eeb70f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3160,7 +3160,7 @@ WalSndSegmentOpen(XLogReaderState *state, XLogSegNo 
nextSegNo,
 /*
  * Send out the WAL in its normal physical/stored form.
  *
- * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
+ * Read up to MAX_SEND_SIZE bytes of WAL that's been written to WAL buffers,
  * but not yet sent to the client, and buffer it in the libpq output
  * buffer.
  *
@@ -3174,9 +3174,12 @@ XLogSendPhysical(void)
        XLogRecPtr      startptr;
        XLogRecPtr      endptr;
        Size            nbytes;
+       Size            nbytesUntilFlush;
+       Size            nbytesAfterFlush;
        XLogSegNo       segno;
        WALReadError errinfo;
        Size            rbytes;
+       XLogRecPtr      flushPtr;
 
        /* If requested switch the WAL sender to the stopping state. */
        if (got_STOPPING)
@@ -3188,6 +3191,11 @@ XLogSendPhysical(void)
                return;
        }
 
+       if (am_cascading_walsender)
+               flushPtr = GetStandbyFlushRecPtr(NULL);
+       else
+               flushPtr = GetFlushRecPtr(NULL);
+
        /* Figure out how far we can safely send the WAL. */
        if (sendTimeLineIsHistoric)
        {
@@ -3265,14 +3273,23 @@ XLogSendPhysical(void)
                /*
                 * Streaming the current timeline on a primary.
                 *
-                * Attempt to send all data that's already been written out and
-                * fsync'd to disk.  We cannot go further than what's been 
written out
-                * given the current implementation of WALRead().  And in any 
case
-                * it's unsafe to send WAL that is not securely down to disk on 
the
-                * primary: if the primary subsequently crashes and restarts, 
standbys
-                * must not have applied any WAL that got lost on the primary.
+                * Try to send all data that has already been inserted into the
+                * WAL buffers, even though it is unsafe to send WAL that hasn't
+                * been securely written to disk on the primary. If the primary
+                * crashes and restarts, standbys must not apply any WAL that was
+                * lost on the primary. To prevent this, even if we send and write
+                * WAL records to disk on the standby before they are flushed on
+                * the primary, we only apply them after that flush has happened.
                 */
-               SendRqstPtr = GetFlushRecPtr(NULL);
+               SendRqstPtr = GetLogInsertRecPtr();
+               if (sentPtr >= SendRqstPtr)
+               {
+                       SendRqstPtr = WaitXLogInsertionsToFinish(sentPtr);
+               }
+               else
+               {
+                       SendRqstPtr = WaitXLogInsertionsToFinish(SendRqstPtr);
+               }
        }
 
        /*
@@ -3375,6 +3392,26 @@ XLogSendPhysical(void)
        nbytes = endptr - startptr;
        Assert(nbytes <= MAX_SEND_SIZE);
 
+       /*
+        * Older WALs are more likely to be evicted from buffers and written to
+        * disk. For any WAL before latest flush position, we first try to read
+        * from WAL buffers and then from disk. WALs after the flush position
+        * cannot be found on disk, so we only try to read such WALs from 
buffers.
+        */
+       nbytesUntilFlush = 0;
+       nbytesAfterFlush = 0;
+       if (flushPtr > endptr)
+               nbytesUntilFlush = endptr - startptr;
+       else if (flushPtr > startptr)
+       {
+               nbytesUntilFlush = flushPtr - startptr;
+               nbytesAfterFlush = endptr - flushPtr;
+       }
+       else
+               nbytesAfterFlush = endptr - startptr;
+
+       Assert(nbytes == (nbytesAfterFlush + nbytesUntilFlush));
+
        /*
         * OK to read and send the slice.
         */
@@ -3382,7 +3419,8 @@ XLogSendPhysical(void)
        pq_sendbyte(&output_message, PqReplMsg_WALData);
 
        pq_sendint64(&output_message, startptr);        /* dataStart */
-       pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
+       pq_sendint64(&output_message, endptr);  /* walEnd */
+       pq_sendint64(&output_message, flushPtr);        /* wal flushed upto */
        pq_sendint64(&output_message, 0);       /* sendtime, filled in last */
 
        /*
@@ -3392,25 +3430,49 @@ XLogSendPhysical(void)
        enlargeStringInfo(&output_message, nbytes);
 
 retry:
-       /* attempt to read WAL from WAL buffers first */
-       rbytes = WALReadFromBuffers(&output_message.data[output_message.len],
-                                                               startptr, 
nbytes, xlogreader->seg.ws_tli);
-       output_message.len += rbytes;
-       startptr += rbytes;
-       nbytes -= rbytes;
-
-       /* now read the remaining WAL from WAL file */
-       if (nbytes > 0 &&
-               !WALRead(xlogreader,
-                                &output_message.data[output_message.len],
-                                startptr,
-                                nbytes,
-                                xlogreader->seg.ws_tli,        /* Pass the 
current TLI because
-                                                                               
         * only WalSndSegmentOpen controls
-                                                                               
         * whether new TLI is needed. */
-                                &errinfo))
-               WALReadRaiseError(&errinfo);
+       if (nbytesAfterFlush == 0)
+       {
+               /* attempt to read WAL from WAL buffers first */
+               rbytes = 
WALReadFromBuffers(&output_message.data[output_message.len],
+                                                                       
startptr, nbytesUntilFlush, xlogreader->seg.ws_tli);
+               output_message.len += rbytes;
+               startptr += rbytes;
+               nbytes -= rbytes;
+               nbytesUntilFlush -= rbytes;
+       }
+       if (nbytesUntilFlush > 0)
+       {
+               if (!WALRead(xlogreader,
+                                        
&output_message.data[output_message.len],
+                                        startptr,
+                                        nbytesUntilFlush,
+                                        xlogreader->seg.ws_tli,        /* Pass 
the current TLI
+                                                                               
                 * because only
+                                                                               
                 * WalSndSegmentOpen controls
+                                                                               
                 * whether new TLI is needed. */
+                                        &errinfo))
+                       WALReadRaiseError(&errinfo);
+               output_message.len += nbytesUntilFlush;
+               startptr += nbytesUntilFlush;
+               nbytes -= nbytesUntilFlush;
+       }
 
+       /*
+        * Any WAL further than the latest flush position cannot be found on 
disk,
+        * so we try to read such WALs from buffers.
+        */
+       if (nbytesAfterFlush > 0)
+       {
+               /* attempt to read WAL from WAL buffers for the rest */
+               rbytes = 
WALReadFromBuffers(&output_message.data[output_message.len],
+                                                                       
startptr, nbytesAfterFlush, xlogreader->seg.ws_tli);
+               output_message.len += rbytes;
+               startptr += rbytes;
+               nbytesAfterFlush -= rbytes;
+       }
+       endptr -= nbytesAfterFlush;
+
+       output_message.data[output_message.len] = '\0';
        /* See logical_read_xlog_page(). */
        XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
        CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
@@ -3439,15 +3501,13 @@ retry:
                }
        }
 
-       output_message.len += nbytes;
-       output_message.data[output_message.len] = '\0';
 
        /*
         * Fill the send timestamp last, so that it is taken as late as 
possible.
         */
        resetStringInfo(&tmpbuf);
        pq_sendint64(&tmpbuf, GetCurrentTimestamp());
-       memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
+       memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64) + 
sizeof(int64)],
                   tmpbuf.data, sizeof(int64));
 
        pq_putmessage_noblock(PqMsg_CopyData, output_message.data, 
output_message.len);
@@ -4194,7 +4254,9 @@ WalSndKeepaliveIfNecessary(void)
 
                /* Try to flush pending output to the client */
                if (pq_flush_if_writable() != 0)
+               {
                        WalSndShutdown();
+               }
        }
 }
 
diff --git a/src/bin/pg_basebackup/receivelog.c 
b/src/bin/pg_basebackup/receivelog.c
index 25b13c7f55c..572b0a3b19f 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -1068,6 +1068,7 @@ ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char 
*copybuf, int len,
        hdr_len = 1;                            /* msgtype PqReplMsg_WALData */
        hdr_len += 8;                           /* dataStart */
        hdr_len += 8;                           /* walEnd */
+       hdr_len += 8;                           /* flushPtr */
        hdr_len += 8;                           /* sendTime */
        if (len < hdr_len)
        {
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index d12798be3d8..4033e62bb93 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -249,6 +249,7 @@ extern void UpdateFullPageWrites(void);
 extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool 
*doPageWrites_p);
 extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
+extern XLogRecPtr GetLogInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI);
 extern TimeLineID GetWALInsertionTimeLine(void);
 extern TimeLineID GetWALInsertionTimeLineIfSet(void);
@@ -297,6 +298,7 @@ extern void do_pg_backup_stop(BackupState *state, bool 
waitforarchive);
 extern void do_pg_abort_backup(int code, Datum arg);
 extern void register_persistent_abort_backup_handler(void);
 extern SessionBackupState get_backup_status(void);
+extern XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
 
 /* File path names (all relative to $PGDATA) */
 #define RECOVERY_SIGNAL_FILE   "recovery.signal"
-- 
2.34.1
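
For readers skimming the patch, two pieces of its arithmetic can be
restated as a standalone sketch: the split of the byte range to send around
the flush pointer (XLogSendPhysical) and the page-crossing check that
triggers a walsender wakeup (XLogInsertRecord). This is a simplified
restatement under assumptions, not the patch code itself: the function
names are made up, XLogRecPtr is a plain uint64_t, and XLOG_BLCKSZ is
fixed at the default 8192.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's type */

#define XLOG_BLCKSZ 8192        /* default WAL block size, assumed here */

/*
 * Split [startptr, endptr) into the part at or below the flush pointer
 * (readable from WAL buffers or from disk) and the part above it
 * (readable from WAL buffers only).
 */
static void
split_at_flush(XLogRecPtr startptr, XLogRecPtr endptr, XLogRecPtr flushptr,
               size_t *until_flush, size_t *after_flush)
{
    *until_flush = 0;
    *after_flush = 0;

    if (flushptr > endptr)
        *until_flush = (size_t) (endptr - startptr);
    else if (flushptr > startptr)
    {
        *until_flush = (size_t) (flushptr - startptr);
        *after_flush = (size_t) (endptr - flushptr);
    }
    else
        *after_flush = (size_t) (endptr - startptr);
}

/*
 * True when a record starting at start_pos ends beyond the start of the
 * next WAL page, which is the condition the patch uses to request a
 * walsender wakeup from XLogInsertRecord().
 */
static bool
record_crosses_page(XLogRecPtr start_pos, XLogRecPtr end_pos)
{
    return start_pos - start_pos % XLOG_BLCKSZ + XLOG_BLCKSZ < end_pos;
}
```

For example, with startptr = 1000, endptr = 3000 and a flush pointer at
2000, the first 1000 bytes may come from buffers or disk and the remaining
1000 bytes can only come from WAL buffers.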
