Hi,

Attached is a new version of the patchset which I intend to commit soon.

Stuff changed since 0.9:

* Greatly simplified locking logic - the whole concept that a lock could
  be spuriously acquired is gone. That cost a small bit of performance
  (0.5%, I thought it'd be much bigger) on x86, but is a noticeable
  performance *benefit* on PPC.

* releaseOK (and other internal flags) are rolled into the former
  'lockcount' variable, which is now named 'state'. Keeping them inside the
  same atomic variable makes reasoning about the state easier, as there's
  no skew between observing the lock count and the other variables.

* The number of queued waiters isn't required anymore, it's only a
  debugging aid (#ifdef LOCK_DEBUG) at this point.

Patches:
0001: errhidecontext() patch
0002: dlist()ify lwWaitLink
0003: LW_SHARED scalability

I've done a fair amount of benchmarking and on bigger systems the new
code seems to be a win pretty much generally.

Greetings,

Andres Freund

-- 
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
>From 5e6af912377a1b43ea0168951238cb6a5e0b233e Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Sun, 21 Dec 2014 15:45:55 +0100
Subject: [PATCH 1/4] Add capability to suppress CONTEXT: messages to elog
 machinery.

Hiding context messages usually is not a good idea - except for rather
verbose debugging/development tools like LOG_DEBUG. There the
amount of repeated context messages just bloats the log without adding
information.
---
 src/backend/utils/error/elog.c | 24 ++++++++++++++++++++++--
 src/include/utils/elog.h       |  2 ++
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 2316464..8f04b19 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1081,6 +1081,25 @@ errhidestmt(bool hide_stmt)
 	return 0;					/* return value does not matter */
 }
 
+/*
+ * errhidecontext --- optionally suppress CONTEXT: field of log entry
+ *
+ * This should only be used for verbose debugging messages where the repeated
+ * inclusion of CONTEXT: bloats the log volume too much.
+ */
+int
+errhidecontext(bool hide_ctx)
+{
+	ErrorData  *edata = &errordata[errordata_stack_depth];
+
+	/* we don't bother incrementing recursion_depth */
+	CHECK_STACK_DEPTH();
+
+	edata->hide_ctx = hide_ctx;
+
+	return 0;					/* return value does not matter */
+}
+
 
 /*
  * errfunction --- add reporting function name to the current error
@@ -2724,7 +2743,8 @@ write_csvlog(ErrorData *edata)
 	appendStringInfoChar(&buf, ',');
 
 	/* errcontext */
-	appendCSVLiteral(&buf, edata->context);
+	if (!edata->hide_ctx)
+		appendCSVLiteral(&buf, edata->context);
 	appendStringInfoChar(&buf, ',');
 
 	/* user query --- only reported if not disabled by the caller */
@@ -2856,7 +2876,7 @@ send_message_to_server_log(ErrorData *edata)
 			append_with_tabs(&buf, edata->internalquery);
 			appendStringInfoChar(&buf, '\n');
 		}
-		if (edata->context)
+		if (edata->context && !edata->hide_ctx)
 		{
 			log_line_prefix(&buf, edata);
 			appendStringInfoString(&buf, _("CONTEXT:  "));
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 87438b8..ec7ed22 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -221,6 +221,7 @@ errcontext_msg(const char *fmt,...)
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 1, 2)));
 
 extern int	errhidestmt(bool hide_stmt);
+extern int	errhidecontext(bool hide_ctx);
 
 extern int	errfunction(const char *funcname);
 extern int	errposition(int cursorpos);
@@ -385,6 +386,7 @@ typedef struct ErrorData
 	bool		output_to_client;		/* will report to client? */
 	bool		show_funcname;	/* true to force funcname inclusion */
 	bool		hide_stmt;		/* true to prevent STATEMENT: inclusion */
+	bool		hide_ctx;		/* true to prevent CONTEXT: inclusion */
 	const char *filename;		/* __FILE__ of ereport() call */
 	int			lineno;			/* __LINE__ of ereport() call */
 	const char *funcname;		/* __func__ of ereport() call */
-- 
2.2.0.rc0.18.ga1ad247

>From 0ef51c826faa53620fc7d8d39d5df6206be729f3 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Tue, 7 Oct 2014 15:32:34 +0200
Subject: [PATCH 2/4] Convert the PGPROC->lwWaitLink list into a dlist instead
 of open coding it.

Besides being shorter and much easier to read it changes the logic in
LWLockRelease() to release all shared lockers when waking up any. This
can yield some significant performance improvements - and the fairness
isn't really much worse than before, as we always allowed new shared
lockers to jump the queue.
---
 src/backend/access/transam/twophase.c |   1 -
 src/backend/storage/lmgr/lwlock.c     | 146 ++++++++++++----------------------
 src/backend/storage/lmgr/proc.c       |   2 -
 src/include/storage/lwlock.h          |   5 +-
 src/include/storage/proc.h            |   3 +-
 5 files changed, 57 insertions(+), 100 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 40de84e..ad3e872 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -390,7 +390,6 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 	proc->roleId = owner;
 	proc->lwWaiting = false;
 	proc->lwWaitMode = 0;
-	proc->lwWaitLink = NULL;
 	proc->waitLock = NULL;
 	proc->waitProcLock = NULL;
 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 43f4d6b..d9d7044 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -117,9 +117,9 @@ inline static void
 PRINT_LWDEBUG(const char *where, const LWLock *lock)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%s %d): excl %d shared %d head %p rOK %d",
+		elog(LOG, "%s(%s %d): excl %d shared %d rOK %d",
 			 where, T_NAME(lock), T_ID(lock),
-			 (int) lock->exclusive, lock->shared, lock->head,
+			 (int) lock->exclusive, lock->shared,
 			 (int) lock->releaseOK);
 }
 
@@ -479,8 +479,7 @@ LWLockInitialize(LWLock *lock, int tranche_id)
 	lock->exclusive = 0;
 	lock->shared = 0;
 	lock->tranche = tranche_id;
-	lock->head = NULL;
-	lock->tail = NULL;
+	dlist_init(&lock->waiters);
 }
 
 
@@ -619,12 +618,7 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 
 		proc->lwWaiting = true;
 		proc->lwWaitMode = mode;
-		proc->lwWaitLink = NULL;
-		if (lock->head == NULL)
-			lock->head = proc;
-		else
-			lock->tail->lwWaitLink = proc;
-		lock->tail = proc;
+		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
 		/* Can release the mutex now */
 		SpinLockRelease(&lock->mutex);
@@ -840,12 +834,7 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 
 		proc->lwWaiting = true;
 		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		proc->lwWaitLink = NULL;
-		if (lock->head == NULL)
-			lock->head = proc;
-		else
-			lock->tail->lwWaitLink = proc;
-		lock->tail = proc;
+		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
 		/* Can release the mutex now */
 		SpinLockRelease(&lock->mutex);
