On Thu, 2009-10-01 at 18:47 +0300, Heikki Linnakangas wrote:

> And if you could please review the changes I've been doing, just to
> make sure I haven't inadvertently introduced new bugs. That has
> happened before, as you've rightfully reminded me :-).

You posted 17 patches here.

I've reviewed/applied patches 1,3,4,5,6,7,9,10,13,14,15.
That leaves me with some form of issue on 2, 5, 8, 11, 12, 16 and 17.
That sounds like a lot, but out of 41 patches in total to date, I still
have unresolved issues with 9.

Patch 0017 has significant changes to it, so I'm posting patches here
for further discussion. My main thought is that I agree with the
changes you wanted to make, and I've added a few extra things.

Commit message from repo:
Apply 0017-Revert-changes-to-subtrans.c-and-slru.c.-Instead-cal.patch
but with heavy modifications to fix a number of bugs and make associated
changes. First, StartupSubtrans() positioned itself at oldestXid, so
that when later running transactions complete they could find no page
for them to update and crash. Second, ExtendClog() expected to be able
to write WAL during recovery and so crashed after 32768 xids. This patch
also extends the patch to cover the recently added support for starting
Hot Standby from a shutdown checkpoint, which causes some refactoring.
Various comments reworded, including allowing a lock overflow to cause a
PENDING state, just as we do with subxid overflow. Another bug was also
found, in that failing to make subtrans entries from the initial
snapshot could lead to later abort records hanging because the topxid
was not set. Code is now similar in all code paths. Sounds like a lot of
changes, but mostly subtle changes rather than lengthy ones.

It seems highly likely that you'll find an error in my changes to your
changes also, but they do pass initial testing.

-- 
 Simon Riggs           www.2ndQuadrant.com
*** a/src/backend/access/transam/clog.c
--- b/src/backend/access/transam/clog.c
***************
*** 575,581 **** ExtendCLOG(TransactionId newestXact)
  	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
  
  	/* Zero the page and make an XLOG entry about it */
! 	ZeroCLOGPage(pageno, true);
  
  	LWLockRelease(CLogControlLock);
  }
--- 575,581 ----
  	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
  
  	/* Zero the page and make an XLOG entry about it */
! 	ZeroCLOGPage(pageno, !InRecovery);
  
  	LWLockRelease(CLogControlLock);
  }
*** a/src/backend/access/transam/slru.c
--- b/src/backend/access/transam/slru.c
***************
*** 598,605 **** SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
  	 * commands to set the commit status of transactions whose bits are in
  	 * already-truncated segments of the commit log (see notes in
  	 * SlruPhysicalWritePage).	Hence, if we are InRecovery, allow the case
! 	 * where the file doesn't exist, and return zeroes instead. We also
! 	 * return a zeroed page when seek and read fails.
  	 */
  	fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
  	if (fd < 0)
--- 598,604 ----
  	 * commands to set the commit status of transactions whose bits are in
  	 * already-truncated segments of the commit log (see notes in
  	 * SlruPhysicalWritePage).	Hence, if we are InRecovery, allow the case
! 	 * where the file doesn't exist, and return zeroes instead.
  	 */
  	fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
  	if (fd < 0)
