I spent some time cleaning this up. Details below, but here are the highlights:

* Reverted the removal of wal_writer_delay
* Doesn't rely on WAL writer. Any process can act as the "leader" now.
* Removed heuristic on big flushes
* Uses PGSemaphoreLock/Unlock instead of latches

On 20.01.2012 17:30, Robert Haas wrote:
On Thu, Jan 19, 2012 at 10:46 PM, Peter Geoghegan<pe...@2ndquadrant.com>  wrote:
+       if (delta>  XLOG_SEG_SIZE * CheckPointSegments ||
+                       !ProcGlobal->groupCommitAvailable)

That seems like a gigantic value.  I would think that we'd want to
forget about group commit any time we're behind by more than one
segment (maybe less).

I'm sure that you're right - I myself described it as horrible in my
original mail. I think that whatever value we set this to ought to be
well reasoned. Right now, your magic number doesn't seem much better
than my bogus heuristic (which only ever served as a placeholder
implementation that hinted at a well-principled formula). What's your
basis for suggesting that that limit would always be close to optimal?

It's probably not - I suspect even a single WAL segment still too
large.  I'm pretty sure that it would be safe to always flush up to
the next block boundary, because we always write the whole block
anyway even if it's only partially filled.  So there's no possible
regression there.  Anything larger than "the end of the current 8kB
block" is going to be a trade-off between latency and throughput, and
it seems to me that there's probably only one way to figure out what's
reasonable: test a bunch of different values and see which ones
perform well in practice.  Maybe we could answer part of the question
by writing a test program which does repeated sequential writes of
increasingly-large chunks of data.  We might find out for example that
64kB is basically the same as 8kB because most of the overhead is in
the system call anyway, but beyond that the curve kinks.  Or it may be
that 16kB is already twice as slow as 8kB, or that you can go up to
1MB without a problem.  I don't see a way to know that without testing
it on a couple different pieces of hardware and seeing what happens.

I ripped away that heuristic for a flush that's "too large". After pondering it for a while, I came to the conclusion that as implemented in the patch, it was pointless. The thing is, if the big flush doesn't go through the group commit machinery, it's going to grab the WALWriteLock straight away. Before any smaller commits can proceed, they will need to grab that lock anyway, so the effect is the same as if the big flush had just joined the queue.

Maybe we should have a heuristic to split a large flush into smaller chunks. The WAL segment boundary would be a quite natural split point, for example, because when crossing the file boundary you have to issue separate fsync()s for the files anyway. But I'm happy with leaving that out for now, it's not any worse than it used to be without group commit anyway.

Ugh.  Our current system doesn't even require this code to be
interruptible, I think: you can't receive cancel or die interrupts
either while waiting for an LWLock or while holding it.

Right. Or within HOLD/RESUME_INTERRUPTS blocks.

The patch added some CHECK_FOR_INTERRUPTS() calls into various places in the commit/abort codepaths, but that's all dead code; they're all within a HOLD/RESUME_INTERRUPTS blocks.

I replaced the usage of latches with the more traditional PGSemaphoreLock/Unlock. It semaphore model works just fine in this case, where we have a lwlock to guard the wait queue, and when a process is waiting we know it will be woken up or something messed up at a pretty low level. We don't need a timeout or to wake up at other signals while waiting. Furthermore, the WAL writer didn't have a latch before this patch; it's not many lines of code to initialize the latch and set up the signal handler for it, but it already has a semaphore that's ready to use.

I wonder if we should rename the file into "xlogflush.c" or something like that, to make it clear that this works with any XLOG flushes, not just commits? Group commit is the usual term for this feature, so we should definitely mention that in the comments, but it might be better to rename the files/functions to emphasize that this is about WAL flushing in general.

This probably needs some further cleanup to fix things I've broken, and I haven't done any performance testing, but it's progress. Do you have a shell script or something that you used for the performance tests that I could run? Or would you like to re-run the tests you did earlier with this patch?

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1760,1809 **** SET ENABLE_SEQSCAN TO OFF;
        </listitem>
       </varlistentry>
  
-      <varlistentry id="guc-commit-delay" xreflabel="commit_delay">
-       <term><varname>commit_delay</varname> (<type>integer</type>)</term>
-       <indexterm>
-        <primary><varname>commit_delay</> configuration parameter</primary>
-       </indexterm>
-       <listitem>
-        <para>
-         When the commit data for a transaction is flushed to disk, any
-         additional commits ready at that time are also flushed out.
-         <varname>commit_delay</varname> adds a time delay, set in
-         microseconds, before a transaction attempts to
-         flush the WAL buffer out to disk.  A nonzero delay can allow more
-         transactions to be committed with only one flush operation, if
-         system load is high enough that additional transactions become
-         ready to commit within the given interval. But the delay is
-         just wasted if no other transactions become ready to
-         commit. Therefore, the delay is only performed if at least
-         <varname>commit_siblings</varname> other transactions are
-         active at the instant that a server process has written its
-         commit record.
-         The default <varname>commit_delay</> is zero (no delay).
-         Since all pending commit data will be written at every flush
-         regardless of this setting, it is rare that adding delay
-         by increasing this parameter will actually improve performance.
-        </para>
-       </listitem>
-      </varlistentry>
- 
-      <varlistentry id="guc-commit-siblings" xreflabel="commit_siblings">
-       <term><varname>commit_siblings</varname> (<type>integer</type>)</term>
-       <indexterm>
-        <primary><varname>commit_siblings</> configuration parameter</primary>
-       </indexterm>
-       <listitem>
-        <para>
-         Minimum number of concurrent open transactions to require
-         before performing the <varname>commit_delay</> delay. A larger
-         value makes it more probable that at least one other
-         transaction will become ready to commit during the delay
-         interval. The default is five transactions.
-        </para>
-       </listitem>
-      </varlistentry>
- 
       </variablelist>
       </sect2>
       <sect2 id="runtime-config-wal-checkpoints">
--- 1760,1765 ----
*** a/doc/src/sgml/wal.sgml
--- b/doc/src/sgml/wal.sgml
***************
*** 367,386 ****
     of data corruption.
    </para>
  
