On 2019-Nov-22, Antonin Houska wrote:

> As I pointed out in
> 
> https://www.postgresql.org/message-id/88183.1574261429%40antos
> 
> seg.ws_off only replaced readOff in XLogReaderState. So we should only update
> ws_off where readOff was updated before commit 709d003. This does happen in
> ReadPageInternal (see HEAD) and I see no reason for the final patch to update
> ws_off anywhere else.

Oh you're right.

I see no reason to keep ws_off in WALOpenSegment.  We can move it to
XLogReaderState (as segoff); I did that here.  We also need the offset in
WALReadError, though, so I added it there too (as wre_off).  Conceptually
it seems clearer to me this way.

What do you think of the attached?

BTW it's not clear to me which errors pread()/pg_pread() can report without
setting errno.  I think lines 1083/1084 of xlogreader.c (the "errno = 0"
reset and its comment in WALRead) are spurious now.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 41d9a060db397aacb2b3f122204c0b4fbb51a109 Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Tue, 12 Nov 2019 11:51:14 +0100
Subject: [PATCH v12] Use only xlogreader.c:WALRead()

---
 src/backend/access/transam/xlogreader.c | 104 ++++++++-
 src/backend/access/transam/xlogutils.c  | 207 ++++++----------
 src/backend/replication/walsender.c     | 298 ++++++++++--------------
 src/bin/pg_waldump/pg_waldump.c         | 164 ++++---------
 src/include/access/xlogreader.h         |  39 +++-
 src/include/access/xlogutils.h          |   2 +
 6 files changed, 383 insertions(+), 431 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7f24f0cb95..3692c46f43 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "access/xlogreader.h"
@@ -27,6 +29,7 @@
 
 #ifndef FRONTEND
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
 #endif
 
