diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
new file mode 100644
index cfe3954..3225784
*** a/src/backend/storage/lmgr/lock.c
--- b/src/backend/storage/lmgr/lock.c
*************** SetupLockInTable(LockMethod lockMethodTa
*** 1098,1103 ****
--- 1098,1106 ----
  static void
  RemoveLocalLock(LOCALLOCK *locallock)
  {
+ 	int i;
+ 	for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ 		ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
  	pfree(locallock->lockOwners);
  	locallock->lockOwners = NULL;
  	if (locallock->holdsStrongLockCount)
*************** GrantLockLocal(LOCALLOCK *locallock, Res
*** 1355,1360 ****
--- 1358,1364 ----
  	lockOwners[i].owner = owner;
  	lockOwners[i].nLocks = 1;
  	locallock->numLockOwners++;
+ 	ResourceOwnerRememberLock(owner,locallock);
  }
  
  /*
*************** LockRelease(const LOCKTAG *locktag, LOCK
*** 1670,1675 ****
--- 1674,1680 ----
  				Assert(lockOwners[i].nLocks > 0);
  				if (--lockOwners[i].nLocks == 0)
  				{
+ 					ResourceOwnerForgetLock(owner, locallock);
  					/* compact out unused slot */
  					locallock->numLockOwners--;
  					if (i < locallock->numLockOwners)
*************** LockReleaseAll(LOCKMETHODID lockmethodid
*** 1862,1874 ****
  		{
  			LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
  
! 			/* If it's above array position 0, move it down to 0 */
! 			for (i = locallock->numLockOwners - 1; i > 0; i--)
  			{
  				if (lockOwners[i].owner == NULL)
  				{
  					lockOwners[0] = lockOwners[i];
! 					break;
  				}
  			}
  
--- 1867,1882 ----
  		{
  			LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
  
! 			/* If session lock is above array position 0, move it down to 0 */
! 			for (i = 0; i < locallock->numLockOwners ; i++)
  			{
  				if (lockOwners[i].owner == NULL)
  				{
  					lockOwners[0] = lockOwners[i];
! 				}
! 				else
! 				{
! 					ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
  				}
  			}
  
*************** LockReleaseAll(LOCKMETHODID lockmethodid
*** 1882,1887 ****
--- 1890,1899 ----
  				/* We aren't deleting this locallock, so done */
  				continue;
  			}
+ 			else
+ 			{
+ 				locallock->numLockOwners=0;
+ 			}
  		}
  
  		/*
*************** ReleaseLockIfHeld(LOCALLOCK *locallock, 
*** 2124,2129 ****
--- 2136,2142 ----
  				locallock->nLocks -= lockOwners[i].nLocks;
  				/* compact out unused slot */
  				locallock->numLockOwners--;
+ 				ResourceOwnerForgetLock(owner, locallock);
  				if (i < locallock->numLockOwners)
  					lockOwners[i] = lockOwners[locallock->numLockOwners];
  			}
*************** ReleaseLockIfHeld(LOCALLOCK *locallock, 
*** 2144,2201 ****
  }
  
  /*
!  * LockReassignCurrentOwner
   *		Reassign all locks belonging to CurrentResourceOwner to belong
   *		to its parent resource owner
   */
  void
! LockReassignCurrentOwner(void)
  {
- 	ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
  	HASH_SEQ_STATUS status;
  	LOCALLOCK  *locallock;
- 	LOCALLOCKOWNER *lockOwners;
- 
- 	Assert(parent != NULL);
  
  	hash_seq_init(&status, LockMethodLocalHash);
  
  	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
  	{
! 		int			i;
! 		int			ic = -1;
! 		int			ip = -1;
  
- 		/*
- 		 * Scan to see if there are any locks belonging to current owner or
- 		 * its parent
- 		 */
- 		lockOwners = locallock->lockOwners;
- 		for (i = locallock->numLockOwners - 1; i >= 0; i--)
- 		{
- 			if (lockOwners[i].owner == CurrentResourceOwner)
- 				ic = i;
- 			else if (lockOwners[i].owner == parent)
- 				ip = i;
- 		}
  
! 		if (ic < 0)
! 			continue;			/* no current locks */
  
! 		if (ip < 0)
! 		{
! 			/* Parent has no slot, so just give it child's slot */
! 			lockOwners[ic].owner = parent;
! 		}
! 		else
! 		{
! 			/* Merge child's count with parent's */
! 			lockOwners[ip].nLocks += lockOwners[ic].nLocks;
! 			/* compact out unused slot */
! 			locallock->numLockOwners--;
! 			if (ic < locallock->numLockOwners)
! 				lockOwners[ic] = lockOwners[locallock->numLockOwners];
! 		}
  	}
  }
  
--- 2157,2229 ----
  }
  
  /*
!  * LockReassignCurrentOwnerAll
   *		Reassign all locks belonging to CurrentResourceOwner to belong
   *		to its parent resource owner
   */
  void
! LockReassignCurrentOwnerAll(void)
  {
  	HASH_SEQ_STATUS status;
  	LOCALLOCK  *locallock;
  
  	hash_seq_init(&status, LockMethodLocalHash);
  
  	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
  	{
! 		LockReassignCurrentOwner(locallock);
! 	};
! };
  
  
! /*
!  * LockReassignCurrentOwner
!  *		Reassign a given lock belonging to CurrentResourceOwner to belong
!  *		to its parent resource owner
!  */
! void
! LockReassignCurrentOwner(LOCALLOCK * locallock)
! {
! 	ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
! 	LOCALLOCKOWNER *lockOwners;
! 	int			i;
! 	int			ic = -1;
! 	int			ip = -1;
  
! 	Assert(parent != NULL);
! 
! 	/*
! 	 * Scan to see if there are any locks belonging to current owner or
! 	 * its parent
! 	 */
! 	lockOwners = locallock->lockOwners;
! 	for (i = locallock->numLockOwners - 1; i >= 0; i--)
! 	{
! 		if (lockOwners[i].owner == CurrentResourceOwner)
! 			ic = i;
! 		else if (lockOwners[i].owner == parent)
! 			ip = i;
! 	}
! 
! 	if (ic < 0)
! 		return;			/* no current locks */
! 
! 	if (ip < 0)
! 	{
! 		/* Parent has no slot, so just give it child's slot */
! 		lockOwners[ic].owner = parent;
! 		ResourceOwnerRememberLock(parent, locallock);
! 		ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
! 	}
! 	else
! 	{
! 		/* Merge child's count with parent's */
! 		lockOwners[ip].nLocks += lockOwners[ic].nLocks;
! 		/* compact out unused slot */
! 		locallock->numLockOwners--;
! 		if (ic < locallock->numLockOwners)
! 			lockOwners[ic] = lockOwners[locallock->numLockOwners];
! 		ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
  	}
  }
  
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
new file mode 100644
index 7c771eb..bfd3908
*** a/src/backend/utils/resowner/resowner.c
--- b/src/backend/utils/resowner/resowner.c
***************
*** 27,32 ****
--- 27,33 ----
  #include "utils/rel.h"
  #include "utils/snapmgr.h"
  
+ #define MAX_RESOWNER_LOCKS 10
  
  /*
   * ResourceOwner objects look like this
*************** typedef struct ResourceOwnerData
*** 43,48 ****
--- 44,53 ----
  	Buffer	   *buffers;		/* dynamically allocated array */
  	int			maxbuffers;		/* currently allocated array size */
  
+ 	/* We have built-in support for remembering up to MAX_RESOWNER_LOCKS locks */
+ 	int			nlocks;		/* number of owned locks */
+ 	LOCALLOCK  *locks[MAX_RESOWNER_LOCKS];	/* list of owned locks */
+ 
  	/* We have built-in support for remembering catcache references */
  	int			ncatrefs;		/* number of owned catcache pins */
  	HeapTuple  *catrefs;		/* dynamically allocated array */
*************** ResourceOwnerReleaseInternal(ResourceOwn
*** 270,282 ****
  			/*
  			 * Release locks retail.  Note that if we are committing a
  			 * subtransaction, we do NOT release its locks yet, but transfer
! 			 * them to the parent.
  			 */
  			Assert(owner->parent != NULL);
! 			if (isCommit)
! 				LockReassignCurrentOwner();
  			else
! 				LockReleaseCurrentOwner();
  		}
  	}
  	else if (phase == RESOURCE_RELEASE_AFTER_LOCKS)
--- 275,310 ----
  			/*
  			 * Release locks retail.  Note that if we are committing a
  			 * subtransaction, we do NOT release its locks yet, but transfer
! 			 * them to the parent.  If the list of locks has over-flowed,
! 			 * scour the entire lock table to decide what to release or 
! 			 * reassign.  Otherwise, release or reassign the remembered 
! 			 * locks.  This optimization is important for pg_dump and for
! 			 * other cases where the parent or other ResourceOwners already 
! 			 * own a large number of locks.
  			 */
  			Assert(owner->parent != NULL);
! 			if (isCommit) 
! 			{
! 				if (owner->nlocks > MAX_RESOWNER_LOCKS)
! 					LockReassignCurrentOwnerAll();
! 				else 
! 				{
! 					while (owner->nlocks > 0) {
! 						LockReassignCurrentOwner(owner->locks[owner->nlocks-1]);
! 					}
! 				}
! 			}
  			else
! 			{
! 				if (owner->nlocks > MAX_RESOWNER_LOCKS)
! 					LockReleaseCurrentOwner();
! 				else
! 				{
! 					while (owner->nlocks > 0) {
! 						LockRelease(&owner->locks[owner->nlocks-1]->tag.lock, owner->locks[owner->nlocks-1]->tag.mode, false);
! 					}
! 				}
! 			}
  		}
  	}
  	else if (phase == RESOURCE_RELEASE_AFTER_LOCKS)