-   <para>
-    <xref linkend="guc-commit-delay"> also sounds very similar to
-    asynchronous commit, but it is actually a synchronous commit method
-    (in fact, <varname>commit_delay</varname> is ignored during an
-    asynchronous commit).  <varname>commit_delay</varname> causes a delay
-    just before a synchronous commit attempts to flush
-    <acronym>WAL</acronym> to disk, in the hope that a single flush
-    executed by one such transaction can also serve other transactions
-    committing at about the same time.  Setting <varname>commit_delay</varname>
-    can only help when there are many concurrently committing transactions,
-    and it is difficult to tune it to a value that actually helps rather
-    than hurt throughput.
-   </para>
- 
   </sect1>
  
   <sect1 id="wal-configuration">
--- 367,372 ----
***************
*** 556,579 ****
    </para>
  
    <para>
-    The <xref linkend="guc-commit-delay"> parameter defines for how many
-    microseconds the server process will sleep after writing a commit
-    record to the log with <function>LogInsert</function> but before
-    performing a <function>LogFlush</function>. This delay allows other
-    server processes to add their commit records to the log so as to have all
-    of them flushed with a single log sync. No sleep will occur if
-    <xref linkend="guc-fsync">
-    is not enabled, or if fewer than <xref linkend="guc-commit-siblings">
-    other sessions are currently in active transactions; this avoids
-    sleeping when it's unlikely that any other session will commit soon.
-    Note that on most platforms, the resolution of a sleep request is
-    ten milliseconds, so that any nonzero <varname>commit_delay</varname>
-    setting between 1 and 10000 microseconds would have the same effect.
-    Good values for these parameters are not yet clear; experimentation
-    is encouraged.
-   </para>
- 
-   <para>
     The <xref linkend="guc-wal-sync-method"> parameter determines how
     <productname>PostgreSQL</productname> will ask the kernel to force
     <acronym>WAL</acronym> updates out to disk.
--- 542,547 ----
*** a/src/backend/access/transam/Makefile
--- b/src/backend/access/transam/Makefile
***************
*** 13,19 **** top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
  OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
! 	twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogutils.o
  
  include $(top_srcdir)/src/backend/common.mk
  
--- 13,19 ----
  include $(top_builddir)/src/Makefile.global
  
  OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
! 	twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogutils.o groupcommit.o
  
  include $(top_srcdir)/src/backend/common.mk
  
