Hi, Andres
21.03.2025 19:33, Andres Freund wrote:
> Hi,
>
> On 2025-03-21 14:35:16 +0300, Yura Sokolov wrote:
>> From 080c9e0de5e6e10751347e1ff50b65df424744cb Mon Sep 17 00:00:00 2001
>> From: Yura Sokolov <y.soko...@postgrespro.ru>
>> Date: Mon, 3 Feb 2025 11:58:33 +0300
>> Subject: [PATCH v2] sinvaladt.c: use atomic operations on maxMsgNum
>>
>> msgnumLock spinlock could be highly contended.
>> Comment states it was used as memory barrier.
>> Lets use atomic ops with memory barriers directly instead.
>>
>> Note: patch uses pg_read_barrier()/pg_write_barrier() instead of
>> pg_atomic_read_membarrier_u32()/pg_atomic_write_membarrier_u32() since
>> no full barrier semantic is required, and explicit read/write barriers
>> are cheaper at least on x86_64.
>
> Is it actually true that full barriers aren't required? I think we might
> actually rely on a stronger ordering.
>
>
>> @@ -506,10 +493,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int
>> datasize)
>> */
>> stateP->hasMessages = false;
>>
>> - /* Fetch current value of maxMsgNum using spinlock */
>> - SpinLockAcquire(&segP->msgnumLock);
>> - max = segP->maxMsgNum;
>> - SpinLockRelease(&segP->msgnumLock);
>> + /* Fetch current value of maxMsgNum using atomic with memory barrier */
>> + max = pg_atomic_read_u32(&segP->maxMsgNum);
>> + pg_read_barrier();
>>
>> if (stateP->resetState)
>> {
>> /*
>> * Force reset. We can say we have dealt with any messages
>> added
>> * since the reset, as well; and that means we should clear the
>> * signaled flag, too.
>> */
>> stateP->nextMsgNum = max;
>> stateP->resetState = false;
>> stateP->signaled = false;
>> LWLockRelease(SInvalReadLock);
>> return -1;
>> }
>
> vs
>
>> @@ -410,17 +398,16 @@ SIInsertDataEntries(const SharedInvalidationMessage
>> *data, int n)
>> /*
>> * Insert new message(s) into proper slot of circular buffer
>> */
>> - max = segP->maxMsgNum;
>> + max = pg_atomic_read_u32(&segP->maxMsgNum);
>> while (nthistime-- > 0)
>> {
>> segP->buffer[max % MAXNUMMESSAGES] = *data++;
>> max++;
>> }
>>
>> - /* Update current value of maxMsgNum using spinlock */
>> - SpinLockAcquire(&segP->msgnumLock);
>> - segP->maxMsgNum = max;
>> - SpinLockRelease(&segP->msgnumLock);
>> + /* Update current value of maxMsgNum using atomic with memory
>> barrier */
>> + pg_write_barrier();
>> + pg_atomic_write_u32(&segP->maxMsgNum, max);
>>
>> /*
>> * Now that the maxMsgNum change is globally visible, we give
>> everyone
>> * a swift kick to make sure they read the newly added messages.
>> * Releasing SInvalWriteLock will enforce a full memory
>> barrier, so
>> * these (unlocked) changes will be committed to memory before
>> we exit
>> * the function.
>> */
>> for (i = 0; i < segP->numProcs; i++)
>> {
>> ProcState *stateP =
>> &segP->procState[segP->pgprocnos[i]];
>>
>> stateP->hasMessages = true;
>> }
>
> On a loosely ordered architecture, the hasMessage=false in SIGetDataEntries()
> could be reordered with the read of maxMsgNum. Which, afaict, would lead to
> missing messages. That's not prevented by the pg_write_barrier() in
> SIInsertDataEntries(). I think there may be other similar dangers.
>
> This could be solved by adding full memory barriers in a few places.

Many thanks for the review and the suggestion!

I agree, pg_memory_barrier() should be added before the read of
segP->maxMsgNum. I think changing stateP->hasMessages to an atomic variable
would be the better way, but that would change sizeof(ProcState).

I don't see the need for a full barrier after the read of maxMsgNum, since
the other ProcState fields are protected by SInvalReadLock, aren't they?
So I left pg_read_barrier() there.

I still avoid pg_atomic_read_membarrier_u32(), since it is actually a write
operation. pg_memory_barrier() is also implemented as a write operation on
x86_64, but it targets a memory cell on the process's own stack, so it will
not be contended.

pg_atomic_write_membarrier_u32() is used to write maxMsgNum just to
simplify the code. If a backport is considered, write barriers before and
after the write could be used instead.

The fixed version is attached.

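To make the ordering argument concrete, here is a minimal standalone sketch
of the writer/reader protocol, written with plain C11 atomics rather than
PostgreSQL's primitives. It is not the patch. DemoQueue, QUEUE_SLOTS,
demo_init(), demo_insert() and demo_read() are hypothetical names, and the
mapping of the C11 fences onto pg_memory_barrier(), pg_read_barrier() and
pg_atomic_write_membarrier_u32() is only an approximation assumed for
illustration. The LWLocks and message-number wraparound are left out.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define QUEUE_SLOTS 8           /* hypothetical stand-in for MAXNUMMESSAGES */

typedef struct DemoQueue
{
    int         buffer[QUEUE_SLOTS];    /* message payloads */
    atomic_uint max_msg_num;    /* next message number to be assigned */
    atomic_bool has_messages;   /* reader's "unread messages" flag */
} DemoQueue;

static void
demo_init(DemoQueue *q)
{
    memset(q->buffer, 0, sizeof(q->buffer));
    atomic_init(&q->max_msg_num, 0);
    atomic_init(&q->has_messages, false);
}

/* Writer: payload first, then the counter, then the flag. */
static void
demo_insert(DemoQueue *q, int msg)
{
    unsigned    max = atomic_load_explicit(&q->max_msg_num,
                                           memory_order_relaxed);

    q->buffer[max % QUEUE_SLOTS] = msg;

    /* Full fences around the store (assumed analogue of write_membarrier). */
    atomic_thread_fence(memory_order_seq_cst);
    atomic_store_explicit(&q->max_msg_num, max + 1, memory_order_relaxed);
    atomic_thread_fence(memory_order_seq_cst);

    atomic_store_explicit(&q->has_messages, true, memory_order_relaxed);
}

/* Reader: clear the flag, full fence, read the counter, then the payloads. */
static int
demo_read(DemoQueue *q, unsigned *next_msg_num, int *out, int out_size)
{
    unsigned    max;
    int         n = 0;

    assert(out != NULL && out_size > 0);

    atomic_store_explicit(&q->has_messages, false, memory_order_relaxed);

    /* Keeps the counter load from moving ahead of the flag store. */
    atomic_thread_fence(memory_order_seq_cst);
    max = atomic_load_explicit(&q->max_msg_num, memory_order_relaxed);

    /* Payload reads must not move ahead of the counter load. */
    atomic_thread_fence(memory_order_acquire);

    while (n < out_size && *next_msg_num < max)
        out[n++] = q->buffer[(*next_msg_num)++ % QUEUE_SLOTS];

    /* Stopped early: leave the flag set so the rest is picked up later. */
    if (*next_msg_num < max)
        atomic_store_explicit(&q->has_messages, true, memory_order_relaxed);

    return n;
}
```

Without the full fence in demo_read(), the load of max_msg_num could be
performed before the has_messages = false store becomes visible. A
concurrent demo_insert() could then bump the counter and set the flag in
between, and the reader's late store would clear the flag while messages
are still unread. That is the case raised in the review.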
> But:
>
> I'd also like to know a bit more about the motivation here - I can easily
> believe that you hit contention around the shared inval queue, but I find it
> somewhat hard to believe that a spinlock that's acquired *once* per batch
> ("quantum"), covering a single read/write, is going to be the bottleneck,
> rather than the much longer held LWLock, that protects iterating over all
> procs.
>
> Have you verified that this actually addresses the performance issue?

Problems with this spinlock were observed by at least two independent
technical support teams. First, a friendly vendor company shared the idea
of removing it. We don't know their exact situation, but I suppose it was
quite similar to the one our tech support investigated at one of our
clients some months later:

(Quote from the tech support report:)
> Almost 20% of CPU time is spent in backtraces like:
4b0d2d s_lock (/opt/pgpro/ent-15/bin/postgres)
49c847 SIGetDataEntries
49bf94 ReceiveSharedInvalidMessages
4a14ba LockRelationOid
1671f4 relation_open
1de1cd table_open
5e82aa RelationGetStatExtList
402a01 get_relation_statistics (inlined)
402a01 get_relation_info
407a9e build_simple_rel
3daa1d add_base_rels_to_query
3daa1d add_base_rels_to_query
3dd92b query_planner

The client has many NUMA nodes in a single machine, and the software
actively generates invalidation messages (probably due to heavy use of
temporary tables).

Since the backtrace is quite clear and ends at s_lock, the patch has to
help.

--
regards
Yura Sokolov aka funny-falcon
From de96fd271a107ce4300f758822d7cc2f9ab1ff1c Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Mon, 3 Feb 2025 11:58:33 +0300
Subject: [PATCH v3] sinvaladt.c: use atomic operations on maxMsgNum
The msgnumLock spinlock can be contended.
The comment states it was used as a memory barrier.
Let's use atomic ops with memory barriers directly instead.
---
src/backend/storage/ipc/sinvaladt.c | 80 +++++++++++++----------------
1 file changed, 36 insertions(+), 44 deletions(-)
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index 2da91738c32..400bfdb0072 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -86,19 +86,7 @@
* has no need to touch anyone's ProcState, except in the infrequent cases
* when SICleanupQueue is needed. The only point of overlap is that
* the writer wants to change maxMsgNum while readers need to read it.
- * We deal with that by having a spinlock that readers must take for just
- * long enough to read maxMsgNum, while writers take it for just long enough
- * to write maxMsgNum. (The exact rule is that you need the spinlock to
- * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
- * spinlock to write maxMsgNum unless you are holding both locks.)
- *
- * Note: since maxMsgNum is an int and hence presumably atomically readable/
- * writable, the spinlock might seem unnecessary. The reason it is needed
- * is to provide a memory barrier: we need to be sure that messages written
- * to the array are actually there before maxMsgNum is increased, and that
- * readers will see that data after fetching maxMsgNum. Multiprocessors
- * that have weak memory-ordering guarantees can fail without the memory
- * barrier instructions that are included in the spinlock sequences.
+ * We deal with that by using atomic operations with proper memory barriers.
*/
@@ -139,7 +127,7 @@ typedef struct ProcState
/* procPid is zero in an inactive ProcState array entry. */
pid_t procPid; /* PID of backend, for signaling */
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
- int nextMsgNum; /* next message number to read */
+ uint32 nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */
bool signaled; /* backend has been sent catchup signal */
bool hasMessages; /* backend has unread messages */
@@ -167,11 +155,9 @@ typedef struct SISeg
/*
* General state information
*/
- int minMsgNum; /* oldest message still needed */
- int maxMsgNum; /* next message number to be assigned */
- int nextThreshold; /* # of messages to call SICleanupQueue */
-
- slock_t msgnumLock; /* spinlock protecting maxMsgNum */
+ uint32 minMsgNum; /* oldest message still needed */
+ pg_atomic_uint32 maxMsgNum; /* next message number to be assigned */
+ uint32 nextThreshold; /* # of messages to call SICleanupQueue */
/*
* Circular buffer holding shared-inval messages
@@ -243,9 +229,8 @@ SharedInvalShmemInit(void)
/* Clear message counters, save size of procState array, init spinlock */
shmInvalBuffer->minMsgNum = 0;
- shmInvalBuffer->maxMsgNum = 0;
+ pg_atomic_init_u32(&shmInvalBuffer->maxMsgNum, 0);
shmInvalBuffer->nextThreshold = CLEANUP_MIN;
- SpinLockInit(&shmInvalBuffer->msgnumLock);
/* The buffer[] array is initially all unused, so we need not fill it */
@@ -303,7 +288,7 @@ SharedInvalBackendInit(bool sendOnly)
/* mark myself active, with all extant messages already read */
stateP->procPid = MyProcPid;
- stateP->nextMsgNum = segP->maxMsgNum;
+ stateP->nextMsgNum = pg_atomic_read_u32(&segP->maxMsgNum);
stateP->resetState = false;
stateP->signaled = false;
stateP->hasMessages = false;
@@ -382,8 +367,8 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
while (n > 0)
{
int nthistime = Min(n, WRITE_QUANTUM);
- int numMsgs;
- int max;
+ uint32 numMsgs;
+ uint32 max;
int i;
n -= nthistime;
@@ -399,7 +384,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
*/
for (;;)
{
- numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum;
if (numMsgs + nthistime > MAXNUMMESSAGES ||
numMsgs >= segP->nextThreshold)
SICleanupQueue(true, nthistime);
@@ -410,17 +395,20 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
/*
* Insert new message(s) into proper slot of circular buffer
*/
- max = segP->maxMsgNum;
+ max = pg_atomic_read_u32(&segP->maxMsgNum);
while (nthistime-- > 0)
{
segP->buffer[max % MAXNUMMESSAGES] = *data++;
max++;
}
- /* Update current value of maxMsgNum using spinlock */
- SpinLockAcquire(&segP->msgnumLock);
- segP->maxMsgNum = max;
- SpinLockRelease(&segP->msgnumLock);
+ /*
+ * maxMsgNum must be written strictly after the messages are written and
+ * strictly before hasMessages is set. Two write barriers, one before and
+ * one after, could be used instead, but pg_atomic_write_membarrier_u32()
+ * keeps the code simpler.
+ */
+ pg_atomic_write_membarrier_u32(&segP->maxMsgNum, max);
/*
* Now that the maxMsgNum change is globally visible, we give everyone
@@ -473,7 +461,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
SISeg *segP;
ProcState *stateP;
- int max;
+ uint32 max;
int n;
segP = shmInvalBuffer;
@@ -506,10 +494,14 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
*/
stateP->hasMessages = false;
- /* Fetch current value of maxMsgNum using spinlock */
- SpinLockAcquire(&segP->msgnumLock);
- max = segP->maxMsgNum;
- SpinLockRelease(&segP->msgnumLock);
+ /*
+ * A full barrier before the read of maxMsgNum orders it after the
+ * hasMessages = false store above. A read barrier after it is enough
+ * to order the subsequent message reads.
+ */
+ pg_memory_barrier();
+ max = pg_atomic_read_u32(&segP->maxMsgNum);
+ pg_read_barrier();
if (stateP->resetState)
{
@@ -576,11 +568,11 @@ void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
SISeg *segP = shmInvalBuffer;
- int min,
+ uint32 min,
minsig,
lowbound,
- numMsgs,
- i;
+ numMsgs;
+ int i;
ProcState *needSig = NULL;
/* Lock out all writers and readers */
@@ -595,14 +587,14 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
* backends here it is possible for them to keep sending messages without
* a problem even when they are the only active backend.
*/
- min = segP->maxMsgNum;
- minsig = min - SIG_THRESHOLD;
- lowbound = min - MAXNUMMESSAGES + minFree;
+ min = pg_atomic_read_u32(&segP->maxMsgNum);
+ minsig = min - Min(min, SIG_THRESHOLD);
+ lowbound = min - Min(min, MAXNUMMESSAGES - minFree);
for (i = 0; i < segP->numProcs; i++)
{
ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
- int n = stateP->nextMsgNum;
+ uint32 n = stateP->nextMsgNum;
/* Ignore if already in reset state */
Assert(stateP->procPid != 0);
@@ -641,7 +633,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
if (min >= MSGNUMWRAPAROUND)
{
segP->minMsgNum -= MSGNUMWRAPAROUND;
- segP->maxMsgNum -= MSGNUMWRAPAROUND;
+ pg_atomic_sub_fetch_u32(&segP->maxMsgNum, MSGNUMWRAPAROUND);
for (i = 0; i < segP->numProcs; i++)
segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
}
@@ -650,7 +642,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
* Determine how many messages are still in the queue, and set the
* threshold at which we should repeat SICleanupQueue().
*/
- numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum;
if (numMsgs < CLEANUP_MIN)
segP->nextThreshold = CLEANUP_MIN;
else
--
2.43.0