From 21779b4f1e2fa4c7dc6744e55ce4ceb401fdde50 Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <akorotkov@postgresql.org>
Date: Thu, 30 Jun 2022 22:07:12 +0300
Subject: [PATCH 2/2] Allow locking updated tuples in tuple_update() and
 tuple_delete()

Currently, in read committed transaction isolation mode (default), we have the
following sequence of actions when tuple_update()/tuple_delete() finds
the tuple updated by concurrent transaction.

1. Attempt to update/delete tuple with tuple_update()/tuple_delete(), which
   returns TM_Updated.
2. Lock tuple with tuple_lock().
3. Re-evaluate plan qual (recheck if we still need to update/delete and
   calculate the new tuple for update).
4. Second attempt to update/delete tuple with tuple_update()/tuple_delete().
   This attempt should be successful, since the tuple was previously locked.

This patch eliminates step 2 by taking the lock during first
tuple_update()/tuple_delete() call.  Heap table access methods could save
efforts by traversing chain of updated tuples once instead of twice.  Future
undo-based table access methods, which will start from the latest row version.
can immediately place a lock there.

The code in nodeModifyTable.c is simplified by removing the nested switch/case.

Discussion: https://postgr.es/m/CAPpHfdua-YFw3XTprfutzGp28xXLigFtzNbuFY8yPhqeq6X5kg%40mail.gmail.com
Reviewed-by: Aleksander Alekseev, Pavel Borisov, Vignesh C, Mason Sharp
---
 src/backend/access/heap/heapam.c         | 117 ++++++++---
 src/backend/access/heap/heapam_handler.c |  50 ++++-
 src/backend/access/table/tableam.c       |   6 +-
 src/backend/executor/nodeModifyTable.c   | 247 ++++++++---------------
 src/include/access/heapam.h              |  26 ++-
 src/include/access/tableam.h             |  32 ++-
 6 files changed, 267 insertions(+), 211 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 388df94a442..26623d8e25e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2667,7 +2667,8 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
 TM_Result
 heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, bool changingPart)
+			TM_FailureData *tmfd, bool changingPart, Snapshot snapshot,
+			GetSlotCallback lockedSlotCallback, void *lockedSlotCallbackArg)
 {
 	TM_Result	result;
 	TransactionId xid = GetCurrentTransactionId();
@@ -2865,6 +2866,26 @@ l1:
 			result = TM_Updated;
 	}
 
+	if (result == TM_Updated && lockedSlotCallback)
+	{
+		HeapLockContext context = {buffer, vmbuffer, have_tuple_lock};
+		TupleTableSlot *slot;
+
+		slot = lockedSlotCallback(lockedSlotCallbackArg);
+
+		result = heapam_tuple_lock_internal(relation, tid, snapshot,
+											slot, cid, LockTupleExclusive,
+											wait ? LockWaitBlock : LockWaitError,
+											TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
+											tmfd, &context);
+		if (result == TM_Ok)
+		{
+			tmfd->traversed = true;
+			return TM_Updated;
+		}
+		return result;
+	}
+
 	if (result != TM_Ok)
 	{
 		Assert(result == TM_SelfModified ||
@@ -3088,7 +3109,8 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 	result = heap_delete(relation, tid,
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &tmfd, false /* changingPart */ );
+						 &tmfd, false /* changingPart */ ,
+						 SnapshotAny, NULL, NULL);
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -3128,7 +3150,8 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 TM_Result
 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, LockTupleMode *lockmode)
+			TM_FailureData *tmfd, LockTupleMode *lockmode, Snapshot snapshot,
+			GetSlotCallback lockedSlotCallback, void *lockedSlotCallbackArg)
 {
 	TM_Result	result;
 	TransactionId xid = GetCurrentTransactionId();
@@ -3495,6 +3518,31 @@ l2:
 		}
 	}
 