***************
*** 620,633 **** SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
  
  	if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
  	{
- 		if (InRecovery)
- 		{
- 			ereport(LOG,
- 					(errmsg("file \"%s\" doesn't exist, reading as zeroes",
- 							path)));
- 			MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
- 			return true;
- 		}
  		slru_errcause = SLRU_SEEK_FAILED;
  		slru_errno = errno;
  		close(fd);
--- 619,624 ----
***************
*** 637,650 **** SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
  	errno = 0;
  	if (read(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
  	{
- 		if (InRecovery)
- 		{
- 			ereport(LOG,
- 					(errmsg("file \"%s\" doesn't exist, reading as zeroes",
- 							path)));
- 			MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
- 			return true;
- 		}
  		slru_errcause = SLRU_READ_FAILED;
  		slru_errno = errno;
  		close(fd);
--- 628,633 ----
*** a/src/backend/access/transam/subtrans.c
--- b/src/backend/access/transam/subtrans.c
***************
*** 31,37 ****
  #include "access/slru.h"
  #include "access/subtrans.h"
  #include "access/transam.h"
- #include "miscadmin.h"
  #include "pg_trace.h"
  #include "utils/snapmgr.h"
  
--- 31,36 ----
***************
*** 45,52 ****
   * 0xFFFFFFFF/SUBTRANS_XACTS_PER_PAGE, and segment numbering at
   * 0xFFFFFFFF/SUBTRANS_XACTS_PER_PAGE/SLRU_SEGMENTS_PER_PAGE.  We need take no
   * explicit notice of that fact in this module, except when comparing segment
!  * and page numbers in TruncateSUBTRANS (see SubTransPagePrecedes)
!  * and in recovery when we do ExtendSUBTRANS.
   */
  
  /* We need four bytes per xact */
--- 44,50 ----
   * 0xFFFFFFFF/SUBTRANS_XACTS_PER_PAGE, and segment numbering at
   * 0xFFFFFFFF/SUBTRANS_XACTS_PER_PAGE/SLRU_SEGMENTS_PER_PAGE.  We need take no
   * explicit notice of that fact in this module, except when comparing segment
!  * and page numbers in TruncateSUBTRANS (see SubTransPagePrecedes).
   */
  
  /* We need four bytes per xact */
***************
*** 85,96 **** SubTransSetParent(TransactionId xid, TransactionId parent)
  	ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno];
  	ptr += entryno;
  
! 	/*
! 	 * Current state should be 0, except in recovery where we may
! 	 * need to reset the value multiple times
! 	 */
! 	Assert(*ptr == InvalidTransactionId ||
! 			(InRecovery && *ptr == parent));
  
  	*ptr = parent;
  
--- 83,90 ----
  	ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno];
  	ptr += entryno;
  
! 	/* Current state should be 0 */
! 	Assert(*ptr == InvalidTransactionId);
  
  	*ptr = parent;
  
***************
*** 229,247 **** ZeroSUBTRANSPage(int pageno)
  /*
   * This must be called ONCE during postmaster or standalone-backend startup,
   * after StartupXLOG has initialized ShmemVariableCache->nextXid.
   */
  void
  StartupSUBTRANS(TransactionId oldestActiveXID)
  {
! 	TransactionId xid = ShmemVariableCache->nextXid;
! 	int			pageno = TransactionIdToPage(xid);
! 
! 	LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
  
  	/*
! 	 * Initialize our idea of the latest page number.
  	 */
! 	SubTransCtl->shared->latest_page_number = pageno;
  
  	LWLockRelease(SubtransControlLock);
  }
--- 223,255 ----
  /*
   * This must be called ONCE during postmaster or standalone-backend startup,
   * after StartupXLOG has initialized ShmemVariableCache->nextXid.
+  *
+  * oldestActiveXID is the oldest XID of any prepared transaction, or nextXid
+  * if there are none.
   */
  void
  StartupSUBTRANS(TransactionId oldestActiveXID)
  {
! 	int			startPage;
! 	int			endPage;
  
  	/*
! 	 * Since we don't expect pg_subtrans to be valid across crashes, we
! 	 * initialize the currently-active page(s) to zeroes during startup.
! 	 * Whenever we advance into a new page, ExtendSUBTRANS will likewise zero
! 	 * the new page without regard to whatever was previously on disk.
  	 */
! 	LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
! 
! 	startPage = TransactionIdToPage(oldestActiveXID);
! 	endPage = TransactionIdToPage(ShmemVariableCache->nextXid);
! 
! 	while (startPage != endPage)
! 	{
! 		(void) ZeroSUBTRANSPage(startPage);
! 		startPage++;
! 	}
! 	(void) ZeroSUBTRANSPage(startPage);
  
  	LWLockRelease(SubtransControlLock);
  }
***************
*** 294,335 **** void
  ExtendSUBTRANS(TransactionId newestXact)
  {
  	int			pageno;
- 	static int last_pageno = 0;
  
! 	Assert(TransactionIdIsNormal(newestXact));
  
! 	if (!InRecovery)
! 	{
! 		/*
! 		 * No work except at first XID of a page.  But beware: just after
! 		 * wraparound, the first XID of page zero is FirstNormalTransactionId.
! 		 */
! 		if (TransactionIdToEntry(newestXact) != 0 &&
! 			!TransactionIdEquals(newestXact, FirstNormalTransactionId))
! 			return;
! 
! 		pageno = TransactionIdToPage(newestXact);
! 	}
! 	else
! 	{
! 		/*
! 		 * InRecovery we keep track of the last page we extended, so
! 		 * we can compare that against incoming XIDs. This will only
! 		 * ever be run by startup process, so keep it as a static variable
! 		 * rather than hiding behind the SubtransControlLock.
! 		 */
! 		pageno = TransactionIdToPage(newestXact);
! 
! 		if (pageno == last_pageno ||
! 			SubTransPagePrecedes(pageno, last_pageno))
! 			return;
! 
! 		ereport(trace_recovery(DEBUG1),
! 				(errmsg("extend subtrans  xid %u page %d last_page %d",
! 						newestXact, pageno, last_pageno)));
! 
! 		last_pageno = pageno;
! 	}
  
  	LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
  
--- 302,317 ----
  ExtendSUBTRANS(TransactionId newestXact)
  {
  	int			pageno;
  
! 	/*
! 	 * No work except at first XID of a page.  But beware: just after
! 	 * wraparound, the first XID of page zero is FirstNormalTransactionId.
! 	 */
! 	if (TransactionIdToEntry(newestXact) != 0 &&
! 		!TransactionIdEquals(newestXact, FirstNormalTransactionId))
! 		return;
  
! 	pageno = TransactionIdToPage(newestXact);
  
  	LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
  
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 5994,5999 **** StartupXLOG(void)
--- 5994,6000 ----
  	uint32		freespace;
  	TransactionId oldestActiveXID;
  	bool		bgwriterLaunched = false;
+ 	bool		backendsAllowed = false;
  
  	/*
  	 * Read control file and check XLOG status looks valid.
***************
*** 6319,6325 **** StartupXLOG(void)
  			bool		recoveryContinue = true;
  			bool		recoveryApply = true;
  			bool		reachedMinRecoveryPoint = false;
- 			bool		backendsAllowed = false;
  			ErrorContextCallback errcontext;
  
  			/* use volatile pointer to prevent code rearrangement */
--- 6320,6325 ----
***************
*** 6689,6699 **** StartupXLOG(void)
  	ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
  	TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
  
! 	/* Start up the commit log and related stuff, too */
! 	/* XXXHS: perhaps this should go after XactClearRecoveryTransactions */
! 	StartupCLOG();
! 	StartupSUBTRANS(oldestActiveXID);
! 	StartupMultiXact();
  
  	/* Reload shared-memory state for prepared transactions */
  	RecoverPreparedTransactions();
--- 6689,6701 ----
  	ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
  	TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
  
! 	/* Start up the commit log and related stuff, too, if not done already */
! 	if (!backendsAllowed)
! 	{
! 		StartupCLOG();
! 		StartupSUBTRANS(oldestActiveXID);
! 		StartupMultiXact();
! 	}
  
  	/* Reload shared-memory state for prepared transactions */
  	RecoverPreparedTransactions();
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 45,50 ****
--- 45,52 ----
  
  #include <signal.h>
  
+ #include "access/clog.h"
+ #include "access/multixact.h"
  #include "access/subtrans.h"
  #include "access/transam.h"
  #include "access/xlog.h"
***************
*** 129,134 **** static void DisplayXidCache(void);
--- 131,139 ----
  #define xc_slow_answer_inc()		((void) 0)
  #endif   /* XIDCACHE_DEBUG */
  
+ static void RecoverySnapshotStateMachine(int newstate,
+ 							 TransactionId oldestXid, TransactionId latestXid);
+ 
  /* Primitives for KnownAssignedXids array handling for standby */
  static Size KnownAssignedXidsShmemSize(int size);
  static void KnownAssignedXidsInit(int size);
***************
*** 470,493 **** ProcArrayApplyRecoveryInfo(XLogRecPtr lsn, xl_xact_running_xacts *xlrec)
  	{
  		if (TransactionIdPrecedes(recoverySnapshotPendingXmin,
  								  xlrec->oldestRunningXid))
! 		{
! 			recoverySnapshotState = RECOVERY_SNAPSHOT_READY;
! 			elog(trace_recovery(DEBUG2), 
! 					"running xact data now proven complete");
! 			elog(trace_recovery(DEBUG2), 
! 					"recovery snapshots are now enabled");
! 		}
! 		return;
! 	}
! 
! 	/*
! 	 * Can't initialise with an incomplete set of lock information.
! 	 * XXX: Can't we go into pending state like with overflowed subxids?
! 	 */
! 	if (xlrec->lock_overflow)
! 	{
! 		elog(trace_recovery(DEBUG2), 
! 				"running xact data has incomplete lock data");
  		return;
  	}
  
--- 475,483 ----
  	{
  		if (TransactionIdPrecedes(recoverySnapshotPendingXmin,
  								  xlrec->oldestRunningXid))
! 			RecoverySnapshotStateMachine(RECOVERY_SNAPSHOT_READY,
! 										 xlrec->oldestRunningXid,
! 										 xlrec->latestRunningXid);
  		return;
  	}
  
***************
*** 499,514 **** ProcArrayApplyRecoveryInfo(XLogRecPtr lsn, xl_xact_running_xacts *xlrec)
  	/*
  	 * If the snapshot overflowed, then we still initialise with what we
  	 * know, but the recovery snapshot isn't fully valid yet because we
! 	 * know there are some subxids missing (ergo we don't know which ones)
  	 */
! 	if (!xlrec->subxid_overflow)
! 		recoverySnapshotState = RECOVERY_SNAPSHOT_READY;
  	else
! 	{
! 		recoverySnapshotState = RECOVERY_SNAPSHOT_PENDING;
! 		ereport(LOG, 
! 				(errmsg("consistent state delayed because recovery snapshot incomplete")));
! 	}
  
  	xids = palloc(sizeof(TransactionId) * (xlrec->xcnt + xlrec->subxcnt));
  	nxids = 0;
--- 489,506 ----
  	/*
  	 * If the snapshot overflowed, then we still initialise with what we
  	 * know, but the recovery snapshot isn't fully valid yet because we
! 	 * know we have information missing. We either have missing subxids
! 	 * or missing locks, doesn't really matter which but which track each
! 	 * separately to help with debugging.
  	 */
! 	if (xlrec->subxid_overflow || xlrec->lock_overflow)
! 		RecoverySnapshotStateMachine(RECOVERY_SNAPSHOT_PENDING,
! 										 xlrec->oldestRunningXid,
! 										 xlrec->latestRunningXid);
  	else
! 		RecoverySnapshotStateMachine(RECOVERY_SNAPSHOT_READY,
! 										 xlrec->oldestRunningXid,
! 										 xlrec->latestRunningXid);
  
  	xids = palloc(sizeof(TransactionId) * (xlrec->xcnt + xlrec->subxcnt));
  	nxids = 0;
***************
*** 522,532 **** ProcArrayApplyRecoveryInfo(XLogRecPtr lsn, xl_xact_running_xacts *xlrec)
  
  	/*
  	 * Scan through the incoming array of RunningXacts and collect xids.
! 	 * We don't use SubtransSetParent because it doesn't matter yet. If
! 	 * we aren't overflowed then all xids will fit in snapshot and so we
! 	 * don't need subtrans. If we later overflow, an xid assignment record
! 	 * will add xids to subtrans. If RunningXacts is overflowed then we
! 	 * don't have enough information to correctly update subtrans anyway.	
  	 */
  	for (xid_index = 0; xid_index < xlrec->xcnt; xid_index++)
  	{
--- 514,522 ----
  
  	/*
  	 * Scan through the incoming array of RunningXacts and collect xids.
! 	 * We mark SubtransSetParent, just as we would in other cases. That 
! 	 * is OK because we performed StartupSubtrans() when we changed state,
! 	 * above.
  	 */
  	for (xid_index = 0; xid_index < xlrec->xcnt; xid_index++)
  	{
***************
*** 537,544 **** ProcArrayApplyRecoveryInfo(XLogRecPtr lsn, xl_xact_running_xacts *xlrec)
  
  		xids[nxids++] = xid;
  		for(i = 0; i < rxact[xid_index].nsubxids; i++)
! 			xids[nxids++] = subxip[rxact[xid_index].subx_offset + i];
! 
  	}
  
  	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
--- 527,537 ----
  
  		xids[nxids++] = xid;
  		for(i = 0; i < rxact[xid_index].nsubxids; i++)
! 		{
! 			TransactionId subxid = subxip[rxact[xid_index].subx_offset + i];
! 			xids[nxids++] = subxid;
! 			SubTransSetParent(subxid, xid);
! 		}
  	}
  
  	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
***************
*** 565,575 **** ProcArrayApplyRecoveryInfo(XLogRecPtr lsn, xl_xact_running_xacts *xlrec)
  	loggableLocks = (xl_rel_lock *) &(xlrec->xrun[(xlrec->xcnt + xlrec->subxcnt)]);
  	relation_redo_locks(loggableLocks, xlrec->numLocks);
  
! 	elog(trace_recovery(DEBUG2), 
! 		"running transaction data initialized");
! 	if (recoverySnapshotState == RECOVERY_SNAPSHOT_READY)
! 		elog(trace_recovery(DEBUG2), 
! 			"recovery snapshots are now enabled");
  }
  
  /*
--- 558,631 ----
  	loggableLocks = (xl_rel_lock *) &(xlrec->xrun[(xlrec->xcnt + xlrec->subxcnt)]);
  	relation_redo_locks(loggableLocks, xlrec->numLocks);
  
! 	/* nextXid must be beyond any observed xid */
! 	if (TransactionIdFollowsOrEquals(latestObservedXid,
! 									 ShmemVariableCache->nextXid))
! 	{
! 		ShmemVariableCache->nextXid = latestObservedXid;
! 		TransactionIdAdvance(ShmemVariableCache->nextXid);
! 	}
! }
! 
! static void
! RecoverySnapshotStateMachine(int newstate, 
! 							 TransactionId oldestXid, TransactionId latestXid)
! {
! 	TransactionId xid = oldestXid;
! 	Assert(newstate > recoverySnapshotState);
! 
! 	switch (recoverySnapshotState)
! 	{
! 		case RECOVERY_SNAPSHOT_UNINITIALIZED:
! 
! 				ereport(trace_recovery(DEBUG2), 
! 						(errmsg("running transaction data initialized")));
! 
! 				/* Startup commit log and other stuff */
! 				StartupCLOG();
! 				StartupSUBTRANS(oldestXid);
! 				StartupMultiXact();
! 
! 				TransactionIdAdvance(xid);
! 				while (TransactionIdPrecedesOrEquals(xid, latestXid))
! 				{
! 					/*
! 					 * Extend clog and subtrans like we do in 
! 					 * GetNewTransactionId() during normal operation.
! 					 */
! 					ExtendCLOG(xid);
! 					ExtendSUBTRANS(xid);
! 
! 					TransactionIdAdvance(xid);
! 				}
! 
! 				if (newstate == RECOVERY_SNAPSHOT_READY)
! 					ereport(trace_recovery(DEBUG1), 
! 							(errmsg("recovery snapshots are now enabled")));
! 				else if (newstate == RECOVERY_SNAPSHOT_PENDING)
! 			 		ereport(LOG, 
! 							(errmsg("consistent state delayed because "
! 									"recovery snapshot incomplete")));
! 				break;
! 
! 		case RECOVERY_SNAPSHOT_PENDING:
! 
! 				if (newstate == RECOVERY_SNAPSHOT_READY)
! 				{
! 					ereport(trace_recovery(DEBUG2), 
! 							(errmsg("running xact data now proven complete")));
! 					ereport(trace_recovery(DEBUG1), 
! 							(errmsg("recovery snapshots are now enabled")));
! 					break;
! 				}
! 
! 		case RECOVERY_SNAPSHOT_READY:
! 		default:
! 				elog(ERROR, "invalid value for recoverySnapshotState");
! 				break;
! 	}
! 	
! 	recoverySnapshotState = newstate;
  }
  
  /*
***************
*** 582,598 **** ProcArrayInitRecoveryInfo(void)
  {
  	Assert(InHotStandby);
  
! 	recoverySnapshotState = RECOVERY_SNAPSHOT_READY;
  
  	/* also initialize latestCompletedXid, to nextXid - 1 */
  	ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
  	TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
  	latestObservedXid = ShmemVariableCache->latestCompletedXid;
- 
- 	elog(trace_recovery(DEBUG2), 
- 		"running transaction data initialized");
- 	elog(trace_recovery(DEBUG2), 
- 		"recovery snapshots are now enabled");
  }
  
  /*
--- 638,650 ----
  {
  	Assert(InHotStandby);
  
! 	RecoverySnapshotStateMachine(RECOVERY_SNAPSHOT_READY, 
! 								 ShmemVariableCache->nextXid, InvalidTransactionId);
  
  	/* also initialize latestCompletedXid, to nextXid - 1 */
  	ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
  	TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
  	latestObservedXid = ShmemVariableCache->latestCompletedXid;
  }
  
  /*
***************
*** 2311,2316 **** RecordKnownAssignedTransactionIds(TransactionId xid)
--- 2363,2376 ----
  						(errmsg("recording unobserved xid %u (latestObservedXid %u)",
  		 							next_expected_xid, latestObservedXid)));
  			KnownAssignedXidsAdd(&next_expected_xid, 1);
+ 
+ 			/*
+ 			 * Extend clog and subtrans like we do in GetNewTransactionId()
+ 			 * during normal operation.
+ 			 */
+ 			ExtendCLOG(next_expected_xid);
+ 			ExtendSUBTRANS(next_expected_xid);
+ 
  			TransactionIdAdvance(next_expected_xid);
  		}
  
