Hello,

At Thu, 19 Jan 2017 18:37:31 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI 
<horiguchi.kyot...@lab.ntt.co.jp> wrote in 
<20170119.183731.223893446.horiguchi.kyot...@lab.ntt.co.jp>
> > > - Delaying recycling a segment until the last partial record on it
> > >   completes. This seems doable page-wise (at coarse resolution)
> > >   but would cost additional reads of past xlog files (the page
> > >   headers of past pages are required).
> > 
> > Hm, yes. That looks like the least invasive way to go. At least that
> > looks more correct than the others.
> 
> The attached patch does that. Usually it reads page headers only
> on segment boundaries, but once a continuation record is found (or
> it fails to read the next page header, that is, the first record on
> the first page of the next segment has not been replicated yet), it
> starts reading them on every page boundary until a non-continuation
> page arrives.
> 
> I left a debug message (at LOG level) in the attached patch that is
> emitted on every state change of the keep pointer. At least for
> pgbench, the cost seems negligible.

I revised it. It became neater and less invasive.

 - Removed the added keep field from struct WalSnd. It is never referenced
   from other processes, so it is a static variable now.

 - Restore keepPtr from the replication slot on startup.

 - Moved the main part to a more appropriate position.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f3082c3..0270474 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -185,6 +185,12 @@ static volatile sig_atomic_t replication_active = false;
 static LogicalDecodingContext *logical_decoding_ctx = NULL;
 static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
+/*
+ * Segment keep pointer for physical slots. Has a valid value only when it
+ * differs from the current flush pointer.
+ */
+static XLogRecPtr	   keepPtr = InvalidXLogRecPtr;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
@@ -217,7 +223,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool noutfoundok);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -538,6 +544,9 @@ StartReplication(StartReplicationCmd *cmd)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 (errmsg("cannot use a logical replication slot for physical replication"))));
+
+		/* Restore keepPtr from replication slot */
+		keepPtr = MyReplicationSlot->data.restart_lsn;
 	}
 
 	/*
@@ -553,6 +562,10 @@ StartReplication(StartReplicationCmd *cmd)
 	else
 		FlushPtr = GetFlushRecPtr();
 
+	/* Set InvalidXLogRecPtr if catching up */
+	if (keepPtr == FlushPtr)
+		keepPtr = InvalidXLogRecPtr;
+	
 	if (cmd->timeline != 0)
 	{
 		XLogRecPtr	switchpoint;
@@ -774,7 +787,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;
 
 	/* now actually read the data, we know it's there */
-	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false);
 
 	return count;
 }
@@ -1551,7 +1564,7 @@ static void
 ProcessStandbyReplyMessage(void)
 {
 	XLogRecPtr	writePtr,
-				flushPtr,
+				flushPtr, oldFlushPtr,
 				applyPtr;
 	bool		replyRequested;
 
@@ -1580,6 +1593,7 @@ ProcessStandbyReplyMessage(void)
 		WalSnd	   *walsnd = MyWalSnd;
 
 		SpinLockAcquire(&walsnd->mutex);
+		oldFlushPtr = walsnd->flush;
 		walsnd->write = writePtr;
 		walsnd->flush = flushPtr;
 		walsnd->apply = applyPtr;
@@ -1597,7 +1611,78 @@ ProcessStandbyReplyMessage(void)
 		if (SlotIsLogical(MyReplicationSlot))
 			LogicalConfirmReceivedLocation(flushPtr);
 		else
-			PhysicalConfirmReceivedLocation(flushPtr);
+		{
+			/*
+			 * On recovery, a continuation reocrd must be available from
+			 * single WAL source. So physical replication slot should stay in
+			 * the first segment for a continuation record spanning multiple
+			 * segments. Since this doesn't look into individual record,
+			 * keepPtr may stay a bit too behind.
+			 *
+			 * Since the objective is avoding to remove required segments,
+			 * checking every segment is enough. But once keepPtr goes behind,
+			 * check every page for quick restoration.
+			 *
+			 * keepPtr has a valid value only when it is behind flushPtr.
+			 */
+			if (oldFlushPtr != InvalidXLogRecPtr &&
+				(keepPtr == InvalidXLogRecPtr ?
+				 oldFlushPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE :
+				 keepPtr / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ))
+			{
+				XLogRecPtr rp;
+				XLogRecPtr oldKeepPtr = keepPtr; /* for debug */
+
+				if (keepPtr == InvalidXLogRecPtr)
+					keepPtr = oldFlushPtr;
+
+				rp = keepPtr - (keepPtr % XLOG_BLCKSZ);
+
+				/*
+				 * We may have let the record at flushPtr sent, so it's worth
+				 * looking
+				 */
+				while (rp <= flushPtr)
+				{
+					XLogPageHeaderData header;
+
+					/*
+					 * If the page header is not available for now, don't move
+					 * keepPtr forward. We can read it by the next chance.
+					 */
+					if(sentPtr - rp >= sizeof(XLogPageHeaderData))
+					{
+						bool found;
+						/*
+						 * Fetch the page header of the next page. Move
+						 * keepPtr forward only if when it is not a
+						 * continuation page.
+						 */
+						found = XLogRead((char *)&header, rp,
+											 sizeof(XLogPageHeaderData), true);
+						if (found &&
+							(header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+							keepPtr = rp;
+					}
+					rp += XLOG_BLCKSZ;
+				}
+
+				/*
+				 * If keepPtr is on the same page with flushPtr, it means that
+				 * we are catching up
+				 */
+				if (keepPtr / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ)
+					keepPtr = InvalidXLogRecPtr;
+
+				if (oldKeepPtr != keepPtr)
+					elog(LOG, "%lX => %lX / %lX",
+						 oldKeepPtr, keepPtr, flushPtr); 
+			}
+
+			/* keepPtr == InvalidXLogRecPtr means catching up */
+			PhysicalConfirmReceivedLocation(keepPtr != InvalidXLogRecPtr ?
+											keepPtr : flushPtr);
+		}
 	}
 }
 
@@ -2019,6 +2104,7 @@ WalSndKill(int code, Datum arg)
 
 /*
  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
+ * Returns false if the segment file is not found when notfoundok is true.
  *
  * XXX probably this should be improved to suck data directly from the
  * WAL buffers when possible.
@@ -2028,8 +2114,8 @@ WalSndKill(int code, Datum arg)
  * always be one descriptor left open until the process ends, but never
  * more than one.
  */
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -2106,10 +2192,15 @@ retry:
 				 * removed or recycled.
 				 */
 				if (errno == ENOENT)
+				{
+					if (notfoundok)
+						return false;
+
 					ereport(ERROR,
 							(errcode_for_file_access(),
 							 errmsg("requested WAL segment %s has already been removed",
 								XLogFileNameP(curFileTimeLine, sendSegNo))));
+				}
 				else
 					ereport(ERROR,
 							(errcode_for_file_access(),
@@ -2189,6 +2280,8 @@ retry:
 			goto retry;
 		}
 	}
+
+	return true;
 }
 
 /*
@@ -2393,7 +2486,7 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+	XLogRead(&output_message.data[output_message.len], startptr, nbytes, false);
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to