*************** ResourceOwnerDelete(ResourceOwner owner)
*** 357,362 ****
--- 385,391 ----
  
  	/* And it better not own any resources, either */
  	Assert(owner->nbuffers == 0);
+ 	Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
  	Assert(owner->ncatrefs == 0);
  	Assert(owner->ncatlistrefs == 0);
  	Assert(owner->nrelrefs == 0);
*************** ResourceOwnerEnlargeBuffers(ResourceOwne
*** 531,536 ****
--- 560,614 ----
  	}
  }
  
+ 
+ /*
+  * Remember that a Local Lock is owned by a ResourceOwner
+  *
+  * If owner->nlocks > MAX_RESOWNER_LOCKS then the array of locks
+  * has over-flowed.
+  *
+  * We allow the case owner == NULL because of session level locks
+  */
+ void
+ ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK * locallock)
+ {
+ 	if (owner != NULL && owner->nlocks <= MAX_RESOWNER_LOCKS) 
+ 	{
+ 		if (owner->nlocks < MAX_RESOWNER_LOCKS)
+ 			owner->locks[owner->nlocks] = locallock;
+ 		owner->nlocks++;
+ 	};
+ };
+ 
+ 
+ /*
+  * Forget that a Local Lock is owned by a ResourceOwner
+  *
+  * If owner->nlocks > MAX_RESOWNER_LOCKS then the array of locks
+  * has over-flowed, so do no actually attempt to forget.
+  *
+  * We allow the case owner == NULL because of session level locks
+  */
+ void
+ ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock)
+ {
+ 	if (owner != NULL && owner->nlocks <= MAX_RESOWNER_LOCKS) 
+ 	{
+ 		int			i;
+ 		Assert(owner->nlocks > 0);
+ 		owner->nlocks--;
+ 		for (i = owner->nlocks; i >= 0; i--)
+ 		{
+ 			if (locallock==owner->locks[i])
+ 			{
+ 				owner->locks[i]=owner->locks[owner->nlocks];
+ 				return;
+ 			}
+ 		}
+ 		elog(ERROR, "Can't find lock to remove");
+ 	};
+ };
+ 
  /*
   * Remember that a buffer pin is owned by a ResourceOwner
   *
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
new file mode 100644
index d629ac2..93e65ee
*** a/src/include/storage/lock.h
--- b/src/include/storage/lock.h
*************** extern bool LockRelease(const LOCKTAG *l
*** 493,499 ****
  extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
  extern void LockReleaseSession(LOCKMETHODID lockmethodid);
  extern void LockReleaseCurrentOwner(void);
! extern void LockReassignCurrentOwner(void);
  extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
  				 LOCKMODE lockmode);
  extern void AtPrepare_Locks(void);
--- 493,500 ----
  extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
  extern void LockReleaseSession(LOCKMETHODID lockmethodid);
  extern void LockReleaseCurrentOwner(void);
! extern void LockReassignCurrentOwnerAll(void);
! extern void LockReassignCurrentOwner(LOCALLOCK *locallock);
  extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
  				 LOCKMODE lockmode);
  extern void AtPrepare_Locks(void);
diff --git a/src/include/utils/resowner.h b/src/include/utils/resowner.h
new file mode 100644
index 11034e4..399ea2b
*** a/src/include/utils/resowner.h
--- b/src/include/utils/resowner.h
***************
*** 20,25 ****
--- 20,26 ----
  #define RESOWNER_H
  
  #include "storage/fd.h"
+ #include "storage/lock.h"
  #include "utils/catcache.h"
  #include "utils/plancache.h"
  #include "utils/snapshot.h"
*************** extern void ResourceOwnerEnlargeBuffers(
*** 89,94 ****
--- 90,100 ----
  extern void ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer);
  extern void ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer);
  
+ 
+ /* support for local lock management */
+ extern void ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock);
+ extern void ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock);
+ 
  /* support for catcache refcount management */
  extern void ResourceOwnerEnlargeCatCacheRefs(ResourceOwner owner);
  extern void ResourceOwnerRememberCatCacheRef(ResourceOwner owner,
