Thanks for having a look at this.

On Wed, 1 Feb 2023 at 03:07, Amit Langote <amitlangot...@gmail.com> wrote:
> Maybe you're planning to do it once this patch is post the PoC phase
> (isn't it?), but it would be helpful to have commentary on all the new
> dlist fields.

I've added comments on the new fields.  Maybe we can say the patch is "wip".
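
As a rough picture of how such fields fit together, here is a minimal standalone sketch of an intrusive doubly linked list in which one per-(lock, owner) record is linked into two lists at once. The field names `locallock_node` and `resowner_node` mirror the patch below; everything else (`list_node`, `list_head`, `remember_lock`, `forget_lock`, `demo_release_all`) is made up for illustration and is neither PostgreSQL's ilist.h API nor the patch's code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical circular intrusive list; the head acts as a sentinel. */
typedef struct list_node
{
    struct list_node *prev;
    struct list_node *next;
} list_node;

typedef struct list_head
{
    list_node head;
} list_head;

#define container_of(ptr, type, member) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

static void list_init(list_head *h) { h->head.prev = h->head.next = &h->head; }
static int  list_is_empty(const list_head *h) { return h->head.next == &h->head; }

/* O(1): link the node just before the sentinel. */
static void list_push_tail(list_head *h, list_node *n)
{
    n->next = &h->head;
    n->prev = h->head.prev;
    h->head.prev->next = n;
    h->head.prev = n;
}

/* O(1): unlink the node; no pointer to the list head is needed. */
static void list_delete(list_node *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
    n->prev = n->next = NULL;
}

/* One record per (lock, owner) pair, linked into two lists at once. */
typedef struct lock_owner
{
    int       nlocks;
    list_node locallock_node;   /* link in the lock's list of owners */
    list_node resowner_node;    /* link in the owner's list of locks */
} lock_owner;

static lock_owner *remember_lock(list_head *lock_owners, list_head *owner_locks)
{
    lock_owner *lo = malloc(sizeof(lock_owner));

    if (lo == NULL)
        return NULL;
    lo->nlocks = 1;
    list_push_tail(lock_owners, &lo->locallock_node);
    list_push_tail(owner_locks, &lo->resowner_node);
    return lo;
}

static void forget_lock(lock_owner *lo)
{
    list_delete(&lo->locallock_node);
    list_delete(&lo->resowner_node);
    free(lo);
}

/* Remember nlocks distinct locks for one owner, then release them all. */
static int demo_release_all(int nlocks)
{
    list_head  owner_locks;
    list_head *locks;
    int        released = 0;

    if (nlocks <= 0)
        return 0;
    locks = malloc((size_t) nlocks * sizeof(list_head));
    if (locks == NULL)
        abort();

    list_init(&owner_locks);
    for (int i = 0; i < nlocks; i++)
    {
        list_init(&locks[i]);
        if (remember_lock(&locks[i], &owner_locks) == NULL)
            abort();
    }

    /* Walk only this owner's list; no hash table scan is involved. */
    while (!list_is_empty(&owner_locks))
    {
        forget_lock(container_of(owner_locks.head.next, lock_owner, resowner_node));
        released++;
    }

    for (int i = 0; i < nlocks; i++)
        assert(list_is_empty(&locks[i]));
    free(locks);
    return released;
}
```

Because each record carries its own links, remembering is a push and forgetting is an unlink. Both take constant time however many locks the owner holds, and there is no fixed-size array to overflow.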

> This seems to be replacing what is a cache with an upper limit on the
> number of cached locks with something that has no limit on how many
> per-owner locks are remembered.  I wonder whether we'd be doing
> additional work in some cases with the new no-limit implementation
> that wasn't being done before (where the owner's locks array is
> overflowed) or maybe not much because the new implementation of
> ResourceOwner{Remember|Forget}Lock() is simple push/delete of a dlist
> node from the owner's dlist?

It's a good question. The problem is that I don't have a good test to
find out. We'd need to benchmark taking fewer than 16 locks, and when I
try that, there's too much variability in performance between runs to
determine whether there's any slowdown.

$ cat 10_locks.sql
select count(pg_advisory_lock(x)) from generate_series(1,10) x;

$ pgbench -f 10_locks.sql@1000 -M prepared -T 10 -n postgres | grep -E "(tps)"
tps = 47809.306088 (without initial connection time)
tps = 66859.789072 (without initial connection time)
tps = 37885.924616 (without initial connection time)
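
To put a number on that variability, the following sketch computes the relative spread, (max - min) / mean, of the three runs above. The helper names are made up for illustration; only the tps figures come from the output above.

```c
#include <assert.h>
#include <stddef.h>

/* Relative spread of a set of throughput samples: (max - min) / mean. */
static double relative_spread(const double *tps, size_t n)
{
    double min = tps[0];
    double max = tps[0];
    double sum = 0.0;

    for (size_t i = 0; i < n; i++)
    {
        if (tps[i] < min)
            min = tps[i];
        if (tps[i] > max)
            max = tps[i];
        sum += tps[i];
    }
    return (max - min) / (sum / (double) n);
}

/* The three 10_locks.sql results quoted above. */
static double ten_lock_spread(void)
{
    static const double runs[] = {47809.306088, 66859.789072, 37885.924616};

    return relative_spread(runs, sizeof(runs) / sizeof(runs[0]));
}
```

For these three runs the spread comes to roughly 57% of the mean, which would swamp any small slowdown.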

On trying with more locks, I see there are good wins from the patched version.

$ cat 100_locks.sql
select count(pg_advisory_lock(x)) from generate_series(1,100) x;

$ cat 1k_locks.sql
select count(pg_advisory_lock(x)) from generate_series(1,1000) x;

$ cat 10k_locks.sql
select count(pg_advisory_lock(x)) from generate_series(1,10000) x;

Test 1:  Take 100 locks but periodically take 10k locks to bloat the
local lock table.

master:
$ pgbench -f 100_locks.sql@1000 -f 10k_locks.sql@1 -M prepared -T 10 -n postgres | grep -E "(tps|script)"
transaction type: multiple scripts
tps = 2726.197037 (without initial connection time)
SQL script 1: 100_locks.sql
 - 27219 transactions (99.9% of total, tps = 2722.496227)
SQL script 2: 10k_locks.sql
 - 37 transactions (0.1% of total, tps = 3.700810)

patched:
$ pgbench -f 100_locks.sql@1000 -f 10k_locks.sql@1 -M prepared -T 10 -n postgres | grep -E "(tps|script)"
transaction type: multiple scripts
tps = 34047.297822 (without initial connection time)
SQL script 1: 100_locks.sql
 - 340039 transactions (99.9% of total, tps = 34012.688879)
SQL script 2: 10k_locks.sql
 - 346 transactions (0.1% of total, tps = 34.608943)

patched without slab context:
$ pgbench -f 100_locks.sql@1000 -f 10k_locks.sql@1 -M prepared -T 10 -n postgres | grep -E "(tps|script)"
transaction type: multiple scripts
tps = 34851.770846 (without initial connection time)
SQL script 1: 100_locks.sql
 - 348097 transactions (99.9% of total, tps = 34818.662324)
SQL script 2: 10k_locks.sql
 - 331 transactions (0.1% of total, tps = 33.108522)

Test 2: Always take just 100 locks and don't bloat the local lock table.

master:
$ pgbench -f 100_locks.sql@1000 -M prepared -T 10 -n postgres | grep -E "(tps|script)"
tps = 32682.491548 (without initial connection time)

patched:
$ pgbench -f 100_locks.sql@1000 -M prepared -T 10 -n postgres | grep -E "(tps|script)"
tps = 35637.241815 (without initial connection time)

patched without slab context:
$ pgbench -f 100_locks.sql@1000 -M prepared -T 10 -n postgres | grep -E "(tps|script)"
tps = 36192.185181 (without initial connection time)

The attached 0003 patch is an experiment to see if using a slab memory
context has any advantages for storing the LOCALLOCKOWNER structs.
There seems to be a small performance hit from doing this.
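
As a quick check of the arithmetic behind these results, this sketch derives the relative numbers from the tps figures above. The struct and function names are hypothetical, and "patched" is assumed to include the 0003 slab patch. The figures are single 10-second runs, so small differences may be within run-to-run noise.

```c
#include <assert.h>

/* tps figures copied from the pgbench output above */
typedef struct bench_result
{
    double master;
    double patched;             /* assumed to include the 0003 slab patch */
    double patched_no_slab;     /* "patched without slab context" */
} bench_result;

/* Test 1: 100 locks, with periodic 10k-lock transactions */
static const bench_result test1 = {2726.197037, 34047.297822, 34851.770846};

/* Test 2: always 100 locks */
static const bench_result test2 = {32682.491548, 35637.241815, 36192.185181};

/* How many times faster the patched build is than master. */
static double speedup_over_master(const bench_result *r)
{
    return r->patched / r->master;
}

/* Percentage of throughput lost by using the slab context. */
static double slab_cost_percent(const bench_result *r)
{
    return 100.0 * (1.0 - r->patched / r->patched_no_slab);
}
```

By these figures the patched build is roughly 12.5x master in Test 1 and roughly 9% faster in Test 2, and using the slab context costs about 2.3% and 1.5% respectively.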

> The following comment is now obsolete:
>
> /*
>  * LockReassignCurrentOwner
>  *      Reassign all locks belonging to CurrentResourceOwner to belong
>  *      to its parent resource owner.
>  *
>  * If the caller knows what those locks are, it can pass them as an array.
>  * That speeds up the call significantly, when a lot of locks are held
>  * (e.g pg_dump with a large schema).  Otherwise, pass NULL for locallocks,
>  * and we'll traverse through our hash table to find them.
>  */

I've removed the obsolete part.

I've attached another set of patches. I do need to spend longer
looking at this. I'm mainly attaching these as CI seems to be
highlighting a problem that I'm unable to recreate locally and I
wanted to see if the attached fixes it.

David
From 4a546ad6d33e544dd872b23a94925a262088cd9a Mon Sep 17 00:00:00 2001
From: David Rowley <dgrow...@gmail.com>
Date: Tue, 24 Jan 2023 16:00:58 +1300
Subject: [PATCH v6 1/3] wip-resowner-lock-release-all.

---
 src/backend/commands/discard.c             |   2 +-
 src/backend/replication/logical/launcher.c |   2 +-
 src/backend/storage/lmgr/README            |   6 -
 src/backend/storage/lmgr/lock.c            | 650 ++++++---------------
 src/backend/storage/lmgr/proc.c            |  17 +-
 src/backend/utils/init/postinit.c          |   6 +-
 src/backend/utils/resowner/resowner.c      | 128 ++--
 src/include/storage/lock.h                 |  32 +-
 src/include/utils/resowner_private.h       |   5 +-
 9 files changed, 280 insertions(+), 568 deletions(-)

diff --git a/src/backend/commands/discard.c b/src/backend/commands/discard.c
index 296dc82d2e..edb8b6026e 100644
--- a/src/backend/commands/discard.c
+++ b/src/backend/commands/discard.c
@@ -71,7 +71,7 @@ DiscardAll(bool isTopLevel)
        ResetAllOptions();
        DropAllPreparedStatements();
        Async_UnlistenAll();
-       LockReleaseAll(USER_LOCKMETHOD, true);
+       LockReleaseSession(USER_LOCKMETHOD);
        ResetPlanCache();
        ResetTempTableNamespace();
        ResetSequenceCaches();
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 970d170e73..8998b55f62 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -798,7 +798,7 @@ logicalrep_worker_onexit(int code, Datum arg)
         * parallel apply mode and will not be released when the worker
         * terminates, so manually release all locks before the worker exits.
         */
-       LockReleaseAll(DEFAULT_LOCKMETHOD, true);
+       LockReleaseSession(DEFAULT_LOCKMETHOD);
 
        ApplyLauncherWakeup();
 }
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index d08ec6c402..9603cc8959 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -182,12 +182,6 @@ holdMask -
     subset of the PGPROC object's heldLocks mask (if the PGPROC is
     currently waiting for another lock mode on this lock).
 
-releaseMask -
-    A bitmask for the lock modes due to be released during LockReleaseAll.
-    This must be a subset of the holdMask.  Note that it is modified without
-    taking the partition LWLock, and therefore it is unsafe for any
-    backend except the one owning the PROCLOCK to examine/change it.
-
 lockLink -
     List link for shared memory queue of all the PROCLOCK objects for the
     same LOCK.
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index a87372f33f..7e2dd3a7af 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -22,8 +22,7 @@
  *     Interface:
  *
  *     InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(),
- *     LockAcquire(), LockRelease(), LockReleaseAll(),
- *     LockCheckConflicts(), GrantLock()
+ *     LockAcquire(), LockRelease(), LockCheckConflicts(), GrantLock()
  *
  *-------------------------------------------------------------------------
  */
@@ -289,6 +288,9 @@ static LOCALLOCK *awaitedLock;
 static ResourceOwner awaitedOwner;
 
 
+static dlist_head session_locks[lengthof(LockMethods)];
+
+
 #ifdef LOCK_DEBUG
 
 /*------
@@ -375,8 +377,8 @@ static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
 static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
 static void FinishStrongLockAcquire(void);
 static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
-static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
-static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
+static void ReleaseLockIfHeld(LOCALLOCKOWNER *locallockowner, bool sessionLock);
+static void LockReassignOwner(LOCALLOCKOWNER *locallockowner, ResourceOwner parent);
 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
                                                PROCLOCK *proclock, LockMethod lockMethodTable);
 static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
@@ -477,6 +479,10 @@ InitLocks(void)
                                                                          16,
                                                                          &info,
                                                                          HASH_ELEM | HASH_BLOBS);
+
+       /* Initialize each element of the session_locks array */
+       for (int i = 0; i < lengthof(LockMethods); i++)
+               dlist_init(&session_locks[i]);
 }
 
 
@@ -839,26 +845,9 @@ LockAcquireExtended(const LOCKTAG *locktag,
                locallock->nLocks = 0;
                locallock->holdsStrongLockCount = false;
                locallock->lockCleared = false;
-               locallock->numLockOwners = 0;
-               locallock->maxLockOwners = 8;
-               locallock->lockOwners = NULL;   /* in case next line fails */
-               locallock->lockOwners = (LOCALLOCKOWNER *)
-                       MemoryContextAlloc(TopMemoryContext,
-                                                          locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
+               dlist_init(&locallock->locallockowners);
        }
-       else
-       {
-               /* Make sure there will be room to remember the lock */
-               if (locallock->numLockOwners >= locallock->maxLockOwners)
-               {
-                       int                     newsize = locallock->maxLockOwners * 2;
 
-                       locallock->lockOwners = (LOCALLOCKOWNER *)
-                               repalloc(locallock->lockOwners,
-                                                newsize * sizeof(LOCALLOCKOWNER));
-                       locallock->maxLockOwners = newsize;
-               }
-       }
        hashcode = locallock->hashcode;
 
        if (locallockp)
@@ -1268,7 +1257,6 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
                proclock->groupLeader = proc->lockGroupLeader != NULL ?
                        proc->lockGroupLeader : proc;
                proclock->holdMask = 0;
-               proclock->releaseMask = 0;
                /* Add proclock to appropriate lists */
                dlist_push_tail(&lock->procLocks, &proclock->lockLink);
                dlist_push_tail(&proc->myProcLocks[partition], &proclock->procLink);
@@ -1365,17 +1353,18 @@ CheckAndSetLockHeld(LOCALLOCK *locallock, bool acquired)
 static void
 RemoveLocalLock(LOCALLOCK *locallock)
 {
-       int                     i;
+       dlist_mutable_iter iter;
 
-       for (i = locallock->numLockOwners - 1; i >= 0; i--)
+       dlist_foreach_modify(iter, &locallock->locallockowners)
        {
-               if (locallock->lockOwners[i].owner != NULL)
-                       ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
+               LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+               Assert(locallockowner->owner != NULL);
+               dlist_delete(&locallockowner->locallock_node);
+               ResourceOwnerForgetLock(locallockowner);
        }
-       locallock->numLockOwners = 0;
-       if (locallock->lockOwners != NULL)
-               pfree(locallock->lockOwners);
-       locallock->lockOwners = NULL;
+
+       Assert(dlist_is_empty(&locallock->locallockowners));
 
        if (locallock->holdsStrongLockCount)
        {
@@ -1683,26 +1672,38 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock,
 static void
 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
 {
-       LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
-       int                     i;
+       LOCALLOCKOWNER *locallockowner;
+       dlist_iter      iter;
 
-       Assert(locallock->numLockOwners < locallock->maxLockOwners);
        /* Count the total */
        locallock->nLocks++;
        /* Count the per-owner lock */
-       for (i = 0; i < locallock->numLockOwners; i++)
+       dlist_foreach(iter, &locallock->locallockowners)
        {
-               if (lockOwners[i].owner == owner)
+               locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+               if (locallockowner->owner == owner)
                {
-                       lockOwners[i].nLocks++;
+                       locallockowner->nLocks++;
                        return;
                }
        }
-       lockOwners[i].owner = owner;
-       lockOwners[i].nLocks = 1;
-       locallock->numLockOwners++;
+       locallockowner = MemoryContextAlloc(TopMemoryContext, sizeof(LOCALLOCKOWNER));
+       locallockowner->owner = owner;
+       locallockowner->nLocks = 1;
+       locallockowner->locallock = locallock;
+
+       dlist_push_tail(&locallock->locallockowners, &locallockowner->locallock_node);
+
        if (owner != NULL)
-               ResourceOwnerRememberLock(owner, locallock);
+               ResourceOwnerRememberLock(owner, locallockowner);
+       else
+       {
+               LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallockowner->locallock);
+
+               Assert(lockmethodid > 0 && lockmethodid <= 2);
+               dlist_push_tail(&session_locks[lockmethodid - 1], &locallockowner->resowner_node);
+       }
 
        /* Indicate that the lock is acquired for certain types of locks. */
        CheckAndSetLockHeld(locallock, true);
@@ -2015,9 +2016,9 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
         * Decrease the count for the resource owner.
         */
        {
-               LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
                ResourceOwner owner;
-               int                     i;
+               dlist_iter      iter;
+               bool            found = false;
 
                /* Identify owner for lock */
                if (sessionLock)
@@ -2025,24 +2026,29 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
                else
                        owner = CurrentResourceOwner;
 
-               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               dlist_foreach(iter, &locallock->locallockowners)
                {
-                       if (lockOwners[i].owner == owner)
+                       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+                       if (locallockowner->owner != owner)
+                               continue;
+
+                       found = true;
+
+                       if (--locallockowner->nLocks == 0)
                        {
-                               Assert(lockOwners[i].nLocks > 0);
-                               if (--lockOwners[i].nLocks == 0)
-                               {
-                                       if (owner != NULL)
-                                               ResourceOwnerForgetLock(owner, locallock);
-                                       /* compact out unused slot */
-                                       locallock->numLockOwners--;
-                                       if (i < locallock->numLockOwners)
-                                               lockOwners[i] = lockOwners[locallock->numLockOwners];
-                               }
-                               break;
+                               dlist_delete(&locallockowner->locallock_node);
+
+                               if (owner != NULL)
+                                       ResourceOwnerForgetLock(locallockowner);
+                               else
+                                       dlist_delete(&locallockowner->resowner_node);
                        }
+
+                       Assert(locallockowner->nLocks >= 0);
                }
-               if (i < 0)
+
+               if (!found)
                {
                        /* don't release a lock belonging to another owner */
                        elog(WARNING, "you don't own a lock of type %s",
@@ -2060,6 +2066,8 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
        if (locallock->nLocks > 0)
                return true;
 
+       Assert(locallock->nLocks >= 0);
+
        /*
         * At this point we can no longer suppose we are clear of invalidation
         * messages related to this lock.  Although we'll delete the LOCALLOCK
@@ -2162,274 +2170,44 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
        return true;
 }
 
+#ifdef USE_ASSERT_CHECKING
 /*
- * LockReleaseAll -- Release all locks of the specified lock method that
- *             are held by the current process.
- *
- * Well, not necessarily *all* locks.  The available behaviors are:
- *             allLocks == true: release all locks including session locks.
- *             allLocks == false: release all non-session locks.
+ * LockAssertNoneHeld -- Assert that we no longer hold any DEFAULT_LOCKMETHOD
+ * locks during an abort.
  */
-void
-LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
+extern void
+LockAssertNoneHeld(bool isCommit)
 {
        HASH_SEQ_STATUS status;
-       LockMethod      lockMethodTable;
-       int                     i,
-                               numLockModes;
        LOCALLOCK  *locallock;
-       LOCK       *lock;
-       int                     partition;
-       bool            have_fast_path_lwlock = false;
-
-       if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
-               elog(ERROR, "unrecognized lock method: %d", lockmethodid);
-       lockMethodTable = LockMethods[lockmethodid];
-
-#ifdef LOCK_DEBUG
-       if (*(lockMethodTable->trace_flag))
-               elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
-#endif
-
-       /*
-        * Get rid of our fast-path VXID lock, if appropriate.  Note that this is
-        * the only way that the lock we hold on our own VXID can ever get
-        * released: it is always and only released when a toplevel transaction
-        * ends.
-        */
-       if (lockmethodid == DEFAULT_LOCKMETHOD)
-               VirtualXactLockTableCleanup();
 
-       numLockModes = lockMethodTable->numLockModes;
-
-       /*
-        * First we run through the locallock table and get rid of unwanted
-        * entries, then we scan the process's proclocks and get rid of those. We
-        * do this separately because we may have multiple locallock entries
-        * pointing to the same proclock, and we daren't end up with any dangling
-        * pointers.  Fast-path locks are cleaned up during the locallock table
-        * scan, though.
-        */
-       hash_seq_init(&status, LockMethodLocalHash);
-
-       while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+       if (!isCommit)
        {
-               /*
-                * If the LOCALLOCK entry is unused, we must've run out of shared
-                * memory while trying to set up this lock.  Just forget the local
-                * entry.
-                */
-               if (locallock->nLocks == 0)
-               {
-                       RemoveLocalLock(locallock);
-                       continue;
-               }
-
-               /* Ignore items that are not of the lockmethod to be removed */
-               if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
-                       continue;
+               hash_seq_init(&status, LockMethodLocalHash);
 
-               /*
-                * If we are asked to release all locks, we can just zap the entry.
-                * Otherwise, must scan to see if there are session locks. We assume
-                * there is at most one lockOwners entry for session locks.
-                */
-               if (!allLocks)
+               while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
                {
-                       LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+                       dlist_iter      local_iter;
 
-                       /* If session lock is above array position 0, move it down to 0 */
-                       for (i = 0; i < locallock->numLockOwners; i++)
-                       {
-                               if (lockOwners[i].owner == NULL)
-                                       lockOwners[0] = lockOwners[i];
-                               else
-                                       ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
-                       }
+                       Assert(locallock->nLocks >= 0);
 
-                       if (locallock->numLockOwners > 0 &&
-                               lockOwners[0].owner == NULL &&
-                               lockOwners[0].nLocks > 0)
+                       dlist_foreach(local_iter, &locallock->locallockowners)
                        {
-                               /* Fix the locallock to show just the session locks */
-                               locallock->nLocks = lockOwners[0].nLocks;
-                               locallock->numLockOwners = 1;
-                               /* We aren't deleting this locallock, so done */
-                               continue;
-                       }
-                       else
-                               locallock->numLockOwners = 0;
-               }
-
-               /*
-                * If the lock or proclock pointers are NULL, this lock was taken via
-                * the relation fast-path (and is not known to have been transferred).
-                */
-               if (locallock->proclock == NULL || locallock->lock == NULL)
-               {
-                       LOCKMODE        lockmode = locallock->tag.mode;
-                       Oid                     relid;
+                               LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+                                                                                                                                locallock_node,
+                                                                                                                                local_iter.cur);
 
-                       /* Verify that a fast-path lock is what we've got. */
-                       if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
-                               elog(PANIC, "locallock table corrupted");
+                               Assert(locallockowner->owner == NULL);
 
-                       /*
-                        * If we don't currently hold the LWLock that protects our
-                        * fast-path data structures, we must acquire it before attempting
-                        * to release the lock via the fast-path.  We will continue to
-                        * hold the LWLock until we're done scanning the locallock table,
-                        * unless we hit a transferred fast-path lock.  (XXX is this
-                        * really such a good idea?  There could be a lot of entries ...)
-                        */
-                       if (!have_fast_path_lwlock)
-                       {
-                               LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE);
-                               have_fast_path_lwlock = true;
+                               if (locallockowner->nLocks > 0 &&
+                                       LOCALLOCK_LOCKMETHOD(*locallock) == DEFAULT_LOCKMETHOD)
+                                       Assert(false);
                        }
-
-                       /* Attempt fast-path release. */
-                       relid = locallock->tag.lock.locktag_field2;
-                       if (FastPathUnGrantRelationLock(relid, lockmode))
-                       {
-                               RemoveLocalLock(locallock);
-                               continue;
-                       }
-
-                       /*
-                        * Our lock, originally taken via the fast path, has been
-                        * transferred to the main lock table.  That's going to require
-                        * some extra work, so release our fast-path lock before starting.
-                        */
-                       LWLockRelease(&MyProc->fpInfoLock);
-                       have_fast_path_lwlock = false;
-
-                       /*
-                        * Now dump the lock.  We haven't got a pointer to the LOCK or
-                        * PROCLOCK in this case, so we have to handle this a bit
-                        * differently than a normal lock release.  Unfortunately, this
-                        * requires an extra LWLock acquire-and-release cycle on the
-                        * partitionLock, but hopefully it shouldn't happen often.
-                        */
-                       LockRefindAndRelease(lockMethodTable, MyProc,
-                                                                &locallock->tag.lock, lockmode, false);
-                       RemoveLocalLock(locallock);
-                       continue;
                }
-
-               /* Mark the proclock to show we need to release this lockmode */
-               if (locallock->nLocks > 0)
-                       locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
-
-               /* And remove the locallock hashtable entry */
-               RemoveLocalLock(locallock);
        }
-
-       /* Done with the fast-path data structures */
-       if (have_fast_path_lwlock)
-               LWLockRelease(&MyProc->fpInfoLock);
-
-       /*
-        * Now, scan each lock partition separately.
-        */
-       for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
-       {
-               LWLock     *partitionLock;
-               dlist_head *procLocks = &MyProc->myProcLocks[partition];
-               dlist_mutable_iter proclock_iter;
-
-               partitionLock = LockHashPartitionLockByIndex(partition);
-
-               /*
-                * If the proclock list for this partition is empty, we can skip
-                * acquiring the partition lock.  This optimization is trickier than
-                * it looks, because another backend could be in process of adding
-                * something to our proclock list due to promoting one of our
-                * fast-path locks.  However, any such lock must be one that we
-                * decided not to delete above, so it's okay to skip it again now;
-                * we'd just decide not to delete it again.  We must, however, be
-                * careful to re-fetch the list header once we've acquired the
-                * partition lock, to be sure we have a valid, up-to-date pointer.
-                * (There is probably no significant risk if pointer fetch/store is
-                * atomic, but we don't wish to assume that.)
-                *
-                * XXX This argument assumes that the locallock table correctly
-                * represents all of our fast-path locks.  While allLocks mode
-                * guarantees to clean up all of our normal locks regardless of the
-                * locallock situation, we lose that guarantee for fast-path locks.
-                * This is not ideal.
-                */
-               if (dlist_is_empty(procLocks))
-                       continue;                       /* needn't examine this partition */
-
-               LWLockAcquire(partitionLock, LW_EXCLUSIVE);
-
-               dlist_foreach_modify(proclock_iter, procLocks)
-               {
-                       PROCLOCK   *proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur);
-                       bool            wakeupNeeded = false;
-
-                       Assert(proclock->tag.myProc == MyProc);
-
-                       lock = proclock->tag.myLock;
-
-                       /* Ignore items that are not of the lockmethod to be removed */
-                       if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
-                               continue;
-
-                       /*
-                        * In allLocks mode, force release of all locks even if locallock
-                        * table had problems
-                        */
-                       if (allLocks)
-                               proclock->releaseMask = proclock->holdMask;
-                       else
-                               Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
-
-                       /*
-                        * Ignore items that have nothing to be released, unless they have
-                        * holdMask == 0 and are therefore recyclable
-                        */
-                       if (proclock->releaseMask == 0 && proclock->holdMask != 0)
-                               continue;
-
-                       PROCLOCK_PRINT("LockReleaseAll", proclock);
-                       LOCK_PRINT("LockReleaseAll", lock, 0);
-                       Assert(lock->nRequested >= 0);
-                       Assert(lock->nGranted >= 0);
-                       Assert(lock->nGranted <= lock->nRequested);
-                       Assert((proclock->holdMask & ~lock->grantMask) == 0);
-
-                       /*
-                        * Release the previously-marked lock modes
-                        */
-                       for (i = 1; i <= numLockModes; i++)
-                       {
-                               if (proclock->releaseMask & LOCKBIT_ON(i))
-                                       wakeupNeeded |= UnGrantLock(lock, i, proclock,
-                                                                                               lockMethodTable);
-                       }
-                       Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
-                       Assert(lock->nGranted <= lock->nRequested);
-                       LOCK_PRINT("LockReleaseAll: updated", lock, 0);
-
-                       proclock->releaseMask = 0;
-
-                       /* CleanUpLock will wake up waiters if needed. */
-                       CleanUpLock(lock, proclock,
-                                               lockMethodTable,
-                                               LockTagHashCode(&lock->tag),
-                                               wakeupNeeded);
-               }                                               /* loop over PROCLOCKs within this partition */
-
-               LWLockRelease(partitionLock);
-       }                                                       /* loop over partitions */
-
-#ifdef LOCK_DEBUG
-       if (*(lockMethodTable->trace_flag))
-               elog(LOG, "LockReleaseAll done");
-#endif
+       Assert(MyProc->fpLockBits == 0);
 }
+#endif
 
 /*
  * LockReleaseSession -- Release all session locks of the specified lock method
@@ -2438,59 +2216,41 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
 void
 LockReleaseSession(LOCKMETHODID lockmethodid)
 {
-       HASH_SEQ_STATUS status;
-       LOCALLOCK  *locallock;
+       dlist_mutable_iter iter;
 
        if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
                elog(ERROR, "unrecognized lock method: %d", lockmethodid);
 
-       hash_seq_init(&status, LockMethodLocalHash);
-
-       while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+       dlist_foreach_modify(iter, &session_locks[lockmethodid - 1])
        {
-               /* Ignore items that are not of the specified lock method */
-               if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
-                       continue;
+               LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
 
-               ReleaseLockIfHeld(locallock, true);
+               Assert(LOCALLOCK_LOCKMETHOD(*locallockowner->locallock) == lockmethodid);
+
+               ReleaseLockIfHeld(locallockowner, true);
        }
+
+       Assert(dlist_is_empty(&session_locks[lockmethodid - 1]));
 }
 
 /*
  * LockReleaseCurrentOwner
- *             Release all locks belonging to CurrentResourceOwner
- *
- * If the caller knows what those locks are, it can pass them as an array.
- * That speeds up the call significantly, when a lot of locks are held.
- * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
- * table to find them.
+ *             Release all locks belonging to 'owner'
  */
 void
-LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
+LockReleaseCurrentOwner(ResourceOwner owner, dlist_node *resowner_node)
 {
-       if (locallocks == NULL)
-       {
-               HASH_SEQ_STATUS status;
-               LOCALLOCK  *locallock;
+       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, resowner_node);
 
-               hash_seq_init(&status, LockMethodLocalHash);
+       Assert(locallockowner->owner == owner);
 
-               while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
-                       ReleaseLockIfHeld(locallock, false);
-       }
-       else
-       {
-               int                     i;
-
-               for (i = nlocks - 1; i >= 0; i--)
-                       ReleaseLockIfHeld(locallocks[i], false);
-       }
+       ReleaseLockIfHeld(locallockowner, false);
 }
 
 /*
  * ReleaseLockIfHeld
- *             Release any session-level locks on this lockable object if sessionLock
- *             is true; else, release any locks held by CurrentResourceOwner.
+ *             Release any session-level locks on this 'locallockowner' if
+ *             sessionLock is true; else, release any locks held by 'locallockowner'.
  *
  * It is tempting to pass this a ResourceOwner pointer (or NULL for session
  * locks), but without refactoring LockRelease() we cannot support releasing
@@ -2501,52 +2261,39 @@ LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
  * convenience.
  */
 static void
-ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
+ReleaseLockIfHeld(LOCALLOCKOWNER *locallockowner, bool sessionLock)
 {
-       ResourceOwner owner;
-       LOCALLOCKOWNER *lockOwners;
-       int                     i;
+       LOCALLOCK  *locallock = locallockowner->locallock;
+
+       /* release all references to the lock by this resource owner */
 
-       /* Identify owner for lock (must match LockRelease!) */
        if (sessionLock)
-               owner = NULL;
+               Assert(locallockowner->owner == NULL);
        else
-               owner = CurrentResourceOwner;
+               Assert(locallockowner->owner != NULL);
 
-       /* Scan to see if there are any locks belonging to the target owner */
-       lockOwners = locallock->lockOwners;
-       for (i = locallock->numLockOwners - 1; i >= 0; i--)
+       /* We will still hold this lock after forgetting this ResourceOwner. */
+       if (locallockowner->nLocks < locallock->nLocks)
        {
-               if (lockOwners[i].owner == owner)
-               {
-                       Assert(lockOwners[i].nLocks > 0);
-                       if (lockOwners[i].nLocks < locallock->nLocks)
-                       {
-                               /*
-                                * We will still hold this lock after forgetting this
-                                * ResourceOwner.
-                                */
-                               locallock->nLocks -= lockOwners[i].nLocks;
-                               /* compact out unused slot */
-                               locallock->numLockOwners--;
-                               if (owner != NULL)
-                                       ResourceOwnerForgetLock(owner, locallock);
-                               if (i < locallock->numLockOwners)
-                                       lockOwners[i] = lockOwners[locallock->numLockOwners];
-                       }
-                       else
-                       {
-                               Assert(lockOwners[i].nLocks == locallock->nLocks);
-                               /* We want to call LockRelease just once */
-                               lockOwners[i].nLocks = 1;
-                               locallock->nLocks = 1;
-                               if (!LockRelease(&locallock->tag.lock,
-                                                                locallock->tag.mode,
-                                                                sessionLock))
-                                       elog(WARNING, "ReleaseLockIfHeld: failed??");
-                       }
-                       break;
-               }
+               locallock->nLocks -= locallockowner->nLocks;
+               dlist_delete(&locallockowner->locallock_node);
+
+               if (sessionLock)
+                       dlist_delete(&locallockowner->resowner_node);
+               else
+                       ResourceOwnerForgetLock(locallockowner);
+       }
+       else
+       {
+               Assert(locallockowner->nLocks == locallock->nLocks);
+               /* We want to call LockRelease just once */
+               locallockowner->nLocks = 1;
+               locallock->nLocks = 1;
+
+               if (!LockRelease(&locallock->tag.lock,
+                                                locallock->tag.mode,
+                                                sessionLock))
+                       elog(WARNING, "ReleaseLockIfHeld: failed??");
        }
 }
 
@@ -2561,75 +2308,48 @@ ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
  * and we'll traverse through our hash table to find them.
  */
 void
-LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
+LockReassignCurrentOwner(ResourceOwner owner, dlist_node *resowner_node)
 {
+       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+                                                        resowner_node,
+                                                        resowner_node);
        ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
 
-       Assert(parent != NULL);
-
-       if (locallocks == NULL)
-       {
-               HASH_SEQ_STATUS status;
-               LOCALLOCK  *locallock;
-
-               hash_seq_init(&status, LockMethodLocalHash);
-
-               while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
-                       LockReassignOwner(locallock, parent);
-       }
-       else
-       {
-               int                     i;
-
-               for (i = nlocks - 1; i >= 0; i--)
-                       LockReassignOwner(locallocks[i], parent);
-       }
+       LockReassignOwner(locallockowner, parent);
 }
 
 /*
- * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to
- * CurrentResourceOwner to its parent.
+ * Subroutine of LockReassignCurrentOwner. Reassigns the given
+ * 'locallockowner' to 'parent'.
  */
 static void
-LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
+LockReassignOwner(LOCALLOCKOWNER *locallockowner, ResourceOwner parent)
 {
-       LOCALLOCKOWNER *lockOwners;
-       int                     i;
-       int                     ic = -1;
-       int                     ip = -1;
+       dlist_iter      iter;
+       LOCALLOCK  *locallock = locallockowner->locallock;
 
-       /*
-        * Scan to see if there are any locks belonging to current owner or its
-        * parent
-        */
-       lockOwners = locallock->lockOwners;
-       for (i = locallock->numLockOwners - 1; i >= 0; i--)
+       ResourceOwnerForgetLock(locallockowner);
+
+       dlist_foreach(iter, &locallock->locallockowners)
        {
-               if (lockOwners[i].owner == CurrentResourceOwner)
-                       ic = i;
-               else if (lockOwners[i].owner == parent)
-                       ip = i;
-       }
+               LOCALLOCKOWNER *parentlocalowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
 
-       if (ic < 0)
-               return;                                 /* no current locks */
+               Assert(parentlocalowner->locallock == locallock);
 
-       if (ip < 0)
-       {
-               /* Parent has no slot, so just give it the child's slot */
-               lockOwners[ic].owner = parent;
-               ResourceOwnerRememberLock(parent, locallock);
-       }
-       else
-       {
-               /* Merge child's count with parent's */
-               lockOwners[ip].nLocks += lockOwners[ic].nLocks;
-               /* compact out unused slot */
-               locallock->numLockOwners--;
-               if (ic < locallock->numLockOwners)
-                       lockOwners[ic] = lockOwners[locallock->numLockOwners];
+               if (parentlocalowner->owner != parent)
+                       continue;
+
+               parentlocalowner->nLocks += locallockowner->nLocks;
+
+               locallockowner->nLocks = 0;
+               dlist_delete(&locallockowner->locallock_node);
+               pfree(locallockowner);
+               return;
        }
-       ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
+
+       /* reassign locallockowner to parent resowner */
+       locallockowner->owner = parent;
+       ResourceOwnerRememberLock(parent, locallockowner);
 }
 
 /*
@@ -3101,7 +2821,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
  * We currently use this in two situations: first, to release locks held by
  * prepared transactions on commit (see lock_twophase_postcommit); and second,
  * to release locks taken via the fast-path, transferred to the main hash
- * table, and then released (see LockReleaseAll).
+ * table, and then released (see ResourceOwnerRelease).
  */
 static void
 LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
@@ -3237,10 +2957,9 @@ CheckForSessionAndXactLocks(void)
 
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
        {
-               LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
                PerLockTagEntry *hentry;
                bool            found;
-               int                     i;
+               dlist_iter      iter;
 
                /*
                 * Ignore VXID locks.  We don't want those to be held by prepared
@@ -3261,9 +2980,13 @@ CheckForSessionAndXactLocks(void)
                        hentry->sessLock = hentry->xactLock = false;
 
                /* Scan to see if we hold lock at session or xact level or both */
-               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               dlist_foreach(iter, &locallock->locallockowners)
                {
-                       if (lockOwners[i].owner == NULL)
+                       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+                                                                        locallock_node,
+                                                                        iter.cur);
+
+                       if (locallockowner->owner == NULL)
                                hentry->sessLock = true;
                        else
                                hentry->xactLock = true;
@@ -3310,10 +3033,9 @@ AtPrepare_Locks(void)
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
        {
                TwoPhaseLockRecord record;
-               LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
                bool            haveSessionLock;
                bool            haveXactLock;
-               int                     i;
+               dlist_iter      iter;
 
                /*
                 * Ignore VXID locks.  We don't want those to be held by prepared
@@ -3328,9 +3050,13 @@ AtPrepare_Locks(void)
 
                /* Scan to see whether we hold it at session or transaction level */
                haveSessionLock = haveXactLock = false;
-               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               dlist_foreach(iter, &locallock->locallockowners)
                {
-                       if (lockOwners[i].owner == NULL)
+                       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+                                                                        locallock_node,
+                                                                        iter.cur);
+
+                       if (locallockowner->owner == NULL)
                                haveSessionLock = true;
                        else
                                haveXactLock = true;
@@ -3388,8 +3114,8 @@ AtPrepare_Locks(void)
  * pointers in the transaction's resource owner.  This is OK at the
  * moment since resowner.c doesn't try to free locks retail at a toplevel
  * transaction commit or abort.  We could alternatively zero out nLocks
- * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
- * but that probably costs more cycles.
+ * and leave the LOCALLOCK entries to be garbage-collected by
+ * ResourceOwnerRelease, but that probably costs more cycles.
  */
 void
 PostPrepare_Locks(TransactionId xid)
@@ -3422,10 +3148,9 @@ PostPrepare_Locks(TransactionId xid)
 
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
        {
-               LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
                bool            haveSessionLock;
                bool            haveXactLock;
-               int                     i;
+               dlist_iter      iter;
 
                if (locallock->proclock == NULL || locallock->lock == NULL)
                {
@@ -3444,9 +3169,13 @@ PostPrepare_Locks(TransactionId xid)
 
                /* Scan to see whether we hold it at session or transaction level */
                haveSessionLock = haveXactLock = false;
-               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               dlist_foreach(iter, &locallock->locallockowners)
                {
-                       if (lockOwners[i].owner == NULL)
+                       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+                                                                        locallock_node,
+                                                                        iter.cur);
+
+                       if (locallockowner->owner == NULL)
                                haveSessionLock = true;
                        else
                                haveXactLock = true;
@@ -3462,10 +3191,6 @@ PostPrepare_Locks(TransactionId xid)
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                         errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
 
-               /* Mark the proclock to show we need to release this lockmode */
-               if (locallock->nLocks > 0)
-                       locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
-
                /* And remove the locallock hashtable entry */
                RemoveLocalLock(locallock);
        }
@@ -3483,11 +3208,7 @@ PostPrepare_Locks(TransactionId xid)
 
                /*
                 * If the proclock list for this partition is empty, we can skip
-                * acquiring the partition lock.  This optimization is safer than the
-                * situation in LockReleaseAll, because we got rid of any fast-path
-                * locks during AtPrepare_Locks, so there cannot be any case where
-                * another backend is adding something to our lists now.  For safety,
-                * though, we code this the same way as in LockReleaseAll.
+                * acquiring the partition lock.
                 */
                if (dlist_is_empty(procLocks))
                        continue;                       /* needn't examine this partition */
@@ -3513,14 +3234,6 @@ PostPrepare_Locks(TransactionId xid)
                        Assert(lock->nGranted <= lock->nRequested);
                        Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
-                       /* Ignore it if nothing to release (must be a session lock) */
-                       if (proclock->releaseMask == 0)
-                               continue;
-
-                       /* Else we should be releasing all locks */
-                       if (proclock->releaseMask != proclock->holdMask)
-                               elog(PANIC, "we seem to have dropped a bit somewhere");
-
                        /*
                         * We cannot simply modify proclock->tag.myProc to reassign
                         * ownership of the lock, because that's part of the hash key and
@@ -4288,7 +4001,6 @@ lock_twophase_recover(TransactionId xid, uint16 info,
                Assert(proc->lockGroupLeader == NULL);
                proclock->groupLeader = proc;
                proclock->holdMask = 0;
-               proclock->releaseMask = 0;
                /* Add proclock to appropriate lists */
                dlist_push_tail(&lock->procLocks, &proclock->lockLink);
                dlist_push_tail(&proc->myProcLocks[partition],
@@ -4425,7 +4137,7 @@ lock_twophase_postabort(TransactionId xid, uint16 info,
  *
 *             We don't bother recording this lock in the local lock table, since it's
  *             only ever released at the end of a transaction.  Instead,
- *             LockReleaseAll() calls VirtualXactLockTableCleanup().
+ *             ProcReleaseLocks() calls VirtualXactLockTableCleanup().
  */
 void
 VirtualXactLockTableInsert(VirtualTransactionId vxid)
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 22b4278610..1addef790a 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -777,10 +777,17 @@ ProcReleaseLocks(bool isCommit)
                return;
        /* If waiting, get off wait queue (should only be needed after error) */
        LockErrorCleanup();
-       /* Release standard locks, including session-level if aborting */
-       LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit);
-       /* Release transaction-level advisory locks */
-       LockReleaseAll(USER_LOCKMETHOD, false);
+
+       VirtualXactLockTableCleanup();
+
+       /* Release session-level locks if aborting */
+       if (!isCommit)
+               LockReleaseSession(DEFAULT_LOCKMETHOD);
+
+#ifdef USE_ASSERT_CHECKING
+       /* Ensure all locks were released */
+       LockAssertNoneHeld(isCommit);
+#endif
 }
 
 
@@ -861,6 +868,8 @@ ProcKill(int code, Datum arg)
                LWLockRelease(leader_lwlock);
        }
 
+       Assert(MyProc->fpLockBits == 0);
+
        /*
         * Reset MyLatch to the process local one.  This is so that signal
         * handlers et al can continue using the latch after the shared latch
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 2f07ca7a0e..0547e3d076 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -1325,10 +1325,10 @@ ShutdownPostgres(int code, Datum arg)
        AbortOutOfAnyTransaction();
 
        /*
-        * User locks are not released by transaction end, so be sure to release
-        * them explicitly.
+        * Session locks are not released by transaction end, so be sure to
+        * release them explicitly.
         */
-       LockReleaseAll(USER_LOCKMETHOD, true);
+       LockReleaseSession(USER_LOCKMETHOD);
 }
 
 
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 19b6241e45..321ea15c78 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -33,6 +33,7 @@
 #include "utils/resowner_private.h"
 #include "utils/snapmgr.h"
 
+#include "lib/ilist.h"
 
 /*
  * All resource IDs managed by this code are required to fit into a Datum,
@@ -91,24 +92,6 @@ typedef struct ResourceArray
 #define RESARRAY_MAX_ITEMS(capacity) \
        ((capacity) <= RESARRAY_MAX_ARRAY ? (capacity) : (capacity)/4 * 3)
 
-/*
- * To speed up bulk releasing or reassigning locks from a resource owner to
- * its parent, each resource owner has a small cache of locks it owns. The
- * lock manager has the same information in its local lock hash table, and
- * we fall back on that if cache overflows, but traversing the hash table
- * is slower when there are a lot of locks belonging to other resource owners.
- *
- * MAX_RESOWNER_LOCKS is the size of the per-resource owner cache. It's
- * chosen based on some testing with pg_dump with a large schema. When the
- * tests were done (on 9.2), resource owners in a pg_dump run contained up
- * to 9 locks, regardless of the schema size, except for the top resource
- * owner which contained much more (overflowing the cache). 15 seems like a
- * nice round number that's somewhat higher than what pg_dump needs. Note that
- * making this number larger is not free - the bigger the cache, the slower
- * it is to release locks (in retail), when a resource owner holds many locks.
- */
-#define MAX_RESOWNER_LOCKS 15
-
 /*
  * ResourceOwner objects look like this
  */
@@ -133,9 +116,7 @@ typedef struct ResourceOwnerData
        ResourceArray cryptohasharr;    /* cryptohash contexts */
        ResourceArray hmacarr;          /* HMAC contexts */
 
-       /* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
-       int                     nlocks;                 /* number of owned locks */
-       LOCALLOCK  *locks[MAX_RESOWNER_LOCKS];  /* list of owned locks */
+       dlist_head      locks;                  /* dlist of owned locks */
 }                      ResourceOwnerData;
 
 
@@ -452,6 +433,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
        ResourceArrayInit(&(owner->jitarr), PointerGetDatum(NULL));
        ResourceArrayInit(&(owner->cryptohasharr), PointerGetDatum(NULL));
        ResourceArrayInit(&(owner->hmacarr), PointerGetDatum(NULL));
+       dlist_init(&owner->locks);
 
        return owner;
 }
@@ -586,8 +568,15 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
        }
        else if (phase == RESOURCE_RELEASE_LOCKS)
        {
+               dlist_mutable_iter iter;
+
                if (isTopLevel)
                {
+                       dlist_foreach_modify(iter, &owner->locks)
+                               LockReleaseCurrentOwner(owner, iter.cur);
+
+                       Assert(dlist_is_empty(&owner->locks));
+
                        /*
                         * For a top-level xact we are going to release all locks (or at
                         * least all non-session locks), so just do a single lmgr call at
@@ -606,30 +595,20 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                         * subtransaction, we do NOT release its locks yet, but transfer
                         * them to the parent.
                         */
-                       LOCALLOCK **locks;
-                       int                     nlocks;
-
-                       Assert(owner->parent != NULL);
-
-                       /*
-                        * Pass the list of locks owned by this resource owner to the lock
-                        * manager, unless it has overflowed.
-                        */
-                       if (owner->nlocks > MAX_RESOWNER_LOCKS)
+                       if (isCommit)
                        {
-                               locks = NULL;
-                               nlocks = 0;
+                               dlist_foreach_modify(iter, &owner->locks)
+                                       LockReassignCurrentOwner(owner, iter.cur);
+
+                               Assert(dlist_is_empty(&owner->locks));
                        }
                        else
                        {
-                               locks = owner->locks;
-                               nlocks = owner->nlocks;
-                       }
+                               dlist_foreach_modify(iter, &owner->locks)
+                                       LockReleaseCurrentOwner(owner, iter.cur);
 
-                       if (isCommit)
-                               LockReassignCurrentOwner(locks, nlocks);
-                       else
-                               LockReleaseCurrentOwner(locks, nlocks);
+                               Assert(dlist_is_empty(&owner->locks));
+                       }
                }
        }
        else if (phase == RESOURCE_RELEASE_AFTER_LOCKS)
@@ -757,7 +736,7 @@ ResourceOwnerDelete(ResourceOwner owner)
        Assert(owner->jitarr.nitems == 0);
        Assert(owner->cryptohasharr.nitems == 0);
        Assert(owner->hmacarr.nitems == 0);
-       Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
+       Assert(dlist_is_empty(&owner->locks));
 
        /*
         * Delete children.  The recursive call will delink the child from me, so
@@ -978,54 +957,61 @@ ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer)
 
 /*
  * Remember that a Local Lock is owned by a ResourceOwner
- *
- * This is different from the other Remember functions in that the list of
- * locks is only a lossy cache. It can hold up to MAX_RESOWNER_LOCKS entries,
- * and when it overflows, we stop tracking locks. The point of only remembering
- * only up to MAX_RESOWNER_LOCKS entries is that if a lot of locks are held,
- * ResourceOwnerForgetLock doesn't need to scan through a large array to find
- * the entry.
  */
 void
-ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock)
+ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCKOWNER *locallockowner)
 {
-       Assert(locallock != NULL);
-
-       if (owner->nlocks > MAX_RESOWNER_LOCKS)
-               return;                                 /* we have already overflowed */
+       Assert(owner != NULL);
+       Assert(locallockowner != NULL);
 
-       if (owner->nlocks < MAX_RESOWNER_LOCKS)
-               owner->locks[owner->nlocks] = locallock;
-       else
+#ifdef USE_ASSERT_CHECKING
        {
-               /* overflowed */
+               dlist_iter      iter;
+
+               dlist_foreach(iter, &owner->locks)
+               {
+                       LOCALLOCKOWNER *i = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+                       Assert(i->locallock != locallockowner->locallock);
+               }
        }
-       owner->nlocks++;
+#endif
+
+       dlist_push_tail(&owner->locks, &locallockowner->resowner_node);
 }
 
 /*
- * Forget that a Local Lock is owned by a ResourceOwner
+ * Forget that a Local Lock is owned by the given LOCALLOCKOWNER.
  */
 void
-ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock)
+ResourceOwnerForgetLock(LOCALLOCKOWNER *locallockowner)
 {
-       int                     i;
+#ifdef USE_ASSERT_CHECKING
+       ResourceOwner owner;
 
-       if (owner->nlocks > MAX_RESOWNER_LOCKS)
-               return;                                 /* we have overflowed */
+       Assert(locallockowner != NULL);
+
+       owner = locallockowner->owner;
 
-       Assert(owner->nlocks > 0);
-       for (i = owner->nlocks - 1; i >= 0; i--)
        {
-               if (locallock == owner->locks[i])
+               dlist_iter      iter;
+               bool            found = false;
+
+               dlist_foreach(iter, &owner->locks)
                {
-                       owner->locks[i] = owner->locks[owner->nlocks - 1];
-                       owner->nlocks--;
-                       return;
+                       LOCALLOCKOWNER *cur = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+                       if (locallockowner == cur)
+                       {
+                               Assert(!found);
+                               found = true;
+                       }
                }
+
+               Assert(found);
        }
-       elog(ERROR, "lock reference %p is not owned by resource owner %s",
-                locallock, owner->name);
+#endif
+       dlist_delete(&locallockowner->resowner_node);
 }
 
 /*
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 6ae434596a..f2617f805e 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -24,6 +24,7 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "utils/timestamp.h"
+#include "lib/ilist.h"
 
 /* struct PGPROC is declared in proc.h, but must forward-reference it */
 typedef struct PGPROC PGPROC;
@@ -349,10 +350,6 @@ typedef struct LOCK
  * Otherwise, proclock objects whose holdMasks are zero are recycled
  * as soon as convenient.
  *
- * releaseMask is workspace for LockReleaseAll(): it shows the locks due
- * to be released during the current call.  This must only be examined or
- * set by the backend owning the PROCLOCK.
- *
  * Each PROCLOCK object is linked into lists for both the associated LOCK
  * object and the owning PGPROC object.  Note that the PROCLOCK is entered
  * into these lists as soon as it is created, even if no lock has yet been
@@ -374,7 +371,6 @@ typedef struct PROCLOCK
        /* data */
        PGPROC     *groupLeader;        /* proc's lock group leader, or proc itself */
        LOCKMASK        holdMask;               /* bitmask for lock types currently held */
-       LOCKMASK        releaseMask;    /* bitmask for lock types to be released */
        dlist_node      lockLink;               /* list link in LOCK's list of proclocks */
        dlist_node      procLink;               /* list link in PGPROC's list of proclocks */
 } PROCLOCK;
@@ -420,6 +416,13 @@ typedef struct LOCALLOCKOWNER
         * Must use a forward struct reference to avoid circularity.
         */
        struct ResourceOwnerData *owner;
+
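+       dlist_node      resowner_node;  /* link in owner's (or session) lock list */
+
+       dlist_node      locallock_node; /* link in LOCALLOCK's locallockowners */
+
+       struct LOCALLOCK *locallock;    /* the LOCALLOCK this entry refers to */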
+
        int64           nLocks;                 /* # of times held by this owner */
 } LOCALLOCKOWNER;
 
@@ -433,9 +436,9 @@ typedef struct LOCALLOCK
        LOCK       *lock;                       /* associated LOCK object, if any */
        PROCLOCK   *proclock;           /* associated PROCLOCK object, if any */
        int64           nLocks;                 /* total number of times lock is held */
-       int                     numLockOwners;  /* # of relevant ResourceOwners */
-       int                     maxLockOwners;  /* allocated size of array */
-       LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
+
+       dlist_head      locallockowners;        /* dlist of LOCALLOCKOWNER */
+
        bool            holdsStrongLockCount;   /* bumped FastPathStrongRelationLocks */
        bool            lockCleared;    /* we read all sinval msgs for lock */
 } LOCALLOCK;
@@ -564,10 +567,17 @@ extern void AbortStrongLockAcquire(void);
 extern void MarkLockClear(LOCALLOCK *locallock);
 extern bool LockRelease(const LOCKTAG *locktag,
                                                LOCKMODE lockmode, bool sessionLock);
-extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
+
+#ifdef USE_ASSERT_CHECKING
+extern void LockAssertNoneHeld(bool isCommit);
+#endif
+
 extern void LockReleaseSession(LOCKMETHODID lockmethodid);
-extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks);
-extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks);
+struct ResourceOwnerData;
+extern void LockReleaseCurrentOwner(struct ResourceOwnerData *owner,
+                                                                       dlist_node *resowner_node);
+extern void LockReassignCurrentOwner(struct ResourceOwnerData *owner,
+                                                                        dlist_node *resowner_node);
 extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
 #ifdef USE_ASSERT_CHECKING
 extern HTAB *GetLockMethodLocalHash(void);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index 1b1f3181b5..ad13f13401 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -31,8 +31,9 @@ extern void ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer);
 extern void ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer);
 
 /* support for local lock management */
-extern void ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock);
-extern void ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock);
+extern void ResourceOwnerRememberLock(ResourceOwner owner,
+                                                                         LOCALLOCKOWNER *locallock);
+extern void ResourceOwnerForgetLock(LOCALLOCKOWNER *locallock);
 
 /* support for catcache refcount management */
 extern void ResourceOwnerEnlargeCatCacheRefs(ResourceOwner owner);
-- 
2.37.2

From 4bb7647efef51e53f35bb3a08a361f057ef27707 Mon Sep 17 00:00:00 2001
From: David Rowley <dgrow...@gmail.com>
Date: Fri, 10 Feb 2023 14:29:23 +1300
Subject: [PATCH v6 2/3] fixup! wip-resowner-lock-release-all.

---
 src/backend/replication/logical/launcher.c |  2 +-
 src/backend/storage/lmgr/lock.c            | 19 +++++--------------
 src/backend/utils/resowner/resowner.c      | 20 +++++++++++++++++---
 src/include/storage/lock.h                 | 11 +++++------
 4 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 8998b55f62..8ba6a51945 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -798,7 +798,7 @@ logicalrep_worker_onexit(int code, Datum arg)
         * parallel apply mode and will not be released when the worker
         * terminates, so manually release all locks before the worker exits.
         */
-       LockReleaseSession(DEFAULT_LOCKMETHOD);
+       ProcReleaseLocks(false);
 
        ApplyLauncherWakeup();
 }
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 7e2dd3a7af..1033d57b88 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -2045,6 +2045,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
                                        dlist_delete(&locallockowner->resowner_node);
                        }
 
+                       /* ensure nLocks didn't go negative */
                        Assert(locallockowner->nLocks >= 0);
                }
 
@@ -2066,7 +2067,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
        if (locallock->nLocks > 0)
                return true;
 
-       Assert(locallock->nLocks >= 0);
+       Assert(locallock->nLocks == 0);
 
        /*
         * At this point we can no longer suppose we are clear of invalidation
@@ -2175,7 +2176,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
  * LockAssertNoneHeld -- Assert that we no longer hold any DEFAULT_LOCKMETHOD
  * locks during an abort.
  */
-extern void
+void
 LockAssertNoneHeld(bool isCommit)
 {
        HASH_SEQ_STATUS status;
@@ -2238,10 +2239,8 @@ LockReleaseSession(LOCKMETHODID lockmethodid)
  *             Release all locks belonging to 'owner'
  */
 void
-LockReleaseCurrentOwner(ResourceOwner owner, dlist_node *resowner_node)
+LockReleaseCurrentOwner(ResourceOwner owner, LOCALLOCKOWNER *locallockowner)
 {
-       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, resowner_node);
-
        Assert(locallockowner->owner == owner);
 
        ReleaseLockIfHeld(locallockowner, false);
@@ -2301,18 +2300,10 @@ ReleaseLockIfHeld(LOCALLOCKOWNER *locallockowner, bool sessionLock)
  * LockReassignCurrentOwner
  *             Reassign all locks belonging to CurrentResourceOwner to belong
  *             to its parent resource owner.
- *
- * If the caller knows what those locks are, it can pass them as an array.
- * That speeds up the call significantly, when a lot of locks are held
- * (e.g pg_dump with a large schema).  Otherwise, pass NULL for locallocks,
- * and we'll traverse through our hash table to find them.
  */
 void
-LockReassignCurrentOwner(ResourceOwner owner, dlist_node *resowner_node)
+LockReassignCurrentOwner(LOCALLOCKOWNER *locallockowner)
 {
-       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
-                                                                                                        resowner_node,
-                                                                                                        resowner_node);
        ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
 
        LockReassignOwner(locallockowner, parent);
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 321ea15c78..46a9a3ca42 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -573,7 +573,11 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                if (isTopLevel)
                {
                        dlist_foreach_modify(iter, &owner->locks)
-                               LockReleaseCurrentOwner(owner, iter.cur);
+                       {
+                               LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+                               LockReleaseCurrentOwner(owner, locallockowner);
+                       }
 
                        Assert(dlist_is_empty(&owner->locks));
 
@@ -598,14 +602,24 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                        if (isCommit)
                        {
                                dlist_foreach_modify(iter, &owner->locks)
-                                       LockReassignCurrentOwner(owner, iter.cur);
+                               {
+                                       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+                                                                                                                                        resowner_node,
+                                                                                                                                        iter.cur);
+
+                                       LockReassignCurrentOwner(locallockowner);
+                               }
 
                                Assert(dlist_is_empty(&owner->locks));
                        }
                        else
                        {
                                dlist_foreach_modify(iter, &owner->locks)
-                                       LockReleaseCurrentOwner(owner, iter.cur);
+                               {
+                                       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+                                       LockReleaseCurrentOwner(owner, locallockowner);
+                               }
 
                                Assert(dlist_is_empty(&owner->locks));
                        }
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index f2617f805e..e3861a8ea5 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -417,11 +417,11 @@ typedef struct LOCALLOCKOWNER
         */
        struct ResourceOwnerData *owner;
 
-       dlist_node      resowner_node;
+       dlist_node      resowner_node;  /* dlist link for ResourceOwner.locks */
 
-       dlist_node      locallock_node;
+       dlist_node      locallock_node; /* dlist link for LOCALLOCK.locallockowners */
 
-       struct LOCALLOCK *locallock;
+       struct LOCALLOCK *locallock;    /* pointer to the corresponding LOCALLOCK */
 
        int64           nLocks;                 /* # of times held by this owner */
 } LOCALLOCKOWNER;
@@ -575,9 +575,8 @@ extern void LockAssertNoneHeld(bool isCommit);
 extern void LockReleaseSession(LOCKMETHODID lockmethodid);
 struct ResourceOwnerData;
 extern void LockReleaseCurrentOwner(struct ResourceOwnerData *owner,
-                                                                       dlist_node *resowner_node);
-extern void LockReassignCurrentOwner(struct ResourceOwnerData *owner,
-                                                                        dlist_node *resowner_node);
+                                                                       LOCALLOCKOWNER *locallockowner);
+extern void LockReassignCurrentOwner(LOCALLOCKOWNER *locallockowner);
 extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
 #ifdef USE_ASSERT_CHECKING
 extern HTAB *GetLockMethodLocalHash(void);
-- 
2.37.2
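
A note on the pattern used in the fixup above: the resowner.c loops walk the owner's list of links and recover each LOCALLOCKOWNER from its embedded dlist_node via dlist_container(), and each LOCALLOCKOWNER is linked into two lists at once (the owner's and the LOCALLOCK's). Below is a minimal standalone sketch of that intrusive-list idea. It is not PostgreSQL's lib/ilist.h, and every name in it (node, list, container_of, owner_entry, remember, forget, release_all) is a hypothetical stand-in chosen for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins; PostgreSQL's real list API is in lib/ilist.h. */
typedef struct node { struct node *prev, *next; } node;
typedef struct list { node head; } list;    /* circular, head is a sentinel */

/* Recover the enclosing struct from a pointer to its embedded link. */
#define container_of(type, member, ptr) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

static void list_init(list *l) { l->head.prev = l->head.next = &l->head; }
static int  list_is_empty(const list *l) { return l->head.next == &l->head; }

static void list_push_tail(list *l, node *n)
{
    n->prev = l->head.prev;
    n->next = &l->head;
    l->head.prev->next = n;
    l->head.prev = n;
}

/* O(1) unlink: no search through the list is needed. */
static void list_delete(node *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
}

/* One entry per (owner, lock) pair, linked into two lists at once. */
typedef struct owner_entry
{
    int  owner_id;
    node resowner_node;     /* link in the owner's list of locks */
    node locallock_node;    /* link in the lock's list of owners */
    long nlocks;
} owner_entry;

static owner_entry *remember(list *owner_locks, list *lock_owners, int owner_id)
{
    owner_entry *e = malloc(sizeof(*e));

    if (e == NULL)
        abort();
    e->owner_id = owner_id;
    e->nlocks = 1;
    list_push_tail(owner_locks, &e->resowner_node);
    list_push_tail(lock_owners, &e->locallock_node);
    return e;
}

static void forget(owner_entry *e)
{
    list_delete(&e->resowner_node);
    list_delete(&e->locallock_node);
    free(e);
}

/* Release every entry on an owner's list; returns how many were released. */
static int release_all(list *owner_locks)
{
    int   released = 0;
    node *cur = owner_locks->head.next;

    while (cur != &owner_locks->head)
    {
        node *next = cur->next;     /* saved first: forget() frees cur's entry */

        forget(container_of(owner_entry, resowner_node, cur));
        released++;
        cur = next;
    }
    return released;
}
```

Saving the next pointer before unlinking is the same reason the patch iterates with dlist_foreach_modify rather than the plain foreach. With this layout, remembering and forgetting a lock are constant-time operations regardless of how many locks the owner holds.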

From c00e0f64278bc35745cb0bb11e95eb5349ba50c6 Mon Sep 17 00:00:00 2001
From: David Rowley <dgrow...@gmail.com>
Date: Fri, 10 Feb 2023 15:10:06 +1300
Subject: [PATCH v6 3/3] Use a slab context type for storage of LOCALLOCKOWNERs

---
 src/backend/storage/lmgr/lock.c | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 1033d57b88..803a5bb482 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -281,6 +281,8 @@ static HTAB *LockMethodLockHash;
 static HTAB *LockMethodProcLockHash;
 static HTAB *LockMethodLocalHash;
 
+/* A memory context for storing LOCALLOCKOWNER structs */
+MemoryContext LocalLockOwnerContext;
 
 /* private state for error cleanup */
 static LOCALLOCK *StrongLockInProgress;
@@ -483,6 +485,17 @@ InitLocks(void)
        /* Initialize each element of the session_locks array */
        for (int i = 0; i < lengthof(LockMethods); i++)
                dlist_init(&session_locks[i]);
+
+       /*
+        * Create a slab context for storing LOCALLOCKOWNERs.  Slab seems like a
+        * good context type for this as it will manage fragmentation better than
+        * aset.c contexts and it will free() excess memory rather than maintain
+        * excessively long freelists after a large surge in locking requirements.
+        */
+       LocalLockOwnerContext = SlabContextCreate(TopMemoryContext,
+                                                                                         "LOCALLOCKOWNER context",
+                                                                                         SLAB_DEFAULT_BLOCK_SIZE,
+                                                                                         sizeof(LOCALLOCKOWNER));
 }
 
 
@@ -1688,7 +1701,7 @@ GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
                        return;
                }
        }
-       locallockowner = MemoryContextAlloc(TopMemoryContext, sizeof(LOCALLOCKOWNER));
+       locallockowner = MemoryContextAlloc(LocalLockOwnerContext, sizeof(LOCALLOCKOWNER));
        locallockowner->owner = owner;
        locallockowner->nLocks = 1;
        locallockowner->locallock = locallock;
-- 
2.37.2
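
To illustrate the reasoning in the 3/3 comment (fixed-size chunks, and memory handed back with free() once a surge of allocations has passed), here is a rough standalone sketch of a slab-style pool. It is not PostgreSQL's slab.c: the names (slab_pool, slab_block, chunk_header, pool_init, pool_alloc, pool_free), the tiny CHUNKS_PER_BLOCK value and the linear scan for a block with free space are all simplifying assumptions made for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

enum { CHUNKS_PER_BLOCK = 4 };      /* deliberately tiny, for illustration */

typedef struct slab_block slab_block;

/* Each chunk starts with a header naming its block; the payload follows. */
typedef union chunk_header
{
    slab_block *block;
    max_align_t align;              /* keeps the payload suitably aligned */
} chunk_header;

struct slab_block
{
    slab_block *next;
    int         nused;              /* chunks currently handed out */
    void       *freelist;           /* free payloads, chained via first bytes */
    max_align_t chunks[];           /* chunk storage */
};

typedef struct slab_pool
{
    size_t      stride;             /* header + payload, rounded for alignment */
    slab_block *blocks;
    int         nblocks;
} slab_pool;

static void pool_init(slab_pool *pool, size_t payload_size)
{
    size_t a = sizeof(max_align_t);

    if (payload_size < sizeof(void *))
        payload_size = sizeof(void *);      /* room for the freelist link */
    pool->stride = sizeof(chunk_header) + (payload_size + a - 1) / a * a;
    pool->blocks = NULL;
    pool->nblocks = 0;
}

static void *pool_alloc(slab_pool *pool)
{
    slab_block *b = pool->blocks;
    void       *payload;

    while (b != NULL && b->freelist == NULL)    /* simplification: linear scan */
        b = b->next;
    if (b == NULL)
    {
        b = malloc(sizeof(slab_block) + pool->stride * CHUNKS_PER_BLOCK);
        if (b == NULL)
            return NULL;
        b->nused = 0;
        b->freelist = NULL;
        for (int i = 0; i < CHUNKS_PER_BLOCK; i++)
        {
            chunk_header *h = (chunk_header *) ((char *) b->chunks + i * pool->stride);
            void         *body = h + 1;

            h->block = b;
            *(void **) body = b->freelist;
            b->freelist = body;
        }
        b->next = pool->blocks;
        pool->blocks = b;
        pool->nblocks++;
    }
    payload = b->freelist;
    b->freelist = *(void **) payload;
    b->nused++;
    return payload;
}

static void pool_free(slab_pool *pool, void *payload)
{
    slab_block *b = ((chunk_header *) payload - 1)->block;

    *(void **) payload = b->freelist;
    b->freelist = payload;
    if (--b->nused == 0)
    {
        /* Block is completely empty: unlink it and return it to the OS. */
        slab_block **link = &pool->blocks;

        while (*link != b)
            link = &(*link)->next;
        *link = b->next;
        free(b);
        pool->nblocks--;
    }
}
```

The point of the sketch is the last branch of pool_free(): because free chunks are tracked per block, a block that becomes entirely unused can be released, whereas a general-purpose allocator with size-class freelists would tend to keep those chunks around after the surge.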
