From 3393076461ff724968de44c98688f0784c5d492b Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Mon, 9 Mar 2020 17:17:53 +0530
Subject: [PATCH v3 2/3] WIP-Extend the patch for handling PageLock

---
 src/backend/access/transam/xact.c | 14 ++++----
 src/backend/storage/lmgr/lmgr.c   | 18 ++++++++++-
 src/backend/storage/lmgr/lock.c   | 67 +++++++++++++++++++++++++++++++++------
 src/include/storage/lock.h        |  5 ++-
 4 files changed, 85 insertions(+), 19 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index ca64712..ec7b7f8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2267,8 +2267,8 @@ CommitTransaction(void)
 	XactTopFullTransactionId = InvalidFullTransactionId;
 	nParallelCurrentXids = 0;
 
-	/* Reset the relation extension lock held count. */
-	ResetRelExtLockHeldCount();
+	/* Reset the relation extension/page lock held count. */
+	ResetRelExtPageLockHeldCount();
 
 	/*
 	 * done with commit processing, set current transaction state back to
@@ -2738,8 +2738,8 @@ AbortTransaction(void)
 		pgstat_report_xact_timestamp(0);
 	}
 
-	/* Reset the relation extension lock held count. */
-	ResetRelExtLockHeldCount();
+	/* Reset the relation extension/page lock held count. */
+	ResetRelExtPageLockHeldCount();
 
 	/*
 	 * State remains TRANS_ABORT until CleanupTransaction().
@@ -5012,8 +5012,8 @@ AbortSubTransaction(void)
 		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
 	}
 
-	/* Reset the relation extension lock held count. */
-	ResetRelExtLockHeldCount();
+	/* Reset the relation extension/Page lock held count. */
+	ResetRelExtPageLockHeldCount();
 
 	/*
 	 * Restore the upper transaction's read-only state, too.  This should be
@@ -5074,7 +5074,7 @@ PushTransaction(void)
 	 * Relation extension lock must not be held while starting a new
 	 * sub-transaction.
 	 */
-	Assert(!IsRelExtLockHeld());
+	Assert(!(IsRelExtLockHeld() || IsPageLockHeld()));
 
 	/*
 	 * We keep subtransaction state nodes in TopTransactionContext.
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 26760f8..b0df063 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -492,6 +492,9 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
 					 blkno);
 
 	(void) LockAcquire(&tag, lockmode, false, false);
+
+	/* Increment the lock held count. */
+	IncrementPageLockHeldCount();
 }
 
 /*
@@ -504,13 +507,22 @@ bool
 ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LockAcquireResult result;
 
 	SET_LOCKTAG_PAGE(tag,
 					 relation->rd_lockInfo.lockRelId.dbId,
 					 relation->rd_lockInfo.lockRelId.relId,
 					 blkno);
 
-	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
+	result = LockAcquire(&tag, lockmode, false, true);
+	if (result != LOCKACQUIRE_NOT_AVAIL)
+	{
+		/* Increment the lock held count. */
+		IncrementPageLockHeldCount();
+		return true;
+	}
+
+	return false;
 }
 
 /*
@@ -527,6 +539,10 @@ UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
 					 blkno);
 
 	LockRelease(&tag, lockmode, false);
+
+	/* Decrement the lock held count. */
+	DecrementPageLockHeldCount();
+
 }
 
 /*
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 59c9ca3..1fcff29 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -171,13 +171,14 @@ typedef struct TwoPhaseLockRecord
 static int	FastPathLocalUseCount = 0;
 
 /*
- * Count of number of relation extension lock currently held by this backend.
- * We need this counter so that we can ensure that while holding the relation
- * extension lock we are not trying to acquire any other heavy weight lock.
- * Basically, that will ensuring that the proc holding relation extension lock
- * can not wait for any another lock.
+ * Count of number of relation extension/page lock currently held by this
+ * backend. We need this counter so that we can ensure that while holding the
+ * relation extension/page lock we are not trying to acquire any other heavy
+ * weight lock which can cause deadlock.  Basically, that will ensure that the
+ * proc holding relation extension/page lock can not wait for any another lock.
  */
 static int	RelationExtensionLockHeldCount = 0;
+static int	PageLockHeldCount = 0;
 
 /* Macros for manipulating proc->fpLockBits */
 #define FAST_PATH_BITS_PER_SLOT			3
@@ -851,14 +852,24 @@ LockAcquireExtended(const LOCKTAG *locktag,
 
 	/*
 	 * We should not acquire any other lock if we are already holding the
-	 * relation extension lock.  Only exception is that if we are trying to
+	 * relation extension/page lock.  Only exception is that if we are trying to
 	 * acquire the relation extension lock then we can hold the relation
-	 * extension on the same relation.
+	 * extension/page lock.
 	 */
 	Assert(!IsRelExtLockHeld() ||
 		   ((locktag->locktag_type == LOCKTAG_RELATION_EXTEND) && found));
 
 	/*
+	 * XXX While holding the page lock we don't need to ensure that whether we
+	 * are trying to acquire the relation extension lock on the same relation
+	 * or any other relation.  Because the above assert is ensuring that after
+	 * holding the relation extension lock we are not going to wait for any
+	 * other process.
+	 */
+	Assert(!IsPageLockHeld() ||
+			(locktag->locktag_type == LOCKTAG_RELATION_EXTEND));	
+
+	/*
 	 * Prepare to emit a WAL record if acquisition of this lock needs to be
 	 * replayed in a standby server.
 	 *
@@ -4547,12 +4558,48 @@ DecrementRelExtLockHeldCount()
 }
 
 /*
- * ResetRelExtLockHeldCount
+ * ResetRelExtPageLockHeldCount
  *
- * Reset the relation extension lock hold count;
+ * Reset the relation extension/page lock hold count;
  */
 void
-ResetRelExtLockHeldCount()
+ResetRelExtPageLockHeldCount()
 {
 	RelationExtensionLockHeldCount = 0;
+	PageLockHeldCount = 0;
+}
+
+/*
+ * IsRelExtLockHeld
+ *
+ * Is relation extension lock is held by this backend.
+ */
+bool
+IsPageLockHeld()
+{
+	return PageLockHeldCount > 0;
+}
+
+/*
+ * IncrementPageLockHeldCount
+ *
+ * Increment the page lock hold count.
+ */
+void
+IncrementPageLockHeldCount()
+{
+	PageLockHeldCount++;
+}
+
+/*
+ * DecrementPageLockHeldCount
+ *
+ * Decrement the page lock hold count;
+ */
+void
+DecrementPageLockHeldCount()
+{
+	/* We must hold the page lock. */
+	Assert(PageLockHeldCount > 0);
+	PageLockHeldCount--;
 }
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index c31a5f3..ed8fbdc 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -585,7 +585,10 @@ extern int	LockWaiterCount(const LOCKTAG *locktag);
 extern bool IsRelExtLockHeld(void);
 extern void IncrementRelExtLockHeldCount(void);
 extern void DecrementRelExtLockHeldCount(void);
-extern void ResetRelExtLockHeldCount(void);
+extern void ResetRelExtPageLockHeldCount(void);
+extern bool IsPageLockHeld(void);
+extern void IncrementPageLockHeldCount(void);
+extern void DecrementPageLockHeldCount(void);
 
 #ifdef LOCK_DEBUG
 extern void DumpLocks(PGPROC *proc);
-- 
1.8.3.1

