Alvaro Herrera <[EMAIL PROTECTED]> writes:
> Ok, new version of this patch, adjusted per your comments.  Thanks.

Applied with revisions.  I backed off the idea of removing LockRelease's
return value after looking at contrib/userlock; it seems possible that
some applications out there are depending on being able to issue a
release for locks they don't hold.  Also fixed the logic: the code as
submitted was slightly wrong since it would fail to waken waiters if any
modes remain held.  ProcLockWakeup may need to be run even if we still
have a proclock.  So I renamed the function to CleanUpLock, which
hopefully is more indicative of what it really does.

Patch as applied is attached.

                        regards, tom lane

*** contrib/userlock/user_locks.c.orig  Tue May 17 17:35:04 2005
--- contrib/userlock/user_locks.c       Thu May 19 18:31:26 2005
***************
*** 75,79 ****
  int
  user_unlock_all(void)
  {
!       return LockReleaseAll(USER_LOCKMETHOD, true);
  }
--- 75,81 ----
  int
  user_unlock_all(void)
  {
!       LockReleaseAll(USER_LOCKMETHOD, true);
! 
!       return true;
  }
*** src/backend/storage/lmgr/lock.c.orig        Wed May 11 12:13:55 2005
--- src/backend/storage/lmgr/lock.c     Thu May 19 19:21:17 2005
***************
*** 166,171 ****
--- 166,173 ----
                                 int *myHolding);
  static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
                                                PROCLOCK *proclock, LockMethod lockMethodTable);
+ static void CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock,
+                                               PROCLOCK *proclock, bool wakeupNeeded);
  
  
  /*
***************
*** 589,601 ****
                         * anyone to release the lock object later.
                         */
                        Assert(SHMQueueEmpty(&(lock->procLocks)));
!                       lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
!                                                                               (void *) &(lock->tag),
!                                                                               HASH_REMOVE, NULL);
                }
                LWLockRelease(masterLock);
-               if (!lock)                              /* hash remove failed? */
-                       elog(WARNING, "lock table corrupted");
                ereport(ERROR,
                                (errcode(ERRCODE_OUT_OF_MEMORY),
                                 errmsg("out of shared memory"),
--- 591,602 ----
                         * anyone to release the lock object later.
                         */
                        Assert(SHMQueueEmpty(&(lock->procLocks)));
!                       if (!hash_search(LockMethodLockHash[lockmethodid],
!                                                        (void *) &(lock->tag),
!                                                        HASH_REMOVE, NULL))
!                               elog(PANIC, "lock table corrupted");
                }
                LWLockRelease(masterLock);
                ereport(ERROR,
                                (errcode(ERRCODE_OUT_OF_MEMORY),
                                 errmsg("out of shared memory"),
***************
*** 708,718 ****
                        {
                                SHMQueueDelete(&proclock->lockLink);
                                SHMQueueDelete(&proclock->procLink);
!                               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
!                                                                                          (void *) &(proclock->tag),
!                                                                                                       HASH_REMOVE, NULL);
!                               if (!proclock)
!                                       elog(WARNING, "proclock table corrupted");
                        }
                        else
                                PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
--- 709,718 ----
                        {
                                SHMQueueDelete(&proclock->lockLink);
                                SHMQueueDelete(&proclock->procLink);
!                               if (!hash_search(LockMethodProcLockHash[lockmethodid],
!                                                                (void *) &(proclock->tag),
!                                                                HASH_REMOVE, NULL))
!                                       elog(PANIC, "proclock table corrupted");
                        }
                        else
                                PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
***************
*** 784,793 ****
  
        pfree(locallock->lockOwners);
        locallock->lockOwners = NULL;
!       locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
!                                                                                 (void *) &(locallock->tag),
!                                                                                 HASH_REMOVE, NULL);
!       if (!locallock)
                elog(WARNING, "locallock table corrupted");
  }
  
--- 784,792 ----
  
        pfree(locallock->lockOwners);
        locallock->lockOwners = NULL;
!       if (!hash_search(LockMethodLocalHash[lockmethodid],
!                                        (void *) &(locallock->tag),
!                                        HASH_REMOVE, NULL))
                elog(WARNING, "locallock table corrupted");
  }
  