@@ -1002,10 +991,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 		proc->lwWaiting = true;
 		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
 		/* waiters are added to the front of the queue */
-		proc->lwWaitLink = lock->head;
-		if (lock->head == NULL)
-			lock->tail = proc;
-		lock->head = proc;
+		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
 		/*
 		 * Set releaseOK, to make sure we get woken up as soon as the lock is
@@ -1087,9 +1073,10 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 void
 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 {
-	PGPROC	   *head;
-	PGPROC	   *proc;
-	PGPROC	   *next;
+	dlist_head	wakeup;
+	dlist_mutable_iter iter;
+
+	dlist_init(&wakeup);
 
 	/* Acquire mutex.  Time spent holding mutex should be short! */
 	SpinLockAcquire(&lock->mutex);
@@ -1104,24 +1091,16 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
 	 * up. They are always in the front of the queue.
 	 */
-	head = lock->head;
-
-	if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
+	dlist_foreach_modify(iter, &lock->waiters)
 	{
-		proc = head;
-		next = proc->lwWaitLink;
-		while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
-		{
-			proc = next;
-			next = next->lwWaitLink;
-		}
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+
+		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
+			break;
 
-		/* proc is now the last PGPROC to be released */
-		lock->head = next;
-		proc->lwWaitLink = NULL;
+		dlist_delete(&waiter->lwWaitLink);
+		dlist_push_tail(&wakeup, &waiter->lwWaitLink);
 	}
-	else
-		head = NULL;
 
 	/* We are done updating shared state of the lock itself. */
 	SpinLockRelease(&lock->mutex);
@@ -1129,15 +1108,14 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 	/*
 	 * Awaken any waiters I removed from the queue.
 	 */
-	while (head != NULL)
+	dlist_foreach_modify(iter, &wakeup)
 	{
-		proc = head;
-		head = proc->lwWaitLink;
-		proc->lwWaitLink = NULL;
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+		dlist_delete(&waiter->lwWaitLink);
 		/* check comment in LWLockRelease() about this barrier */
 		pg_write_barrier();
-		proc->lwWaiting = false;
-		PGSemaphoreUnlock(&proc->sem);
+		waiter->lwWaiting = false;
+		PGSemaphoreUnlock(&waiter->sem);
 	}
 }
 
@@ -1148,10 +1126,12 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 void
 LWLockRelease(LWLock *lock)
 {
-	PGPROC	   *head;
-	PGPROC	   *proc;
+	dlist_head	wakeup;
+	dlist_mutable_iter iter;
 	int			i;
 
+	dlist_init(&wakeup);
+
 	PRINT_LWDEBUG("LWLockRelease", lock);
 
 	/*
@@ -1187,58 +1167,39 @@ LWLockRelease(LWLock *lock)
 	 * if someone has already awakened waiters that haven't yet acquired the
 	 * lock.
 	 */
-	head = lock->head;
-	if (head != NULL)
+	if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
 	{
-		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
+		/*
+		 * Remove the to-be-awakened PGPROCs from the queue.
+		 */
+		bool		releaseOK = true;
+		bool		wokeup_somebody = false;
+
+		dlist_foreach_modify(iter, &lock->waiters)
 		{
-			/*
-			 * Remove the to-be-awakened PGPROCs from the queue.
-			 */
-			bool		releaseOK = true;
+			PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 
-			proc = head;
+			if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
+				continue;
 
-			/*
-			 * First wake up any backends that want to be woken up without
-			 * acquiring the lock.
-			 */
-			while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-				proc = proc->lwWaitLink;
+			dlist_delete(&waiter->lwWaitLink);
+			dlist_push_tail(&wakeup, &waiter->lwWaitLink);
 
 			/*
-			 * If the front waiter wants exclusive lock, awaken him only.
-			 * Otherwise awaken as many waiters as want shared access.
+			 * Prevent additional wakeups until retryer gets to
+			 * run. Backends that are just waiting for the lock to become
+			 * free don't retry automatically.
 			 */
-			if (proc->lwWaitMode != LW_EXCLUSIVE)
+			if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
 			{
-				while (proc->lwWaitLink != NULL &&
-					   proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
-				{
-					if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
-						releaseOK = false;
-					proc = proc->lwWaitLink;
-				}
-			}
-			/* proc is now the last PGPROC to be released */
-			lock->head = proc->lwWaitLink;
-			proc->lwWaitLink = NULL;
-
-			/*
-			 * Prevent additional wakeups until retryer gets to run. Backends
-			 * that are just waiting for the lock to become free don't retry
-			 * automatically.
-			 */
-			if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
 				releaseOK = false;
+				wokeup_somebody = true;
+			}
 
-			lock->releaseOK = releaseOK;
-		}
-		else
-		{
-			/* lock is still held, can't awaken anything */
-			head = NULL;
+			if(waiter->lwWaitMode == LW_EXCLUSIVE)
+				break;
 		}
+		lock->releaseOK = releaseOK;
 	}
 
 	/* We are done updating shared state of the lock itself. */
@@ -1249,13 +1210,12 @@ LWLockRelease(LWLock *lock)
 	/*
 	 * Awaken any waiters I removed from the queue.
 	 */
-	while (head != NULL)
+	dlist_foreach_modify(iter, &wakeup)
 	{
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 		LOG_LWDEBUG("LWLockRelease", T_NAME(lock), T_ID(lock),
 					"release waiter");
-		proc = head;
-		head = proc->lwWaitLink;
-		proc->lwWaitLink = NULL;
+		dlist_delete(&waiter->lwWaitLink);
 		/*
 		 * Guarantee that lwWaiting being unset only becomes visible once the
 		 * unlink from the link has completed. Otherwise the target backend
@@ -1267,8 +1227,8 @@ LWLockRelease(LWLock *lock)
 		 * another lock.
 		 */
 		pg_write_barrier();
-		proc->lwWaiting = false;
-		PGSemaphoreUnlock(&proc->sem);
+		waiter->lwWaiting = false;
+		PGSemaphoreUnlock(&waiter->sem);
 	}
 
 	/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index ea88a24..a4789fc 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -372,7 +372,6 @@ InitProcess(void)
 		MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
 	MyProc->lwWaiting = false;
 	MyProc->lwWaitMode = 0;
-	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
 #ifdef USE_ASSERT_CHECKING
@@ -535,7 +534,6 @@ InitAuxiliaryProcess(void)
 	MyPgXact->vacuumFlags = 0;
 	MyProc->lwWaiting = false;
 	MyProc->lwWaitMode = 0;
-	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
 #ifdef USE_ASSERT_CHECKING
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 09654a8..c84970a 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -14,6 +14,7 @@
 #ifndef LWLOCK_H
 #define LWLOCK_H
 
+#include "lib/ilist.h"
 #include "storage/s_lock.h"
 
 struct PGPROC;
@@ -50,9 +51,7 @@ typedef struct LWLock
 	char		exclusive;		/* # of exclusive holders (0 or 1) */
 	int			shared;			/* # of shared holders (0..MaxBackends) */
 	int			tranche;		/* tranche ID */
-	struct PGPROC *head;		/* head of list of waiting PGPROCs */
-	struct PGPROC *tail;		/* tail of list of waiting PGPROCs */
-	/* tail is undefined when head is NULL */
+	dlist_head	waiters;		/* list of waiting PGPROCs */
 } LWLock;
 
 /*
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 4ad4164..e868f0c 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -15,6 +15,7 @@
 #define _PROC_H_
 
 #include "access/xlogdefs.h"
+#include "lib/ilist.h"
 #include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
@@ -104,7 +105,7 @@ struct PGPROC
 	/* Info about LWLock the process is currently waiting for, if any. */
 	bool		lwWaiting;		/* true if waiting for an LW lock */
 	uint8		lwWaitMode;		/* lwlock mode being waited for */
