Conflict resolution improvements are important to include in this
release, as discussed many times. Proposal given here
http://archives.postgresql.org/pgsql-hackers/2009-12/msg01175.php
presents a viable design to improve this.

The following patch is a complete working implementation of that design.
I'm still testing it, but it's worth publishing as early as possible to
allow discussion. Not for commit just yet, but soon.

The standby.c changes decide whether to defer conflict resolution based upon
the relfilenode of the WAL record. If resolution is deferred, we re-check for
conflict during LockAcquire() and fail with a snapshot error, just as if resolution
had never been deferred. Also included is an optimisation of conflict processing
to avoid continual re-evaluation of conflicts, since some are now deferred.

API changes in heapam and nbtxlog to pass thru RelFileNode
API changes in indexcmds, no behaviour changes
procarray changes to implement LatestRemovedXid cache

 backend/access/heap/heapam.c    |    6 -
 backend/access/nbtree/nbtxlog.c |    2 
 backend/commands/indexcmds.c    |    4 -
 backend/storage/ipc/procarray.c |   55 +++++++++++-
 backend/storage/ipc/standby.c   |  112 +++++++++++++++++++++++------
 backend/storage/lmgr/lock.c     |  124 ++++++++++++++++++++++++++++++--
 include/storage/lock.h          |    8 ++
 include/storage/proc.h          |    5 +
 include/storage/standby.h       |    9 ++
 9 files changed, 292 insertions(+), 33 deletions(-)

-- 
 Simon Riggs           www.2ndQuadrant.com
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 4139,4145 **** heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record)
  	xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
  
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid);
  
  	/*
  	 * Actual operation is a no-op. Record type exists to provide a means
--- 4139,4145 ----
  	xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
  
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
  
  	/*
  	 * Actual operation is a no-op. Record type exists to provide a means
***************
*** 4171,4177 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
  	 * no queries running for which the removed tuples are still visible.
  	 */
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid);
  
  	RestoreBkpBlocks(lsn, record, true);
  
--- 4171,4177 ----
  	 * no queries running for which the removed tuples are still visible.
  	 */
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
  
  	RestoreBkpBlocks(lsn, record, true);
  
***************
*** 4241,4247 **** heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
  	 * consider the frozen xids as running.
  	 */
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(cutoff_xid);
  
  	RestoreBkpBlocks(lsn, record, false);
  
--- 4241,4247 ----
  	 * consider the frozen xids as running.
  	 */
  	if (InHotStandby)
! 		ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node);
  
  	RestoreBkpBlocks(lsn, record, false);
  
*** a/src/backend/access/nbtree/nbtxlog.c
--- b/src/backend/access/nbtree/nbtxlog.c
***************
*** 833,839 **** btree_redo(XLogRecPtr lsn, XLogRecord *record)
  		 * here is worth some thought and possibly some effort to
  		 * improve.
  		 */
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid);
  	}
  
  	/*
--- 833,839 ----
  		 * here is worth some thought and possibly some effort to
  		 * improve.
  		 */
! 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
  	}
  
  	/*
*** a/src/backend/commands/indexcmds.c
--- b/src/backend/commands/indexcmds.c
***************
*** 540,546 **** DefineIndex(RangeVar *heapRelation,
  	 * check for that.	Also, prepared xacts are not reported, which is fine
  	 * since they certainly aren't going to do anything more.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
--- 540,546 ----
  	 * check for that.	Also, prepared xacts are not reported, which is fine
  	 * since they certainly aren't going to do anything more.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock, InvalidTransactionId);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
***************
*** 627,633 **** DefineIndex(RangeVar *heapRelation,
  	 * We once again wait until no transaction can have the table open with
  	 * the index marked as read-only for updates.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
--- 627,633 ----
  	 * We once again wait until no transaction can have the table open with
  	 * the index marked as read-only for updates.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock, InvalidTransactionId);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 66,71 **** typedef struct ProcArrayStruct
--- 66,72 ----
  
  	int			numKnownAssignedXids;	/* current number of known assigned xids */
  	int			maxKnownAssignedXids;	/* allocated size of known assigned xids */