+	if (result == TM_Updated && lockedSlotCallback)
+	{
+		HeapLockContext context = {buffer, vmbuffer, have_tuple_lock};
+		TupleTableSlot *slot;
+
+		slot = lockedSlotCallback(lockedSlotCallbackArg);
+
+		result = heapam_tuple_lock_internal(relation, otid, snapshot,
+											slot, cid, *lockmode,
+											wait ? LockWaitBlock : LockWaitError,
+											TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
+											tmfd, &context);
+		bms_free(hot_attrs);
+		bms_free(key_attrs);
+		bms_free(id_attrs);
+		bms_free(modified_attrs);
+		bms_free(interesting_attrs);
+		if (result == TM_Ok)
+		{
+			tmfd->traversed = true;
+			return TM_Updated;
+		}
+		return result;
+	}
+
 	if (result != TM_Ok)
 	{
 		Assert(result == TM_SelfModified ||
@@ -4173,7 +4221,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
 	result = heap_update(relation, otid, tup,
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &tmfd, &lockmode);
+						 &tmfd, &lockmode, SnapshotAny, NULL, NULL);
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -4255,13 +4303,14 @@ TM_Result
 heap_lock_tuple(Relation relation, HeapTuple tuple,
 				CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
 				bool follow_updates,
-				Buffer *buffer, TM_FailureData *tmfd)
+				HeapLockContext *context, TM_FailureData *tmfd)
 {
 	TM_Result	result;
 	ItemPointer tid = &(tuple->t_self);
 	ItemId		lp;
 	Page		page;
-	Buffer		vmbuffer = InvalidBuffer;
+	Buffer		buffer = context->buffer,
+				vmbuffer = context->vmbuffer;
 	BlockNumber block;
 	TransactionId xid,
 				xmax;
@@ -4270,10 +4319,11 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
 				new_infomask2;
 	bool		first_time = true;
 	bool		skip_tuple_lock = false;
-	bool		have_tuple_lock = false;
+	bool		have_tuple_lock = context->have_tuple_lock;
 	bool		cleared_all_frozen = false;
 
-	*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
+	if (BufferIsInvalid(buffer))
+		buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
 	block = ItemPointerGetBlockNumber(tid);
 
 	/*
@@ -4282,12 +4332,13 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
 	 * in the middle of changing this, so we'll need to recheck after we have
 	 * the lock.
 	 */
-	if (PageIsAllVisible(BufferGetPage(*buffer)))
+	if (BufferIsInvalid(vmbuffer) && PageIsAllVisible(BufferGetPage(buffer)))
 		visibilitymap_pin(relation, block, &vmbuffer);
 
-	LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+	if (BufferIsInvalid(context->buffer))
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-	page = BufferGetPage(*buffer);
+	page = BufferGetPage(buffer);
 	lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
 	Assert(ItemIdIsNormal(lp));
 
@@ -4296,7 +4347,7 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
 	tuple->t_tableOid = RelationGetRelid(relation);
 
 l3:
-	result = HeapTupleSatisfiesUpdate(tuple, cid, *buffer);
+	result = HeapTupleSatisfiesUpdate(tuple, cid, buffer);
 
 	if (result == TM_Invisible)
 	{
@@ -4325,7 +4376,7 @@ l3:
 		infomask2 = tuple->t_data->t_infomask2;
 		ItemPointerCopy(&tuple->t_data->t_ctid, &t_ctid);
 
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
 		/*
 		 * If any subtransaction of the current top transaction already holds
@@ -4477,12 +4528,12 @@ l3:
 					{
 						result = res;
 						/* recovery code expects to have buffer lock held */
-						LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+						LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 						goto failed;
 					}
 				}
 
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 				/*
 				 * Make sure it's still an appropriate lock, else start over.
@@ -4517,7 +4568,7 @@ l3:
 			if (HEAP_XMAX_IS_LOCKED_ONLY(infomask) &&
 				!HEAP_XMAX_IS_EXCL_LOCKED(infomask))
 			{
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 				/*
 				 * Make sure it's still an appropriate lock, else start over.
@@ -4545,7 +4596,7 @@ l3:
 					 * No conflict, but if the xmax changed under us in the
 					 * meantime, start over.
 					 */
-					LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+					LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 					if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
 						!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
 											 xwait))