-	struct PGPROC *lwWaitLink;	/* next waiter for same LW lock */
+	dlist_node	lwWaitLink;		/* position in LW lock wait list */
 
 	/* Info about lock the process is currently waiting for, if any. */
 	/* waitLock and waitProcLock are NULL if not currently waiting. */
-- 
2.2.0.rc0.18.ga1ad247

>From 5e78af23020185b1a870537e9f107d89cea36bae Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 18 Sep 2014 16:14:16 +0200
Subject: [PATCH 3/4] Wait free LW_SHARED LWLock acquisition.

The old LWLock implementation had the problem that concurrent shared
lock acquisitions required exclusively acquiring a spinlock. Often
that could lead to acquirers waiting behind the spinlock, even if the
actual LWLock was free.

The new implementation doesn't acquire the spinlock when acquiring the
lock itself. Instead the new atomic operations are used to atomically
manipulate the state. Only the waitqueue, used only in the slow path,
is still protected by the spinlock. Check lwlock.c's header for an
explanation about the used algorithm.

For some common workloads on larger machines this can yield
significant performance improvements. Particularly in read mostly
workloads.

Reviewed-By: Amit Kapila and Robert Haas
Author: Andres Freund

Discussion: 20130926225545.gb26...@awork2.anarazel.de
---
 src/backend/storage/lmgr/lwlock.c | 931 ++++++++++++++++++++++++++------------
 src/include/storage/lwlock.h      |  24 +-
 2 files changed, 653 insertions(+), 302 deletions(-)

diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index d9d7044..f0db0a6 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -24,7 +24,53 @@
  * IDENTIFICATION
  *	  src/backend/storage/lmgr/lwlock.c
  *
- *-------------------------------------------------------------------------
+ * NOTES:
+ *
+ * This used to be a pretty straight forward reader-writer lock
+ * implementation, in which the internal state was protected by a
+ * spinlock. Unfortunately the overhead of taking the spinlock proved to be
+ * too high for workloads/locks that were taken in shared mode very
+ * frequently. Often we were spinning in the (obviously exclusive) spinlock,
+ * while trying to acquire a shared lock that was actually free.
+ *
+ * Thus a new implementation was devised that provides wait-free shared lock
+ * acquisition for locks that aren't exclusively locked.
+ *
+ * The basic idea is to have a single atomic variable 'state' instead of
+ * the formerly separate shared and exclusive counters and to use atomic
+ * operations to acquire the lock. That's fairly easy to do for plain
+ * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
+ * in the OS.
+ *
+ * For lock acquisition we use an atomic compare-and-exchange on the state
+ * variable. For an exclusive lock we swap in a sentinel value
+ * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
+ *
+ * To release the lock we use an atomic decrement. If the
+ * new value is zero (we get that atomically), we know we can/have to release
+ * waiters.
+ *
+ * Obviously it is important that the sentinel value for exclusive locks
+ * doesn't conflict with the maximum number of possible share lockers -
+ * luckily MAX_BACKENDS makes that easily possible.
+ *
+ *
+ * The attentive reader might have noticed that naively doing the above has a
+ * glaring race condition: We try to lock using the atomic operations and
+ * notice that we have to wait. Unfortunately by the time we have finished
+ * queuing, the former locker very well might have already finished its
+ * work. That's problematic because we're now stuck waiting inside the OS.
+ *
+ * To mitigate those races we use a four-phase attempt at locking:
+ *   Phase 1: Try to do it atomically, if we succeed, nice
+ *   Phase 2: Add ourselves to the waitqueue of the lock
+ *   Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
+ *            the queue
+ *   Phase 4: Sleep till wake-up, goto Phase 1
+ *
+ * This protects us against the problem from above as nobody can release too
+ * quickly, before we're queued, since after Phase 2 we're already queued.
+ * -------------------------------------------------------------------------
  */
 #include "postgres.h"
 
@@ -35,8 +81,8 @@
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "postmaster/postmaster.h"
 #include "replication/slot.h"
-#include "storage/barrier.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -51,6 +97,16 @@
 /* We use the ShmemLock spinlock to protect LWLockAssign */
 extern slock_t *ShmemLock;
 
+#define LW_FLAG_HAS_WAITERS			((uint32) 1 << 30)
+#define LW_FLAG_RELEASE_OK			((uint32) 1 << 29)
+
+#define LW_VAL_EXCLUSIVE			((uint32) 1 << 24)
+#define LW_VAL_SHARED				1
+
+#define LW_LOCK_MASK				((uint32) ((1 << 25)-1))
+/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
+#define LW_SHARED_MASK				((uint32)(1 << 23))
+
 /*
  * This is indexed by tranche ID and stores metadata for all tranches known
  * to the current backend.
@@ -81,8 +137,15 @@ static LWLockTranche MainLWLockTranche;
  */
 #define MAX_SIMUL_LWLOCKS	200
 
+/* struct representing the LWLocks we're holding */
+typedef struct LWLockHandle
+{
+	LWLock *lock;
+	LWLockMode	mode;
+} LWLockHandle;
+
 static int	num_held_lwlocks = 0;
-static LWLock *held_lwlocks[MAX_SIMUL_LWLOCKS];
+static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
 
 static int	lock_addin_request = 0;
 static bool lock_addin_request_allowed = true;
@@ -103,6 +166,7 @@ typedef struct lwlock_stats
 	int			sh_acquire_count;
 	int			ex_acquire_count;
 	int			block_count;
+	int			dequeue_self_count;
 	int			spin_delay_count;
 }	lwlock_stats;
 
@@ -114,24 +178,42 @@ static lwlock_stats lwlock_stats_dummy;
 bool		Trace_lwlocks = false;
 
 inline static void
