On 2020-May-08, Kyotaro Horiguchi wrote:

> I agree to the direction of this patch. Thanks for the explanation.
> The patch looks good to me except the two points below.

Thanks!  I pushed the patch.  I fixed the walsender commentary as you
suggested, but I'm still of the opinion that we might want to use the
XLogReader abstraction in physical walsender rather than work without it;
if nothing else, that would simplify WALRead's API.

I didn't change this one though:

> wal_segment_close(XLogReaderState *state) is setting
> state->seg.ws_file to -1.  On the other hand wal_segment_open(state,..)
> doesn't update ws_file and the caller sets the returned value to
> (eventually) the same field.
> 
> +			seg->ws_file = state->routine.segment_open(state, nextSegNo,
> +													   segcxt, &tli);
> 
> If you are willing to do so, I think it is better to make the callback
> functions responsible for updating seg.ws_file, so that the callers
> don't have to care.

I agree that this would be a good idea, but it's more than just a
handful of lines of changes so I think we should consider it separately.
Attached as 0002.  I also realized while doing this that we can further
simplify WALRead()'s API if we're willing to bend walsender a little bit
more into the fake xlogreader thing; that's 0001.

I marked the open item closed nonetheless.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 600e28124ffb2741482bb096d5615b521ae83850 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 8 May 2020 16:40:24 -0400
Subject: [PATCH 1/2] fix WALRead API to take seg/segcxt from XLogReaderState

---
 src/backend/access/transam/xlogreader.c | 31 +++++++++++--------------
 src/backend/access/transam/xlogutils.c  |  1 -
 src/backend/replication/walsender.c     |  6 ++---
 src/bin/pg_waldump/pg_waldump.c         |  1 -
 src/include/access/xlogreader.h         |  4 +---
 5 files changed, 17 insertions(+), 26 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7cee8b92c9..f42dee2640 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1050,8 +1050,6 @@ err:
  * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
  * fetched from timeline 'tli'.
  *
- * 'seg/segcxt' identify the last segment used.
- *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
  *