@@ -4557,7 +4608,7 @@ l3:
 			}
 			else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
 			{
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 				/* if the xmax changed in the meantime, start over */
 				if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
@@ -4585,7 +4636,7 @@ l3:
 			TransactionIdIsCurrentTransactionId(xwait))
 		{
 			/* ... but if the xmax changed in the meantime, start over */
-			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 			if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
 				!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
 									 xwait))
@@ -4607,7 +4658,7 @@ l3:
 		 */
 		if (require_sleep && (result == TM_Updated || result == TM_Deleted))
 		{
-			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 			goto failed;
 		}
 		else if (require_sleep)
@@ -4632,7 +4683,7 @@ l3:
 				 */
 				result = TM_WouldBlock;
 				/* recovery code expects to have buffer lock held */
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 				goto failed;
 			}
 
@@ -4658,7 +4709,7 @@ l3:
 						{
 							result = TM_WouldBlock;
 							/* recovery code expects to have buffer lock held */
-							LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+							LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 							goto failed;
 						}
 						break;
@@ -4698,7 +4749,7 @@ l3:
 						{
 							result = TM_WouldBlock;
 							/* recovery code expects to have buffer lock held */
-							LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+							LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 							goto failed;
 						}
 						break;
@@ -4724,12 +4775,12 @@ l3:
 				{
 					result = res;
 					/* recovery code expects to have buffer lock held */
-					LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+					LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 					goto failed;
 				}
 			}
 
-			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 			/*
 			 * xwait is done, but if xwait had just locked the tuple then some
@@ -4751,7 +4802,7 @@ l3:
 				 * don't check for this in the multixact case, because some
 				 * locker transactions might still be running.
 				 */
-				UpdateXmaxHintBits(tuple->t_data, *buffer, xwait);
+				UpdateXmaxHintBits(tuple->t_data, buffer, xwait);
 			}
 		}
 
@@ -4810,9 +4861,9 @@ failed:
 	 */
 	if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
 	{
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 		visibilitymap_pin(relation, block, &vmbuffer);
-		LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 		goto l3;
 	}
 
@@ -4875,7 +4926,7 @@ failed:
 		cleared_all_frozen = true;
 
 
-	MarkBufferDirty(*buffer);
+	MarkBufferDirty(buffer);
 
 	/*
 	 * XLOG stuff.  You might think that we don't need an XLOG record because
@@ -4895,7 +4946,7 @@ failed:
 		XLogRecPtr	recptr;
 
 		XLogBeginInsert();
-		XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD);
+		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 
 		xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
 		xlrec.locking_xid = xid;
@@ -4916,7 +4967,7 @@ failed:
 	result = TM_Ok;
 
 out_locked:
-	LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
 out_unlocked:
 	if (BufferIsValid(vmbuffer))
@@ -4934,6 +4985,10 @@ out_unlocked:
 	if (have_tuple_lock)
 		UnlockTupleTuplock(relation, tid, mode);
 
+	context->buffer = buffer;
+	context->vmbuffer = InvalidBuffer;
+	context->have_tuple_lock = false;
+
 	return result;
 }
 
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index c4b1916d36e..6f024b5cccd 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -299,14 +299,21 @@ heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
 static TM_Result
 heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
 					Snapshot snapshot, Snapshot crosscheck, bool wait,
-					TM_FailureData *tmfd, bool changingPart)
+					TM_FailureData *tmfd, bool changingPart,
+					GetSlotCallback lockedSlotCallback,
+					void *lockedSlotCallbackArg)
 {
+	TM_Result	result;
+
 	/*
 	 * Currently Deleting of index tuples are handled at vacuum, in case if
 	 * the storage itself is cleaning the dead tuples by itself, it is the
 	 * time to call the index tuple deletion also.
 	 */
-	return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart);
+	result = heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart,
+						 snapshot, lockedSlotCallback, lockedSlotCallbackArg);
+
+	return result;
 }
 
 
@@ -314,7 +321,9 @@ static TM_Result
 heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 					CommandId cid, Snapshot snapshot, Snapshot crosscheck,
 					bool wait, TM_FailureData *tmfd,