@@ -208,7 +211,6 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 {
 	seg->ws_file = -1;
 	seg->ws_segno = 0;
-	seg->ws_off = 0;
 	seg->ws_tli = 0;
 
 	segcxt->ws_segsize = segsize;
@@ -295,8 +297,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	 * byte to cover the whole record header, or at least the part of it that
 	 * fits on the same page.
 	 */
-	readOff = ReadPageInternal(state,
-							   targetPagePtr,
+	readOff = ReadPageInternal(state, targetPagePtr,
 							   Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
 	if (readOff < 0)
 		goto err;
@@ -556,7 +557,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 
 	/* check whether we have all the requested data already */
 	if (targetSegNo == state->seg.ws_segno &&
-		targetPageOff == state->seg.ws_off && reqLen <= state->readLen)
+		targetPageOff == state->segoff && reqLen <= state->readLen)
 		return state->readLen;
 
 	/*
@@ -627,7 +628,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 
 	/* update read state information */
 	state->seg.ws_segno = targetSegNo;
-	state->seg.ws_off = targetPageOff;
+	state->segoff = targetPageOff;
 	state->readLen = readLen;
 
 	return readLen;
@@ -644,7 +645,7 @@ static void
 XLogReaderInvalReadState(XLogReaderState *state)
 {
 	state->seg.ws_segno = 0;
-	state->seg.ws_off = 0;
+	state->segoff = 0;
 	state->readLen = 0;
 }
 
@@ -1015,6 +1016,97 @@ out:
 
 #endif							/* FRONTEND */
 
+/*
+ * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
+ * fetched from timeline 'tli'.
+ *
+ * 'seg/segcxt' identify the last segment used.  'openSegment' is a callback
+ * to open the next segment, if necessary.
+ *
+ * Returns true if succeeded, false if an error occurs, in which case
+ * 'errinfo' receives error details.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ */
+bool
+WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+		WALOpenSegment *seg, WALSegmentContext *segcxt,
+		WALSegmentOpen openSegment, WALReadError *errinfo)
+{
+	char	   *p;
+	XLogRecPtr	recptr;
+	Size		nbytes;
+
+	p = buf;
+	recptr = startptr;
+	nbytes = count;
+
+	while (nbytes > 0)
+	{
+		uint32		startoff;
+		int			segbytes;
+		int			readbytes;
+
+		startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+
+		if (seg->ws_file < 0 ||
+			!XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
+			tli != seg->ws_tli)
+		{
+			XLogSegNo	nextSegNo;
+
+			/* Switch to another logfile segment */
+			if (seg->ws_file >= 0)
+				close(seg->ws_file);
+
+			XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
+
+			/* Open the next segment in the caller's way. */
+			seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+
+			/* Update the current segment info. */
+			seg->ws_tli = tli;
+			seg->ws_segno = nextSegNo;
+		}
+
+		/* How many bytes are within this segment? */
+		if (nbytes > (segcxt->ws_segsize - startoff))
+			segbytes = segcxt->ws_segsize - startoff;
+		else
+			segbytes = nbytes;
+
+#ifndef FRONTEND
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+		/* Reset errno first; eases reporting non-errno-affecting errors */
+		errno = 0;
+		readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff);
+
+#ifndef FRONTEND
+		pgstat_report_wait_end();
+#endif
+
+		if (readbytes <= 0)
+		{
+			errinfo->wre_errno = errno;
+			errinfo->wre_req = segbytes;
+			errinfo->wre_read = readbytes;
+			errinfo->wre_off = startoff;
+			errinfo->wre_seg = *seg;
+			return false;
+		}
+
+		/* Update state for read */
+		recptr += readbytes;
+		nbytes -= readbytes;
+		p += readbytes;
+	}
+
+	return true;
+}
+
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
  * ----------------------------------------
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 5f1e5ba75d..9836b6446a 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -639,128 +639,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 	forget_invalid_pages(rnode, forkNum, nblocks);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend).  Probably these should be merged at some point.
- */
-static void
-XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-		 Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-
-	/* state maintained across calls */
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static TimeLineID sendTLI = 0;
-	static uint32 sendOff = 0;
-
-	Assert(segsize == wal_segment_size);
-
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, segsize);
-
-		/* Do we need to switch to a different xlog segment? */
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) ||
-			sendTLI != tli)
-		{
-			char		path[MAXPGPATH];
-
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, segsize);
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-
-			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-
-			if (sendFile < 0)
-			{
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									path)));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendOff = 0;
-			sendTLI = tli;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				char		path[MAXPGPATH];
-				int			save_errno = errno;
-
-				XLogFilePath(path, tli, sendSegNo, segsize);
-				errno = save_errno;
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								path, startoff)));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (segsize - startoff))
-			segbytes = segsize - startoff;
-		else
-			segbytes = nbytes;
-
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendFile, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes <= 0)
-		{
-			char		path[MAXPGPATH];
-			int			save_errno = errno;
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-							path, sendOff, (unsigned long) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-}
-
 /*
  * Determine which timeline to read an xlog page from and set the
  * XLogReaderState's currTLI to that timeline ID.
@@ -802,8 +680,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
 void
 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
 {
-	const XLogRecPtr lastReadPage = state->seg.ws_segno *
-	state->segcxt.ws_segsize + state->seg.ws_off;
+	const XLogRecPtr lastReadPage = (state->seg.ws_segno *
+									 state->segcxt.ws_segsize + state->segoff);
 
 	Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
 	Assert(wantLength <= XLOG_BLCKSZ);
@@ -896,6 +774,36 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	}
 }
 
+/*
+ * Callback for WALRead() to open the next segment.
+ */
+static int
+wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+				 TimeLineID *tli_p)
+{
+	TimeLineID	tli = *tli_p;
+	char		path[MAXPGPATH];
+	int			fd;
+
+	XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
+	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (fd >= 0)
+		return fd;
+
+	if (errno == ENOENT)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("requested WAL segment %s has already been removed",
+						path)));
+	else
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m",
+						path)));
+
+	return -1;					/* keep compiler quiet */
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -913,7 +821,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 {
 	XLogRecPtr	read_upto,
 				loc;
+	TimeLineID	tli;
 	int			count;
+	WALReadError errinfo;
 
 	loc = targetPagePtr + reqLen;
 
@@ -932,7 +842,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			read_upto = GetFlushRecPtr();
 		else
 			read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-		state->seg.ws_tli = ThisTimeLineID;
+		tli = ThisTimeLineID;
 
 		/*
 		 * Check which timeline to get the record from.
@@ -982,14 +892,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			read_upto = state->currTLIValidUntil;
 
 			/*
-			 * Setting ws_tli to our wanted record's TLI is slightly wrong;
-			 * the page might begin on an older timeline if it contains a
-			 * timeline switch, since its xlog segment will have been copied
-			 * from the prior timeline. This is pretty harmless though, as
-			 * nothing cares so long as the timeline doesn't go backwards.  We
-			 * should read the page header instead; FIXME someday.
+			 * Setting tli to our wanted record's TLI is slightly wrong; the
+			 * page might begin on an older timeline if it contains a timeline
+			 * switch, since its xlog segment will have been copied from the
+			 * prior timeline. This is pretty harmless though, as nothing
+			 * cares so long as the timeline doesn't go backwards.  We should
+			 * read the page header instead; FIXME someday.
 			 */
-			state->seg.ws_tli = state->currTLI;
+			tli = state->currTLI;
 
 			/* No need to wait on a historical timeline */
 			break;
@@ -1020,9 +930,38 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
-			 XLOG_BLCKSZ);
+	if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
+				 &state->segcxt, wal_segment_open, &errinfo))
+		WALReadRaiseError(&errinfo);
 
 	/* number of valid bytes in the buffer */
 	return count;
 }
