Hi,
Please find attached a POC patch that changes the WAL sender and receiver so that, during physical replication, WAL records can be sent to standbys before they are flushed to disk on the primary. This is intended to improve replication latency, in part by reducing the amount of WAL read from disk.
For large transactions, this approach ensures that the bulk of the transaction's WAL records has already been sent to the standby by the time the flush occurs on the primary. As a result, the flushes on the primary and the standby happen closer together, reducing replication lag.
Observations from the benchmark:
1. The patch improves TPS by ~13% in the synchronous replication setup. Across repeated runs, the TPS increase ranges from 5% to 13%.
2. The WAL sender reads significantly less WAL from disk (~40MB vs ~79MB), indicating more efficient use of WAL buffers and reduced disk I/O.
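Part of the reason for observation 2 is presumably visible in how the patched XLogSendPhysical() splits each outgoing chunk at the primary's current flush pointer: bytes below it may be read from WAL buffers or from the WAL files on disk, while bytes above it have not been flushed yet, so they can only come from WAL buffers. The standalone C sketch below reproduces just that range split, mirroring the patch's nbytesUntilFlush/nbytesAfterFlush computation. SendSplit and split_at_flush() are hypothetical names, and XLogRecPtr is modeled as a plain uint64_t rather than taken from the PostgreSQL headers.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for PostgreSQL's XLogRecPtr: a 64-bit WAL byte position. */
typedef uint64_t XLogRecPtr;

/* Hypothetical result type: bytes of [startptr, endptr) on each side of flushPtr. */
typedef struct
{
    uint64_t until_flush;   /* already flushed: WAL buffers or WAL files on disk */
    uint64_t after_flush;   /* not flushed yet: only available in WAL buffers */
} SendSplit;

/*
 * Mirrors the nbytesUntilFlush / nbytesAfterFlush computation in the
 * patched XLogSendPhysical().
 */
static SendSplit
split_at_flush(XLogRecPtr startptr, XLogRecPtr endptr, XLogRecPtr flushPtr)
{
    SendSplit s = {0, 0};

    assert(startptr <= endptr);

    if (flushPtr > endptr)
        s.until_flush = endptr - startptr;      /* whole chunk flushed */
    else if (flushPtr > startptr)
    {
        /* flush pointer inside the chunk, or exactly at its end */
        s.until_flush = flushPtr - startptr;
        s.after_flush = endptr - flushPtr;
    }
    else
        s.after_flush = endptr - startptr;      /* nothing in the chunk flushed */

    /* Same invariant as the patch's Assert(). */
    assert(s.until_flush + s.after_flush == endptr - startptr);
    return s;
}
```

For example, with a chunk [1000, 2000) and a flush pointer at 1500, 500 bytes fall on each side; with the flush pointer at or beyond 2000, the whole chunk counts as already flushed.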
Some details of the implementation:
1. Because the primary no longer waits for the flush before sending, it is likely to send smaller chunks of data. To avoid overloading the network, changes are made to avoid sending excessively small packets.
2. The sender includes the current flush pointer in the replication protocol messages, so the standby knows up to which point WAL has been safely flushed on the primary.
3. Standbys do not apply transactions that have not been flushed on the primary: the flushedUpto position on the standby is advanced only up to the flushPtr received from the primary.
4. WAL records received from the primary are written to disk on the standby, and may be flushed there, but are only marked as flushed up to the flushPtr reported by the primary.
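Items 1 to 4 come down to a few small pieces of arithmetic, sketched below as standalone C outside PostgreSQL: the WALData message header with the new flushPtr field (the type byte 'w' followed by four big-endian int64 fields, which moves sendTime from offset 17 to 25), the standby's clamp of flushedUpto to the smaller of its own flush position and the primary's flushPtr, and the page-crossing test the patch adds to XLogInsertRecord() before WalSndWakeupRequest(), which appears to be how item 1 avoids waking the walsender for every tiny record. The helper names and WALDATA_* constants are hypothetical; XLogRecPtr is modeled as uint64_t.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* hypothetical stand-in for the PostgreSQL type */

/*
 * Hypothetical byte offsets in a WALData payload with this patch applied.
 * The walreceiver's hdrlen excludes the type byte (4 * 8 = 32 bytes).
 */
enum
{
    WALDATA_TYPE_OFF = 0,
    WALDATA_DATASTART_OFF = 1,
    WALDATA_WALEND_OFF = 1 + 8,
    WALDATA_FLUSHPTR_OFF = 1 + 8 + 8,       /* new field */
    WALDATA_SENDTIME_OFF = 1 + 8 + 8 + 8,   /* was 1 + 8 + 8 before the patch */
    WALDATA_HDR_LEN = 1 + 8 + 8 + 8 + 8
};

/* Big-endian (network order) encoding, as pq_sendint64() produces. */
static void
put_be64(unsigned char *p, uint64_t v)
{
    for (int i = 7; i >= 0; i--)
    {
        p[i] = (unsigned char) (v & 0xff);
        v >>= 8;
    }
}

static uint64_t
get_be64(const unsigned char *p)
{
    uint64_t v = 0;

    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

/*
 * Standby side: the position exposed to recovery is the smaller of what the
 * standby has fsync'd and what the primary reports as flushed.
 */
static XLogRecPtr
standby_flushed_upto(XLogRecPtr local_flush, XLogRecPtr sender_flush)
{
    return local_flush < sender_flush ? local_flush : sender_flush;
}

/*
 * Primary side: true when a record ends past the start of the page after the
 * one it started on (same expression as the patch, with blcksz = XLOG_BLCKSZ).
 */
static int
record_crosses_page(XLogRecPtr start, XLogRecPtr end, uint64_t blcksz)
{
    return start - start % blcksz + blcksz < end;
}
```

The clamp is what keeps item 4 safe in this model: the standby may fsync WAL ahead of the primary, but recovery only ever sees positions the primary has confirmed as flushed.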
Benchmark details:
Synchronous replication with remote write enabled.
Two Azure VMs: Central India (primary), Central US (standby).
OS: Ubuntu 24.04, VM size D4s (4 vCPUs, 16 GiB RAM).

                                         With patch        Without patch
TPS                                      115               102
WAL read from disk by walsender (*)      ~40MB             ~79MB
WAL generated during the test            772705760 bytes   760060792 bytes

(*) read bytes from pg_stat_io
Commit hash: b1187266e0
pgbench -c 32 -j 4 postgres -T 300 -f wal_test.sql
wal_test.sql (each transaction generates ~36KB of WAL):
\set delta random(1, 500)
BEGIN;
INSERT INTO wal_bloat_:delta (data)
SELECT repeat('x', 8000)
FROM generate_series(1, 80);
COMMIT;
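The headline percentages follow directly from the figures above: (115 - 102) / 102 is about +12.7% TPS, and (40 - 79) / 79 is about -49.4% WAL read from disk by the walsender, i.e. roughly half. A minimal C sketch of that arithmetic, with a hypothetical helper name:

```c
#include <assert.h>

/* Hypothetical helper: relative change, in percent, from baseline to patched. */
static double
pct_change(double baseline, double patched)
{
    return (patched - baseline) / baseline * 100.0;
}
```

pct_change(102.0, 115.0) gives about 12.75 and pct_change(79.0, 40.0) about -49.37. Since the WAL-read figures are approximate, the second number is only indicative.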
TODO:
1. Ensure there is a robust mechanism on the receiver that prevents WAL records not yet flushed on the primary from being applied on the standby, under any circumstances.
2. Receiving WAL in smaller chunks on the standby can lead to more frequent disk writes. Using WAL buffers on the standby could be a more effective way to mitigate this; evaluate the performance impact of doing so.
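One possible shape for the standby-side buffering in TODO item 2 is a staging buffer that coalesces small incoming chunks and writes them out only when it fills (or when a flush is needed). The C sketch below is purely hypothetical: StagingBuffer, STAGING_SIZE and the staging_* functions are invented for illustration, and the "write" is simulated with counters so that the coalescing effect is easy to see.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define STAGING_SIZE 8192   /* hypothetical size, e.g. one WAL page */

typedef struct
{
    char   buf[STAGING_SIZE];
    size_t used;            /* bytes currently staged */
    int    writes;          /* number of simulated write() calls */
    size_t bytes_written;   /* total bytes "written" so far */
} StagingBuffer;

/* Simulated write-out; a real receiver would write to the segment file. */
static void
staging_write_out(StagingBuffer *sb)
{
    if (sb->used == 0)
        return;
    sb->writes++;
    sb->bytes_written += sb->used;
    sb->used = 0;
}

/* Append an incoming chunk, writing out only when the buffer is full. */
static void
staging_append(StagingBuffer *sb, const char *data, size_t len)
{
    while (len > 0)
    {
        size_t room = STAGING_SIZE - sb->used;
        size_t n = len < room ? len : room;

        memcpy(sb->buf + sb->used, data, n);
        sb->used += n;
        data += n;
        len -= n;
        if (sb->used == STAGING_SIZE)
            staging_write_out(sb);
    }
}
```

For instance, appending 100 chunks of 100 bytes performs a single 8192-byte write and leaves 1808 bytes staged until an explicit write-out. A real walreceiver version would presumably also need to write out staged bytes before reporting its write position to the primary, since synchronous replication with remote write waits on that position.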
A similar idea was proposed here:
Proposal: Allow walsenders to send WAL directly from wal_buffers to replicas
<https://www.postgresql.org/message-id/flat/CALj2ACXCSM%2BsTR%3D5NNRtmSQr3g1Vnr-yR91azzkZCaCJ7u4d4w%40mail.gmail.com>
This idea was also discussed here recently:
https://www.postgresql.org/message-id/fa2e932eeff472250e2dbacb49d8c43ad282fea9.camel%40j-davis.com
Kindly let me know your thoughts.
Thank you,
Rahila Syed
From 54dab841e02a8c00f4d14c5955bacc6309082f52 Mon Sep 17 00:00:00 2001
From: Rahila Syed <[email protected]>
Date: Wed, 9 Jul 2025 15:35:20 +0530
Subject: [PATCH] Changes for sending of WAL records before flush
This patch adds all the wal sender side changes required
to send unflushed WAL records to standby.
To ensure that transactions not yet flushed on the primary
are not applied on the standby, the flushedUpto position on
the standby is updated only up to the flushPtr on the primary.
Co-authored-by: Melih Mutlu <[email protected]>
Co-authored-by: Rahila Syed <[email protected]>
---
src/backend/access/transam/xlog.c | 16 +++-
src/backend/access/transam/xloginsert.c | 4 +
src/backend/replication/walreceiver.c | 30 ++++--
src/backend/replication/walsender.c | 122 ++++++++++++++++++------
src/bin/pg_basebackup/receivelog.c | 1 +
src/include/access/xlog.h | 2 +
6 files changed, 136 insertions(+), 39 deletions(-)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0baf0ac6160..f203ac442cb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -704,7 +704,6 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
XLogRecPtr *PrevPtr);
-static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -923,6 +922,9 @@ XLogInsertRecord(XLogRecData *rdata,
class == WALINSERT_SPECIAL_SWITCH, rdata,
StartPos, EndPos, insertTLI);
+ if (StartPos - StartPos % XLOG_BLCKSZ + XLOG_BLCKSZ < EndPos)
+ WalSndWakeupRequest();
+
/*
* Unless record is flagged as not important, update LSN of last
* important record in the current slot. When holding all locks, just
@@ -1503,7 +1505,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
* uninitialized page), and the inserter might need to evict an old WAL buffer
* to make room for a new one, which in turn requires WALWriteLock.
*/
-static XLogRecPtr
+XLogRecPtr
WaitXLogInsertionsToFinish(XLogRecPtr upto)
{
uint64 bytepos;
@@ -6522,6 +6524,16 @@ GetInsertRecPtr(void)
return recptr;
}
+XLogRecPtr
+GetLogInsertRecPtr(void)
+{
+ XLogRecPtr recptr;
+
+ recptr = pg_atomic_read_membarrier_u64(&XLogCtl->logInsertResult);
+
+ return recptr;
+}
+
/*
* GetFlushRecPtr -- Returns the current flush position, ie, the last WAL
* position known to be fsync'd to disk. This should only be used on a
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index c7571429e8e..9c189104946 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -36,6 +36,7 @@
#include "miscadmin.h"
#include "pg_trace.h"
#include "replication/origin.h"
+#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "utils/memutils.h"
@@ -526,6 +527,9 @@ XLogInsert(RmgrId rmid, uint8 info)
XLogResetInsertion();
+ /* Wake up Walsender and let it know that we inserted new WAL */
+ WalSndWakeupProcessRequests(true, !RecoveryInProgress());
+
return EndPos;
}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7361ffc9dcf..335146745a4 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -110,6 +110,7 @@ static struct
{
XLogRecPtr Write; /* last byte + 1 written out in the standby */
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
+ XLogRecPtr SenderFlush; /* last byte + 1 flushed in the sender */
} LogstreamResult;
/*
@@ -137,7 +138,7 @@ static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *start
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
TimeLineID tli);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
+static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, XLogRecPtr flushedupto,
TimeLineID tli);
static void XLogWalRcvFlush(bool dying, TimeLineID tli);
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
@@ -821,6 +822,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
int hdrlen;
XLogRecPtr dataStart;
XLogRecPtr walEnd;
+ XLogRecPtr flushedWal;
TimestampTz sendTime;
bool replyRequested;
@@ -830,7 +832,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
{
StringInfoData incoming_message;
- hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
+ hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64) + sizeof(int64);
if (len < hdrlen)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -842,12 +844,13 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
/* read the fields */
dataStart = pq_getmsgint64(&incoming_message);
walEnd = pq_getmsgint64(&incoming_message);
+ flushedWal = pq_getmsgint64(&incoming_message);
sendTime = pq_getmsgint64(&incoming_message);
ProcessWalSndrMessage(walEnd, sendTime);
buf += hdrlen;
len -= hdrlen;
- XLogWalRcvWrite(buf, len, dataStart, tli);
+ XLogWalRcvWrite(buf, len, dataStart, flushedWal, tli);
break;
}
case PqReplMsg_Keepalive:
@@ -887,7 +890,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
* Write XLOG data to disk.
*/
static void
-XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
+XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, XLogRecPtr flushedupto, TimeLineID tli)
{
int startoff;
int byteswritten;
@@ -960,6 +963,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
buf += byteswritten;
LogstreamResult.Write = recptr;
+ LogstreamResult.SenderFlush = flushedupto;
}
/* Update shared-memory status */
@@ -986,20 +990,32 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
{
Assert(tli != 0);
- if (LogstreamResult.Flush < LogstreamResult.Write)
+ /*
+ * The wal records can be flushed on standby once the flushptr on primary
+ * is greater than flushptr on standby. At a given point in time it may be
+ * possible that some WAL records that have not been flushed to disk on
+ * primary may get flushed on standby but those WAL won't be applied on
+ * standby until they are flushed on primary.
+ */
+ if ((LogstreamResult.Flush < LogstreamResult.Write) &&
+ (LogstreamResult.Flush < LogstreamResult.SenderFlush))
{
WalRcvData *walrcv = WalRcv;
+ XLogRecPtr flush_ptr;
issue_xlog_fsync(recvFile, recvSegNo, tli);
LogstreamResult.Flush = LogstreamResult.Write;
+ flush_ptr = LogstreamResult.Flush > LogstreamResult.SenderFlush ? LogstreamResult.SenderFlush :
+ LogstreamResult.Flush;
+
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
- if (walrcv->flushedUpto < LogstreamResult.Flush)
+ if (walrcv->flushedUpto < flush_ptr)
{
walrcv->latestChunkStart = walrcv->flushedUpto;
- walrcv->flushedUpto = LogstreamResult.Flush;
+ walrcv->flushedUpto = flush_ptr;
walrcv->receivedTLI = tli;
}
SpinLockRelease(&walrcv->mutex);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 59822f22b8d..71800eeb70f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3160,7 +3160,7 @@ WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
/*
* Send out the WAL in its normal physical/stored form.
*
- * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
+ * Read up to MAX_SEND_SIZE bytes of WAL that's been written to WAL buffers,
* but not yet sent to the client, and buffer it in the libpq output
* buffer.
*
@@ -3174,9 +3174,12 @@ XLogSendPhysical(void)
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
+ Size nbytesUntilFlush;
+ Size nbytesAfterFlush;
XLogSegNo segno;
WALReadError errinfo;
Size rbytes;
+ XLogRecPtr flushPtr;
/* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING)
@@ -3188,6 +3191,11 @@ XLogSendPhysical(void)
return;
}
+ if (am_cascading_walsender)
+ flushPtr = GetStandbyFlushRecPtr(NULL);
+ else
+ flushPtr = GetFlushRecPtr(NULL);
+
/* Figure out how far we can safely send the WAL. */
if (sendTimeLineIsHistoric)
{
@@ -3265,14 +3273,23 @@ XLogSendPhysical(void)
/*
* Streaming the current timeline on a primary.
*
- * Attempt to send all data that's already been written out and
- * fsync'd to disk. We cannot go further than what's been written out
- * given the current implementation of WALRead(). And in any case
- * it's unsafe to send WAL that is not securely down to disk on the
- * primary: if the primary subsequently crashes and restarts, standbys
- * must not have applied any WAL that got lost on the primary.
+ * Try to send all data that has already been sent to the WAL buffers,
+ * even though it is unsafe to send WAL that hasn't been securely
+ * written to disk on the primary. If the primary crashes and
+ * restarts, standbys must not apply any WAL that was lost on the
+ * primary. To prevent this, even if we send and write WAL records to
+ * disk on the standby before they are flushed on the primary, we only
+ * apply them after they have been flushed on the primary.
*/
- SendRqstPtr = GetFlushRecPtr(NULL);
+ SendRqstPtr = GetLogInsertRecPtr();
+ if (sentPtr >= SendRqstPtr)
+ {
+ SendRqstPtr = WaitXLogInsertionsToFinish(sentPtr);
+ }
+ else
+ {
+ SendRqstPtr = WaitXLogInsertionsToFinish(SendRqstPtr);
+ }
}
/*
@@ -3375,6 +3392,26 @@ XLogSendPhysical(void)
nbytes = endptr - startptr;
Assert(nbytes <= MAX_SEND_SIZE);
+ /*
+ * Older WALs are more likely to be evicted from buffers and written to
+ * disk. For any WAL before latest flush position, we first try to read
+ * from WAL buffers and then from disk. WALs after the flush position
+ * cannot be found on disk, so we only try to read such WALs from buffers.
+ */
+ nbytesUntilFlush = 0;
+ nbytesAfterFlush = 0;
+ if (flushPtr > endptr)
+ nbytesUntilFlush = endptr - startptr;
+ else if (flushPtr > startptr)
+ {
+ nbytesUntilFlush = flushPtr - startptr;
+ nbytesAfterFlush = endptr - flushPtr;
+ }
+ else
+ nbytesAfterFlush = endptr - startptr;
+
+ Assert(nbytes == (nbytesAfterFlush + nbytesUntilFlush));
+
/*
* OK to read and send the slice.
*/
@@ -3382,7 +3419,8 @@ XLogSendPhysical(void)
pq_sendbyte(&output_message, PqReplMsg_WALData);
pq_sendint64(&output_message, startptr); /* dataStart */
- pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
+ pq_sendint64(&output_message, endptr); /* walEnd */
+ pq_sendint64(&output_message, flushPtr); /* wal flushed upto */
pq_sendint64(&output_message, 0); /* sendtime, filled in last */
/*
@@ -3392,25 +3430,49 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes);
retry:
- /* attempt to read WAL from WAL buffers first */
- rbytes = WALReadFromBuffers(&output_message.data[output_message.len],
- startptr, nbytes, xlogreader->seg.ws_tli);
- output_message.len += rbytes;
- startptr += rbytes;
- nbytes -= rbytes;
-
- /* now read the remaining WAL from WAL file */
- if (nbytes > 0 &&
- !WALRead(xlogreader,
- &output_message.data[output_message.len],
- startptr,
- nbytes,
- xlogreader->seg.ws_tli, /* Pass the current TLI because
- * only WalSndSegmentOpen controls
- * whether new TLI is needed. */
- &errinfo))
- WALReadRaiseError(&errinfo);
+ if (nbytesAfterFlush == 0)
+ {
+ /* attempt to read WAL from WAL buffers first */
+ rbytes = WALReadFromBuffers(&output_message.data[output_message.len],
+ startptr, nbytesUntilFlush, xlogreader->seg.ws_tli);
+ output_message.len += rbytes;
+ startptr += rbytes;
+ nbytes -= rbytes;
+ nbytesUntilFlush -= rbytes;
+ }
+ if (nbytesUntilFlush > 0)
+ {
+ if (!WALRead(xlogreader,
+ &output_message.data[output_message.len],
+ startptr,
+ nbytesUntilFlush,
+ xlogreader->seg.ws_tli, /* Pass the current TLI
+ * because only
+ * WalSndSegmentOpen controls
+ * whether new TLI is needed. */
+ &errinfo))
+ WALReadRaiseError(&errinfo);
+ output_message.len += nbytesUntilFlush;
+ startptr += nbytesUntilFlush;
+ nbytes -= nbytesUntilFlush;
+ }
+ /*
+ * Any WAL further than the latest flush position cannot be found on disk,
+ * so we try to read such WALs from buffers.
+ */
+ if (nbytesAfterFlush > 0)
+ {
+ /* attempt to read WAL from WAL buffers for the rest */
+ rbytes = WALReadFromBuffers(&output_message.data[output_message.len],
+ startptr, nbytesAfterFlush, xlogreader->seg.ws_tli);
+ output_message.len += rbytes;
+ startptr += rbytes;
+ nbytesAfterFlush -= rbytes;
+ }
+ endptr -= nbytesAfterFlush;
+
+ output_message.data[output_message.len] = '\0';
/* See logical_read_xlog_page(). */
XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
@@ -3439,15 +3501,13 @@ retry:
}
}
- output_message.len += nbytes;
- output_message.data[output_message.len] = '\0';
/*
* Fill the send timestamp last, so that it is taken as late as possible.
*/
resetStringInfo(&tmpbuf);
pq_sendint64(&tmpbuf, GetCurrentTimestamp());
- memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
+ memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64));
pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
@@ -4194,7 +4254,9 @@ WalSndKeepaliveIfNecessary(void)
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
+ {
WalSndShutdown();
+ }
}
}
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 25b13c7f55c..572b0a3b19f 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -1068,6 +1068,7 @@ ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
hdr_len = 1; /* msgtype PqReplMsg_WALData */
hdr_len += 8; /* dataStart */
hdr_len += 8; /* walEnd */
+ hdr_len += 8; /* flushPtr */
hdr_len += 8; /* sendTime */
if (len < hdr_len)
{
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index d12798be3d8..4033e62bb93 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -249,6 +249,7 @@ extern void UpdateFullPageWrites(void);
extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p);
extern XLogRecPtr GetRedoRecPtr(void);
extern XLogRecPtr GetInsertRecPtr(void);
+extern XLogRecPtr GetLogInsertRecPtr(void);
extern XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI);
extern TimeLineID GetWALInsertionTimeLine(void);
extern TimeLineID GetWALInsertionTimeLineIfSet(void);
@@ -297,6 +298,7 @@ extern void do_pg_backup_stop(BackupState *state, bool waitforarchive);
extern void do_pg_abort_backup(int code, Datum arg);
extern void register_persistent_abort_backup_handler(void);
extern SessionBackupState get_backup_status(void);
+extern XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
/* File path names (all relative to $PGDATA) */
#define RECOVERY_SIGNAL_FILE "recovery.signal"
--
2.34.1