Hi, On 2014-09-10 14:53:07 -0400, Robert Haas wrote: > As discussed on the thread "Spinlocks and compiler/memory barriers", > now that we've made the spinlock primitives function as compiler > barriers (we think), it should be possible to remove volatile > qualifiers from many places in the source code. The attached patch > does this in lwlock.c. If the changes in commit > 0709b7ee72e4bc71ad07b7120acd117265ab51d0 (and follow-on commits) are > correct and complete, applying this shouldn't break anything, while > possibly giving the compiler room to optimize things better than it > does today. > > However, demonstrating the necessity of that commit for these changes > seems to be non-trivial. I tried applying this patch and reverting > commits 5b26278822c69dd76ef89fd50ecc7cdba9c3f035, > b4c28d1b92c81941e4fc124884e51a7c110316bf, and > 0709b7ee72e4bc71ad07b7120acd117265ab51d0 on a PPC64 POWER8 box with a > whopping 192 hardware threads (thanks, IBM!). I then ran the > regression tests repeatedly, and I ran several long pgbench runs with > as many as 350 concurrent clients. No failures.
There's actually one more commit to revert. What I used was: git revert 5b26278822c69dd76ef89fd50ecc7cdba9c3f035 \ b4c28d1b92c81941e4fc124884e51a7c110316bf \ 68e66923ff629c324e219090860dc9e0e0a6f5d6 \ 0709b7ee72e4bc71ad07b7120acd117265ab51d0 > So I'm posting this patch in the hope that others can help. The > relevant tests are: > > 1. If you apply this patch to master and run tests of whatever kind > strikes your fancy, does anything break under high concurrency? If it > does, then the above commits weren't enough to make this safe on your > platform. > > 2. If you apply this patch to master, revert the commits mentioned > above, and again run tests, does anything now break? If it does (but > the first tests were OK), then that shows that those commits did > something useful on your platform. I just tried this on my normal x86 workstation. I applied your lwlock patch and, on top of that, removed most volatiles (a couple are still required) from xlog.c. That works for 100 seconds. Then I reverted the above commits. That breaks within seconds: master: LOG: request to flush past end of generated WAL; request 2/E5EC3DE0, currpos 2/E5EC1E60 standby: LOG: record with incorrect prev-link 4/684C3108 at 4/684C3108 and similar. So at least for x86 the compiler barriers are obviously required and seemingly working. I've attached the very quickly written xlog.c de-volatilizing patch. Greetings, Andres Freund -- Andres Freund http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
>From 2b68a134925e4b2fb6ff282f5ed83b8f57b10732 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Wed, 17 Sep 2014 13:21:20 +0200 Subject: [PATCH] xlog.c-remove-volatile --- src/backend/access/transam/xlog.c | 473 ++++++++++++++------------------------ 1 file changed, 176 insertions(+), 297 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 34f2fc0..103f077 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1219,16 +1219,13 @@ begin:; */ if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); /* advance global request to include new block(s) */ - if (xlogctl->LogwrtRqst.Write < EndPos) - xlogctl->LogwrtRqst.Write = EndPos; + if (XLogCtl->LogwrtRqst.Write < EndPos) + XLogCtl->LogwrtRqst.Write = EndPos; /* update local result copy while I have the chance */ - LogwrtResult = xlogctl->LogwrtResult; - SpinLockRelease(&xlogctl->info_lck); + LogwrtResult = XLogCtl->LogwrtResult; + SpinLockRelease(&XLogCtl->info_lck); } /* @@ -1323,7 +1320,7 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) { - volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogCtlInsert *Insert = &XLogCtl->Insert; uint64 startbytepos; uint64 endbytepos; uint64 prevbytepos; @@ -1378,7 +1375,7 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) { - volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogCtlInsert *Insert = &XLogCtl->Insert; uint64 startbytepos; uint64 endbytepos; uint64 prevbytepos; @@ -1696,7 +1693,7 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) uint64 bytepos; XLogRecPtr reservedUpto; XLogRecPtr finishedUpto; - 
volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogCtlInsert *Insert = &XLogCtl->Insert; int i; if (MyProc == NULL) @@ -2131,16 +2128,11 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) break; /* Before waiting, get info_lck and update LogwrtResult */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr) - xlogctl->LogwrtRqst.Write = OldPageRqstPtr; - LogwrtResult = xlogctl->LogwrtResult; - SpinLockRelease(&xlogctl->info_lck); - } + SpinLockAcquire(&XLogCtl->info_lck); + if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr) + XLogCtl->LogwrtRqst.Write = OldPageRqstPtr; + LogwrtResult = XLogCtl->LogwrtResult; + SpinLockRelease(&XLogCtl->info_lck); /* * Now that we have an up-to-date LogwrtResult value, see if we @@ -2548,16 +2540,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) * code in a couple of places. */ { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->LogwrtResult = LogwrtResult; - if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write) - xlogctl->LogwrtRqst.Write = LogwrtResult.Write; - if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush) - xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->LogwrtResult = LogwrtResult; + if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write) + XLogCtl->LogwrtRqst.Write = LogwrtResult.Write; + if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush) + XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; + SpinLockRelease(&XLogCtl->info_lck); } } @@ -2572,15 +2561,12 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) XLogRecPtr WriteRqstPtr = asyncXactLSN; bool sleeping; - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - 
SpinLockAcquire(&xlogctl->info_lck); - LogwrtResult = xlogctl->LogwrtResult; - sleeping = xlogctl->WalWriterSleeping; - if (xlogctl->asyncXactLSN < asyncXactLSN) - xlogctl->asyncXactLSN = asyncXactLSN; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + LogwrtResult = XLogCtl->LogwrtResult; + sleeping = XLogCtl->WalWriterSleeping; + if (XLogCtl->asyncXactLSN < asyncXactLSN) + XLogCtl->asyncXactLSN = asyncXactLSN; + SpinLockRelease(&XLogCtl->info_lck); /* * If the WALWriter is sleeping, we should kick it to make it come out of @@ -2613,12 +2599,9 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->replicationSlotMinLSN = lsn; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->replicationSlotMinLSN = lsn; + SpinLockRelease(&XLogCtl->info_lck); } @@ -2629,13 +2612,11 @@ XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn) static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogRecPtr retval; - SpinLockAcquire(&xlogctl->info_lck); - retval = xlogctl->replicationSlotMinLSN; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + retval = XLogCtl->replicationSlotMinLSN; + SpinLockRelease(&XLogCtl->info_lck); return retval; } @@ -2671,8 +2652,6 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force) updateMinRecoveryPoint = false; else if (force || minRecoveryPoint < lsn) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogRecPtr newMinRecoveryPoint; TimeLineID newMinRecoveryPointTLI; @@ -2689,10 +2668,10 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force) * all. Instead, we just log a warning and continue with recovery. 
* (See also the comments about corrupt LSNs in XLogFlush.) */ - SpinLockAcquire(&xlogctl->info_lck); - newMinRecoveryPoint = xlogctl->replayEndRecPtr; - newMinRecoveryPointTLI = xlogctl->replayEndTLI; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + newMinRecoveryPoint = XLogCtl->replayEndRecPtr; + newMinRecoveryPointTLI = XLogCtl->replayEndTLI; + SpinLockRelease(&XLogCtl->info_lck); if (!force && newMinRecoveryPoint < lsn) elog(WARNING, @@ -2776,16 +2755,14 @@ XLogFlush(XLogRecPtr record) */ for (;;) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogRecPtr insertpos; /* read LogwrtResult and update local state */ - SpinLockAcquire(&xlogctl->info_lck); - if (WriteRqstPtr < xlogctl->LogwrtRqst.Write) - WriteRqstPtr = xlogctl->LogwrtRqst.Write; - LogwrtResult = xlogctl->LogwrtResult; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write) + WriteRqstPtr = XLogCtl->LogwrtRqst.Write; + LogwrtResult = XLogCtl->LogwrtResult; + SpinLockRelease(&XLogCtl->info_lck); /* done already? 
*/ if (record <= LogwrtResult.Flush) @@ -2922,15 +2899,10 @@ XLogBackgroundFlush(void) return false; /* read LogwrtResult and update local state */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - LogwrtResult = xlogctl->LogwrtResult; - WriteRqstPtr = xlogctl->LogwrtRqst.Write; - SpinLockRelease(&xlogctl->info_lck); - } + SpinLockAcquire(&XLogCtl->info_lck); + LogwrtResult = XLogCtl->LogwrtResult; + WriteRqstPtr = XLogCtl->LogwrtRqst.Write; + SpinLockRelease(&XLogCtl->info_lck); /* back off to last completed page boundary */ WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ; @@ -2938,12 +2910,9 @@ XLogBackgroundFlush(void) /* if we have already flushed that far, consider async commit records */ if (WriteRqstPtr <= LogwrtResult.Flush) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - WriteRqstPtr = xlogctl->asyncXactLSN; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + WriteRqstPtr = XLogCtl->asyncXactLSN; + SpinLockRelease(&XLogCtl->info_lck); flexible = false; /* ensure it all gets written */ } @@ -3054,14 +3023,9 @@ XLogNeedsFlush(XLogRecPtr record) return false; /* read LogwrtResult and update local state */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - LogwrtResult = xlogctl->LogwrtResult; - SpinLockRelease(&xlogctl->info_lck); - } + SpinLockAcquire(&XLogCtl->info_lck); + LogwrtResult = XLogCtl->LogwrtResult; + SpinLockRelease(&XLogCtl->info_lck); /* check again */ if (record <= LogwrtResult.Flush) @@ -3683,13 +3647,11 @@ PreallocXlogFiles(XLogRecPtr endptr) void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogSegNo lastRemovedSegNo; - 
SpinLockAcquire(&xlogctl->info_lck); - lastRemovedSegNo = xlogctl->lastRemovedSegNo; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + lastRemovedSegNo = XLogCtl->lastRemovedSegNo; + SpinLockRelease(&XLogCtl->info_lck); if (segno <= lastRemovedSegNo) { @@ -3713,13 +3675,11 @@ CheckXLogRemoved(XLogSegNo segno, TimeLineID tli) XLogSegNo XLogGetLastRemovedSegno(void) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogSegNo lastRemovedSegNo; - SpinLockAcquire(&xlogctl->info_lck); - lastRemovedSegNo = xlogctl->lastRemovedSegNo; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + lastRemovedSegNo = XLogCtl->lastRemovedSegNo; + SpinLockRelease(&XLogCtl->info_lck); return lastRemovedSegNo; } @@ -3731,17 +3691,15 @@ XLogGetLastRemovedSegno(void) static void UpdateLastRemovedPtr(char *filename) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; uint32 tli; XLogSegNo segno; XLogFromFileName(filename, &tli, &segno); - SpinLockAcquire(&xlogctl->info_lck); - if (segno > xlogctl->lastRemovedSegNo) - xlogctl->lastRemovedSegNo = segno; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + if (segno > XLogCtl->lastRemovedSegNo) + XLogCtl->lastRemovedSegNo = segno; + SpinLockRelease(&XLogCtl->info_lck); } /* @@ -4699,13 +4657,10 @@ GetFakeLSNForUnloggedRel(void) { XLogRecPtr nextUnloggedLSN; - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - /* increment the unloggedLSN counter, need SpinLock */ - SpinLockAcquire(&xlogctl->ulsn_lck); - nextUnloggedLSN = xlogctl->unloggedLSN++; - SpinLockRelease(&xlogctl->ulsn_lck); + SpinLockAcquire(&XLogCtl->ulsn_lck); + nextUnloggedLSN = XLogCtl->unloggedLSN++; + SpinLockRelease(&XLogCtl->ulsn_lck); return nextUnloggedLSN; } @@ -5737,13 +5692,11 @@ recoveryPausesHere(void) bool RecoveryIsPaused(void) { - 
/* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; bool recoveryPause; - SpinLockAcquire(&xlogctl->info_lck); - recoveryPause = xlogctl->recoveryPause; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + recoveryPause = XLogCtl->recoveryPause; + SpinLockRelease(&XLogCtl->info_lck); return recoveryPause; } @@ -5751,12 +5704,9 @@ RecoveryIsPaused(void) void SetRecoveryPause(bool recoveryPause) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->recoveryPause = recoveryPause; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->recoveryPause = recoveryPause; + SpinLockRelease(&XLogCtl->info_lck); } /* @@ -5854,12 +5804,9 @@ recoveryApplyDelay(XLogRecord *record) static void SetLatestXTime(TimestampTz xtime) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->recoveryLastXTime = xtime; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->recoveryLastXTime = xtime; + SpinLockRelease(&XLogCtl->info_lck); } /* @@ -5868,13 +5815,11 @@ SetLatestXTime(TimestampTz xtime) TimestampTz GetLatestXTime(void) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; TimestampTz xtime; - SpinLockAcquire(&xlogctl->info_lck); - xtime = xlogctl->recoveryLastXTime; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + xtime = XLogCtl->recoveryLastXTime; + SpinLockRelease(&XLogCtl->info_lck); return xtime; } @@ -5888,12 +5833,9 @@ GetLatestXTime(void) static void SetCurrentChunkStartTime(TimestampTz xtime) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - 
xlogctl->currentChunkStartTime = xtime; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->currentChunkStartTime = xtime; + SpinLockRelease(&XLogCtl->info_lck); } /* @@ -5903,13 +5845,11 @@ SetCurrentChunkStartTime(TimestampTz xtime) TimestampTz GetCurrentChunkReplayStartTime(void) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; TimestampTz xtime; - SpinLockAcquire(&xlogctl->info_lck); - xtime = xlogctl->currentChunkStartTime; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + xtime = XLogCtl->currentChunkStartTime; + SpinLockRelease(&XLogCtl->info_lck); return xtime; } @@ -6433,9 +6373,6 @@ StartupXLOG(void) { int rmid; - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - /* * Update pg_control to show that we are recovering and to show the * selected checkpoint as the place we are starting from. We also mark @@ -6622,18 +6559,18 @@ StartupXLOG(void) * if we had just replayed the record before the REDO location (or the * checkpoint record itself, if it's a shutdown checkpoint). 
*/ - SpinLockAcquire(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); if (checkPoint.redo < RecPtr) - xlogctl->replayEndRecPtr = checkPoint.redo; + XLogCtl->replayEndRecPtr = checkPoint.redo; else - xlogctl->replayEndRecPtr = EndRecPtr; - xlogctl->replayEndTLI = ThisTimeLineID; - xlogctl->lastReplayedEndRecPtr = xlogctl->replayEndRecPtr; - xlogctl->lastReplayedTLI = xlogctl->replayEndTLI; - xlogctl->recoveryLastXTime = 0; - xlogctl->currentChunkStartTime = 0; - xlogctl->recoveryPause = false; - SpinLockRelease(&xlogctl->info_lck); + XLogCtl->replayEndRecPtr = EndRecPtr; + XLogCtl->replayEndTLI = ThisTimeLineID; + XLogCtl->lastReplayedEndRecPtr = XLogCtl->replayEndRecPtr; + XLogCtl->lastReplayedTLI = XLogCtl->replayEndTLI; + XLogCtl->recoveryLastXTime = 0; + XLogCtl->currentChunkStartTime = 0; + XLogCtl->recoveryPause = false; + SpinLockRelease(&XLogCtl->info_lck); /* Also ensure XLogReceiptTime has a sane value */ XLogReceiptTime = GetCurrentTimestamp(); @@ -6732,7 +6669,7 @@ StartupXLOG(void) * otherwise would is a minor issue, so it doesn't seem worth * adding another spinlock cycle to prevent that. */ - if (xlogctl->recoveryPause) + if (((volatile XLogCtlData*) XLogCtl)->recoveryPause) recoveryPausesHere(); /* @@ -6757,7 +6694,7 @@ StartupXLOG(void) * here otherwise pausing during the delay-wait wouldn't * work. */ - if (xlogctl->recoveryPause) + if (((volatile XLogCtlData*) XLogCtl)->recoveryPause) recoveryPausesHere(); } @@ -6830,10 +6767,10 @@ StartupXLOG(void) * Update shared replayEndRecPtr before replaying this record, * so that XLogFlush will update minRecoveryPoint correctly. 
*/ - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->replayEndRecPtr = EndRecPtr; - xlogctl->replayEndTLI = ThisTimeLineID; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->replayEndRecPtr = EndRecPtr; + XLogCtl->replayEndTLI = ThisTimeLineID; + SpinLockRelease(&XLogCtl->info_lck); /* * If we are attempting to enter Hot Standby mode, process @@ -6853,10 +6790,10 @@ StartupXLOG(void) * Update lastReplayedEndRecPtr after this record has been * successfully replayed. */ - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->lastReplayedEndRecPtr = EndRecPtr; - xlogctl->lastReplayedTLI = ThisTimeLineID; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->lastReplayedEndRecPtr = EndRecPtr; + XLogCtl->lastReplayedTLI = ThisTimeLineID; + SpinLockRelease(&XLogCtl->info_lck); /* Remember this record as the last-applied one */ LastRec = ReadRecPtr; @@ -7266,14 +7203,9 @@ StartupXLOG(void) * there are no race conditions concerning visibility of other recent * updates to shared memory.) 
*/ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->SharedRecoveryInProgress = false; - SpinLockRelease(&xlogctl->info_lck); - } + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->SharedRecoveryInProgress = false; + SpinLockRelease(&XLogCtl->info_lck); /* * If there were cascading standby servers connected to us, nudge any wal @@ -7376,12 +7308,9 @@ CheckRecoveryConsistency(void) reachedConsistency && IsUnderPostmaster) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->SharedHotStandbyActive = true; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->SharedHotStandbyActive = true; + SpinLockRelease(&XLogCtl->info_lck); LocalHotStandbyActive = true; @@ -7466,13 +7395,10 @@ HotStandbyActive(void) return true; else { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - /* spinlock is essential on machines with weak memory ordering! */ - SpinLockAcquire(&xlogctl->info_lck); - LocalHotStandbyActive = xlogctl->SharedHotStandbyActive; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + LocalHotStandbyActive = XLogCtl->SharedHotStandbyActive; + SpinLockRelease(&XLogCtl->info_lck); return LocalHotStandbyActive; } @@ -7687,8 +7613,6 @@ InitXLOGAccess(void) XLogRecPtr GetRedoRecPtr(void) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogRecPtr ptr; /* @@ -7696,9 +7620,9 @@ GetRedoRecPtr(void) * grabbed a WAL insertion lock to read the master copy, someone might * update it just after we've released the lock. 
*/ - SpinLockAcquire(&xlogctl->info_lck); - ptr = xlogctl->RedoRecPtr; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + ptr = XLogCtl->RedoRecPtr; + SpinLockRelease(&XLogCtl->info_lck); if (RedoRecPtr < ptr) RedoRecPtr = ptr; @@ -7717,13 +7641,11 @@ GetRedoRecPtr(void) XLogRecPtr GetInsertRecPtr(void) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogRecPtr recptr; - SpinLockAcquire(&xlogctl->info_lck); - recptr = xlogctl->LogwrtRqst.Write; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + recptr = XLogCtl->LogwrtRqst.Write; + SpinLockRelease(&XLogCtl->info_lck); return recptr; } @@ -7735,13 +7657,11 @@ GetInsertRecPtr(void) XLogRecPtr GetFlushRecPtr(void) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogRecPtr recptr; - SpinLockAcquire(&xlogctl->info_lck); - recptr = xlogctl->LogwrtResult.Flush; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + recptr = XLogCtl->LogwrtResult.Flush; + SpinLockRelease(&XLogCtl->info_lck); return recptr; } @@ -7778,15 +7698,10 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch) TransactionId nextXid; /* Must read checkpoint info first, else have race condition */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - ckptXidEpoch = xlogctl->ckptXidEpoch; - ckptXid = xlogctl->ckptXid; - SpinLockRelease(&xlogctl->info_lck); - } + SpinLockAcquire(&XLogCtl->info_lck); + ckptXidEpoch = XLogCtl->ckptXidEpoch; + ckptXid = XLogCtl->ckptXid; + SpinLockRelease(&XLogCtl->info_lck); /* Now fetch current nextXid */ nextXid = ReadNewTransactionId(); @@ -7989,8 +7904,6 @@ LogCheckpointEnd(bool restartpoint) void CreateCheckPoint(int flags) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; bool 
shutdown; CheckPoint checkPoint; XLogRecPtr recptr; @@ -8150,7 +8063,7 @@ CreateCheckPoint(int flags) * XLogInserts that happen while we are dumping buffers must assume that * their buffer changes are not included in the checkpoint. */ - RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo; + RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo; /* * Now we can release the WAL insertion locks, allowing other xacts to @@ -8159,9 +8072,9 @@ CreateCheckPoint(int flags) WALInsertLockRelease(); /* Update the info_lck-protected copy of RedoRecPtr as well */ - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->RedoRecPtr = checkPoint.redo; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->RedoRecPtr = checkPoint.redo; + SpinLockRelease(&XLogCtl->info_lck); /* * If enabled, log checkpoint start. We postpone this until now so as not @@ -8333,15 +8246,10 @@ CreateCheckPoint(int flags) LWLockRelease(ControlFileLock); /* Update shared-memory copy of checkpoint XID/epoch */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch; - xlogctl->ckptXid = checkPoint.nextXid; - SpinLockRelease(&xlogctl->info_lck); - } + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch; + XLogCtl->ckptXid = checkPoint.nextXid; + SpinLockRelease(&XLogCtl->info_lck); /* * We are now done with critical updates; no need for system panic if we @@ -8496,9 +8404,6 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) static void RecoveryRestartPoint(const CheckPoint *checkPoint) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - /* * Also refrain from creating a restartpoint if we have seen any * references to non-existent pages. 
Restarting recovery from the @@ -8520,10 +8425,10 @@ RecoveryRestartPoint(const CheckPoint *checkPoint) * Copy the checkpoint record to shared memory, so that checkpointer can * work out the next time it wants to perform a restartpoint. */ - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->lastCheckPointRecPtr = ReadRecPtr; - xlogctl->lastCheckPoint = *checkPoint; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->lastCheckPointRecPtr = ReadRecPtr; + XLogCtl->lastCheckPoint = *checkPoint; + SpinLockRelease(&XLogCtl->info_lck); } /* @@ -8545,9 +8450,6 @@ CreateRestartPoint(int flags) XLogSegNo _logSegNo; TimestampTz xtime; - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - /* * Acquire CheckpointLock to ensure only one restartpoint or checkpoint * happens at a time. @@ -8555,10 +8457,10 @@ CreateRestartPoint(int flags) LWLockAcquire(CheckpointLock, LW_EXCLUSIVE); /* Get a local copy of the last safe checkpoint record. */ - SpinLockAcquire(&xlogctl->info_lck); - lastCheckPointRecPtr = xlogctl->lastCheckPointRecPtr; - lastCheckPoint = xlogctl->lastCheckPoint; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + lastCheckPointRecPtr = XLogCtl->lastCheckPointRecPtr; + lastCheckPoint = XLogCtl->lastCheckPoint; + SpinLockRelease(&XLogCtl->info_lck); /* * Check that we're still in recovery mode. It's ok if we exit recovery @@ -8615,15 +8517,18 @@ CreateRestartPoint(int flags) * Like in CreateCheckPoint(), hold off insertions to update it, although * during recovery this is just pro forma, because no WAL insertions are * happening. + * + * NB: XXX: It's safe to access XLogCtl->Insert.RedoRecPtr here because + * that variable is protected by the wal insert lock, not info_lck. 
*/ WALInsertLockAcquireExclusive(); - xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo; + XLogCtl->Insert.RedoRecPtr = lastCheckPoint.redo; WALInsertLockRelease(); /* Also update the info_lck-protected copy */ - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->RedoRecPtr = lastCheckPoint.redo; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->RedoRecPtr = lastCheckPoint.redo; + SpinLockRelease(&XLogCtl->info_lck); /* * Prepare to accumulate statistics. @@ -9383,15 +9288,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) ControlFile->checkPointCopy.nextXid = checkPoint.nextXid; /* Update shared-memory copy of checkpoint XID/epoch */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch; - xlogctl->ckptXid = checkPoint.nextXid; - SpinLockRelease(&xlogctl->info_lck); - } + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch; + XLogCtl->ckptXid = checkPoint.nextXid; + SpinLockRelease(&XLogCtl->info_lck); /* * We should've already switched to the new TLI before replaying this @@ -9435,15 +9335,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) ControlFile->checkPointCopy.nextXid = checkPoint.nextXid; /* Update shared-memory copy of checkpoint XID/epoch */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch; - xlogctl->ckptXid = checkPoint.nextXid; - SpinLockRelease(&xlogctl->info_lck); - } + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch; + XLogCtl->ckptXid = checkPoint.nextXid; + SpinLockRelease(&XLogCtl->info_lck); /* TLI should not change in an on-line checkpoint */ if (checkPoint.ThisTimeLineID != ThisTimeLineID) @@ -9580,8 +9475,6 @@ xlog_redo(XLogRecPtr lsn, XLogRecord 
*record) } else if (info == XLOG_FPW_CHANGE) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; bool fpw; memcpy(&fpw, XLogRecGetData(record), sizeof(bool)); @@ -9593,10 +9486,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) */ if (!fpw) { - SpinLockAcquire(&xlogctl->info_lck); - if (xlogctl->lastFpwDisableRecPtr < ReadRecPtr) - xlogctl->lastFpwDisableRecPtr = ReadRecPtr; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + if (XLogCtl->lastFpwDisableRecPtr < ReadRecPtr) + XLogCtl->lastFpwDisableRecPtr = ReadRecPtr; + SpinLockRelease(&XLogCtl->info_lck); } /* Keep track of full_page_writes */ @@ -9951,8 +9844,6 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, if (backup_started_in_recovery) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogRecPtr recptr; /* @@ -9960,9 +9851,9 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, * (i.e., since last restartpoint used as backup starting * checkpoint) contain full-page writes. */ - SpinLockAcquire(&xlogctl->info_lck); - recptr = xlogctl->lastFpwDisableRecPtr; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + recptr = XLogCtl->lastFpwDisableRecPtr; + SpinLockRelease(&XLogCtl->info_lck); if (!checkpointfpw || startpoint <= recptr) ereport(ERROR, @@ -10305,17 +10196,15 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) */ if (backup_started_in_recovery) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogRecPtr recptr; /* * Check to see if all WAL replayed during online backup contain * full-page writes. 
*/ - SpinLockAcquire(&xlogctl->info_lck); - recptr = xlogctl->lastFpwDisableRecPtr; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + recptr = XLogCtl->lastFpwDisableRecPtr; + SpinLockRelease(&XLogCtl->info_lck); if (startpoint <= recptr) ereport(ERROR, @@ -10502,15 +10391,13 @@ do_pg_abort_backup(void) XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; XLogRecPtr recptr; TimeLineID tli; - SpinLockAcquire(&xlogctl->info_lck); - recptr = xlogctl->lastReplayedEndRecPtr; - tli = xlogctl->lastReplayedTLI; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + recptr = XLogCtl->lastReplayedEndRecPtr; + tli = XLogCtl->lastReplayedTLI; + SpinLockRelease(&XLogCtl->info_lck); if (replayTLI) *replayTLI = tli; @@ -10523,7 +10410,7 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI) XLogRecPtr GetXLogInsertRecPtr(void) { - volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogCtlInsert *Insert = &XLogCtl->Insert; uint64 current_bytepos; SpinLockAcquire(&Insert->insertpos_lck); @@ -10539,14 +10426,9 @@ GetXLogInsertRecPtr(void) XLogRecPtr GetXLogWriteRecPtr(void) { - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - LogwrtResult = xlogctl->LogwrtResult; - SpinLockRelease(&xlogctl->info_lck); - } + SpinLockAcquire(&XLogCtl->info_lck); + LogwrtResult = XLogCtl->LogwrtResult; + SpinLockRelease(&XLogCtl->info_lck); return LogwrtResult.Write; } @@ -11372,10 +11254,7 @@ WakeupRecovery(void) void SetWalWriterSleeping(bool sleeping) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->WalWriterSleeping = sleeping; - SpinLockRelease(&xlogctl->info_lck); + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->WalWriterSleeping = sleeping; + 
SpinLockRelease(&XLogCtl->info_lck); } -- 2.0.0.rc2.4.g1dc51c6.dirty
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers