This conflicts with 6912acc (replication lag tracker) so just
rebased on a6f22e8.

At Fri, 17 Mar 2017 16:48:27 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI 
<horiguchi.kyot...@lab.ntt.co.jp> wrote in 
<20170317.164827.46663014.horiguchi.kyot...@lab.ntt.co.jp>
> Hello,
> 
> At Mon, 13 Mar 2017 11:06:00 +1100, Venkata B Nagothi <nag1...@gmail.com> 
> wrote in <caeyp7j-4mmvwgozswvasulzc80jdd_tl-9ksniqf17+bnqi...@mail.gmail.com>
> > On Tue, Jan 17, 2017 at 9:36 PM, Kyotaro HORIGUCHI <
> > horiguchi.kyot...@lab.ntt.co.jp> wrote:
> > > I managed to reproduce this. A little tweak as the first patch
> > > lets the standby to suicide as soon as walreceiver sees a
> > > contrecord at the beginning of a segment.
> > >
> > > - M(aster): createdb as a master with wal_keep_segments = 0
> > >             (default), min_log_messages = debug2
> > > - M: Create a physical repslot.
> > > - S(tandby): Setup a standby database.
> > > - S: Edit recovery.conf to use the replication slot above then
> > >      start it.
> > > - S: touch /tmp/hoge
> > > - M: Run pgbench ...
> > > - S: After a while, the standby stops.
> > >   > LOG:  #################### STOP THE SERVER
> > >
> > > - M: Stop pgbench.
> > > - M: Do 'checkpoint;' twice.
> > > - S: rm /tmp/hoge
> > > - S: Fails to catch up with the following error.
> > >
> > >   > FATAL:  could not receive data from WAL stream: ERROR:  requested WAL
> > > segment 00000001000000000000002B has already been removed
> > >
> > >
> > I have been testing / reviewing the latest patch
> > "0001-Fix-a-bug-of-physical-replication-slot.patch" and i think, i might
> > need some more clarification on this.
> > 
> > Before applying the patch, I tried re-producing the above error -
> > 
> > - I had master->standby in streaming replication
> > - Took the backup of master
> >    - with a low max_wal_size and wal_keep_segments = 0
> > - Configured standby with recovery.conf
> > - Created replication slot on master
> > - Configured the replication slot on standby and started the standby
> 
> I suppose the "configure" means primary_slot_name in recovery.conf.
> 
> > - I got the below error
> > 
> >    >> 2017-03-10 11:58:15.704 AEDT [478] LOG:  invalid record length at
> > 0/F2000140: wanted 24, got 0
> >    >> 2017-03-10 11:58:15.706 AEDT [481] LOG:  started streaming WAL from
> > primary at 0/F2000000 on timeline 1
> >    >> 2017-03-10 11:58:15.706 AEDT [481] FATAL:  could not receive data
> > from WAL stream: ERROR:  requested WAL segment 0000000100000000000000F2 has
> > already been removed
> 
> Maybe you created the master slot with non-reserve (default) mode
> and put a some-minites pause after making the backup and before
> starting the standby. For the case the master slot doesn't keep
> WAL segments unless the standby connects so a couple of
> checkpoints can blow away the first segment required by the
> standby. This is quite reasonable behavior. The following steps
> makes this more sure.
> 
> > - Took the backup of master
> >    - with a low max_wal_size = 2 and wal_keep_segments = 0
> > - Configured standby with recovery.conf
> > - Created replication slot on master
> + - SELECT pg_switch_wal(); on master twice.
> + - checkpoint; on master twice.
> > - Configured the replication slot on standby and started the standby
> 
> Creating the slot with the following command will save it.
> 
> =# select pg_create_physical_replication_slot('s1', true);
> 
> 
> > and i could notice that the file "0000000100000000000000F2" was removed
> > from the master. This can be easily re-produced and this occurs
> > irrespective of configuring replication slots.
> > 
> > As long as the file "0000000100000000000000F2" is available on the master,
> > standby continues to stream WALs without any issues.
> ...
> > If the scenario i created to reproduce the error is correct, then, applying
> > the patch is not making a difference.
> 
> Yes, the patch is not for saving this case. The patch saves the
> case where the previous segment to the first required segment by
> standby was removed and it contains the first part of a record
> continues to the first required segment. On the other hand this
> case is that the segment at the start point of standby is just
> removed.
> 
> > I think, i need help in building a specific test case which will re-produce
> > the specific BUG related to physical replication slots as reported ?
> > 
> > Will continue to review the patch, once i have any comments on this.
> 
> Thaks a lot!

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From 9046125563b1b02e9bc0839bde5d77e77c940bbd Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 1 Feb 2017 16:07:22 +0900
Subject: [PATCH] Fix a bug of physical replication slot.

A physical-replication standby can stop just at the boundary of WAL
segments. restart_lsn of the slot on the master can be assumed to be
the same location. The last segment on the master will be removed
after some checkpoints for the case. If the first record of the next
segment is a continuation record, it is only on the master and its
beginning is only on the standby so the standby cannot restart because
the record to read is scattered to two sources.

This patch detains restart_lsn in the last sgement when the first page
of the next segment is a continuation record.
---
 src/backend/replication/walsender.c | 104 +++++++++++++++++++++++++++++++++---
 1 file changed, 97 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cfc3fba..6dff18b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -210,6 +210,13 @@ static struct
 	WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
 } LagTracker;
 