***************
*** 2318,2323 **** RecordKnownAssignedTransactionIds(TransactionId xid)
--- 2378,2391 ----
  
  		latestObservedXid = xid;
  	}
+ 
+ 	/* nextXid must be beyond any observed xid */
+ 	if (TransactionIdFollowsOrEquals(latestObservedXid,
+ 									 ShmemVariableCache->nextXid))
+ 	{
+ 		ShmemVariableCache->nextXid = latestObservedXid;
+ 		TransactionIdAdvance(ShmemVariableCache->nextXid);
+ 	}
  }
  
  void
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index c6c27a7..9d97527 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -575,7 +575,7 @@ ExtendCLOG(TransactionId newestXact)
 	LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
 
 	/* Zero the page and make an XLOG entry about it */
-	ZeroCLOGPage(pageno, true);
+	ZeroCLOGPage(pageno, !InRecovery);
 
 	LWLockRelease(CLogControlLock);
 }
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index 3f89087..68e3869 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -598,8 +598,7 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
 	 * commands to set the commit status of transactions whose bits are in
 	 * already-truncated segments of the commit log (see notes in
 	 * SlruPhysicalWritePage).	Hence, if we are InRecovery, allow the case
-	 * where the file doesn't exist, and return zeroes instead. We also
-	 * return a zeroed page when seek and read fails.
+	 * where the file doesn't exist, and return zeroes instead.
 	 */
 	fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
 	if (fd < 0)