*** /dev/null
--- b/src/backend/access/transam/groupcommit.c
***************
*** 0 ****
--- 1,346 ----
+ /*-------------------------------------------------------------------------
+  *
+  * groupcommit.c
+  *
+  * Group Commit is new as of PostgreSQL 9.2. It is a performance optimization
+  * where the fsyncing of WAL is batched across many more-or-less concurrently
+  * committing transactions, thereby amortizing the cost of that notoriously
+  * expensive operation, and potentially greatly increasing transaction
+  * throughput.
+  *
+  * Transactions requesting XlogFlush() wait until the WALWriter has performed
+  * the write for them, then will be woken to continue.  This module contains the
+  * code for waiting and release of backends.
+  *
+  * Manage waiting backends by having a single ordered queue of waiting backends,
+  * so that we can avoid searching through all waiters each time the WAL Writer
+  * writes/fsyncs.
+  *
+  * Copyright (c) 2010-2012, PostgreSQL Global Development Group
+  *
+  * IDENTIFICATION
+  *	  src/backend/access/transam/groupcommit.c
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "postgres.h"
+ 
+ #include <unistd.h>
+ 
+ #include "access/groupcommit.h"
+ #include "access/xact.h"
+ #include "access/xlog.h"
+ #include "access/xlog_internal.h"
+ #include "miscadmin.h"
+ #include "storage/pmsignal.h"
+ #include "storage/proc.h"
+ #include "tcop/tcopprot.h"
+ #include "utils/builtins.h"
+ 
+ /*
+  * Shared memory structure
+  */
+ typedef struct
+ {
+ 	SHM_QUEUE	queue;		/* list of processes waiting to be woken up */
+ 	PGPROC	   *driver;		/* process currently flushing the WAL */
+ } GroupCommitData;
+ 
+ static GroupCommitData *gcShared;
+ 
+ 
+ static bool amdriving = false;
+ 
+ /* Shmem functions */
+ Size
+ GroupCommitShmemSize(void)
+ {
+ 	return sizeof(GroupCommitData);
+ }
+ 
+ void
+ GroupCommitShmemInit(void)
+ {
+ 	bool		found;
+ 
+ 	/*
+ 	 * Create or attach to the GroupCommitShared structure.
+ 	 *
+ 	 */
+ 	gcShared = (GroupCommitData *)
+ 		ShmemInitStruct("Group Commit", GroupCommitShmemSize(), &found);
+ 
+ 	if (!found)
+ 	{
+ 		/* First time through, so initialize it */
+ 		SHMQueueInit(&gcShared->queue);
+ 		gcShared->driver = NULL;
+ 	}
+ }
+ 
+ /*
+  * ================================================
+  * Group Commit functions for normal user backends
+  * ================================================
+  */
+ 
+ /*
+  * Wait for group commit, and then return true, if group commit serviced the
+  * request (not necessarily successfully). Otherwise, return false and fastpath
+  * out of here, allowing the backend to make alternative arrangements to flush
+  * its WAL in a more granular fashion. This can happen because the record that
+  * the backend requests to have flushed in so far into the future that to group
+  * commit it would
+  *
+  * Initially backends start in state GROUP_COMMIT_INIT. Backends that service
+  * database connections will then change that state to GROUP_COMMIT_NOT_WAITING
+  * before adding itself to the wait queue. During GroupCommitWakeQueue(), the
+  * WAL Writer changes the state to GROUP_COMMIT_WAIT_COMPLETE once write/fsync
+  * is confirmed.
+  *
+  * The backend then resets its state to GROUP_COMMIT_NOT_WAITING.
+  */
+ void
+ GroupCommitWaitForLSN(XLogRecPtr XactCommitLSN)
+ {
+ 	Assert(SHMQueueIsDetached(&(MyProc->waitLSNLinks)));
+ 	Assert(MyProc->grpCommitState == GROUP_COMMIT_NOT_WAITING);
+ 	Assert(!LWLockHeldByMe(SyncRepLock));
+ 
+ 	Assert(!amdriving);
+ 
+ 	LWLockAcquire(GroupCommitLock, LW_EXCLUSIVE);
+ 
+ 	/* See if we have to drive */
+ 	if (gcShared->driver == NULL)
+ 	{
+ 		amdriving = true;
+ 		gcShared->driver = MyProc;
+ 		LWLockRelease(GroupCommitLock);
+ 
+ 		DoXLogFlush(XactCommitLSN);
+ 		return;
+ 	}
+ 	else
+ 	{
+ 		/*
+ 		 * Set our waitLSN, add ourselves to the queue, and Wait for the
+ 		 * driver to wake us up
+ 		 */
+ 		MyProc->waitLSN = XactCommitLSN;
+ 		MyProc->grpCommitState = GROUP_COMMIT_WAITING;
+ 		BatchCommitQueueInsert(&gcShared->queue);
+ 		Assert(BatchCommitQueueIsOrderedByLSN(&gcShared->queue));
+ 
+ 		LWLockRelease(GroupCommitLock);
+ 
+ #ifdef GROUP_COMMIT_DEBUG
+ 		elog(LOG, "waiting for LSN %X/%X",
+ 			 XactCommitLSN.xlogid,
+ 			 XactCommitLSN.xrecoff);
+ #endif
+ 
+ 		/*
+ 		 * Wait for specified LSN to be confirmed.
+ 		 *
+ 		 * Each proc has its own wait latch, so we perform a normal latch
+ 		 * check/wait loop here.
+ 		 */
+ 		for (;;)
+ 		{
+ 			GroupCommitState CommitState;
+ 
+ 			/*
+ 			 * Wait until the driver services our request to flush WAL (or
+ 			 * wakes us up to become the next driver).
+ 			 */
+ 			PGSemaphoreLock(&MyProc->sem, true);
+ 
+ 			/*
+ 			 * Try checking the state without the lock first.  There's no
+ 			 * guarantee that we'll read the most up-to-date value, so if it
+ 			 * looks like we're still waiting, recheck while holding the lock.
+ 			 * But if it looks like we're done, we must really be done, because
+ 			 * once the driver updates our state to GROUP_COMMIT_WAIT_COMPLETE,
+ 			 * it will never update it again, so we can't be seeing a stale
+ 			 * value in that case. Control will never reach here if its value
+ 			 * is GROUP_COMMIT_INIT, as it is in the case of auxiliary
+ 			 * processes.
+ 			 */
+ 			CommitState = ((volatile PGPROC *) MyProc)->grpCommitState;
+ 			Assert(CommitState != GROUP_COMMIT_NOT_WAITING);
+ 			if (CommitState == GROUP_COMMIT_WAIT_COMPLETE)
+ 				break;
+ 			if (CommitState == GROUP_COMMIT_WAITING)
+ 			{
+ 				LWLockAcquire(GroupCommitLock, LW_EXCLUSIVE);
+ 				CommitState = MyProc->grpCommitState;
+ 
+ 				/*
+ 				 * If we're still not finished, and no-one is driving,
+ 				 * take the driver's seat
+ 				 */
+ 				if (CommitState == GROUP_COMMIT_WAIT_COMPLETE)
+ 				{
+ 					LWLockRelease(GroupCommitLock);
+ 					break;
+ 				}
+ 				if (CommitState == GROUP_COMMIT_WAITING && gcShared->driver == NULL)
+ 				{
+ 					/* Remove ourself from the queue */
+ 					SHMQueueDelete(&(MyProc->waitLSNLinks));
+ 					MyProc->grpCommitState = GROUP_COMMIT_NOT_WAITING;
+ 					MyProc->waitLSN.xlogid = 0;
+ 					MyProc->waitLSN.xrecoff = 0;
+ 
+ 					amdriving = true;
+ 					gcShared->driver = MyProc;
+ 					LWLockRelease(GroupCommitLock);
+ 
+ 					DoXLogFlush(XactCommitLSN);
+ 					return;
+ 				}
+ 
+ 				LWLockRelease(GroupCommitLock);
+ 			}
+ 		}
+ 
+ 		/*
+ 		 * The WAL has been flushed up to our LSN and we have been removed from
+ 		 * the queue. Clean up state and leave.  It's OK to reset these shared
+ 		 * memory fields without holding GroupCommitLock, because the WAL
+ 		 * writer will ignore us anyway when we're not on the queue.
+ 		 */
+ 		Assert(SHMQueueIsDetached(&(MyProc->waitLSNLinks)));
+ 		MyProc->grpCommitState = GROUP_COMMIT_NOT_WAITING;
+ 		MyProc->waitLSN.xlogid = 0;
+ 		MyProc->waitLSN.xrecoff = 0;
+ 	}
+ }
+ 
+ /*
+  * Walk queue from head.  Set the state of any backends that need to be woken
+  * due to waiting on an LSN up to flushLSN, then remove them from the queue and
+  * wake them up.
+  *
+  * Caller must hold GroupCommitLock in exclusive mode.
+  */
+ void
+ GroupCommitReleaseWaiters(XLogRecPtr flushLSN)
+ {
+ 	PGPROC	   *proc = NULL;
+ 	PGPROC	   *thisproc = NULL;
+ 	int			numprocs = 0;
+ 
+ 	LWLockAcquire(GroupCommitLock, LW_EXCLUSIVE);
+ 
+ 	if (gcShared->driver == MyProc)
+ 		gcShared->driver = NULL;
+ 	amdriving = false;
+ 
+ 	Assert(BatchCommitQueueIsOrderedByLSN(&gcShared->queue));
+ 	Assert(!LWLockHeldByMe(SyncRepLock));
+ 
+ 	proc = (PGPROC *) SHMQueueNext(&gcShared->queue,
+ 								   &gcShared->queue,
+ 								   offsetof(PGPROC, waitLSNLinks));
+ 
+ 	while (proc)
+ 	{
+ 		/*
+ 		 * Assume the queue is ordered by LSN (assumption is verified elsewhere
+ 		 * by assertions).
+ 		 */
+ 		if (XLByteLT(flushLSN, proc->waitLSN))
+ 			break;
+ 
+ 		/*
+ 		 * Move to next proc, so we can delete thisproc from the queue.
+ 		 * thisproc is valid, proc may be NULL after this.
+ 		 */
+ 		thisproc = proc;
+ 		proc = (PGPROC *) SHMQueueNext(&gcShared->queue,
+ 									   &(proc->waitLSNLinks),
+ 									   offsetof(PGPROC, waitLSNLinks));
+ 
+ 		/*
+ 		 * Set state to complete; see GroupCommitWaitForLSN() for discussion of
+ 		 * the various states.
+ 		 */
+ 		thisproc->grpCommitState = GROUP_COMMIT_WAIT_COMPLETE;
+ 		/*
+ 		 * Remove thisproc from queue.
+ 		 */
+ 		SHMQueueDelete(&(thisproc->waitLSNLinks));
+ 
+ 		/*
+ 		 * Wake only when we have set state and removed from queue.
+ 		 */
+ 		PGSemaphoreUnlock(&thisproc->sem);
+ 
+ 		numprocs++;
+ 	}
+ 
+ 	/*
+ 	 * Finally, wake up the process that's *last* in the queue, with the
+ 	 * highest LSN. He's not done yet, but will become the next driver.
+ 	 */
+ 	if (gcShared->driver == NULL)
+ 	{
+ 		proc = (PGPROC *) SHMQueuePrev(&gcShared->queue,
+ 									   &gcShared->queue,
+ 									   offsetof(PGPROC, waitLSNLinks));
+ 		if (proc)
+ 			PGSemaphoreUnlock(&proc->sem);
+ 	}
+ 
+ 	LWLockRelease(GroupCommitLock);
+ 
+ #ifdef GROUP_COMMIT_DEBUG
+ 	elog(LOG, "released %d procs up to %X/%X",
+ 		 numprocs,
+ 		 flushLSN.xlogid,
+ 		 flushLSN.xrecoff);
+ #endif
+ }
+ 
+ void
+ GroupCommitCleanupAtProcExit(void)
+ {
+ 	PGPROC	   *proc;
+ 
+ 	/* XXX: check if we're driving, and clear if so */
+ 	if (!SHMQueueIsDetached(&(MyProc->waitLSNLinks)))
+ 	{
+ 		Assert(MyProc->grpCommitState != GROUP_COMMIT_NOT_WAITING);
+ 		LWLockAcquire(GroupCommitLock, LW_EXCLUSIVE);
+ 		SHMQueueDelete(&(MyProc->waitLSNLinks));
+ 		LWLockRelease(GroupCommitLock);
+ 	}
+ 	if (amdriving)
+ 	{
+ 		amdriving = false;
+ 		LWLockAcquire(GroupCommitLock, LW_EXCLUSIVE);
+ 		/*
+ 		 * Zap the old driver. Presumably it was myself, but since we
+ 		 * shouldn't get here in the first place, maybe we're all confused.
+ 		 * In any case, it's always safe to clear the driver field, as long
+ 		 * as we wake up someone from the wait queue to become the new driver.
+ 		 * It's ok if two backends think they're driving.
+ 		 */
+ 		gcShared->driver = NULL;
+ 
+ 		/*
+ 		 * If the queue is not empty, wake up someone to become the new
+ 		 * driver.
+ 		 */
+ 		proc = (PGPROC *) SHMQueueNext(&gcShared->queue,
+ 									   &gcShared->queue,
+ 									   offsetof(PGPROC, waitLSNLinks));
+ 		if (proc)
+ 			PGSemaphoreUnlock(&proc->sem);
+ 
+ 		LWLockRelease(GroupCommitLock);
+ 	}
+ }
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 67,75 **** bool		XactDeferrable;
  
  int			synchronous_commit = SYNCHRONOUS_COMMIT_ON;
  