***************
*** 994,999 ****
--- 993,1047 ----
  }
  
  /*
+  * CleanUpLock -- clean up after releasing a lock.  We garbage-collect the
+  * proclock and lock objects if possible, and call ProcLockWakeup if there
+  * are remaining requests and the caller says it's OK.  (Normally, this
+  * should be called after UnGrantLock, and wakeupNeeded is the result from
+  * UnGrantLock.)
+  *
+  * The locktable's masterLock must be held at entry, and will be
+  * held at exit.
+  */
+ static void
+ CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock, PROCLOCK *proclock,
+                       bool wakeupNeeded)
+ {
+       /*
+        * If this was my last hold on this lock, delete my entry in the
+        * proclock table.
+        */
+       if (proclock->holdMask == 0)
+       {
+               PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
+               SHMQueueDelete(&proclock->lockLink);
+               SHMQueueDelete(&proclock->procLink);
+               if (!hash_search(LockMethodProcLockHash[lockmethodid],
+                                                (void *) &(proclock->tag),
+                                                HASH_REMOVE, NULL))
+                       elog(PANIC, "proclock table corrupted");
+       }
+ 
+       if (lock->nRequested == 0)
+       {
+               /*
+                * The caller just released the last lock, so garbage-collect the
+                * lock object.
+                */
+               LOCK_PRINT("CleanUpLock: deleting", lock, 0);
+               Assert(SHMQueueEmpty(&(lock->procLocks)));
+               if (!hash_search(LockMethodLockHash[lockmethodid],
+                                                (void *) &(lock->tag),
+                                                HASH_REMOVE, NULL))
+                       elog(PANIC, "lock table corrupted");
+       }
+       else if (wakeupNeeded)
+       {
+               /* There are waiters on this lock, so wake them up. */
+               ProcLockWakeup(LockMethods[lockmethodid], lock);  
+       }
+ }
+ 
+ /*
   * GrantLockLocal -- update the locallock data structures to show
   *            the lock request has been granted.
   *
***************
*** 1160,1189 ****
  
        /*
         * Delete the proclock immediately if it represents no already-held locks.
!        * This must happen now because if the owner of the lock decides to release
!        * it, and the requested/granted counts then go to zero, LockRelease
!        * expects there to be no remaining proclocks.
!        */
!       if (proclock->holdMask == 0)
!       {
!               PROCLOCK_PRINT("RemoveFromWaitQueue: deleting proclock", proclock);
!               SHMQueueDelete(&proclock->lockLink);
!               SHMQueueDelete(&proclock->procLink);
!               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
!                                                                                       (void *) &(proclock->tag),
!                                                                                       HASH_REMOVE, NULL);
!               if (!proclock)
!                       elog(WARNING, "proclock table corrupted");
!       }
! 
!       /*
!        * There should still be some requests for the lock ... else what were
!        * we waiting for?  Therefore no need to delete the lock object.
         */
!       Assert(waitLock->nRequested > 0);
! 
!       /* See if any other waiters for the lock can be woken up now */
!       ProcLockWakeup(LockMethods[lockmethodid], waitLock);
  }
  
  /*
--- 1208,1219 ----
  
        /*
         * Delete the proclock immediately if it represents no already-held locks.
!        * (This must happen now because if the owner of the lock decides to
!        * release it, and the requested/granted counts then go to zero,
!        * LockRelease expects there to be no remaining proclocks.)
!        * Then see if any other waiters for the lock can be woken up now.
         */
!       CleanUpLock(lockmethodid, waitLock, proclock, true);
  }
  
  /*
***************
*** 1221,1230 ****
        Assert(lockmethodid < NumLockMethods);
        lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
!       {
!               elog(WARNING, "lockMethodTable is null in LockRelease");
!               return FALSE;
!       }
  
        /*
         * Find the LOCALLOCK entry for this lock and lockmode
--- 1251,1257 ----
        Assert(lockmethodid < NumLockMethods);
        lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
!               elog(ERROR, "bad lock method: %d", lockmethodid);
  
        /*
         * Find the LOCALLOCK entry for this lock and lockmode
***************
*** 1328,1383 ****
                return FALSE;
        }
  
-       wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
- 
        /*
!        * If this was my last hold on this lock, delete my entry in the
!        * proclock table.
         */
!       if (proclock->holdMask == 0)
!       {
!               PROCLOCK_PRINT("LockRelease: deleting proclock", proclock);
!               SHMQueueDelete(&proclock->lockLink);
!               SHMQueueDelete(&proclock->procLink);
!               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
!                                                                                       (void *) &(proclock->tag),
!                                                                                       HASH_REMOVE, NULL);
!               if (!proclock)
!               {
!                       LWLockRelease(masterLock);
!                       elog(WARNING, "proclock table corrupted");
!                       RemoveLocalLock(locallock);
!                       return FALSE;
!               }
!       }
  
