Hi,

On 2014-09-10 14:53:07 -0400, Robert Haas wrote:
> As discussed on the thread "Spinlocks and compiler/memory barriers",
> now that we've made the spinlock primitives function as compiler
> barriers (we think), it should be possible to remove volatile
> qualifiers from many places in the source code.  The attached patch
> does this in lwlock.c.  If the changes in commit
> 0709b7ee72e4bc71ad07b7120acd117265ab51d0 (and follow-on commits) are
> correct and complete, applying this shouldn't break anything, while
> possibly giving the compiler room to optimize things better than it
> does today.
> 
> However, demonstrating the necessity of that commit for these changes
> seems to be non-trivial.  I tried applying this patch and reverting
> commits 5b26278822c69dd76ef89fd50ecc7cdba9c3f035,
> b4c28d1b92c81941e4fc124884e51a7c110316bf, and
> 0709b7ee72e4bc71ad07b7120acd117265ab51d0 on a PPC64 POWER8 box with a
> whopping 192 hardware threads (thanks, IBM!).  I then ran the
> regression tests repeatedly, and I ran several long pgbench runs with
> as many as 350 concurrent clients.  No failures.

There's actually one more commit to revert. What I used was:
git revert 5b26278822c69dd76ef89fd50ecc7cdba9c3f035 \
    b4c28d1b92c81941e4fc124884e51a7c110316bf \
    68e66923ff629c324e219090860dc9e0e0a6f5d6 \
    0709b7ee72e4bc71ad07b7120acd117265ab51d0

> So I'm posting this patch in the hope that others can help.  The
> relevant tests are:
> 
> 1. If you apply this patch to master and run tests of whatever kind
> strikes your fancy, does anything break under high concurrency?  If it
> does, then the above commits weren't enough to make this safe on your
> platform.
> 
> 2. If you apply this patch to master, revert the commits mentioned
> above, and again run tests, does anything now break?  If it does (but
> the first tests were OK), then that shows that those commits did
> something useful on your platform.

I just tried this on my normal x86 workstation. I applied your lwlock
patch and, on top of that, removed most volatiles (a couple are still
required) from xlog.c. That works for 100 seconds. Then I reverted the
above commits, and it breaks within seconds:
master:
LOG:  request to flush past end of generated WAL; request 2/E5EC3DE0, currpos 2/E5EC1E60
standby:
LOG:  record with incorrect prev-link 4/684C3108 at 4/684C3108
and similar.

So at least for x86 the compiler barriers are obviously required and
seemingly working.

I've attached the very quickly written patch that removes the volatile
qualifiers from xlog.c.

Greetings,

Andres Freund

-- 
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 2b68a134925e4b2fb6ff282f5ed83b8f57b10732 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 17 Sep 2014 13:21:20 +0200
Subject: [PATCH] xlog.c-remove-volatile

---
 src/backend/access/transam/xlog.c | 473 ++++++++++++++------------------------
 1 file changed, 176 insertions(+), 297 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 34f2fc0..103f077 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1219,16 +1219,13 @@ begin:;
 	 */
 	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
 	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
-		SpinLockAcquire(&xlogctl->info_lck);
+		SpinLockAcquire(&XLogCtl->info_lck);
 		/* advance global request to include new block(s) */
-		if (xlogctl->LogwrtRqst.Write < EndPos)
-			xlogctl->LogwrtRqst.Write = EndPos;
+		if (XLogCtl->LogwrtRqst.Write < EndPos)
+			XLogCtl->LogwrtRqst.Write = EndPos;
 		/* update local result copy while I have the chance */
-		LogwrtResult = xlogctl->LogwrtResult;
-		SpinLockRelease(&xlogctl->info_lck);
+		LogwrtResult = XLogCtl->LogwrtResult;
+		SpinLockRelease(&XLogCtl->info_lck);
 	}
 
 	/*
@@ -1323,7 +1320,7 @@ static void
 ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 						  XLogRecPtr *PrevPtr)
 {
-	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	uint64		startbytepos;
 	uint64		endbytepos;
 	uint64		prevbytepos;
@@ -1378,7 +1375,7 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 static bool
 ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 {
-	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	uint64		startbytepos;
 	uint64		endbytepos;
 	uint64		prevbytepos;
@@ -1696,7 +1693,7 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	uint64		bytepos;
 	XLogRecPtr	reservedUpto;
 	XLogRecPtr	finishedUpto;
-	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	int			i;
 
 	if (MyProc == NULL)
@@ -2131,16 +2128,11 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 				break;
 
 			/* Before waiting, get info_lck and update LogwrtResult */
