Included 蔡梦娟 and Jakub Wartak because they've expressed interest in this topic, notably in [2] ("Bug on update timing of walrcv->flushedUpto variable").
As mentioned in the course of thread [1], we're missing a fix for streaming replication to avoid sending records that the primary hasn't fully flushed yet. This patch is a first attempt at fixing that problem by retreating the LSN reported as FlushPtr whenever a segment boundary is registered. The reasoning is that if no registration exists, the LogwrtResult.Flush pointer can be taken at face value; but if one exists, we have to stream only up to the start LSN of that registered entry, that is, the start of the record that crosses into the next segment.

This patch is probably incomplete. First, I'm not sure whether logical replication is affected by this problem. I think it isn't, because logical replication will halt until the record can be read completely; but maybe I'm wrong and there is a way for things to go wrong with logical replication as well. Second, I need to look at the other callers of GetFlushRecPtr() and see whether they need to switch to the new function too, or can remain as they are now.

I'd also like to have tests. That seems moderately hard, but it could be done if we had a WAL-molasses facility that could be used in walreceiver. (It sounds easier to write tests with a molasses archive_command.)

[1] https://postgr.es/m/cbddfa01-6e40-46bb-9f98-9340f4379...@amazon.com
[2] https://postgr.es/m/3f9c466d-d143-472c-a961-66406172af96.mengjuan....@alibaba-inc.com

-- 
Álvaro Herrera 39°49'30"S 73°17'W — https://www.EnterpriseDB.com/
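The clamping rule described above can be sketched in isolation. This is a minimal, hypothetical model rather than the patch itself: `SegBoundaryState`, `safe_flush_ptr`, `NO_SEGMENT` and `INVALID_LSN` are illustrative names, the struct fields stand in for the `XLogCtl` members the patch reads, and the spinlocks that the real `GetSafeFlushRecPtr()` acquires are left out.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-ins for PostgreSQL's typedefs: both are 64-bit unsigned values. */
typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;

#define INVALID_LSN ((XLogRecPtr) 0)	/* hypothetical; like InvalidXLogRecPtr */
#define NO_SEGMENT UINT64_MAX			/* hypothetical; like MaxXLogSegNo */

/*
 * Hypothetical snapshot of the shared fields the patch consults.  The real
 * GetSafeFlushRecPtr() reads them under info_lck and segtrack_lck.
 */
typedef struct SegBoundaryState
{
	XLogRecPtr	flush;				/* LogwrtResult.Flush */
	XLogSegNo	earliest_seg;		/* earliestSegBoundary */
	XLogRecPtr	earliest_start;		/* earliestSegBoundaryStartPtr */
} SegBoundaryState;

/*
 * Report the flush position that is safe to stream: if a record crossing a
 * segment boundary is still registered, never go past that record's start.
 */
static XLogRecPtr
safe_flush_ptr(const SegBoundaryState *st)
{
	XLogRecPtr	flush = st->flush;

	/* A registered boundary always carries a valid start pointer. */
	assert(st->earliest_seg == NO_SEGMENT || st->earliest_start != INVALID_LSN);

	if (st->earliest_seg != NO_SEGMENT && st->earliest_start < flush)
		flush = st->earliest_start;
	return flush;
}
```

For example, with Flush at 0x3000100 and a registered record starting at 0x2FFFF00 (crossing from segment 2 into segment 3 with 16MB segments), the sketch returns 0x2FFFF00, so a walsender using it would hold back the whole partial record. With no boundary registered, it returns Flush unchanged.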
From 2c5d347e9b6819ba59fa0e72e4227a2cb4d1230f Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 23 Aug 2021 18:25:48 -0400
Subject: [PATCH] Don't stream non-final WAL segments

Avoid setting the physical-stream replication read pointer in the middle
of a WAL record. This can occur if a record is split in two (or more)
across segment boundaries. The reason to avoid it is that if we stream
the segment containing the first half, and then we crash before writing
the next segment, the primary will rewrite the tail of the segment with
a new WAL record (having discarded the incomplete record), but the
replica will be stuck trying to replay a broken file (since the next
segment will never contain the now-gone data).

To do this, change streaming replication to retreat the flush pointer
according to registered segment boundaries.
---
 src/backend/access/transam/xlog.c   | 39 ++++++++++++++++++++++++++---
 src/backend/replication/walsender.c |  2 +-
 src/include/access/xlog.h           |  1 +
 3 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 24165ab03e..2b85d1b2ae 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -731,8 +731,10 @@ typedef struct XLogCtlData
 	 */
 	XLogSegNo	lastNotifiedSeg;
 	XLogSegNo	earliestSegBoundary;
+	XLogRecPtr	earliestSegBoundaryStartPtr;
 	XLogRecPtr	earliestSegBoundaryEndPtr;
 	XLogSegNo	latestSegBoundary;
+	XLogRecPtr	latestSegBoundaryStartPtr;
 	XLogRecPtr	latestSegBoundaryEndPtr;
 
 	slock_t		segtrack_lck;	/* locks shared variables shown above */
@@ -932,7 +934,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
 						   XLogSegNo *endlogSegNo);
 static void UpdateLastRemovedPtr(char *filename);
 static void ValidateXLOGDirectoryStructure(void);
-static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos);
+static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
 static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
@@ -1183,11 +1185,11 @@ XLogInsertRecord(XLogRecData *rdata,
 		 *
 		 * Note that we did not use XLByteToPrevSeg() for determining the
 		 * ending segment.  This is so that a record that fits perfectly into
-		 * the end of the segment causes the latter to get marked ready for
+		 * the end of the segment causes said segment to get marked ready for
 		 * archival immediately.
 		 */
 		if (StartSeg != EndSeg && XLogArchivingActive())
-			RegisterSegmentBoundary(EndSeg, EndPos);
+			RegisterSegmentBoundary(EndSeg, StartPos, EndPos);
 
 		/*
 		 * Advance LogwrtRqst.Write so that it includes new block(s).
@@ -4398,7 +4400,7 @@ ValidateXLOGDirectoryStructure(void)
  * to delay until the end segment is known flushed.
  */
 static void
-RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
+RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos)
 {
 	XLogSegNo	segno PG_USED_FOR_ASSERTS_ONLY;
 
@@ -4415,6 +4417,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
 	if (XLogCtl->earliestSegBoundary == MaxXLogSegNo)
 	{
 		XLogCtl->earliestSegBoundary = seg;
+		XLogCtl->earliestSegBoundaryStartPtr = startpos;
 		XLogCtl->earliestSegBoundaryEndPtr = endpos;
 	}
 	else if (seg > XLogCtl->earliestSegBoundary &&
@@ -4422,6 +4425,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
 			  seg > XLogCtl->latestSegBoundary))
 	{
 		XLogCtl->latestSegBoundary = seg;
+		XLogCtl->latestSegBoundaryStartPtr = startpos;
 		XLogCtl->latestSegBoundaryEndPtr = endpos;
 	}
 
@@ -4481,15 +4485,18 @@ NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
 	if (keep_latest)
 	{
 		XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary;
+		XLogCtl->earliestSegBoundaryStartPtr = XLogCtl->latestSegBoundaryStartPtr;
 		XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr;
 	}
 	else
 	{
 		XLogCtl->earliestSegBoundary = MaxXLogSegNo;
+		XLogCtl->earliestSegBoundaryStartPtr = InvalidXLogRecPtr;
 		XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
 	}
 
 	XLogCtl->latestSegBoundary = MaxXLogSegNo;
+	XLogCtl->latestSegBoundaryStartPtr = InvalidXLogRecPtr;
 	XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;
 
 	SpinLockRelease(&XLogCtl->segtrack_lck);
@@ -8776,6 +8783,30 @@ GetFlushRecPtr(void)
 	return LogwrtResult.Flush;
 }
 
+/*
+ * GetSafeFlushRecPtr -- Returns a "safe" flush position.
+ *
+ * Similar to the above, except that it avoids reporting a location that might
+ * be overwritten if there's a crash before syncing the next segment.
+ */
+XLogRecPtr
+GetSafeFlushRecPtr(void)
+{
+	XLogRecPtr	flush;
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	flush = XLogCtl->LogwrtResult.Flush;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	SpinLockAcquire(&XLogCtl->segtrack_lck);
+	if (XLogCtl->earliestSegBoundary != MaxXLogSegNo &&
+		XLogCtl->earliestSegBoundaryStartPtr < flush)
+		flush = XLogCtl->earliestSegBoundaryStartPtr;
+	SpinLockRelease(&XLogCtl->segtrack_lck);
+
+	return flush;
+}
+
 /*
  * GetLastImportantRecPtr -- Returns the LSN of the last important record
  * inserted. All records not explicitly marked as unimportant are considered
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..4c98fecdce 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2650,7 +2650,7 @@ XLogSendPhysical(void)
 		 * primary: if the primary subsequently crashes and restarts, standbys
 		 * must not have applied any WAL that got lost on the primary.
 		 */
-		SendRqstPtr = GetFlushRecPtr();
+		SendRqstPtr = GetSafeFlushRecPtr();
 	}
 
 	/*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 6b6ae81c2d..1af59c36d4 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -313,6 +313,7 @@ extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
 extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
+extern XLogRecPtr GetSafeFlushRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 extern void RemovePromoteSignalFiles(void);
 extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr);
-- 
2.30.2
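The segment-boundary bookkeeping that this patch extends can also be modelled outside the server. The sketch below is hypothetical: `SegTracker`, `register_if_crossing`, `forget_flushed` and the fixed 16MB `SEG_SIZE` are assumptions made for illustration. It mirrors `RegisterSegmentBoundary()` as patched (recording each crossing record's start as well as its end) and gives a simplified take on how `NotifySegmentsReadyForArchive()` discards or promotes entries once they are flushed. One difference worth keeping in mind: the patch only registers boundaries when `XLogArchivingActive()` is true, whereas this model always registers them.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;

#define INVALID_LSN ((XLogRecPtr) 0)
#define NO_SEGMENT UINT64_MAX					/* plays MaxXLogSegNo's role */
#define SEG_SIZE ((uint64_t) 16 * 1024 * 1024)	/* assumed wal_segment_size */

/* Hypothetical model of the XLogCtl segment-tracking fields. */
typedef struct SegTracker
{
	XLogSegNo	earliest_seg;
	XLogRecPtr	earliest_start;
	XLogRecPtr	earliest_end;
	XLogSegNo	latest_seg;
	XLogRecPtr	latest_start;
	XLogRecPtr	latest_end;
} SegTracker;

static void
tracker_init(SegTracker *t)
{
	t->earliest_seg = t->latest_seg = NO_SEGMENT;
	t->earliest_start = t->earliest_end = INVALID_LSN;
	t->latest_start = t->latest_end = INVALID_LSN;
}

/* Same arithmetic as XLByteToSeg(): the segment containing this byte. */
static XLogSegNo
lsn_to_seg(XLogRecPtr lsn)
{
	return lsn / SEG_SIZE;
}

/*
 * Called after inserting a record occupying [start, end).  Only records
 * that cross into a later segment are registered: the first fills the
 * "earliest" slot, later ones replace the "latest" slot.
 */
static void
register_if_crossing(SegTracker *t, XLogRecPtr start, XLogRecPtr end)
{
	XLogSegNo	end_seg = lsn_to_seg(end);

	assert(start < end);
	if (lsn_to_seg(start) == end_seg)
		return;

	if (t->earliest_seg == NO_SEGMENT)
	{
		t->earliest_seg = end_seg;
		t->earliest_start = start;
		t->earliest_end = end;
	}
	else if (end_seg > t->earliest_seg &&
			 (t->latest_seg == NO_SEGMENT || end_seg > t->latest_seg))
	{
		t->latest_seg = end_seg;
		t->latest_start = start;
		t->latest_end = end;
	}
}

/*
 * Simplified cleanup after a flush: entries whose end is flushed are
 * discarded; if only the earliest one is flushed, the latest one is
 * promoted to take its place.
 */
static void
forget_flushed(SegTracker *t, XLogRecPtr flush)
{
	if (t->latest_seg != NO_SEGMENT && t->latest_end <= flush)
		tracker_init(t);
	else if (t->earliest_seg != NO_SEGMENT && t->earliest_end <= flush)
	{
		t->earliest_seg = t->latest_seg;
		t->earliest_start = t->latest_start;
		t->earliest_end = t->latest_end;
		t->latest_seg = NO_SEGMENT;
		t->latest_start = t->latest_end = INVALID_LSN;
	}
}
```

As an illustration, registering a record from 0x2FFFF00 to 0x3000100 fills the earliest slot for segment 3, and a later record from 0x3FFFF80 to 0x4000080 fills the latest slot for segment 4. Once the flush pointer reaches 0x3000100, the first entry is dropped and the second becomes the earliest, so its start (0x3FFFF80) is the position a GetSafeFlushRecPtr()-style clamp would then use.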