+
+/*
+ * Backend-specific convenience code to handle read errors encountered by
+ * WALRead().
+ */
+void
+WALReadRaiseError(WALReadError *errinfo)
+{
+	WALOpenSegment *seg = &errinfo->wre_seg;
+	char	   *fname = XLogFileNameP(seg->ws_tli, seg->ws_segno);
+
+	if (errinfo->wre_read < 0)
+	{
+		errno = errinfo->wre_errno;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
+						fname, errinfo->wre_off, (Size) errinfo->wre_req)));
+	}
+	else if (errinfo->wre_read == 0)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
+						fname, errinfo->wre_off, errinfo->wre_read,
+						(Size) errinfo->wre_req)));
+	}
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fa75872877..c124f59435 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -248,8 +248,9 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
+static int	WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+							  TimeLineID *tli_p);
 static void UpdateSpillStats(LogicalDecodingContext *ctx);
-static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -767,6 +768,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 {
 	XLogRecPtr	flushptr;
 	int			count;
+	WALReadError errinfo;
+	XLogSegNo	segno;
 
 	XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
 	sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
@@ -787,7 +790,27 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
 	/* now actually read the data, we know it's there */
-	XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
+	if (!WALRead(cur_page,
+				 targetPagePtr,
+				 XLOG_BLCKSZ,
+				 sendSeg->ws_tli,	/* Pass the current TLI because only
+									 * WalSndSegmentOpen controls whether new
+									 * TLI is needed. */
+				 sendSeg,
+				 sendCxt,
+				 WalSndSegmentOpen,
+				 &errinfo))
+		WALReadRaiseError(&errinfo);
+
+	/*
+	 * After reading into the buffer, check that what we read was valid. We do
+	 * this after reading, because even though the segment was present when we
+	 * opened it, it might get recycled or removed while we read it. The
+	 * read() succeeds in that case, but the data we tried to read might
+	 * already have been overwritten with new WAL records.
+	 */
+	XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
+	CheckXLogRemoved(segno, sendSeg->ws_tli);
 
 	return count;
 }
@@ -2361,188 +2384,69 @@ WalSndKill(int code, Datum arg)
 }
 
 /*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
+ * Callback for WALRead() to open the next segment.
  */