-			{
-				/* use volatile pointer to prevent code rearrangement */
-				volatile XLogCtlData *xlogctl = XLogCtl;
-
-				SpinLockAcquire(&xlogctl->info_lck);
-				if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr)
-					xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
-				LogwrtResult = xlogctl->LogwrtResult;
-				SpinLockRelease(&xlogctl->info_lck);
-			}
+			SpinLockAcquire(&XLogCtl->info_lck);
+			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
+				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
+			LogwrtResult = XLogCtl->LogwrtResult;
+			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
 			 * Now that we have an up-to-date LogwrtResult value, see if we
@@ -2548,16 +2540,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	 * code in a couple of places.
 	 */
 	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
-		SpinLockAcquire(&xlogctl->info_lck);
-		xlogctl->LogwrtResult = LogwrtResult;
-		if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write)
-			xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
-		if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush)
-			xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
-		SpinLockRelease(&xlogctl->info_lck);
+		SpinLockAcquire(&XLogCtl->info_lck);
+		XLogCtl->LogwrtResult = LogwrtResult;
+		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
+			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
+		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
+			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
+		SpinLockRelease(&XLogCtl->info_lck);
 	}
 }
 
@@ -2572,15 +2561,12 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
-
-	SpinLockAcquire(&xlogctl->info_lck);
-	LogwrtResult = xlogctl->LogwrtResult;
-	sleeping = xlogctl->WalWriterSleeping;
-	if (xlogctl->asyncXactLSN < asyncXactLSN)
-		xlogctl->asyncXactLSN = asyncXactLSN;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	LogwrtResult = XLogCtl->LogwrtResult;
+	sleeping = XLogCtl->WalWriterSleeping;
+	if (XLogCtl->asyncXactLSN < asyncXactLSN)
+		XLogCtl->asyncXactLSN = asyncXactLSN;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
 	 * If the WALWriter is sleeping, we should kick it to make it come out of
@@ -2613,12 +2599,9 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 void
 XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
-
-	SpinLockAcquire(&xlogctl->info_lck);
-	xlogctl->replicationSlotMinLSN = lsn;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	XLogCtl->replicationSlotMinLSN = lsn;
+	SpinLockRelease(&XLogCtl->info_lck);
 }
 
 
@@ -2629,13 +2612,11 @@ XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
 static XLogRecPtr
 XLogGetReplicationSlotMinimumLSN(void)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	XLogRecPtr	retval;
 
-	SpinLockAcquire(&xlogctl->info_lck);
-	retval = xlogctl->replicationSlotMinLSN;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	retval = XLogCtl->replicationSlotMinLSN;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	return retval;
 }
@@ -2671,8 +2652,6 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
 		updateMinRecoveryPoint = false;
 	else if (force || minRecoveryPoint < lsn)
 	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
 		XLogRecPtr	newMinRecoveryPoint;
 		TimeLineID	newMinRecoveryPointTLI;
 
@@ -2689,10 +2668,10 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
 		 * all.  Instead, we just log a warning and continue with recovery.
 		 * (See also the comments about corrupt LSNs in XLogFlush.)
 		 */