-PRINT_LWDEBUG(const char *where, const LWLock *lock)
+PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
 {
+	/* hide statement & context here, otherwise the log is just too verbose */
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%s %d): excl %d shared %d rOK %d",
-			 where, T_NAME(lock), T_ID(lock),
-			 (int) lock->exclusive, lock->shared,
-			 (int) lock->releaseOK);
+	{
+		uint32 state = pg_atomic_read_u32(&lock->state);
+		ereport(LOG,
+				(errhidestmt(true),
+				 errhidecontext(true),
+				 errmsg("%d: %s(%s %d): excl %u shared %u haswaiters %u waiters %u rOK %d",
+						MyProcPid,
+						where, T_NAME(lock), T_ID(lock),
+						!!(state & LW_VAL_EXCLUSIVE),
+						state & LW_SHARED_MASK,
+						!!(state & LW_FLAG_HAS_WAITERS),
+						pg_atomic_read_u32(&lock->nwaiters),
+						!!(state & LW_FLAG_RELEASE_OK))));
+	}
 }
 
 inline static void
-LOG_LWDEBUG(const char *where, const char *name, int index, const char *msg)
+LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
 {
+	/* hide statement & context here, otherwise the log is just too verbose */
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%s %d): %s", where, name, index, msg);
+	{
+		ereport(LOG,
+				(errhidestmt(true),
+				 errhidecontext(true),
+				 errmsg("%s(%s %d): %s", where, T_NAME(lock), T_ID(lock), msg)));
+	}
 }
+
 #else							/* not LOCK_DEBUG */
-#define PRINT_LWDEBUG(a,b)
-#define LOG_LWDEBUG(a,b,c,d)
+#define PRINT_LWDEBUG(a,b,c) ((void)0)
+#define LOG_LWDEBUG(a,b,c) ((void)0)
 #endif   /* LOCK_DEBUG */
 
 #ifdef LWLOCK_STATS
@@ -192,11 +274,11 @@ print_lwlock_stats(int code, Datum arg)
 	while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
 	{
 		fprintf(stderr,
-			  "PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u\n",
+				"PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
 				MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name,
 				lwstats->key.instance, lwstats->sh_acquire_count,
 				lwstats->ex_acquire_count, lwstats->block_count,
-				lwstats->spin_delay_count);
+				lwstats->spin_delay_count, lwstats->dequeue_self_count);
 	}
 
 	LWLockRelease(&MainLWLockArray[0].lock);
@@ -226,6 +308,7 @@ get_lwlock_stats_entry(LWLock *lock)
 		lwstats->sh_acquire_count = 0;
 		lwstats->ex_acquire_count = 0;
 		lwstats->block_count = 0;
+		lwstats->dequeue_self_count = 0;
 		lwstats->spin_delay_count = 0;
 	}
 	return lwstats;