+ 
  	/*
  	 * Highest subxid that overflowed KnownAssignedXids array. Similar to
  	 * overflowing cached subxids in PGPROC entries.
***************
*** 73,78 **** typedef struct ProcArrayStruct
--- 74,86 ----
  	TransactionId	lastOverflowedXid;
  
  	/*
+ 	 * During Hot Standby we need to store a visibility limit in order to defer recovery
+ 	 * conflict processing. The cached value is accessed during lock processing,
+ 	 * but stored on the procarray header. See lock.c for further details.
+ 	 */
+ 	TransactionId	latestRemovedXidCache[NUM_LOCK_PARTITIONS];
+ 
+ 	/*
  	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
  	 * actually it is maxProcs entries long.
  	 */
***************
*** 220,225 **** CreateSharedProcArray(void)
--- 228,236 ----
  											  HASH_ELEM | HASH_FUNCTION);
  		if (!KnownAssignedXidsHash)
  			elog(FATAL, "could not initialize known assigned xids hash table");
+ 
+ 		/* Initialize the latestRemovedXidCache */
+ 		MemSet(procArray->latestRemovedXidCache, 0, NUM_LOCK_PARTITIONS * sizeof(TransactionId));
  	}
  }
  
***************
*** 1216,1221 **** GetSnapshotData(Snapshot snapshot)
--- 1227,1235 ----
  		RecentGlobalXmin = FirstNormalTransactionId;
  	RecentXmin = xmin;
  
+ 	/* Reset recovery conflict processing limit after deriving new snapshot */
+ 	MyProc->limitXmin = InvalidTransactionId;
+ 
  	snapshot->xmin = xmin;
  	snapshot->xmax = xmax;
  	snapshot->xcnt = count;
***************
*** 1739,1747 **** GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
  			{
  				VirtualTransactionId vxid;
  
! 				GET_VXID_FROM_PGPROC(vxid, *proc);
! 				if (VirtualTransactionIdIsValid(vxid))
! 					vxids[count++] = vxid;
  			}
  		}
  	}
--- 1753,1774 ----
  			{
  				VirtualTransactionId vxid;
  
! 				/*
! 				 * Avoid repeated re-listing of a process for conflict processing
! 				 * by excluding it once resolved up to a particular latestRemovedXid.
! 				 * We reset proc->limitXmin each time we take a new snapshot because
! 				 * that may need to have conflict processing performed on it, as
! 				 * discussed in header comments for this function.
! 				 */
! 				if (!TransactionIdIsValid(proc->limitXmin) ||
! 					TransactionIdFollows(limitXmin, proc->limitXmin))
! 				{
! 					proc->limitXmin = limitXmin;
! 
! 					GET_VXID_FROM_PGPROC(vxid, *proc);
! 					if (VirtualTransactionIdIsValid(vxid))
! 						vxids[count++] = vxid;
! 				}
  			}
  		}
  	}
***************
*** 2559,2561 **** KnownAssignedXidsDisplay(int trace_level)
--- 2586,2610 ----
  
  	pfree(buf.data);
  }
+ 
+ /*
+  * LatestRemovedXidCache provides a way of deferring recovery conflicts up to
+  * the point where a query requests a lock on a relation. These functions provide
+  * the cache required by recovery conflict processing. (see storage/lmgr/lock.c)
+  * Function prototypes are in standby.h, not procarray.h for convenience.
+  */
+ void
+ ProcArraySetLatestRemovedXidCache(int partition, TransactionId latestRemovedXid)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 
+ 	arrayP->latestRemovedXidCache[partition] = latestRemovedXid;
+ }
+ 
+ TransactionId
+ ProcArrayGetLatestRemovedXidCache(int partition)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 
+ 	return arrayP->latestRemovedXidCache[partition];
+ }
*** a/src/backend/storage/ipc/standby.c
--- b/src/backend/storage/ipc/standby.c
***************
*** 27,32 ****
--- 27,33 ----
  #include "storage/procarray.h"
  #include "storage/sinvaladt.h"
  #include "storage/standby.h"