-		SpinLockAcquire(&xlogctl->info_lck);
-		newMinRecoveryPoint = xlogctl->replayEndRecPtr;
-		newMinRecoveryPointTLI = xlogctl->replayEndTLI;
-		SpinLockRelease(&xlogctl->info_lck);
+		SpinLockAcquire(&XLogCtl->info_lck);
+		newMinRecoveryPoint = XLogCtl->replayEndRecPtr;
+		newMinRecoveryPointTLI = XLogCtl->replayEndTLI;
+		SpinLockRelease(&XLogCtl->info_lck);
 
 		if (!force && newMinRecoveryPoint < lsn)
 			elog(WARNING,
@@ -2776,16 +2755,14 @@ XLogFlush(XLogRecPtr record)
 	 */
 	for (;;)
 	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
 		XLogRecPtr	insertpos;
 
 		/* read LogwrtResult and update local state */
-		SpinLockAcquire(&xlogctl->info_lck);
-		if (WriteRqstPtr < xlogctl->LogwrtRqst.Write)
-			WriteRqstPtr = xlogctl->LogwrtRqst.Write;
-		LogwrtResult = xlogctl->LogwrtResult;
-		SpinLockRelease(&xlogctl->info_lck);
+		SpinLockAcquire(&XLogCtl->info_lck);
+		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
+			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
+		LogwrtResult = XLogCtl->LogwrtResult;
+		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* done already? */
 		if (record <= LogwrtResult.Flush)
@@ -2922,15 +2899,10 @@ XLogBackgroundFlush(void)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
-		SpinLockAcquire(&xlogctl->info_lck);
-		LogwrtResult = xlogctl->LogwrtResult;
-		WriteRqstPtr = xlogctl->LogwrtRqst.Write;
-		SpinLockRelease(&xlogctl->info_lck);
-	}
+	SpinLockAcquire(&XLogCtl->info_lck);
+	LogwrtResult = XLogCtl->LogwrtResult;
+	WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	/* back off to last completed page boundary */
 	WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
@@ -2938,12 +2910,9 @@ XLogBackgroundFlush(void)
 	/* if we have already flushed that far, consider async commit records */
 	if (WriteRqstPtr <= LogwrtResult.Flush)
 	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
-		SpinLockAcquire(&xlogctl->info_lck);
-		WriteRqstPtr = xlogctl->asyncXactLSN;
-		SpinLockRelease(&xlogctl->info_lck);
+		SpinLockAcquire(&XLogCtl->info_lck);
+		WriteRqstPtr = XLogCtl->asyncXactLSN;
+		SpinLockRelease(&XLogCtl->info_lck);
 		flexible = false;		/* ensure it all gets written */
 	}
 
@@ -3054,14 +3023,9 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
-		SpinLockAcquire(&xlogctl->info_lck);
-		LogwrtResult = xlogctl->LogwrtResult;
-		SpinLockRelease(&xlogctl->info_lck);
-	}
+	SpinLockAcquire(&XLogCtl->info_lck);
+	LogwrtResult = XLogCtl->LogwrtResult;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -3683,13 +3647,11 @@ PreallocXlogFiles(XLogRecPtr endptr)
 void
 CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	XLogSegNo	lastRemovedSegNo;
 
-	SpinLockAcquire(&xlogctl->info_lck);
-	lastRemovedSegNo = xlogctl->lastRemovedSegNo;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	lastRemovedSegNo = XLogCtl->lastRemovedSegNo;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	if (segno <= lastRemovedSegNo)
 	{
@@ -3713,13 +3675,11 @@ CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
 XLogSegNo
 XLogGetLastRemovedSegno(void)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	XLogSegNo	lastRemovedSegNo;
 
-	SpinLockAcquire(&xlogctl->info_lck);
-	lastRemovedSegNo = xlogctl->lastRemovedSegNo;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	lastRemovedSegNo = XLogCtl->lastRemovedSegNo;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	return lastRemovedSegNo;
 }
@@ -3731,17 +3691,15 @@ XLogGetLastRemovedSegno(void)
 static void
 UpdateLastRemovedPtr(char *filename)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	uint32		tli;
 	XLogSegNo	segno;
 
 	XLogFromFileName(filename, &tli, &segno);
 
-	SpinLockAcquire(&xlogctl->info_lck);
-	if (segno > xlogctl->lastRemovedSegNo)
-		xlogctl->lastRemovedSegNo = segno;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	if (segno > XLogCtl->lastRemovedSegNo)
+		XLogCtl->lastRemovedSegNo = segno;
+	SpinLockRelease(&XLogCtl->info_lck);
 }
 
 /*
@@ -4699,13 +4657,10 @@ GetFakeLSNForUnloggedRel(void)
 {
 	XLogRecPtr	nextUnloggedLSN;
 
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
-
 	/* increment the unloggedLSN counter, need SpinLock */
-	SpinLockAcquire(&xlogctl->ulsn_lck);
-	nextUnloggedLSN = xlogctl->unloggedLSN++;
-	SpinLockRelease(&xlogctl->ulsn_lck);
+	SpinLockAcquire(&XLogCtl->ulsn_lck);
+	nextUnloggedLSN = XLogCtl->unloggedLSN++;
+	SpinLockRelease(&XLogCtl->ulsn_lck);
 
 	return nextUnloggedLSN;
 }