@@ -336,6 +419,9 @@ LWLockShmemSize(void)
 void
 CreateLWLocks(void)
 {
+	StaticAssertExpr(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
+					 "MAX_BACKENDS too big for lwlock.c");
+
 	if (!IsUnderPostmaster)
 	{
 		int			numLocks = NumLWLocks();
@@ -475,13 +561,330 @@ void
 LWLockInitialize(LWLock *lock, int tranche_id)
 {
 	SpinLockInit(&lock->mutex);
-	lock->releaseOK = true;
-	lock->exclusive = 0;
-	lock->shared = 0;
+	pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
+#ifdef LOCK_DEBUG
+	pg_atomic_init_u32(&lock->nwaiters, 0);
+#endif
 	lock->tranche = tranche_id;
 	dlist_init(&lock->waiters);
 }
 
+/*
+ * Internal function that tries to atomically acquire the lwlock in the passed
+ * in mode.
+ *
+ * This function will not block waiting for a lock to become free - that's the
+ * caller's job.
+ *
+ * Returns true if the lock isn't free and we need to wait.
+ */
+static bool
+LWLockAttemptLock(LWLock* lock, LWLockMode mode)
+{
+	AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
+
+	/* loop until we've determined whether we could acquire the lock or not */
+	while (true)
+	{
+		uint32 old_state;
+		uint32 expected_state;
+		uint32 desired_state;
+		bool lock_free;
+
+		old_state = pg_atomic_read_u32(&lock->state);
+		expected_state = old_state;
+		desired_state = expected_state;
+
+		if (mode == LW_EXCLUSIVE)
+		{
+			lock_free = (expected_state & LW_LOCK_MASK) == 0;
+			if (lock_free)
+				desired_state += LW_VAL_EXCLUSIVE;
+		}
+		else
+		{
+			lock_free = (expected_state & LW_VAL_EXCLUSIVE) == 0;
+			if (lock_free)
+				desired_state += LW_VAL_SHARED;
+		}
+
+		/*
+		 * Attempt to swap in the state we are expecting. If we didn't see
+		 * the lock to be free, that's just the old value. If we saw it as free,
+		 * we'll attempt to mark it acquired. The reason that we always swap
+		 * in the value is that this doubles as a memory barrier. We could try
+		 * to be smarter and only swap in values if we saw the lock as free,
+		 * but benchmarks haven't shown it as beneficial so far.
+		 *
+		 * Retry if the value changed since we last looked at it.
+		 */
+		if (pg_atomic_compare_exchange_u32(&lock->state,
+										   &expected_state, desired_state))
+		{
+			if (lock_free)
+			{
+				/* Great! Got the lock. */
+#ifdef LOCK_DEBUG
+				if (mode == LW_EXCLUSIVE)
+					lock->owner = MyProc;
+#endif
+				return false;
+			}
+			else
+				return true; /* someobdy else has the lock */
+		}
+	}
+	pg_unreachable();
+}
+
+/*
+ * Wakeup all the lockers that currently have a chance to acquire the lock.
+ */
+static void
+LWLockWakeup(LWLock *lock)
+{
+	bool		new_release_ok;
+	bool		wokeup_somebody = false;
+	dlist_head	wakeup;
+	dlist_mutable_iter iter;
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+
+	lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+	dlist_init(&wakeup);
+
+	new_release_ok = true;
+
+	/* Acquire mutex.  Time spent holding mutex should be short! */
+#ifdef LWLOCK_STATS
+	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+	SpinLockAcquire(&lock->mutex);
+#endif
+
+	dlist_foreach_modify(iter, &lock->waiters)
+	{
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+
+		if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
+			continue;
+
+		dlist_delete(&waiter->lwWaitLink);
+		dlist_push_tail(&wakeup, &waiter->lwWaitLink);
+
+		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
+		{
+			/*
+			 * Prevent additional wakeups until retryer gets to run. Backends
+			 * that are just waiting for the lock to become free don't retry
+			 * automatically.
+			 */
+			new_release_ok = false;
+			/*
+			 * Don't wakeup (further) exclusive locks.
+			 */
+			wokeup_somebody = true;
+		}
+
+		/*
+		 * Once we've woken up an exclusive lock, there's no point in waking
+		 * up anybody else.
+		 */
+		if(waiter->lwWaitMode == LW_EXCLUSIVE)
+			break;
+	}
+
+	Assert(dlist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
+
+	/* Unset both flags at once if required */
+	if (!new_release_ok && dlist_is_empty(&wakeup))
+		pg_atomic_fetch_and_u32(&lock->state,
+								~(LW_FLAG_RELEASE_OK | LW_FLAG_HAS_WAITERS));
+	else if (!new_release_ok)
+		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_RELEASE_OK);
+	else if (dlist_is_empty(&wakeup))
+		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
+	else if (new_release_ok)
+		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
+
+	/* We are done updating the shared state of the lock queue. */
+	SpinLockRelease(&lock->mutex);
+
+	/* Awaken any waiters I removed from the queue. */
+	dlist_foreach_modify(iter, &wakeup)
+	{
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+
+		LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
+		dlist_delete(&waiter->lwWaitLink);
+		/*
+		 * Guarantee that lwWaiting being unset only becomes visible once the
+		 * unlink from the list has completed. Otherwise the target backend
+		 * could be woken up for another reason and enqueue for a new lock - if
+		 * that happens before the list unlink happens, the list would end up
+		 * being corrupted.
+		 *
+		 * The barrier pairs with the SpinLockAcquire() when enqueueing for
+		 * another lock.
+		 */
+		pg_write_barrier();
+		waiter->lwWaiting = false;
+		PGSemaphoreUnlock(&waiter->sem);
+	}
+}
+
+/*
+ * Add ourselves to the wait queue (front for LW_WAIT_UNTIL_FREE, else end).
+ *
+ * NB: Mode can be LW_WAIT_UNTIL_FREE here!
+ */
+static void
+LWLockQueueSelf(LWLock *lock, LWLockMode mode)
+{
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+
+	lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+	/*
+	 * If we don't have a PGPROC structure, there's no way to wait. This
+	 * should never occur, since MyProc should only be null during shared
+	 * memory initialization.
+	 */
+	if (MyProc == NULL)
+		elog(PANIC, "cannot wait without a PGPROC structure");
+
+	if (MyProc->lwWaiting)
+		elog(PANIC, "queueing for lock while waiting on another one");
+
+#ifdef LWLOCK_STATS
+	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+	SpinLockAcquire(&lock->mutex);
+#endif
+
+	/* setting the flag is protected by the spinlock */
+	pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
+
+	MyProc->lwWaiting = true;
+	MyProc->lwWaitMode = mode;
+
+	/* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
+	if (mode == LW_WAIT_UNTIL_FREE)
+		dlist_push_head(&lock->waiters, &MyProc->lwWaitLink);
+	else
+		dlist_push_tail(&lock->waiters, &MyProc->lwWaitLink);
+
+	/* Can release the mutex now */
+	SpinLockRelease(&lock->mutex);
+
+#ifdef LOCK_DEBUG
+	pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
+#endif
+
+}
+
+/*
+ * Remove ourselves from the waitlist.
+ *
+ * This is used if we queued ourselves because we thought we needed to sleep
+ * but, after further checking, we discovered that we don't actually need to
+ * do so. Nothing is returned; if somebody else already dequeued us, we wait
+ * here for (and absorb) the wakeup they scheduled.
+ */
+static void
+LWLockDequeueSelf(LWLock *lock)
+{
+	bool	found = false;
+	dlist_mutable_iter iter;
+
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+
+	lwstats = get_lwlock_stats_entry(lock);
+
+	lwstats->dequeue_self_count++;
+#endif
+
+#ifdef LWLOCK_STATS
+	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+	SpinLockAcquire(&lock->mutex);
+#endif
+
+	/*
+	 * Can't just remove ourselves from the list, but we need to iterate over
+	 * all entries as somebody else could have unqueued us.
+	 */
+	dlist_foreach_modify(iter, &lock->waiters)
+	{
+		PGPROC *proc = dlist_container(PGPROC, lwWaitLink, iter.cur);
+		if (proc == MyProc)
+		{
+			found = true;
+			dlist_delete(&proc->lwWaitLink);
+			break;
+		}
+	}
+
+	if (dlist_is_empty(&lock->waiters) &&
+		(pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
+	{
+		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
+	}
+
+	SpinLockRelease(&lock->mutex);
+
+	/* clear waiting state again, nice for debugging */
+	if (found)
+		MyProc->lwWaiting = false;
+	else
+	{
+		int		extraWaits = 0;
+
+		/*
+		 * Somebody else dequeued us and has or will wake us up. Deal with the
+		 * superfluous absorption of a wakeup.
+		 */
+
+		/*
+		 * Reset releaseOk if somebody woke us before we removed ourselves -
+		 * they'll have set it to false.
+		 */
+		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
+
+		/*
+		 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
+		 * get reset at some inconvenient point later. Most of the time this
+		 * will immediately return.
+		 */
+		for (;;)
+		{
+			/* "false" means cannot accept cancel/die interrupt here. */
+			PGSemaphoreLock(&MyProc->sem, false);
+			if (!MyProc->lwWaiting)
+				break;
+			extraWaits++;
+		}
+
+		/*
+		 * Fix the process wait semaphore's count for any absorbed wakeups.
+		 */
+		while (extraWaits-- > 0)
+			PGSemaphoreUnlock(&MyProc->sem);
+	}
+
+#ifdef LOCK_DEBUG
+	{
+		/* not waiting anymore */
+		uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
+		Assert(nwaiters < MAX_BACKENDS);
+	}
+#endif
+}
 
 /*
  * LWLockAcquire - acquire a lightweight lock in the specified mode
@@ -513,18 +916,19 @@ static inline bool
 LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 {
 	PGPROC	   *proc = MyProc;
-	bool		retry = false;
 	bool		result = true;
 	int			extraWaits = 0;
 #ifdef LWLOCK_STATS
 	lwlock_stats *lwstats;
+
+	lwstats = get_lwlock_stats_entry(lock);
 #endif
 
-	PRINT_LWDEBUG("LWLockAcquire", lock);
+	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
 
-#ifdef LWLOCK_STATS
-	lwstats = get_lwlock_stats_entry(lock);
+	PRINT_LWDEBUG("LWLockAcquire", lock, mode);
 
+#ifdef LWLOCK_STATS
 	/* Count lock acquisition attempts */
 	if (mode == LW_EXCLUSIVE)
 		lwstats->ex_acquire_count++;
@@ -570,58 +974,44 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 	{
 		bool		mustwait;
 
-		/* Acquire mutex.  Time spent holding mutex should be short! */
-#ifdef LWLOCK_STATS
-		lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
-#else
-		SpinLockAcquire(&lock->mutex);
-#endif
-
-		/* If retrying, allow LWLockRelease to release waiters again */
-		if (retry)
-			lock->releaseOK = true;
-
-		/* If I can get the lock, do so quickly. */
-		if (mode == LW_EXCLUSIVE)
-		{
-			if (lock->exclusive == 0 && lock->shared == 0)
-			{
-				lock->exclusive++;
-				mustwait = false;
-			}
-			else
-				mustwait = true;
-		}
-		else
-		{
-			if (lock->exclusive == 0)
-			{
-				lock->shared++;
-				mustwait = false;
-			}
-			else
-				mustwait = true;
-		}
+		/*
+		 * Try to grab the lock the first time, we're not in the waitqueue
+		 * yet/anymore.
+		 */
+		mustwait = LWLockAttemptLock(lock, mode);
 
 		if (!mustwait)
+		{
+			/* XXX: remove before commit? */
+			LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
 			break;				/* got the lock */
+		}
 
 		/*
-		 * Add myself to wait queue.
-		 *
-		 * If we don't have a PGPROC structure, there's no way to wait. This
-		 * should never occur, since MyProc should only be null during shared
-		 * memory initialization.
+		 * Ok, at this point we couldn't grab the lock on the first try. We
+		 * cannot simply queue ourselves to the end of the list and wait to be
+		 * woken up because by now the lock could long have been released.
+		 * Instead add us to the queue and try to grab the lock again. If we
+		 * succeed we need to revert the queuing and be happy, otherwise we
+		 * recheck the lock. If we still couldn't grab it, we know that the
+		 * other locker will see our queue entries when releasing since they
+		 * existed before we checked for the lock.
 		 */
-		if (proc == NULL)
-			elog(PANIC, "cannot wait without a PGPROC structure");
 
-		proc->lwWaiting = true;
-		proc->lwWaitMode = mode;
-		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
+		/* add to the queue */
+		LWLockQueueSelf(lock, mode);
+
+		/* we're now guaranteed to be woken up if necessary */
+		mustwait = LWLockAttemptLock(lock, mode);
 
-		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
+		/* ok, grabbed the lock the second time round, need to undo queueing */
+		if (!mustwait)
+		{
+			LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
+
+			LWLockDequeueSelf(lock);
+			break;
+		}
 
 		/*
 		 * Wait until awakened.
@@ -635,7 +1025,7 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 		 * so that the lock manager or signal manager will see the received
 		 * signal when it next waits.
 		 */
-		LOG_LWDEBUG("LWLockAcquire", T_NAME(lock), T_ID(lock), "waiting");
+		LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
 
 #ifdef LWLOCK_STATS
 		lwstats->block_count++;
@@ -652,12 +1042,22 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 			extraWaits++;
 		}
 
+		/* Retrying, allow LWLockRelease to release waiters again. */
+		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
+
+#ifdef LOCK_DEBUG
+		{
+			/* not waiting anymore */
+			uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
+			Assert(nwaiters < MAX_BACKENDS);
+		}
+#endif
+
 		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
 
-		LOG_LWDEBUG("LWLockAcquire", T_NAME(lock), T_ID(lock), "awakened");
+		LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
 
 		/* Now loop back and try to acquire lock again. */
-		retry = true;
 		result = false;
 	}
 
@@ -665,13 +1065,11 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 	if (valptr)
 		*valptr = val;
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
-
 	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode);
 
 	/* Add lock to list of locks held by this backend */
-	held_lwlocks[num_held_lwlocks++] = lock;
+	held_lwlocks[num_held_lwlocks].lock = lock;
+	held_lwlocks[num_held_lwlocks++].mode = mode;
 
 	/*
 	 * Fix the process wait semaphore's count for any absorbed wakeups.
@@ -694,7 +1092,9 @@ LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
 {
 	bool		mustwait;
 
-	PRINT_LWDEBUG("LWLockConditionalAcquire", lock);
+	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
+
+	PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
 
 	/* Ensure we will have room to remember the lock */
 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
@@ -707,50 +1107,24 @@ LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0)
-		{
-			lock->exclusive++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-	else
-	{
-		if (lock->exclusive == 0)
-		{
-			lock->shared++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
+	/* Check for the lock */
+	mustwait = LWLockAttemptLock(lock, mode);
 
 	if (mustwait)
 	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
-		LOG_LWDEBUG("LWLockConditionalAcquire",
-					T_NAME(lock), T_ID(lock), "failed");
-		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock),
-												 T_ID(lock), mode);
+
+		LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
+		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), T_ID(lock), mode);
 	}
 	else
 	{
 		/* Add lock to list of locks held by this backend */
-		held_lwlocks[num_held_lwlocks++] = lock;
+		held_lwlocks[num_held_lwlocks].lock = lock;
+		held_lwlocks[num_held_lwlocks++].mode = mode;
 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), T_ID(lock), mode);
 	}