+ #include "tcop/tcopprot.h"
  #include "utils/ps_status.h"
  
  int		vacuum_defer_cleanup_age;
***************
*** 34,40 **** int		vacuum_defer_cleanup_age;
  static List *RecoveryLockList;
  
  static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason);
  static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
  static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
  static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
--- 35,43 ----
  static List *RecoveryLockList;
  
  static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason,
! 									   TransactionId latestRemovedXid,
! 									   Oid dbOid, Oid relOid);
  static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
  static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
  static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
***************
*** 157,165 **** WaitExceedsMaxStandbyDelay(void)
   */
  static void
  ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason)
  {
  	char		waitactivitymsg[100];
  
  	while (VirtualTransactionIdIsValid(*waitlist))
  	{
--- 160,171 ----
   */
  static void
  ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason,
! 									   TransactionId latestRemovedXid,
! 									   Oid dbOid, Oid relOid)
  {
  	char		waitactivitymsg[100];
+ 	VirtualTransactionId *lockconflicts = NULL;
  
  	while (VirtualTransactionIdIsValid(*waitlist))
  	{
***************
*** 203,221 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  			if (WaitExceedsMaxStandbyDelay())
  			{
  				pid_t pid;
  
  				/*
! 				 * Now find out who to throw out of the balloon.
  				 */
! 				Assert(VirtualTransactionIdIsValid(*waitlist));
! 				pid = CancelVirtualTransaction(*waitlist, reason);
! 
! 				/*
! 				 * Wait awhile for it to die so that we avoid flooding an
! 				 * unresponsive backend when system is heavily loaded.
! 				 */
! 				if (pid != 0)
! 					pg_usleep(5000);
  			}
  		}
  
--- 209,279 ----
  			if (WaitExceedsMaxStandbyDelay())
  			{
  				pid_t pid;
+ 				bool cancel = true;
  
  				/*
! 				 * If it is a snapshot conflict decide whether we should resolve
! 				 * immediately or defer the resolution until later. This must be
! 				 * designed carefully so that it is a deferral in all cases, in
! 				 * particular we must not set the latestRemovedXid until the
! 				 * standby delay has reached its maximum, which is now!
! 				 *
! 				 * If we had a snapshot conflict then we also check for lock conflicts.
! 				 * Lock conflicts disregard whether backends are holding or waiting for
! 				 * the lock, so we get everybody that has declared an intention on the
! 				 * lock. We are careful to acquire the lock partition lock in exclusive
! 				 * mode, so that no concurrent lock requestors slip by while we do this.
! 				 * While holding the lock partition lock we also record the latestRemovedXid
! 				 * so that anybody arriving after we release the lock will cancel
! 				 * themselves using the same errors they would have got here.
! 				 *
! 				 * If conflicts do exist yet don't conflict with the snapshot then we
! 				 * do not need to cancel. So we need to merge the two lists
! 				 * to determine who can be spared, for now.
! 				 *
! 				 * XXX O(N^2) at present, but neither list is long in the common case.
  				 */
! 				if (reason == PROCSIG_RECOVERY_CONFLICT_SNAPSHOT)
! 				{
! 					LOCKTAG			locktag;
! 					VirtualTransactionId *conflicts;
! 
! 					SET_LOCKTAG_RELATION(locktag, dbOid, relOid);
! 
! 					if (!lockconflicts)
! 						lockconflicts = GetLockConflicts(&locktag, AccessExclusiveLock,
! 															latestRemovedXid);
! 					conflicts = lockconflicts;
! 					cancel = false;
! 					while (VirtualTransactionIdIsValid(*conflicts))
! 					{
! 						if (waitlist->backendId == conflicts->backendId &&
! 							waitlist->localTransactionId == conflicts->localTransactionId)
! 						{
! 							cancel = true;
! 							break;
! 						}
! 						conflicts++;
! 					}
! 				}
! 
! 				if (cancel)
! 				{
! 					/*
! 					 * Now find out who to throw out of the balloon.
! 					 */
! 					Assert(VirtualTransactionIdIsValid(*waitlist));
! 					pid = CancelVirtualTransaction(*waitlist, reason);
! 
! 					/*
! 					 * Wait awhile for it to die so that we avoid flooding an
! 					 * unresponsive backend when system is heavily loaded.
! 					 */
! 					if (pid != 0)
! 						pg_usleep(5000);
! 				}
! 				else
! 					break;
  			}
  		}
  
***************
*** 232,238 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  }
  
  void
! ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid)
  {
  	VirtualTransactionId *backends;
  
--- 290,296 ----
  }
  
  void
! ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
  {
  	VirtualTransactionId *backends;
  
***************
*** 240,246 **** ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid)
  										 InvalidOid);
  
  	ResolveRecoveryConflictWithVirtualXIDs(backends,
! 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
  }
  
  void
--- 298,318 ----
  										 InvalidOid);
  
  	ResolveRecoveryConflictWithVirtualXIDs(backends,
! 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
! 										   latestRemovedXid,
! 										   node.dbNode, node.relNode);
! }
! 
! /*
!  * ResolveRecoveryConflictWithRemovedTransactionId is called to resolve
!  * a deferred conflict.
!  *
!  * Treat as a self-interrupt, but don't bother actually sending the signal.
!  */
! void
! ResolveRecoveryConflictWithRemovedTransactionId(void)
! {
! 	RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
  }
  
  void
***************
*** 270,276 **** ResolveRecoveryConflictWithTablespace(Oid tsid)
  	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
  												InvalidOid);
  	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
! 										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE);
  }
  
  void
--- 342,350 ----
  	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
  												InvalidOid);
  	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
! 										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
! 										   InvalidTransactionId,
! 										   0, 0);
  }
  
  void
***************
*** 321,327 **** ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
  	while (!lock_acquired)
  	{
  		if (++num_attempts < 3)
! 			backends = GetLockConflicts(&locktag, AccessExclusiveLock);
  		else
  		{
  			backends = GetConflictingVirtualXIDs(InvalidTransactionId,
--- 395,403 ----
  	while (!lock_acquired)
  	{
  		if (++num_attempts < 3)
! 			backends = GetLockConflicts(&locktag,
! 										AccessExclusiveLock,
! 										InvalidTransactionId);
  		else
  		{
  			backends = GetConflictingVirtualXIDs(InvalidTransactionId,
***************
*** 330,336 **** ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
  		}
  
  		ResolveRecoveryConflictWithVirtualXIDs(backends,
! 											   PROCSIG_RECOVERY_CONFLICT_LOCK);
  
  		if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
  											!= LOCKACQUIRE_NOT_AVAIL)
--- 406,414 ----
  		}
  
  		ResolveRecoveryConflictWithVirtualXIDs(backends,
! 											   PROCSIG_RECOVERY_CONFLICT_LOCK,
! 											   InvalidTransactionId,
! 											   0, 0);
  
  		if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
  											!= LOCKACQUIRE_NOT_AVAIL)
*** a/src/backend/storage/lmgr/lock.c
--- b/src/backend/storage/lmgr/lock.c
***************
*** 35,40 ****
--- 35,41 ----
  #include "access/transam.h"
  #include "access/twophase.h"
  #include "access/twophase_rmgr.h"
+ #include "access/xact.h"
  #include "miscadmin.h"
  #include "pg_trace.h"
  #include "pgstat.h"
***************
*** 439,444 **** ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
--- 440,486 ----
  	return lockhash;
  }
  
+ /*
+  * SetRecoveryConflictLatestRemovedXid functions manage the latestRemovedXid cache.
+  * During Hot Standby we generate a list of conflicts and then resolve only those
+  * conflicts that have a clear and present lock conflict. We defer the conflict
+  * for other relations to some later time, hopefully never. The deferral mechanism
+  * is that we store the latestRemovedXid that applies to a relation behind the
+  * lock partition that applies for that relation. These routines exist to provide
+  * a level of modularity to allow us to alter the caching mechanism as needed.
+  *
+  * It is possible to defer the recovery conflict right up until the point that
+  * we access a particular data block. Doing that means we would have to cache
+  * latestRemovedXid for each data block, even those that have exited and re-entered
+  * cache. It would also mean we would need to re-check latestRemovedXid each time
+  * we access a data block. Both of those factors make that level of granularity
+  * more trouble than it would likely be worth in practice. So we check the
+  * latestRemovedXid once each time we lock a relation: smaller cache, fewer checks.
+  *
+  * We store the latestRemovedXid on the lock entry, giving easy access to the value
+  * that applies for that relation. If a lock is heavily used or held for a long
+  * duration then it will effectively have its own private cache. When the lock entry
+  * is removed, we lose the cached latestRemovedXid value.
+  *
+  * When a lock entry is created we set the value from the cache. The current cache
+  * is just a single value for each partition, though that could be extended to have
+  * multiple entries for frequently used relations. The current cache is stored on
+  * the header of the ProcArrayStruct as a convenient place to store shmem data
+  * and could be relocated elsewhere if required.
+  */
+ static void
+ LockSetRecoveryConflictLatestRemovedXidForPartition(int partition, TransactionId latestRemovedXid)
+ {
+ 	Assert(InHotStandby);
+ 
+ 	ProcArraySetLatestRemovedXidCache(partition, latestRemovedXid);
+ }
+ 
+ static TransactionId
+ LockGetRecoveryConflictLatestRemovedXidForPartition(int partition)
+ {
+ 	return ProcArrayGetLatestRemovedXidCache(partition);
+ }
  
  /*
   * LockAcquire -- Check for lock conflicts, sleep if conflict found,
***************
*** 632,637 **** LockAcquireExtended(const LOCKTAG *locktag,
--- 674,685 ----
  		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
  		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
  		LOCK_PRINT("LockAcquire: new", lock, lockmode);
+ 
+ 		/*
+ 		 * Start the latestRemovedXid from the partition's cached value
+ 		 */
+ 		if (TransactionStartedDuringRecovery())
+ 			lock->latestRemovedXid = LockGetRecoveryConflictLatestRemovedXidForPartition(partition);
  	}
  	else
  	{
***************
*** 642,647 **** LockAcquireExtended(const LOCKTAG *locktag,
--- 690,726 ----
  	}
  
  	/*
+ 	 * If latestRemovedXid is set, resolve any recovery conflict. We do this
+ 	 * here whether we wait or not. If our snapshot conflicts then we will
+ 	 * be cancelled, so there is no need for us to re-check this after we
+ 	 * wake from a wait for the lock.
+ 	 */
+ 	if (TransactionIdIsValid(lock->latestRemovedXid) &&
+ 		TransactionIdIsValid(MyProc->xmin) &&
+ 		TransactionIdFollowsOrEquals(lock->latestRemovedXid, MyProc->xmin))
+ 	{
+ 		if (lock->nRequested == 0)
+ 		{
+ 			/*
+ 			 * There are no other requestors of this lock, so garbage-collect
+ 			 * the lock object.  We *must* do this to avoid a permanent leak
+ 			 * of shared memory, because there won't be anything to cause
+ 			 * anyone to release the lock object later.
+ 			 */
+ 			Assert(SHMQueueEmpty(&(lock->procLocks)));
+ 			if (!hash_search_with_hash_value(LockMethodLockHash,
+ 											 (void *) &(lock->tag),
+ 											 hashcode,
+ 											 HASH_REMOVE,
+ 											 NULL))
+ 				elog(PANIC, "lock table corrupted");
+ 		}
+ 		LWLockRelease(partitionLock);
+ 		ResolveRecoveryConflictWithRemovedTransactionId();
+ 		/* Not reached */
+ 	}
+ 
+ 	/*
  	 * Create the hash key for the proclock table.
  	 */
  	proclocktag.myLock = lock;
***************
*** 1788,1796 **** LockReassignCurrentOwner(void)
   * uses of the result.
   */
  VirtualTransactionId *
! GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
  {
! 	VirtualTransactionId *vxids;
  	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
  	LockMethod	lockMethodTable;
  	LOCK	   *lock;
--- 1867,1875 ----
   * uses of the result.
   */
  VirtualTransactionId *
! GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, TransactionId latestRemovedXid)
  {
! 	static VirtualTransactionId *vxids;
  	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
  	LockMethod	lockMethodTable;
  	LOCK	   *lock;
***************
*** 1798,1803 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
--- 1877,1883 ----
  	SHM_QUEUE  *procLocks;
  	PROCLOCK   *proclock;
  	uint32		hashcode;
+ 	int			partition;
  	LWLockId	partitionLock;
  	int			count = 0;
  
***************
*** 1812,1827 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
  	 * need enough space for MaxBackends + a terminator, since prepared xacts
  	 * don't count.
  	 */
! 	vxids = (VirtualTransactionId *)
! 		palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));
  
  	/*
  	 * Look up the lock object matching the tag.
  	 */
  	hashcode = LockTagHashCode(locktag);
  	partitionLock = LockHashPartitionLock(hashcode);
  