-					LockTupleMode *lockmode, bool *update_indexes)
+					LockTupleMode *lockmode, bool *update_indexes,
+					GetSlotCallback lockedSlotCallback,
+					void *lockedSlotCallbackArg)
 {
 	bool		shouldFree = true;
 	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
@@ -325,7 +334,7 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 	tuple->t_tableOid = slot->tts_tableOid;
 
 	result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
-						 tmfd, lockmode);
+						 tmfd, lockmode, snapshot, lockedSlotCallback, lockedSlotCallbackArg);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	/*
@@ -349,12 +358,27 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
 				  TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
 				  LockWaitPolicy wait_policy, uint8 flags,
 				  TM_FailureData *tmfd)
+{
+	return heapam_tuple_lock_internal(relation, tid, snapshot, slot, cid, mode,
+									  wait_policy, flags, tmfd, NULL);
+}
+
+/*
+ * This routine does the work for heapam_tuple_lock(), but also support
+ * `updated` to re-use the work done by heapam_tuple_update() or
+ * heapam_tuple_delete() on fetching tuple and checking its visibility.
+ */
+TM_Result
+heapam_tuple_lock_internal(Relation relation, ItemPointer tid, Snapshot snapshot,
+						   TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
+						   LockWaitPolicy wait_policy, uint8 flags,
+						   TM_FailureData *tmfd, HeapLockContext *existingContext)
 {
 	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
 	TM_Result	result;
-	Buffer		buffer;
 	HeapTuple	tuple = &bslot->base.tupdata;
 	bool		follow_updates;
+	Buffer		buffer = InvalidBuffer;
 
 	follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0;
 	tmfd->traversed = false;
@@ -363,8 +387,20 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
 
 tuple_lock_retry:
 	tuple->t_self = *tid;
-	result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
-							 follow_updates, &buffer, tmfd);
+	if (!existingContext)
+	{
+		HeapLockContext context = {InvalidBuffer, InvalidBuffer, false};
+		result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
+								 follow_updates, &context, tmfd);
+		buffer = context.buffer;
+	}
+	else
+	{
+		result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
+								 follow_updates, existingContext, tmfd);
+		buffer = existingContext->buffer;
+		existingContext = NULL;
+	}
 
 	if (result == TM_Updated &&
 		(flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index ef0d34fceee..a87b86ff614 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -306,7 +306,8 @@ simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
 								GetCurrentCommandId(true),
 								snapshot, InvalidSnapshot,
 								true /* wait for commit */ ,
-								&tmfd, false /* changingPart */ );
+								&tmfd, false /* changingPart */ ,
+								NULL, NULL);
 
 	switch (result)
 	{
@@ -355,7 +356,8 @@ simple_table_tuple_update(Relation rel, ItemPointer otid,
 								GetCurrentCommandId(true),
 								snapshot, InvalidSnapshot,
 								true /* wait for commit */ ,
-								&tmfd, &lockmode, update_indexes);
+								&tmfd, &lockmode, update_indexes,
+								NULL, NULL);
 
 	switch (result)
 	{
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 87a4e553b9e..e7860d770cf 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1326,6 +1326,23 @@ ExecDeletePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 	return true;
 }
 
+typedef struct
+{
+	EPQState *epqstate;
+	ResultRelInfo *resultRelInfo;
+} GetEPQSlotArg;
+
+
+static TupleTableSlot *
+GetEPQSlot(void *arg)
+{
+	GetEPQSlotArg *slotArg = (GetEPQSlotArg *) arg;
+
+	return EvalPlanQualSlot(slotArg->epqstate,
+							slotArg->resultRelInfo->ri_RelationDesc,
+							slotArg->resultRelInfo->ri_RangeTableIndex);
+}
+
 /*
  * ExecDeleteAct -- subroutine for ExecDelete
  *
@@ -1338,6 +1355,7 @@ ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 			  ItemPointer tupleid, bool changingPart)
 {
 	EState	   *estate = context->estate;
+	GetEPQSlotArg slotArg = {.epqstate = context->epqstate, .resultRelInfo = resultRelInfo};
 
 	return table_tuple_delete(resultRelInfo->ri_RelationDesc, tupleid,
 							  estate->es_output_cid,
@@ -1345,7 +1363,9 @@ ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 							  estate->es_crosscheck_snapshot,
 							  true /* wait for commit */ ,
 							  &context->tmfd,