- int			CommitDelay = 0;	/* precommit delay in microseconds */
- int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
- 
  /*
   * MyXactAccessedTempRel is set when a temporary relation is accessed.
   * We don't allow PREPARE TRANSACTION in that case.  (This is global
--- 67,72 ----
***************
*** 1094,1115 **** RecordTransactionCommit(void)
  	if ((wrote_xlog && synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
  		forceSyncCommit || nrels > 0)
  	{
- 		/*
- 		 * Synchronous commit case:
- 		 *
- 		 * Sleep before flush! So we can flush more than one commit records
- 		 * per single fsync.  (The idea is some other backend may do the
- 		 * XLogFlush while we're sleeping.  This needs work still, because on
- 		 * most Unixen, the minimum select() delay is 10msec or more, which is
- 		 * way too long.)
- 		 *
- 		 * We do not sleep if enableFsync is not turned on, nor if there are
- 		 * fewer than CommitSiblings other backends with active transactions.
- 		 */
- 		if (CommitDelay > 0 && enableFsync &&
- 			MinimumActiveBackends(CommitSiblings))
- 			pg_usleep(CommitDelay);
- 
  		XLogFlush(XactLastRecEnd);
  
  		/*
--- 1091,1096 ----
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 24,29 ****
--- 24,30 ----
  #include <unistd.h>
  
  #include "access/clog.h"
+ #include "access/groupcommit.h"
  #include "access/multixact.h"
  #include "access/subtrans.h"
  #include "access/transam.h"
***************
*** 2020,2033 **** UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
   * Ensure that all XLOG data through the given position is flushed to disk.
   *
   * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
!  * already held, and we try to avoid acquiring it if possible.
   */
  void
  XLogFlush(XLogRecPtr record)
  {
- 	XLogRecPtr	WriteRqstPtr;
- 	XLogwrtRqst WriteRqst;
- 
  	/*
  	 * During REDO, we are reading not writing WAL.  Therefore, instead of
  	 * trying to flush the WAL, we should update minRecoveryPoint instead. We
--- 2021,2036 ----
   * Ensure that all XLOG data through the given position is flushed to disk.
   *
   * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
!  * already held, and we try to avoid acquiring it if possible. Moreover,
!  * XLogFlush is the main and only entry point for group commit.
!  *
!  * We'll actually call XLogWrite from here in the event of not being able to
!  * participate in a group commit, due to it a backend requesting the flushing of
!  * an LSN that is very far from the flush pointer's current position.
   */
  void
  XLogFlush(XLogRecPtr record)
  {
  	/*
  	 * During REDO, we are reading not writing WAL.  Therefore, instead of
  	 * trying to flush the WAL, we should update minRecoveryPoint instead. We
***************
*** 2059,2079 **** XLogFlush(XLogRecPtr record)
  	 * Since fsync is usually a horribly expensive operation, we try to
  	 * piggyback as much data as we can on each fsync: if we see any more data
  	 * entered into the xlog buffer, we'll write and fsync that too, so that
! 	 * the final value of LogwrtResult.Flush is as large as possible. This
! 	 * gives us some chance of avoiding another fsync immediately after.
  	 */
  
- 	/* initialize to given target; may increase below */
- 	WriteRqstPtr = record;
- 
  	/* read LogwrtResult and update local state */
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;
  
  		SpinLockAcquire(&xlogctl->info_lck);
! 		if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
! 			WriteRqstPtr = xlogctl->LogwrtRqst.Write;
  		LogwrtResult = xlogctl->LogwrtResult;
  		SpinLockRelease(&xlogctl->info_lck);
  	}
--- 2062,2083 ----
  	 * Since fsync is usually a horribly expensive operation, we try to
  	 * piggyback as much data as we can on each fsync: if we see any more data
  	 * entered into the xlog buffer, we'll write and fsync that too, so that
! 	 * the final value of LogwrtResult. Flush is as large as is reasonable; in
! 	 * the group commit case (i.e. ordinary multiuser backends that a group
! 	 * commit would be useful for), that will be as many backends as are queued
! 	 * up.
  	 */
  
  	/* read LogwrtResult and update local state */
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;
  
  		SpinLockAcquire(&xlogctl->info_lck);
! 		if (XLByteLT(xlogctl->LogwrtRqst.Write, record))
! 			xlogctl->LogwrtRqst.Write = record;
! 		if (XLByteLT(xlogctl->LogwrtRqst.Flush, record))
! 			xlogctl->LogwrtRqst.Flush = record;
  		LogwrtResult = xlogctl->LogwrtResult;
  		SpinLockRelease(&xlogctl->info_lck);
  	}