! 	LWLockAcquire(partitionLock, LW_SHARED);
  
  	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
  												(void *) locktag,
--- 1892,1929 ----
  	 * need enough space for MaxBackends + a terminator, since prepared xacts
  	 * don't count.
  	 */
! 	if (!InHotStandby)
! 		vxids = (VirtualTransactionId *)
! 			palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));
! 	else
! 	{
! 		if (vxids == NULL)
! 		{
! 			vxids = (VirtualTransactionId *)
! 				malloc(sizeof(VirtualTransactionId) * (MaxBackends + 1));
! 			if (vxids == NULL)
! 				ereport(ERROR,
! 					(errcode(ERRCODE_OUT_OF_MEMORY),
! 					 errmsg("out of memory")));
! 
! 		}
! 	}
  
  	/*
  	 * Look up the lock object matching the tag.
  	 */
  	hashcode = LockTagHashCode(locktag);
+ 	partition = LockHashPartition(hashcode);
  	partitionLock = LockHashPartitionLock(hashcode);
  
! 	/*
! 	 * If we are setting latestRemovedXid we need to get exclusive lock
! 	 * to avoid race conditions.
! 	 */
! 	if (TransactionIdIsValid(latestRemovedXid))
! 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
! 	else
! 		LWLockAcquire(partitionLock, LW_SHARED);
  
  	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
  												(void *) locktag,
