On 2021/09/02 10:16, Kyotaro Horiguchi wrote:
> Ok, I agree that the reader-side needs an amendment.

Thanks for the review! Attached is the updated version of the patch.
Building on my latest patch, I changed the startup process so that, when it
replays an XLOG_SWITCH record from a streamed WAL segment, it creates that
segment's archive notification file if one has not been created yet.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index 24165ab03e..6c407045dd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7576,6 +7576,34 @@ StartupXLOG(void)
                                /* Handle interrupt signals of startup process 
*/
                                HandleStartupProcInterrupts();
 
+                               /*
+                                * Create an archive notification file of a 
streamed WAL
+                                * segment if it includes an XLOG_SWITCH record 
and its
+                                * notification file has not been created yet. 
This is
+                                * necessary to handle the corner case that 
walreceiver may
+                                * fail to create such notification file if it 
exits after it
+                                * receives XLOG_SWITCH record but while it's 
receiving the
+                                * remaining bytes in the segment. Without this 
handling, WAL
+                                * archiving of the segment will be delayed 
until subsequent
+                                * checkpoint creates its notification file 
when removing it
+                                * even though it can be archived soon.
+                                */
+                               if (readSource == XLOG_FROM_STREAM &&
+                                       record->xl_rmid == RM_XLOG_ID &&
+                                       (record->xl_info & ~XLR_INFO_MASK) == 
XLOG_SWITCH)
+                               {
+                                       char            
xlogfilename[MAXFNAMELEN];
+
+                                       XLogFileName(xlogfilename, curFileTLI, 
readSegNo, wal_segment_size);
+                                       if 
(!XLogArchiveIsReadyOrDone(xlogfilename))
+                                       {
+                                               if (XLogArchivingAlways())
+                                                       
XLogArchiveNotify(xlogfilename, true);
+                                               else
+                                                       
XLogArchiveForceDone(xlogfilename);
+                                       }
+                               }
+
                                /*
                                 * Pause WAL replay, if requested by a 
hot-standby session via
                                 * SetRecoveryPause().
diff --git a/src/backend/replication/walreceiver.c 
b/src/backend/replication/walreceiver.c
index 60de3be92c..5b07eef3aa 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -125,6 +125,7 @@ static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
+static void XLogWalRcvClose(XLogRecPtr recptr);
 static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -883,42 +884,11 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
        {
                int                     segbytes;
 
-               if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, 
wal_segment_size))
+               /* Close the current segment if it's completed */
+               XLogWalRcvClose(recptr);
+
+               if (recvFile < 0)
                {
-                       /*
-                        * fsync() and close current file before we switch to 
next one. We
-                        * would otherwise have to reopen this file to fsync it 
later
-                        */
-                       if (recvFile >= 0)
-                       {
-                               char            xlogfname[MAXFNAMELEN];
-
-                               XLogWalRcvFlush(false);
-
-                               XLogFileName(xlogfname, recvFileTLI, recvSegNo, 
wal_segment_size);
-
-                               /*
-                                * XLOG segment files will be re-read by 
recovery in startup
-                                * process soon, so we don't advise the OS to 
release cache
-                                * pages associated with the file like 
XLogFileClose() does.
-                                */
-                               if (close(recvFile) != 0)
-                                       ereport(PANIC,
-                                                       
(errcode_for_file_access(),
-                                                        errmsg("could not 
close log segment %s: %m",
-                                                                       
xlogfname)));
-
-                               /*
-                                * Create .done file forcibly to prevent the 
streamed segment
-                                * from being archived later.
-                                */
-                               if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
-                                       XLogArchiveForceDone(xlogfname);
-                               else
-                                       XLogArchiveNotify(xlogfname, true);
-                       }
-                       recvFile = -1;
-
                        /* Create/use new log file */
                        XLByteToSeg(recptr, recvSegNo, wal_segment_size);
                        recvFile = XLogFileInit(recvSegNo);
@@ -967,6 +937,14 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 
        /* Update shared-memory status */
        pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
+
+       /*
+        * Close the current segment if it's fully written up in the last cycle 
of
+        * the loop, to create its archive notification file soon. Otherwise WAL
+        * archiving of the segment will be delayed until any data in the next
+        * segment is received and written.
+        */
+       XLogWalRcvClose(recptr);
 }
 
 /*
@@ -1020,6 +998,53 @@ XLogWalRcvFlush(bool dying)
        }
 }
 
+/*
+ * Close the current segment if recptr no longer falls within it.
+ *
+ * The segment is fsync'd before closing; otherwise we would have to reopen
+ * and fsync it later.  Since it is then known to be complete, also create
+ * its archive notification file (a forced .done file unless archive_mode
+ * is "always").
+ */
+static void
+XLogWalRcvClose(XLogRecPtr recptr)
+{
+       if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
+       {
+               char            xlogfname[MAXFNAMELEN];
+
+               /*
+                * fsync() and close current file before we switch to next one. 
We
+                * would otherwise have to reopen this file to fsync it later.
+                */
+               XLogWalRcvFlush(false);
+
+               XLogFileName(xlogfname, recvFileTLI, recvSegNo, 
wal_segment_size);
+
+               /*
+                * XLOG segment files will be re-read by recovery in startup 
process
+                * soon, so we don't advise the OS to release cache pages 
associated
+                * with the file like XLogFileClose() does.
+                */
+               if (close(recvFile) != 0)
+                       ereport(PANIC,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not close log segment 
%s: %m",
+                                                       xlogfname)));
+
+               /*
+                * Unless archive_mode is "always", create .done file forcibly to
+                * prevent the streamed segment from being archived later.
+                */
+               if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
+                       XLogArchiveForceDone(xlogfname);
+               else
+                       XLogArchiveNotify(xlogfname, true);
+
+               recvFile = -1;
+       }
+}
+
 /*
  * Send reply message to primary, indicating our current WAL locations, oldest
  * xmin and the current time.

Reply via email to