***************
*** 2081,2116 **** XLogFlush(XLogRecPtr record)
  	/* done already? */
  	if (!XLByteLE(record, LogwrtResult.Flush))
  	{
! 		/* now wait for the write lock */
! 		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
! 		LogwrtResult = XLogCtl->Write.LogwrtResult;
! 		if (!XLByteLE(record, LogwrtResult.Flush))
! 		{
! 			/* try to write/flush later additions to XLOG as well */
! 			if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
! 			{
! 				XLogCtlInsert *Insert = &XLogCtl->Insert;
! 				uint32		freespace = INSERT_FREESPACE(Insert);
  
! 				if (freespace < SizeOfXLogRecord)		/* buffer is full */
! 					WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
! 				else
! 				{
! 					WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
! 					WriteRqstPtr.xrecoff -= freespace;
! 				}
! 				LWLockRelease(WALInsertLock);
! 				WriteRqst.Write = WriteRqstPtr;
! 				WriteRqst.Flush = WriteRqstPtr;
! 			}
! 			else
! 			{
! 				WriteRqst.Write = WriteRqstPtr;
! 				WriteRqst.Flush = record;
! 			}
! 			XLogWrite(WriteRqst, false, false);
! 		}
! 		LWLockRelease(WALWriteLock);
  	}
  
  	END_CRIT_SECTION();
--- 2085,2100 ----
  	/* done already? */
  	if (!XLByteLE(record, LogwrtResult.Flush))
  	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		volatile XLogCtlData *xlogctl = XLogCtl;
  
! 		/* group commit */
! 		GroupCommitWaitForLSN(record);
! 
! 		/* update our result */
! 		SpinLockAcquire(&xlogctl->info_lck);
! 		LogwrtResult = xlogctl->LogwrtResult;
! 		SpinLockRelease(&xlogctl->info_lck);
  	}
  
  	END_CRIT_SECTION();
***************
*** 2144,2149 **** XLogFlush(XLogRecPtr record)
--- 2128,2180 ----
  }
  
  /*
+  * A lower-level version of XLogFlush(). This is used from group commit code
+  * to actually perform a flush, while XLogFlush() is the facade for the
+  * group commit code. Got it?
+  */
+ XLogRecPtr
+ DoXLogFlush(XLogRecPtr record)
+ {
+ 	XLogRecPtr	WriteRqstPtr;
+ 	XLogwrtRqst WriteRqst;
+ 
+ 	WriteRqstPtr = record;
+ 
+ 	/* read LogwrtResult and update local state */
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		volatile XLogCtlData *xlogctl = XLogCtl;
+ 
+ 		SpinLockAcquire(&xlogctl->info_lck);
+ 		if (XLByteLT(xlogctl->LogwrtRqst.Write, record))
+ 			WriteRqstPtr = xlogctl->LogwrtRqst.Write;
+ 		LogwrtResult = xlogctl->LogwrtResult;
+ 		SpinLockRelease(&xlogctl->info_lck);
+ 	}
+ 
+ 	/* done already? */
+ 	if (!XLByteLE(record, LogwrtResult.Flush))
+ 	{
+ 		/* now wait for the write lock */
+ 		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+ 
+ 		LogwrtResult = XLogCtl->Write.LogwrtResult;
+ 		if (!XLByteLE(record, LogwrtResult.Flush))
+ 		{
+ 			WriteRqst.Write = WriteRqstPtr;
+ 			WriteRqst.Flush = WriteRqstPtr;
+ 			XLogWrite(WriteRqst, false, false);
+ 		}
+ 		LWLockRelease(WALWriteLock);
+ 	}
+ 
+ 	/* Wake up everyone else that we flushed as a side-effect */
+ 	GroupCommitReleaseWaiters(LogwrtResult.Flush);
+ 
+ 	return LogwrtResult.Flush;
+ }
+ 
+ /*
   * Flush xlog, but without specifying exactly where to flush to.
   *
   * We normally flush only completed blocks; but if there is nothing to do on
***************
*** 2235,2240 **** XLogBackgroundFlush(void)
--- 2266,2281 ----
  	LWLockRelease(WALWriteLock);
  
  	END_CRIT_SECTION();
+ 
+ 	/*
+ 	 * Wake up everyone else that we flushed as a side-effect. If there are
+ 	 * active commits happening, the driver will get the WALWriteLock as
+ 	 * soon as we release it, and if it no longer has any work to do it will
+ 	 * wake up everyone. But if it does, ie. if our flush didn't cover the
+ 	 * current driver, it's good that we wake up everyone that we did flush,
+ 	 * or they will have to wait for another flush cycle.
+ 	 */
+ 	GroupCommitReleaseWaiters(LogwrtResult.Flush);
  }
  
  /*
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 34,39 ****
--- 34,45 ----
   * take some time. Once caught up, the current highest priority standby
   * will release waiters from the queue.
   *
+  * As of Postgres 9.2, some of the functions that were previously private to
+  * this module are generalized and exposed as generic "batch commit"
+  * infrastructure, for use by the group commit feature, which similarly batches
+  * commits, though it does so merely as a local performance optimization for
+  * user backends.
+  *
   * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
   *
   * IDENTIFICATION
***************
*** 65,81 **** char	   *SyncRepStandbyNames;
  #define SyncRepRequested() \
  	(max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
  
  static bool announce_next_takeover = true;
  
- static void SyncRepQueueInsert(void);
  static void SyncRepCancelWait(void);
  
  static int	SyncRepGetStandbyPriority(void);
  
- #ifdef USE_ASSERT_CHECKING
- static bool SyncRepQueueIsOrderedByLSN(void);
- #endif
- 
  /*
   * ===========================================================
   * Synchronous Replication functions for normal user backends
--- 71,86 ----
  #define SyncRepRequested() \
  	(max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
  
+ /* Convenience macros for SyncRep use of batch commit infrastructure */
+ #define SyncRepQueueInsert()			BatchCommitQueueInsert(&(WalSndCtl->SyncRepQueue))
+ #define SyncRepQueueIsOrderedByLSN()	BatchCommitQueueIsOrderedByLSN(&(WalSndCtl->SyncRepQueue))
+ 
  static bool announce_next_takeover = true;
  
  static void SyncRepCancelWait(void);
  
  static int	SyncRepGetStandbyPriority(void);
  
  /*
   * ===========================================================
   * Synchronous Replication functions for normal user backends
***************
*** 105,112 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  	if (!SyncRepRequested() || !SyncStandbysDefined())
  		return;
  
! 	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
  	Assert(WalSndCtl != NULL);
  
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
  	Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
--- 110,118 ----
  	if (!SyncRepRequested() || !SyncStandbysDefined())
  		return;
  
! 	Assert(SHMQueueIsDetached(&(MyProc->waitLSNLinks)));
  	Assert(WalSndCtl != NULL);
+ 	Assert(!LWLockHeldByMe(GroupCommitLock));
  
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
  	Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
***************
*** 253,259 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  	 * holding SyncRepLock, because any walsenders will ignore us anyway when
  	 * we're not on the queue.
  	 */
