I wrote:
> 1. You set up transformRuleStmt to insert AccessExclusiveLock into
> the "OLD" and "NEW" RTEs for a view.  This is surely wrong; we do
> not want to take exclusive lock on a view just to run a query using
> the view.  It should (usually, anyway) just be AccessShareLock.
> However, because addRangeTableEntryForRelation insists that you
> hold the requested lock type *now*, just changing the parameter
> to AccessShareLock doesn't work.
> I hacked around this for the moment by passing NoLock to
> addRangeTableEntryForRelation and then changing rte->lockmode
> after it returns, but man that's ugly.  It makes me wonder whether
> addRangeTableEntryForRelation should be checking the lockmode at all.

It occurred to me that it'd be reasonable to insist that the caller
holds a lock *at least as strong* as the one being recorded in the RTE,
and that there's also been discussions about verifying that some lock
is held when something like heap_open(foo, NoLock) is attempted.
So I dusted off the part of 0001 that did that, producing the
attached delta patch.

Unfortunately, I can't commit this, because it exposes at least two
pre-existing bugs :-(.  So we'll need to fix those first, which seems
like it should be a separate thread.  I'm just parking this here for
the moment.

I think that the call sites should ultimately look like

        Assert(CheckRelationLockedByMe(...));

but for hunting down the places where the assertion currently fails,
it's more convenient if it's just an elog(WARNING).

                        regards, tom lane

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 3395445..f8a55ee 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1137,6 +1137,11 @@ relation_open(Oid relationId, LOCKMODE lockmode)
 	if (!RelationIsValid(r))
 		elog(ERROR, "could not open relation with OID %u", relationId);
 
+	if (lockmode == NoLock &&
+		!CheckRelationLockedByMe(r, AccessShareLock, true))
+		elog(WARNING, "relation_open: no lock held on %s",
+			 RelationGetRelationName(r));
+
 	/* Make note that we've accessed a temporary relation */
 	if (RelationUsesLocalBuffers(r))
 		MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL;
@@ -1183,6 +1188,11 @@ try_relation_open(Oid relationId, LOCKMODE lockmode)
 	if (!RelationIsValid(r))
 		elog(ERROR, "could not open relation with OID %u", relationId);
 
+	if (lockmode == NoLock &&
+		!CheckRelationLockedByMe(r, AccessShareLock, true))
+		elog(WARNING, "try_relation_open: no lock held on %s",
+			 RelationGetRelationName(r));
+
 	/* Make note that we've accessed a temporary relation */
 	if (RelationUsesLocalBuffers(r))
 		MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL;
@@ -8084,6 +8094,10 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
 
 	idx_rel = RelationIdGetRelation(replidindex);
 
+	if (!CheckRelationLockedByMe(idx_rel, AccessShareLock, true))
+		elog(WARNING, "ExtractReplicaIdentity: no lock held on %s",
+			 RelationGetRelationName(idx_rel));
+
 	/* deform tuple, so we have fast access to columns */
 	heap_deform_tuple(tp, desc, values, nulls);
 
diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c
index da600dc..aaf5544 100644
--- a/src/backend/parser/parse_relation.c
+++ b/src/backend/parser/parse_relation.c
@@ -28,6 +28,7 @@
 #include "parser/parse_enr.h"
 #include "parser/parse_relation.h"
 #include "parser/parse_type.h"
+#include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
@@ -1272,7 +1273,7 @@ addRangeTableEntry(ParseState *pstate,
  *
  * lockmode is the lock type required for query execution; it must be one
  * of AccessShareLock, RowShareLock, or RowExclusiveLock depending on the
- * RTE's role within the query.  The caller should always hold that lock mode
+ * RTE's role within the query.  The caller must hold that lock mode
  * or a stronger one.
  *
  * Note: properly, lockmode should be declared LOCKMODE not int, but that
@@ -1295,6 +1296,9 @@ addRangeTableEntryForRelation(ParseState *pstate,
 	Assert(lockmode == AccessShareLock ||
 		   lockmode == RowShareLock ||
 		   lockmode == RowExclusiveLock);
+	if (!CheckRelationLockedByMe(rel, lockmode, true))
+		elog(WARNING, "addRangeTableEntryForRelation: no lock held on %s",
+			 RelationGetRelationName(rel));
 
 	rte->rtekind = RTE_RELATION;
 	rte->alias = alias;
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index dc0a439..517ff8e 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -288,6 +288,51 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
 }
 
 /*
+ *		CheckRelationLockedByMe
+ *
+ * Returns true if current transaction holds a lock on 'relation' of mode
+ * 'lockmode'.  If 'orstronger' is true, a stronger lockmode is also OK.
+ * ("Stronger" is defined as "numerically higher", which is a bit
+ * semantically dubious but is OK for the purposes we use this for.)
+ */
+bool
+CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
+{
+	LOCKTAG		tag;
+
+	SET_LOCKTAG_RELATION(tag,
+						 relation->rd_lockInfo.lockRelId.dbId,
+						 relation->rd_lockInfo.lockRelId.relId);
+
+	if (LockHeldByMe(&tag, lockmode))
+		return true;
+
+	if (orstronger)
+	{
+		LOCKMODE	slockmode;
+
+		for (slockmode = lockmode + 1;
+			 slockmode <= AccessExclusiveLock;
+			 slockmode++)
+		{
+			if (LockHeldByMe(&tag, slockmode))
+			{
+#ifdef NOT_USED
+				/* Sometimes this might be useful for debugging purposes */
+				elog(WARNING, "lock mode %s substituted for %s on relation %s",
+					 GetLockmodeName(tag.locktag_lockmethodid, slockmode),
+					 GetLockmodeName(tag.locktag_lockmethodid, lockmode),
+					 RelationGetRelationName(relation));
+#endif
+				return true;
+			}
+		}
+	}
+
+	return false;
+}
+
+/*
  *		LockHasWaitersRelation
  *
  * This is a function to check whether someone else is waiting for a
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 831ae62..10f6f60 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -564,6 +564,30 @@ DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
 }
 
 /*
+ * LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode'
+ *		by the current transaction
+ */
+bool
+LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
+{
+	LOCALLOCKTAG localtag;
+	LOCALLOCK  *locallock;
+
+	/*
+	 * See if there is a LOCALLOCK entry for this lock and lockmode
+	 */
+	MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
+	localtag.lock = *locktag;
+	localtag.mode = lockmode;
+
+	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
+										  (void *) &localtag,
+										  HASH_FIND, NULL);
+
+	return (locallock && locallock->nLocks > 0);
+}
+
+/*
  * LockHasWaiters -- look up 'locktag' and check if releasing this
  *		lock would wake up other processes waiting for it.
  */
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index a217de9..e5356b7 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -45,6 +45,8 @@ extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode);
 extern void LockRelation(Relation relation, LOCKMODE lockmode);
 extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
 extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
+extern bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode,
+						bool orstronger);
 extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode);
 
 extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index ff4df7f..a37fda7 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -540,6 +540,7 @@ extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
 extern void LockReleaseSession(LOCKMETHODID lockmethodid);
 extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks);
 extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks);
+extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
 extern bool LockHasWaiters(const LOCKTAG *locktag,
 			   LOCKMODE lockmode, bool sessionLock);
 extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,

Reply via email to