@@ -620,14 +619,6 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
 
 	if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
 	{
-		if (InRecovery)
-		{
-			ereport(LOG,
-					(errmsg("file \"%s\" doesn't exist, reading as zeroes",
-							path)));
-			MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
-			return true;
-		}
 		slru_errcause = SLRU_SEEK_FAILED;
 		slru_errno = errno;
 		close(fd);
@@ -637,14 +628,6 @@ SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
 	errno = 0;
 	if (read(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
 	{
-		if (InRecovery)
-		{
-			ereport(LOG,
-					(errmsg("file \"%s\" doesn't exist, reading as zeroes",
-							path)));
-			MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
-			return true;
-		}
 		slru_errcause = SLRU_READ_FAILED;
 		slru_errno = errno;
 		close(fd);
diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c
index e9b3fbc..0dbd216 100644
--- a/src/backend/access/transam/subtrans.c
+++ b/src/backend/access/transam/subtrans.c
@@ -31,7 +31,6 @@
 #include "access/slru.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
-#include "miscadmin.h"
 #include "pg_trace.h"
 #include "utils/snapmgr.h"
 
@@ -45,8 +44,7 @@
  * 0xFFFFFFFF/SUBTRANS_XACTS_PER_PAGE, and segment numbering at
  * 0xFFFFFFFF/SUBTRANS_XACTS_PER_PAGE/SLRU_SEGMENTS_PER_PAGE.  We need take no
  * explicit notice of that fact in this module, except when comparing segment
- * and page numbers in TruncateSUBTRANS (see SubTransPagePrecedes)
- * and in recovery when we do ExtendSUBTRANS.
+ * and page numbers in TruncateSUBTRANS (see SubTransPagePrecedes).
  */
 
 /* We need four bytes per xact */
@@ -85,12 +83,8 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
 	ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno];
 	ptr += entryno;
 
-	/*
-	 * Current state should be 0, except in recovery where we may
-	 * need to reset the value multiple times
-	 */
-	Assert(*ptr == InvalidTransactionId ||
-			(InRecovery && *ptr == parent));
+	/* Current state should be 0 */
+	Assert(*ptr == InvalidTransactionId);
 
 	*ptr = parent;
 
@@ -229,19 +223,33 @@ ZeroSUBTRANSPage(int pageno)
 /*
  * This must be called ONCE during postmaster or standalone-backend startup,
  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
+ *
+ * oldestActiveXID is the oldest XID of any prepared transaction, or nextXid
+ * if there are none.
  */
 void
 StartupSUBTRANS(TransactionId oldestActiveXID)
 {
-	TransactionId xid = ShmemVariableCache->nextXid;
-	int			pageno = TransactionIdToPage(xid);
-
-	LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
+	int			startPage;
+	int			endPage;
 
 	/*
-	 * Initialize our idea of the latest page number.
+	 * Since we don't expect pg_subtrans to be valid across crashes, we
+	 * initialize the currently-active page(s) to zeroes during startup.
+	 * Whenever we advance into a new page, ExtendSUBTRANS will likewise zero
+	 * the new page without regard to whatever was previously on disk.
 	 */
-	SubTransCtl->shared->latest_page_number = pageno;
+	LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
+
+	startPage = TransactionIdToPage(oldestActiveXID);
+	endPage = TransactionIdToPage(ShmemVariableCache->nextXid);
+
+	while (startPage != endPage)
+	{
+		(void) ZeroSUBTRANSPage(startPage);
+		startPage++;
+	}
+	(void) ZeroSUBTRANSPage(startPage);
 
 	LWLockRelease(SubtransControlLock);
 }
@@ -294,42 +302,16 @@ void
 ExtendSUBTRANS(TransactionId newestXact)
 {
 	int			pageno;
-	static int last_pageno = 0;
 
-	Assert(TransactionIdIsNormal(newestXact));
+	/*
+	 * No work except at first XID of a page.  But beware: just after
+	 * wraparound, the first XID of page zero is FirstNormalTransactionId.
+	 */
+	if (TransactionIdToEntry(newestXact) != 0 &&
+		!TransactionIdEquals(newestXact, FirstNormalTransactionId))
+		return;
 
-	if (!InRecovery)
-	{
-		/*
-		 * No work except at first XID of a page.  But beware: just after
-		 * wraparound, the first XID of page zero is FirstNormalTransactionId.
-		 */
-		if (TransactionIdToEntry(newestXact) != 0 &&
-			!TransactionIdEquals(newestXact, FirstNormalTransactionId))
-			return;
-
-		pageno = TransactionIdToPage(newestXact);
-	}
-	else
-	{
-		/*
-		 * InRecovery we keep track of the last page we extended, so
-		 * we can compare that against incoming XIDs. This will only
-		 * ever be run by startup process, so keep it as a static variable
-		 * rather than hiding behind the SubtransControlLock.
-		 */
-		pageno = TransactionIdToPage(newestXact);
-
-		if (pageno == last_pageno ||
-			SubTransPagePrecedes(pageno, last_pageno))
-			return;
-
-		ereport(trace_recovery(DEBUG1),
-				(errmsg("extend subtrans  xid %u page %d last_page %d",
-						newestXact, pageno, last_pageno)));
-
-		last_pageno = pageno;
-	}
+	pageno = TransactionIdToPage(newestXact);
 
 	LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0613001..932e57f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5994,6 +5994,7 @@ StartupXLOG(void)
 	uint32		freespace;
 	TransactionId oldestActiveXID;
 	bool		bgwriterLaunched = false;
+	bool		backendsAllowed = false;
 
 	/*
 	 * Read control file and check XLOG status looks valid.
@@ -6319,7 +6320,6 @@ StartupXLOG(void)
 			bool		recoveryContinue = true;
 			bool		recoveryApply = true;
 			bool		reachedMinRecoveryPoint = false;
-			bool		backendsAllowed = false;
 			ErrorContextCallback errcontext;
 
 			/* use volatile pointer to prevent code rearrangement */
@@ -6689,11 +6689,13 @@ StartupXLOG(void)
 	ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
 	TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
 
-	/* Start up the commit log and related stuff, too */
-	/* XXXHS: perhaps this should go after XactClearRecoveryTransactions */
-	StartupCLOG();
-	StartupSUBTRANS(oldestActiveXID);
-	StartupMultiXact();
+	/* Start up the commit log and related stuff, too, if not done already */
+	if (!backendsAllowed)
+	{
+		StartupCLOG();
+		StartupSUBTRANS(oldestActiveXID);
+		StartupMultiXact();
+	}
 
 	/* Reload shared-memory state for prepared transactions */
 	RecoverPreparedTransactions();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 7d1f42c..f6f50be 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -45,6 +45,8 @@
 
 #include <signal.h>
 
+#include "access/clog.h"
+#include "access/multixact.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xlog.h"
@@ -129,6 +131,9 @@ static void DisplayXidCache(void);
 #define xc_slow_answer_inc()		((void) 0)
 #endif   /* XIDCACHE_DEBUG */
 
+static void RecoverySnapshotStateMachine(int newstate,
+							 TransactionId oldestXid, TransactionId latestXid);
+
 /* Primitives for KnownAssignedXids array handling for standby */
 static Size KnownAssignedXidsShmemSize(int size);
 static void KnownAssignedXidsInit(int size);
@@ -470,24 +475,9 @@ ProcArrayApplyRecoveryInfo(XLogRecPtr lsn, xl_xact_running_xacts *xlrec)
 	{
 		if (TransactionIdPrecedes(recoverySnapshotPendingXmin,
 								  xlrec->oldestRunningXid))
-		{
-			recoverySnapshotState = RECOVERY_SNAPSHOT_READY;
-			elog(trace_recovery(DEBUG2), 
-					"running xact data now proven complete");
-			elog(trace_recovery(DEBUG2), 
-					"recovery snapshots are now enabled");
-		}
-		return;
-	}
-
-	/*
-	 * Can't initialise with an incomplete set of lock information.
-	 * XXX: Can't we go into pending state like with overflowed subxids?
-	 */
-	if (xlrec->lock_overflow)
-	{
-		elog(trace_recovery(DEBUG2), 
-				"running xact data has incomplete lock data");
+			RecoverySnapshotStateMachine(RECOVERY_SNAPSHOT_READY,
+										 xlrec->oldestRunningXid,
+										 xlrec->latestRunningXid);
 		return;
 	}
 
@@ -499,16 +489,18 @@ ProcArrayApplyRecoveryInfo(XLogRecPtr lsn, xl_xact_running_xacts *xlrec)
 	/*
 	 * If the snapshot overflowed, then we still initialise with what we
 	 * know, but the recovery snapshot isn't fully valid yet because we
-	 * know there are some subxids missing (ergo we don't know which ones)
+	 * know we have information missing. We either have missing subxids
+	 * or missing locks, doesn't really matter which but which track each
+	 * separately to help with debugging.
 	 */
-	if (!xlrec->subxid_overflow)
-		recoverySnapshotState = RECOVERY_SNAPSHOT_READY;
+	if (xlrec->subxid_overflow || xlrec->lock_overflow)
+		RecoverySnapshotStateMachine(RECOVERY_SNAPSHOT_PENDING,
+										 xlrec->oldestRunningXid,
+										 xlrec->latestRunningXid);
 	else
-	{
-		recoverySnapshotState = RECOVERY_SNAPSHOT_PENDING;
-		ereport(LOG, 
-				(errmsg("consistent state delayed because recovery snapshot incomplete")));
-	}
+		RecoverySnapshotStateMachine(RECOVERY_SNAPSHOT_READY,
+										 xlrec->oldestRunningXid,
+										 xlrec->latestRunningXid);
 
 	xids = palloc(sizeof(TransactionId) * (xlrec->xcnt + xlrec->subxcnt));
 	nxids = 0;
@@ -522,11 +514,9 @@ ProcArrayApplyRecoveryInfo(XLogRecPtr lsn, xl_xact_running_xacts *xlrec)
 
 	/*
 	 * Scan through the incoming array of RunningXacts and collect xids.
-	 * We don't use SubtransSetParent because it doesn't matter yet. If
-	 * we aren't overflowed then all xids will fit in snapshot and so we
-	 * don't need subtrans. If we later overflow, an xid assignment record
-	 * will add xids to subtrans. If RunningXacts is overflowed then we
-	 * don't have enough information to correctly update subtrans anyway.	
+	 * We mark SubtransSetParent, just as we would in other cases. That 
+	 * is OK because we performed StartupSubtrans() when we changed state,
+	 * above.
 	 */
 	for (xid_index = 0; xid_index < xlrec->xcnt; xid_index++)
 	{
@@ -537,8 +527,11 @@ ProcArrayApplyRecoveryInfo(XLogRecPtr lsn, xl_xact_running_xacts *xlrec)
 
 		xids[nxids++] = xid;
 		for(i = 0; i < rxact[xid_index].nsubxids; i++)
-			xids[nxids++] = subxip[rxact[xid_index].subx_offset + i];
-
+		{
+			TransactionId subxid = subxip[rxact[xid_index].subx_offset + i];
+			xids[nxids++] = subxid;
+			SubTransSetParent(subxid, xid);
+		}
 	}
 
 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
@@ -565,11 +558,74 @@ ProcArrayApplyRecoveryInfo(XLogRecPtr lsn, xl_xact_running_xacts *xlrec)
 	loggableLocks = (xl_rel_lock *) &(xlrec->xrun[(xlrec->xcnt + xlrec->subxcnt)]);
 	relation_redo_locks(loggableLocks, xlrec->numLocks);
 
-	elog(trace_recovery(DEBUG2), 
-		"running transaction data initialized");
-	if (recoverySnapshotState == RECOVERY_SNAPSHOT_READY)
-		elog(trace_recovery(DEBUG2), 
-			"recovery snapshots are now enabled");
+	/* nextXid must be beyond any observed xid */
+	if (TransactionIdFollowsOrEquals(latestObservedXid,
+									 ShmemVariableCache->nextXid))
+	{
+		ShmemVariableCache->nextXid = latestObservedXid;
+		TransactionIdAdvance(ShmemVariableCache->nextXid);
+	}
+}
+
+static void
+RecoverySnapshotStateMachine(int newstate, 
+							 TransactionId oldestXid, TransactionId latestXid)
+{
+	TransactionId xid = oldestXid;
+	Assert(newstate > recoverySnapshotState);
+
+	switch (recoverySnapshotState)
+	{
+		case RECOVERY_SNAPSHOT_UNINITIALIZED:
+
+				ereport(trace_recovery(DEBUG2), 
+						(errmsg("running transaction data initialized")));
+
+				/* Startup commit log and other stuff */
+				StartupCLOG();
+				StartupSUBTRANS(oldestXid);
+				StartupMultiXact();
+
+				TransactionIdAdvance(xid);
+				while (TransactionIdPrecedesOrEquals(xid, latestXid))
+				{
+					/*
+					 * Extend clog and subtrans like we do in 
+					 * GetNewTransactionId() during normal operation.
+					 */
+					ExtendCLOG(xid);
+					ExtendSUBTRANS(xid);
+
+					TransactionIdAdvance(xid);
+				}
+
+				if (newstate == RECOVERY_SNAPSHOT_READY)
+					ereport(trace_recovery(DEBUG1), 
+							(errmsg("recovery snapshots are now enabled")));
+				else if (newstate == RECOVERY_SNAPSHOT_PENDING)
+			 		ereport(LOG, 
+							(errmsg("consistent state delayed because "
+									"recovery snapshot incomplete")));
+				break;
+
+		case RECOVERY_SNAPSHOT_PENDING:
+
+				if (newstate == RECOVERY_SNAPSHOT_READY)
+				{
+					ereport(trace_recovery(DEBUG2), 
+							(errmsg("running xact data now proven complete")));
+					ereport(trace_recovery(DEBUG1), 
+							(errmsg("recovery snapshots are now enabled")));
+					break;
+				}
+
+		case RECOVERY_SNAPSHOT_READY:
+		default:
+				elog(ERROR, "invalid value for recoverySnapshotState");
+				break;
+	}
+	
+	recoverySnapshotState = newstate;
 }
 
 /*
@@ -582,17 +638,13 @@ ProcArrayInitRecoveryInfo(void)
 {
 	Assert(InHotStandby);
 
-	recoverySnapshotState = RECOVERY_SNAPSHOT_READY;
+	RecoverySnapshotStateMachine(RECOVERY_SNAPSHOT_READY, 
+								 ShmemVariableCache->nextXid, InvalidTransactionId);
 
 	/* also initialize latestCompletedXid, to nextXid - 1 */
 	ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
 	TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
 	latestObservedXid = ShmemVariableCache->latestCompletedXid;
-
-	elog(trace_recovery(DEBUG2), 
-		"running transaction data initialized");
-	elog(trace_recovery(DEBUG2), 
-		"recovery snapshots are now enabled");
 }
 
 /*
@@ -2311,6 +2363,14 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
 						(errmsg("recording unobserved xid %u (latestObservedXid %u)",
 		 							next_expected_xid, latestObservedXid)));
 			KnownAssignedXidsAdd(&next_expected_xid, 1);
+
+			/*
+			 * Extend clog and subtrans like we do in GetNewTransactionId()
+			 * during normal operation.
+			 */
+			ExtendCLOG(next_expected_xid);
+			ExtendSUBTRANS(next_expected_xid);
+
 			TransactionIdAdvance(next_expected_xid);
 		}
 
@@ -2318,6 +2378,14 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
 
 		latestObservedXid = xid;
 	}
+
+	/* nextXid must be beyond any observed xid */
+	if (TransactionIdFollowsOrEquals(latestObservedXid,
+									 ShmemVariableCache->nextXid))
+	{
+		ShmemVariableCache->nextXid = latestObservedXid;
+		TransactionIdAdvance(ShmemVariableCache->nextXid);
+	}
 }
 
 void
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to