-							  changingPart);
+							  changingPart,
+							  GetEPQSlot,
+							  &slotArg);
 }
 
 /*
@@ -1543,102 +1563,46 @@ ldelete:
 								 errmsg("could not serialize access due to concurrent update")));
 
 					/*
-					 * Already know that we're going to need to do EPQ, so
-					 * fetch tuple directly into the right slot.
+					 * ExecDeleteAct() has already locked the old tuple for
+					 * us. Now we need to copy it to the right slot.
 					 */
 					EvalPlanQualBegin(context->epqstate);
 					inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
 												 resultRelInfo->ri_RangeTableIndex);
-
-					result = table_tuple_lock(resultRelationDesc, tupleid,
-											  estate->es_snapshot,
-											  inputslot, estate->es_output_cid,
-											  LockTupleExclusive, LockWaitBlock,
-											  TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
-											  &context->tmfd);
-
-					switch (result)
+					/*
+					 * Save locked table for further processing of
+					 * RETURNING clause.
+					 */
+					if (processReturning &&
+						resultRelInfo->ri_projectReturning &&
+						!resultRelInfo->ri_FdwRoutine)
 					{
-						case TM_Ok:
-							Assert(context->tmfd.traversed);
-
-							/*
-							 * Save locked tuple for further processing of
-							 * RETURNING clause.
-							 */
-							if (processReturning &&
-								resultRelInfo->ri_projectReturning &&
-								!resultRelInfo->ri_FdwRoutine)
-							{
-								TupleTableSlot *returningSlot;
-								returningSlot = ExecGetReturningSlot(estate, resultRelInfo);
-								ExecCopySlot(returningSlot, inputslot);
-								ExecMaterializeSlot(returningSlot);
-							}
-
-							epqslot = EvalPlanQual(context->epqstate,
-												   resultRelationDesc,
-												   resultRelInfo->ri_RangeTableIndex,
-												   inputslot);
-							if (TupIsNull(epqslot))
-								/* Tuple not passing quals anymore, exiting... */
-								return NULL;
-
-							/*
-							 * If requested, skip delete and pass back the
-							 * updated row.
-							 */
-							if (epqreturnslot)
-							{
-								*epqreturnslot = epqslot;
-								return NULL;
-							}
-							else
-								goto ldelete;
-
-						case TM_SelfModified:
-
-							/*
-							 * This can be reached when following an update
-							 * chain from a tuple updated by another session,
-							 * reaching a tuple that was already updated in
-							 * this transaction. If previously updated by this
-							 * command, ignore the delete, otherwise error
-							 * out.
-							 *
-							 * See also TM_SelfModified response to
-							 * table_tuple_delete() above.
-							 */
-							if (context->tmfd.cmax != estate->es_output_cid)
-								ereport(ERROR,
-										(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
-										 errmsg("tuple to be deleted was already modified by an operation triggered by the current command"),
-										 errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
-							return NULL;
-
-						case TM_Deleted:
-							/* tuple already deleted; nothing to do */
-							return NULL;
+						TupleTableSlot *returningSlot;
+						returningSlot = ExecGetReturningSlot(estate, resultRelInfo);
+						ExecCopySlot(returningSlot, inputslot);
+						ExecMaterializeSlot(returningSlot);
+					}
 
-						default:
+					Assert(context->tmfd.traversed);
+					epqslot = EvalPlanQual(context->epqstate,
+										   resultRelationDesc,
+										   resultRelInfo->ri_RangeTableIndex,
+										   inputslot);
+					if (TupIsNull(epqslot))
+						/* Tuple not passing quals anymore, exiting... */
+						return NULL;
 
-							/*
-							 * TM_Invisible should be impossible because we're
-							 * waiting for updated row versions, and would
-							 * already have errored out if the first version
-							 * is invisible.
-							 *
-							 * TM_Updated should be impossible, because we're
-							 * locking the latest version via
-							 * TUPLE_LOCK_FLAG_FIND_LAST_VERSION.
-							 */
-							elog(ERROR, "unexpected table_tuple_lock status: %u",
-								 result);
-							return NULL;
+					/*
+					 * If requested, skip delete and pass back the updated
+					 * row.
+					 */
+					if (epqreturnslot)
+					{
+						*epqreturnslot = epqslot;
+						return NULL;
 					}
-
-					Assert(false);
-					break;
+					else
+						goto ldelete;
 				}
 
 			case TM_Deleted:
@@ -1980,6 +1944,7 @@ ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 	Relation	resultRelationDesc = resultRelInfo->ri_RelationDesc;
 	bool		partition_constraint_failed;
 	TM_Result	result;
+	GetEPQSlotArg slotArg = {.epqstate = context->epqstate, .resultRelInfo = resultRelInfo};
 
 	updateCxt->crossPartUpdate = false;
 
@@ -2108,7 +2073,9 @@ lreplace:
 								estate->es_crosscheck_snapshot,
 								true /* wait for commit */ ,
 								&context->tmfd, &updateCxt->lockmode,
-								&updateCxt->updateIndexes);
+								&updateCxt->updateIndexes,
+								GetEPQSlot,
+								&slotArg);
 	if (result == TM_Ok)
 		updateCxt->updated = true;
 
