On Sat, 1 Jan 2022 at 15:40, Zhihong Yu <z...@yugabyte.com> wrote:
> +           locallock->nLocks -= locallockowner->nLocks;
> +           Assert(locallock->nLocks >= 0);
>
> I think the assertion is not needed since the above code is in if block :
>
> +       if (locallockowner->nLocks < locallock->nLocks)
>
> the condition, locallock->nLocks >= 0, would always hold after the 
> subtraction.

That makes sense. I've removed the Assert in the attached patch.
Thanks for looking at the patch.

I've also spent a bit more time on the patch. There were quite a few
outdated comments remaining. Also, the PROCLOCK releaseMask field
appears to no longer be needed.

I also did a round of benchmarking on the attached patch using a very
recent master. I've attached .sql files and the script I used to
benchmark.

With 1024 partitions, lock1.sql shows about a 4.75% performance
increase. This would become larger with more partitions and less with
fewer partitions.
With the patch, lock2.sql shows about a 10% performance increase over master.
lock3.sql does not seem to have changed much and lock4.sql shows a
small regression with the patch of about 2.5%.

I'm not quite sure how worried we should be about lock4.sql slowing
down slightly. 2.5% is fairly small given how hard I'm exercising the
locking code in that test. There's also nothing much to say that the
slowdown is not just due to code alignment changes.

I also understand that Amit L is working on another patch that will
improve the situation for lock1.sql by not taking the locks on
relations that will be run-time pruned at executor startup.  I think
it's still worth solving this regardless of Amit's patch as with
current master we still have a situation where short fast queries
which access a small number of tables can become slower once the
backend has obtained a large number of locks concurrently and bloated
the locallocktable.

As for the patch, I feel it's a pretty invasive change to how we
release locks and the resowner code. I'd be quite happy for some
review of it.
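
For anyone reviewing, the core idea can be modeled outside PostgreSQL as
follows. This is only a simplified sketch and not code from the patch: the
list helpers are hand-rolled rather than ilist.h, and local_lock, res_owner,
lock_owner, grant() and release_all() are hypothetical stand-ins for
LOCALLOCK, ResourceOwner, LOCALLOCKOWNER and the related functions. Each
per-owner entry is linked into two lists, one hanging off the lock and one
hanging off the owner, so releasing an owner's locks only visits that owner's
entries instead of scanning the whole local lock table.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Circular doubly-linked list link, embedded in the structs it connects. */
typedef struct node
{
    struct node *prev;
    struct node *next;
} node;

#define container_of(ptr, type, member) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

void list_init(node *head) { head->prev = head->next = head; }
int  list_is_empty(const node *head) { return head->next == head; }

void
list_push_tail(node *head, node *n)
{
    n->prev = head->prev;
    n->next = head;
    head->prev->next = n;
    head->prev = n;
}

void
list_delete(node *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
}

/* Hypothetical stand-ins for LOCALLOCK, ResourceOwner and LOCALLOCKOWNER. */
typedef struct local_lock { int nlocks; node owners; } local_lock;
typedef struct res_owner { node locks; } res_owner;
typedef struct lock_owner
{
    res_owner  *owner;
    local_lock *lock;
    int         nlocks;
    node        lock_node;      /* link in local_lock.owners */
    node        owner_node;     /* link in res_owner.locks */
} lock_owner;

/* Record one acquisition of "lock" on behalf of "owner". */
void
grant(local_lock *lock, res_owner *owner)
{
    lock_owner *lo;

    lock->nlocks++;
    for (node *n = lock->owners.next; n != &lock->owners; n = n->next)
    {
        lo = container_of(n, lock_owner, lock_node);
        if (lo->owner == owner)
        {
            lo->nlocks++;
            return;
        }
    }

    /* First acquisition by this owner: create an entry on both lists. */
    lo = malloc(sizeof(lock_owner));
    if (lo == NULL)
        abort();
    lo->owner = owner;
    lo->lock = lock;
    lo->nlocks = 1;
    list_push_tail(&lock->owners, &lo->lock_node);
    list_push_tail(&owner->locks, &lo->owner_node);
}

/* Release everything "owner" holds; returns the number of entries visited. */
int
release_all(res_owner *owner)
{
    int visited = 0;

    while (!list_is_empty(&owner->locks))
    {
        lock_owner *lo = container_of(owner->locks.next, lock_owner, owner_node);

        lo->lock->nlocks -= lo->nlocks;
        assert(lo->lock->nlocks >= 0);
        list_delete(&lo->lock_node);
        list_delete(&lo->owner_node);
        free(lo);
        visited++;
    }
    return visited;
}
```

In this model the work done by release_all() depends only on how many
entries the owner holds, which is the property that should keep short
queries fast even after the local lock table has grown.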

Here are the full results as output by the attached script.

drowley@amd3990x:~$ echo master
master
drowley@amd3990x:~$ ./lockbench.sh
lock1.sql
tps = 38078.011433 (without initial connection time)
tps = 38070.016792 (without initial connection time)
tps = 39223.118769 (without initial connection time)
tps = 37510.105287 (without initial connection time)
tps = 38164.394128 (without initial connection time)
lock2.sql
tps = 247.963797 (without initial connection time)
tps = 247.374174 (without initial connection time)
tps = 248.412744 (without initial connection time)
tps = 248.192629 (without initial connection time)
tps = 248.503728 (without initial connection time)
lock3.sql
tps = 1162.937692 (without initial connection time)
tps = 1160.968689 (without initial connection time)
tps = 1166.908643 (without initial connection time)
tps = 1160.288547 (without initial connection time)
tps = 1160.336572 (without initial connection time)
lock4.sql
tps = 282.173560 (without initial connection time)
tps = 284.470330 (without initial connection time)
tps = 286.089644 (without initial connection time)
tps = 285.548487 (without initial connection time)
tps = 284.313505 (without initial connection time)


drowley@amd3990x:~$ echo Patched
Patched
drowley@amd3990x:~$ ./lockbench.sh
lock1.sql
tps = 40338.975219 (without initial connection time)
tps = 39803.433365 (without initial connection time)
tps = 39504.824194 (without initial connection time)
tps = 39843.422438 (without initial connection time)
tps = 40624.483013 (without initial connection time)
lock2.sql
tps = 274.413309 (without initial connection time)
tps = 271.978813 (without initial connection time)
tps = 275.795091 (without initial connection time)
tps = 273.628649 (without initial connection time)
tps = 273.049977 (without initial connection time)
lock3.sql
tps = 1168.557054 (without initial connection time)
tps = 1168.139469 (without initial connection time)
tps = 1166.366440 (without initial connection time)
tps = 1165.464214 (without initial connection time)
tps = 1167.250809 (without initial connection time)
lock4.sql
tps = 274.842298 (without initial connection time)
tps = 277.911394 (without initial connection time)
tps = 278.702620 (without initial connection time)
tps = 275.715606 (without initial connection time)
tps = 278.816060 (without initial connection time)
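
The percentages quoted earlier can be checked by taking the mean of the five
runs for each script and comparing patched against master. The helper names
below (mean_tps, pct_change) are made up for this illustration; the arrays
are the tps figures copied from the output above. By this calculation the
changes come out at roughly +4.7% for lock1.sql, +10.4% for lock2.sql, +0.4%
for lock3.sql and -2.6% for lock4.sql.

```c
#include <assert.h>
#include <stddef.h>

/* Arithmetic mean of n tps samples. */
double
mean_tps(const double *samples, size_t n)
{
    double sum = 0.0;

    assert(n > 0);
    for (size_t i = 0; i < n; i++)
        sum += samples[i];
    return sum / (double) n;
}

/* Percentage change of patched relative to master; positive means faster. */
double
pct_change(double master, double patched)
{
    return (patched - master) / master * 100.0;
}

/* tps figures copied from the lockbench.sh output in this email */
const double master_lock1[] = {38078.011433, 38070.016792, 39223.118769,
                               37510.105287, 38164.394128};
const double patched_lock1[] = {40338.975219, 39803.433365, 39504.824194,
                                39843.422438, 40624.483013};
const double master_lock2[] = {247.963797, 247.374174, 248.412744,
                               248.192629, 248.503728};
const double patched_lock2[] = {274.413309, 271.978813, 275.795091,
                                273.628649, 273.049977};
const double master_lock3[] = {1162.937692, 1160.968689, 1166.908643,
                               1160.288547, 1160.336572};
const double patched_lock3[] = {1168.557054, 1168.139469, 1166.366440,
                                1165.464214, 1167.250809};
const double master_lock4[] = {282.173560, 284.470330, 286.089644,
                               285.548487, 284.313505};
const double patched_lock4[] = {274.842298, 277.911394, 278.702620,
                                275.715606, 278.816060};
```

For example, pct_change(mean_tps(master_lock1, 5), mean_tps(patched_lock1, 5))
gives the lock1.sql figure. Five runs per script is a small sample, so these
means say nothing about run-to-run noise.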

David
diff --git a/src/backend/commands/discard.c b/src/backend/commands/discard.c
index c583539e0c..2c43b9e0c8 100644
--- a/src/backend/commands/discard.c
+++ b/src/backend/commands/discard.c
@@ -71,7 +71,7 @@ DiscardAll(bool isTopLevel)
        ResetAllOptions();
        DropAllPreparedStatements();
        Async_UnlistenAll();
-       LockReleaseAll(USER_LOCKMETHOD, true);
+       LockReleaseSession(USER_LOCKMETHOD);
        ResetPlanCache();
        ResetTempTableNamespace();
        ResetSequenceCaches();
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index d08ec6c402..563ba681e5 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -182,12 +182,6 @@ holdMask -
     subset of the PGPROC object's heldLocks mask (if the PGPROC is
     currently waiting for another lock mode on this lock).
 
-releaseMask -
-    A bitmask for the lock modes due to be released during LockReleaseAll.
-    This must be a subset of the holdMask.  Note that it is modified without
-    taking the partition LWLock, and therefore it is unsafe for any
-    backend except the one owning the PROCLOCK to examine/change it.
-
 lockLink -
     List link for shared memory queue of all the PROCLOCK objects for the
     same LOCK.
@@ -321,8 +315,7 @@ and will notice any weak lock we take when it does.
 
 Fast-path VXID locks do not use the FastPathStrongRelationLocks table.  The
 first lock taken on a VXID is always the ExclusiveLock taken by its owner.
-Any subsequent lockers are share lockers waiting for the VXID to terminate.
-Indeed, the only reason VXID locks use the lock manager at all (rather than
+Any subsequent lockers are share lockers wait 
 waiting for the VXID to terminate via some other method) is for deadlock
 detection.  Thus, the initial VXID lock can *always* be taken via the fast
 path without checking for conflicts.  Any subsequent locker must check
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index ee2e15c17e..7bc33f476e 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -22,8 +22,7 @@
  *     Interface:
  *
  *     InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(),
- *     LockAcquire(), LockRelease(), LockReleaseAll(),
- *     LockCheckConflicts(), GrantLock()
+ *     LockAcquire(), LockRelease(), LockCheckConflicts(), GrantLock()
  *
  *-------------------------------------------------------------------------
  */
@@ -289,6 +288,9 @@ static LOCALLOCK *awaitedLock;
 static ResourceOwner awaitedOwner;
 
 
+static dlist_head session_locks[lengthof(LockMethods)];
+
+
 #ifdef LOCK_DEBUG
 
 /*------
@@ -375,8 +377,9 @@ static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
 static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
 static void FinishStrongLockAcquire(void);
 static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
-static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
-static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
+static void ReleaseLockIfHeld(LOCALLOCKOWNER *locallockowner, bool sessionLock);
+static void LockReassignOwner(LOCALLOCKOWNER *locallockowner,
+                                                         ResourceOwner parent);
 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
                                                PROCLOCK *proclock, LockMethod lockMethodTable);
 static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
@@ -477,6 +480,10 @@ InitLocks(void)
                                                                          16,
                                                                          &info,
                                                                          HASH_ELEM | HASH_BLOBS);
+
+       /* Initialize each element of the session_locks array */
+       for (int i = 0; i < lengthof(LockMethods); i++)
+               dlist_init(&session_locks[i]);
 }
 
 
@@ -701,7 +708,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
        {
                PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock);
                LWLockRelease(partitionLock);
-               elog(WARNING, "you don't own a lock of type %s",
+               elog(PANIC, "you don't own a lock of type %s",
                         lockMethodTable->lockModeNames[lockmode]);
                RemoveLocalLock(locallock);
                return false;
@@ -839,26 +846,9 @@ LockAcquireExtended(const LOCKTAG *locktag,
                locallock->nLocks = 0;
                locallock->holdsStrongLockCount = false;
                locallock->lockCleared = false;
-               locallock->numLockOwners = 0;
-               locallock->maxLockOwners = 8;
-               locallock->lockOwners = NULL;   /* in case next line fails */
-               locallock->lockOwners = (LOCALLOCKOWNER *)
-                       MemoryContextAlloc(TopMemoryContext,
-                                                          locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
+               dlist_init(&locallock->locallockowners);
        }
-       else
-       {
-               /* Make sure there will be room to remember the lock */
-               if (locallock->numLockOwners >= locallock->maxLockOwners)
-               {
-                       int                     newsize = locallock->maxLockOwners * 2;
 
-                       locallock->lockOwners = (LOCALLOCKOWNER *)
-                               repalloc(locallock->lockOwners,
-                                                newsize * sizeof(LOCALLOCKOWNER));
-                       locallock->maxLockOwners = newsize;
-               }
-       }
        hashcode = locallock->hashcode;
 
        if (locallockp)
@@ -1268,7 +1258,6 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
                proclock->groupLeader = proc->lockGroupLeader != NULL ?
                        proc->lockGroupLeader : proc;
                proclock->holdMask = 0;
-               proclock->releaseMask = 0;
                /* Add proclock to appropriate lists */
                SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
                SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
@@ -1366,17 +1355,18 @@ CheckAndSetLockHeld(LOCALLOCK *locallock, bool acquired)
 static void
 RemoveLocalLock(LOCALLOCK *locallock)
 {
-       int                     i;
+       dlist_mutable_iter iter;
 
-       for (i = locallock->numLockOwners - 1; i >= 0; i--)
+       dlist_foreach_modify(iter, &locallock->locallockowners)
        {
-               if (locallock->lockOwners[i].owner != NULL)
-                       ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
+               LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+               Assert(locallockowner->owner != NULL);
+               dlist_delete(&locallockowner->locallock_node);
+               ResourceOwnerForgetLock(locallockowner);
        }
-       locallock->numLockOwners = 0;
-       if (locallock->lockOwners != NULL)
-               pfree(locallock->lockOwners);
-       locallock->lockOwners = NULL;
+
+       Assert(dlist_is_empty(&locallock->locallockowners));
 
        if (locallock->holdsStrongLockCount)
        {
@@ -1394,7 +1384,7 @@ RemoveLocalLock(LOCALLOCK *locallock)
        if (!hash_search(LockMethodLocalHash,
                                         (void *) &(locallock->tag),
                                         HASH_REMOVE, NULL))
-               elog(WARNING, "locallock table corrupted");
+               elog(PANIC, "locallock table corrupted");
 
        /*
         * Indicate that the lock is released for certain types of locks
@@ -1688,26 +1678,40 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock,
 static void
 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
 {
-       LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
-       int                     i;
+       LOCALLOCKOWNER *locallockowner;
+       dlist_iter iter;
 
-       Assert(locallock->numLockOwners < locallock->maxLockOwners);
        /* Count the total */
        locallock->nLocks++;
+
        /* Count the per-owner lock */
-       for (i = 0; i < locallock->numLockOwners; i++)
+       dlist_foreach(iter, &locallock->locallockowners)
        {
-               if (lockOwners[i].owner == owner)
+               locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+               if (locallockowner->owner == owner)
                {
-                       lockOwners[i].nLocks++;
+                       locallockowner->nLocks++;
                        return;
                }
        }
-       lockOwners[i].owner = owner;
-       lockOwners[i].nLocks = 1;
-       locallock->numLockOwners++;
+
+       locallockowner = MemoryContextAlloc(TopMemoryContext, sizeof(LOCALLOCKOWNER));
+       locallockowner->owner = owner;
+       locallockowner->nLocks = 1;
+       locallockowner->locallock = locallock;
+
+       dlist_push_tail(&locallock->locallockowners, &locallockowner->locallock_node);
+
        if (owner != NULL)
-               ResourceOwnerRememberLock(owner, locallock);
+               ResourceOwnerRememberLock(owner, locallockowner);
+       else
+       {
+               LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallockowner->locallock);
+
+               Assert(lockmethodid > 0 && lockmethodid <= 2);
+               dlist_push_tail(&session_locks[lockmethodid - 1], &locallockowner->resowner_node);
+       }
 
        /* Indicate that the lock is acquired for certain types of locks. */
        CheckAndSetLockHeld(locallock, true);
@@ -2021,9 +2025,9 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
         * Decrease the count for the resource owner.
         */
        {
-               LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
                ResourceOwner owner;
-               int                     i;
+               dlist_iter iter;
+               bool            found = false;
 
                /* Identify owner for lock */
                if (sessionLock)
@@ -2031,24 +2035,29 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
                else
                        owner = CurrentResourceOwner;
 
-               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               dlist_foreach(iter, &locallock->locallockowners)
                {
-                       if (lockOwners[i].owner == owner)
+                       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+                       if (locallockowner->owner != owner)
+                               continue;
+
+                       found = true;
+
+                       if (--locallockowner->nLocks == 0)
                        {
-                               Assert(lockOwners[i].nLocks > 0);
-                               if (--lockOwners[i].nLocks == 0)
-                               {
-                                       if (owner != NULL)
-                                               ResourceOwnerForgetLock(owner, locallock);
-                                       /* compact out unused slot */
-                                       locallock->numLockOwners--;
-                                       if (i < locallock->numLockOwners)
-                                               lockOwners[i] = lockOwners[locallock->numLockOwners];
-                               }
-                               break;
+                               dlist_delete(&locallockowner->locallock_node);
+
+                               if (owner != NULL)
+                                       ResourceOwnerForgetLock(locallockowner);
+                               else
+                                       dlist_delete(&locallockowner->resowner_node);
                        }
+
+                       Assert(locallockowner->nLocks >= 0);
                }
-               if (i < 0)
+
+               if (!found)
                {
                        /* don't release a lock belonging to another owner */
                        elog(WARNING, "you don't own a lock of type %s",
@@ -2066,6 +2075,8 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
        if (locallock->nLocks > 0)
                return true;
 
+       Assert(locallock->nLocks >= 0);
+
        /*
         * At this point we can no longer suppose we are clear of invalidation
         * messages related to this lock.  Although we'll delete the LOCALLOCK
@@ -2147,7 +2158,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
        {
                PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
                LWLockRelease(partitionLock);
-               elog(WARNING, "you don't own a lock of type %s",
+               elog(PANIC, "you don't own a lock of type %s",
                         lockMethodTable->lockModeNames[lockmode]);
                RemoveLocalLock(locallock);
                return false;
@@ -2168,283 +2179,44 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
        return true;
 }
 
+#ifdef USE_ASSERT_CHECKING
 /*
- * LockReleaseAll -- Release all locks of the specified lock method that
- *             are held by the current process.
- *
- * Well, not necessarily *all* locks.  The available behaviors are:
- *             allLocks == true: release all locks including session locks.
- *             allLocks == false: release all non-session locks.
+ * LockAssertNoneHeld -- Assert that we no longer hold any DEFAULT_LOCKMETHOD
+ * locks during an abort.
  */
-void
-LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
+extern void
+LockAssertNoneHeld(bool isCommit)
 {
        HASH_SEQ_STATUS status;
-       LockMethod      lockMethodTable;
-       int                     i,
-                               numLockModes;
        LOCALLOCK  *locallock;
-       LOCK       *lock;
-       PROCLOCK   *proclock;
-       int                     partition;
-       bool            have_fast_path_lwlock = false;
-
-       if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
-               elog(ERROR, "unrecognized lock method: %d", lockmethodid);
-       lockMethodTable = LockMethods[lockmethodid];
-
-#ifdef LOCK_DEBUG
-       if (*(lockMethodTable->trace_flag))
-               elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
-#endif
-
-       /*
-        * Get rid of our fast-path VXID lock, if appropriate.  Note that this is
-        * the only way that the lock we hold on our own VXID can ever get
-        * released: it is always and only released when a toplevel transaction
-        * ends.
-        */
-       if (lockmethodid == DEFAULT_LOCKMETHOD)
-               VirtualXactLockTableCleanup();
-
-       numLockModes = lockMethodTable->numLockModes;
-
-       /*
-        * First we run through the locallock table and get rid of unwanted
-        * entries, then we scan the process's proclocks and get rid of those. We
-        * do this separately because we may have multiple locallock entries
-        * pointing to the same proclock, and we daren't end up with any dangling
-        * pointers.  Fast-path locks are cleaned up during the locallock table
-        * scan, though.
-        */
-       hash_seq_init(&status, LockMethodLocalHash);
 
-       while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+       if (!isCommit)
        {
-               /*
-                * If the LOCALLOCK entry is unused, we must've run out of shared
-                * memory while trying to set up this lock.  Just forget the local
-                * entry.
-                */
-               if (locallock->nLocks == 0)
-               {
-                       RemoveLocalLock(locallock);
-                       continue;
-               }
-
-               /* Ignore items that are not of the lockmethod to be removed */
-               if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
-                       continue;
+               hash_seq_init(&status, LockMethodLocalHash);
 
-               /*
-                * If we are asked to release all locks, we can just zap the entry.
-                * Otherwise, must scan to see if there are session locks. We assume
-                * there is at most one lockOwners entry for session locks.
-                */
-               if (!allLocks)
+               while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
                {
-                       LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+                       dlist_iter local_iter;
 
-                       /* If session lock is above array position 0, move it down to 0 */
-                       for (i = 0; i < locallock->numLockOwners; i++)
-                       {
-                               if (lockOwners[i].owner == NULL)
-                                       lockOwners[0] = lockOwners[i];
-                               else
-                                       ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
-                       }
+                       Assert(locallock->nLocks >= 0);
 
-                       if (locallock->numLockOwners > 0 &&
-                               lockOwners[0].owner == NULL &&
-                               lockOwners[0].nLocks > 0)
+                       dlist_foreach(local_iter, &locallock->locallockowners)
                        {
-                               /* Fix the locallock to show just the session locks */
-                               locallock->nLocks = lockOwners[0].nLocks;
-                               locallock->numLockOwners = 1;
-                               /* We aren't deleting this locallock, so done */
-                               continue;
-                       }
-                       else
-                               locallock->numLockOwners = 0;
-               }
-
-               /*
-                * If the lock or proclock pointers are NULL, this lock was taken via
-                * the relation fast-path (and is not known to have been transferred).
-                */
-               if (locallock->proclock == NULL || locallock->lock == NULL)
-               {
-                       LOCKMODE        lockmode = locallock->tag.mode;
-                       Oid                     relid;
+                               LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+                                                                                                                                locallock_node,
+                                                                                                                                local_iter.cur);
 
-                       /* Verify that a fast-path lock is what we've got. */
-                       if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
-                               elog(PANIC, "locallock table corrupted");
-
-                       /*
-                        * If we don't currently hold the LWLock that protects our
-                        * fast-path data structures, we must acquire it before attempting
-                        * to release the lock via the fast-path.  We will continue to
-                        * hold the LWLock until we're done scanning the locallock table,
-                        * unless we hit a transferred fast-path lock.  (XXX is this
-                        * really such a good idea?  There could be a lot of entries ...)
-                        */
-                       if (!have_fast_path_lwlock)
-                       {
-                               LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE);
-                               have_fast_path_lwlock = true;
-                       }
+                               Assert(locallockowner->owner == NULL);
 
-                       /* Attempt fast-path release. */
-                       relid = locallock->tag.lock.locktag_field2;
-                       if (FastPathUnGrantRelationLock(relid, lockmode))
-                       {
-                               RemoveLocalLock(locallock);
-                               continue;
+                               if (locallockowner->nLocks > 0 &&
+                                       LOCALLOCK_LOCKMETHOD(*locallock) == DEFAULT_LOCKMETHOD)
+                                       Assert(false);
                        }
-
-                       /*
-                        * Our lock, originally taken via the fast path, has been
-                        * transferred to the main lock table.  That's going to require
-                        * some extra work, so release our fast-path lock before starting.
-                        */
-                       LWLockRelease(&MyProc->fpInfoLock);
-                       have_fast_path_lwlock = false;
-
-                       /*
-                        * Now dump the lock.  We haven't got a pointer to the LOCK or
-                        * PROCLOCK in this case, so we have to handle this a bit
-                        * differently than a normal lock release.  Unfortunately, this
-                        * requires an extra LWLock acquire-and-release cycle on the
-                        * partitionLock, but hopefully it shouldn't happen often.
-                        */
-                       LockRefindAndRelease(lockMethodTable, MyProc,
-                                                                &locallock->tag.lock, lockmode, false);
-                       RemoveLocalLock(locallock);
-                       continue;
                }
-
-               /* Mark the proclock to show we need to release this lockmode */
-               if (locallock->nLocks > 0)
-                       locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
-
-               /* And remove the locallock hashtable entry */
-               RemoveLocalLock(locallock);
        }
-
-       /* Done with the fast-path data structures */
-       if (have_fast_path_lwlock)
-               LWLockRelease(&MyProc->fpInfoLock);
-
-       /*
-        * Now, scan each lock partition separately.
-        */
-       for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
-       {
-               LWLock     *partitionLock;
-               SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
-               PROCLOCK   *nextplock;
-
-               partitionLock = LockHashPartitionLockByIndex(partition);
-
-               /*
-                * If the proclock list for this partition is empty, we can skip
-                * acquiring the partition lock.  This optimization is trickier than
-                * it looks, because another backend could be in process of adding
-                * something to our proclock list due to promoting one of our
-                * fast-path locks.  However, any such lock must be one that we
-                * decided not to delete above, so it's okay to skip it again now;
-                * we'd just decide not to delete it again.  We must, however, be
-                * careful to re-fetch the list header once we've acquired the
-                * partition lock, to be sure we have a valid, up-to-date pointer.
-                * (There is probably no significant risk if pointer fetch/store is
-                * atomic, but we don't wish to assume that.)
-                *
-                * XXX This argument assumes that the locallock table correctly
-                * represents all of our fast-path locks.  While allLocks mode
-                * guarantees to clean up all of our normal locks regardless of the
-                * locallock situation, we lose that guarantee for fast-path locks.
-                * This is not ideal.
-                */
-               if (SHMQueueNext(procLocks, procLocks,
-                                                offsetof(PROCLOCK, procLink)) == NULL)
-                       continue;                       /* needn't examine this partition */
-
-               LWLockAcquire(partitionLock, LW_EXCLUSIVE);
-
-               for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
-                                                                                                 offsetof(PROCLOCK, procLink));
-                        proclock;
-                        proclock = nextplock)
-               {
-                       bool            wakeupNeeded = false;
-
-                       /* Get link first, since we may unlink/delete this proclock */
-                       nextplock = (PROCLOCK *)
-                               SHMQueueNext(procLocks, &proclock->procLink,
-                                                        offsetof(PROCLOCK, procLink));
-
-                       Assert(proclock->tag.myProc == MyProc);
-
-                       lock = proclock->tag.myLock;
-
-                       /* Ignore items that are not of the lockmethod to be removed */
-                       if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
-                               continue;
-
-                       /*
-                        * In allLocks mode, force release of all locks even if locallock
-                        * table had problems
-                        */
-                       if (allLocks)
-                               proclock->releaseMask = proclock->holdMask;
-                       else
-                               Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
-
-                       /*
-                        * Ignore items that have nothing to be released, unless they have
-                        * holdMask == 0 and are therefore recyclable
-                        */
-                       if (proclock->releaseMask == 0 && proclock->holdMask != 0)
-                               continue;
-
-                       PROCLOCK_PRINT("LockReleaseAll", proclock);
-                       LOCK_PRINT("LockReleaseAll", lock, 0);
-                       Assert(lock->nRequested >= 0);
-                       Assert(lock->nGranted >= 0);
-                       Assert(lock->nGranted <= lock->nRequested);
-                       Assert((proclock->holdMask & ~lock->grantMask) == 0);
-
-                       /*
-                        * Release the previously-marked lock modes
-                        */
-                       for (i = 1; i <= numLockModes; i++)
-                       {
-                               if (proclock->releaseMask & LOCKBIT_ON(i))
-                                       wakeupNeeded |= UnGrantLock(lock, i, proclock,
-                                                                                               lockMethodTable);
-                       }
-                       Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
-                       Assert(lock->nGranted <= lock->nRequested);
-                       LOCK_PRINT("LockReleaseAll: updated", lock, 0);
-
-                       proclock->releaseMask = 0;
-
-                       /* CleanUpLock will wake up waiters if needed. */
-                       CleanUpLock(lock, proclock,
-                                               lockMethodTable,
-                                               LockTagHashCode(&lock->tag),
-                                               wakeupNeeded);
-               }                                               /* loop over PROCLOCKs within this partition */
-
-               LWLockRelease(partitionLock);
-       }                                                       /* loop over partitions */
-
-#ifdef LOCK_DEBUG
-       if (*(lockMethodTable->trace_flag))
-               elog(LOG, "LockReleaseAll done");
-#endif
+       Assert(MyProc->fpLockBits == 0);
 }
+#endif
 
 /*
  * LockReleaseSession -- Release all session locks of the specified lock method
@@ -2453,59 +2225,41 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
 void
 LockReleaseSession(LOCKMETHODID lockmethodid)
 {
-       HASH_SEQ_STATUS status;
-       LOCALLOCK  *locallock;
+       dlist_mutable_iter iter;
 
        if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
                elog(ERROR, "unrecognized lock method: %d", lockmethodid);
 
-       hash_seq_init(&status, LockMethodLocalHash);
-
-       while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+       dlist_foreach_modify(iter, &session_locks[lockmethodid - 1])
        {
-               /* Ignore items that are not of the specified lock method */
-               if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
-                       continue;
+               LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
 
-               ReleaseLockIfHeld(locallock, true);
+               Assert(LOCALLOCK_LOCKMETHOD(*locallockowner->locallock) == lockmethodid);
+
+               ReleaseLockIfHeld(locallockowner, true);
        }
+
+       Assert(dlist_is_empty(&session_locks[lockmethodid - 1]));
 }
 
 /*
  * LockReleaseCurrentOwner
- *             Release all locks belonging to CurrentResourceOwner
- *
- * If the caller knows what those locks are, it can pass them as an array.
- * That speeds up the call significantly, when a lot of locks are held.
- * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
- * table to find them.
+ *             Release all locks belonging to 'owner'
  */
-void
-LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
+void
+LockReleaseCurrentOwner(ResourceOwner owner, dlist_node *resowner_node)
 {
-       if (locallocks == NULL)
-       {
-               HASH_SEQ_STATUS status;
-               LOCALLOCK  *locallock;
+       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, resowner_node);
 
-               hash_seq_init(&status, LockMethodLocalHash);
-
-               while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
-                       ReleaseLockIfHeld(locallock, false);
-       }
-       else
-       {
-               int                     i;
+       Assert(locallockowner->owner == owner);
 
-               for (i = nlocks - 1; i >= 0; i--)
-                       ReleaseLockIfHeld(locallocks[i], false);
-       }
+       ReleaseLockIfHeld(locallockowner, false);
 }
 
 /*
  * ReleaseLockIfHeld
- *             Release any session-level locks on this lockable object if sessionLock
- *             is true; else, release any locks held by CurrentResourceOwner.
+ *             Release the locks tracked by 'locallockowner'.  sessionLock must be
+ *             true for a session-level entry (owner == NULL) and false otherwise.
  *
  * It is tempting to pass this a ResourceOwner pointer (or NULL for session
  * locks), but without refactoring LockRelease() we cannot support releasing
@@ -2516,52 +2270,40 @@ LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
  * convenience.
  */
 static void
-ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
+ReleaseLockIfHeld(LOCALLOCKOWNER *locallockowner, bool sessionLock)
 {
-       ResourceOwner owner;
-       LOCALLOCKOWNER *lockOwners;
-       int                     i;
+       LOCALLOCK *locallock = locallockowner->locallock;
+
+       /* release all references to the lock by this resource owner */
 
-       /* Identify owner for lock (must match LockRelease!) */
        if (sessionLock)
-               owner = NULL;
+               Assert(locallockowner->owner == NULL);
        else
-               owner = CurrentResourceOwner;
+               Assert(locallockowner->owner != NULL);
 
-       /* Scan to see if there are any locks belonging to the target owner */
-       lockOwners = locallock->lockOwners;
-       for (i = locallock->numLockOwners - 1; i >= 0; i--)
+       /* We will still hold this lock after forgetting this ResourceOwner. */
+       if (locallockowner->nLocks < locallock->nLocks)
        {
-               if (lockOwners[i].owner == owner)
-               {
-                       Assert(lockOwners[i].nLocks > 0);
-                       if (lockOwners[i].nLocks < locallock->nLocks)
-                       {
-                               /*
-                                * We will still hold this lock after forgetting this
-                                * ResourceOwner.
-                                */
-                               locallock->nLocks -= lockOwners[i].nLocks;
-                               /* compact out unused slot */
-                               locallock->numLockOwners--;
-                               if (owner != NULL)
-                                       ResourceOwnerForgetLock(owner, locallock);
-                               if (i < locallock->numLockOwners)
-                                       lockOwners[i] = lockOwners[locallock->numLockOwners];
-                       }
-                       else
-                       {
-                               Assert(lockOwners[i].nLocks == locallock->nLocks);
-                               /* We want to call LockRelease just once */
-                               lockOwners[i].nLocks = 1;
-                               locallock->nLocks = 1;
-                               if (!LockRelease(&locallock->tag.lock,
-                                                                locallock->tag.mode,
-                                                                sessionLock))
-                                       elog(WARNING, "ReleaseLockIfHeld: failed??");
-                       }
-                       break;
-               }
+               locallock->nLocks -= locallockowner->nLocks;
+               Assert(locallock->nLocks >= 0);
+               dlist_delete(&locallockowner->locallock_node);
+
+               if (sessionLock)
+                       dlist_delete(&locallockowner->resowner_node);
+               else
+                       ResourceOwnerForgetLock(locallockowner);
+       }
+       else
+       {
+               Assert(locallockowner->nLocks == locallock->nLocks);
+               /* We want to call LockRelease just once */
+               locallockowner->nLocks = 1;
+               locallock->nLocks = 1;
+
+               if (!LockRelease(&locallock->tag.lock,
+                                                locallock->tag.mode,
+                                                sessionLock))
+                       elog(PANIC, "ReleaseLockIfHeld: failed??");
        }
 }
 
@@ -2576,75 +2318,46 @@ ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
  * and we'll traverse through our hash table to find them.
  */
 void
-LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
+LockReassignCurrentOwner(ResourceOwner owner, dlist_node *resowner_node)
 {
+       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, resowner_node);
        ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
 
-       Assert(parent != NULL);
-
-       if (locallocks == NULL)
-       {
-               HASH_SEQ_STATUS status;
-               LOCALLOCK  *locallock;
-
-               hash_seq_init(&status, LockMethodLocalHash);
-
-               while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
-                       LockReassignOwner(locallock, parent);
-       }
-       else
-       {
-               int                     i;
-
-               for (i = nlocks - 1; i >= 0; i--)
-                       LockReassignOwner(locallocks[i], parent);
-       }
+       LockReassignOwner(locallockowner, parent);
 }
 
 /*
- * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to
- * CurrentResourceOwner to its parent.
+ * Subroutine of LockReassignCurrentOwner. Reassigns the given
+ * 'locallockowner' to 'parent'.
  */
 static void
-LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
+LockReassignOwner(LOCALLOCKOWNER *locallockowner, ResourceOwner parent)
 {
-       LOCALLOCKOWNER *lockOwners;
-       int                     i;
-       int                     ic = -1;
-       int                     ip = -1;
+       dlist_iter iter;
+       LOCALLOCK *locallock = locallockowner->locallock;
 
-       /*
-        * Scan to see if there are any locks belonging to current owner or its
-        * parent
-        */
-       lockOwners = locallock->lockOwners;
-       for (i = locallock->numLockOwners - 1; i >= 0; i--)
+       ResourceOwnerForgetLock(locallockowner);
+
+       dlist_foreach(iter, &locallock->locallockowners)
        {
-               if (lockOwners[i].owner == CurrentResourceOwner)
-                       ic = i;
-               else if (lockOwners[i].owner == parent)
-                       ip = i;
-       }
+               LOCALLOCKOWNER *parentlocalowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
 
-       if (ic < 0)
-               return;                                 /* no current locks */
+               Assert(parentlocalowner->locallock == locallock);
 
-       if (ip < 0)
-       {
-               /* Parent has no slot, so just give it the child's slot */
-               lockOwners[ic].owner = parent;
-               ResourceOwnerRememberLock(parent, locallock);
-       }
-       else
-       {
-               /* Merge child's count with parent's */
-               lockOwners[ip].nLocks += lockOwners[ic].nLocks;
-               /* compact out unused slot */
-               locallock->numLockOwners--;
-               if (ic < locallock->numLockOwners)
-                       lockOwners[ic] = lockOwners[locallock->numLockOwners];
+               if (parentlocalowner->owner != parent)
+                       continue;
+
+               parentlocalowner->nLocks += locallockowner->nLocks;
+
+               locallockowner->nLocks = 0;
+               dlist_delete(&locallockowner->locallock_node);
+               pfree(locallockowner);
+               return;
        }
-       ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
+
+       /* reassign locallockowner to parent resowner */
+       locallockowner->owner = parent;
+       ResourceOwnerRememberLock(parent, locallockowner);
 }
 
 /*
@@ -3124,7 +2837,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
  * We currently use this in two situations: first, to release locks held by
  * prepared transactions on commit (see lock_twophase_postcommit); and second,
  * to release locks taken via the fast-path, transferred to the main hash
- * table, and then released (see LockReleaseAll).
+ * table, and then released (see ResourceOwnerRelease).
  */
 static void
 LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
@@ -3179,7 +2892,7 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
        {
                PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
                LWLockRelease(partitionLock);
-               elog(WARNING, "you don't own a lock of type %s",
+               elog(PANIC, "you don't own a lock of type %s",
                         lockMethodTable->lockModeNames[lockmode]);
                return;
        }
@@ -3260,10 +2973,9 @@ CheckForSessionAndXactLocks(void)
 
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
        {
-               LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
                PerLockTagEntry *hentry;
                bool            found;
-               int                     i;
+               dlist_iter iter;
 
                /*
                 * Ignore VXID locks.  We don't want those to be held by prepared
@@ -3284,9 +2996,13 @@ CheckForSessionAndXactLocks(void)
                        hentry->sessLock = hentry->xactLock = false;
 
                /* Scan to see if we hold lock at session or xact level or both */
-               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               dlist_foreach(iter, &locallock->locallockowners)
                {
-                       if (lockOwners[i].owner == NULL)
+                       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+                                                                        locallock_node,
+                                                                        iter.cur);
+
+                       if (locallockowner->owner == NULL)
                                hentry->sessLock = true;
                        else
                                hentry->xactLock = true;
@@ -3333,10 +3049,9 @@ AtPrepare_Locks(void)
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
        {
                TwoPhaseLockRecord record;
-               LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
                bool            haveSessionLock;
                bool            haveXactLock;
-               int                     i;
+               dlist_iter iter;
 
                /*
                 * Ignore VXID locks.  We don't want those to be held by prepared
@@ -3351,9 +3066,13 @@ AtPrepare_Locks(void)
 
                /* Scan to see whether we hold it at session or transaction level */
                haveSessionLock = haveXactLock = false;
-               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               dlist_foreach(iter, &locallock->locallockowners)
                {
-                       if (lockOwners[i].owner == NULL)
+                       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+                                                                        locallock_node,
+                                                                        iter.cur);
+
+                       if (locallockowner->owner == NULL)
                                haveSessionLock = true;
                        else
                                haveXactLock = true;
@@ -3411,8 +3130,8 @@ AtPrepare_Locks(void)
  * pointers in the transaction's resource owner.  This is OK at the
  * moment since resowner.c doesn't try to free locks retail at a toplevel
  * transaction commit or abort.  We could alternatively zero out nLocks
- * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
- * but that probably costs more cycles.
+ * and leave the LOCALLOCK entries to be garbage-collected by
+ * ResourceOwnerRelease, but that probably costs more cycles.
  */
 void
 PostPrepare_Locks(TransactionId xid)
@@ -3445,10 +3164,9 @@ PostPrepare_Locks(TransactionId xid)
 
        while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
        {
-               LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
                bool            haveSessionLock;
                bool            haveXactLock;
-               int                     i;
+               dlist_iter iter;
 
                if (locallock->proclock == NULL || locallock->lock == NULL)
                {
@@ -3467,9 +3185,13 @@ PostPrepare_Locks(TransactionId xid)
 
                /* Scan to see whether we hold it at session or transaction level */
                haveSessionLock = haveXactLock = false;
-               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               dlist_foreach(iter, &locallock->locallockowners)
                {
-                       if (lockOwners[i].owner == NULL)
+                       LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+                                                                        locallock_node,
+                                                                        iter.cur);
+
+                       if (locallockowner->owner == NULL)
                                haveSessionLock = true;
                        else
                                haveXactLock = true;
@@ -3485,10 +3207,6 @@ PostPrepare_Locks(TransactionId xid)
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                         errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
 
-               /* Mark the proclock to show we need to release this lockmode */
-               if (locallock->nLocks > 0)
-                       locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
-
                /* And remove the locallock hashtable entry */
                RemoveLocalLock(locallock);
        }
@@ -3506,11 +3224,7 @@ PostPrepare_Locks(TransactionId xid)
 
                /*
                 * If the proclock list for this partition is empty, we can skip
-                * acquiring the partition lock.  This optimization is safer than the
-                * situation in LockReleaseAll, because we got rid of any fast-path
-                * locks during AtPrepare_Locks, so there cannot be any case where
-                * another backend is adding something to our lists now.  For safety,
-                * though, we code this the same way as in LockReleaseAll.
+                * acquiring the partition lock.
                 */
                if (SHMQueueNext(procLocks, procLocks,
                                                 offsetof(PROCLOCK, procLink)) == NULL)
@@ -3543,14 +3257,6 @@ PostPrepare_Locks(TransactionId xid)
                        Assert(lock->nGranted <= lock->nRequested);
                        Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
-                       /* Ignore it if nothing to release (must be a session lock) */
-                       if (proclock->releaseMask == 0)
-                               continue;
-
-                       /* Else we should be releasing all locks */
-                       if (proclock->releaseMask != proclock->holdMask)
-                               elog(PANIC, "we seem to have dropped a bit somewhere");
-
                        /*
                         * We cannot simply modify proclock->tag.myProc to reassign
                         * ownership of the lock, because that's part of the hash key and
@@ -3600,6 +3306,7 @@ PostPrepare_Locks(TransactionId xid)
        }                                                       /* loop over partitions */
 
        END_CRIT_SECTION();
+
 }
 
 
@@ -4336,7 +4043,6 @@ lock_twophase_recover(TransactionId xid, uint16 info,
                Assert(proc->lockGroupLeader == NULL);
                proclock->groupLeader = proc;
                proclock->holdMask = 0;
-               proclock->releaseMask = 0;
                /* Add proclock to appropriate lists */
                SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
                SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
@@ -4473,7 +4179,7 @@ lock_twophase_postabort(TransactionId xid, uint16 info,
  *
 *             We don't bother recording this lock in the local lock table, since it's
  *             only ever released at the end of a transaction.  Instead,
- *             LockReleaseAll() calls VirtualXactLockTableCleanup().
+ *             ProcReleaseLocks() calls VirtualXactLockTableCleanup().
  */
 void
 VirtualXactLockTableInsert(VirtualTransactionId vxid)
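
A note on the lock.c changes above: each LOCALLOCKOWNER now sits on two lists at once, the owning ResourceOwner's list (resowner_node) and the LOCALLOCK's list of owners (locallock_node), so one owner's references can be dropped without scanning the local lock hash table. The following is only a standalone sketch of that idea, not code from the patch. The names node, lock_entry, owner_entry, remember_owner and forget_owner are hypothetical stand-ins for dlist_node, LOCALLOCK, LOCALLOCKOWNER and the "we still hold this lock" branch of ReleaseLockIfHeld; memory management and the LockRelease() path are left out.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for dlist_node/dlist_head: a circular doubly-linked list. */
typedef struct node { struct node *prev, *next; } node;

static void list_init(node *head) { head->prev = head->next = head; }
static int  list_is_empty(const node *head) { return head->next == head; }

static void list_push_tail(node *head, node *n)
{
    n->prev = head->prev;
    n->next = head;
    head->prev->next = n;
    head->prev = n;
}

/* O(1): unlinking needs only the node itself, not the list head. */
static void list_delete(node *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
}

/* Recover the enclosing struct from a pointer to its embedded node. */
#define container_of(type, member, ptr) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

/* Plays the role of LOCALLOCK: total hold count plus a list of owners. */
typedef struct lock_entry { node owners; long nlocks; } lock_entry;

/* Plays the role of LOCALLOCKOWNER: linked into two lists at once. */
typedef struct owner_entry
{
    node        resowner_node;   /* link in the resource owner's list */
    node        locallock_node;  /* link in the lock's list of owners */
    lock_entry *lock;
    long        nlocks;
} owner_entry;

static void remember_owner(node *resowner_locks, lock_entry *lock,
                           owner_entry *o, long nlocks)
{
    o->lock = lock;
    o->nlocks = nlocks;
    lock->nlocks += nlocks;
    list_push_tail(resowner_locks, &o->resowner_node);
    list_push_tail(&lock->owners, &o->locallock_node);
}

/* Drop one owner's references without searching either list. */
static void forget_owner(owner_entry *o)
{
    assert(o->nlocks <= o->lock->nlocks);
    o->lock->nlocks -= o->nlocks;
    list_delete(&o->locallock_node);
    list_delete(&o->resowner_node);
}
```

The point of the two embedded nodes is that both unlinks are constant-time, however many locks the backend holds.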
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 90283f8a9f..5c2e1cbd06 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -779,10 +779,17 @@ ProcReleaseLocks(bool isCommit)
                return;
        /* If waiting, get off wait queue (should only be needed after error) */
        LockErrorCleanup();
-       /* Release standard locks, including session-level if aborting */
-       LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit);
-       /* Release transaction-level advisory locks */
-       LockReleaseAll(USER_LOCKMETHOD, false);
+
+       VirtualXactLockTableCleanup();
+
+       /* Release session-level locks if aborting */
+       if (!isCommit)
+               LockReleaseSession(DEFAULT_LOCKMETHOD);
+
+#ifdef USE_ASSERT_CHECKING
+       /* Ensure all locks were released */
+       LockAssertNoneHeld(isCommit);
+#endif
 }
 
 
@@ -864,6 +871,8 @@ ProcKill(int code, Datum arg)
                LWLockRelease(leader_lwlock);
        }
 
+       Assert(MyProc->fpLockBits == 0);
+
        /*
         * Reset MyLatch to the process local one.  This is so that signal
         * handlers et al can continue using the latch after the shared latch
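
A note on the release loops: LockReleaseSession, which ProcReleaseLocks now calls, walks a list whose current element is unlinked by the callee. That is why the patch iterates with dlist_foreach_modify rather than dlist_foreach. Below is a standalone sketch of the pattern, assuming a simplified list; node, held_lock and release_all are hypothetical names, not the lib/ilist.h API. The loop saves the next pointer before the current node is removed.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical circular doubly-linked list, standing in for dlist. */
typedef struct node { struct node *prev, *next; } node;

static void list_init(node *head) { head->prev = head->next = head; }
static int  list_is_empty(const node *head) { return head->next == head; }

static void list_push_tail(node *head, node *n)
{
    n->prev = head->prev;
    n->next = head;
    head->prev->next = n;
    head->prev = n;
}

static void list_delete(node *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
}

/* Hypothetical per-lock record with an embedded list link. */
typedef struct held_lock { node link; int released; } held_lock;

/* Release every entry; safe even though each entry is unlinked mid-walk. */
static int release_all(node *head)
{
    int   count = 0;
    node *cur = head->next;

    while (cur != head)
    {
        node      *next = cur->next;    /* saved before cur is unlinked */
        held_lock *h = (held_lock *) ((char *) cur - offsetof(held_lock, link));

        list_delete(cur);
        h->released = 1;
        count++;
        cur = next;
    }
    assert(list_is_empty(head));
    return count;
}
```

A plain forward loop that read cur->next after the unlink would only work by accident, which is the hazard the _modify iterator avoids.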
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 86d193c89f..1cf3d77f92 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -1261,7 +1261,7 @@ ShutdownPostgres(int code, Datum arg)
         * User locks are not released by transaction end, so be sure to release
         * them explicitly.
         */
-       LockReleaseAll(USER_LOCKMETHOD, true);
+       LockReleaseSession(USER_LOCKMETHOD);
 
        /*
         * temp debugging aid to analyze 019_replslot_limit failures
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 3236b1b919..52f015b7f8 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -33,6 +33,7 @@
 #include "utils/resowner_private.h"
 #include "utils/snapmgr.h"
 
+#include "lib/ilist.h"
 
 /*
  * All resource IDs managed by this code are required to fit into a Datum,
@@ -133,9 +134,7 @@ typedef struct ResourceOwnerData
        ResourceArray cryptohasharr;    /* cryptohash contexts */
        ResourceArray hmacarr;          /* HMAC contexts */
 
-       /* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
-       int                     nlocks;                 /* number of owned locks */
-       LOCALLOCK  *locks[MAX_RESOWNER_LOCKS];  /* list of owned locks */
+       dlist_head  locks;                      /* dlist of owned locks */
 }                      ResourceOwnerData;
 
 
@@ -452,6 +451,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
        ResourceArrayInit(&(owner->jitarr), PointerGetDatum(NULL));
        ResourceArrayInit(&(owner->cryptohasharr), PointerGetDatum(NULL));
        ResourceArrayInit(&(owner->hmacarr), PointerGetDatum(NULL));
+       dlist_init(&owner->locks);
 
        return owner;
 }
@@ -585,8 +585,15 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
        }
        else if (phase == RESOURCE_RELEASE_LOCKS)
        {
+               dlist_mutable_iter iter;
+
                if (isTopLevel)
                {
+                       dlist_foreach_modify(iter, &owner->locks)
+                               LockReleaseCurrentOwner(owner, iter.cur);
+
+                       Assert(dlist_is_empty(&owner->locks));
+
                        /*
                         * For a top-level xact we are going to release all locks (or at
                         * least all non-session locks), so just do a single lmgr call at
@@ -605,30 +612,20 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                         * subtransaction, we do NOT release its locks yet, but transfer
                         * them to the parent.
                         */
-                       LOCALLOCK **locks;
-                       int                     nlocks;
-
-                       Assert(owner->parent != NULL);
-
-                       /*
-                        * Pass the list of locks owned by this resource owner to the lock
-                        * manager, unless it has overflowed.
-                        */
-                       if (owner->nlocks > MAX_RESOWNER_LOCKS)
+                       if (isCommit)
                        {
-                               locks = NULL;
-                               nlocks = 0;
+                               dlist_foreach_modify(iter, &owner->locks)
+                                       LockReassignCurrentOwner(owner, iter.cur);
+
+                               Assert(dlist_is_empty(&owner->locks));
                        }
                        else
                        {
-                               locks = owner->locks;
-                               nlocks = owner->nlocks;
-                       }
+                               dlist_foreach_modify(iter, &owner->locks)
+                                       LockReleaseCurrentOwner(owner, iter.cur);
 
-                       if (isCommit)
-                               LockReassignCurrentOwner(locks, nlocks);
-                       else
-                               LockReleaseCurrentOwner(locks, nlocks);
+                               Assert(dlist_is_empty(&owner->locks));
+                       }
                }
        }
        else if (phase == RESOURCE_RELEASE_AFTER_LOCKS)
@@ -752,7 +749,7 @@ ResourceOwnerDelete(ResourceOwner owner)
        Assert(owner->jitarr.nitems == 0);
        Assert(owner->cryptohasharr.nitems == 0);
        Assert(owner->hmacarr.nitems == 0);
-       Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
+       Assert(dlist_is_empty(&owner->locks));
 
        /*
         * Delete children.  The recursive call will delink the child from me, so
@@ -974,54 +971,61 @@ ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer)
 
 /*
  * Remember that a Local Lock is owned by a ResourceOwner
- *
- * This is different from the other Remember functions in that the list of
- * locks is only a lossy cache. It can hold up to MAX_RESOWNER_LOCKS entries,
- * and when it overflows, we stop tracking locks. The point of only remembering
- * only up to MAX_RESOWNER_LOCKS entries is that if a lot of locks are held,
- * ResourceOwnerForgetLock doesn't need to scan through a large array to find
- * the entry.
  */
 void
-ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock)
+ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCKOWNER *locallockowner)
 {
-       Assert(locallock != NULL);
-
-       if (owner->nlocks > MAX_RESOWNER_LOCKS)
-               return;                                 /* we have already overflowed */
+       Assert(owner != NULL);
+       Assert(locallockowner != NULL);
 
-       if (owner->nlocks < MAX_RESOWNER_LOCKS)
-               owner->locks[owner->nlocks] = locallock;
-       else
+#ifdef USE_ASSERT_CHECKING
        {
-               /* overflowed */
+               dlist_iter iter;
+
+               dlist_foreach(iter, &owner->locks)
+               {
+                       LOCALLOCKOWNER *i = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+                       Assert(i->locallock != locallockowner->locallock);
+               }
        }
-       owner->nlocks++;
+#endif
+
+       dlist_push_tail(&owner->locks, &locallockowner->resowner_node);
 }
 
 /*
- * Forget that a Local Lock is owned by a ResourceOwner
+ * Forget that a Local Lock is owned by the given LOCALLOCKOWNER.
  */
 void
-ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock)
+ResourceOwnerForgetLock(LOCALLOCKOWNER *locallockowner)
 {
-       int                     i;
+#ifdef USE_ASSERT_CHECKING
+       ResourceOwner owner;
+
+       Assert(locallockowner != NULL);
 
-       if (owner->nlocks > MAX_RESOWNER_LOCKS)
-               return;                                 /* we have overflowed */
+       owner = locallockowner->owner;
 
-       Assert(owner->nlocks > 0);
-       for (i = owner->nlocks - 1; i >= 0; i--)
        {
-               if (locallock == owner->locks[i])
+               dlist_iter iter;
+               bool found = false;
+
+               dlist_foreach(iter, &owner->locks)
                {
-                       owner->locks[i] = owner->locks[owner->nlocks - 1];
-                       owner->nlocks--;
-                       return;
+                       LOCALLOCKOWNER *i = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+                       if (locallockowner == i)
+                       {
+                               Assert(!found);
+                               found = true;
+                       }
                }
+
+               Assert(found);
        }
-       elog(ERROR, "lock reference %p is not owned by resource owner %s",
-                locallock, owner->name);
+#endif
+       dlist_delete(&locallockowner->resowner_node);
 }
 
 /*
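
A note on subtransaction commit: the resowner.c loop above hands each entry to LockReassignCurrentOwner, which either merges the child's count into an existing parent entry for the same lock or relabels the child's entry as the parent's. Here is a standalone sketch of that decision, assuming a simplified list. The names node, owner_entry, owner_id and reassign_to_parent are hypothetical; the patch itself works on LOCALLOCKOWNER with ResourceOwner pointers, and also pfree()s the merged child entry and updates the resource owner lists, which this sketch omits.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical circular doubly-linked list, standing in for dlist. */
typedef struct node { struct node *prev, *next; } node;

static void list_init(node *head) { head->prev = head->next = head; }

static void list_push_tail(node *head, node *n)
{
    n->prev = head->prev;
    n->next = head;
    head->prev->next = n;
    head->prev = n;
}

static void list_delete(node *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
}

/* One owner's hold count on a single lock; owner_id replaces ResourceOwner. */
typedef struct owner_entry { node lock_link; int owner_id; long nlocks; } owner_entry;

/*
 * Move 'child' to 'parent_id'.  Returns the entry that holds the count
 * afterwards: the parent's existing entry if there is one, else 'child'.
 */
static owner_entry *reassign_to_parent(node *lock_owners, owner_entry *child,
                                       int parent_id)
{
    node *cur;

    assert(child->owner_id != parent_id);

    for (cur = lock_owners->next; cur != lock_owners; cur = cur->next)
    {
        owner_entry *e = (owner_entry *) ((char *) cur - offsetof(owner_entry, lock_link));

        if (e->owner_id != parent_id)
            continue;

        /* Parent already tracks this lock: merge and unlink the child. */
        e->nlocks += child->nlocks;
        child->nlocks = 0;
        list_delete(&child->lock_link);
        return e;
    }

    /* Parent has no entry yet: just relabel the child's entry. */
    child->owner_id = parent_id;
    return child;
}
```

The scan here is over the owners of one lock only, which is normally a very short list, rather than over every lock the backend holds.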
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index dc537e20f2..f664bd5136 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -23,6 +23,7 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "utils/timestamp.h"
+#include "lib/ilist.h"
 
 /* struct PGPROC is declared in proc.h, but must forward-reference it */
 typedef struct PGPROC PGPROC;
@@ -341,10 +342,6 @@ typedef struct LOCK
  * Otherwise, proclock objects whose holdMasks are zero are recycled
  * as soon as convenient.
  *
- * releaseMask is workspace for LockReleaseAll(): it shows the locks due
- * to be released during the current call.  This must only be examined or
- * set by the backend owning the PROCLOCK.
- *
  * Each PROCLOCK object is linked into lists for both the associated LOCK
  * object and the owning PGPROC object.  Note that the PROCLOCK is entered
  * into these lists as soon as it is created, even if no lock has yet been
@@ -366,7 +363,6 @@ typedef struct PROCLOCK
        /* data */
        PGPROC     *groupLeader;        /* proc's lock group leader, or proc itself */
        LOCKMASK        holdMask;               /* bitmask for lock types currently held */
-       LOCKMASK        releaseMask;    /* bitmask for lock types to be released */
        SHM_QUEUE       lockLink;               /* list link in LOCK's list of proclocks */
        SHM_QUEUE       procLink;               /* list link in PGPROC's list of proclocks */
 } PROCLOCK;
@@ -412,6 +408,13 @@ typedef struct LOCALLOCKOWNER
         * Must use a forward struct reference to avoid circularity.
         */
        struct ResourceOwnerData *owner;
+
+       dlist_node  resowner_node;
+
+       dlist_node  locallock_node;
+
+       struct LOCALLOCK *locallock;
+
        int64           nLocks;                 /* # of times held by this owner */
 } LOCALLOCKOWNER;
 
@@ -425,9 +428,9 @@ typedef struct LOCALLOCK
        LOCK       *lock;                       /* associated LOCK object, if any */
        PROCLOCK   *proclock;           /* associated PROCLOCK object, if any */
        int64           nLocks;                 /* total number of times lock is held */
-       int                     numLockOwners;  /* # of relevant ResourceOwners */
-       int                     maxLockOwners;  /* allocated size of array */
-       LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
+
+       dlist_head locallockowners;     /* dlist of LOCALLOCKOWNER */
+
        bool            holdsStrongLockCount;   /* bumped FastPathStrongRelationLocks */
        bool            lockCleared;    /* we read all sinval msgs for lock */
 } LOCALLOCK;
@@ -556,10 +559,17 @@ extern void AbortStrongLockAcquire(void);
 extern void MarkLockClear(LOCALLOCK *locallock);
 extern bool LockRelease(const LOCKTAG *locktag,
                                                LOCKMODE lockmode, bool sessionLock);
-extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
+
+#ifdef USE_ASSERT_CHECKING
+extern void LockAssertNoneHeld(bool isCommit);
+#endif
+
 extern void LockReleaseSession(LOCKMETHODID lockmethodid);
-extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks);
-extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks);
+struct ResourceOwnerData;
+extern void LockReleaseCurrentOwner(struct ResourceOwnerData *owner,
+                                    dlist_node *resowner_node);
+extern void LockReassignCurrentOwner(struct ResourceOwnerData *owner,
+                                     dlist_node *resowner_node);
 extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
 #ifdef USE_ASSERT_CHECKING
 extern HTAB *GetLockMethodLocalHash(void);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index d01cccc27c..ab4796d287 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -31,8 +31,9 @@ extern void ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer);
 extern void ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer);
 
 /* support for local lock management */
-extern void ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock);
-extern void ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock);
+extern void ResourceOwnerRememberLock(ResourceOwner owner,
+                                      LOCALLOCKOWNER *locallock);
+extern void ResourceOwnerForgetLock(LOCALLOCKOWNER *locallock);
 
 /* support for catcache refcount management */
 extern void ResourceOwnerEnlargeCatCacheRefs(ResourceOwner owner);
diff --git a/src/tools/make_mkid b/src/tools/make_mkid
deleted file mode 100755
index 6f160614cd..0000000000
--- a/src/tools/make_mkid
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/bin/sh
-
-# src/tools/make_mkid
-
-mkid `find \`pwd\`/ \( -name _deadcode -a -prune \) -o \
-       -type f -name '*.[chyl]' -print|sed 's;//;/;g'`
-
-find . \( -name .git -a -prune \) -o -type d -print  |while read DIR
-do
-       [ "$DIR" != "." ] && ln -f -s `echo "$DIR" | sed 's;/[^/]*;/..;g'`/ID $DIR/ID
-done
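
As a rough illustration of the lock.h changes above: the per-lock array of owners is replaced by LOCALLOCKOWNER entries that each sit on two doubly-linked lists, one per resource owner and one per LOCALLOCK. The following is a standalone sketch only, not code from the patch. The list helpers are simplified stand-ins for dlist_node/dlist_head, and the Sketch* types, release_owner_locks() and sketch_demo() are hypothetical names invented for this example. It assumes that releasing an owner's locks amounts to walking that owner's list and unlinking each entry from both lists.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins for dlist_node / dlist_head (assumption, not ilist.h). */
typedef struct node { struct node *prev, *next; } node;
typedef struct list { node head; } list;

static void list_init(list *l) { l->head.prev = l->head.next = &l->head; }

static void list_push_tail(list *l, node *n)
{
    n->next = &l->head;
    n->prev = l->head.prev;
    n->prev->next = n;
    l->head.prev = n;
}

static void list_delete(node *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
}

#define container_of(type, member, ptr) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

/* Hypothetical, cut-down analogue of LOCALLOCK. */
typedef struct SketchLock
{
    int64_t nLocks;             /* total number of times lock is held */
    list    owners;             /* list of SketchLockOwner.locallock_node */
} SketchLock;

/* Hypothetical, cut-down analogue of LOCALLOCKOWNER. */
typedef struct SketchLockOwner
{
    node        resowner_node;  /* link in the resource owner's list */
    node        locallock_node; /* link in the lock's list of owners */
    SketchLock *locallock;      /* back pointer to the lock */
    int64_t     nLocks;         /* # of times held by this owner */
} SketchLockOwner;

/* Release everything on one owner's list; returns the entries released. */
static int release_owner_locks(list *resowner_locks)
{
    int released = 0;

    while (resowner_locks->head.next != &resowner_locks->head)
    {
        node *n = resowner_locks->head.next;
        SketchLockOwner *lo = container_of(SketchLockOwner, resowner_node, n);

        assert(lo->nLocks <= lo->locallock->nLocks);
        lo->locallock->nLocks -= lo->nLocks;
        list_delete(&lo->locallock_node);   /* unlink from the lock */
        list_delete(&lo->resowner_node);    /* unlink from the owner */
        released++;
    }
    return released;
}

/* Two owners hold the same lock 2 and 3 times; release the first owner. */
static int64_t sketch_demo(void)
{
    list ownerA, ownerB;
    SketchLock lock;
    SketchLockOwner a = {.nLocks = 2}, b = {.nLocks = 3};

    list_init(&ownerA);
    list_init(&ownerB);
    list_init(&lock.owners);
    lock.nLocks = 0;

    a.locallock = &lock;
    list_push_tail(&ownerA, &a.resowner_node);
    list_push_tail(&lock.owners, &a.locallock_node);
    lock.nLocks += a.nLocks;

    b.locallock = &lock;
    list_push_tail(&ownerB, &b.resowner_node);
    list_push_tail(&lock.owners, &b.locallock_node);
    lock.nLocks += b.nLocks;

    release_owner_locks(&ownerA);
    return lock.nLocks;         /* only owner B's count remains */
}
```

In this sketch the release never scans the whole table of local locks: the work is proportional to the number of entries the owner actually holds, and each unlink is constant time.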

Attachment: lock2.sql
Description: Binary data

Attachment: lock1.sql
Description: Binary data

#!/bin/bash

# Run each lock test 5 times for $sec seconds and print only the tps lines.
dbname=postgres
sec=10

for file in "lock1.sql" "lock2.sql" "lock3.sql" "lock4.sql"
do
        echo "$file"
        for i in {1..5}
        do
                pgbench -n -f "$file" -T "$sec" -M prepared "$dbname" | grep tps
        done
done

Attachment: lock4.sql
Description: Binary data

Attachment: lock3.sql
Description: Binary data