-static void
-XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
+int
+WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+				  TimeLineID *tli_p)
 {
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-	XLogSegNo	segno;
+	char		path[MAXPGPATH];
+	int			fd;
 
-retry:
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
+	/*-------
+	 * When reading from a historic timeline, and there is a timeline switch
+	 * within this segment, read from the WAL segment belonging to the new
+	 * timeline.
+	 *
+	 * For example, imagine that this server is currently on timeline 5, and
+	 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+	 * 0/13002088. In pg_wal, we have these files:
+	 *
+	 * ...
+	 * 000000040000000000000012
+	 * 000000040000000000000013
+	 * 000000050000000000000013
+	 * 000000050000000000000014
+	 * ...
+	 *
+	 * In this situation, when requested to send the WAL from segment 0x13, on
+	 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+	 * recovery prefers files from newer timelines, so if the segment was
+	 * restored from the archive on this server, the file belonging to the old
+	 * timeline, 000000040000000000000013, might not exist. Their contents are
+	 * equal up to the switchpoint, because at a timeline switch, the used
+	 * portion of the old segment is copied to the new file.  -------
+	 */
+	*tli_p = sendTimeLine;
+	if (sendTimeLineIsHistoric)
 	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
+		XLogSegNo	endSegNo;
 
-		startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
-
-		if (sendSeg->ws_file < 0 ||
-			!XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
-		{
-			char		path[MAXPGPATH];
-
-			/* Switch to another logfile segment */
-			if (sendSeg->ws_file >= 0)
-				close(sendSeg->ws_file);
-
-			XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
-
-			/*-------
-			 * When reading from a historic timeline, and there is a timeline
-			 * switch within this segment, read from the WAL segment belonging
-			 * to the new timeline.
-			 *
-			 * For example, imagine that this server is currently on timeline
-			 * 5, and we're streaming timeline 4. The switch from timeline 4
-			 * to 5 happened at 0/13002088. In pg_wal, we have these files:
-			 *
-			 * ...
-			 * 000000040000000000000012
-			 * 000000040000000000000013
-			 * 000000050000000000000013
-			 * 000000050000000000000014
-			 * ...
-			 *
-			 * In this situation, when requested to send the WAL from
-			 * segment 0x13, on timeline 4, we read the WAL from file
-			 * 000000050000000000000013. Archive recovery prefers files from
-			 * newer timelines, so if the segment was restored from the
-			 * archive on this server, the file belonging to the old timeline,
-			 * 000000040000000000000013, might not exist. Their contents are
-			 * equal up to the switchpoint, because at a timeline switch, the
-			 * used portion of the old segment is copied to the new file.
-			 *-------
-			 */
-			sendSeg->ws_tli = sendTimeLine;
-			if (sendTimeLineIsHistoric)
-			{
-				XLogSegNo	endSegNo;
-
-				XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
-				if (sendSeg->ws_segno == endSegNo)
-					sendSeg->ws_tli = sendTimeLineNextTLI;
-			}
-
-			XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
-
-			sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-			if (sendSeg->ws_file < 0)
-			{
-				/*
-				 * If the file is not found, assume it's because the standby
-				 * asked for a too old WAL segment that has already been
-				 * removed or recycled.
-				 */
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendSeg->ws_off = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendSeg->ws_off != startoff)
-		{
-			if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-								startoff)));
-			sendSeg->ws_off = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (segcxt->ws_segsize - startoff))
-			segbytes = segcxt->ws_segsize - startoff;
-		else
-			segbytes = nbytes;
-
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendSeg->ws_file, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes < 0)
-		{
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-							XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-							sendSeg->ws_off, (Size) segbytes)));
-		}
-		else if (readbytes == 0)
-		{
-			ereport(ERROR,
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-							XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-							sendSeg->ws_off, readbytes, (Size) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendSeg->ws_off += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
+		XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+		if (sendSeg->ws_segno == endSegNo)
+			*tli_p = sendTimeLineNextTLI;
 	}
 
-	/*
-	 * After reading into the buffer, check that what we read was valid. We do
-	 * this after reading, because even though the segment was present when we
-	 * opened it, it might get recycled or removed while we read it. The
-	 * read() succeeds in that case, but the data we tried to read might
-	 * already have been overwritten with new WAL records.
-	 */
-	XLByteToSeg(startptr, segno, segcxt->ws_segsize);
-	CheckXLogRemoved(segno, ThisTimeLineID);
+	XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
+	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (fd >= 0)
+		return fd;
 
 	/*
-	 * During recovery, the currently-open WAL file might be replaced with the
-	 * file of the same name retrieved from archive. So we always need to
-	 * check what we read was valid after reading into the buffer. If it's
-	 * invalid, we try to open and read the file again.
+	 * If the file is not found, assume it's because the standby asked for a
+	 * too old WAL segment that has already been removed or recycled.
 	 */
-	if (am_cascading_walsender)
-	{
-		WalSnd	   *walsnd = MyWalSnd;
-		bool		reload;
-
-		SpinLockAcquire(&walsnd->mutex);
-		reload = walsnd->needreload;
-		walsnd->needreload = false;
-		SpinLockRelease(&walsnd->mutex);
-
-		if (reload && sendSeg->ws_file >= 0)
-		{
-			close(sendSeg->ws_file);
-			sendSeg->ws_file = -1;
-
-			goto retry;
-		}
-	}
+	if (errno == ENOENT)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("requested WAL segment %s has already been removed",
+						XLogFileNameP(*tli_p, nextSegNo))));
+	else
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m",
+						path)));
+	return -1;					/* keep compiler quiet */
 }
 
 /*
@@ -2562,6 +2466,8 @@ XLogSendPhysical(void)
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
 	Size		nbytes;
+	XLogSegNo	segno;
+	WALReadError errinfo;
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2777,7 +2683,49 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+	if (!WALRead(&output_message.data[output_message.len],
+				 startptr,
+				 nbytes,
+				 sendSeg->ws_tli,	/* Pass the current TLI because only
+									 * WalSndSegmentOpen controls whether new
+									 * TLI is needed. */
+				 sendSeg,
+				 sendCxt,
+				 WalSndSegmentOpen,
+				 &errinfo))
+		WALReadRaiseError(&errinfo);
+
+	/* See logical_read_xlog_page(). */
+	XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
+	CheckXLogRemoved(segno, sendSeg->ws_tli);
+
+	/*
+	 * During recovery, the currently-open WAL file might be replaced with the
+	 * file of the same name retrieved from archive. So we always need to
+	 * check what we read was valid after reading into the buffer. If it's
+	 * invalid, we try to open and read the file again.
+	 */
+	if (am_cascading_walsender)
+	{
+		WalSnd	   *walsnd = MyWalSnd;
+		bool		reload;
+
+		SpinLockAcquire(&walsnd->mutex);
+		reload = walsnd->needreload;
+		walsnd->needreload = false;
+		SpinLockRelease(&walsnd->mutex);
+
+		if (reload && sendSeg->ws_file >= 0)
+		{
+			close(sendSeg->ws_file);
+			sendSeg->ws_file = -1;
+
+			goto retry;
+		}
+	}
+
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 1524e5eb1e..d5eb2f42f5 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -280,137 +280,56 @@ identify_target_directory(char *directory, char *fname)
 	return NULL;				/* not reached */
 }
 