@@ -5737,13 +5692,11 @@ recoveryPausesHere(void)
 bool
 RecoveryIsPaused(void)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	bool		recoveryPause;
 
-	SpinLockAcquire(&xlogctl->info_lck);
-	recoveryPause = xlogctl->recoveryPause;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	recoveryPause = XLogCtl->recoveryPause;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	return recoveryPause;
 }
@@ -5751,12 +5704,9 @@ RecoveryIsPaused(void)
 void
 SetRecoveryPause(bool recoveryPause)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
-
-	SpinLockAcquire(&xlogctl->info_lck);
-	xlogctl->recoveryPause = recoveryPause;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	XLogCtl->recoveryPause = recoveryPause;
+	SpinLockRelease(&XLogCtl->info_lck);
 }
 
 /*
@@ -5854,12 +5804,9 @@ recoveryApplyDelay(XLogRecord *record)
 static void
 SetLatestXTime(TimestampTz xtime)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
-
-	SpinLockAcquire(&xlogctl->info_lck);
-	xlogctl->recoveryLastXTime = xtime;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	XLogCtl->recoveryLastXTime = xtime;
+	SpinLockRelease(&XLogCtl->info_lck);
 }
 
 /*
@@ -5868,13 +5815,11 @@ SetLatestXTime(TimestampTz xtime)
 TimestampTz
 GetLatestXTime(void)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	TimestampTz xtime;
 
-	SpinLockAcquire(&xlogctl->info_lck);
-	xtime = xlogctl->recoveryLastXTime;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	xtime = XLogCtl->recoveryLastXTime;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	return xtime;
 }
@@ -5888,12 +5833,9 @@ GetLatestXTime(void)
 static void
 SetCurrentChunkStartTime(TimestampTz xtime)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
-
-	SpinLockAcquire(&xlogctl->info_lck);
-	xlogctl->currentChunkStartTime = xtime;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	XLogCtl->currentChunkStartTime = xtime;
+	SpinLockRelease(&XLogCtl->info_lck);
 }
 
 /*
@@ -5903,13 +5845,11 @@ SetCurrentChunkStartTime(TimestampTz xtime)
 TimestampTz
 GetCurrentChunkReplayStartTime(void)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	TimestampTz xtime;
 
-	SpinLockAcquire(&xlogctl->info_lck);
-	xtime = xlogctl->currentChunkStartTime;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	xtime = XLogCtl->currentChunkStartTime;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	return xtime;
 }
@@ -6433,9 +6373,6 @@ StartupXLOG(void)
 	{
 		int			rmid;
 
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
 		/*
 		 * Update pg_control to show that we are recovering and to show the
 		 * selected checkpoint as the place we are starting from. We also mark
@@ -6622,18 +6559,18 @@ StartupXLOG(void)
 		 * if we had just replayed the record before the REDO location (or the
 		 * checkpoint record itself, if it's a shutdown checkpoint).
 		 */
-		SpinLockAcquire(&xlogctl->info_lck);
+		SpinLockAcquire(&XLogCtl->info_lck);
 		if (checkPoint.redo < RecPtr)
-			xlogctl->replayEndRecPtr = checkPoint.redo;
+			XLogCtl->replayEndRecPtr = checkPoint.redo;
 		else
-			xlogctl->replayEndRecPtr = EndRecPtr;
-		xlogctl->replayEndTLI = ThisTimeLineID;
-		xlogctl->lastReplayedEndRecPtr = xlogctl->replayEndRecPtr;
-		xlogctl->lastReplayedTLI = xlogctl->replayEndTLI;
-		xlogctl->recoveryLastXTime = 0;
-		xlogctl->currentChunkStartTime = 0;
-		xlogctl->recoveryPause = false;
-		SpinLockRelease(&xlogctl->info_lck);
+			XLogCtl->replayEndRecPtr = EndRecPtr;
+		XLogCtl->replayEndTLI = ThisTimeLineID;
+		XLogCtl->lastReplayedEndRecPtr = XLogCtl->replayEndRecPtr;
+		XLogCtl->lastReplayedTLI = XLogCtl->replayEndTLI;
+		XLogCtl->recoveryLastXTime = 0;
+		XLogCtl->currentChunkStartTime = 0;
+		XLogCtl->recoveryPause = false;
+		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* Also ensure XLogReceiptTime has a sane value */
 		XLogReceiptTime = GetCurrentTimestamp();
