Simon Riggs wrote:

> OK, spent long time testing various batching scenarios for this using a
> custom test harness to simulate various spreads of xids in transaction
> trees. All looks fine now.

I had a look and was mostly rephrasing some comments and the README
(hopefully I didn't make any of them worse than they were), when I
noticed that the code to iterate through pages could be refactored.  I
think the change makes the algorithm in TransactionIdSetTreeStatus
easier to follow.
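
For anyone rechecking the ordering, here is a rough standalone sketch of
the sequence of page updates that TransactionIdSetTreeStatus is meant to
produce.  It is not the patch code: Xid, XIDS_PER_PAGE, clog[],
update_log[], set_page() and the other helper names are made up for
illustration, the page size is tiny on purpose, and locking, WAL and LSN
handling are left out entirely.

```c
#include <assert.h>

/* Hypothetical stand-ins; the real definitions live in clog.c and clog.h. */
typedef unsigned int Xid;
enum { ST_IN_PROGRESS = 0, ST_COMMITTED = 1, ST_ABORTED = 2, ST_SUB_COMMITTED = 3 };

#define XIDS_PER_PAGE 4		/* tiny on purpose; real clog pages hold far more */
#define MAX_XIDS 32
#define MAX_LOG 16

typedef struct { int pageno; int status; } PageUpdate;

static int	clog[MAX_XIDS];			/* one status per xid, in progress at start */
static PageUpdate update_log[MAX_LOG];	/* order in which pages were updated */
static int	nupdates = 0;

static int
xid_to_page(Xid xid)
{
	return (int) (xid / XIDS_PER_PAGE);
}

/* One simulated per-page update: every xid passed in must be on pageno. */
static void
set_page(int has_parent, Xid parent, const Xid *subxids, int n,
		 int status, int pageno)
{
	int		i;

	assert(nupdates < MAX_LOG);
	update_log[nupdates].pageno = pageno;
	update_log[nupdates].status = status;
	nupdates++;

	if (has_parent)
	{
		assert(parent < MAX_XIDS);
		clog[parent] = status;
	}
	for (i = 0; i < n; i++)
	{
		assert(subxids[i] < MAX_XIDS && xid_to_page(subxids[i]) == pageno);
		clog[subxids[i]] = status;
	}
}

/* Walk the remaining subxids one page-sized chunk at a time. */
static void
set_rest_by_pages(const Xid *subxids, int n, int status)
{
	int		i = 0;

	while (i < n)
	{
		int		pageno = xid_to_page(subxids[i]);
		int		start = i;

		while (i < n && xid_to_page(subxids[i]) == pageno)
			i++;
		set_page(0, 0, subxids + start, i - start, status, pageno);
	}
}

static void
set_tree_status(Xid xid, const Xid *subxids, int nsubxids, int status)
{
	int		pageno = xid_to_page(xid);
	int		on_first = 0;

	/* Count the leading subxids that share the parent's page. */
	while (on_first < nsubxids && xid_to_page(subxids[on_first]) == pageno)
		on_first++;

	if (on_first == nsubxids)
	{
		/* Everything fits on one page: a single update. */
		set_page(1, xid, subxids, nsubxids, status, pageno);
		return;
	}

	/* Step 1 (commit only): sub-commit the subxids on the other pages. */
	if (status == ST_COMMITTED)
		set_rest_by_pages(subxids + on_first, nsubxids - on_first,
						  ST_SUB_COMMITTED);

	/* Step 2: the parent plus the subxids on its page. */
	set_page(1, xid, subxids, on_first, status, pageno);

	/* Step 3: the other pages again, now with the final status. */
	set_rest_by_pages(subxids + on_first, nsubxids - on_first, status);
}
```

With xid 2 and subxids {3, 4, 5, 8} in this toy layout, update_log should
show pages 1 and 2 going to sub-committed, then page 0 to committed, then
pages 1 and 2 to committed.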

I also noticed that TransactionIdSetPageStatus has a "subcommit" argument
that is not explained.  I sort of understand the point, but I think it
would be better if you filled in the explanation in the header comment
(marked with XXX).
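
Going only by the comments in the patch, the flag appears to control
whether the subxids on a page are first marked sub-committed before the
final status is written, and only for a commit with an invalid LSN.  A
minimal sketch of that ordering, with hypothetical names (Xid, Write,
set_status_bit, set_page_status) and no locking or page I/O:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the real types and status codes. */
typedef unsigned int Xid;
enum { ST_COMMITTED = 1, ST_ABORTED = 2, ST_SUB_COMMITTED = 3 };

#define MAX_WRITES 16

/* Each status write is recorded so the order can be inspected. */
typedef struct { Xid xid; int status; } Write;

static Write writes[MAX_WRITES];
static int	nwrites = 0;

static void
set_status_bit(Xid xid, int status)
{
	assert(nwrites < MAX_WRITES);
	writes[nwrites].xid = xid;
	writes[nwrites].status = status;
	nwrites++;
}

/*
 * Simulated single-page update.  xid == 0 stands for "no main xid on this
 * page"; lsn_is_invalid models XLogRecPtrIsInvalid(lsn).
 */
static void
set_page_status(Xid xid, const Xid *subxids, int nsubxids, int status,
				bool lsn_is_invalid, bool subcommit)
{
	int		i;

	/* Optional first pass: push the subxids through sub-committed. */
	if (subcommit && status == ST_COMMITTED && lsn_is_invalid)
	{
		for (i = 0; i < nsubxids; i++)
			set_status_bit(subxids[i], ST_SUB_COMMITTED);
	}

	/* Main xid first, then the subxids, all with the final status. */
	if (xid != 0)
		set_status_bit(xid, status);
	for (i = 0; i < nsubxids; i++)
		set_status_bit(subxids[i], status);
}
```

With the flag set, a commit of xid 10 with subxids {11, 12} records five
writes (two sub-commits, then three commits); with the flag clear it
records only the three commits.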

I hope I didn't break the code with the new function
set_tree_status_by_pages -- please recheck that part.
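
To make the recheck easier, the chunking loop can be exercised on its
own.  The following is a sketch under assumptions, not the patch code:
Xid, XIDS_PER_PAGE, PageCall, calls[] and chunk_by_pages are hypothetical
names, and each call that would go to TransactionIdSetPageStatus is
merely recorded.  This version tests the array bound before reading
subxids[i] and computes the page number at the top of each chunk.

```c
#include <assert.h>

typedef unsigned int Xid;

#define XIDS_PER_PAGE 4		/* hypothetical; tiny so pages change quickly */
#define MAX_CALLS 16

/* One record per simulated per-page call. */
typedef struct
{
	int		pageno;
	int		offset;			/* index of the first xid in the chunk */
	int		count;			/* number of xids in the chunk */
} PageCall;

static PageCall calls[MAX_CALLS];
static int	ncalls = 0;

static int
xid_to_page(Xid xid)
{
	return (int) (xid / XIDS_PER_PAGE);
}

static void
chunk_by_pages(const Xid *subxids, int nsubxids)
{
	int		offset = 0;
	int		i = 0;

	while (i < nsubxids)
	{
		int		pageno = xid_to_page(subxids[i]);
		int		num_on_page = 0;

		/* Check the bound first so subxids[i] is never read past the end. */
		while (i < nsubxids && xid_to_page(subxids[i]) == pageno)
		{
			num_on_page++;
			i++;
		}

		/* Stand-in for the per-page status update. */
		assert(ncalls < MAX_CALLS);
		calls[ncalls].pageno = pageno;
		calls[ncalls].offset = offset;
		calls[ncalls].count = num_on_page;
		ncalls++;

		offset = i;
	}
}
```

Feeding it {5, 6, 7, 8, 9, 12} with four xids per page should record
three chunks: page 1 with three xids, page 2 with two, and page 3 with
one.  An empty array records nothing.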

I didn't test this beyond regression tests.

-- 
Alvaro Herrera                                http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support
Index: src/backend/access/transam/README
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/access/transam/README,v
retrieving revision 1.11
diff -c -p -r1.11 README
*** src/backend/access/transam/README	21 Mar 2008 13:23:28 -0000	1.11
--- src/backend/access/transam/README	5 Oct 2008 18:33:40 -0000
*************** from disk.  They also allow information 
*** 341,351 ****
  pg_clog records the commit status for each transaction that has been assigned
  an XID.  A transaction can be in progress, committed, aborted, or
  "sub-committed".  This last state means that it's a subtransaction that's no
! longer running, but its parent has not updated its state yet (either it is
! still running, or the backend crashed without updating its status).  A
! sub-committed transaction's status will be updated again to the final value as
! soon as the parent commits or aborts, or when the parent is detected to be
! aborted.
  
  Savepoints are implemented using subtransactions.  A subtransaction is a
  transaction inside a transaction; its commit or abort status is not only
--- 341,360 ----
  pg_clog records the commit status for each transaction that has been assigned
  an XID.  A transaction can be in progress, committed, aborted, or
  "sub-committed".  This last state means that it's a subtransaction that's no
! longer running, but its parent has not updated its state yet.  It is not
! necessary to update a subtransaction's transaction status to subcommit, so we
! can just defer it until main transaction commit.  The main role of marking
! transactions as sub-committed is to provide an atomic commit protocol when
! transaction status is spread across multiple clog pages. As a result, whenever
! transaction status spreads across multiple pages we must use a two-phase commit
! protocol: the first phase is to mark the subtransactions as sub-committed, then
! we mark the top level transaction and all its subtransactions committed (in
! that order).  Thus, subtransactions that have not aborted appear as in-progress
! even when they have already finished, and the subcommit status appears as a
! very short transitory state during main transaction commit.  Subtransaction
! abort is always marked in clog as soon as it occurs.  When the transaction
! statuses all fit in a single CLOG page, we atomically mark them as committed
! without bothering with the intermediate sub-commit state.
  
  Savepoints are implemented using subtransactions.  A subtransaction is a
  transaction inside a transaction; its commit or abort status is not only
Index: src/backend/access/transam/clog.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/access/transam/clog.c,v
retrieving revision 1.47
diff -c -p -r1.47 clog.c
*** src/backend/access/transam/clog.c	1 Aug 2008 13:16:08 -0000	1.47
--- src/backend/access/transam/clog.c	5 Oct 2008 18:36:12 -0000
*************** static int	ZeroCLOGPage(int pageno, bool
*** 80,107 ****
  static bool CLOGPagePrecedes(int page1, int page2);
  static void WriteZeroPageXlogRec(int pageno);
  static void WriteTruncateXlogRec(int pageno);
  
  
  /*
!  * Record the final state of a transaction in the commit log.
   *
   * lsn must be the WAL location of the commit record when recording an async
   * commit.	For a synchronous commit it can be InvalidXLogRecPtr, since the
   * caller guarantees the commit record is already flushed in that case.  It
   * should be InvalidXLogRecPtr for abort cases, too.
   *
   * NB: this is a low-level routine and is NOT the preferred entry point
!  * for most uses; TransactionLogUpdate() in transam.c is the intended caller.
   */
  void
! TransactionIdSetStatus(TransactionId xid, XidStatus status, XLogRecPtr lsn)
  {
- 	int			pageno = TransactionIdToPage(xid);
- 	int			byteno = TransactionIdToByte(xid);
- 	int			bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
  	int			slotno;
! 	char	   *byteptr;
! 	char		byteval;
  
  	Assert(status == TRANSACTION_STATUS_COMMITTED ||
  		   status == TRANSACTION_STATUS_ABORTED ||
--- 80,249 ----
  static bool CLOGPagePrecedes(int page1, int page2);
  static void WriteZeroPageXlogRec(int pageno);
  static void WriteTruncateXlogRec(int pageno);
+ static void TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
+ 					   	   TransactionId *subxids, XidStatus status,
+ 						   XLogRecPtr lsn, int pageno, bool subcommit);
+ static void TransactionIdSetStatusBit(TransactionId xid, XidStatus status,
+ 						  XLogRecPtr lsn, int slotno);
+ static void set_tree_status_by_pages(int nsubxids, TransactionId *subxids,
+ 						 XidStatus status, XLogRecPtr lsn, bool subcommit);
  
  
  /*
!  * TransactionIdSetTreeStatus
!  *
!  * Record the final state of transaction entries in the commit log for
!  * a transaction and its subtransaction tree. Take care to ensure this is
!  * efficient, and as atomic as possible.
!  *
!  * xid is a single xid to set status for. This will typically be
!  * the top level transactionid for a top level commit or abort. It can
!  * also be a subtransaction when we record transaction aborts.
!  *
!  * subxids is an array of xids of length nsubxids, representing subtransactions
!  * in the tree of xid. In various cases nsubxids may be zero.
   *
   * lsn must be the WAL location of the commit record when recording an async
   * commit.	For a synchronous commit it can be InvalidXLogRecPtr, since the
   * caller guarantees the commit record is already flushed in that case.  It
   * should be InvalidXLogRecPtr for abort cases, too.
   *
+  * In the commit case, atomicity is limited by whether all the subxids are in
+  * the same CLOG page as xid.  If they all are, then the lock will be grabbed
+  * only once, and the status will be set to committed directly.  Otherwise
+  * we must
+  *   1. set sub-committed all subxids that are not on the same page as the
+  *      main xid
+  *   2. atomically set committed the main xid and the subxids on the same page
+  *   3. go over the first bunch again and set them committed
+  * Note that as far as concurrent checkers are concerned, main transaction
+  * commit as a whole is still atomic.
+  *
   * NB: this is a low-level routine and is NOT the preferred entry point
!  * for most uses; functions in transam.c are the intended callers.
!  *
!  * XXX Think about issuing FADVISE_WILLNEED on pages that we will need,
!  * but aren't yet in cache, as well as hinting pages not to fall out of
!  * cache yet.
   */
  void
! TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
! 				TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
! {
! 	int		pageno = TransactionIdToPage(xid); /* get page of parent */
! 	int 	i;
! 
! 	Assert(status == TRANSACTION_STATUS_COMMITTED ||
! 		   status == TRANSACTION_STATUS_ABORTED);
! 
! 	/*
! 	 * See how many subxids, if any, are on the same page as the parent.
! 	 */
! 	for (i = 0; i < nsubxids; i++)
! 	{
! 		if (TransactionIdToPage(subxids[i]) != pageno)
! 			break;
! 	}
! 
! 	/*
! 	 * Do all items fit on a single page?
! 	 */
! 	if (i == nsubxids)
! 	{
! 		/*
! 		 * Set the parent and all subtransactions in a single call
! 		 */
! 		TransactionIdSetPageStatus(xid, nsubxids, subxids, status, lsn,
! 								   pageno, true);
! 	}
! 	else
! 	{
! 		int		num_on_first_page = i;
! 
! 		/*
! 		 * If this is a commit then we care about doing this correctly (i.e.
! 		 * using the subcommitted intermediate status).  By here, we know we're
! 		 * updating more than one page of clog, so we must mark entries that
! 		 * are *not* on the first page so that they show as subcommitted before
! 		 * we then return to update the status to fully committed.
! 		 *
! 		 * To avoid touching the first page twice, skip marking subcommitted
! 		 * those subxids in that first page.
! 		 */
! 		if (status == TRANSACTION_STATUS_COMMITTED)
! 			set_tree_status_by_pages(nsubxids - num_on_first_page,
! 									 subxids + num_on_first_page,
! 									 TRANSACTION_STATUS_SUB_COMMITTED, lsn, true);
! 
! 		/*
! 		 * Now set the parent and subtransactions on same page as the parent,
! 		 * if any
! 		 */
! 		pageno = TransactionIdToPage(xid);
! 		TransactionIdSetPageStatus(xid, num_on_first_page, subxids, status,
! 								   lsn, pageno, true);
! 
! 		/*
! 		 * By now, all subtransactions have been subcommitted, so all calls
! 		 * to TransactionIdSetPageStatus() will use subcommit=false after
! 		 * this point for this transaction tree.
! 		 */
! 
! 		/*
! 		 * Now work through the rest of the subxids one clog page at a time,
! 		 * starting from the second page onwards, like we did above.
! 		 */
! 		set_tree_status_by_pages(nsubxids - num_on_first_page,
! 								 subxids + num_on_first_page,
! 								 status, lsn, false);
! 	}
! }
! 
! /*
!  * Helper for TransactionIdSetTreeStatus: set the status for a bunch of
!  * transactions, chunking in the separate CLOG pages involved.
!  */
! static void
! set_tree_status_by_pages(int nsubxids, TransactionId *subxids,
! 						 XidStatus status, XLogRecPtr lsn, bool subcommit)
! {
! 	int		pageno;
! 	int		offset = 0;
! 	int		i = 0;
! 
! 	while (i < nsubxids)
! 	{
! 		int		num_on_page = 0;
! 
! 		pageno = TransactionIdToPage(subxids[i]);
! 		while (i < nsubxids && TransactionIdToPage(subxids[i]) == pageno)
! 		{
! 			num_on_page++;
! 			i++;
! 		}
! 
! 		TransactionIdSetPageStatus(InvalidTransactionId,
! 								   num_on_page, subxids + offset,
! 								   status, lsn, pageno, subcommit);
! 		offset = i;
! 	}
! }
! 
! /*
!  * Record the final state of transaction entries in the commit log for
!  * all entries on a single page.  Atomic only on this page.
!  *
!  * Otherwise API is same as TransactionIdSetTreeStatus()
!  *
!  * XXX describe what "subcommit" is for.
!  */
! static void
! TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
! 						   TransactionId *subxids, XidStatus status,
! 						   XLogRecPtr lsn, int pageno, bool subcommit)
  {
  	int			slotno;
! 	int 		i;
  
  	Assert(status == TRANSACTION_STATUS_COMMITTED ||
  		   status == TRANSACTION_STATUS_ABORTED ||
*************** TransactionIdSetStatus(TransactionId xid
*** 116,124 ****
  	 * mustn't let it reach disk until we've done the appropriate WAL flush.
  	 * But when lsn is invalid, it's OK to scribble on a page while it is
  	 * write-busy, since we don't care if the update reaches disk sooner than
! 	 * we think.  Hence, pass write_ok = XLogRecPtrIsInvalid(lsn).
  	 */
  	slotno = SimpleLruReadPage(ClogCtl, pageno, XLogRecPtrIsInvalid(lsn), xid);
  	byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
  
  	/* Current state should be 0, subcommitted or target state */
--- 258,317 ----
  	 * mustn't let it reach disk until we've done the appropriate WAL flush.
  	 * But when lsn is invalid, it's OK to scribble on a page while it is
  	 * write-busy, since we don't care if the update reaches disk sooner than
! 	 * we think.
  	 */
  	slotno = SimpleLruReadPage(ClogCtl, pageno, XLogRecPtrIsInvalid(lsn), xid);
+ 
+ 	/*
+ 	 * If we sync-commit more than one xid on this page while it is being
+ 	 * written out, we might find that some of the bits go to disk and others
+ 	 * don't.  That would break atomicity, so if we haven't already
+ 	 * subcommitted the xids for this commit, we do that first and then come
+ 	 * back to start marking commits.
+ 	 *
+ 	 * If using async commit then we already waited for the write I/O to
+ 	 * complete by this point, so nothing to do.
+ 	 */
+ 	if (subcommit && status == TRANSACTION_STATUS_COMMITTED &&
+ 		XLogRecPtrIsInvalid(lsn))
+ 	{
+ 		for (i = 0; i < nsubxids; i++)
+ 		{
+ 			Assert(ClogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
+ 			TransactionIdSetStatusBit(subxids[i],
+ 									  TRANSACTION_STATUS_SUB_COMMITTED, lsn, slotno);
+ 		}
+ 	}
+ 
+ 	/* Set the main transaction id, if any */
+ 	if (TransactionIdIsValid(xid))
+ 		TransactionIdSetStatusBit(xid, status, lsn, slotno);
+ 
+ 	/* Set the subtransactions */
+ 	for (i = 0; i < nsubxids; i++)
+ 	{
+ 		Assert(ClogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
+ 		TransactionIdSetStatusBit(subxids[i], status, lsn, slotno);
+ 	}
+ 
+ 	ClogCtl->shared->page_dirty[slotno] = true;
+ 
+ 	LWLockRelease(CLogControlLock);
+ }
+ 
+ /*
+  * Sets the commit status of a single transaction.
+  *
+  * Must be called with CLogControlLock held
+  */
+ static void
+ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno)
+ {
+ 	int			byteno = TransactionIdToByte(xid);
+ 	int			bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
+ 	char	   *byteptr;
+ 	char		byteval;
+ 
  	byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;
  
  	/* Current state should be 0, subcommitted or target state */
*************** TransactionIdSetStatus(TransactionId xid
*** 132,139 ****
  	byteval |= (status << bshift);
  	*byteptr = byteval;
  
- 	ClogCtl->shared->page_dirty[slotno] = true;
- 
  	/*
  	 * Update the group LSN if the transaction completion LSN is higher.
  	 *
--- 325,330 ----
*************** TransactionIdSetStatus(TransactionId xid
*** 150,156 ****
  			ClogCtl->shared->group_lsn[lsnindex] = lsn;
  	}
  
- 	LWLockRelease(CLogControlLock);
  }
  
  /*
--- 341,346 ----
Index: src/backend/access/transam/transam.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/access/transam/transam.c,v
retrieving revision 1.76
diff -c -p -r1.76 transam.c
*** src/backend/access/transam/transam.c	26 Mar 2008 18:48:59 -0000	1.76
--- src/backend/access/transam/transam.c	5 Oct 2008 18:46:01 -0000
*************** static const XLogRecPtr InvalidXLogRecPt
*** 40,54 ****
  
  /* Local functions */
  static XidStatus TransactionLogFetch(TransactionId transactionId);
- static void TransactionLogUpdate(TransactionId transactionId,
- 					 XidStatus status, XLogRecPtr lsn);
  
  
  /* ----------------------------------------------------------------
   *		Postgres log access method interface
   *
   *		TransactionLogFetch
-  *		TransactionLogUpdate
   * ----------------------------------------------------------------
   */
  
--- 40,51 ----
*************** TransactionLogFetch(TransactionId transa
*** 100,140 ****
  	return xidstatus;
  }
  
- /*
-  *		TransactionLogUpdate
-  *
-  * Store the new status of a transaction.  The commit record LSN must be
-  * passed when recording an async commit; else it should be InvalidXLogRecPtr.
-  */
- static inline void
- TransactionLogUpdate(TransactionId transactionId,
- 					 XidStatus status, XLogRecPtr lsn)
- {
- 	/*
- 	 * update the commit log
- 	 */
- 	TransactionIdSetStatus(transactionId, status, lsn);
- }
- 
- /*
-  * TransactionLogMultiUpdate
-  *
-  * Update multiple transaction identifiers to a given status.
-  * Don't depend on this being atomic; it's not.
-  */
- static inline void
- TransactionLogMultiUpdate(int nxids, TransactionId *xids,
- 						  XidStatus status, XLogRecPtr lsn)
- {
- 	int			i;
- 
- 	Assert(nxids != 0);
- 
- 	for (i = 0; i < nxids; i++)
- 		TransactionIdSetStatus(xids[i], status, lsn);
- }
- 
- 
  /* ----------------------------------------------------------------
   *						Interface functions
   *
--- 97,102 ----
*************** TransactionLogMultiUpdate(int nxids, Tra
*** 144,154 ****
   *		   these functions test the transaction status of
   *		   a specified transaction id.
   *
!  *		TransactionIdCommit
!  *		TransactionIdAbort
   *		========
!  *		   these functions set the transaction status
!  *		   of the specified xid.
   *
   * See also TransactionIdIsInProgress, which once was in this module
   * but now lives in procarray.c.
--- 106,117 ----
   *		   these functions test the transaction status of
   *		   a specified transaction id.
   *
!  *		TransactionIdCommitTree
!  *		TransactionIdAsyncCommitTree
!  *		TransactionIdAbortTree
   *		========
!  *		   these functions set the transaction status of the specified
!  *		   transaction tree.
   *
   * See also TransactionIdIsInProgress, which once was in this module
   * but now lives in procarray.c.
*************** TransactionIdIsKnownCompleted(Transactio
*** 287,362 ****
  	return false;
  }
  
- 
- /*
-  * TransactionIdCommit
-  *		Commits the transaction associated with the identifier.
-  *
-  * Note:
-  *		Assumes transaction identifier is valid.
-  */
- void
- TransactionIdCommit(TransactionId transactionId)
- {
- 	TransactionLogUpdate(transactionId, TRANSACTION_STATUS_COMMITTED,
- 						 InvalidXLogRecPtr);
- }
- 
- /*
-  * TransactionIdAsyncCommit
-  *		Same as above, but for async commits.  The commit record LSN is needed.
-  */
- void
- TransactionIdAsyncCommit(TransactionId transactionId, XLogRecPtr lsn)
- {
- 	TransactionLogUpdate(transactionId, TRANSACTION_STATUS_COMMITTED, lsn);
- }
- 
- /*
-  * TransactionIdAbort
-  *		Aborts the transaction associated with the identifier.
-  *
-  * Note:
-  *		Assumes transaction identifier is valid.
-  *		No async version of this is needed.
-  */
- void
- TransactionIdAbort(TransactionId transactionId)
- {
- 	TransactionLogUpdate(transactionId, TRANSACTION_STATUS_ABORTED,
- 						 InvalidXLogRecPtr);
- }
- 
- /*
-  * TransactionIdSubCommit
-  *		Marks the subtransaction associated with the identifier as
-  *		sub-committed.
-  *
-  * Note:
-  *		No async version of this is needed.
-  */
- void
- TransactionIdSubCommit(TransactionId transactionId)
- {
- 	TransactionLogUpdate(transactionId, TRANSACTION_STATUS_SUB_COMMITTED,
- 						 InvalidXLogRecPtr);
- }
- 
  /*
   * TransactionIdCommitTree
!  *		Marks all the given transaction ids as committed.
   *
!  * The caller has to be sure that this is used only to mark subcommitted
!  * subtransactions as committed, and only *after* marking the toplevel
!  * parent as committed.  Otherwise there is a race condition against
!  * TransactionIdDidCommit.
   */
  void
! TransactionIdCommitTree(int nxids, TransactionId *xids)
  {
! 	if (nxids > 0)
! 		TransactionLogMultiUpdate(nxids, xids, TRANSACTION_STATUS_COMMITTED,
! 								  InvalidXLogRecPtr);
  }
  
  /*
--- 250,271 ----
  	return false;
  }
  
  /*
   * TransactionIdCommitTree
!  *		Marks the given transaction and children as committed
   *
!  * "xid" is a toplevel transaction commit, and the xids array contains its
!  * committed subtransactions.
!  *
!  * This commit operation is not guaranteed to be atomic, but if not, subxids
!  * are correctly marked subcommit first.
   */
  void
! TransactionIdCommitTree(TransactionId xid, int nxids, TransactionId *xids)
  {
! 	TransactionIdSetTreeStatus(xid, nxids, xids,
! 							   TRANSACTION_STATUS_COMMITTED,
! 							   InvalidXLogRecPtr);
  }
  
  /*
*************** TransactionIdCommitTree(int nxids, Trans
*** 364,392 ****
   *		Same as above, but for async commits.  The commit record LSN is needed.
   */
  void
! TransactionIdAsyncCommitTree(int nxids, TransactionId *xids, XLogRecPtr lsn)
  {
! 	if (nxids > 0)
! 		TransactionLogMultiUpdate(nxids, xids, TRANSACTION_STATUS_COMMITTED,
! 								  lsn);
  }
  
  /*
   * TransactionIdAbortTree
!  *		Marks all the given transaction ids as aborted.
   *
   * We don't need to worry about the non-atomic behavior, since any onlookers
   * will consider all the xacts as not-yet-committed anyway.
   */
  void
! TransactionIdAbortTree(int nxids, TransactionId *xids)
  {
! 	if (nxids > 0)
! 		TransactionLogMultiUpdate(nxids, xids, TRANSACTION_STATUS_ABORTED,
! 								  InvalidXLogRecPtr);
  }
  
- 
  /*
   * TransactionIdPrecedes --- is id1 logically < id2?
   */
--- 273,302 ----
   *		Same as above, but for async commits.  The commit record LSN is needed.
   */
  void
! TransactionIdAsyncCommitTree(TransactionId xid, int nxids, TransactionId *xids,
! 							 XLogRecPtr lsn)
  {
! 	TransactionIdSetTreeStatus(xid, nxids, xids,
! 							   TRANSACTION_STATUS_COMMITTED, lsn);
  }
  
  /*
   * TransactionIdAbortTree
!  *		Marks the given transaction and children as aborted.
!  *
!  * "xid" is the transaction (or subtransaction) being aborted, and the xids
!  * array contains its subtransactions.
   *
   * We don't need to worry about the non-atomic behavior, since any onlookers
   * will consider all the xacts as not-yet-committed anyway.
   */
  void
! TransactionIdAbortTree(TransactionId xid, int nxids, TransactionId *xids)
  {
! 	TransactionIdSetTreeStatus(xid, nxids, xids,
! 							   TRANSACTION_STATUS_ABORTED, InvalidXLogRecPtr);
  }
  
  /*
   * TransactionIdPrecedes --- is id1 logically < id2?
   */
Index: src/backend/access/transam/twophase.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/access/transam/twophase.c,v
retrieving revision 1.45
diff -c -p -r1.45 twophase.c
*** src/backend/access/transam/twophase.c	11 Aug 2008 11:05:10 -0000	1.45
--- src/backend/access/transam/twophase.c	5 Oct 2008 18:33:40 -0000
*************** RecordTransactionCommitPrepared(Transact
*** 1745,1753 ****
  	XLogFlush(recptr);
  
  	/* Mark the transaction committed in pg_clog */
! 	TransactionIdCommit(xid);
! 	/* to avoid race conditions, the parent must commit first */
! 	TransactionIdCommitTree(nchildren, children);
  
  	/* Checkpoint can proceed now */
  	MyProc->inCommit = false;
--- 1745,1751 ----
  	XLogFlush(recptr);
  
  	/* Mark the transaction committed in pg_clog */
! 	TransactionIdCommitTree(xid, nchildren, children);
  
  	/* Checkpoint can proceed now */
  	MyProc->inCommit = false;
*************** RecordTransactionAbortPrepared(Transacti
*** 1822,1829 ****
  	 * Mark the transaction aborted in clog.  This is not absolutely necessary
  	 * but we may as well do it while we are here.
  	 */
! 	TransactionIdAbort(xid);
! 	TransactionIdAbortTree(nchildren, children);
  
  	END_CRIT_SECTION();
  }
--- 1820,1826 ----
  	 * Mark the transaction aborted in clog.  This is not absolutely necessary
  	 * but we may as well do it while we are here.
  	 */
! 	TransactionIdAbortTree(xid, nchildren, children);
  
  	END_CRIT_SECTION();
  }
Index: src/backend/access/transam/xact.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/access/transam/xact.c,v
retrieving revision 1.265
diff -c -p -r1.265 xact.c
*** src/backend/access/transam/xact.c	11 Aug 2008 11:05:10 -0000	1.265
--- src/backend/access/transam/xact.c	5 Oct 2008 18:49:21 -0000
*************** static void CommitTransaction(void);
*** 254,260 ****
  static TransactionId RecordTransactionAbort(bool isSubXact);
  static void StartTransaction(void);
  
- static void RecordSubTransactionCommit(void);
  static void StartSubTransaction(void);
  static void CommitSubTransaction(void);
  static void AbortSubTransaction(void);
--- 254,259 ----
*************** RecordTransactionCommit(void)
*** 952,962 ****
  		 * Now we may update the CLOG, if we wrote a COMMIT record above
  		 */
  		if (markXidCommitted)
! 		{
! 			TransactionIdCommit(xid);
! 			/* to avoid race conditions, the parent must commit first */
! 			TransactionIdCommitTree(nchildren, children);
! 		}
  	}
  	else
  	{
--- 951,957 ----
  		 * Now we may update the CLOG, if we wrote a COMMIT record above
  		 */
  		if (markXidCommitted)
! 			TransactionIdCommitTree(xid, nchildren, children);
  	}
  	else
  	{
*************** RecordTransactionCommit(void)
*** 974,984 ****
  		 * flushed before the CLOG may be updated.
  		 */
  		if (markXidCommitted)
! 		{
! 			TransactionIdAsyncCommit(xid, XactLastRecEnd);
! 			/* to avoid race conditions, the parent must commit first */
! 			TransactionIdAsyncCommitTree(nchildren, children, XactLastRecEnd);
! 		}
  	}
  
  	/*
--- 969,975 ----
  		 * flushed before the CLOG may be updated.
  		 */
  		if (markXidCommitted)
! 			TransactionIdAsyncCommitTree(xid, nchildren, children, XactLastRecEnd);
  	}
  
  	/*
*************** AtSubCommit_childXids(void)
*** 1156,1191 ****
  	s->maxChildXids = 0;
  }
  
- /*
-  * RecordSubTransactionCommit
-  */
- static void
- RecordSubTransactionCommit(void)
- {
- 	TransactionId xid = GetCurrentTransactionIdIfAny();
- 
- 	/*
- 	 * We do not log the subcommit in XLOG; it doesn't matter until the
- 	 * top-level transaction commits.
- 	 *
- 	 * We must mark the subtransaction subcommitted in the CLOG if it had a
- 	 * valid XID assigned.	If it did not, nobody else will ever know about
- 	 * the existence of this subxact.  We don't have to deal with deletions
- 	 * scheduled for on-commit here, since they'll be reassigned to our parent
- 	 * (who might still abort).
- 	 */
- 	if (TransactionIdIsValid(xid))
- 	{
- 		/* XXX does this really need to be a critical section? */
- 		START_CRIT_SECTION();
- 
- 		/* Record subtransaction subcommit */
- 		TransactionIdSubCommit(xid);
- 
- 		END_CRIT_SECTION();
- 	}
- }
- 
  /* ----------------------------------------------------------------
   *						AbortTransaction stuff
   * ----------------------------------------------------------------
--- 1147,1152 ----
*************** RecordTransactionAbort(bool isSubXact)
*** 1288,1301 ****
  	 * waiting for already-aborted subtransactions.  It is OK to do it without
  	 * having flushed the ABORT record to disk, because in event of a crash
  	 * we'd be assumed to have aborted anyway.
- 	 *
- 	 * The ordering here isn't critical but it seems best to mark the parent
- 	 * first.  This assures an atomic transition of all the subtransactions to
- 	 * aborted state from the point of view of concurrent
- 	 * TransactionIdDidAbort calls.
  	 */
! 	TransactionIdAbort(xid);
! 	TransactionIdAbortTree(nchildren, children);
  
  	END_CRIT_SECTION();
  
--- 1249,1256 ----
  	 * waiting for already-aborted subtransactions.  It is OK to do it without
  	 * having flushed the ABORT record to disk, because in event of a crash
  	 * we'd be assumed to have aborted anyway.
  	 */
! 	TransactionIdAbortTree(xid, nchildren, children);
  
  	END_CRIT_SECTION();
  
*************** CommitSubTransaction(void)
*** 3791,3798 ****
  	/* Must CCI to ensure commands of subtransaction are seen as done */
  	CommandCounterIncrement();
  
! 	/* Mark subtransaction as subcommitted */
! 	RecordSubTransactionCommit();
  
  	/* Post-commit cleanup */
  	if (TransactionIdIsValid(s->transactionId))
--- 3746,3757 ----
  	/* Must CCI to ensure commands of subtransaction are seen as done */
  	CommandCounterIncrement();
  
! 	/* 
! 	 * Prior to 8.4 we marked subcommit in clog at this point.
! 	 * We now only perform that step, if required, as part of the
! 	 * atomic update of the whole transaction tree at top level 
! 	 * commit or abort.
! 	 */
  
  	/* Post-commit cleanup */
  	if (TransactionIdIsValid(s->transactionId))
*************** xact_redo_commit(xl_xact_commit *xlrec, 
*** 4259,4269 ****
  	TransactionId max_xid;
  	int			i;
  
! 	TransactionIdCommit(xid);
! 
! 	/* Mark committed subtransactions as committed */
  	sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
! 	TransactionIdCommitTree(xlrec->nsubxacts, sub_xids);
  
  	/* Make sure nextXid is beyond any XID mentioned in the record */
  	max_xid = xid;
--- 4218,4226 ----
  	TransactionId max_xid;
  	int			i;
  
! 	/* Mark the transaction committed in pg_clog */
  	sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
! 	TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids);
  
  	/* Make sure nextXid is beyond any XID mentioned in the record */
  	max_xid = xid;
*************** xact_redo_abort(xl_xact_abort *xlrec, Tr
*** 4299,4309 ****
  	TransactionId max_xid;
  	int			i;
  
! 	TransactionIdAbort(xid);
! 
! 	/* Mark subtransactions as aborted */
  	sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
! 	TransactionIdAbortTree(xlrec->nsubxacts, sub_xids);
  
  	/* Make sure nextXid is beyond any XID mentioned in the record */
  	max_xid = xid;
--- 4256,4264 ----
  	TransactionId max_xid;
  	int			i;
  
! 	/* Mark the transaction aborted in pg_clog */
  	sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
! 	TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
  
  	/* Make sure nextXid is beyond any XID mentioned in the record */
  	max_xid = xid;
Index: src/include/access/clog.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/access/clog.h,v
retrieving revision 1.21
diff -c -p -r1.21 clog.h
*** src/include/access/clog.h	1 Jan 2008 19:45:56 -0000	1.21
--- src/include/access/clog.h	5 Oct 2008 18:33:40 -0000
*************** typedef int XidStatus;
*** 32,38 ****
  #define NUM_CLOG_BUFFERS	8
  
  
! extern void TransactionIdSetStatus(TransactionId xid, XidStatus status, XLogRecPtr lsn);
  extern XidStatus TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn);
  
  extern Size CLOGShmemSize(void);
--- 32,39 ----
  #define NUM_CLOG_BUFFERS	8
  
  
! extern void TransactionIdSetTreeStatus(TransactionId xid, int nsubxids, 
! 					TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
  extern XidStatus TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn);
  
  extern Size CLOGShmemSize(void);
Index: src/include/access/transam.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/access/transam.h,v
retrieving revision 1.65
diff -c -p -r1.65 transam.h
*** src/include/access/transam.h	11 Mar 2008 20:20:35 -0000	1.65
--- src/include/access/transam.h	5 Oct 2008 18:33:40 -0000
*************** extern VariableCache ShmemVariableCache;
*** 139,151 ****
  extern bool TransactionIdDidCommit(TransactionId transactionId);
  extern bool TransactionIdDidAbort(TransactionId transactionId);
  extern bool TransactionIdIsKnownCompleted(TransactionId transactionId);
- extern void TransactionIdCommit(TransactionId transactionId);
- extern void TransactionIdAsyncCommit(TransactionId transactionId, XLogRecPtr lsn);
  extern void TransactionIdAbort(TransactionId transactionId);
! extern void TransactionIdSubCommit(TransactionId transactionId);
! extern void TransactionIdCommitTree(int nxids, TransactionId *xids);
! extern void TransactionIdAsyncCommitTree(int nxids, TransactionId *xids, XLogRecPtr lsn);
! extern void TransactionIdAbortTree(int nxids, TransactionId *xids);
  extern bool TransactionIdPrecedes(TransactionId id1, TransactionId id2);
  extern bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2);
  extern bool TransactionIdFollows(TransactionId id1, TransactionId id2);
--- 139,148 ----
  extern bool TransactionIdDidCommit(TransactionId transactionId);
  extern bool TransactionIdDidAbort(TransactionId transactionId);
  extern bool TransactionIdIsKnownCompleted(TransactionId transactionId);
  extern void TransactionIdAbort(TransactionId transactionId);
! extern void TransactionIdCommitTree(TransactionId xid, int nxids, TransactionId *xids);
! extern void TransactionIdAsyncCommitTree(TransactionId xid, int nxids, TransactionId *xids, XLogRecPtr lsn);
! extern void TransactionIdAbortTree(TransactionId xid, int nxids, TransactionId *xids);
  extern bool TransactionIdPrecedes(TransactionId id1, TransactionId id2);
  extern bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2);
  extern bool TransactionIdFollows(TransactionId id1, TransactionId id2);