! 	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
  	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
  	MyProc->waitLSN.xlogid = 0;
  	MyProc->waitLSN.xrecoff = 0;
--- 259,265 ----
  	 * holding SyncRepLock, because any walsenders will ignore us anyway when
  	 * we're not on the queue.
  	 */
! 	Assert(SHMQueueIsDetached(&(MyProc->waitLSNLinks)));
  	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
  	MyProc->waitLSN.xlogid = 0;
  	MyProc->waitLSN.xrecoff = 0;
***************
*** 267,285 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  }
  
  /*
!  * Insert MyProc into SyncRepQueue, maintaining sorted invariant.
   *
   * Usually we will go at tail of queue, though it's possible that we arrive
   * here out of order, so start at tail and work back to insertion point.
   */
! static void
! SyncRepQueueInsert(void)
  {
  	PGPROC	   *proc;
  
! 	proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
! 								   &(WalSndCtl->SyncRepQueue),
! 								   offsetof(PGPROC, syncRepLinks));
  
  	while (proc)
  	{
--- 273,293 ----
  }
  
  /*
!  * Insert MyProc into a queue (usually SyncRepQueue), maintaining sorted
!  * invariant.
   *
   * Usually we will go at tail of queue, though it's possible that we arrive
   * here out of order, so start at tail and work back to insertion point.
   */
! void
! BatchCommitQueueInsert(SHM_QUEUE *waitQueue)
  {
  	PGPROC	   *proc;
  
! 
! 	proc = (PGPROC *) SHMQueuePrev(waitQueue,
! 								   waitQueue,
! 								   offsetof(PGPROC, waitLSNLinks));
  
  	while (proc)
  	{
***************
*** 290,304 **** SyncRepQueueInsert(void)
  		if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
  			break;
  
! 		proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
! 									   &(proc->syncRepLinks),
! 									   offsetof(PGPROC, syncRepLinks));
  	}
  
  	if (proc)
! 		SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
  	else
! 		SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
  }
  
  /*
--- 298,312 ----
  		if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
  			break;
  
! 		proc = (PGPROC *) SHMQueuePrev(waitQueue,
! 									   &(proc->waitLSNLinks),
! 									   offsetof(PGPROC, waitLSNLinks));
  	}
  
  	if (proc)
! 		SHMQueueInsertAfter(&(proc->waitLSNLinks), &(MyProc->waitLSNLinks));
  	else
! 		SHMQueueInsertAfter(waitQueue, &(MyProc->waitLSNLinks));
  }
  
  /*
***************
*** 307,315 **** SyncRepQueueInsert(void)
  static void
  SyncRepCancelWait(void)
  {
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
! 	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
! 		SHMQueueDelete(&(MyProc->syncRepLinks));
  	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
  	LWLockRelease(SyncRepLock);
  }
--- 315,324 ----
  static void
  SyncRepCancelWait(void)
  {
+ 	Assert(!LWLockHeldByMe(GroupCommitLock));
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
! 	if (!SHMQueueIsDetached(&(MyProc->waitLSNLinks)))
! 		SHMQueueDelete(&(MyProc->waitLSNLinks));
  	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
  	LWLockRelease(SyncRepLock);
  }
***************
*** 317,326 **** SyncRepCancelWait(void)
  void
  SyncRepCleanupAtProcExit(void)
  {
! 	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
  	{
  		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
! 		SHMQueueDelete(&(MyProc->syncRepLinks));
  		LWLockRelease(SyncRepLock);
  	}
  }
--- 326,335 ----
  void
  SyncRepCleanupAtProcExit(void)
  {
! 	if (!SHMQueueIsDetached(&(MyProc->waitLSNLinks)))
  	{
  		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
! 		SHMQueueDelete(&(MyProc->waitLSNLinks));
  		LWLockRelease(SyncRepLock);
  	}
  }
***************
*** 388,393 **** SyncRepReleaseWaiters(void)
--- 397,403 ----
  	 * change pg_stat_get_wal_senders().
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ 	Assert(!LWLockHeldByMe(GroupCommitLock));
  
  	for (i = 0; i < max_wal_senders; i++)
  	{
***************
*** 522,531 **** SyncRepWakeQueue(bool all)
  	int			numprocs = 0;
  
  	Assert(SyncRepQueueIsOrderedByLSN());
  
  	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
  								   &(WalSndCtl->SyncRepQueue),
! 								   offsetof(PGPROC, syncRepLinks));
  
  	while (proc)
  	{
--- 532,542 ----
  	int			numprocs = 0;
  
  	Assert(SyncRepQueueIsOrderedByLSN());
+ 	Assert(!LWLockHeldByMe(GroupCommitLock));
  
  	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
  								   &(WalSndCtl->SyncRepQueue),
! 								   offsetof(PGPROC, waitLSNLinks));
  
  	while (proc)
  	{
***************
*** 541,548 **** SyncRepWakeQueue(bool all)
  		 */
  		thisproc = proc;
  		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