-
 	return !mustwait;
 }
 
@@ -776,14 +1150,14 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 	int			extraWaits = 0;
 #ifdef LWLOCK_STATS
 	lwlock_stats *lwstats;
-#endif
-
-	PRINT_LWDEBUG("LWLockAcquireOrWait", lock);
 
-#ifdef LWLOCK_STATS
 	lwstats = get_lwlock_stats_entry(lock);
 #endif
 
+	Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
+
+	PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
+
 	/* Ensure we will have room to remember the lock */
 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
 		elog(ERROR, "too many LWLocks taken");
@@ -795,81 +1169,63 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0)
-		{
-			lock->exclusive++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-	else
-	{
-		if (lock->exclusive == 0)
-		{
-			lock->shared++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
+	/*
+	 * NB: We're using nearly the same twice-in-a-row lock acquisition
+	 * protocol as LWLockAcquire(). Check its comments for details.
+	 */
+	mustwait = LWLockAttemptLock(lock, mode);
 
 	if (mustwait)
 	{
-		/*
-		 * Add myself to wait queue.
-		 *
-		 * If we don't have a PGPROC structure, there's no way to wait.  This
-		 * should never occur, since MyProc should only be null during shared
-		 * memory initialization.
-		 */
-		if (proc == NULL)
-			elog(PANIC, "cannot wait without a PGPROC structure");
-
-		proc->lwWaiting = true;
-		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
+		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
 
-		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
+		mustwait = LWLockAttemptLock(lock, mode);
 
-		/*
-		 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
-		 * wakups, because we share the semaphore with ProcWaitForSignal.
-		 */
-		LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(lock), T_ID(lock),
-					"waiting");
+		if (mustwait)
+		{
+			/*
+			 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
+			 * wakeups, because we share the semaphore with ProcWaitForSignal.
+			 */
+			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
 
 #ifdef LWLOCK_STATS
-		lwstats->block_count++;
+			lwstats->block_count++;
 #endif
+			TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode);
 
-		TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode);
+			for (;;)
+			{
+				/* "false" means cannot accept cancel/die interrupt here. */
+				PGSemaphoreLock(&proc->sem, false);
+				if (!proc->lwWaiting)
+					break;
+				extraWaits++;
+			}
 
-		for (;;)
-		{
-			/* "false" means cannot accept cancel/die interrupt here. */
-			PGSemaphoreLock(&proc->sem, false);
-			if (!proc->lwWaiting)
-				break;
-			extraWaits++;
-		}
+#ifdef LOCK_DEBUG
+			{
+				/* not waiting anymore */
+				uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
+				Assert(nwaiters < MAX_BACKENDS);
+			}
+#endif
+			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
 
-		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
+			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
+		}
+		else
+		{
+			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
 
-		LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(lock), T_ID(lock),
-					"awakened");
-	}
-	else
-	{
-		/* We are done updating shared state of the lock itself. */
-		SpinLockRelease(&lock->mutex);
+			/*
+			  * Got lock in the second attempt, undo queueing. We need to
+			  * treat this as having successfully acquired the lock, otherwise
+			  * we'd not necessarily wake up people we've prevented from
+			  * acquiring the lock.
+			  */
+			LWLockDequeueSelf(lock);
+		}
 	}
 
 	/*
@@ -882,16 +1238,17 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
-		LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(lock), T_ID(lock), "failed");
+		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
 		TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), T_ID(lock),
 													 mode);
 	}
 	else
 	{
+		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
 		/* Add lock to list of locks held by this backend */