-/*
- * Read count bytes from a segment file in the specified directory, for the
- * given timeline, containing the specified record pointer; store the data in
- * the passed buffer.
- */
-static void
-XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
-				 XLogRecPtr startptr, char *buf, Size count)
+static int
+WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+				   TimeLineID *tli_p)
 {
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
+	TimeLineID	tli = *tli_p;
+	char		fname[MAXPGPATH];
+	int			fd;
+	int			tries;
 
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static uint32 sendOff = 0;
+	XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize);
 
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
+	/*
+	 * In follow mode there is a short period of time after the server has
+	 * written the end of the previous file before the new file is available.
+	 * So we loop for 5 seconds looking for the file to appear before giving
+	 * up.
+	 */
+	for (tries = 0; tries < 10; tries++)
 	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, WalSegSz);
-
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz))
+		fd = open_file_in_directory(segcxt->ws_dir, fname);
+		if (fd >= 0)
+			return fd;
+		if (errno == ENOENT)
 		{
-			char		fname[MAXFNAMELEN];
-			int			tries;
-
-			/* Switch to another logfile segment */
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, WalSegSz);
-
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-			/*
-			 * In follow mode there is a short period of time after the server
-			 * has written the end of the previous file before the new file is
-			 * available. So we loop for 5 seconds looking for the file to
-			 * appear before giving up.
-			 */
-			for (tries = 0; tries < 10; tries++)
-			{
-				sendFile = open_file_in_directory(directory, fname);
-				if (sendFile >= 0)
-					break;
-				if (errno == ENOENT)
-				{
-					int			save_errno = errno;
-
-					/* File not there yet, try again */
-					pg_usleep(500 * 1000);
-
-					errno = save_errno;
-					continue;
-				}
-				/* Any other error, fall through and fail */
-				break;
-			}
-
-			if (sendFile < 0)
-				fatal_error("could not find file \"%s\": %s",
-							fname, strerror(errno));
-			sendOff = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				int			err = errno;
-				char		fname[MAXPGPATH];
-
-				XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-				fatal_error("could not seek in log file %s to offset %u: %s",
-							fname, startoff, strerror(err));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (WalSegSz - startoff))
-			segbytes = WalSegSz - startoff;
-		else
-			segbytes = nbytes;
-
-		readbytes = read(sendFile, p, segbytes);
-		if (readbytes <= 0)
-		{
-			int			err = errno;
-			char		fname[MAXPGPATH];
 			int			save_errno = errno;
 
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
+			/* File not there yet, try again */
+			pg_usleep(500 * 1000);
+
 			errno = save_errno;
-
-			if (readbytes < 0)
-				fatal_error("could not read from log file %s, offset %u, length %d: %s",
-							fname, sendOff, segbytes, strerror(err));
-			else if (readbytes == 0)
-				fatal_error("could not read from log file %s, offset %u: read %d of %zu",
-							fname, sendOff, readbytes, (Size) segbytes);
+			continue;
 		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
+		/* Any other error, fall through and fail */
+		break;
 	}