! 									   &(proc->syncRepLinks),
! 									   offsetof(PGPROC, syncRepLinks));
  
  		/*
  		 * Set state to complete; see SyncRepWaitForLSN() for discussion of
--- 552,559 ----
  		 */
  		thisproc = proc;
  		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
! 									   &(proc->waitLSNLinks),
! 									   offsetof(PGPROC, waitLSNLinks));
  
  		/*
  		 * Set state to complete; see SyncRepWaitForLSN() for discussion of
***************
*** 553,559 **** SyncRepWakeQueue(bool all)
  		/*
  		 * Remove thisproc from queue.
  		 */
! 		SHMQueueDelete(&(thisproc->syncRepLinks));
  
  		/*
  		 * Wake only when we have set state and removed from queue.
--- 564,570 ----
  		/*
  		 * Remove thisproc from queue.
  		 */
! 		SHMQueueDelete(&(thisproc->waitLSNLinks));
  
  		/*
  		 * Wake only when we have set state and removed from queue.
***************
*** 604,611 **** SyncRepUpdateSyncStandbysDefined(void)
  }
  
  #ifdef USE_ASSERT_CHECKING
! static bool
! SyncRepQueueIsOrderedByLSN(void)
  {
  	PGPROC	   *proc = NULL;
  	XLogRecPtr	lastLSN;
--- 615,622 ----
  }
  
  #ifdef USE_ASSERT_CHECKING
! bool
! BatchCommitQueueIsOrderedByLSN(SHM_QUEUE *waitQueue)
  {
  	PGPROC	   *proc = NULL;
  	XLogRecPtr	lastLSN;
***************
*** 613,621 **** SyncRepQueueIsOrderedByLSN(void)
  	lastLSN.xlogid = 0;
  	lastLSN.xrecoff = 0;
  
! 	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
! 								   &(WalSndCtl->SyncRepQueue),
! 								   offsetof(PGPROC, syncRepLinks));
  
  	while (proc)
  	{
--- 624,632 ----
  	lastLSN.xlogid = 0;
  	lastLSN.xrecoff = 0;
  
! 	proc = (PGPROC *) SHMQueueNext(waitQueue,
! 								   waitQueue,
! 								   offsetof(PGPROC, waitLSNLinks));
  
  	while (proc)
  	{
***************
*** 628,636 **** SyncRepQueueIsOrderedByLSN(void)
  
  		lastLSN = proc->waitLSN;
  
! 		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
! 									   &(proc->syncRepLinks),
! 									   offsetof(PGPROC, syncRepLinks));
  	}
  
  	return true;
--- 639,647 ----
  
  		lastLSN = proc->waitLSN;
  
! 		proc = (PGPROC *) SHMQueueNext(waitQueue,
! 									   &(proc->waitLSNLinks),
! 									   offsetof(PGPROC, waitLSNLinks));
  	}
  
  	return true;
*** a/src/backend/storage/ipc/ipci.c
--- b/src/backend/storage/ipc/ipci.c
***************
*** 126,131 **** CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
--- 126,132 ----
  		size = add_size(size, BTreeShmemSize());
  		size = add_size(size, SyncScanShmemSize());
  		size = add_size(size, AsyncShmemSize());
+ 		size = add_size(size, GroupCommitShmemSize());
  #ifdef EXEC_BACKEND
  		size = add_size(size, ShmemBackendArraySize());
  #endif
***************
*** 190,195 **** CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
--- 191,197 ----
  	 * Set up xlog, clog, and buffers
  	 */
  	XLOGShmemInit();
+ 	GroupCommitShmemInit();
  	CLOGShmemInit();
  	SUBTRANSShmemInit();
  	MultiXactShmemInit();
*** a/src/backend/storage/lmgr/proc.c
--- b/src/backend/storage/lmgr/proc.c
***************
*** 35,40 ****
--- 35,41 ----
  #include <unistd.h>
  #include <sys/time.h>
  
+ #include "access/groupcommit.h"
  #include "access/transam.h"
  #include "access/twophase.h"
  #include "access/xact.h"
***************
*** 377,387 **** InitProcess(void)
  #endif
  	MyProc->recoveryConflictPending = false;
  
! 	/* Initialize fields for sync rep */
  	MyProc->waitLSN.xlogid = 0;
  	MyProc->waitLSN.xrecoff = 0;
  	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
! 	SHMQueueElemInit(&(MyProc->syncRepLinks));
  
  	/*
  	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch.
--- 378,389 ----
  #endif
  	MyProc->recoveryConflictPending = false;
  
! 	/* Initialize fields for waiting on WAL LSNs */
  	MyProc->waitLSN.xlogid = 0;
  	MyProc->waitLSN.xrecoff = 0;
  	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
! 	MyProc->grpCommitState = GROUP_COMMIT_NOT_WAITING;
! 	SHMQueueElemInit(&(MyProc->waitLSNLinks));
  
  	/*
  	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch.
***************
*** 739,746 **** ProcKill(int code, Datum arg)
  
  	Assert(MyProc != NULL);
  
! 	/* Make sure we're out of the sync rep lists */
  	SyncRepCleanupAtProcExit();
  
  #ifdef USE_ASSERT_CHECKING
  	if (assert_enabled)
--- 741,749 ----
  
  	Assert(MyProc != NULL);
  
! 	/* Make sure we're out of the wait for LSN lists */
  	SyncRepCleanupAtProcExit();
+ 	GroupCommitCleanupAtProcExit();
  
  #ifdef USE_ASSERT_CHECKING
  	if (assert_enabled)
***************
*** 776,781 **** ProcKill(int code, Datum arg)
--- 779,786 ----
  		MyProc->links.next = (SHM_QUEUE *) procglobal->freeProcs;
  		procglobal->freeProcs = MyProc;
  	}
+ 	/* Invalidate group commit state */
+ 	MyProc->grpCommitState = GROUP_COMMIT_NOT_WAITING;
  
  	/* PGPROC struct isn't mine anymore */
  	MyProc = NULL;
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 125,132 ****
  
  /* XXX these should appear in other modules' header files */
  extern bool Log_disconnections;
- extern int	CommitDelay;
- extern int	CommitSiblings;
  extern char *default_tablespace;
  extern char *temp_tablespaces;
  extern bool synchronize_seqscans;