@@ -2259,7 +2226,7 @@ ExecCrossPartitionUpdateForeignKey(ModifyTableContext *context,
 static TupleTableSlot *
 ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 		   ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
-		   bool canSetTag)
+		   bool canSetTag, bool locked)
 {
 	EState	   *estate = context->estate;
 	Relation	resultRelationDesc = resultRelInfo->ri_RelationDesc;
@@ -2376,81 +2343,39 @@ redo_act:
 						ereport(ERROR,
 								(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 								 errmsg("could not serialize access due to concurrent update")));
+					Assert(!locked);
 
 					/*
-					 * Already know that we're going to need to do EPQ, so
-					 * fetch tuple directly into the right slot.
+					 * ExecUpdateAct() has already locked the old tuple for
+					 * us. Now we need to copy it to the right slot.
 					 */
 					inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
 												 resultRelInfo->ri_RangeTableIndex);
 
-					result = table_tuple_lock(resultRelationDesc, tupleid,
-											  estate->es_snapshot,
-											  inputslot, estate->es_output_cid,
-											  updateCxt.lockmode, LockWaitBlock,
-											  TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
-											  &context->tmfd);
-
-					switch (result)
-					{
-						case TM_Ok:
-							Assert(context->tmfd.traversed);
-
-							/* Make sure ri_oldTupleSlot is initialized. */
-							if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
-								ExecInitUpdateProjection(context->mtstate,
-														 resultRelInfo);
-
-							/*
-							 * Save the locked tuple for further calculation of
-							 * the new tuple.
-							 */
-							oldSlot = resultRelInfo->ri_oldTupleSlot;
-							ExecCopySlot(oldSlot, inputslot);
-							ExecMaterializeSlot(oldSlot);
-
-							epqslot = EvalPlanQual(context->epqstate,
-												   resultRelationDesc,
-												   resultRelInfo->ri_RangeTableIndex,
-												   inputslot);
-							if (TupIsNull(epqslot))
-								/* Tuple not passing quals anymore, exiting... */
-								return NULL;
-
-							slot = ExecGetUpdateNewTuple(resultRelInfo,
-														 epqslot, oldSlot);
-							goto redo_act;
+					/* Make sure ri_oldTupleSlot is initialized. */
+					if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
+						ExecInitUpdateProjection(context->mtstate,
+												resultRelInfo);
 
-						case TM_Deleted:
-							/* tuple already deleted; nothing to do */
-							return NULL;
-
-						case TM_SelfModified:
-
-							/*
-							 * This can be reached when following an update
-							 * chain from a tuple updated by another session,
-							 * reaching a tuple that was already updated in
-							 * this transaction. If previously modified by
-							 * this command, ignore the redundant update,
-							 * otherwise error out.
-							 *
-							 * See also TM_SelfModified response to
-							 * table_tuple_update() above.
-							 */
-							if (context->tmfd.cmax != estate->es_output_cid)
-								ereport(ERROR,
-										(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
-										 errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
-										 errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
-							return NULL;
-
-						default:
-							/* see table_tuple_lock call in ExecDelete() */
-							elog(ERROR, "unexpected table_tuple_lock status: %u",
-								 result);
-							return NULL;
-					}
+					/*
+					 * Save the locked tuple for further calculation of
+					 * the new tuple.
+					 */
+					oldSlot = resultRelInfo->ri_oldTupleSlot;
+					ExecCopySlot(oldSlot, inputslot);
+					ExecMaterializeSlot(oldSlot);
+					Assert(context->tmfd.traversed);
+
+					epqslot = EvalPlanQual(context->epqstate,
+										   resultRelationDesc,
+										   resultRelInfo->ri_RangeTableIndex,
+										   inputslot);
+					if (TupIsNull(epqslot))
+						/* Tuple not passing quals anymore, exiting... */
+						return NULL;
+					slot = ExecGetUpdateNewTuple(resultRelInfo,
+												 epqslot, oldSlot);
+					goto redo_act;
 				}
 
 				break;
@@ -2694,7 +2619,7 @@ ExecOnConflictUpdate(ModifyTableContext *context,
 	*returning = ExecUpdate(context, resultRelInfo,
 							conflictTid, NULL,
 							resultRelInfo->ri_onConflict->oc_ProjSlot,
-							canSetTag);
+							canSetTag, true);
 
 	/*
 	 * Clear out existing tuple, as there might not be another conflict among
@@ -3867,7 +3792,7 @@ ExecModifyTable(PlanState *pstate)
 
 				/* Now apply the update. */
 				slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
-								  slot, node->canSetTag);
+								  slot, node->canSetTag, false);
 				break;
 
 			case CMD_DELETE:
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 417108f1e01..a5e8a90c508 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -191,6 +191,14 @@ typedef struct HeapPageFreeze
 
 } HeapPageFreeze;
 
