From 293e789f9c1a63748147acd613c556961f1dc5c4 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 25 Nov 2022 10:53:56 +0000
Subject: [PATCH v1] WAL Insertion Lock Improvements

---
 src/backend/access/transam/xlog.c |  8 +++--
 src/backend/storage/lmgr/lwlock.c | 56 +++++++++++++++++--------------
 src/include/storage/lwlock.h      |  7 ++--
 3 files changed, 41 insertions(+), 30 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a31fbbff78..b3f758abb3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -376,7 +376,7 @@ typedef struct XLogwrtResult
 typedef struct
 {
 	LWLock		lock;
-	XLogRecPtr	insertingAt;
+	pg_atomic_uint64	insertingAt;
 	XLogRecPtr	lastImportantAt;
 } WALInsertLock;
 
@@ -1482,6 +1482,10 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	{
 		XLogRecPtr	insertingat = InvalidXLogRecPtr;
 
+		/* Quickly check and continue if no one holds the lock. */
+		if (!IsLWLockHeld(&WALInsertLocks[i].l.lock))
+			continue;
+
 		do
 		{
 			/*
@@ -4602,7 +4606,7 @@ XLOGShmemInit(void)
 	for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
 	{
 		LWLockInitialize(&WALInsertLocks[i].l.lock, LWTRANCHE_WAL_INSERT);
-		WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr;
+		pg_atomic_init_u64(&WALInsertLocks[i].l.insertingAt, InvalidXLogRecPtr);
 		WALInsertLocks[i].l.lastImportantAt = InvalidXLogRecPtr;
 	}
 
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index a5ad36ca78..3c2048224b 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -1536,6 +1536,15 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 	return !mustwait;
 }
 
+bool
+IsLWLockHeld(LWLock *lock)
+{
+	uint32	state;
+
+	state = pg_atomic_read_u32(&lock->state);
+	return ((state & LW_VAL_EXCLUSIVE) != 0) || ((state & LW_VAL_SHARED) != 0);
+}
+
 /*
  * Does the lwlock in its current state need to wait for the variable value to
  * change?
@@ -1546,9 +1555,8 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
  * *result is set to true if the lock was free, and false otherwise.
  */
 static bool
-LWLockConflictsWithVar(LWLock *lock,
-					   uint64 *valptr, uint64 oldval, uint64 *newval,
-					   bool *result)
+LWLockConflictsWithVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval,
+					   uint64 *newval, bool *result)
 {
 	bool		mustwait;
 	uint64		value;
@@ -1570,14 +1578,7 @@ LWLockConflictsWithVar(LWLock *lock,
 
 	*result = false;
 
-	/*
-	 * Read value using the lwlock's wait list lock, as we can't generally
-	 * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
-	 * do atomic 64 bit reads/writes the spinlock should be optimized away.
-	 */
-	LWLockWaitListLock(lock);
-	value = *valptr;
-	LWLockWaitListUnlock(lock);
+	value = pg_atomic_read_u64(valptr);
 
 	if (value != oldval)
 	{
@@ -1606,7 +1607,8 @@ LWLockConflictsWithVar(LWLock *lock,
  * in shared mode, returns 'true'.
  */
 bool
-LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
+LWLockWaitForVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval,
+				 uint64 *newval)
 {
 	PGPROC	   *proc = MyProc;
 	int			extraWaits = 0;
@@ -1741,21 +1743,32 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
  * The caller must be holding the lock in exclusive mode.
  */
 void
-LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
+LWLockUpdateVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
 {
 	proclist_head wakeup;
 	proclist_mutable_iter iter;
 
 	PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
 
+	/*
+	 * When there are no waiters, quickly update the variable and exit without
+	 * taking wait list lock.
+	 */
+	if ((pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) == 0)
+	{
+		/* Update the lock's value atomically */
+		pg_atomic_write_u64(valptr, val);
+		return;
+	}
+
 	proclist_init(&wakeup);
 
 	LWLockWaitListLock(lock);
 
 	Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
 
-	/* Update the lock's value */
-	*valptr = val;
+	/* Update the lock's value atomically */
+	pg_atomic_write_u64(valptr, val);
 
 	/*
 	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
@@ -1872,17 +1885,10 @@ LWLockRelease(LWLock *lock)
  * LWLockReleaseClearVar - release a previously acquired lock, reset variable
  */
 void
-LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
+LWLockReleaseClearVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
 {
-	LWLockWaitListLock(lock);
-
-	/*
-	 * Set the variable's value before releasing the lock, that prevents race
-	 * a race condition wherein a new locker acquires the lock, but hasn't yet
-	 * set the variables value.
-	 */
-	*valptr = val;
-	LWLockWaitListUnlock(lock);
+	/* Update the lock's value atomically */
+	pg_atomic_write_u64(valptr, val);
 
 	LWLockRelease(lock);
 }
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index a494cb598f..3fae217d26 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -125,14 +125,15 @@ extern bool LWLockAcquire(LWLock *lock, LWLockMode mode);
 extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode);
 extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode);
 extern void LWLockRelease(LWLock *lock);
-extern void LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val);
+extern bool IsLWLockHeld(LWLock *lock);
+extern void LWLockReleaseClearVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val);
 extern void LWLockReleaseAll(void);
 extern bool LWLockHeldByMe(LWLock *lock);
 extern bool LWLockAnyHeldByMe(LWLock *lock, int nlocks, size_t stride);
 extern bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode);
 
-extern bool LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval);
-extern void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val);
+extern bool LWLockWaitForVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval, uint64 *newval);
+extern void LWLockUpdateVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val);
 
 extern Size LWLockShmemSize(void);
 extern void CreateLWLocks(void);
-- 
2.34.1

