I wrote:
> We could perhaps fix this with a less invasive change than what you
> suggest here, by attacking the missed-call-due-to-recursion aspect
> rather than monkeying with how relcache rebuild itself works.

Seeing that rearranging the relcache rebuild logic is looking less than
trivial, I thought it'd be a good idea to investigate this alternative
approach.

Essentially, the problem here is that lmgr.c supposes that
AcceptInvalidationMessages is an atomic operation, which it most
certainly isn't.  In reality, it's unsafe to suppose that we can skip
reading incoming inval messages until we have *completed*
AcceptInvalidationMessages, not just invoked it.
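
To make the hazard concrete, here's an illustrative interleaving (the
function names are real, but the sequence is only a sketch, not code
from the attached patch):

    /*
     * Illustrative sequence only:
     *
     * LockRelationOid(rel R)            -- acquires the lock, then calls
     *   AcceptInvalidationMessages()    -- starts draining the queue ...
     *     relcache rebuild              -- ... which opens catalogs:
     *       LockRelationOid(catalog C)  -- recursive entry; if C's lock is
     *                                      already held locally, we'd skip
     *                                      AcceptInvalidationMessages() here,
     *                                      even though the outer call hasn't
     *                                      finished absorbing messages yet
     */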

However, we can fix that very easily, by retaining one additional bit
of state in each LOCALLOCK entry, which records whether we know we have
completed AcceptInvalidationMessages at least once subsequent to obtaining
that lock.  In the attached draft patch, I refer to that state as having
"cleared" the lock, but I'm not super pleased with that terminology ---
anybody have a better idea?

A small problem with the lock.c API as it stands is that we'd have to do
an additional hashtable lookup to re-find the relevant LOCALLOCK entry.
In the attached, I fixed that by extending LockAcquireExtended's API
to allow the caller to obtain a pointer to the LOCALLOCK entry, making
it trivially cheap to set the flag after we finish
AcceptInvalidationMessages.  LockAcquireExtended has only one external
caller right now, which makes me think it's OK to change its API rather
than introduce yet another variant entry point, but that could be argued.
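
With the extended API, the lmgr.c call sites all end up following the
same pattern (a condensed sketch; the real hunks are in the patch below):

    LOCALLOCK  *locallock;
    LockAcquireResult res;

    res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
    if (res != LOCKACQUIRE_ALREADY_CLEAR)
    {
        AcceptInvalidationMessages();
        MarkLockClear(locallock);
    }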

There are two ways we could adjust the return value from
LockAcquire(Extended) to cope with this requirement.  What I did here
was to add an additional LockAcquireResult code, so that "already held"
can be distinguished from "already held and cleared".  But we don't
really need to make that distinction, at least not in the core code.
So another way would be to redefine LOCKACQUIRE_ALREADY_HELD as meaning
"held and cleared", and then just return LOCKACQUIRE_OK if you haven't
called MarkLockClear for the lock.  I'm not entirely sure which way is
less likely to break any third-party code that might be inspecting the
result of LockAcquire.  You could probably argue it either way depending
on what you think the third-party code is doing with the result.
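
For concreteness, the second option would amount to rewriting the
already-held case in LockAcquireExtended along these lines (a sketch of
the road not taken, not what the attached patch does):

    if (locallock->nLocks > 0)
    {
        GrantLockLocal(locallock, owner);
        if (locallock->lockCleared)
            return LOCKACQUIRE_ALREADY_HELD;  /* now means "held and clear" */
        else
            return LOCKACQUIRE_OK;  /* forces callers to absorb invals again */
    }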

Anyway, the attached appears to fix the problem: Andres' test script
fails in fractions of a second with HEAD on my workstation, but it's
run error-free for quite a while now with this patch.  And this is sure
a lot simpler than any relcache rebuild refactoring we're likely to come
up with.

Discuss.

                        regards, tom lane

PS: this also fixes what seems to me to be a buglet in the fast-path
locks stuff: there are failure paths out of LockAcquire that don't
remove the failed LOCALLOCK entry.  Not any more.

diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 2e07702..e924c96 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -661,7 +661,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 
 	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
 
-	LockAcquireExtended(&locktag, AccessExclusiveLock, true, false, false);
+	LockAcquireExtended(&locktag, AccessExclusiveLock, true, false,
+						false, NULL);
 }
 
 static void
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 7b2dcb6..dc0a439 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -105,11 +105,12 @@ void
 LockRelationOid(Oid relid, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SetLocktagRelationOid(&tag, relid);
 
-	res = LockAcquire(&tag, lockmode, false, false);
+	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
 
 	/*
 	 * Now that we have the lock, check for invalidation messages, so that we
@@ -120,9 +121,18 @@ LockRelationOid(Oid relid, LOCKMODE lockmode)
 	 * relcache entry in an undesirable way.  (In the case where our own xact
 	 * modifies the rel, the relcache update happens via
 	 * CommandCounterIncrement, not here.)
+	 *
+	 * However, in corner cases where code acts on tables (usually catalogs)
+	 * recursively, we might get here while still processing invalidation
+	 * messages in some outer execution of this function or a sibling.  The
+	 * "cleared" status of the lock tells us whether we really are done
+	 * absorbing relevant inval messages.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 }
 
 /*
@@ -138,11 +148,12 @@ bool
 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SetLocktagRelationOid(&tag, relid);
 
-	res = LockAcquire(&tag, lockmode, false, true);
+	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
 
 	if (res == LOCKACQUIRE_NOT_AVAIL)
 		return false;
@@ -151,8 +162,11 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 	 * Now that we have the lock, check for invalidation messages; see notes
 	 * in LockRelationOid.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 
 	return true;
 }
@@ -199,20 +213,24 @@ void
 LockRelation(Relation relation, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SET_LOCKTAG_RELATION(tag,
 						 relation->rd_lockInfo.lockRelId.dbId,
 						 relation->rd_lockInfo.lockRelId.relId);
 
-	res = LockAcquire(&tag, lockmode, false, false);
+	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
 
 	/*
 	 * Now that we have the lock, check for invalidation messages; see notes
 	 * in LockRelationOid.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 }
 
 /*
@@ -226,13 +244,14 @@ bool
 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
 	LockAcquireResult res;
 
 	SET_LOCKTAG_RELATION(tag,
 						 relation->rd_lockInfo.lockRelId.dbId,
 						 relation->rd_lockInfo.lockRelId.relId);
 
-	res = LockAcquire(&tag, lockmode, false, true);
+	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
 
 	if (res == LOCKACQUIRE_NOT_AVAIL)
 		return false;
@@ -241,8 +260,11 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
 	 * Now that we have the lock, check for invalidation messages; see notes
 	 * in LockRelationOid.
 	 */
-	if (res != LOCKACQUIRE_ALREADY_HELD)
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
 		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
 
 	return true;
 }
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index dc3d8d9..58d1f32 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -669,6 +669,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
  *		LOCKACQUIRE_NOT_AVAIL		lock not available, and dontWait=true
  *		LOCKACQUIRE_OK				lock successfully acquired
  *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
+ *		LOCKACQUIRE_ALREADY_CLEAR	incremented count for lock already clear
  *
  * In the normal case where dontWait=false and the caller doesn't need to
  * distinguish a freshly acquired lock from one already taken earlier in
@@ -685,7 +686,8 @@ LockAcquire(const LOCKTAG *locktag,
 			bool sessionLock,
 			bool dontWait)
 {
-	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
+	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
+							   true, NULL);
 }
 
 /*
@@ -696,13 +698,17 @@ LockAcquire(const LOCKTAG *locktag,
  * caller to note that the lock table is full and then begin taking
  * extreme action to reduce the number of other lock holders before
  * retrying the action.
+ *
+ * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
+ * table entry if a lock is successfully acquired, or NULL if not.
  */
 LockAcquireResult
 LockAcquireExtended(const LOCKTAG *locktag,
 					LOCKMODE lockmode,
 					bool sessionLock,
 					bool dontWait,
-					bool reportMemoryError)
+					bool reportMemoryError,
+					LOCALLOCK **locallockp)
 {
 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
 	LockMethod	lockMethodTable;
@@ -766,9 +772,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
 		locallock->proclock = NULL;
 		locallock->hashcode = LockTagHashCode(&(localtag.lock));
 		locallock->nLocks = 0;
+		locallock->holdsStrongLockCount = false;
+		locallock->lockCleared = false;
 		locallock->numLockOwners = 0;
 		locallock->maxLockOwners = 8;
-		locallock->holdsStrongLockCount = false;
 		locallock->lockOwners = NULL;	/* in case next line fails */
 		locallock->lockOwners = (LOCALLOCKOWNER *)
 			MemoryContextAlloc(TopMemoryContext,
@@ -789,13 +796,22 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	}
 	hashcode = locallock->hashcode;
 
+	if (locallockp)
+		*locallockp = locallock;
+
 	/*
 	 * If we already hold the lock, we can just increase the count locally.
+	 *
+	 * If lockCleared is already set, caller need not worry about absorbing
+	 * sinval messages related to the lock's object.
 	 */
 	if (locallock->nLocks > 0)
 	{
 		GrantLockLocal(locallock, owner);
-		return LOCKACQUIRE_ALREADY_HELD;
+		if (locallock->lockCleared)
+			return LOCKACQUIRE_ALREADY_CLEAR;
+		else
+			return LOCKACQUIRE_ALREADY_HELD;
 	}
 
 	/*
@@ -877,6 +893,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
 										   hashcode))
 		{
 			AbortStrongLockAcquire();
+			if (locallock->nLocks == 0)
+				RemoveLocalLock(locallock);
+			if (locallockp)
+				*locallockp = NULL;
 			if (reportMemoryError)
 				ereport(ERROR,
 						(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -911,6 +931,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	{
 		AbortStrongLockAcquire();
 		LWLockRelease(partitionLock);
+		if (locallock->nLocks == 0)
+			RemoveLocalLock(locallock);
+		if (locallockp)
+			*locallockp = NULL;
 		if (reportMemoryError)
 			ereport(ERROR,
 					(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -976,6 +1000,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
 			LWLockRelease(partitionLock);
 			if (locallock->nLocks == 0)
 				RemoveLocalLock(locallock);
+			if (locallockp)
+				*locallockp = NULL;
 			return LOCKACQUIRE_NOT_AVAIL;
 		}
 
@@ -1646,6 +1672,20 @@ GrantAwaitedLock(void)
 }
 
 /*
+ * MarkLockClear -- mark an acquired lock as "clear"
+ *
+ * This means that we know we have absorbed all sinval messages that other
+ * sessions generated before we acquired this lock, and so we can confidently
+ * assume we know about any catalog changes protected by this lock.
+ */
+void
+MarkLockClear(LOCALLOCK *locallock)
+{
+	Assert(locallock->nLocks > 0);
+	locallock->lockCleared = true;
+}
+
+/*
  * WaitOnLock -- wait to acquire a lock
  *
  * Caller must have set MyProc->heldLocks to reflect locks already held
@@ -1909,6 +1949,15 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	if (locallock->nLocks > 0)
 		return true;
 
+	/*
+	 * At this point we can no longer suppose we are clear of invalidation
+	 * messages related to this lock.  Although we'll delete the LOCALLOCK
+	 * object before any intentional return from this routine, it seems worth
+	 * the trouble to explicitly reset lockCleared right now, just in case
+	 * some error prevents us from deleting the LOCALLOCK.
+	 */
+	locallock->lockCleared = false;
+
 	/* Attempt fast release of any lock eligible for the fast path. */
 	if (EligibleForRelationFastPath(locktag, lockmode) &&
 		FastPathLocalUseCount > 0)
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 777da71..ff4df7f 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -408,9 +408,10 @@ typedef struct LOCALLOCK
 	PROCLOCK   *proclock;		/* associated PROCLOCK object, if any */
 	uint32		hashcode;		/* copy of LOCKTAG's hash value */
 	int64		nLocks;			/* total number of times lock is held */
+	bool		holdsStrongLockCount;	/* bumped FastPathStrongRelationLocks */
+	bool		lockCleared;	/* we read all sinval msgs for lock */
 	int			numLockOwners;	/* # of relevant ResourceOwners */
 	int			maxLockOwners;	/* allocated size of array */
-	bool		holdsStrongLockCount;	/* bumped FastPathStrongRelationLocks */
 	LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
 } LOCALLOCK;
 
@@ -472,7 +473,8 @@ typedef enum
 {
 	LOCKACQUIRE_NOT_AVAIL,		/* lock not available, and dontWait=true */
 	LOCKACQUIRE_OK,				/* lock successfully acquired */
-	LOCKACQUIRE_ALREADY_HELD	/* incremented count for lock already held */
+	LOCKACQUIRE_ALREADY_HELD,	/* incremented count for lock already held */
+	LOCKACQUIRE_ALREADY_CLEAR	/* incremented count for lock already clear */
 } LockAcquireResult;
 
 /* Deadlock states identified by DeadLockCheck() */
@@ -528,8 +530,10 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
 					LOCKMODE lockmode,
 					bool sessionLock,
 					bool dontWait,
-					bool report_memory_error);
+					bool reportMemoryError,
+					LOCALLOCK **locallockp);
 extern void AbortStrongLockAcquire(void);
+extern void MarkLockClear(LOCALLOCK *locallock);
 extern bool LockRelease(const LOCKTAG *locktag,
 			LOCKMODE lockmode, bool sessionLock);
 extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
