On 2021/09/02 10:16, Kyotaro Horiguchi wrote:
Ok, I agree that the reader-side needs an amendment.
Thanks for the review! Attached is the updated version of the patch.
Building on my latest patch, I changed the startup process so that
it creates the archive notification file for a streamed WAL segment
that contains an XLOG_SWITCH record, if that notification file has not been created yet.
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 24165ab03e..6c407045dd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7576,6 +7576,34 @@ StartupXLOG(void)
/* Handle interrupt signals of startup process
*/
HandleStartupProcInterrupts();
+ /*
+ * Create an archive notification file of a
streamed WAL
+ * segment if it includes an XLOG_SWITCH record
and its
+ * notification file has not been created yet.
This is
+ * necessary to handle the corner case that
walreceiver may
+ * fail to create such notification file if it
exits after it
+ * receives XLOG_SWITCH record but while it's
receiving the
+ * remaining bytes in the segment. Without this
handling, WAL
+ * archiving of the segment will be delayed
until subsequent
+ * checkpoint creates its notification file
when removing it
+ * even though it can be archived soon.
+ */
+ if (readSource == XLOG_FROM_STREAM &&
+ record->xl_rmid == RM_XLOG_ID &&
+ (record->xl_info & ~XLR_INFO_MASK) ==
XLOG_SWITCH)
+ {
+ char
xlogfilename[MAXFNAMELEN];
+
+ XLogFileName(xlogfilename, curFileTLI,
readSegNo, wal_segment_size);
+ if
(!XLogArchiveIsReadyOrDone(xlogfilename))
+ {
+ if (XLogArchivingAlways())
+
XLogArchiveNotify(xlogfilename, true);
+ else
+
XLogArchiveForceDone(xlogfilename);
+ }
+ }
+
/*
* Pause WAL replay, if requested by a
hot-standby session via
* SetRecoveryPause().
diff --git a/src/backend/replication/walreceiver.c
b/src/backend/replication/walreceiver.c
index 60de3be92c..5b07eef3aa 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -125,6 +125,7 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
+static void XLogWalRcvClose(XLogRecPtr recptr);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -883,42 +884,11 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
int segbytes;
- if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo,
wal_segment_size))
+ /* Close the current segment if it's completed */
+ XLogWalRcvClose(recptr);
+
+ if (recvFile < 0)
{
- /*
- * fsync() and close current file before we switch to
next one. We
- * would otherwise have to reopen this file to fsync it
later
- */
- if (recvFile >= 0)
- {
- char xlogfname[MAXFNAMELEN];
-
- XLogWalRcvFlush(false);
-
- XLogFileName(xlogfname, recvFileTLI, recvSegNo,
wal_segment_size);
-
- /*
- * XLOG segment files will be re-read by
recovery in startup
- * process soon, so we don't advise the OS to
release cache
- * pages associated with the file like
XLogFileClose() does.
- */
- if (close(recvFile) != 0)
- ereport(PANIC,
-
(errcode_for_file_access(),
- errmsg("could not
close log segment %s: %m",
-
xlogfname)));
-
- /*
- * Create .done file forcibly to prevent the
streamed segment
- * from being archived later.
- */
- if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
- XLogArchiveForceDone(xlogfname);
- else
- XLogArchiveNotify(xlogfname, true);
- }
- recvFile = -1;
-
/* Create/use new log file */
XLByteToSeg(recptr, recvSegNo, wal_segment_size);
recvFile = XLogFileInit(recvSegNo);
@@ -967,6 +937,14 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
/* Update shared-memory status */
pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
+
+ /*
+ * Close the current segment if it's fully written up in the last cycle
of
+ * the loop, to create its archive notification file soon. Otherwise WAL
+ * archiving of the segment will be delayed until any data in the next
+ * segment is received and written.
+ */
+ XLogWalRcvClose(recptr);
}
/*
@@ -1020,6 +998,53 @@ XLogWalRcvFlush(bool dying)
}
}
+/*
+ * Close the currently open segment, if any, once recptr has moved past it.
+ *
+ * Flush the segment to disk before closing it. Otherwise we would have to
+ * reopen and fsync it later.
+ *
+ * Create an archive notification file since the segment is known complete.
+ */
+static void
+XLogWalRcvClose(XLogRecPtr recptr)
+{
+	if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
+	{
+		char		xlogfname[MAXFNAMELEN];
+
+		/*
+		 * fsync() and close current file before we switch to next one. We
+		 * would otherwise have to reopen this file to fsync it later.
+		 */
+		XLogWalRcvFlush(false);
+
+		XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
+
+		/*
+		 * XLOG segment files will be re-read by recovery in startup process
+		 * soon, so we don't advise the OS to release cache pages associated
+		 * with the file like XLogFileClose() does.
+		 */
+		if (close(recvFile) != 0)
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not close log segment %s: %m",
+							xlogfname)));
+
+		/*
+		 * Unless archive_mode is 'always', forcibly create .done so the
+		 * streamed segment is never archived here; otherwise mark it .ready.
+		 */
+		if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
+			XLogArchiveForceDone(xlogfname);
+		else
+			XLogArchiveNotify(xlogfname, true);
+
+		recvFile = -1;
+	}
+}
+
/*
* Send reply message to primary, indicating our current WAL locations, oldest
* xmin and the current time.