--- 125,130 ----
***************
*** 2019,2046 **** static struct config_int ConfigureNamesInt[] =
  	},
  
  	{
- 		{"commit_delay", PGC_USERSET, WAL_SETTINGS,
- 			gettext_noop("Sets the delay in microseconds between transaction commit and "
- 						 "flushing WAL to disk."),
- 			NULL
- 		},
- 		&CommitDelay,
- 		0, 0, 100000,
- 		NULL, NULL, NULL
- 	},
- 
- 	{
- 		{"commit_siblings", PGC_USERSET, WAL_SETTINGS,
- 			gettext_noop("Sets the minimum concurrent open transactions before performing "
- 						 "commit_delay."),
- 			NULL
- 		},
- 		&CommitSiblings,
- 		5, 0, 1000,
- 		NULL, NULL, NULL
- 	},
- 
- 	{
  		{"extra_float_digits", PGC_USERSET, CLIENT_CONN_LOCALE,
  			gettext_noop("Sets the number of digits displayed for floating-point values."),
  			gettext_noop("This affects real, double precision, and geometric data types. "
--- 2017,2022 ----
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 171,179 ****
  					# (change requires restart)
  #wal_writer_delay = 200ms		# 1-10000 milliseconds
  
- #commit_delay = 0			# range 0-100000, in microseconds
- #commit_siblings = 5			# range 1-1000
- 
  # - Checkpoints -
  
  #checkpoint_segments = 3		# in logfile segments, min 1, 16MB each
--- 171,176 ----
*** /dev/null
--- b/src/include/access/groupcommit.h
***************
*** 0 ****
--- 1,42 ----
+ /*-------------------------------------------------------------------------
+  *
+  * groupcommit.h
+  *	  Group commit interface
+  *
+  *	  Exports from access/transam/groupcommit.c.
+  *
+  * Copyright (c) 2010-2011, PostgreSQL Global Development Group
+  *
+  * IDENTIFICATION
+  *		src/include/access/groupcommit.h
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef _GROUPCOMMIT_H
+ #define _GROUPCOMMIT_H
+ 
+ #include "access/xlogdefs.h"
+ #include "replication/syncrep.h"
+ 
+ typedef enum GroupCommitState
+ {
+ 	GROUP_COMMIT_NOT_WAITING,
+ 	GROUP_COMMIT_WAITING,
+ 	GROUP_COMMIT_WAIT_COMPLETE
+ } GroupCommitState;
+ 
+ extern PGDLLIMPORT bool UseGroupCommit;
+ 
+ /* function prototypes */
+ extern Size GroupCommitShmemSize(void);
+ extern void GroupCommitShmemInit(void);
+ 
+ /* called by user backends */
+ extern void GroupCommitWaitForLSN(XLogRecPtr XactCommitLSN);
+ /* called at backend exit */
+ extern void GroupCommitCleanupAtProcExit(void);
+ 
+ /* called by the driver */
+ extern void GroupCommitReleaseWaiters(XLogRecPtr flushLSN);
+ 
+ #endif   /* _GROUPCOMMIT_H */
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 265,270 **** extern CheckpointStatsData CheckpointStats;
--- 265,271 ----
  
  extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
  extern void XLogFlush(XLogRecPtr RecPtr);
+ extern XLogRecPtr DoXLogFlush(XLogRecPtr RecPtr);
  extern void XLogBackgroundFlush(void);
  extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
  extern int XLogFileInit(uint32 log, uint32 seg,
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 14,19 ****
--- 14,20 ----
  #define _SYNCREP_H
  
  #include "utils/guc.h"
+ #include "storage/shmem.h"
  
  /* syncRepState */
  #define SYNC_REP_NOT_WAITING		0
***************
*** 41,44 **** extern int	SyncRepWakeQueue(bool all);
--- 42,52 ----
  
  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
  
+ /* Batch commit infrastructure, used by sync rep and group commit */
+ extern void BatchCommitQueueInsert(SHM_QUEUE *waitQueue);
+ 
+ #ifdef USE_ASSERT_CHECKING
+ extern bool BatchCommitQueueIsOrderedByLSN(SHM_QUEUE *waitQueue);
+ #endif
+ 
  #endif   /* _SYNCREP_H */
*** a/src/include/storage/latch.h
--- b/src/include/storage/latch.h
***************
*** 25,31 ****
   * and must be initialized at postmaster startup by InitSharedLatch. Before
   * a shared latch can be waited on, it must be associated with a process
   * with OwnLatch. Only the process owning the latch can wait on it, but any
!  * process can set it.
   *
   * There are three basic operations on a latch:
   *
--- 25,36 ----
   * and must be initialized at postmaster startup by InitSharedLatch. Before
   * a shared latch can be waited on, it must be associated with a process
   * with OwnLatch. Only the process owning the latch can wait on it, but any
!  * process can set it. Note that the use of the process latch (which is a field
!  * in PGPROC) is generally preferred to using an ad-hoc shared latch, as generic
!  * signal handlers will call SetLatch on the process latch. Signals have
!  * the potential to invalidate the Latch timeout on certain platforms, resulting
!  * in a denial-of-service, so it is important to verify that all signal handlers
!  * within the process call SetLatch().
   *
   * There are three basic operations on a latch:
   *
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 79,84 **** typedef enum LWLockId
--- 79,85 ----
  	SerializablePredicateLockListLock,
  	OldSerXidLock,
  	SyncRepLock,
+ 	GroupCommitLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 119,126 **** struct PGPROC
  	 * syncRepLinks used only while holding SyncRepLock.
  	 */
  	XLogRecPtr	waitLSN;		/* waiting for this LSN or higher */
  	int			syncRepState;	/* wait state for sync rep */
! 	SHM_QUEUE	syncRepLinks;	/* list link if process is in syncrep queue */
  
  	/*
  	 * All PROCLOCK objects for locks held or awaited by this backend are
--- 119,127 ----
  	 * syncRepLinks used only while holding SyncRepLock.
  	 */
  	XLogRecPtr	waitLSN;		/* waiting for this LSN or higher */
+ 	int			grpCommitState;	/* wait state for group commit */
  	int			syncRepState;	/* wait state for sync rep */
! 	SHM_QUEUE	waitLSNLinks;	/* list link if process is in WAL queue */
  
  	/*
  	 * All PROCLOCK objects for locks held or awaited by this backend are
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to