@@ -6732,7 +6669,7 @@ StartupXLOG(void)
 				 * otherwise would is a minor issue, so it doesn't seem worth
 				 * adding another spinlock cycle to prevent that.
 				 */
-				if (xlogctl->recoveryPause)
+				if (((volatile XLogCtlData*) XLogCtl)->recoveryPause)
 					recoveryPausesHere();
 
 				/*
@@ -6757,7 +6694,7 @@ StartupXLOG(void)
 					 * here otherwise pausing during the delay-wait wouldn't
 					 * work.
 					 */
-					if (xlogctl->recoveryPause)
+					if (((volatile XLogCtlData*) XLogCtl)->recoveryPause)
 						recoveryPausesHere();
 				}
 
@@ -6830,10 +6767,10 @@ StartupXLOG(void)
 				 * Update shared replayEndRecPtr before replaying this record,
 				 * so that XLogFlush will update minRecoveryPoint correctly.
 				 */
-				SpinLockAcquire(&xlogctl->info_lck);
-				xlogctl->replayEndRecPtr = EndRecPtr;
-				xlogctl->replayEndTLI = ThisTimeLineID;
-				SpinLockRelease(&xlogctl->info_lck);
+				SpinLockAcquire(&XLogCtl->info_lck);
+				XLogCtl->replayEndRecPtr = EndRecPtr;
+				XLogCtl->replayEndTLI = ThisTimeLineID;
+				SpinLockRelease(&XLogCtl->info_lck);
 
 				/*
 				 * If we are attempting to enter Hot Standby mode, process
@@ -6853,10 +6790,10 @@ StartupXLOG(void)
 				 * Update lastReplayedEndRecPtr after this record has been
 				 * successfully replayed.
 				 */
-				SpinLockAcquire(&xlogctl->info_lck);
-				xlogctl->lastReplayedEndRecPtr = EndRecPtr;
-				xlogctl->lastReplayedTLI = ThisTimeLineID;
-				SpinLockRelease(&xlogctl->info_lck);
+				SpinLockAcquire(&XLogCtl->info_lck);
+				XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
+				XLogCtl->lastReplayedTLI = ThisTimeLineID;
+				SpinLockRelease(&XLogCtl->info_lck);
 
 				/* Remember this record as the last-applied one */
 				LastRec = ReadRecPtr;
@@ -7266,14 +7203,9 @@ StartupXLOG(void)
 	 * there are no race conditions concerning visibility of other recent
 	 * updates to shared memory.)
 	 */
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
-		SpinLockAcquire(&xlogctl->info_lck);
-		xlogctl->SharedRecoveryInProgress = false;
-		SpinLockRelease(&xlogctl->info_lck);
-	}
+	SpinLockAcquire(&XLogCtl->info_lck);
+	XLogCtl->SharedRecoveryInProgress = false;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
 	 * If there were cascading standby servers connected to us, nudge any wal
@@ -7376,12 +7308,9 @@ CheckRecoveryConsistency(void)
 		reachedConsistency &&
 		IsUnderPostmaster)
 	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
-		SpinLockAcquire(&xlogctl->info_lck);
-		xlogctl->SharedHotStandbyActive = true;
-		SpinLockRelease(&xlogctl->info_lck);
+		SpinLockAcquire(&XLogCtl->info_lck);
+		XLogCtl->SharedHotStandbyActive = true;
+		SpinLockRelease(&XLogCtl->info_lck);
 
 		LocalHotStandbyActive = true;
 
@@ -7466,13 +7395,10 @@ HotStandbyActive(void)
 		return true;
 	else
 	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
 		/* spinlock is essential on machines with weak memory ordering! */
-		SpinLockAcquire(&xlogctl->info_lck);
-		LocalHotStandbyActive = xlogctl->SharedHotStandbyActive;
-		SpinLockRelease(&xlogctl->info_lck);
+		SpinLockAcquire(&XLogCtl->info_lck);
+		LocalHotStandbyActive = XLogCtl->SharedHotStandbyActive;
+		SpinLockRelease(&XLogCtl->info_lck);
 
 		return LocalHotStandbyActive;
 	}