+typedef struct
+{
+	Buffer		buffer;
+	Buffer		vmbuffer;
+	bool		have_tuple_lock;
+} HeapLockContext;
+
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -243,17 +251,24 @@ extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
-							 struct TM_FailureData *tmfd, bool changingPart);
+							 struct TM_FailureData *tmfd, bool changingPart,
+							 Snapshot snapshot,
+							 GetSlotCallback lockedSlotCallback,
+							 void *lockedSlotCallbackArg);
 extern void heap_finish_speculative(Relation relation, ItemPointer tid);
 extern void heap_abort_speculative(Relation relation, ItemPointer tid);
 extern TM_Result heap_update(Relation relation, ItemPointer otid,
 							 HeapTuple newtup,
 							 CommandId cid, Snapshot crosscheck, bool wait,
-							 struct TM_FailureData *tmfd, LockTupleMode *lockmode);
+							 struct TM_FailureData *tmfd, LockTupleMode *lockmode,
+							 Snapshot snapshot,
+							 GetSlotCallback lockedSlotCallback,
+							 void *lockedSlotCallbackArg);
 extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 								 CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool follow_updates,
-								 Buffer *buffer, struct TM_FailureData *tmfd);
+								 HeapLockContext *context,
+								 struct TM_FailureData *tmfd);
 
 extern void heap_inplace_update(Relation relation, HeapTuple tuple);
 extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
@@ -328,4 +343,9 @@ extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data,
 extern void HeapCheckForSerializableConflictOut(bool visible, Relation relation, HeapTuple tuple,
 												Buffer buffer, Snapshot snapshot);
 
