Good day, Andres.
Good day, hackers.

05.05.2025 10:30, Yura Sokolov wrote:
> 21.03.2025 19:33, Andres Freund пишет:
>> Hi,
>>
>> On 2025-03-21 14:35:16 +0300, Yura Sokolov wrote:
>>> From 080c9e0de5e6e10751347e1ff50b65df424744cb Mon Sep 17 00:00:00 2001
>>> From: Yura Sokolov <y.soko...@postgrespro.ru>
>>> Date: Mon, 3 Feb 2025 11:58:33 +0300
>>> Subject: [PATCH v2] sinvaladt.c: use atomic operations on maxMsgNum
>>>
>>> msgnumLock spinlock could be highly contended.
>>> Comment states it was used as memory barrier.
>>> Lets use atomic ops with memory barriers directly instead.
>>>
>>> Note: patch uses pg_read_barrier()/pg_write_barrier() instead of
>>> pg_atomic_read_membarrier_u32()/pg_atomic_write_membarrier_u32() since
>>> no full barrier semantic is required, and explicit read/write barriers
>>> are cheaper at least on x86_64.
>>
>> Is it actually true that full barriers aren't required?  I think we might
>> actually rely on a stronger ordering.
>>
>>
>>> @@ -506,10 +493,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int 
>>> datasize)
>>>      */
>>>     stateP->hasMessages = false;
>>>
>>> -   /* Fetch current value of maxMsgNum using spinlock */
>>> -   SpinLockAcquire(&segP->msgnumLock);
>>> -   max = segP->maxMsgNum;
>>> -   SpinLockRelease(&segP->msgnumLock);
>>> +   /* Fetch current value of maxMsgNum using atomic with memory barrier */
>>> +   max = pg_atomic_read_u32(&segP->maxMsgNum);
>>> +   pg_read_barrier();
>>>
>>>     if (stateP->resetState)
>>>     {
>>>             /*
>>>              * Force reset.  We can say we have dealt with any messages 
>>> added
>>>              * since the reset, as well; and that means we should clear the
>>>              * signaled flag, too.
>>>              */
>>>             stateP->nextMsgNum = max;
>>>             stateP->resetState = false;
>>>             stateP->signaled = false;
>>>             LWLockRelease(SInvalReadLock);
>>>             return -1;
>>>     }
>>
>> vs
>>
>>> @@ -410,17 +398,16 @@ SIInsertDataEntries(const SharedInvalidationMessage 
>>> *data, int n)
>>>             /*
>>>              * Insert new message(s) into proper slot of circular buffer
>>>              */
>>> -           max = segP->maxMsgNum;
>>> +           max = pg_atomic_read_u32(&segP->maxMsgNum);
>>>             while (nthistime-- > 0)
>>>             {
>>>                     segP->buffer[max % MAXNUMMESSAGES] = *data++;
>>>                     max++;
>>>             }
>>>
>>> -           /* Update current value of maxMsgNum using spinlock */
>>> -           SpinLockAcquire(&segP->msgnumLock);
>>> -           segP->maxMsgNum = max;
>>> -           SpinLockRelease(&segP->msgnumLock);
>>> +           /* Update current value of maxMsgNum using atomic with memory 
>>> barrier */
>>> +           pg_write_barrier();
>>> +           pg_atomic_write_u32(&segP->maxMsgNum, max);
>>>
>>>             /*
>>>              * Now that the maxMsgNum change is globally visible, we give 
>>> everyone
>>>              * a swift kick to make sure they read the newly added messages.
>>>              * Releasing SInvalWriteLock will enforce a full memory 
>>> barrier, so
>>>              * these (unlocked) changes will be committed to memory before 
>>> we exit
>>>              * the function.
>>>              */
>>>             for (i = 0; i < segP->numProcs; i++)
>>>             {
>>>                     ProcState  *stateP = 
>>> &segP->procState[segP->pgprocnos[i]];
>>>
>>>                     stateP->hasMessages = true;
>>>             }
>>
>> On a loosely ordered architecture, the hasMessage=false in SIGetDataEntries()
>> could be reordered with the read of maxMsgNum. Which, afaict, would lead to
>> missing messages. That's not prevented by the pg_write_barrier() in
>> SIInsertDataEntries().  I think there may be other similar dangers.
>>
>> This could be solved by adding full memory barriers in a few places.
> 
> That is quite an interesting remark, because SpinLockAcquire may be
> implemented as `__sync_lock_test_and_set(lock, 1)` [1] on ARM/ARM64, and
> `__sync_lock_test_and_set` has only "acquire barrier" semantics, as GCC's
> documentation says [2]. Moreover, `__sync_lock_release` used in this case
> also provides only "release barrier" semantics.
> 
> Given these facts, the current code also doesn't guarantee that
> `stateP->hasMessages = false` will not be reordered with `max =
> segP->maxMsgNum`, nor is the following read of the messages guaranteed to
> happen after `max = segP->maxMsgNum`.
> 
> Given your expectation that SpinLockAcquire and SpinLockRelease act as full
> memory barriers, and the probably numerous usages of them as such, their
> current implementation on ARM/ARM64 looks completely broken, imho.

I recognize my mistake and apologize for the noise.

I'll try to describe how I understand it now.
There are 4 operations done in two threads:
1m. write maxMsgNum
1h. set hasMessages
2h. unset hasMessages
2m. read maxMsgNum

Synchronization should forbid the following orders of operations:
- 1h, 2h, 2m, 1m
- 1h, 2m, 1m, 2h
- 1h, 2m, 2h, 1m
- 2m, 1m, 1h, 2h
- 2m, 1h, 1m, 2h
- 2m, 1h, 2h, 1m
In other words: if the read of maxMsgNum (2m) happened before the write of
maxMsgNum (1m), then the unset of hasMessages (2h) must not happen after the
set of hasMessages (1h).
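
For reference, this is roughly how those four operations sit in the current
spinlock-based code (simplified from the hunks quoted above):

/* SIInsertDataEntries(), writer side */
SpinLockAcquire(&segP->msgnumLock);   /* acquire barrier */
segP->maxMsgNum = max;                /* 1m */
SpinLockRelease(&segP->msgnumLock);   /* release barrier */
stateP->hasMessages = true;           /* 1h */

/* SIGetDataEntries(), reader side */
stateP->hasMessages = false;          /* 2h */
SpinLockAcquire(&segP->msgnumLock);   /* acquire barrier */
max = segP->maxMsgNum;                /* 2m */
SpinLockRelease(&segP->msgnumLock);   /* release barrier */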

Given that 1m and 2m are wrapped in the spin lock:
- while 1h could be reordered before 1m (since SpinLockRelease is just a
release barrier), it cannot be reordered before SpinLockAcquire (because it
is an acquire barrier);
- in the same way, while 2h could be reordered after 2m, it cannot be
reordered past SpinLockRelease.

Annotating the acquire (1a/2a) and release (1r/2r) operations explicitly, the
wrong orders above look like:
- 1a( 1h, 2a{ 2h, 2m 2r}, 1m 1r)
- 1a( 1h, 2a{ 2m, 1m 1r), 2h 2r}
- 1a( 1h, 2a{ 2m, 2h 2r}, 1m 1r)
- 2a{ 2m, 1a( 1m, 1h 1r), 2h 2r}
- 2a{ 2m, 1a( 1h, 1m 1r), 2h 2r}
- 2a{ 2m, 1a( 1h, 2h 2r}, 1m 1r)
Therefore it is clear that all the wrong orders are forbidden by the spin
lock, even though SpinLockAcquire and SpinLockRelease provide only acquire
and release semantics respectively.

Excuse me once again for misunderstanding this at first.

But the problem of spin lock contention is still here. And we found another
place with a similar problem: numerous readers fight for a single slock_t,
slowing down both themselves and the writer. It is
XLogRecoveryCtlData->info_lck. When there are a lot of logical/physical
walsenders, it can become a noticeable bottleneck.
Part of this bottleneck was due to an incorrect logical WAL senders target
and was fixed in commit 5231ed82 [1]. But there is still a measurable speedup
from mitigating spin lock contention in this place.

So I propose to introduce another spin lock type supporting Exclusive and
Shared lock modes (i.e. Write/Read modes) and use it in these two places.
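
For example, with the attached patch the hot read-only accessors in
xlogrecovery.c take the lock in shared mode, so concurrent readers no longer
serialize on it, while the writer side stays exclusive:

/* reader, e.g. GetXLogReplayRecPtr() */
RWSpinLockSh(&XLogRecoveryCtl->info_lck);
recptr = XLogRecoveryCtl->lastReplayedEndRecPtr;
tli = XLogRecoveryCtl->lastReplayedTLI;
RWSpinUnlockSh(&XLogRecoveryCtl->info_lck);

/* writer, e.g. ApplyWalRecord() */
RWSpinLockEx(&XLogRecoveryCtl->info_lck);
XLogRecoveryCtl->lastReplayedReadRecPtr = xlogreader->ReadRecPtr;
XLogRecoveryCtl->lastReplayedEndRecPtr = xlogreader->EndRecPtr;
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
RWSpinUnlockEx(&XLogRecoveryCtl->info_lck);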

I've tried to implement it with a bit of fairness: both reader and writer
attempters have a "waiter" state, which blocks the other side. I.e. if a
reader enters the "waiter" state, it blocks concurrent writers (both fresh
ones and those in the "waiter" state). But if a writer enters the "waiter"
state, it blocks fresh readers (but doesn't block readers that already
entered the "waiter" state).
The algorithm is self-made; I didn't find exactly the same algorithm in the wild.

Patch is attached.

[1]
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=5231ed8262c94936a69bce41f64076630bbd99a2


PS. Another possibility would be to use a versioned optimistic lock:
- the lock is implemented as a pg_atomic_uint64 "version" counter;
- the writer increments it both in LockWrite and in UnlockWrite, so the lock
is write-locked whenever the version is odd;
- the reader performs the following loop:
-- wait until the version is even and remember it;
-- perform the read operations;
-- if the version didn't change, the read is successful; otherwise repeat the loop.

The benefit of this approach: the reader doesn't write to shared memory at all.

But it should use memory barriers before and after the read section to
imitate the SpinLock semantics. And since we have no separate "acquire" and
"release" memory fences, they have to be pg_memory_barrier(), or we should
introduce "acq/rel" fences.

Another gotcha is that only simple read operations are allowed, since the
data could be changed while it is being read (both use cases found above
satisfy this condition). And the read operations are repeated in a loop until
the version doesn't change around them, so some kind of syntactic sugar
should be introduced.
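
A minimal sketch of the reader side of that approach (the "version" field and
the surrounding struct are hypothetical, and pg_memory_barrier() stands in
for the missing acq/rel fences):

uint64		v1, v2;

do
{
	/* wait until the version is even, i.e. no writer is in progress */
	do
	{
		v1 = pg_atomic_read_u64(&ctl->version);
	} while (v1 & 1);

	pg_memory_barrier();	/* protected reads must not move above this */

	/* only simple reads are allowed here */
	recptr = ctl->lastReplayedEndRecPtr;
	tli = ctl->lastReplayedTLI;

	pg_memory_barrier();	/* protected reads must not move below this */

	v2 = pg_atomic_read_u64(&ctl->version);
} while (v1 != v2);			/* a writer intervened; retry */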

This is a well-known approach, but I don't remember its exact name and could
not google it quickly.

-- 
regards
Yura Sokolov aka funny-falcon
From 4afb4f24413352d7911c094481341ddb2663e73c Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Mon, 2 Jun 2025 19:26:20 +0300
Subject: [PATCH v4] Exclusive-Shared (Read-Write) spin lock.

There are a couple of places where a spin lock is used just to separate
readers and writers, with many readers and one or few writers.
Those places are a source of contention on huge system setups because the
readers create a bottleneck on the always-exclusive slock_t.

Mitigate it by introducing a read-write spin lock.
The algorithm provides some level of fairness, i.e. readers will allow a
waiting writer to deterministically acquire the lock. Writers also respect
those readers that already entered the logical "waiting" state.

Use this RWSpin lock for sending invalidations and for notifying walsenders.
---
 src/backend/access/transam/xlogrecovery.c |  70 ++++++-------
 src/backend/storage/ipc/sinvaladt.c       |  13 +--
 src/backend/storage/lmgr/Makefile         |   1 +
 src/backend/storage/lmgr/meson.build      |   1 +
 src/backend/storage/lmgr/rwspin.c         |  68 ++++++++++++
 src/include/storage/rwspin.h              | 122 ++++++++++++++++++++++
 src/tools/pgindent/typedefs.list          |   1 +
 7 files changed, 235 insertions(+), 41 deletions(-)
 create mode 100644 src/backend/storage/lmgr/rwspin.c
 create mode 100644 src/include/storage/rwspin.h

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 6ce979f2d8b..73dbae51406 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -56,7 +56,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
-#include "storage/spin.h"
+#include "storage/rwspin.h"
 #include "utils/datetime.h"
 #include "utils/fmgrprotos.h"
 #include "utils/guc_hooks.h"
@@ -364,7 +364,7 @@ typedef struct XLogRecoveryCtlData
 	RecoveryPauseState recoveryPauseState;
 	ConditionVariable recoveryNotPausedCV;
 
-	slock_t		info_lck;		/* locks shared variables shown above */
+	RWSpin		info_lck;		/* locks shared variables shown above */
 } XLogRecoveryCtlData;
 
 static XLogRecoveryCtlData *XLogRecoveryCtl = NULL;
@@ -471,7 +471,7 @@ XLogRecoveryShmemInit(void)
 		return;
 	memset(XLogRecoveryCtl, 0, sizeof(XLogRecoveryCtlData));
 
-	SpinLockInit(&XLogRecoveryCtl->info_lck);
+	RWSpinInit(&XLogRecoveryCtl->info_lck);
 	InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
 	ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
 }
@@ -1668,7 +1668,7 @@ PerformWalRecovery(void)
 	 * we had just replayed the record before the REDO location (or the
 	 * checkpoint record itself, if it's a shutdown checkpoint).
 	 */
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockEx(&XLogRecoveryCtl->info_lck);
 	if (RedoStartLSN < CheckPointLoc)
 	{
 		XLogRecoveryCtl->lastReplayedReadRecPtr = InvalidXLogRecPtr;
@@ -1686,7 +1686,7 @@ PerformWalRecovery(void)
 	XLogRecoveryCtl->recoveryLastXTime = 0;
 	XLogRecoveryCtl->currentChunkStartTime = 0;
 	XLogRecoveryCtl->recoveryPauseState = RECOVERY_NOT_PAUSED;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockEx(&XLogRecoveryCtl->info_lck);
 
 	/* Also ensure XLogReceiptTime has a sane value */
 	XLogReceiptTime = GetCurrentTimestamp();
@@ -1977,10 +1977,10 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 * Update shared replayEndRecPtr before replaying this record, so that
 	 * XLogFlush will update minRecoveryPoint correctly.
 	 */
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockEx(&XLogRecoveryCtl->info_lck);
 	XLogRecoveryCtl->replayEndRecPtr = xlogreader->EndRecPtr;
 	XLogRecoveryCtl->replayEndTLI = *replayTLI;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockEx(&XLogRecoveryCtl->info_lck);
 
 	/*
 	 * If we are attempting to enter Hot Standby mode, process XIDs we see
@@ -2014,11 +2014,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 * Update lastReplayedEndRecPtr after this record has been successfully
 	 * replayed.
 	 */
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockEx(&XLogRecoveryCtl->info_lck);
 	XLogRecoveryCtl->lastReplayedReadRecPtr = xlogreader->ReadRecPtr;
 	XLogRecoveryCtl->lastReplayedEndRecPtr = xlogreader->EndRecPtr;
 	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockEx(&XLogRecoveryCtl->info_lck);
 
 	/* ------
 	 * Wakeup walsenders:
@@ -2269,9 +2269,9 @@ CheckRecoveryConsistency(void)
 		reachedConsistency &&
 		IsUnderPostmaster)
 	{
-		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		RWSpinLockEx(&XLogRecoveryCtl->info_lck);
 		XLogRecoveryCtl->SharedHotStandbyActive = true;
-		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+		RWSpinUnlockEx(&XLogRecoveryCtl->info_lck);
 
 		LocalHotStandbyActive = true;
 
@@ -3082,9 +3082,9 @@ GetRecoveryPauseState(void)
 {
 	RecoveryPauseState state;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockSh(&XLogRecoveryCtl->info_lck);
 	state = XLogRecoveryCtl->recoveryPauseState;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockSh(&XLogRecoveryCtl->info_lck);
 
 	return state;
 }
@@ -3100,14 +3100,14 @@ GetRecoveryPauseState(void)
 void
 SetRecoveryPause(bool recoveryPause)
 {
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockEx(&XLogRecoveryCtl->info_lck);
 
 	if (!recoveryPause)
 		XLogRecoveryCtl->recoveryPauseState = RECOVERY_NOT_PAUSED;
 	else if (XLogRecoveryCtl->recoveryPauseState == RECOVERY_NOT_PAUSED)
 		XLogRecoveryCtl->recoveryPauseState = RECOVERY_PAUSE_REQUESTED;
 
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockEx(&XLogRecoveryCtl->info_lck);
 
 	if (!recoveryPause)
 		ConditionVariableBroadcast(&XLogRecoveryCtl->recoveryNotPausedCV);
@@ -3121,10 +3121,10 @@ static void
 ConfirmRecoveryPaused(void)
 {
 	/* If recovery pause is requested then set it paused */
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockEx(&XLogRecoveryCtl->info_lck);
 	if (XLogRecoveryCtl->recoveryPauseState == RECOVERY_PAUSE_REQUESTED)
 		XLogRecoveryCtl->recoveryPauseState = RECOVERY_PAUSED;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockEx(&XLogRecoveryCtl->info_lck);
 }
 
 
@@ -4421,9 +4421,9 @@ PromoteIsTriggered(void)
 	if (LocalPromoteIsTriggered)
 		return true;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockSh(&XLogRecoveryCtl->info_lck);
 	LocalPromoteIsTriggered = XLogRecoveryCtl->SharedPromoteIsTriggered;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockSh(&XLogRecoveryCtl->info_lck);
 
 	return LocalPromoteIsTriggered;
 }
@@ -4431,9 +4431,9 @@ PromoteIsTriggered(void)
 static void
 SetPromoteIsTriggered(void)
 {
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockEx(&XLogRecoveryCtl->info_lck);
 	XLogRecoveryCtl->SharedPromoteIsTriggered = true;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockEx(&XLogRecoveryCtl->info_lck);
 
 	/*
 	 * Mark the recovery pause state as 'not paused' because the paused state
@@ -4531,9 +4531,9 @@ HotStandbyActive(void)
 	else
 	{
 		/* spinlock is essential on machines with weak memory ordering! */
-		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		RWSpinLockSh(&XLogRecoveryCtl->info_lck);
 		LocalHotStandbyActive = XLogRecoveryCtl->SharedHotStandbyActive;
-		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+		RWSpinUnlockSh(&XLogRecoveryCtl->info_lck);
 
 		return LocalHotStandbyActive;
 	}
@@ -4561,10 +4561,10 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI)
 	XLogRecPtr	recptr;
 	TimeLineID	tli;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockSh(&XLogRecoveryCtl->info_lck);
 	recptr = XLogRecoveryCtl->lastReplayedEndRecPtr;
 	tli = XLogRecoveryCtl->lastReplayedTLI;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockSh(&XLogRecoveryCtl->info_lck);
 
 	if (replayTLI)
 		*replayTLI = tli;
@@ -4584,10 +4584,10 @@ GetCurrentReplayRecPtr(TimeLineID *replayEndTLI)
 	XLogRecPtr	recptr;
 	TimeLineID	tli;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockSh(&XLogRecoveryCtl->info_lck);
 	recptr = XLogRecoveryCtl->replayEndRecPtr;
 	tli = XLogRecoveryCtl->replayEndTLI;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockSh(&XLogRecoveryCtl->info_lck);
 
 	if (replayEndTLI)
 		*replayEndTLI = tli;
@@ -4604,9 +4604,9 @@ GetCurrentReplayRecPtr(TimeLineID *replayEndTLI)
 static void
 SetLatestXTime(TimestampTz xtime)
 {
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockEx(&XLogRecoveryCtl->info_lck);
 	XLogRecoveryCtl->recoveryLastXTime = xtime;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockEx(&XLogRecoveryCtl->info_lck);
 }
 
 /*
@@ -4617,9 +4617,9 @@ GetLatestXTime(void)
 {
 	TimestampTz xtime;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockSh(&XLogRecoveryCtl->info_lck);
 	xtime = XLogRecoveryCtl->recoveryLastXTime;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockSh(&XLogRecoveryCtl->info_lck);
 
 	return xtime;
 }
@@ -4633,9 +4633,9 @@ GetLatestXTime(void)
 static void
 SetCurrentChunkStartTime(TimestampTz xtime)
 {
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockEx(&XLogRecoveryCtl->info_lck);
 	XLogRecoveryCtl->currentChunkStartTime = xtime;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockEx(&XLogRecoveryCtl->info_lck);
 }
 
 /*
@@ -4647,9 +4647,9 @@ GetCurrentChunkReplayStartTime(void)
 {
 	TimestampTz xtime;
 
-	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	RWSpinLockSh(&XLogRecoveryCtl->info_lck);
 	xtime = XLogRecoveryCtl->currentChunkStartTime;
-	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	RWSpinUnlockSh(&XLogRecoveryCtl->info_lck);
 
 	return xtime;
 }
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index c5748b690f4..b8e1739a2eb 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -25,6 +25,7 @@
 #include "storage/shmem.h"
 #include "storage/sinvaladt.h"
 #include "storage/spin.h"
+#include "storage/rwspin.h"
 
 /*
  * Conceptually, the shared cache invalidation messages are stored in an
@@ -171,7 +172,7 @@ typedef struct SISeg
 	int			maxMsgNum;		/* next message number to be assigned */
 	int			nextThreshold;	/* # of messages to call SICleanupQueue */
 
-	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */
+	RWSpin		msgnumLock;		/* spinlock protecting maxMsgNum */
 
 	/*
 	 * Circular buffer holding shared-inval messages
@@ -246,7 +247,7 @@ SharedInvalShmemInit(void)
 	shmInvalBuffer->minMsgNum = 0;
 	shmInvalBuffer->maxMsgNum = 0;
 	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
-	SpinLockInit(&shmInvalBuffer->msgnumLock);
+	RWSpinInit(&shmInvalBuffer->msgnumLock);
 
 	/* The buffer[] array is initially all unused, so we need not fill it */
 
@@ -419,9 +420,9 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
 		}
 
 		/* Update current value of maxMsgNum using spinlock */
-		SpinLockAcquire(&segP->msgnumLock);
+		RWSpinLockEx(&segP->msgnumLock);
 		segP->maxMsgNum = max;
-		SpinLockRelease(&segP->msgnumLock);
+		RWSpinUnlockEx(&segP->msgnumLock);
 
 		/*
 		 * Now that the maxMsgNum change is globally visible, we give everyone
@@ -508,9 +509,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 	stateP->hasMessages = false;
 
 	/* Fetch current value of maxMsgNum using spinlock */
-	SpinLockAcquire(&segP->msgnumLock);
+	RWSpinLockSh(&segP->msgnumLock);
 	max = segP->maxMsgNum;
-	SpinLockRelease(&segP->msgnumLock);
+	RWSpinUnlockSh(&segP->msgnumLock);
 
 	if (stateP->resetState)
 	{
diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index 6cbaf23b855..c8f5147ae9e 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -21,6 +21,7 @@ OBJS = \
 	predicate.o \
 	proc.o \
 	s_lock.o \
+	rwspin.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/storage/lmgr/meson.build b/src/backend/storage/lmgr/meson.build
index a5490c1047f..94e810885ed 100644
--- a/src/backend/storage/lmgr/meson.build
+++ b/src/backend/storage/lmgr/meson.build
@@ -9,4 +9,5 @@ backend_sources += files(
   'predicate.c',
   'proc.c',
   's_lock.c',
+  'rwspin.c',
 )
diff --git a/src/backend/storage/lmgr/rwspin.c b/src/backend/storage/lmgr/rwspin.c
new file mode 100644
index 00000000000..584249d5cb1
--- /dev/null
+++ b/src/backend/storage/lmgr/rwspin.c
@@ -0,0 +1,68 @@
+#include "storage/rwspin.h"
+#include "storage/s_lock.h"
+
+void
+RWSpinLockEx_slowpath(RWSpin *spin, uint32 val)
+{
+	SpinDelayStatus delay;
+
+	init_local_spin_delay(&delay);
+	do
+	{
+		if ((val & RW_SPIN_EX_WAIT) == 0)
+		{
+			/*
+			 * Enter the ExWaiter state, so that concurrent ShTriers back off
+			 * their attempts.
+			 */
+			val = pg_atomic_fetch_or_u32(spin, RW_SPIN_EX_WAIT);
+			val |= RW_SPIN_EX_WAIT;
+		}
+		else
+		{
+			perform_spin_delay(&delay);
+			val = pg_atomic_read_u32(spin);
+		}
+	} while (val > RW_SPIN_EX_WAIT ||
+			 !pg_atomic_compare_exchange_u32(spin, &val, RW_SPIN_EX_LOCK));
+	finish_spin_delay(&delay);
+}
+
+void
+RWSpinLockSh_slowpath(RWSpin *spin, uint32 val)
+{
+	SpinDelayStatus delay;
+
+	init_local_spin_delay(&delay);
+
+	/* If there is an ExWaiter, we have to step down and remain a ShTrier. */
+	while ((val & RW_SPIN_EX_WAIT) != 0)
+	{
+		val = pg_atomic_sub_fetch_u32(spin, RW_SPIN_SH_MARK);
+		do
+		{
+			perform_spin_delay(&delay);
+			val = pg_atomic_read_u32(spin);
+		} while ((val & RW_SPIN_EX_WAIT) != 0);
+		/* ExWaiter disappeared, we may retry */
+		val = pg_atomic_add_fetch_u32(spin, RW_SPIN_SH_MARK);
+	}
+
+	/*
+	 * There is no ExWaiter, so we don't have to step down any more.  If there
+	 * is an ExHolder, then we've entered the ShWaiter state and have to wait
+	 * until the lock is released.  Otherwise we are already a ShHolder.
+	 */
+	while ((val & RW_SPIN_EX_LOCK) != 0)
+	{
+		perform_spin_delay(&delay);
+		val = pg_atomic_read_u32(spin);
+	}
+	finish_spin_delay(&delay);
+
+	/*
+	 * Atomics are tricky, and the last loop performs only reads, so it is
+	 * better to place a memory barrier here.
+	 */
+	pg_memory_barrier();
+}
diff --git a/src/include/storage/rwspin.h b/src/include/storage/rwspin.h
new file mode 100644
index 00000000000..1a7c5ee9c0f
--- /dev/null
+++ b/src/include/storage/rwspin.h
@@ -0,0 +1,122 @@
+/*-------------------------------------------------------------
+ *
+ * rwspin.h
+ *     Exclusive-Shared (Read-Write) spin lock implementation.
+ *
+ * The implemented algorithm provides some level of fairness:
+ * - read attempters allow write waiters to lock the lock
+ * - write attempters allow read waiters to lock the lock.
+ * It is done through the use of intermediate "waiter" states.
+ *
+ * Guarantees and warnings are almost the same as for slock_t; read them in
+ * spin.h.
+ */
+
+#ifndef RWSPIN_H
+#define RWSPIN_H
+
+#include "c.h"
+#include "port/atomics.h"
+
+typedef pg_atomic_uint32 RWSpin;
+
+/* Initialize RWSpin. */
+static inline void RWSpinInit(RWSpin *spin);
+
+/* Acquire RWSpin in exclusive mode. */
+static inline void RWSpinLockEx(RWSpin *spin);
+
+/* Release exclusive lock of RWSpin. */
+static inline void RWSpinUnlockEx(RWSpin *spin);
+
+/* Acquire RWSpin in shared mode. */
+static inline void RWSpinLockSh(RWSpin *spin);
+
+/* Release shared lock of RWSpin. */
+static inline void RWSpinUnlockSh(RWSpin *spin);
+
+/* Implementation */
+
+/*
+ * The Exclusive-Shared "fair" spin lock is implemented with the following
+ * algorithm.  An attempter can be in one of the following states:
+ *   Exclusive Lock Trier  (ExTrier)
+ *   Exclusive Lock Waiter (ExWaiter)
+ *   Exclusive Lock Holder (ExHolder)
+ *   Shared Lock Trier     (ShTrier)
+ *   Shared Lock Waiter    (ShWaiter)
+ *   Shared Lock Holder    (ShHolder)
+ * - ExTrier can become ExHolder only if there is no holder and no ShWaiter.
+ *   Otherwise it marks the existence of an ExWaiter and becomes ExWaiter.
+ * - ExWaiter waits until there is no holder and no ShWaiter.  Then it tries
+ *   to become ExHolder.
+ *   Note: ExWaiter clears its mark on acquiring the lock, so other ExWaiters
+ *   effectively become ExTriers and have to reassign the mark.
+ * - ShTrier increments the shared lock marks.
+ *   If there is an ExWaiter, it decrements the shared lock marks and waits
+ *   for the ExWaiter to disappear.  Then it retries.
+ *   If there is an ExHolder, it becomes ShWaiter.
+ *   Otherwise it becomes ShHolder.
+ * - ShWaiter just waits until the ExHolder releases the lock.
+ *   Since an ExWaiter will not switch to ExHolder in the presence of a
+ *   ShWaiter, the ShWaiter need not check for it.
+ *   When the ExHolder releases the lock, all ShWaiters become ShHolders.
+ */
+
+static inline void
+RWSpinInit(RWSpin *spin)
+{
+	pg_atomic_init_u32(spin, 0);
+}
+
+enum
+{
+	RW_SPIN_EX_WAIT = 1,
+	RW_SPIN_EX_LOCK = 2,
+	RW_SPIN_EX_MASK = RW_SPIN_EX_WAIT | RW_SPIN_EX_LOCK,
+	RW_SPIN_SH_MARK = 4,
+};
+
+extern void RWSpinLockEx_slowpath(RWSpin *spin, uint32 val);
+extern void RWSpinLockSh_slowpath(RWSpin *spin, uint32 val);
+
+static inline void
+RWSpinLockEx(RWSpin *spin)
+{
+	uint32		old = pg_atomic_read_u32(spin);
+
+	if (old > RW_SPIN_EX_WAIT ||
+		!pg_atomic_compare_exchange_u32(spin, &old, RW_SPIN_EX_LOCK))
+		RWSpinLockEx_slowpath(spin, old);
+}
+
+static inline void
+RWSpinUnlockEx(RWSpin *spin)
+{
+	pg_atomic_sub_fetch_u32(spin, RW_SPIN_EX_LOCK);
+}
+
+static inline void
+RWSpinLockSh(RWSpin *spin)
+{
+	/*
+	 * Try to lock in shared mode or mark ourselves as a waiter for the
+	 * shared lock.  Both are done by incrementing the shared mark count.
+	 */
+	uint32		val = pg_atomic_add_fetch_u32(spin, RW_SPIN_SH_MARK);
+
+	/*
+	 * If there is an exclusive locker or a waiter for the exclusive lock, we
+	 * have to do more complex things.
+	 */
+	if (unlikely((val & RW_SPIN_EX_MASK) != 0))
+		RWSpinLockSh_slowpath(spin, val);
+}
+
+static inline void
+RWSpinUnlockSh(RWSpin *spin)
+{
+	pg_atomic_sub_fetch_u32(spin, RW_SPIN_SH_MARK);
+}
+
+#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a8346cda633..37cd705fcf1 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2399,6 +2399,7 @@ RTEPermissionInfo
 RWConflict
 RWConflictData
 RWConflictPoolHeader
+RWSpin
 Range
 RangeBound
 RangeBox
-- 
2.43.0