-		held_lwlocks[num_held_lwlocks++] = lock;
-		TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), T_ID(lock),
-												mode);
+		held_lwlocks[num_held_lwlocks].lock = lock;
+		held_lwlocks[num_held_lwlocks++].mode = mode;
+		TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), T_ID(lock), mode);
 	}
 
 	return !mustwait;
@@ -923,13 +1280,11 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 	bool		result = false;
 #ifdef LWLOCK_STATS
 	lwlock_stats *lwstats;
-#endif
-
-	PRINT_LWDEBUG("LWLockWaitForVar", lock);
 
-#ifdef LWLOCK_STATS
 	lwstats = get_lwlock_stats_entry(lock);
-#endif   /* LWLOCK_STATS */
+#endif
+
+	PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
 
 	/*
 	 * Quick test first to see if it the slot is free right now.
@@ -938,7 +1293,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 	 * barrier here as far as the current usage is concerned.  But that might
 	 * not be safe in general.
 	 */
-	if (lock->exclusive == 0)
+	if ((pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) == 0)
 		return true;
 
 	/*
@@ -956,21 +1311,24 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 		bool		mustwait;
 		uint64		value;
 
-		/* Acquire mutex.  Time spent holding mutex should be short! */
+		mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
+
+		if (mustwait)
+		{
+			/*
+			 * Perform comparison using spinlock as we can't rely on atomic 64
+			 * bit reads/stores.
+			 */
 #ifdef LWLOCK_STATS
-		lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+			lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
 #else
-		SpinLockAcquire(&lock->mutex);
+			SpinLockAcquire(&lock->mutex);
 #endif
 
-		/* Is the lock now free, and if not, does the value match? */
-		if (lock->exclusive == 0)
-		{
-			result = true;
-			mustwait = false;
-		}
-		else
-		{
+			/*
+			 * XXX: We can significantly optimize this on platforms with 64bit
+			 * atomics.
+			 */
 			value = *valptr;
 			if (value != oldval)
 			{
@@ -980,27 +1338,42 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 			}
 			else
 				mustwait = true;
+			SpinLockRelease(&lock->mutex);
 		}
+		else
+			mustwait = false;
 
 		if (!mustwait)
 			break;				/* the lock was free or value didn't match */
 
 		/*
-		 * Add myself to wait queue.
+		 * Add myself to wait queue. Note that this is racy, somebody else
+		 * could wakeup before we're finished queuing.
+		 * NB: We're using nearly the same twice-in-a-row lock acquisition
+		 * protocol as LWLockAcquire(). Check its comments for details.
+		 */
+		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
+
+		/*
+		 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
+		 * lock is released.
 		 */
-		proc->lwWaiting = true;
-		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		/* waiters are added to the front of the queue */
-		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
+		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
 
 		/*
-		 * Set releaseOK, to make sure we get woken up as soon as the lock is
-		 * released.
+		 * We're now guaranteed to be woken up if necessary. Recheck the
+		 * lock's state.
 		 */
-		lock->releaseOK = true;
+		mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
+
+		/* Ok, lock is free after we queued ourselves. Undo queueing. */
+		if (!mustwait)
+		{
+			LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
 
-		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
+			LWLockDequeueSelf(lock);
+			break;
+		}
 
 		/*
 		 * Wait until awakened.
@@ -1014,7 +1387,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 		 * so that the lock manager or signal manager will see the received
 		 * signal when it next waits.
 		 */
-		LOG_LWDEBUG("LWLockWaitForVar", T_NAME(lock), T_ID(lock), "waiting");
+		LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
 
 #ifdef LWLOCK_STATS
 		lwstats->block_count++;
@@ -1032,17 +1405,22 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 			extraWaits++;
 		}
 
+#ifdef LOCK_DEBUG
+		{
+			/* not waiting anymore */
+			uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
+			Assert(nwaiters < MAX_BACKENDS);
+		}
+#endif
+
 		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock),
 										  LW_EXCLUSIVE);
 
-		LOG_LWDEBUG("LWLockWaitForVar", T_NAME(lock), T_ID(lock), "awakened");
+		LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
 
 		/* Now loop back and check the status of the lock again. */
 	}
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
-
 	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), LW_EXCLUSIVE);
 
 	/*
@@ -1075,14 +1453,24 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 {
 	dlist_head	wakeup;
 	dlist_mutable_iter iter;
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+
+	lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+	PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
 
 	dlist_init(&wakeup);
 
 	/* Acquire mutex.  Time spent holding mutex should be short! */
+#ifdef LWLOCK_STATS
+	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
 	SpinLockAcquire(&lock->mutex);
+#endif
 
-	/* we should hold the lock */
-	Assert(lock->exclusive == 1);
+	Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
 
 	/* Update the lock's value */
 	*valptr = val;