+extern TM_Result heapam_tuple_lock_internal(Relation relation, ItemPointer tid, Snapshot snapshot,
+						   TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
+						   LockWaitPolicy wait_policy, uint8 flags,
+						   TM_FailureData *tmfd, HeapLockContext *existingContext);
+
 #endif							/* HEAPAM_H */
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 3fb184717f6..71b93f1674b 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -252,6 +252,9 @@ typedef void (*IndexBuildCallback) (Relation index,
 									bool tupleIsAlive,
 									void *state);
 
+/* Typedef for callback function for table_index_build_scan */
+typedef TupleTableSlot *(*GetSlotCallback) (void *arg);
+
 /*
  * API struct for a table AM.  Note this must be allocated in a
  * server-lifetime manner, typically as a static const struct, which then gets
@@ -514,7 +517,9 @@ typedef struct TableAmRoutine
 								 Snapshot crosscheck,
 								 bool wait,
 								 TM_FailureData *tmfd,
-								 bool changingPart);
+								 bool changingPart,
+								 GetSlotCallback lockedSlotCallback,
+								 void *lockedSlotCallbackArg);
 
 	/* see table_tuple_update() for reference about parameters */
 	TM_Result	(*tuple_update) (Relation rel,
@@ -526,7 +531,9 @@ typedef struct TableAmRoutine
 								 bool wait,
 								 TM_FailureData *tmfd,
 								 LockTupleMode *lockmode,
-								 bool *update_indexes);
+								 bool *update_indexes,
+								 GetSlotCallback lockedSlotCallback,
+								 void *lockedSlotCallbackArg);
 
 	/* see table_tuple_lock() for reference about parameters */
 	TM_Result	(*tuple_lock) (Relation rel,
@@ -1449,6 +1456,8 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
  *	tmfd - filled in failure cases (see below)
  *	changingPart - true iff the tuple is being moved to another partition
  *		table due to an update of the partition key. Otherwise, false.
+ *	lockedSlot - slot to save the locked tuple if should lock the last row
+ *		version during the concurrent update. NULL if not needed.
  *
  * Normal, successful return value is TM_Ok, which means we did actually
  * delete it.  Failure return codes are TM_SelfModified, TM_Updated, and
@@ -1461,11 +1470,15 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 static inline TM_Result
 table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
 				   Snapshot snapshot, Snapshot crosscheck, bool wait,
-				   TM_FailureData *tmfd, bool changingPart)
+				   TM_FailureData *tmfd, bool changingPart,
+				   GetSlotCallback lockedSlotCallback,
+				   void *lockedSlotCallbackArg)
 {
 	return rel->rd_tableam->tuple_delete(rel, tid, cid,
 										 snapshot, crosscheck,
-										 wait, tmfd, changingPart);
+										 wait, tmfd, changingPart,
+										 lockedSlotCallback,
+										 lockedSlotCallbackArg);
 }
 
 /*
@@ -1487,7 +1500,9 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
  *	lockmode - filled with lock mode acquired on tuple
  *  update_indexes - in success cases this is set to true if new index entries
  *		are required for this tuple
- *
+ * 	lockedSlot - slot to save the locked tuple if should lock the last row
+ *		version during the concurrent update. NULL if not needed.
+
  * Normal, successful return value is TM_Ok, which means we did actually
  * update it.  Failure return codes are TM_SelfModified, TM_Updated, and
  * TM_BeingModified (the last only possible if wait == false).
@@ -1506,12 +1521,15 @@ static inline TM_Result
 table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
 				   CommandId cid, Snapshot snapshot, Snapshot crosscheck,
 				   bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
-				   bool *update_indexes)
+				   bool *update_indexes, GetSlotCallback lockedSlotCallback,
+				   void *lockedSlotCallbackArg)
 {
 	return rel->rd_tableam->tuple_update(rel, otid, slot,
 										 cid, snapshot, crosscheck,
 										 wait, tmfd,
-										 lockmode, update_indexes);
+										 lockmode, update_indexes,
+										 lockedSlotCallback,
+										 lockedSlotCallbackArg);
 }
 
 /*
-- 
2.37.1 (Apple Git-137.1)