***************
*** 1831,1836 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
--- 1933,1944 ----
  	if (!lock)
  	{
  		/*
+ 		 * Set the latestRemovedXid for the partition.
+ 		 */
+ 		if (TransactionIdIsValid(latestRemovedXid))
+ 			LockSetRecoveryConflictLatestRemovedXidForPartition(partition, latestRemovedXid);
+ 
+ 		/*
  		 * If the lock object doesn't exist, there is nothing holding a lock
  		 * on this lockable object.
  		 */
***************
*** 1839,1844 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
--- 1947,1958 ----
  	}
  
  	/*
+ 	 * Set the latestRemovedXid for the lock
+ 	 */
+ 	if (TransactionIdIsValid(latestRemovedXid))
+ 		lock->latestRemovedXid = latestRemovedXid;
+ 
+ 	/*
  	 * Examine each existing holder (or awaiter) of the lock.
  	 */
  	conflictMask = lockMethodTable->conflictTab[lockmode];
*** a/src/include/storage/lock.h
--- b/src/include/storage/lock.h
***************
*** 312,317 **** typedef struct LOCK
--- 312,323 ----
  	int			nRequested;		/* total of requested[] array */
  	int			granted[MAX_LOCKMODES]; /* counts of granted locks */
  	int			nGranted;		/* total of granted[] array */