@@ -7687,8 +7613,6 @@ InitXLOGAccess(void)
 XLogRecPtr
 GetRedoRecPtr(void)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	XLogRecPtr	ptr;
 
 	/*
@@ -7696,9 +7620,9 @@ GetRedoRecPtr(void)
 	 * grabbed a WAL insertion lock to read the master copy, someone might
 	 * update it just after we've released the lock.
 	 */
-	SpinLockAcquire(&xlogctl->info_lck);
-	ptr = xlogctl->RedoRecPtr;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	ptr = XLogCtl->RedoRecPtr;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	if (RedoRecPtr < ptr)
 		RedoRecPtr = ptr;
@@ -7717,13 +7641,11 @@ GetRedoRecPtr(void)
 XLogRecPtr
 GetInsertRecPtr(void)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	XLogRecPtr	recptr;
 
-	SpinLockAcquire(&xlogctl->info_lck);
-	recptr = xlogctl->LogwrtRqst.Write;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	recptr = XLogCtl->LogwrtRqst.Write;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	return recptr;
 }
@@ -7735,13 +7657,11 @@ GetInsertRecPtr(void)
 XLogRecPtr
 GetFlushRecPtr(void)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	XLogRecPtr	recptr;
 
-	SpinLockAcquire(&xlogctl->info_lck);
-	recptr = xlogctl->LogwrtResult.Flush;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	recptr = XLogCtl->LogwrtResult.Flush;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	return recptr;
 }
@@ -7778,15 +7698,10 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
 	TransactionId nextXid;
 
 	/* Must read checkpoint info first, else have race condition */
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
-		SpinLockAcquire(&xlogctl->info_lck);
-		ckptXidEpoch = xlogctl->ckptXidEpoch;
-		ckptXid = xlogctl->ckptXid;
-		SpinLockRelease(&xlogctl->info_lck);
-	}
+	SpinLockAcquire(&XLogCtl->info_lck);
+	ckptXidEpoch = XLogCtl->ckptXidEpoch;
+	ckptXid = XLogCtl->ckptXid;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	/* Now fetch current nextXid */
 	nextXid = ReadNewTransactionId();
@@ -7989,8 +7904,6 @@ LogCheckpointEnd(bool restartpoint)
 void
 CreateCheckPoint(int flags)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	bool		shutdown;
 	CheckPoint	checkPoint;
 	XLogRecPtr	recptr;
@@ -8150,7 +8063,7 @@ CreateCheckPoint(int flags)
 	 * XLogInserts that happen while we are dumping buffers must assume that
 	 * their buffer changes are not included in the checkpoint.
 	 */
-	RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
+	RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
 
 	/*
 	 * Now we can release the WAL insertion locks, allowing other xacts to
@@ -8159,9 +8072,9 @@ CreateCheckPoint(int flags)
 	WALInsertLockRelease();
 
 	/* Update the info_lck-protected copy of RedoRecPtr as well */
-	SpinLockAcquire(&xlogctl->info_lck);
-	xlogctl->RedoRecPtr = checkPoint.redo;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	XLogCtl->RedoRecPtr = checkPoint.redo;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
 	 * If enabled, log checkpoint start.  We postpone this until now so as not
@@ -8333,15 +8246,10 @@ CreateCheckPoint(int flags)
 	LWLockRelease(ControlFileLock);
 
 	/* Update shared-memory copy of checkpoint XID/epoch */
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
-		SpinLockAcquire(&xlogctl->info_lck);
-		xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
-		xlogctl->ckptXid = checkPoint.nextXid;
-		SpinLockRelease(&xlogctl->info_lck);
-	}
+	SpinLockAcquire(&XLogCtl->info_lck);
+	XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
+	XLogCtl->ckptXid = checkPoint.nextXid;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
 	 * We are now done with critical updates; no need for system panic if we
@@ -8496,9 +8404,6 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 static void
 RecoveryRestartPoint(const CheckPoint *checkPoint)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
-
 	/*
 	 * Also refrain from creating a restartpoint if we have seen any
 	 * references to non-existent pages. Restarting recovery from the
@@ -8520,10 +8425,10 @@ RecoveryRestartPoint(const CheckPoint *checkPoint)
 	 * Copy the checkpoint record to shared memory, so that checkpointer can
 	 * work out the next time it wants to perform a restartpoint.
 	 */
-	SpinLockAcquire(&xlogctl->info_lck);
-	xlogctl->lastCheckPointRecPtr = ReadRecPtr;
-	xlogctl->lastCheckPoint = *checkPoint;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	XLogCtl->lastCheckPointRecPtr = ReadRecPtr;
+	XLogCtl->lastCheckPoint = *checkPoint;
+	SpinLockRelease(&XLogCtl->info_lck);
 }
 
 /*
@@ -8545,9 +8450,6 @@ CreateRestartPoint(int flags)
 	XLogSegNo	_logSegNo;
 	TimestampTz xtime;
 
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
-
 	/*
 	 * Acquire CheckpointLock to ensure only one restartpoint or checkpoint
 	 * happens at a time.
@@ -8555,10 +8457,10 @@ CreateRestartPoint(int flags)
 	LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
 
 	/* Get a local copy of the last safe checkpoint record. */
-	SpinLockAcquire(&xlogctl->info_lck);
-	lastCheckPointRecPtr = xlogctl->lastCheckPointRecPtr;
-	lastCheckPoint = xlogctl->lastCheckPoint;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	lastCheckPointRecPtr = XLogCtl->lastCheckPointRecPtr;
+	lastCheckPoint = XLogCtl->lastCheckPoint;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
 	 * Check that we're still in recovery mode. It's ok if we exit recovery
@@ -8615,15 +8517,18 @@ CreateRestartPoint(int flags)
 	 * Like in CreateCheckPoint(), hold off insertions to update it, although
 	 * during recovery this is just pro forma, because no WAL insertions are
 	 * happening.
+	 *
+	 * NB: XXX: It's safe to access XLogCtl->Insert.RedoRecPtr here because
+	 * that variable is protected by the wal insert lock, not info_lck.
 	 */
 	WALInsertLockAcquireExclusive();
-	xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
+	XLogCtl->Insert.RedoRecPtr = lastCheckPoint.redo;
 	WALInsertLockRelease();
 
 	/* Also update the info_lck-protected copy */
-	SpinLockAcquire(&xlogctl->info_lck);
-	xlogctl->RedoRecPtr = lastCheckPoint.redo;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	XLogCtl->RedoRecPtr = lastCheckPoint.redo;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
 	 * Prepare to accumulate statistics.
@@ -9383,15 +9288,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
 
 		/* Update shared-memory copy of checkpoint XID/epoch */
-		{
-			/* use volatile pointer to prevent code rearrangement */
-			volatile XLogCtlData *xlogctl = XLogCtl;
-
-			SpinLockAcquire(&xlogctl->info_lck);
-			xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
-			xlogctl->ckptXid = checkPoint.nextXid;
-			SpinLockRelease(&xlogctl->info_lck);
-		}
+		SpinLockAcquire(&XLogCtl->info_lck);
+		XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
+		XLogCtl->ckptXid = checkPoint.nextXid;
+		SpinLockRelease(&XLogCtl->info_lck);
 
 		/*
 		 * We should've already switched to the new TLI before replaying this
@@ -9435,15 +9335,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
 
 		/* Update shared-memory copy of checkpoint XID/epoch */
-		{
-			/* use volatile pointer to prevent code rearrangement */
-			volatile XLogCtlData *xlogctl = XLogCtl;
-
-			SpinLockAcquire(&xlogctl->info_lck);
-			xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
-			xlogctl->ckptXid = checkPoint.nextXid;
-			SpinLockRelease(&xlogctl->info_lck);
-		}
+		SpinLockAcquire(&XLogCtl->info_lck);
+		XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
+		XLogCtl->ckptXid = checkPoint.nextXid;
+		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* TLI should not change in an on-line checkpoint */
 		if (checkPoint.ThisTimeLineID != ThisTimeLineID)
@@ -9580,8 +9475,6 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 	}
 	else if (info == XLOG_FPW_CHANGE)
 	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
 		bool		fpw;
 
 		memcpy(&fpw, XLogRecGetData(record), sizeof(bool));
