On Thu, Nov 17, 2011 at 10:19 AM, Pavan Deolasee
<pavan.deola...@gmail.com> wrote:
> On Thu, Nov 17, 2011 at 10:01 AM, Robert Haas <robertmh...@gmail.com> wrote:
>
>>
>> I am not convinced that that's a better API.  I mean, consider
>> something like this:
>>
>>    /*
>>     * OK, let's do it.  First let other backends know I'm in ANALYZE.
>>     */
>>    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
>>    MyProc->vacuumFlags |= PROC_IN_ANALYZE;
>>    LWLockRelease(ProcArrayLock);
>
>> I'm not sure exactly how you'd proposed to rewrite that, but I think
>> it's almost guaranteed to be more than three lines of code.
>
> I would guess the ReqRes will look something like this, where
> ReqResRequest/Response would probably be unions of all the various requests
> and responses, one for each type of request:
>
> struct ReqRes {
>  ReqResRequestType  reqtype;
>  ReqResRequest         req;
>  ReqResResponse      res;
> }
>
> The code above can be rewritten as:
>
> reqRes.reqtype = RR_PROC_SET_VACUUMFLAGS;
> reqRes.req.set_vacuumflags.flags =  PROC_IN_ANALYZE;
> LWLockExecute(ProcArrayLock, LW_EXCLUSIVE, &reqRes);
>

My apologies for hijacking the thread, but the work seems quite
related, so I thought I should post here instead of starting a new
thread.

Here is a WIP patch based on the idea of having a shared queue. A process
trying to access the shared memory protected by an LWLock sets up the
task in its PGPROC and calls a new API, LWLockExecute(). If the LWLock
is available, the task is performed immediately and the function
returns. Otherwise, the process queues itself up on the lock. When the
last shared lock holder or the exclusive lock holder calls
LWLockRelease(), it scans through the pending tasks, executes them
via a callback mechanism, and wakes all those processes along with any
other normal waiter(s) waiting in LWLockAcquire().
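
To make the interface concrete, here is roughly what the caller side
looks like (this mirrors the ProcArrayEndTransaction change in the
patch below):

    /* Set up the task in our own PGPROC; no lock is needed because
     * other backends read this area only while we sleep on the queue */
    WorkQueueData *wqdata = &MyProc->wqdata;

    wqdata->wq_reqtype = WQ_END_TRANSACTION;
    wqdata->wq_reqin.wqin_end_xact.proc = MyProc;
    wqdata->wq_reqin.wqin_end_xact.latestXid = latestXid;
    wqdata->wq_exec = ProcArrayEndTransactionWQ;    /* the callback */

    /* Executes the callback at once if the lock is free; otherwise
     * queues the task and sleeps until another backend has done it */
    LWLockExecute(ProcArrayLock, LW_EXCLUSIVE);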

I have only coded this for ProcArrayEndTransaction, but it should be
fairly easy to extend the usage to more places, especially those that
make simple modifications to the protected area. I don't propose to
use the technique for every user of LWLock, but there are some
obvious candidates, including the one Robert quoted above.
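
For instance, the PROC_IN_ANALYZE snippet quoted above might become
something like this (the WQ_SET_VACUUMFLAGS request type and its
callback are hypothetical, just to show the shape; they are not in
this patch):

    /* Hypothetical request type and callback, not in this WIP patch */
    MyProc->wqdata.wq_reqtype = WQ_SET_VACUUMFLAGS;
    MyProc->wqdata.wq_reqin.wqin_set_vacuumflags.flags = PROC_IN_ANALYZE;
    MyProc->wqdata.wq_exec = ProcSetVacuumFlagsWQ;
    LWLockExecute(ProcArrayLock, LW_EXCLUSIVE);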

I see a 35-40% improvement for 32-80 clients on a 5-minute pgbench -N
run with a scale factor of 100 and permanent tables. This is on a
32-core HP IA box.

There are a few things that need some deliberation. The pending tasks
are right now executed while holding the mutex (spinlock). This is
good and bad for obvious reasons. We could change that so that the
work is done without holding the spinlock, or leave it to the caller
to choose the behavior. Doing it without holding the spinlock would
make the technique interesting for many more callers. We could also
rework the task execution so that similar pending requests from
multiple callers are combined and executed with a single callback, if
the caller knows it's safe to do so. I haven't thought through the
API/callback changes to support that, but it's definitely possible
and could be quite useful in many cases. For example, the status of
many transactions could be checked with a single lookup of the
ProcArray, or WAL inserts from multiple processes could be combined
and written at once.
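
To sketch what the combining might look like (none of this is in the
patch; it assumes the release path knows it is safe to fold
WQ_END_TRANSACTION requests together), LWLockRelease could make a
single pass over the pending queue and advance latestCompletedXid
just once:

    /* Sketch only: fold all pending WQ_END_TRANSACTION requests into
     * one pass, so latestCompletedXid is advanced a single time */
    TransactionId maxXid = InvalidTransactionId;
    PGPROC       *p;

    for (p = lock->sq_head; p != NULL; p = p->lwWaitLink)
    {
        TransactionId xid = p->wqdata.wq_reqin.wqin_end_xact.latestXid;

        /* ... clear per-proc fields as ProcArrayEndTransactionWQ does ... */

        if (!TransactionIdIsValid(maxXid) ||
            TransactionIdPrecedes(maxXid, xid))
            maxXid = xid;
    }

    if (TransactionIdIsValid(maxXid) &&
        TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, maxXid))
        ShmemVariableCache->latestCompletedXid = maxXid;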

Thanks,
Pavan

-- 
Pavan Deolasee
EnterpriseDB     http://www.enterprisedb.com
commit 24f8e349d085e646cb918c552cc8ead7d38f7013
Author: Pavan Deolasee <pavan@ubuntu.(none)>
Date:   Fri Nov 18 15:49:54 2011 +0530

    Implement a shared work queue mechanism. A process can queue its work for later
    execution if the protecting lock is currently not available. The backend that
    releases the last lock will finish the work and wake up the waiting processes.

diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 1a48485..59d2958 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -157,6 +157,7 @@ static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
 							   TransactionId xmax);
 static TransactionId KnownAssignedXidsGetOldestXmin(void);
 static void KnownAssignedXidsDisplay(int trace_level);
+static bool ProcArrayEndTransactionWQ(WorkQueueData *wqdata);
 
 /*
  * Report shared-memory space needed by CreateSharedProcArray.
@@ -331,8 +332,6 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
 
 	elog(LOG, "failed to find proc %p in ProcArray", proc);
 }
-
-
 /*
  * ProcArrayEndTransaction -- mark a transaction as no longer running
  *
@@ -352,33 +351,24 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 	if (TransactionIdIsValid(latestXid))
 	{
 		/*
-		 * We must lock ProcArrayLock while clearing proc->xid, so that we do
-		 * not exit the set of "running" transactions while someone else is
-		 * taking a snapshot.  See discussion in
-		 * src/backend/access/transam/README.
+		 * Use the shared work queue mechanism to get the work done. If the
+		 * ProcArrayLock is available, it will be done immediately; otherwise
+		 * it will be queued up, and some other backend (the one that releases
+		 * the lock last) will do it for us and wake us up.
+		 *
+		 * Use the shared area in PGPROC to communicate with other backends.
+		 * We can write to the area without holding any lock because it is
+		 * read/written by other backends only while we are sleeping in the
+		 * queue, and only one backend can access it at any time.
-		Assert(TransactionIdIsValid(proc->xid));
-
-		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-
-		proc->xid = InvalidTransactionId;
-		proc->lxid = InvalidLocalTransactionId;
-		proc->xmin = InvalidTransactionId;
-		/* must be cleared with xid/xmin: */
-		proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
-		proc->inCommit = false; /* be sure this is cleared in abort */
-		proc->recoveryConflictPending = false;
-
-		/* Clear the subtransaction-XID cache too while holding the lock */
-		proc->subxids.nxids = 0;
-		proc->subxids.overflowed = false;
+		WorkQueueData *wqdata = &proc->wqdata;
 
-		/* Also advance global latestCompletedXid while holding the lock */
-		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
-								  latestXid))
-			ShmemVariableCache->latestCompletedXid = latestXid;
+		wqdata->wq_reqtype = WQ_END_TRANSACTION;
+		wqdata->wq_reqin.wqin_end_xact.proc = proc;
+		wqdata->wq_reqin.wqin_end_xact.latestXid = latestXid;
+		wqdata->wq_exec = ProcArrayEndTransactionWQ;
 
-		LWLockRelease(ProcArrayLock);
+		LWLockExecute(ProcArrayLock, LW_EXCLUSIVE);
 	}
 	else
 	{
@@ -401,6 +391,38 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 	}
 }
 
+/*
+ * Do the real work for ProcArrayEndTransaction. We are called while holding
+ * the mutex, so this must be quick.
+ */
+static bool
+ProcArrayEndTransactionWQ(WorkQueueData *wqdata)
+{
+	volatile PGPROC *proc = (PGPROC *) wqdata->wq_reqin.wqin_end_xact.proc;
+	TransactionId latestXid = wqdata->wq_reqin.wqin_end_xact.latestXid;
+
+	Assert(TransactionIdIsValid(proc->xid));
+
+	proc->xid = InvalidTransactionId;
+	proc->lxid = InvalidLocalTransactionId;
+	proc->xmin = InvalidTransactionId;
+	/* must be cleared with xid/xmin: */
+	proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+	proc->inCommit = false; /* be sure this is cleared in abort */
+	proc->recoveryConflictPending = false;
+
+	/* Clear the subtransaction-XID cache too while holding the lock */
+	proc->subxids.nxids = 0;
+	proc->subxids.overflowed = false;
+
+	/* Also advance global latestCompletedXid while holding the lock */
+	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
+				latestXid))
+		ShmemVariableCache->latestCompletedXid = latestXid;
+
+	return true;
+}
+
 
 /*
  * ProcArrayClearTransaction -- clear the transaction fields
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 079eb29..ab2899e 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -43,6 +43,8 @@ typedef struct LWLock
 	bool		releaseOK;		/* T if ok to release waiters */
 	char		exclusive;		/* # of exclusive holders (0 or 1) */
 	int			shared;			/* # of shared holders (0..MaxBackends) */
+	PGPROC		*sq_head;		/* head of list of pending requests */
+	PGPROC		*sq_tail;		/* tail of list of pending request */
 	PGPROC	   *head;			/* head of list of waiting PGPROCs */
 	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
 	/* tail is undefined when head is NULL */
@@ -269,6 +271,8 @@ CreateLWLocks(void)
 		lock->lock.releaseOK = true;
 		lock->lock.exclusive = 0;
 		lock->lock.shared = 0;
+		lock->lock.sq_head = NULL;
+		lock->lock.sq_tail = NULL;
 		lock->lock.head = NULL;
 		lock->lock.tail = NULL;
 	}
@@ -571,7 +575,7 @@ void
 LWLockRelease(LWLockId lockid)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
-	PGPROC	   *head;
+	PGPROC	   *head, *wakeupq_head, *wakeupq_tail;
 	PGPROC	   *proc;
 	int			i;
 
@@ -610,21 +614,48 @@ LWLockRelease(LWLockId lockid)
 	 * if someone has already awakened waiters that haven't yet acquired the
 	 * lock.
 	 */
-	head = lock->head;
-	if (head != NULL)
+	if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
 	{
-		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
+		/*
+		 * First process any pending requests and add those processes to the
+		 * list of to-be-awakened processes.
+		 */
+		head = lock->sq_head;
+		while (head != NULL)
+		{
+			proc = head;
+
+			Assert(proc->wqdata.wq_reqpending);
+
+			/* Finish the work */
+			proc->wqdata.wq_exec(&proc->wqdata);
+			proc->wqdata.wq_reqpending = false;
+
+			head = proc->lwWaitLink;
+		}
+
+		/*
+		 * We must wake up all those processes that are waiting for their
+		 * pending requests to be processed.
+		 */
+		wakeupq_head = lock->sq_head;
+		wakeupq_tail = lock->sq_tail;
+
+		lock->sq_head = lock->sq_tail = NULL;
+
+		/*
+		 * Remove the to-be-awakened PGPROCs from the queue.  If the front
+		 * waiter wants exclusive lock, awaken him only. Otherwise awaken
+		 * as many waiters as want shared access.
+		 */
+		head = lock->head;
+		if (head != NULL)
 		{
-			/*
-			 * Remove the to-be-awakened PGPROCs from the queue.  If the front
-			 * waiter wants exclusive lock, awaken him only. Otherwise awaken
-			 * as many waiters as want shared access.
-			 */
 			proc = head;
 			if (!proc->lwExclusive)
 			{
 				while (proc->lwWaitLink != NULL &&
-					   !proc->lwWaitLink->lwExclusive)
+						!proc->lwWaitLink->lwExclusive)
 					proc = proc->lwWaitLink;
 			}
 			/* proc is now the last PGPROC to be released */
@@ -633,11 +664,19 @@ LWLockRelease(LWLockId lockid)
 			/* prevent additional wakeups until retryer gets to run */
 			lock->releaseOK = false;
 		}
+
+		/*
+		 * Add any other processes to be woken up to the list
+		 */
+		if (wakeupq_head)
+			wakeupq_tail->lwWaitLink = head;
 		else
-		{
-			/* lock is still held, can't awaken anything */
-			head = NULL;
-		}
+			wakeupq_head = head;
+	}
+	else
+	{
+		/* lock is still held, can't awaken anything */
+		wakeupq_head = NULL;
 	}
 
 	/* We are done updating shared state of the lock itself. */
@@ -648,6 +687,7 @@ LWLockRelease(LWLockId lockid)
 	/*
 	 * Awaken any waiters I removed from the queue.
 	 */
+	head = wakeupq_head;
 	while (head != NULL)
 	{
 		LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
@@ -685,6 +725,180 @@ LWLockReleaseAll(void)
 	}
 }
 
+/*
+ * Acquire the LWLock and execute the given task. If the lock is not
+ * available, we queue the task, and it will be executed when the lock
+ * becomes available. Although both exclusive and shared tasks may be
+ * performed, this interface is most useful for tasks that need exclusive
+ * access to the lock. The task must be very short and must not perform any
+ * I/O, since it may be done while holding the spinlock. Also, any ereports
+ * must be avoided while executing the task.
+ *
+ * The caller can assume that the task has finished successfully when the
+ * function returns. Any output information is returned in the
+ * MyProc->wqdata shared area.
+ */
+void
+LWLockExecute(LWLockId lockid, LWLockMode mode)
+{
+	volatile LWLock *lock = &(LWLockArray[lockid].lock);
+	PGPROC	   *proc = MyProc;
+	WorkQueueData	*wqdata = &proc->wqdata;
+	int			extraWaits = 0;
+	bool		mustwait;
+
+	PRINT_LWDEBUG("LWLockExecute", lockid, lock);
+
+#ifdef LWLOCK_STATS
+	/* Set up local count state first time through in a given process */
+	if (counts_for_pid != MyProcPid)
+	{
+		int		   *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
+		int			numLocks = LWLockCounter[1];
+
+		sh_acquire_counts = calloc(numLocks, sizeof(int));
+		ex_acquire_counts = calloc(numLocks, sizeof(int));
+		block_counts = calloc(numLocks, sizeof(int));
+		counts_for_pid = MyProcPid;
+		on_shmem_exit(print_lwlock_stats, 0);
+	}
+	/* Count lock acquisition attempts */
+	if (mode == LW_EXCLUSIVE)
+		ex_acquire_counts[lockid]++;
+	else
+		sh_acquire_counts[lockid]++;
+#endif   /* LWLOCK_STATS */
+
+	/*
+	 * We can't wait if we haven't got a PGPROC.  This should only occur
+	 * during bootstrap or shared memory initialization.  Put an Assert here
+	 * to catch unsafe coding practices.
+	 */
+	Assert(!(proc == NULL && IsUnderPostmaster));
+
+	/*
+	 * Lock out cancel/die interrupts until we exit the code section protected
+	 * by the LWLock.  This ensures that interrupts will not interfere with
+	 * manipulations of data structures in shared memory.
+	 */
+	HOLD_INTERRUPTS();
+
+	/* Acquire mutex.  Time spent holding mutex should be short! */
+	SpinLockAcquire(&lock->mutex);
+
+	/* If I can get the lock, execute the task and we are done */
+	if (mode == LW_EXCLUSIVE)
+	{
+		if (lock->exclusive == 0 && lock->shared == 0)
+			mustwait = false;
+		else
+			mustwait = true;
+	}
+	else
+	{
+		if (lock->exclusive == 0)
+			mustwait = false;
+		else
+			mustwait = true;
+	}
+
+	if (!mustwait)
+	{
+		/*
+		 * No other process is in the critical section, and none can get in
+		 * while we are holding the mutex.
+		 *
+		 * Invoke the callback for work execution. We are still holding the
+		 * mutex, so the callback must be short and quick. We rely on the
+		 * users of this function to know that.
+		 *
+		 * XXX Alternatively, we could acquire the LWLock in the desired mode
+		 * (lock->shared/lock->exclusive would have to be incremented above),
+		 * execute the task and then release the lock. Maybe the caller could
+		 * pass additional information to choose the desired behavior. For
+		 * now, just do this while holding the mutex.
+		 */
+		wqdata->wq_exec(wqdata);
+		/* Can release the mutex now */
+		SpinLockRelease(&lock->mutex);
+	}
+	else
+	{
+		/*
+		 * Add myself to wait queue.
+		 *
+		 * If we don't have a PGPROC structure, there's no way to wait. This
+		 * should never occur, since MyProc should only be null during shared
+		 * memory initialization.
+		 */
+		if (proc == NULL)
+			elog(PANIC, "cannot wait without a PGPROC structure");
+
+		proc->lwWaiting = true;
+		proc->lwExclusive = (mode == LW_EXCLUSIVE);
+		proc->lwWaitLink = NULL;
+
+		/* Mark the request as pending. It is not of much use right now. */
+		wqdata->wq_reqpending = true;
+
+		if (lock->sq_head == NULL)
+			lock->sq_head = proc;
+		else
+			lock->sq_tail->lwWaitLink = proc;
+		lock->sq_tail = proc;
+
+		/* XXX Should we set lock->releaseOK to true here? */
+		/* lock->releaseOK = true; */
+
+		/* Can release the mutex now */
+		SpinLockRelease(&lock->mutex);
+
+		/*
+		 * Wait until awakened.
+		 *
+		 * Since we share the process wait semaphore with the regular lock
+		 * manager and ProcWaitForSignal, and we may need to acquire an LWLock
+		 * while one of those is pending, it is possible that we get awakened
+		 * for a reason other than being signaled by LWLockRelease. If so,
+		 * loop back and wait again.  Once we've gotten the LWLock,
+		 * re-increment the sema by the number of additional signals received,
+		 * so that the lock manager or signal manager will see the received
+		 * signal when it next waits.
+		 */
+		LOG_LWDEBUG("LWLockExecute", lockid, "waiting");
+
+#ifdef LWLOCK_STATS
+		block_counts[lockid]++;
+#endif
+
+		TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+
+		for (;;)
+		{
+			/* "false" means cannot accept cancel/die interrupt here. */
+			PGSemaphoreLock(&proc->sem, false);
+			if (!proc->lwWaiting)
+				break;
+			extraWaits++;
+		}
+
+		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+
+		LOG_LWDEBUG("LWLockExecute", lockid, "awakened");
+
+		TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
+		/*
+		 * Fix the process wait semaphore's count for any absorbed wakeups.
+		 */
+		while (extraWaits-- > 0)
+			PGSemaphoreUnlock(&proc->sem);
+	}
+
+	/*
+	 * Now okay to allow cancel/die interrupts.
+	 */
+	RESUME_INTERRUPTS();
+}
 
 /*
  * LWLockHeldByMe - test whether my process currently holds a lock
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 438a48d..7c2d99a 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -14,6 +14,8 @@
 #ifndef LWLOCK_H
 #define LWLOCK_H
 
+#include "storage/wqueue.h"
+
 /*
  * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
  * here, but we need them to set up enum LWLockId correctly, and having
@@ -114,5 +116,6 @@ extern Size LWLockShmemSize(void);
 extern void CreateLWLocks(void);
 
 extern void RequestAddinLWLocks(int n);
+extern void LWLockExecute(LWLockId lockid, LWLockMode mode);
 
 #endif   /* LWLOCK_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 6e798b1..34874f2 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -154,6 +154,9 @@ struct PGPROC
 	Oid			fpRelId[FP_LOCK_SLOTS_PER_BACKEND]; /* slots for rel oids */
 	bool		fpVXIDLock;		/* are we holding a fast-path VXID lock? */
 	LocalTransactionId fpLocalTransactionId;	/* lxid for fast-path VXID lock */
+
+	/* Shared work queue */
+	WorkQueueData	wqdata;
 };
 
 /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/storage/wqueue.h b/src/include/storage/wqueue.h
new file mode 100644
index 0000000..7ee5cd3
--- /dev/null
+++ b/src/include/storage/wqueue.h
@@ -0,0 +1,44 @@
+/*
+ * wqueue.h
+ *
+ * 	Implement a shared work queue mechanism
+ *
+ * 	src/include/storage/wqueue.h
+ */
+
+#ifndef _WQUEUE_H
+#define _WQUEUE_H
+
+typedef enum WQRequestType
+{
+	WQ_NO_REQUEST = 0,
+	WQ_END_TRANSACTION
+} WQRequestType;
+
+typedef union WQRequestIn
+{
+	struct {
+		void			*proc;
+		TransactionId	latestXid;
+	} wqin_end_xact;
+} WQRequestIn;
+
+typedef union WQRequestOut
+{
+	bool	wqout_status;
+} WQRequestOut;
+
+struct WorkQueueData;
+
+typedef bool (*WQExecuteReq) (struct WorkQueueData *wqdata);
+
+typedef struct WorkQueueData
+{
+	WQRequestType	wq_reqtype;
+	bool			wq_reqpending;
+	WQRequestIn		wq_reqin;
+	WQRequestOut	wq_reqout;
+	WQExecuteReq	wq_exec;
+} WorkQueueData;
+
+#endif