@@ -1112,7 +1500,7 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 	{
 		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 		dlist_delete(&waiter->lwWaitLink);
-		/* check comment in LWLockRelease() about this barrier */
+		/* check comment in LWLockWakeup() about this barrier */
 		pg_write_barrier();
 		waiter->lwWaiting = false;
 		PGSemaphoreUnlock(&waiter->sem);
@@ -1126,22 +1514,22 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 void
 LWLockRelease(LWLock *lock)
 {
-	dlist_head	wakeup;
-	dlist_mutable_iter iter;
+	LWLockMode	mode;
+	uint32		oldstate;
+	bool		check_waiters;
 	int			i;
 
-	dlist_init(&wakeup);
-
-	PRINT_LWDEBUG("LWLockRelease", lock);
-
 	/*
 	 * Remove lock from list of locks held.  Usually, but not always, it will
 	 * be the latest-acquired lock; so search array backwards.
 	 */
 	for (i = num_held_lwlocks; --i >= 0;)
 	{
-		if (lock == held_lwlocks[i])
+		if (lock == held_lwlocks[i].lock)
+		{
+			mode = held_lwlocks[i].mode;
 			break;
+		}
 	}
 	if (i < 0)
 		elog(ERROR, "lock %s %d is not held", T_NAME(lock), T_ID(lock));
@@ -1149,88 +1537,45 @@ LWLockRelease(LWLock *lock)
 	for (; i < num_held_lwlocks; i++)
 		held_lwlocks[i] = held_lwlocks[i + 1];
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* Release my hold on lock */
-	if (lock->exclusive > 0)
-		lock->exclusive--;
-	else
-	{
-		Assert(lock->shared > 0);
-		lock->shared--;
-	}
+	PRINT_LWDEBUG("LWLockRelease", lock, mode);
 
 	/*
-	 * See if I need to awaken any waiters.  If I released a non-last shared
-	 * hold, there cannot be anything to do.  Also, do not awaken any waiters
-	 * if someone has already awakened waiters that haven't yet acquired the
-	 * lock.
+	 * Release my hold on lock, after that it can immediately be acquired by
+	 * others, even if we still have to wakeup other waiters.
 	 */
-	if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
-	{
-		/*
-		 * Remove the to-be-awakened PGPROCs from the queue.
-		 */
-		bool		releaseOK = true;
-		bool		wokeup_somebody = false;
-
-		dlist_foreach_modify(iter, &lock->waiters)
-		{
-			PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
-
-			if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
-				continue;
-
-			dlist_delete(&waiter->lwWaitLink);
-			dlist_push_tail(&wakeup, &waiter->lwWaitLink);
-
-			/*
-			 * Prevent additional wakeups until retryer gets to
-			 * run. Backends that are just waiting for the lock to become
-			 * free don't retry automatically.
-			 */
-			if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
-			{
-				releaseOK = false;
-				wokeup_somebody = true;
-			}
+	if (mode == LW_EXCLUSIVE)
+		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
+	else
+		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
 
-			if(waiter->lwWaitMode == LW_EXCLUSIVE)
-				break;
-		}
-		lock->releaseOK = releaseOK;
-	}
+	/* nobody else can have that kind of lock */
+	Assert(!(oldstate & LW_VAL_EXCLUSIVE));
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
 
-	TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock), T_ID(lock));
+	/*
+	 * We're still waiting for backends to get scheduled, don't wake them up
+	 * again.
+	 */
+	if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
+		(LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
+		(oldstate & LW_LOCK_MASK) == 0)
+		check_waiters = true;
+	else
+		check_waiters = false;
 
 	/*
-	 * Awaken any waiters I removed from the queue.
+	 * As waking up waiters requires the spinlock to be acquired, only do so
+	 * if necessary.
 	 */
-	dlist_foreach_modify(iter, &wakeup)
+	if (check_waiters)
 	{
-		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
-		LOG_LWDEBUG("LWLockRelease", T_NAME(lock), T_ID(lock),
-					"release waiter");
-		dlist_delete(&waiter->lwWaitLink);
-		/*
-		 * Guarantee that lwWaiting being unset only becomes visible once the
-		 * unlink from the link has completed. Otherwise the target backend
-		 * could be woken up for other reason and enqueue for a new lock - if
-		 * that happens before the list unlink happens, the list would end up
-		 * being corrupted.
-		 *
-		 * The barrier pairs with the SpinLockAcquire() when enqueing for
-		 * another lock.
-		 */
-		pg_write_barrier();
-		waiter->lwWaiting = false;
-		PGSemaphoreUnlock(&waiter->sem);
+		/* XXX: remove before commit? */
+		LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
+		LWLockWakeup(lock);
 	}
 
+	TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock), T_ID(lock));
+
 	/*
 	 * Now okay to allow cancel/die interrupts.
 	 */
@@ -1254,7 +1599,7 @@ LWLockReleaseAll(void)
 	{
 		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */
 
-		LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
+		LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
 	}
 }
 
@@ -1262,8 +1607,8 @@ LWLockReleaseAll(void)
 /*
  * LWLockHeldByMe - test whether my process currently holds a lock
  *
- * This is meant as debug support only.  We do not distinguish whether the
- * lock is held shared or exclusive.
+ * This is meant as debug support only.  We currently do not distinguish
+ * whether the lock is held shared or exclusive.
  */
 bool
 LWLockHeldByMe(LWLock *l)
@@ -1272,7 +1617,7 @@ LWLockHeldByMe(LWLock *l)
 
 	for (i = 0; i < num_held_lwlocks; i++)
 	{
-		if (held_lwlocks[i] == l)
+		if (held_lwlocks[i].lock == l)
 			return true;
 	}
 	return false;
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index c84970a..f15a951 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -16,6 +16,7 @@
 
 #include "lib/ilist.h"
 #include "storage/s_lock.h"
+#include "port/atomics.h"
 
 struct PGPROC;
 
@@ -47,11 +48,16 @@ typedef struct LWLockTranche
 typedef struct LWLock
 {
 	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
-	bool		releaseOK;		/* T if ok to release waiters */
-	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	int			shared;			/* # of shared holders (0..MaxBackends) */
-	int			tranche;		/* tranche ID */
+	uint16		tranche;		/* tranche ID */
+
+	pg_atomic_uint32 state;		/* state of exclusive/nonexclusive lockers */
+#ifdef LOCK_DEBUG
+	pg_atomic_uint32 nwaiters;	/* number of waiters */
+#endif
 	dlist_head	waiters;		/* list of waiting PGPROCs */
+#ifdef LOCK_DEBUG
+	struct PGPROC *owner;		/* last exclusive owner of the lock */
+#endif
 } LWLock;
 
 /*
@@ -66,11 +72,11 @@ typedef struct LWLock
  * (Of course, we have to also ensure that the array start address is suitably
  * aligned.)
  *
- * Even on a 32-bit platform, an lwlock will be more than 16 bytes, because
- * it contains 2 integers and 2 pointers, plus other stuff.  It should fit
- * into 32 bytes, though, unless slock_t is really big.  On a 64-bit platform,
- * it should fit into 32 bytes unless slock_t is larger than 4 bytes.  We
- * allow for that just in case.
+ * On 32-bit platforms a LWLock will these days fit into 16 bytes, but since
+ * that didn't use to be the case and cramming more lwlocks into a cacheline
+ * might be detrimental performancewise we still use 32 byte alignment
+ * there. So, both on 32 and 64 bit platforms, it should fit into 32 bytes
+ * unless slock_t is really big.  We allow for that just in case.
  */
 #define LWLOCK_PADDED_SIZE	(sizeof(LWLock) <= 32 ? 32 : 64)
 
-- 
2.2.0.rc0.18.ga1ad247

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to