@@ -9593,10 +9486,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 		 */
 		if (!fpw)
 		{
-			SpinLockAcquire(&xlogctl->info_lck);
-			if (xlogctl->lastFpwDisableRecPtr < ReadRecPtr)
-				xlogctl->lastFpwDisableRecPtr = ReadRecPtr;
-			SpinLockRelease(&xlogctl->info_lck);
+			SpinLockAcquire(&XLogCtl->info_lck);
+			if (XLogCtl->lastFpwDisableRecPtr < ReadRecPtr)
+				XLogCtl->lastFpwDisableRecPtr = ReadRecPtr;
+			SpinLockRelease(&XLogCtl->info_lck);
 		}
 
 		/* Keep track of full_page_writes */
@@ -9951,8 +9844,6 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 
 			if (backup_started_in_recovery)
 			{
-				/* use volatile pointer to prevent code rearrangement */
-				volatile XLogCtlData *xlogctl = XLogCtl;
 				XLogRecPtr	recptr;
 
 				/*
@@ -9960,9 +9851,9 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 				 * (i.e., since last restartpoint used as backup starting
 				 * checkpoint) contain full-page writes.
 				 */
-				SpinLockAcquire(&xlogctl->info_lck);
-				recptr = xlogctl->lastFpwDisableRecPtr;
-				SpinLockRelease(&xlogctl->info_lck);
+				SpinLockAcquire(&XLogCtl->info_lck);
+				recptr = XLogCtl->lastFpwDisableRecPtr;
+				SpinLockRelease(&XLogCtl->info_lck);
 
 				if (!checkpointfpw || startpoint <= recptr)
 					ereport(ERROR,
@@ -10305,17 +10196,15 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 	 */
 	if (backup_started_in_recovery)
 	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
 		XLogRecPtr	recptr;
 
 		/*
 		 * Check to see if all WAL replayed during online backup contain
 		 * full-page writes.
 		 */
-		SpinLockAcquire(&xlogctl->info_lck);
-		recptr = xlogctl->lastFpwDisableRecPtr;
-		SpinLockRelease(&xlogctl->info_lck);
+		SpinLockAcquire(&XLogCtl->info_lck);
+		recptr = XLogCtl->lastFpwDisableRecPtr;
+		SpinLockRelease(&XLogCtl->info_lck);
 
 		if (startpoint <= recptr)
 			ereport(ERROR,
@@ -10502,15 +10391,13 @@ do_pg_abort_backup(void)
 XLogRecPtr
 GetXLogReplayRecPtr(TimeLineID *replayTLI)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	XLogRecPtr	recptr;
 	TimeLineID	tli;
 
-	SpinLockAcquire(&xlogctl->info_lck);
-	recptr = xlogctl->lastReplayedEndRecPtr;
-	tli = xlogctl->lastReplayedTLI;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	recptr = XLogCtl->lastReplayedEndRecPtr;
+	tli = XLogCtl->lastReplayedTLI;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	if (replayTLI)
 		*replayTLI = tli;
@@ -10523,7 +10410,7 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI)
 XLogRecPtr
 GetXLogInsertRecPtr(void)
 {
-	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	uint64		current_bytepos;
 
 	SpinLockAcquire(&Insert->insertpos_lck);
@@ -10539,14 +10426,9 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
-		SpinLockAcquire(&xlogctl->info_lck);
-		LogwrtResult = xlogctl->LogwrtResult;
-		SpinLockRelease(&xlogctl->info_lck);
-	}
+	SpinLockAcquire(&XLogCtl->info_lck);
+	LogwrtResult = XLogCtl->LogwrtResult;
+	SpinLockRelease(&XLogCtl->info_lck);
 
 	return LogwrtResult.Write;
 }
@@ -11372,10 +11254,7 @@ WakeupRecovery(void)
 void
 SetWalWriterSleeping(bool sleeping)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
-
-	SpinLockAcquire(&xlogctl->info_lck);
-	xlogctl->WalWriterSleeping = sleeping;
-	SpinLockRelease(&xlogctl->info_lck);
+	SpinLockAcquire(&XLogCtl->info_lck);
+	XLogCtl->WalWriterSleeping = sleeping;
+	SpinLockRelease(&XLogCtl->info_lck);
 }
-- 
2.0.0.rc2.4.g1dc51c6.dirty

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to