+
+	fatal_error("could not find file \"%s\": %s", fname, strerror(errno));
+	return -1;					/* keep compiler quiet */
 }
 
 /*
  * XLogReader read_page callback
  */
 static int
-XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-				 XLogRecPtr targetPtr, char *readBuff)
+WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
+				XLogRecPtr targetPtr, char *readBuff)
 {
 	XLogDumpPrivate *private = state->private_data;
 	int			count = XLOG_BLCKSZ;
+	WALReadError errinfo;
 
 	if (private->endptr != InvalidXLogRecPtr)
 	{
@@ -425,8 +344,23 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		}
 	}
 
-	XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr,
-					 readBuff, count);
+	if (!WALRead(readBuff, targetPagePtr, count, private->timeline,
+				 &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+	{
+		WALOpenSegment *seg = &errinfo.wre_seg;
+		char		fname[MAXPGPATH];
+
+		XLogFileName(fname, seg->ws_tli, seg->ws_segno,
+					 state->segcxt.ws_segsize);
+
+		if (errinfo.wre_errno != 0)
+			fatal_error("could not read in file %s, offset %u, length %zu: %s",
+						fname, errinfo.wre_off, (Size) errinfo.wre_req,
+						strerror(errinfo.wre_errno));
+		else
+			fatal_error("could not read in file %s, offset %u: length: %zu",
+						fname, errinfo.wre_off, (Size) errinfo.wre_req);
+	}
 
 	return count;
 }
@@ -1089,7 +1023,7 @@ main(int argc, char **argv)
 	/* done with argument parsing, do the actual work */
 
 	/* we have everything we need, start reading */
-	xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, XLogDumpReadPage,
+	xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage,
 										  &private);
 	if (!xlogreader_state)
 		fatal_error("out of memory");
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 1bbee386e8..0193611b7f 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -36,7 +36,6 @@ typedef struct WALOpenSegment
 {
 	int			ws_file;		/* segment file descriptor */
 	XLogSegNo	ws_segno;		/* segment number */
-	uint32		ws_off;			/* offset in the segment */
 	TimeLineID	ws_tli;			/* timeline ID of the currently open file */
 } WALOpenSegment;
 
@@ -168,6 +167,7 @@ struct XLogReaderState
 	/* last read XLOG position for data currently in readBuf */
 	WALSegmentContext segcxt;
 	WALOpenSegment seg;
+	uint32		segoff;
 
 	/*
 	 * beginning of prior page read, and its TLI.  Doesn't necessarily
@@ -217,6 +217,24 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
 
+/*
+ * Callback to open the specified WAL segment for reading.  Returns a valid
+ * file descriptor when the file was opened successfully.
+ *
+ * "nextSegNo" is the number of the segment to be opened.
+ *
+ * "segcxt" is additional information about the segment.
+ *
+ * "tli_p" is an input/output argument. WALRead() uses it to pass the
+ * timeline in which the new segment should be found, but the callback can use
+ * it to return the TLI that it actually opened.
+ *
+ * BasicOpenFile() is the preferred way to open the segment file in backend
+ * code, whereas open(2) should be used in frontend.
+ */
+typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+							   TimeLineID *tli_p);
+
 /* Initialize supporting structures */
 extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
@@ -232,6 +250,25 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif							/* FRONTEND */
+
+/*
+ * Error information from WALRead that both backend and frontend caller can
+ * process.  Currently only errors from pg_pread can be reported.
+ */
+typedef struct WALReadError
+{
+	int			wre_errno;		/* errno set by the last pg_pread() */
+	int			wre_off;		/* Offset we tried to read from. */
+	int			wre_req;		/* Bytes requested to be read. */
+	int			wre_read;		/* Bytes read by the last read(). */
+	WALOpenSegment wre_seg;		/* Segment we tried to read from. */
+} WALReadError;
+
+extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+					TimeLineID tli, WALOpenSegment *seg,
+					WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+					WALReadError *errinfo);
+
 /* Functions for decoding an XLogRecord */
 
 extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 2df98e45b2..0572b24192 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -54,4 +54,6 @@ extern int	read_local_xlog_page(XLogReaderState *state,
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
 									  XLogRecPtr wantPage, uint32 wantLength);
 
+extern void WALReadRaiseError(WALReadError *errinfo);
+
 #endif
-- 
2.20.1

Reply via email to