From 29c9506b23388c089cb7b214596edef304be833b Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <akorotkov@postgresql.org>
Date: Mon, 9 Jan 2023 01:02:12 +0300
Subject: [PATCH] Allow locking updated tuples in tuple_update() and
 tuple_delete()

Currently, in read committed transaction isolation mode (the default), we have
the following sequence of actions when tuple_update()/tuple_delete() finds
that the tuple has been updated by a concurrent transaction.

 1. Attempt to update/delete the tuple with tuple_update()/tuple_delete(),
    which returns TM_Updated.
 2. Lock the tuple with tuple_lock().
 3. Re-evaluate the plan qual (recheck whether we still need to update/delete
    and calculate the new tuple for the update).
 4. Make a second attempt to update/delete the tuple with
    tuple_update()/tuple_delete().  This attempt should succeed, since the
    tuple was previously locked.

This patch eliminates step 2 by taking the lock during the first
tuple_update()/tuple_delete() call.  The heap table access method can save
effort by traversing the chain of updated tuples once instead of twice.  Future
undo-based table access methods, which will start from the latest row version,
can immediately place a lock there.

The code in nodeModifyTable.c is simplified by removing the nested switch/case.
Also, we no longer need table_tuple_fetch_row_version() in ExecUpdate(), because
the old version of the row should always be the same tuple that we've previously
locked.
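
To make the new flow concrete, here is a simplified, illustrative sketch of the
delete path, modeled on the ExecDelete()/ExecDeleteAct() changes in this patch
(declarations abbreviated; other result codes and error handling omitted):

    TM_FailureData  tmfd;
    TM_Result       result;
    TupleTableSlot *slot;

    /* In read committed mode, ask the AM to lock a concurrently updated
     * tuple right away and return it in "slot" (the lockedSlot argument). */
    slot = ExecGetReturningSlot(estate, resultRelInfo);
    result = table_tuple_delete(resultRelationDesc, tupleid,
                                estate->es_output_cid,
                                estate->es_snapshot,
                                estate->es_crosscheck_snapshot,
                                true /* wait for commit */, &tmfd,
                                false /* changingPart */,
                                true /* lockUpdated */,
                                slot /* lockedSlot */);

    if (result == TM_Updated && tmfd.traversed)
    {
        /* The AM has already locked the latest row version into "slot";
         * copy it into the EPQ slot, re-evaluate the quals, and retry the
         * delete -- no separate table_tuple_lock() call is needed. */
    }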

Discussion: https://postgr.es/m/CAPpHfdua-YFw3XTprfutzGp28xXLigFtzNbuFY8yPhqeq6X5kg%40mail.gmail.com
Reviewed-by: Aleksander Alekseev, Pavel Borisov, Vignesh C, Mason Sharp
---
 src/backend/access/heap/heapam_handler.c | 497 +++++++++++++----------
 src/backend/access/table/tableam.c       |   6 +-
 src/backend/executor/nodeModifyTable.c   | 244 ++++-------
 src/include/access/tableam.h             |  26 +-
 4 files changed, 400 insertions(+), 373 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index c4b1916d36..3ada0944e6 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -53,6 +53,12 @@ static bool SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer,
 								   HeapTuple tuple,
 								   OffsetNumber tupoffset);
 
+static TM_Result heapam_tuple_lock_internal(Relation relation, ItemPointer tid,
+											Snapshot snapshot, TupleTableSlot *slot,
+											CommandId cid, LockTupleMode mode,
+											LockWaitPolicy wait_policy, uint8 flags,
+											TM_FailureData *tmfd, bool updated);
+
 static BlockNumber heapam_scan_get_blocks_done(HeapScanDesc hscan);
 
 static const TableAmRoutine heapam_methods;
@@ -299,14 +305,39 @@ heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
 static TM_Result
 heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
 					Snapshot snapshot, Snapshot crosscheck, bool wait,
-					TM_FailureData *tmfd, bool changingPart)
+					TM_FailureData *tmfd, bool changingPart,
+					bool lockUpdated, TupleTableSlot *lockedSlot)
 {
+	TM_Result	result;
+
 	/*
 	 * Currently Deleting of index tuples are handled at vacuum, in case if
 	 * the storage itself is cleaning the dead tuples by itself, it is the
 	 * time to call the index tuple deletion also.
 	 */
-	return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart);
+	result = heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart);
+
+	/*
+	 * If lockUpdated is true and the tuple has been concurrently updated, get
+	 * the lock immediately so that on retry we will succeed.
+	 */
+	if (result == TM_Updated && lockUpdated)
+	{
+		Assert(lockedSlot != NULL);
+		result = heapam_tuple_lock_internal(relation, tid, snapshot,
+											lockedSlot, cid, LockTupleExclusive,
+											LockWaitBlock,
+											TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
+											tmfd, true);
+
+		if (result == TM_Ok)
+		{
+			tmfd->traversed = true;
+			return TM_Updated;
+		}
+	}
+
+	return result;
 }
 
 
@@ -314,7 +345,8 @@ static TM_Result
 heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 					CommandId cid, Snapshot snapshot, Snapshot crosscheck,
 					bool wait, TM_FailureData *tmfd,
-					LockTupleMode *lockmode, bool *update_indexes)
+					LockTupleMode *lockmode, bool *update_indexes,
+					bool lockUpdated, TupleTableSlot *lockedSlot)
 {
 	bool		shouldFree = true;
 	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
@@ -341,6 +373,26 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 	if (shouldFree)
 		pfree(tuple);
 
+	/*
+	 * If lockUpdated is true and the tuple has been concurrently updated, get
+	 * the lock immediately so that on retry we will succeed.
+	 */
+	if (result == TM_Updated && lockUpdated)
+	{
+		Assert(lockedSlot != NULL);
+		result = heapam_tuple_lock_internal(relation, otid, snapshot,
+											lockedSlot, cid, *lockmode,
+											LockWaitBlock,
+											TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
+											tmfd, true);
+
+		if (result == TM_Ok)
+		{
+			tmfd->traversed = true;
+			return TM_Updated;
+		}
+	}
+
 	return result;
 }
 
@@ -350,213 +402,8 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
 				  LockWaitPolicy wait_policy, uint8 flags,
 				  TM_FailureData *tmfd)
 {
-	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
-	TM_Result	result;
-	Buffer		buffer;
-	HeapTuple	tuple = &bslot->base.tupdata;
-	bool		follow_updates;
-
-	follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0;
-	tmfd->traversed = false;
-
-	Assert(TTS_IS_BUFFERTUPLE(slot));
-
-tuple_lock_retry:
-	tuple->t_self = *tid;
-	result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
-							 follow_updates, &buffer, tmfd);
-
-	if (result == TM_Updated &&
-		(flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
-	{
-		/* Should not encounter speculative tuple on recheck */
-		Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));
-
-		ReleaseBuffer(buffer);
-
-		if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self))
-		{
-			SnapshotData SnapshotDirty;
-			TransactionId priorXmax;
-
-			/* it was updated, so look at the updated version */
-			*tid = tmfd->ctid;
-			/* updated row should have xmin matching this xmax */
-			priorXmax = tmfd->xmax;
-
-			/* signal that a tuple later in the chain is getting locked */
-			tmfd->traversed = true;
-
-			/*
-			 * fetch target tuple
-			 *
-			 * Loop here to deal with updated or busy tuples
-			 */
-			InitDirtySnapshot(SnapshotDirty);
-			for (;;)
-			{
-				if (ItemPointerIndicatesMovedPartitions(tid))
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("tuple to be locked was already moved to another partition due to concurrent update")));
-
-				tuple->t_self = *tid;
-				if (heap_fetch(relation, &SnapshotDirty, tuple, &buffer, true))
-				{
-					/*
-					 * If xmin isn't what we're expecting, the slot must have
-					 * been recycled and reused for an unrelated tuple.  This
-					 * implies that the latest version of the row was deleted,
-					 * so we need do nothing.  (Should be safe to examine xmin
-					 * without getting buffer's content lock.  We assume
-					 * reading a TransactionId to be atomic, and Xmin never
-					 * changes in an existing tuple, except to invalid or
-					 * frozen, and neither of those can match priorXmax.)
-					 */
-					if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
-											 priorXmax))
-					{
-						ReleaseBuffer(buffer);
-						return TM_Deleted;
-					}
-
-					/* otherwise xmin should not be dirty... */
-					if (TransactionIdIsValid(SnapshotDirty.xmin))
-						ereport(ERROR,
-								(errcode(ERRCODE_DATA_CORRUPTED),
-								 errmsg_internal("t_xmin %u is uncommitted in tuple (%u,%u) to be updated in table \"%s\"",
-												 SnapshotDirty.xmin,
-												 ItemPointerGetBlockNumber(&tuple->t_self),
-												 ItemPointerGetOffsetNumber(&tuple->t_self),
-												 RelationGetRelationName(relation))));
-
-					/*
-					 * If tuple is being updated by other transaction then we
-					 * have to wait for its commit/abort, or die trying.
-					 */
-					if (TransactionIdIsValid(SnapshotDirty.xmax))
-					{
-						ReleaseBuffer(buffer);
-						switch (wait_policy)
-						{
-							case LockWaitBlock:
-								XactLockTableWait(SnapshotDirty.xmax,
-												  relation, &tuple->t_self,
-												  XLTW_FetchUpdated);
-								break;
-							case LockWaitSkip:
-								if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
-									/* skip instead of waiting */
-									return TM_WouldBlock;
-								break;
-							case LockWaitError:
-								if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
-									ereport(ERROR,
-											(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-											 errmsg("could not obtain lock on row in relation \"%s\"",
-													RelationGetRelationName(relation))));
-								break;
-						}
-						continue;	/* loop back to repeat heap_fetch */
-					}
-
-					/*
-					 * If tuple was inserted by our own transaction, we have
-					 * to check cmin against cid: cmin >= current CID means
-					 * our command cannot see the tuple, so we should ignore
-					 * it. Otherwise heap_lock_tuple() will throw an error,
-					 * and so would any later attempt to update or delete the
-					 * tuple.  (We need not check cmax because
-					 * HeapTupleSatisfiesDirty will consider a tuple deleted
-					 * by our transaction dead, regardless of cmax.)  We just
-					 * checked that priorXmax == xmin, so we can test that
-					 * variable instead of doing HeapTupleHeaderGetXmin again.
-					 */
-					if (TransactionIdIsCurrentTransactionId(priorXmax) &&
-						HeapTupleHeaderGetCmin(tuple->t_data) >= cid)
-					{
-						tmfd->xmax = priorXmax;
-
-						/*
-						 * Cmin is the problematic value, so store that. See
-						 * above.
-						 */
-						tmfd->cmax = HeapTupleHeaderGetCmin(tuple->t_data);
-						ReleaseBuffer(buffer);
-						return TM_SelfModified;
-					}
-
-					/*
-					 * This is a live tuple, so try to lock it again.
-					 */
-					ReleaseBuffer(buffer);
-					goto tuple_lock_retry;
-				}
-
-				/*
-				 * If the referenced slot was actually empty, the latest
-				 * version of the row must have been deleted, so we need do
-				 * nothing.
-				 */
-				if (tuple->t_data == NULL)
-				{
-					Assert(!BufferIsValid(buffer));
-					return TM_Deleted;
-				}
-
-				/*
-				 * As above, if xmin isn't what we're expecting, do nothing.
-				 */
-				if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
-										 priorXmax))
-				{
-					ReleaseBuffer(buffer);
-					return TM_Deleted;
-				}
-
-				/*
-				 * If we get here, the tuple was found but failed
-				 * SnapshotDirty. Assuming the xmin is either a committed xact
-				 * or our own xact (as it certainly should be if we're trying
-				 * to modify the tuple), this must mean that the row was
-				 * updated or deleted by either a committed xact or our own
-				 * xact.  If it was deleted, we can ignore it; if it was
-				 * updated then chain up to the next version and repeat the
-				 * whole process.
-				 *
-				 * As above, it should be safe to examine xmax and t_ctid
-				 * without the buffer content lock, because they can't be
-				 * changing.  We'd better hold a buffer pin though.
-				 */
-				if (ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid))
-				{
-					/* deleted, so forget about it */
-					ReleaseBuffer(buffer);
-					return TM_Deleted;
-				}
-
-				/* updated, so look at the updated row */
-				*tid = tuple->t_data->t_ctid;
-				/* updated row should have xmin matching this xmax */
-				priorXmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
-				ReleaseBuffer(buffer);
-				/* loop back to fetch next in chain */
-			}
-		}
-		else
-		{
-			/* tuple was deleted, so give up */
-			return TM_Deleted;
-		}
-	}
-
-	slot->tts_tableOid = RelationGetRelid(relation);
-	tuple->t_tableOid = slot->tts_tableOid;
-
-	/* store in slot, transferring existing pin */
-	ExecStorePinnedBufferHeapTuple(tuple, slot, buffer);
-
-	return result;
+	return heapam_tuple_lock_internal(relation, tid, snapshot, slot, cid, mode,
+									  wait_policy, flags, tmfd, false);
 }
 
 
@@ -2523,6 +2370,236 @@ SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer,
 	}
 }
 
+/*
+ * This routine does the work for heapam_tuple_lock(), but also supports the
+ * `updated` argument to reuse the work done by heapam_tuple_update() or
+ * heapam_tuple_delete() on fetching the tuple and checking its visibility.
+ */
+static TM_Result
+heapam_tuple_lock_internal(Relation relation, ItemPointer tid, Snapshot snapshot,
+						   TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
+						   LockWaitPolicy wait_policy, uint8 flags,
+						   TM_FailureData *tmfd, bool updated)
+{
+	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+	TM_Result	result;
+	Buffer		buffer = InvalidBuffer;
+	HeapTuple	tuple = &bslot->base.tupdata;
+	bool		follow_updates;
+
+	follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0;
+	tmfd->traversed = false;
+
+	Assert(TTS_IS_BUFFERTUPLE(slot));
+
+tuple_lock_retry:
+	tuple->t_self = *tid;
+	if (!updated)
+		result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
+								 follow_updates, &buffer, tmfd);
+	else
+		result = TM_Updated;
+
+	if (result == TM_Updated &&
+		(flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
+	{
+		if (!updated)
+		{
+			/* Should not encounter speculative tuple on recheck */
+			Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));
+
+			ReleaseBuffer(buffer);
+		}
+		else
+		{
+			updated = false;
+		}
+
+		if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self))
+		{
+			SnapshotData SnapshotDirty;
+			TransactionId priorXmax;
+
+			/* it was updated, so look at the updated version */
+			*tid = tmfd->ctid;
+			/* updated row should have xmin matching this xmax */
+			priorXmax = tmfd->xmax;
+
+			/* signal that a tuple later in the chain is getting locked */
+			tmfd->traversed = true;
+
+			/*
+			 * fetch target tuple
+			 *
+			 * Loop here to deal with updated or busy tuples
+			 */
+			InitDirtySnapshot(SnapshotDirty);
+			for (;;)
+			{
+				if (ItemPointerIndicatesMovedPartitions(tid))
+					ereport(ERROR,
+							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+							 errmsg("tuple to be locked was already moved to another partition due to concurrent update")));
+
+				tuple->t_self = *tid;
+				if (heap_fetch(relation, &SnapshotDirty, tuple, &buffer, true))
+				{
+					/*
+					 * If xmin isn't what we're expecting, the slot must have
+					 * been recycled and reused for an unrelated tuple.  This
+					 * implies that the latest version of the row was deleted,
+					 * so we need do nothing.  (Should be safe to examine xmin
+					 * without getting buffer's content lock.  We assume
+					 * reading a TransactionId to be atomic, and Xmin never
+					 * changes in an existing tuple, except to invalid or
+					 * frozen, and neither of those can match priorXmax.)
+					 */
+					if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
+											 priorXmax))
+					{
+						ReleaseBuffer(buffer);
+						return TM_Deleted;
+					}
+
+					/* otherwise xmin should not be dirty... */
+					if (TransactionIdIsValid(SnapshotDirty.xmin))
+						ereport(ERROR,
+								(errcode(ERRCODE_DATA_CORRUPTED),
+								 errmsg_internal("t_xmin %u is uncommitted in tuple (%u,%u) to be updated in table \"%s\"",
+												 SnapshotDirty.xmin,
+												 ItemPointerGetBlockNumber(&tuple->t_self),
+												 ItemPointerGetOffsetNumber(&tuple->t_self),
+												 RelationGetRelationName(relation))));
+
+					/*
+					 * If tuple is being updated by other transaction then we
+					 * have to wait for its commit/abort, or die trying.
+					 */
+					if (TransactionIdIsValid(SnapshotDirty.xmax))
+					{
+						ReleaseBuffer(buffer);
+						switch (wait_policy)
+						{
+							case LockWaitBlock:
+								XactLockTableWait(SnapshotDirty.xmax,
+												  relation, &tuple->t_self,
+												  XLTW_FetchUpdated);
+								break;
+							case LockWaitSkip:
+								if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
+									/* skip instead of waiting */
+									return TM_WouldBlock;
+								break;
+							case LockWaitError:
+								if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
+									ereport(ERROR,
+											(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+											 errmsg("could not obtain lock on row in relation \"%s\"",
+													RelationGetRelationName(relation))));
+								break;
+						}
+						continue;	/* loop back to repeat heap_fetch */
+					}
+
+					/*
+					 * If tuple was inserted by our own transaction, we have
+					 * to check cmin against cid: cmin >= current CID means
+					 * our command cannot see the tuple, so we should ignore
+					 * it. Otherwise heap_lock_tuple() will throw an error,
+					 * and so would any later attempt to update or delete the
+					 * tuple.  (We need not check cmax because
+					 * HeapTupleSatisfiesDirty will consider a tuple deleted
+					 * by our transaction dead, regardless of cmax.)  We just
+					 * checked that priorXmax == xmin, so we can test that
+					 * variable instead of doing HeapTupleHeaderGetXmin again.
+					 */
+					if (TransactionIdIsCurrentTransactionId(priorXmax) &&
+						HeapTupleHeaderGetCmin(tuple->t_data) >= cid)
+					{
+						tmfd->xmax = priorXmax;
+
+						/*
+						 * Cmin is the problematic value, so store that. See
+						 * above.
+						 */
+						tmfd->cmax = HeapTupleHeaderGetCmin(tuple->t_data);
+						ReleaseBuffer(buffer);
+						return TM_SelfModified;
+					}
+
+					/*
+					 * This is a live tuple, so try to lock it again.
+					 */
+					ReleaseBuffer(buffer);
+					goto tuple_lock_retry;
+				}
+
+				/*
+				 * If the referenced slot was actually empty, the latest
+				 * version of the row must have been deleted, so we need do
+				 * nothing.
+				 */
+				if (tuple->t_data == NULL)
+				{
+					Assert(!BufferIsValid(buffer));
+					return TM_Deleted;
+				}
+
+				/*
+				 * As above, if xmin isn't what we're expecting, do nothing.
+				 */
+				if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
+										 priorXmax))
+				{
+					ReleaseBuffer(buffer);
+					return TM_Deleted;
+				}
+
+				/*
+				 * If we get here, the tuple was found but failed
+				 * SnapshotDirty. Assuming the xmin is either a committed xact
+				 * or our own xact (as it certainly should be if we're trying
+				 * to modify the tuple), this must mean that the row was
+				 * updated or deleted by either a committed xact or our own
+				 * xact.  If it was deleted, we can ignore it; if it was
+				 * updated then chain up to the next version and repeat the
+				 * whole process.
+				 *
+				 * As above, it should be safe to examine xmax and t_ctid
+				 * without the buffer content lock, because they can't be
+				 * changing.  We'd better hold a buffer pin though.
+				 */
+				if (ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid))
+				{
+					/* deleted, so forget about it */
+					ReleaseBuffer(buffer);
+					return TM_Deleted;
+				}
+
+				/* updated, so look at the updated row */
+				*tid = tuple->t_data->t_ctid;
+				/* updated row should have xmin matching this xmax */
+				priorXmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
+				ReleaseBuffer(buffer);
+				/* loop back to fetch next in chain */
+			}
+		}
+		else
+		{
+			/* tuple was deleted, so give up */
+			return TM_Deleted;
+		}
+	}
+
+	slot->tts_tableOid = RelationGetRelid(relation);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	/* store in slot, transferring existing pin */
+	ExecStorePinnedBufferHeapTuple(tuple, slot, buffer);
+
+	return result;
+}
+
 
 /* ------------------------------------------------------------------------
  * Definition of the heap table access method.
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index ef0d34fcee..f3476697ff 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -306,7 +306,8 @@ simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
 								GetCurrentCommandId(true),
 								snapshot, InvalidSnapshot,
 								true /* wait for commit */ ,
-								&tmfd, false /* changingPart */ );
+								&tmfd, false /* changingPart */ ,
+								false, NULL);
 
 	switch (result)
 	{
@@ -355,7 +356,8 @@ simple_table_tuple_update(Relation rel, ItemPointer otid,
 								GetCurrentCommandId(true),
 								snapshot, InvalidSnapshot,
 								true /* wait for commit */ ,
-								&tmfd, &lockmode, update_indexes);
+								&tmfd, &lockmode, update_indexes,
+								false, NULL);
 
 	switch (result)
 	{
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 687a5422ea..a110d66b0c 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1335,7 +1335,8 @@ ExecDeletePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
  */
 static TM_Result
 ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
-			  ItemPointer tupleid, bool changingPart)
+			  ItemPointer tupleid, bool changingPart,
+			  bool lockUpdated, TupleTableSlot *lockedSlot)
 {
 	EState	   *estate = context->estate;
 
@@ -1345,7 +1346,9 @@ ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 							  estate->es_crosscheck_snapshot,
 							  true /* wait for commit */ ,
 							  &context->tmfd,
-							  changingPart);
+							  changingPart,
+							  lockUpdated,
+							  lockedSlot);
 }
 
 /*
@@ -1490,7 +1493,16 @@ ExecDelete(ModifyTableContext *context,
 		 * transaction-snapshot mode transactions.
 		 */
 ldelete:
-		result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart);
+
+		/*
+		 * Ask ExecDeleteAct() to immediately place the lock on the updated
+		 * tuple if we will need EvalPlanQual() to handle it in that case.
+		 */
+		if (!IsolationUsesXactSnapshot())
+			slot = ExecGetReturningSlot(estate, resultRelInfo);
+
+		result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart,
+							   !IsolationUsesXactSnapshot(), slot);
 
 		switch (result)
 		{
@@ -1543,87 +1555,34 @@ ldelete:
 								 errmsg("could not serialize access due to concurrent update")));
 
 					/*
-					 * Already know that we're going to need to do EPQ, so
-					 * fetch tuple directly into the right slot.
+					 * ExecDeleteAct() has already locked the old tuple for
+					 * us. Now we need to copy it to the right slot.
 					 */
 					EvalPlanQualBegin(context->epqstate);
 					inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
 												 resultRelInfo->ri_RangeTableIndex);
+					ExecCopySlot(inputslot, slot);
 
-					result = table_tuple_lock(resultRelationDesc, tupleid,
-											  estate->es_snapshot,
-											  inputslot, estate->es_output_cid,
-											  LockTupleExclusive, LockWaitBlock,
-											  TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
-											  &context->tmfd);
+					Assert(context->tmfd.traversed);
+					epqslot = EvalPlanQual(context->epqstate,
+										   resultRelationDesc,
+										   resultRelInfo->ri_RangeTableIndex,
+										   inputslot);
+					if (TupIsNull(epqslot))
+						/* Tuple not passing quals anymore, exiting... */
+						return NULL;
 
-					switch (result)
+					/*
+					 * If requested, skip delete and pass back the updated
+					 * row.
+					 */
+					if (epqreturnslot)
 					{
-						case TM_Ok:
-							Assert(context->tmfd.traversed);
-							epqslot = EvalPlanQual(context->epqstate,
-												   resultRelationDesc,
-												   resultRelInfo->ri_RangeTableIndex,
-												   inputslot);
-							if (TupIsNull(epqslot))
-								/* Tuple not passing quals anymore, exiting... */
-								return NULL;
-
-							/*
-							 * If requested, skip delete and pass back the
-							 * updated row.
-							 */
-							if (epqreturnslot)
-							{
-								*epqreturnslot = epqslot;
-								return NULL;
-							}
-							else
-								goto ldelete;
-
-						case TM_SelfModified:
-
-							/*
-							 * This can be reached when following an update
-							 * chain from a tuple updated by another session,
-							 * reaching a tuple that was already updated in
-							 * this transaction. If previously updated by this
-							 * command, ignore the delete, otherwise error
-							 * out.
-							 *
-							 * See also TM_SelfModified response to
-							 * table_tuple_delete() above.
-							 */
-							if (context->tmfd.cmax != estate->es_output_cid)
-								ereport(ERROR,
-										(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
-										 errmsg("tuple to be deleted was already modified by an operation triggered by the current command"),
-										 errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
-							return NULL;
-
-						case TM_Deleted:
-							/* tuple already deleted; nothing to do */
-							return NULL;
-
-						default:
-
-							/*
-							 * TM_Invisible should be impossible because we're
-							 * waiting for updated row versions, and would
-							 * already have errored out if the first version
-							 * is invisible.
-							 *
-							 * TM_Updated should be impossible, because we're
-							 * locking the latest version via
-							 * TUPLE_LOCK_FLAG_FIND_LAST_VERSION.
-							 */
-							elog(ERROR, "unexpected table_tuple_lock status: %u",
-								 result);
-							return NULL;
+						*epqreturnslot = epqslot;
+						return NULL;
 					}
-
-					Assert(false);
-					break;
+					else
+						goto ldelete;
 				}
 
 			case TM_Deleted:
@@ -1680,7 +1639,7 @@ ldelete:
 			{
 				ExecForceStoreHeapTuple(oldtuple, slot, false);
 			}
-			else
+			else if (TupIsNull(slot))
 			{
 				if (!table_tuple_fetch_row_version(resultRelationDesc, tupleid,
 												   SnapshotAny, slot))
@@ -1955,7 +1914,8 @@ ExecUpdatePrepareSlot(ResultRelInfo *resultRelInfo,
 static TM_Result
 ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 			  ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
-			  bool canSetTag, UpdateContext *updateCxt)
+			  bool canSetTag, UpdateContext *updateCxt, bool lockUpdated,
+			  TupleTableSlot *lockedSlot)
 {
 	EState	   *estate = context->estate;
 	Relation	resultRelationDesc = resultRelInfo->ri_RelationDesc;
@@ -2089,7 +2049,8 @@ lreplace:
 								estate->es_crosscheck_snapshot,
 								true /* wait for commit */ ,
 								&context->tmfd, &updateCxt->lockmode,
-								&updateCxt->updateIndexes);
+								&updateCxt->updateIndexes,
+								lockUpdated, lockedSlot);
 	if (result == TM_Ok)
 		updateCxt->updated = true;
 
@@ -2240,7 +2201,7 @@ ExecCrossPartitionUpdateForeignKey(ModifyTableContext *context,
 static TupleTableSlot *
 ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 		   ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
-		   bool canSetTag)
+		   bool canSetTag, bool locked)
 {
 	EState	   *estate = context->estate;
 	Relation	resultRelationDesc = resultRelInfo->ri_RelationDesc;
@@ -2293,12 +2254,37 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 	}
 	else
 	{
+		TupleTableSlot *oldSlot;
+		bool		lockUpdated;
+
 		/* Fill in the slot appropriately */
 		ExecUpdatePrepareSlot(resultRelInfo, slot, estate);
 
 redo_act:
+
+		/*
+		 * Ask ExecUpdateAct() to immediately place the lock on the updated
+		 * tuple if we will need EvalPlanQual() to handle it in that case.
+		 */
+		if (!IsolationUsesXactSnapshot() && !locked)
+		{
+			/* Make sure ri_oldTupleSlot is initialized. */
+			if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
+				ExecInitUpdateProjection(context->mtstate,
+										 resultRelInfo);
+
+			/* Fetch the most recent version of old tuple. */
+			lockUpdated = true;
+			oldSlot = resultRelInfo->ri_oldTupleSlot;
+		}
+		else
+		{
+			lockUpdated = false;
+			oldSlot = NULL;
+		}
+
 		result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot,
-							   canSetTag, &updateCxt);
+							   canSetTag, &updateCxt, lockUpdated, oldSlot);
 
 		/*
 		 * If ExecUpdateAct reports that a cross-partition update was done,
@@ -2351,86 +2337,32 @@ redo_act:
 				{
 					TupleTableSlot *inputslot;
 					TupleTableSlot *epqslot;
-					TupleTableSlot *oldSlot;
 
 					if (IsolationUsesXactSnapshot())
 						ereport(ERROR,
 								(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 								 errmsg("could not serialize access due to concurrent update")));
+					Assert(!locked);
 
 					/*
-					 * Already know that we're going to need to do EPQ, so
-					 * fetch tuple directly into the right slot.
+					 * ExecUpdateAct() has already locked the old tuple for
+					 * us. Now we need to copy it to the right slot.
 					 */
 					inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
 												 resultRelInfo->ri_RangeTableIndex);
-
-					result = table_tuple_lock(resultRelationDesc, tupleid,
-											  estate->es_snapshot,
-											  inputslot, estate->es_output_cid,
-											  updateCxt.lockmode, LockWaitBlock,
-											  TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
-											  &context->tmfd);
-
-					switch (result)
-					{
-						case TM_Ok:
-							Assert(context->tmfd.traversed);
-
-							epqslot = EvalPlanQual(context->epqstate,
-												   resultRelationDesc,
-												   resultRelInfo->ri_RangeTableIndex,
-												   inputslot);
-							if (TupIsNull(epqslot))
-								/* Tuple not passing quals anymore, exiting... */
-								return NULL;
-
-							/* Make sure ri_oldTupleSlot is initialized. */
-							if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
-								ExecInitUpdateProjection(context->mtstate,
-														 resultRelInfo);
-
-							/* Fetch the most recent version of old tuple. */
-							oldSlot = resultRelInfo->ri_oldTupleSlot;
-							if (!table_tuple_fetch_row_version(resultRelationDesc,
-															   tupleid,
-															   SnapshotAny,
-															   oldSlot))
-								elog(ERROR, "failed to fetch tuple being updated");
-							slot = ExecGetUpdateNewTuple(resultRelInfo,
-														 epqslot, oldSlot);
-							goto redo_act;
-
-						case TM_Deleted:
-							/* tuple already deleted; nothing to do */
-							return NULL;
-
-						case TM_SelfModified:
-
-							/*
-							 * This can be reached when following an update
-							 * chain from a tuple updated by another session,
-							 * reaching a tuple that was already updated in
-							 * this transaction. If previously modified by
-							 * this command, ignore the redundant update,
-							 * otherwise error out.
-							 *
-							 * See also TM_SelfModified response to
-							 * table_tuple_update() above.
-							 */
-							if (context->tmfd.cmax != estate->es_output_cid)
-								ereport(ERROR,
-										(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
-										 errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
-										 errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
-							return NULL;
-
-						default:
-							/* see table_tuple_lock call in ExecDelete() */
-							elog(ERROR, "unexpected table_tuple_lock status: %u",
-								 result);
-							return NULL;
-					}
+					ExecCopySlot(inputslot, oldSlot);
+					Assert(context->tmfd.traversed);
+
+					epqslot = EvalPlanQual(context->epqstate,
+										   resultRelationDesc,
+										   resultRelInfo->ri_RangeTableIndex,
+										   inputslot);
+					if (TupIsNull(epqslot))
+						/* Tuple not passing quals anymore, exiting... */
+						return NULL;
+					slot = ExecGetUpdateNewTuple(resultRelInfo,
+												 epqslot, oldSlot);
+					goto redo_act;
 				}
 
 				break;
@@ -2674,7 +2606,7 @@ ExecOnConflictUpdate(ModifyTableContext *context,
 	*returning = ExecUpdate(context, resultRelInfo,
 							conflictTid, NULL,
 							resultRelInfo->ri_onConflict->oc_ProjSlot,
-							canSetTag);
+							canSetTag, true);
 
 	/*
 	 * Clear out existing tuple, as there might not be another conflict among
@@ -2881,7 +2813,8 @@ lmerge_matched:
 				}
 				ExecUpdatePrepareSlot(resultRelInfo, newslot, context->estate);
 				result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL,
-									   newslot, mtstate->canSetTag, &updateCxt);
+									   newslot, mtstate->canSetTag, &updateCxt,
+									   false, NULL);
 				if (result == TM_Ok && updateCxt.updated)
 				{
 					ExecUpdateEpilogue(context, &updateCxt, resultRelInfo,
@@ -2899,7 +2832,8 @@ lmerge_matched:
 					result = TM_Ok;
 					break;
 				}
-				result = ExecDeleteAct(context, resultRelInfo, tupleid, false);
+				result = ExecDeleteAct(context, resultRelInfo, tupleid, false,
+									   false, NULL);
 				if (result == TM_Ok)
 				{
 					ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL,
@@ -3847,7 +3781,7 @@ ExecModifyTable(PlanState *pstate)
 
 				/* Now apply the update. */
 				slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
-								  slot, node->canSetTag);
+								  slot, node->canSetTag, false);
 				break;
 
 			case CMD_DELETE:
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 3fb184717f..65797cd9f9 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -514,7 +514,9 @@ typedef struct TableAmRoutine
 								 Snapshot crosscheck,
 								 bool wait,
 								 TM_FailureData *tmfd,
-								 bool changingPart);
+								 bool changingPart,
+								 bool lockUpdated,
+								 TupleTableSlot *lockedSlot);
 
 	/* see table_tuple_update() for reference about parameters */
 	TM_Result	(*tuple_update) (Relation rel,
@@ -526,7 +528,9 @@ typedef struct TableAmRoutine
 								 bool wait,
 								 TM_FailureData *tmfd,
 								 LockTupleMode *lockmode,
-								 bool *update_indexes);
+								 bool *update_indexes,
+								 bool lockUpdated,
+								 TupleTableSlot *lockedSlot);
 
 	/* see table_tuple_lock() for reference about parameters */
 	TM_Result	(*tuple_lock) (Relation rel,
@@ -1449,6 +1453,9 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
  *	tmfd - filled in failure cases (see below)
  *	changingPart - true iff the tuple is being moved to another partition
  *		table due to an update of the partition key. Otherwise, false.
+ *	lockUpdated - true if the last row version should be locked when a
+ *		concurrent update is detected
+ *	lockedSlot - slot to save the locked tuple when lockUpdated is true
  *
  * Normal, successful return value is TM_Ok, which means we did actually
  * delete it.  Failure return codes are TM_SelfModified, TM_Updated, and
@@ -1461,11 +1468,13 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 static inline TM_Result
 table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
 				   Snapshot snapshot, Snapshot crosscheck, bool wait,
-				   TM_FailureData *tmfd, bool changingPart)
+				   TM_FailureData *tmfd, bool changingPart,
+				   bool lockUpdated, TupleTableSlot *lockedSlot)
 {
 	return rel->rd_tableam->tuple_delete(rel, tid, cid,
 										 snapshot, crosscheck,
-										 wait, tmfd, changingPart);
+										 wait, tmfd, changingPart,
+										 lockUpdated, lockedSlot);
 }
 
 /*
@@ -1487,6 +1496,9 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
  *	lockmode - filled with lock mode acquired on tuple
  *  update_indexes - in success cases this is set to true if new index entries
  *		are required for this tuple
+ *	lockUpdated - true if the last row version should be locked when a
+ *		concurrent update is detected
+ *	lockedSlot - slot to save the locked tuple when lockUpdated is true
  *
  * Normal, successful return value is TM_Ok, which means we did actually
  * update it.  Failure return codes are TM_SelfModified, TM_Updated, and
@@ -1506,12 +1518,14 @@ static inline TM_Result
 table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
 				   CommandId cid, Snapshot snapshot, Snapshot crosscheck,
 				   bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
-				   bool *update_indexes)
+				   bool *update_indexes, bool lockUpdated,
+				   TupleTableSlot *lockedSlot)
 {
 	return rel->rd_tableam->tuple_update(rel, otid, slot,
 										 cid, snapshot, crosscheck,
 										 wait, tmfd,
-										 lockmode, update_indexes);
+										 lockmode, update_indexes,
+										 lockUpdated, lockedSlot);
 }
 
 /*
-- 
2.39.0