+ 
+ 	/*
+ 	 * For Hot Standby, the xid limit for access to this table. Allows deferred
+ 	 * and selective conflict processing based upon the relation id.
+ 	 */
+ 	TransactionId latestRemovedXid;
  } LOCK;
  
  #define LOCK_LOCKMETHOD(lock) ((LOCKMETHODID) (lock).tag.locktag_lockmethodid)
***************
*** 488,494 **** extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
  extern void LockReleaseCurrentOwner(void);
  extern void LockReassignCurrentOwner(void);
  extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
! 				 LOCKMODE lockmode);
  extern void AtPrepare_Locks(void);
  extern void PostPrepare_Locks(TransactionId xid);
  extern int LockCheckConflicts(LockMethod lockMethodTable,
--- 494,500 ----
  extern void LockReleaseCurrentOwner(void);
  extern void LockReassignCurrentOwner(void);
  extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
! 				 LOCKMODE lockmode, TransactionId latestRemovedXid);
  extern void AtPrepare_Locks(void);
  extern void PostPrepare_Locks(TransactionId xid);
  extern int LockCheckConflicts(LockMethod lockMethodTable,
*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 102,107 **** struct PGPROC
--- 102,112 ----
  	 */
  	bool		recoveryConflictPending;
  
+ 	TransactionId limitXmin;	/* The latest xid for which conflict processing
+ 								 * has been already performed for this proc.
+ 								 * Updated by snapshot conflicts, reset when
+ 								 * we take a new snapshot. */
+ 
  	/* Info about LWLock the process is currently waiting for, if any. */
  	bool		lwWaiting;		/* true if waiting for an LW lock */
  	bool		lwExclusive;	/* true if waiting for exclusive access */
*** a/src/include/storage/standby.h
--- b/src/include/storage/standby.h
***************
*** 16,34 ****
  
  #include "access/xlog.h"
  #include "storage/lock.h"
  
  extern int	vacuum_defer_cleanup_age;
  
  extern void InitRecoveryTransactionEnvironment(void);
  extern void ShutdownRecoveryTransactionEnvironment(void);
  
! extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid);
  extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
  extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
  
  extern void ResolveRecoveryConflictWithBufferPin(void);
  extern void SendRecoveryConflictWithBufferPin(void);
  
  /*
   * Standby Rmgr (RM_STANDBY_ID)
   *
--- 16,41 ----
  
  #include "access/xlog.h"
  #include "storage/lock.h"
+ #include "storage/relfilenode.h"
  
  extern int	vacuum_defer_cleanup_age;
  
  extern void InitRecoveryTransactionEnvironment(void);
  extern void ShutdownRecoveryTransactionEnvironment(void);
  
! extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
! 												RelFileNode node);
! extern void ResolveRecoveryConflictWithRemovedTransactionId(void);
  extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
  extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
  
  extern void ResolveRecoveryConflictWithBufferPin(void);
  extern void SendRecoveryConflictWithBufferPin(void);
  
+ /* Prototypes here rather than in procarray.h */
+ extern void ProcArraySetLatestRemovedXidCache(int partition, TransactionId latestRemovedXid);
+ extern TransactionId ProcArrayGetLatestRemovedXidCache(int partition);
+ 
  /*
   * Standby Rmgr (RM_STANDBY_ID)
   *
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to