+/*
+ * This variable corresponds to restart_lsn in pg_replication_slots for a
+ * physical slot. This has a valid value only when it differs from the current
+ * flush pointer.
+ */
+static XLogRecPtr	   restartLSN = InvalidXLogRecPtr;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
@@ -244,7 +251,7 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool noutfoundok);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -538,6 +545,9 @@ StartReplication(StartReplicationCmd *cmd)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 (errmsg("cannot use a logical replication slot for physical replication"))));
+
+		/* Restore restartLSN from replication slot */
+		restartLSN = MyReplicationSlot->data.restart_lsn;
 	}
 
 	/*
@@ -553,6 +563,10 @@ StartReplication(StartReplicationCmd *cmd)
 	else
 		FlushPtr = GetFlushRecPtr();
 
+	/* Set InvalidXLogRecPtr if catching up */
+	if (restartLSN == FlushPtr)
+		restartLSN = InvalidXLogRecPtr;
+
 	if (cmd->timeline != 0)
 	{
 		XLogRecPtr	switchpoint;
@@ -767,7 +781,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;
 
 	/* now actually read the data, we know it's there */
-	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false);
 
 	return count;
 }
@@ -1668,7 +1682,7 @@ static void
 ProcessStandbyReplyMessage(void)
 {
 	XLogRecPtr	writePtr,
-				flushPtr,
+				flushPtr, oldFlushPtr,
 				applyPtr;
 	bool		replyRequested;
 	TimeOffset	writeLag,
@@ -1728,6 +1742,7 @@ ProcessStandbyReplyMessage(void)
 		WalSnd	   *walsnd = MyWalSnd;
 
 		SpinLockAcquire(&walsnd->mutex);
+		oldFlushPtr = walsnd->flush;
 		walsnd->write = writePtr;
 		walsnd->flush = flushPtr;
 		walsnd->apply = applyPtr;
@@ -1751,7 +1766,74 @@ ProcessStandbyReplyMessage(void)
 		if (SlotIsLogical(MyReplicationSlot))
 			LogicalConfirmReceivedLocation(flushPtr);
 		else
-			PhysicalConfirmReceivedLocation(flushPtr);
+		{
+			/*
+			 * Recovery on standby requires that a continuation reocrd must be
+			 * available from single WAL source. For the reason, physical
+			 * replication slot should stay in the first segment for a
+			 * continuation record spanning multiple segments. Since this
+			 * doesn't look into individual record, restartLSN may stay a bit
+			 * too behind.
+			 *
+			 * Since the objective is avoding to remove required segments,
+			 * checking at the beginning of every segment is enough. But once
+			 * restartLSN goes behind, check every page for quick restoration.
+			 *
+			 * restartLSN has a valid value only when it is behind flushPtr.
+			 */
+			if (oldFlushPtr != InvalidXLogRecPtr &&
+				(restartLSN == InvalidXLogRecPtr ?
+				 oldFlushPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE :
+				 restartLSN / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ))
+			{
+				XLogRecPtr rp;
+
+				if (restartLSN == InvalidXLogRecPtr)
+					restartLSN = oldFlushPtr;
+
+				rp = restartLSN - (restartLSN % XLOG_BLCKSZ);
+
+				/*
+				 * We may have let the record at flushPtr sent, so it's worth
+				 * looking
+				 */
+				while (rp <= flushPtr)
+				{
+					XLogPageHeaderData header;
+
+					/*
+					 * If the page header is not available for now, don't move
+					 * restartLSN forward. We can read it by the next chance.
+					 */
+					if(sentPtr - rp >= sizeof(XLogPageHeaderData))
+					{
+						bool found;
+						/*
+						 * Fetch the page header of the next page. Move
+						 * restartLSN forward only if it is not a continuation
+						 * page.
+						 */
+						found = XLogRead((char *)&header, rp,
+											 sizeof(XLogPageHeaderData), true);
+						if (found &&
+							(header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+							restartLSN = rp;
+					}
+					rp += XLOG_BLCKSZ;
+				}
+
+				/*
+				 * If restartLSN is on the same page with flushPtr, it means
+				 * that we are catching up.
+				 */
+				if (restartLSN / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ)
+					restartLSN = InvalidXLogRecPtr;
+			}
+
+			/* restartLSN == InvalidXLogRecPtr means catching up */
+			PhysicalConfirmReceivedLocation(restartLSN != InvalidXLogRecPtr ?
+											restartLSN : flushPtr);
+		}
 	}
 }
 
@@ -2217,6 +2299,7 @@ WalSndKill(int code, Datum arg)
 
 /*
  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
+ * Returns false if the segment file is not found when notfoundok is true.
  *
  * XXX probably this should be improved to suck data directly from the
  * WAL buffers when possible.
@@ -2226,8 +2309,8 @@ WalSndKill(int code, Datum arg)
  * always be one descriptor left open until the process ends, but never
  * more than one.
  */
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -2304,10 +2387,15 @@ retry:
 				 * removed or recycled.
 				 */
 				if (errno == ENOENT)
+				{
+					if (notfoundok)
+						return false;
+
 					ereport(ERROR,
 							(errcode_for_file_access(),
 							 errmsg("requested WAL segment %s has already been removed",
 								XLogFileNameP(curFileTimeLine, sendSegNo))));
+				}
 				else
 					ereport(ERROR,
 							(errcode_for_file_access(),
@@ -2389,6 +2477,8 @@ retry:
 			goto retry;
 		}
 	}
+
+	return true;
 }
 
 /*
@@ -2619,7 +2709,7 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+	XLogRead(&output_message.data[output_message.len], startptr, nbytes, false);
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
-- 
2.9.2

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to