@@ -1061,7 +1059,6 @@ err:
 bool
 WALRead(XLogReaderState *state,
 		char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
-		WALOpenSegment *seg, WALSegmentContext *segcxt,
 		WALReadError *errinfo)
 {
 	char	   *p;
@@ -1078,34 +1075,34 @@ WALRead(XLogReaderState *state,
 		int			segbytes;
 		int			readbytes;
 
-		startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+		startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
 
 		/*
 		 * If the data we want is not in a segment we have open, close what we
 		 * have (if anything) and open the next one, using the caller's
 		 * provided openSegment callback.
 		 */
-		if (seg->ws_file < 0 ||
-			!XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
-			tli != seg->ws_tli)
+		if (state->seg.ws_file < 0 ||
+			!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
+			tli != state->seg.ws_tli)
 		{
 			XLogSegNo	nextSegNo;
 
-			if (seg->ws_file >= 0)
+			if (state->seg.ws_file >= 0)
 				state->routine.segment_close(state);
 
-			XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
-			seg->ws_file = state->routine.segment_open(state, nextSegNo,
-													   segcxt, &tli);
+			XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
+			state->seg.ws_file = state->routine.segment_open(state, nextSegNo,
+															 &state->segcxt, &tli);
 
 			/* Update the current segment info. */
-			seg->ws_tli = tli;
-			seg->ws_segno = nextSegNo;
+			state->seg.ws_tli = tli;
+			state->seg.ws_segno = nextSegNo;
 		}
 
 		/* How many bytes are within this segment? */
-		if (nbytes > (segcxt->ws_segsize - startoff))
-			segbytes = segcxt->ws_segsize - startoff;
+		if (nbytes > (state->segcxt.ws_segsize - startoff))
+			segbytes = state->segcxt.ws_segsize - startoff;
 		else
 			segbytes = nbytes;
 
@@ -1115,7 +1112,7 @@ WALRead(XLogReaderState *state,
 
 		/* Reset errno first; eases reporting non-errno-affecting errors */
 		errno = 0;
-		readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff);
+		readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
 
 #ifndef FRONTEND
 		pgstat_report_wait_end();
@@ -1127,7 +1124,7 @@ WALRead(XLogReaderState *state,
 			errinfo->wre_req = segbytes;
 			errinfo->wre_read = readbytes;
 			errinfo->wre_off = startoff;
-			errinfo->wre_seg = *seg;
+			errinfo->wre_seg = state->seg;
 			return false;
 		}
 
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index bbd801513a..fc0bb7d059 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -947,7 +947,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
 	if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
-				 &state->seg, &state->segcxt,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d18475b854..ed8c08cb6a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -840,8 +840,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 				 sendSeg->ws_tli,	/* Pass the current TLI because only
 									 * WalSndSegmentOpen controls whether new
 									 * TLI is needed. */
-				 sendSeg,
-				 sendCxt,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
@@ -2760,6 +2758,8 @@ XLogSendPhysical(void)
 	enlargeStringInfo(&output_message, nbytes);
 
 retry:
+	fake_xlogreader.seg = *sendSeg;
+	fake_xlogreader.segcxt = *sendCxt;
 	if (!WALRead(&fake_xlogreader,
 				 &output_message.data[output_message.len],
 				 startptr,
@@ -2767,8 +2767,6 @@ retry:
 				 sendSeg->ws_tli,	/* Pass the current TLI because only
 									 * WalSndSegmentOpen controls whether new
 									 * TLI is needed. */
-				 sendSeg,
-				 sendCxt,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index e29f65500f..46734914b7 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -356,7 +356,6 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 	}
 
 	if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
-				 &state->seg, &state->segcxt,
 				 &errinfo))
 	{
 		WALOpenSegment *seg = &errinfo.wre_seg;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 81af200f5e..e77f478d68 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -301,9 +301,7 @@ typedef struct WALReadError
 
 extern bool WALRead(XLogReaderState *state,
 					char *buf, XLogRecPtr startptr, Size count,
-					TimeLineID tli, WALOpenSegment *seg,
-					WALSegmentContext *segcxt,
-					WALReadError *errinfo);
+					TimeLineID tli, WALReadError *errinfo);
 
 /* Functions for decoding an XLogRecord */
 
-- 
2.20.1

From 721cadab3f1ab88c7b2f12ed2ede5dd3c9455856 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 8 May 2020 17:03:09 -0400
Subject: [PATCH 2/2] Simplify XLogReader's open_segment API

Instead of returning the file descriptor, install it directly in
XLogReaderState->seg.ws_file.
---
 src/backend/access/transam/xlogreader.c |  4 ++--
 src/backend/access/transam/xlogutils.c  | 32 ++++++++++++-------------
 src/backend/replication/walsender.c     | 12 ++++------
 src/bin/pg_waldump/pg_waldump.c         |  9 ++++---
 src/include/access/xlogreader.h         | 12 +++++-----
 src/include/access/xlogutils.h          |  2 +-
 6 files changed, 33 insertions(+), 38 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f42dee2640..a533241370 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1092,8 +1092,8 @@ WALRead(XLogReaderState *state,
 				state->routine.segment_close(state);
 
 			XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
-			state->seg.ws_file = state->routine.segment_open(state, nextSegNo,
-															 &state->segcxt, &tli);
+			state->routine.segment_open(state, nextSegNo, &state->segcxt, &tli);
+			Assert(state->seg.ws_file >= 0);	/* shouldn't happen */
 
 			/* Update the current segment info. */
 			state->seg.ws_tli = tli;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index fc0bb7d059..1cc2c624a4 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -784,7 +784,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 }
 
 /* XLogReaderRoutine->segment_open callback for local pg_wal files */
-int
+void
 wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
 				 WALSegmentContext *segcxt, TimeLineID *tli_p)
 {
@@ -793,22 +793,20 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
 	int			fd;
 
 	XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
-	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-	if (fd >= 0)
-		return fd;
-
-	if (errno == ENOENT)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("requested WAL segment %s has already been removed",
-						path)));
-	else
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not open file \"%s\": %m",
-						path)));
-
-	return -1;					/* keep compiler quiet */
+	state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (state->seg.ws_file < 0)
+	{
+		if (errno == ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("requested WAL segment %s has already been removed",
+							path)));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
+	}
 }
 
 /* stock XLogReaderRoutine->segment_close callback */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ed8c08cb6a..b9f029d44f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -248,7 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static int	WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
+static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
 							  WALSegmentContext *segcxt, TimeLineID *tli_p);
 static void UpdateSpillStats(LogicalDecodingContext *ctx);
 
@@ -2445,13 +2445,12 @@ WalSndKill(int code, Datum arg)
 }
 
 /* XLogReaderRoutine->segment_open callback */
-static int
+static void
 WalSndSegmentOpen(XLogReaderState *state,
 				  XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 				  TimeLineID *tli_p)
 {
 	char		path[MAXPGPATH];
-	int			fd;
 
 	/*-------
 	 * When reading from a historic timeline, and there is a timeline switch
@@ -2488,9 +2487,9 @@ WalSndSegmentOpen(XLogReaderState *state,
 	}
 
 	XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
-	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-	if (fd >= 0)
-		return fd;
+	state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (state->seg.ws_file >= 0)
+		return;
 
 	/*
 	 * If the file is not found, assume it's because the standby asked for a
@@ -2513,7 +2512,6 @@ WalSndSegmentOpen(XLogReaderState *state,
 				(errcode_for_file_access(),
 				 errmsg("could not open file \"%s\": %m",
 						path)));
-	return -1;					/* keep compiler quiet */
 }
 
 /*
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 46734914b7..1a5c5a157c 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -280,7 +280,7 @@ identify_target_directory(char *directory, char *fname)
 }
 
 /* pg_waldump's XLogReaderRoutine->segment_open callback */
-static int
+static void
 WALDumpOpenSegment(XLogReaderState *state,
 				   XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 				   TimeLineID *tli_p)
@@ -300,9 +300,9 @@ WALDumpOpenSegment(XLogReaderState *state,
 	 */
 	for (tries = 0; tries < 10; tries++)
 	{
-		fd = open_file_in_directory(segcxt->ws_dir, fname);
-		if (fd >= 0)
-			return fd;
+		state->seg.ws_file = open_file_in_directory(segcxt->ws_dir, fname);
+		if (state->seg.ws_file >= 0)
+			return;
 		if (errno == ENOENT)
 		{
 			int			save_errno = errno;
@@ -318,7 +318,6 @@ WALDumpOpenSegment(XLogReaderState *state,
 	}
 
 	fatal_error("could not find file \"%s\": %m", fname);
-	return -1;					/* keep compiler quiet */
 }
 
 /*
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index e77f478d68..b73df02218 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -63,10 +63,10 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   int reqLen,
 							   XLogRecPtr targetRecPtr,
 							   char *readBuf);
-typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
-								 XLogSegNo nextSegNo,
-								 WALSegmentContext *segcxt,
-								 TimeLineID *tli_p);
+typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
+								  XLogSegNo nextSegNo,
+								  WALSegmentContext *segcxt,
+								  TimeLineID *tli_p);
 typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
 
 typedef struct XLogReaderRoutine
@@ -94,8 +94,8 @@ typedef struct XLogReaderRoutine
 	XLogPageReadCB page_read;
 
 	/*
-	 * Callback to open the specified WAL segment for reading.  The file
-	 * descriptor of the opened segment shall be returned.  In case of
+	 * Callback to open the specified WAL segment for reading.  ->seg.ws_file
+	 * shall be set to the file descriptor of the opened segment.  In case of
 	 * failure, an error shall be raised by the callback and it shall not
 	 * return.
 	 *
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 68ce815476..b7bdc5db34 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,7 +50,7 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
 extern int	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
 								 XLogRecPtr targetRecPtr, char *cur_page);
-extern int	wal_segment_open(XLogReaderState *state,
+extern void wal_segment_open(XLogReaderState *state,
 							 XLogSegNo nextSegNo,
 							 WALSegmentContext *segcxt,
 							 TimeLineID *tli_p);
-- 
2.20.1

Reply via email to