!       if (lock->nRequested == 0)
!       {
!               /*
!                * We've just released the last lock, so garbage-collect the lock
!                * object.
!                */
!               LOCK_PRINT("LockRelease: deleting lock", lock, lockmode);
!               Assert(SHMQueueEmpty(&(lock->procLocks)));
!               lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
!                                                                       (void *) &(lock->tag),
!                                                                       HASH_REMOVE, NULL);
!               if (!lock)
!               {
!                       LWLockRelease(masterLock);
!                       elog(WARNING, "lock table corrupted");
!                       RemoveLocalLock(locallock);
!                       return FALSE;
!               }
!       }
!       else
!       {
!               /*
!                * Wake up waiters if needed.
!                */
!               if (wakeupNeeded)
!                       ProcLockWakeup(lockMethodTable, lock);
!       }
  
        LWLockRelease(masterLock);
  
--- 1355,1366 ----
                return FALSE;
        }
  
        /*
!        * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
         */
!       wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
  
!       CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
  
        LWLockRelease(masterLock);
  
***************
*** 1397,1403 ****
   * allxids == false: release all locks with Xid != 0
   * (zero is the Xid used for "session" locks).
   */
! bool
  LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
  {
        HASH_SEQ_STATUS status;
--- 1380,1386 ----
   * allxids == false: release all locks with Xid != 0
   * (zero is the Xid used for "session" locks).
   */
! void
  LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
  {
        HASH_SEQ_STATUS status;
***************
*** 1418,1427 ****
        Assert(lockmethodid < NumLockMethods);
        lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
!       {
!               elog(WARNING, "bad lock method: %d", lockmethodid);
!               return FALSE;
!       }
  
        numLockModes = lockMethodTable->numLockModes;
        masterLock = lockMethodTable->masterLock;
--- 1401,1407 ----
        Assert(lockmethodid < NumLockMethods);
        lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
!               elog(ERROR, "bad lock method: %d", lockmethodid);
  
        numLockModes = lockMethodTable->numLockModes;
        masterLock = lockMethodTable->masterLock;
***************
*** 1516,1563 ****
                Assert(lock->nGranted <= lock->nRequested);
                LOCK_PRINT("LockReleaseAll: updated", lock, 0);
  
!               PROCLOCK_PRINT("LockReleaseAll: deleting", proclock);
! 
!               /*
!                * Remove the proclock entry from the linked lists
!                */
!               SHMQueueDelete(&proclock->lockLink);
!               SHMQueueDelete(&proclock->procLink);
! 
!               /*
!                * remove the proclock entry from the hashtable
!                */
!               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
!                                                                                       (void *) &(proclock->tag),
!                                                                                       HASH_REMOVE,
!                                                                                       NULL);
!               if (!proclock)
!               {
!                       LWLockRelease(masterLock);
!                       elog(WARNING, "proclock table corrupted");
!                       return FALSE;
!               }
  
!               if (lock->nRequested == 0)
!               {
!                       /*
!                        * We've just released the last lock, so garbage-collect the
!                        * lock object.
!                        */
!                       LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
!                       Assert(SHMQueueEmpty(&(lock->procLocks)));
!                       lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
!                                                                               (void *) &(lock->tag),
!                                                                               HASH_REMOVE, NULL);
!                       if (!lock)
!                       {
!                               LWLockRelease(masterLock);
!                               elog(WARNING, "lock table corrupted");
!                               return FALSE;
!                       }
!               }
!               else if (wakeupNeeded)
!                       ProcLockWakeup(lockMethodTable, lock);
  
  next_item:
                proclock = nextHolder;
--- 1496,1505 ----
                Assert(lock->nGranted <= lock->nRequested);
                LOCK_PRINT("LockReleaseAll: updated", lock, 0);
  
!               Assert(proclock->holdMask == 0);
  
!               /* CleanUpLock will wake up waiters if needed. */
!               CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
  
  next_item:
                proclock = nextHolder;
***************
*** 1569,1576 ****
        if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
                elog(LOG, "LockReleaseAll done");
  #endif
- 
-       return TRUE;
  }
  
  /*
--- 1511,1516 ----
*** src/include/storage/lock.h.orig     Fri Apr 29 18:28:24 2005
--- src/include/storage/lock.h  Thu May 19 18:31:09 2005
***************
*** 361,367 ****
                        TransactionId xid, LOCKMODE lockmode, bool dontWait);
  extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
                        TransactionId xid, LOCKMODE lockmode);
! extern bool LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids);
  extern void LockReleaseCurrentOwner(void);
  extern void LockReassignCurrentOwner(void);
  extern int LockCheckConflicts(LockMethod lockMethodTable,
--- 361,367 ----
                        TransactionId xid, LOCKMODE lockmode, bool dontWait);
  extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
                        TransactionId xid, LOCKMODE lockmode);
! extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids);
  extern void LockReleaseCurrentOwner(void);
  extern void LockReassignCurrentOwner(void);
  extern int LockCheckConflicts(LockMethod lockMethodTable,

---------------------------(end of broadcast)---------------------------
TIP 8: explain analyze is your friend

Reply via email to