On 03.02.2025 19:49, Heikki Linnakangas wrote:
> On 03/02/2025 13:05, Yura Sokolov wrote:
>> Investigating some performance issues of a client, our engineers found
>> msgnumLock to be contended.
>>
>> Looking closer, it is obvious it is not needed at all: it is used only as a
>> memory barrier. This is even stated in the comment at the start of the file:
>>
>> * We deal with that by having a spinlock that readers must take for just
>> * long enough to read maxMsgNum, while writers take it for just long enough
>> * to write maxMsgNum. (The exact rule is that you need the spinlock to
>> * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
>> * spinlock to write maxMsgNum unless you are holding both locks.)
>> *
>> * Note: since maxMsgNum is an int and hence presumably atomically readable/
>> * writable, the spinlock might seem unnecessary. The reason it is needed
>> * is to provide a memory barrier: we need to be sure that messages written
>> * to the array are actually there before maxMsgNum is increased, and that
>> * readers will see that data after fetching maxMsgNum.
>>
>> So we changed maxMsgNum to be pg_atomic_uint32, and put appropriate memory
>> barriers:
>> - pg_write_barrier() before write to maxMsgNum (when only SInvalWriteLock
>> is held)
>> - pg_read_barrier() after read of maxMsgNum (when only SInvalReadLock is
>> held)
>>
>> It improved the situation for our client.
>>
>> Note: pg_(write|read)_barrier() is chosen instead of
>> pg_atomic_(read|write)_membarrier_u32() because it is certainly cheaper, at
>> least on x86_64, where it translates to just a compiler barrier (an empty
>> asm statement).
>> Moreover, pg_atomic_read_membarrier_u32() is currently implemented as a
>> write operation, which is not good for a contended place.
>
> Makes sense.
>
>
>> @@ -640,8 +626,12 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
>> */
>> if (min >= MSGNUMWRAPAROUND)
>> {
>> - segP->minMsgNum -= MSGNUMWRAPAROUND;
>> - segP->maxMsgNum -= MSGNUMWRAPAROUND;
>> +          /*
>> +           * we don't need a memory barrier here, but using sub_fetch is
>> +           * just simpler than a read_u32+write_u32 pair, and this place
>> +           * is not contended.
>> +           */
>> + pg_atomic_sub_fetch_u32(&segP->maxMsgNum, MSGNUMWRAPAROUND);
>> for (i = 0; i < segP->numProcs; i++)
>> segP->procState[segP->pgprocnos[i]].nextMsgNum -=
>> MSGNUMWRAPAROUND;
>> }
>
> Did you lose the 'segP->minMsgNum -= MSGNUMWRAPAROUND;' here? Do we have
> any tests for the wraparound?
Oops, yes, thanks. I didn't miss it in the private version, but lost it during
cherry-picking on top of 'master'. Fixed.
Also removed the comment above sub_fetch.
> Now that maxMsgNum is unsigned, should we switch to uint32 for minMsgNum
> and nextThreshold for consistency? They still don't need to be atomic
> IIRC, they're protected by SInvalReadLock and SInvalWriteLock.
Ok, I did, and a couple of local variables as well.
There are still signed arguments to SIInsertDataEntries, SIGetDataEntries, and
SICleanupQueue. Should they be changed to unsigned?
> I kind of wonder why we need that MSGNUMWRAPAROUND limit at all; can't
> we just let the integer wrap around naturally? (after switching to
> unsigned, that is). That doesn't need to be part of this patch though,
> it can be done separately, if it's worthwhile...
There will still be a need to handle wraparound, but in comparisons instead.
Overall, the code will not get simpler, I think. Are there any other benefits
from its removal?
-------
regards,
Yura Sokolov aka funny-falcon
From 37cfdbe3e09aee16d68ee001ae3e8501327cd24a Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Mon, 3 Feb 2025 11:58:33 +0300
Subject: [PATCH v1] sinvaladt.c: use atomic operations on maxMsgNum
The msgnumLock spinlock could be highly contended.
The comment states it was used only as a memory barrier.
Let's use atomic operations with memory barriers directly instead.
Note: the patch uses pg_read_barrier()/pg_write_barrier() instead of
pg_atomic_read_membarrier_u32()/pg_atomic_write_membarrier_u32() since
no full-barrier semantics are required, and explicit read/write barriers
are cheaper, at least on x86_64.
---
src/backend/storage/ipc/sinvaladt.c | 74 ++++++++++++-----------------
1 file changed, 30 insertions(+), 44 deletions(-)
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index 2da91738c32..c3b89afce9f 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -86,19 +86,10 @@
* has no need to touch anyone's ProcState, except in the infrequent cases
* when SICleanupQueue is needed. The only point of overlap is that
* the writer wants to change maxMsgNum while readers need to read it.
- * We deal with that by having a spinlock that readers must take for just
- * long enough to read maxMsgNum, while writers take it for just long enough
- * to write maxMsgNum. (The exact rule is that you need the spinlock to
- * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
- * spinlock to write maxMsgNum unless you are holding both locks.)
- *
- * Note: since maxMsgNum is an int and hence presumably atomically readable/
- * writable, the spinlock might seem unnecessary. The reason it is needed
- * is to provide a memory barrier: we need to be sure that messages written
- * to the array are actually there before maxMsgNum is increased, and that
- * readers will see that data after fetching maxMsgNum. Multiprocessors
- * that have weak memory-ordering guarantees can fail without the memory
- * barrier instructions that are included in the spinlock sequences.
+ * We deal with that by using atomic operations with proper memory barriers.
+ * (The exact rule is that you need the read barrier after an atomic read
+ * of maxMsgNum if you are not holding SInvalWriteLock, and you need the
+ * write barrier before a write of maxMsgNum unless you are holding both locks.)
*/
@@ -139,7 +130,7 @@ typedef struct ProcState
/* procPid is zero in an inactive ProcState array entry. */
pid_t procPid; /* PID of backend, for signaling */
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
- int nextMsgNum; /* next message number to read */
+ uint32 nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */
bool signaled; /* backend has been sent catchup signal */
bool hasMessages; /* backend has unread messages */
@@ -167,11 +158,9 @@ typedef struct SISeg
/*
* General state information
*/
- int minMsgNum; /* oldest message still needed */
- int maxMsgNum; /* next message number to be assigned */
- int nextThreshold; /* # of messages to call SICleanupQueue */
-
- slock_t msgnumLock; /* spinlock protecting maxMsgNum */
+ uint32 minMsgNum; /* oldest message still needed */
+ pg_atomic_uint32 maxMsgNum; /* next message number to be assigned */
+ uint32 nextThreshold; /* # of messages to call SICleanupQueue */
/*
* Circular buffer holding shared-inval messages
@@ -243,9 +232,8 @@ SharedInvalShmemInit(void)
/* Clear message counters, save size of procState array, init spinlock */
shmInvalBuffer->minMsgNum = 0;
- shmInvalBuffer->maxMsgNum = 0;
+ pg_atomic_init_u32(&shmInvalBuffer->maxMsgNum, 0);
shmInvalBuffer->nextThreshold = CLEANUP_MIN;
- SpinLockInit(&shmInvalBuffer->msgnumLock);
/* The buffer[] array is initially all unused, so we need not fill it */
@@ -303,7 +291,7 @@ SharedInvalBackendInit(bool sendOnly)
/* mark myself active, with all extant messages already read */
stateP->procPid = MyProcPid;
- stateP->nextMsgNum = segP->maxMsgNum;
+ stateP->nextMsgNum = pg_atomic_read_u32(&segP->maxMsgNum);
stateP->resetState = false;
stateP->signaled = false;
stateP->hasMessages = false;
@@ -382,8 +370,8 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
while (n > 0)
{
int nthistime = Min(n, WRITE_QUANTUM);
- int numMsgs;
- int max;
+ uint32 numMsgs;
+ uint32 max;
int i;
n -= nthistime;
@@ -399,7 +387,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
*/
for (;;)
{
- numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum;
if (numMsgs + nthistime > MAXNUMMESSAGES ||
numMsgs >= segP->nextThreshold)
SICleanupQueue(true, nthistime);
@@ -410,17 +398,16 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
/*
* Insert new message(s) into proper slot of circular buffer
*/
- max = segP->maxMsgNum;
+ max = pg_atomic_read_u32(&segP->maxMsgNum);
while (nthistime-- > 0)
{
segP->buffer[max % MAXNUMMESSAGES] = *data++;
max++;
}
- /* Update current value of maxMsgNum using spinlock */
- SpinLockAcquire(&segP->msgnumLock);
- segP->maxMsgNum = max;
- SpinLockRelease(&segP->msgnumLock);
+ /* Update current value of maxMsgNum: write barrier, then atomic write */
+ pg_write_barrier();
+ pg_atomic_write_u32(&segP->maxMsgNum, max);
/*
* Now that the maxMsgNum change is globally visible, we give everyone
@@ -473,7 +460,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
SISeg *segP;
ProcState *stateP;
- int max;
+ uint32 max;
int n;
segP = shmInvalBuffer;
@@ -506,10 +493,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
*/
stateP->hasMessages = false;
- /* Fetch current value of maxMsgNum using spinlock */
- SpinLockAcquire(&segP->msgnumLock);
- max = segP->maxMsgNum;
- SpinLockRelease(&segP->msgnumLock);
+ /* Fetch current value of maxMsgNum: atomic read, then read barrier */
+ max = pg_atomic_read_u32(&segP->maxMsgNum);
+ pg_read_barrier();
if (stateP->resetState)
{
@@ -576,11 +562,11 @@ void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
SISeg *segP = shmInvalBuffer;
- int min,
+ uint32 min,
minsig,
lowbound,
- numMsgs,
- i;
+ numMsgs;
+ int i;
ProcState *needSig = NULL;
/* Lock out all writers and readers */
@@ -595,14 +581,14 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
* backends here it is possible for them to keep sending messages without
* a problem even when they are the only active backend.
*/
- min = segP->maxMsgNum;
- minsig = min - SIG_THRESHOLD;
- lowbound = min - MAXNUMMESSAGES + minFree;
+ min = pg_atomic_read_u32(&segP->maxMsgNum);
+ minsig = min - Min(min, SIG_THRESHOLD);
+ lowbound = min - Min(min, MAXNUMMESSAGES - minFree);
for (i = 0; i < segP->numProcs; i++)
{
ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
- int n = stateP->nextMsgNum;
+ uint32 n = stateP->nextMsgNum;
/* Ignore if already in reset state */
Assert(stateP->procPid != 0);
@@ -641,7 +627,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
if (min >= MSGNUMWRAPAROUND)
{
segP->minMsgNum -= MSGNUMWRAPAROUND;
- segP->maxMsgNum -= MSGNUMWRAPAROUND;
+ pg_atomic_sub_fetch_u32(&segP->maxMsgNum, MSGNUMWRAPAROUND);
for (i = 0; i < segP->numProcs; i++)
segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
}
@@ -650,7 +636,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
* Determine how many messages are still in the queue, and set the
* threshold at which we should repeat SICleanupQueue().
*/
- numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum;
if (numMsgs < CLEANUP_MIN)
segP->nextThreshold = CLEANUP_MIN;
else
--
2.43.0