From e54bfc8a633fbd7c1547de74444a3456f32486cf Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <akorotkov@postgresql.org>
Date: Thu, 30 Jun 2022 22:07:12 +0300
Subject: [PATCH 2/2] Allow locking updated tuples in tuple_update() and
 tuple_delete()

Currently, in read committed transaction isolation mode (the default), the
following sequence of actions takes place when tuple_update()/tuple_delete()
finds that the tuple has been updated by a concurrent transaction.

1. Attempt to update/delete the tuple with tuple_update()/tuple_delete(),
   which returns TM_Updated.
2. Lock the tuple with tuple_lock().
3. Re-evaluate the plan qual (recheck whether we still need to update/delete
   and compute the new tuple for an update).
4. Attempt the update/delete again with tuple_update()/tuple_delete().
   This second attempt should succeed, since the tuple was previously locked.

This patch eliminates step 2 by taking the lock during the first
tuple_update()/tuple_delete() call.  The heap table access method can save
effort by traversing the chain of updated tuples once instead of twice.
Future undo-based table access methods, which will start from the latest row
version, can immediately place a lock there.
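
A condensed sketch of the resulting caller-side pattern, adapted from
ExecDeleteAct() in this patch (error handling omitted):

    GetEPQSlotArg slotArg = {.epqstate = context->epqstate,
                             .resultRelInfo = resultRelInfo};
    LazyTupleTableSlot lazyEPQSlot;

    /* Promise an EPQ slot; it is allocated only if the AM needs it. */
    MAKE_LAZY_TTS(&lazyEPQSlot, GetEPQSlot, &slotArg);
    result = table_tuple_delete(resultRelInfo->ri_RelationDesc, tupleid,
                                estate->es_output_cid,
                                estate->es_snapshot,
                                estate->es_crosscheck_snapshot,
                                true /* wait for commit */ ,
                                &context->tmfd, changingPart,
                                &lazyEPQSlot);
    /* On concurrent update, TM_Updated is returned with the latest row
     * version already locked and stored in the materialized slot, so the
     * caller proceeds directly to EvalPlanQual(). */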

The code in nodeModifyTable.c is simplified by removing the nested switch/case.

Discussion: https://postgr.es/m/CAPpHfdua-YFw3XTprfutzGp28xXLigFtzNbuFY8yPhqeq6X5kg%40mail.gmail.com
Reviewed-by: Aleksander Alekseev, Pavel Borisov, Vignesh C, Mason Sharp
---
 src/backend/access/heap/heapam.c         | 129 +++++++++---
 src/backend/access/heap/heapam_handler.c |  49 ++++-
 src/backend/access/table/tableam.c       |   6 +-
 src/backend/executor/nodeModifyTable.c   | 253 ++++++++---------------
 src/include/access/heapam.h              |  45 +++-
 src/include/access/tableam.h             |  27 ++-
 src/include/executor/tuptable.h          |  49 +++++
 7 files changed, 344 insertions(+), 214 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 4f50e0dd347..08290435d0e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2462,7 +2462,8 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
 TM_Result
 heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, bool changingPart)
+			TM_FailureData *tmfd, bool changingPart, Snapshot snapshot,
+			LazyTupleTableSlot *lockedSlot)
 {
 	TM_Result	result;
 	TransactionId xid = GetCurrentTransactionId();
@@ -2661,6 +2662,28 @@ l1:
 			result = TM_Updated;
 	}
 
+	/*
+	 * If the tuple was concurrently updated and 'lockedSlot' is given, then
+	 * we lock the tuple, reusing the effort already spent on finding it.
+	 */
+	if (result == TM_Updated && lockedSlot)
+	{
+		HeapLockContext context = {buffer, vmbuffer, have_tuple_lock};
+
+		result = heapam_tuple_lock_context(relation, tid, snapshot,
+										   LAZY_TTS_EVAL(lockedSlot),
+										   cid, LockTupleExclusive,
+										   wait ? LockWaitBlock : LockWaitError,
+										   TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
+										   tmfd, &context);
+		if (result == TM_Ok)
+		{
+			tmfd->traversed = true;
+			return TM_Updated;
+		}
+		return result;
+	}
+
 	if (result != TM_Ok)
 	{
 		Assert(result == TM_SelfModified ||
@@ -2884,7 +2907,8 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 	result = heap_delete(relation, tid,
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &tmfd, false /* changingPart */ );
+						 &tmfd, false /* changingPart */ ,
+						 SnapshotAny, NULL);
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -2924,7 +2948,8 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 TM_Result
 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, LockTupleMode *lockmode)
+			TM_FailureData *tmfd, LockTupleMode *lockmode, Snapshot snapshot,
+			LazyTupleTableSlot *lockedSlot)
 {
 	TM_Result	result;
 	TransactionId xid = GetCurrentTransactionId();
@@ -3291,6 +3316,33 @@ l2:
 		}
 	}
 
+	/*
+	 * If the tuple was concurrently updated and 'lockedSlot' is given, then
+	 * we lock the tuple, reusing the effort already spent on finding it.
+	 */
+	if (result == TM_Updated && lockedSlot)
+	{
+		HeapLockContext context = {buffer, vmbuffer, have_tuple_lock};
+
+		result = heapam_tuple_lock_context(relation, otid, snapshot,
+										   LAZY_TTS_EVAL(lockedSlot),
+										   cid, *lockmode,
+										   wait ? LockWaitBlock : LockWaitError,
+										   TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
+										   tmfd, &context);
+		bms_free(hot_attrs);
+		bms_free(key_attrs);
+		bms_free(id_attrs);
+		bms_free(modified_attrs);
+		bms_free(interesting_attrs);
+		if (result == TM_Ok)
+		{
+			tmfd->traversed = true;
+			return TM_Updated;
+		}
+		return result;
+	}
+
 	if (result != TM_Ok)
 	{
 		Assert(result == TM_SelfModified ||
@@ -3960,7 +4012,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
 	result = heap_update(relation, otid, tup,
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &tmfd, &lockmode);
+						 &tmfd, &lockmode, SnapshotAny, NULL);
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -4021,10 +4073,12 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
  *	wait_policy: what to do if tuple lock is not available
  *	follow_updates: if true, follow the update chain to also lock descendant
  *		tuples.
+ *	*context: state carried over from previous efforts to find and lock
+ *		the target tuple.
  *
  * Output parameters:
  *	*tuple: all fields filled in
- *	*buffer: set to buffer holding tuple (pinned but not locked at exit)
+ *	context->buffer: set to buffer holding tuple (pinned but not locked at exit)
  *	*tmfd: filled in failure cases (see below)
  *
  * Function results are the same as the ones for table_tuple_lock().
@@ -4042,13 +4096,14 @@ TM_Result
 heap_lock_tuple(Relation relation, HeapTuple tuple,
 				CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
 				bool follow_updates,
-				Buffer *buffer, TM_FailureData *tmfd)
+				HeapLockContext *context, TM_FailureData *tmfd)
 {
 	TM_Result	result;
 	ItemPointer tid = &(tuple->t_self);
 	ItemId		lp;
 	Page		page;
-	Buffer		vmbuffer = InvalidBuffer;
+	Buffer		buffer = context->buffer,
+				vmbuffer = context->vmbuffer;
 	BlockNumber block;
 	TransactionId xid,
 				xmax;
@@ -4057,10 +4112,11 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
 				new_infomask2;
 	bool		first_time = true;
 	bool		skip_tuple_lock = false;
-	bool		have_tuple_lock = false;
+	bool		have_tuple_lock = context->have_tuple_lock;
 	bool		cleared_all_frozen = false;
 
-	*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
+	if (BufferIsInvalid(buffer))
+		buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
 	block = ItemPointerGetBlockNumber(tid);
 
 	/*
@@ -4069,12 +4125,17 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
 	 * in the middle of changing this, so we'll need to recheck after we have
 	 * the lock.
 	 */
-	if (PageIsAllVisible(BufferGetPage(*buffer)))
+	if (BufferIsInvalid(vmbuffer) && PageIsAllVisible(BufferGetPage(buffer)))
 		visibilitymap_pin(relation, block, &vmbuffer);
 
-	LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+	/*
+	 * If a valid buffer was given in the 'context', it must already be
+	 * exclusively locked.  Otherwise, lock the buffer here.
+	 */
+	if (BufferIsInvalid(context->buffer))
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-	page = BufferGetPage(*buffer);
+	page = BufferGetPage(buffer);
 	lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
 	Assert(ItemIdIsNormal(lp));
 
@@ -4083,7 +4144,7 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
 	tuple->t_tableOid = RelationGetRelid(relation);
 
 l3:
-	result = HeapTupleSatisfiesUpdate(tuple, cid, *buffer);
+	result = HeapTupleSatisfiesUpdate(tuple, cid, buffer);
 
 	if (result == TM_Invisible)
 	{
@@ -4112,7 +4173,7 @@ l3:
 		infomask2 = tuple->t_data->t_infomask2;
 		ItemPointerCopy(&tuple->t_data->t_ctid, &t_ctid);
 
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
 		/*
 		 * If any subtransaction of the current top transaction already holds
@@ -4264,12 +4325,12 @@ l3:
 					{
 						result = res;
 						/* recovery code expects to have buffer lock held */
-						LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+						LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 						goto failed;
 					}
 				}
 
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 				/*
 				 * Make sure it's still an appropriate lock, else start over.
@@ -4304,7 +4365,7 @@ l3:
 			if (HEAP_XMAX_IS_LOCKED_ONLY(infomask) &&
 				!HEAP_XMAX_IS_EXCL_LOCKED(infomask))
 			{
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 				/*
 				 * Make sure it's still an appropriate lock, else start over.
@@ -4332,7 +4393,7 @@ l3:
 					 * No conflict, but if the xmax changed under us in the
 					 * meantime, start over.
 					 */
-					LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+					LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 					if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
 						!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
 											 xwait))
@@ -4344,7 +4405,7 @@ l3:
 			}
 			else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
 			{
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 				/* if the xmax changed in the meantime, start over */
 				if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
@@ -4372,7 +4433,7 @@ l3:
 			TransactionIdIsCurrentTransactionId(xwait))
 		{
 			/* ... but if the xmax changed in the meantime, start over */
-			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 			if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
 				!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
 									 xwait))
@@ -4394,7 +4455,7 @@ l3:
 		 */
 		if (require_sleep && (result == TM_Updated || result == TM_Deleted))
 		{
-			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 			goto failed;
 		}
 		else if (require_sleep)
@@ -4419,7 +4480,7 @@ l3:
 				 */
 				result = TM_WouldBlock;
 				/* recovery code expects to have buffer lock held */
-				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 				goto failed;
 			}
 
@@ -4445,7 +4506,7 @@ l3:
 						{
 							result = TM_WouldBlock;
 							/* recovery code expects to have buffer lock held */
-							LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+							LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 							goto failed;
 						}
 						break;
@@ -4485,7 +4546,7 @@ l3:
 						{
 							result = TM_WouldBlock;
 							/* recovery code expects to have buffer lock held */
-							LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+							LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 							goto failed;
 						}
 						break;
@@ -4511,12 +4572,12 @@ l3:
 				{
 					result = res;
 					/* recovery code expects to have buffer lock held */
-					LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+					LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 					goto failed;
 				}
 			}
 
-			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
 			/*
 			 * xwait is done, but if xwait had just locked the tuple then some
@@ -4538,7 +4599,7 @@ l3:
 				 * don't check for this in the multixact case, because some
 				 * locker transactions might still be running.
 				 */
-				UpdateXmaxHintBits(tuple->t_data, *buffer, xwait);
+				UpdateXmaxHintBits(tuple->t_data, buffer, xwait);
 			}
 		}
 
@@ -4597,9 +4658,9 @@ failed:
 	 */
 	if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
 	{
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 		visibilitymap_pin(relation, block, &vmbuffer);
-		LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 		goto l3;
 	}
 
@@ -4662,7 +4723,7 @@ failed:
 		cleared_all_frozen = true;
 
 
-	MarkBufferDirty(*buffer);
+	MarkBufferDirty(buffer);
 
 	/*
 	 * XLOG stuff.  You might think that we don't need an XLOG record because
@@ -4682,7 +4743,7 @@ failed:
 		XLogRecPtr	recptr;
 
 		XLogBeginInsert();
-		XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD);
+		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 
 		xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
 		xlrec.locking_xid = xid;
@@ -4703,7 +4764,7 @@ failed:
 	result = TM_Ok;
 
 out_locked:
-	LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
 out_unlocked:
 	if (BufferIsValid(vmbuffer))
@@ -4721,6 +4782,10 @@ out_unlocked:
 	if (have_tuple_lock)
 		UnlockTupleTuplock(relation, tid, mode);
 
+	context->buffer = buffer;
+	context->vmbuffer = InvalidBuffer;
+	context->have_tuple_lock = false;
+
 	return result;
 }
 
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index c4b1916d36e..bbb8af473bd 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -299,14 +299,20 @@ heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
 static TM_Result
 heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
 					Snapshot snapshot, Snapshot crosscheck, bool wait,
-					TM_FailureData *tmfd, bool changingPart)
+					TM_FailureData *tmfd, bool changingPart,
+					LazyTupleTableSlot *lockedSlot)
 {
+	TM_Result	result;
+
 	/*
 	 * Currently Deleting of index tuples are handled at vacuum, in case if
 	 * the storage itself is cleaning the dead tuples by itself, it is the
 	 * time to call the index tuple deletion also.
 	 */
-	return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart);
+	result = heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart,
+						 snapshot, lockedSlot);
+
+	return result;
 }
 
 
@@ -314,7 +320,8 @@ static TM_Result
 heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 					CommandId cid, Snapshot snapshot, Snapshot crosscheck,
 					bool wait, TM_FailureData *tmfd,
-					LockTupleMode *lockmode, bool *update_indexes)
+					LockTupleMode *lockmode, bool *update_indexes,
+					LazyTupleTableSlot *lockedSlot)
 {
 	bool		shouldFree = true;
 	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
@@ -325,7 +332,7 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 	tuple->t_tableOid = slot->tts_tableOid;
 
 	result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
-						 tmfd, lockmode);
+						 tmfd, lockmode, snapshot, lockedSlot);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	/*
@@ -349,12 +356,28 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
 				  TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
 				  LockWaitPolicy wait_policy, uint8 flags,
 				  TM_FailureData *tmfd)
+{
+	return heapam_tuple_lock_context(relation, tid, snapshot, slot, cid, mode,
+									 wait_policy, flags, tmfd, NULL);
+}
+
+/*
+ * This routine does the work for heapam_tuple_lock(), but also supports a
+ * `context` argument to reuse the work done by heapam_tuple_update() or
+ * heapam_tuple_delete() on fetching the tuple and checking its visibility.
+ */
+TM_Result
+heapam_tuple_lock_context(Relation relation, ItemPointer tid, Snapshot snapshot,
+						  TupleTableSlot *slot, CommandId cid,
+						  LockTupleMode mode, LockWaitPolicy wait_policy,
+						  uint8 flags, TM_FailureData *tmfd,
+						  HeapLockContext *context)
 {
 	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
 	TM_Result	result;
-	Buffer		buffer;
 	HeapTuple	tuple = &bslot->base.tupdata;
 	bool		follow_updates;
+	Buffer		buffer = InvalidBuffer;
 
 	follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0;
 	tmfd->traversed = false;
@@ -363,8 +386,20 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
 
 tuple_lock_retry:
 	tuple->t_self = *tid;
-	result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
-							 follow_updates, &buffer, tmfd);
+	if (!context)
+	{
+		HeapLockContext cxt = {InvalidBuffer, InvalidBuffer, false};
+		result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
+								 follow_updates, &cxt, tmfd);
+		buffer = cxt.buffer;
+	}
+	else
+	{
+		result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
+								 follow_updates, context, tmfd);
+		buffer = context->buffer;
+		context = NULL;
+	}
 
 	if (result == TM_Updated &&
 		(flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index ef0d34fceee..4cfdb4066f6 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -306,7 +306,8 @@ simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
 								GetCurrentCommandId(true),
 								snapshot, InvalidSnapshot,
 								true /* wait for commit */ ,
-								&tmfd, false /* changingPart */ );
+								&tmfd, false /* changingPart */ ,
+								NULL);
 
 	switch (result)
 	{
@@ -355,7 +356,8 @@ simple_table_tuple_update(Relation rel, ItemPointer otid,
 								GetCurrentCommandId(true),
 								snapshot, InvalidSnapshot,
 								true /* wait for commit */ ,
-								&tmfd, &lockmode, update_indexes);
+								&tmfd, &lockmode, update_indexes,
+								NULL);
 
 	switch (result)
 	{
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 839e8fe0d04..70aeeeed6fc 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1354,6 +1354,27 @@ ExecDeletePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 	return true;
 }
 
+typedef struct
+{
+	EPQState *epqstate;
+	ResultRelInfo *resultRelInfo;
+} GetEPQSlotArg;
+
+
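+/*
+ * Callback for a lazy EPQ slot: allocates the slot via EvalPlanQualSlot()
+ * on first use.
+ */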
+static TupleTableSlot *
+GetEPQSlot(void *arg)
+{
+	GetEPQSlotArg *slotArg = (GetEPQSlotArg *) arg;
+
+	return EvalPlanQualSlot(slotArg->epqstate,
+							slotArg->resultRelInfo->ri_RelationDesc,
+							slotArg->resultRelInfo->ri_RangeTableIndex);
+}
+
 /*
  * ExecDeleteAct -- subroutine for ExecDelete
  *
@@ -1366,14 +1383,18 @@ ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 			  ItemPointer tupleid, bool changingPart)
 {
 	EState	   *estate = context->estate;
+	GetEPQSlotArg slotArg = {.epqstate = context->epqstate, .resultRelInfo = resultRelInfo};
+	LazyTupleTableSlot lazyEPQSlot;
 
+	MAKE_LAZY_TTS(&lazyEPQSlot, GetEPQSlot, &slotArg);
 	return table_tuple_delete(resultRelInfo->ri_RelationDesc, tupleid,
 							  estate->es_output_cid,
 							  estate->es_snapshot,
 							  estate->es_crosscheck_snapshot,
 							  true /* wait for commit */ ,
 							  &context->tmfd,
-							  changingPart);
+							  changingPart,
+							  &lazyEPQSlot);
 }
 
 /*
@@ -1571,102 +1592,46 @@ ldelete:
 								 errmsg("could not serialize access due to concurrent update")));
 
 					/*
-					 * Already know that we're going to need to do EPQ, so
-					 * fetch tuple directly into the right slot.
+					 * ExecDeleteAct() has already locked the old tuple for
+					 * us. Now we need to copy it to the right slot.
 					 */
 					EvalPlanQualBegin(context->epqstate);
 					inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
 												 resultRelInfo->ri_RangeTableIndex);
-
-					result = table_tuple_lock(resultRelationDesc, tupleid,
-											  estate->es_snapshot,
-											  inputslot, estate->es_output_cid,
-											  LockTupleExclusive, LockWaitBlock,
-											  TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
-											  &context->tmfd);
-
-					switch (result)
+					/*
+					 * Save the locked tuple for further processing of
+					 * the RETURNING clause.
+					 */
+					if (processReturning &&
+						resultRelInfo->ri_projectReturning &&
+						!resultRelInfo->ri_FdwRoutine)
 					{
-						case TM_Ok:
-							Assert(context->tmfd.traversed);
-
-							/*
-							 * Save locked tuple for further processing of
-							 * RETURNING clause.
-							 */
-							if (processReturning &&
-								resultRelInfo->ri_projectReturning &&
-								!resultRelInfo->ri_FdwRoutine)
-							{
-								TupleTableSlot *returningSlot;
-								returningSlot = ExecGetReturningSlot(estate, resultRelInfo);
-								ExecCopySlot(returningSlot, inputslot);
-								ExecMaterializeSlot(returningSlot);
-							}
-
-							epqslot = EvalPlanQual(context->epqstate,
-												   resultRelationDesc,
-												   resultRelInfo->ri_RangeTableIndex,
-												   inputslot);
-							if (TupIsNull(epqslot))
-								/* Tuple not passing quals anymore, exiting... */
-								return NULL;
-
-							/*
-							 * If requested, skip delete and pass back the
-							 * updated row.
-							 */
-							if (epqreturnslot)
-							{
-								*epqreturnslot = epqslot;
-								return NULL;
-							}
-							else
-								goto ldelete;
-
-						case TM_SelfModified:
-
-							/*
-							 * This can be reached when following an update
-							 * chain from a tuple updated by another session,
-							 * reaching a tuple that was already updated in
-							 * this transaction. If previously updated by this
-							 * command, ignore the delete, otherwise error
-							 * out.
-							 *
-							 * See also TM_SelfModified response to
-							 * table_tuple_delete() above.
-							 */
-							if (context->tmfd.cmax != estate->es_output_cid)
-								ereport(ERROR,
-										(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
-										 errmsg("tuple to be deleted was already modified by an operation triggered by the current command"),
-										 errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
-							return NULL;
-
-						case TM_Deleted:
-							/* tuple already deleted; nothing to do */
-							return NULL;
+						TupleTableSlot *returningSlot;
+						returningSlot = ExecGetReturningSlot(estate, resultRelInfo);
+						ExecCopySlot(returningSlot, inputslot);
+						ExecMaterializeSlot(returningSlot);
+					}
 
-						default:
+					Assert(context->tmfd.traversed);
+					epqslot = EvalPlanQual(context->epqstate,
+										   resultRelationDesc,
+										   resultRelInfo->ri_RangeTableIndex,
+										   inputslot);
+					if (TupIsNull(epqslot))
+						/* Tuple not passing quals anymore, exiting... */
+						return NULL;
 
-							/*
-							 * TM_Invisible should be impossible because we're
-							 * waiting for updated row versions, and would
-							 * already have errored out if the first version
-							 * is invisible.
-							 *
-							 * TM_Updated should be impossible, because we're
-							 * locking the latest version via
-							 * TUPLE_LOCK_FLAG_FIND_LAST_VERSION.
-							 */
-							elog(ERROR, "unexpected table_tuple_lock status: %u",
-								 result);
-							return NULL;
+					/*
+					 * If requested, skip delete and pass back the updated
+					 * row.
+					 */
+					if (epqreturnslot)
+					{
+						*epqreturnslot = epqslot;
+						return NULL;
 					}
-
-					Assert(false);
-					break;
+					else
+						goto ldelete;
 				}
 
 			case TM_Deleted:
@@ -2006,6 +1971,8 @@ ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 	Relation	resultRelationDesc = resultRelInfo->ri_RelationDesc;
 	bool		partition_constraint_failed;
 	TM_Result	result;
+	GetEPQSlotArg slotArg = {.epqstate = context->epqstate, .resultRelInfo = resultRelInfo};
+	LazyTupleTableSlot lazyEPQSlot;
 
 	updateCxt->crossPartUpdate = false;
 
@@ -2129,13 +2096,15 @@ lreplace:
 	 * for referential integrity updates in transaction-snapshot mode
 	 * transactions.
 	 */
+	MAKE_LAZY_TTS(&lazyEPQSlot, GetEPQSlot, &slotArg);
 	result = table_tuple_update(resultRelationDesc, tupleid, slot,
 								estate->es_output_cid,
 								estate->es_snapshot,
 								estate->es_crosscheck_snapshot,
 								true /* wait for commit */ ,
 								&context->tmfd, &updateCxt->lockmode,
-								&updateCxt->updateIndexes);
+								&updateCxt->updateIndexes,
+								&lazyEPQSlot);
 	if (result == TM_Ok)
 		updateCxt->updated = true;
 
@@ -2286,7 +2255,7 @@ ExecCrossPartitionUpdateForeignKey(ModifyTableContext *context,
 static TupleTableSlot *
 ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 		   ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
-		   bool canSetTag)
+		   bool canSetTag, bool locked)
 {
 	EState	   *estate = context->estate;
 	Relation	resultRelationDesc = resultRelInfo->ri_RelationDesc;
@@ -2408,81 +2377,39 @@ redo_act:
 						ereport(ERROR,
 								(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 								 errmsg("could not serialize access due to concurrent update")));
+					Assert(!locked);
 
 					/*
-					 * Already know that we're going to need to do EPQ, so
-					 * fetch tuple directly into the right slot.
+					 * ExecUpdateAct() has already locked the old tuple for
+					 * us. Now we need to copy it to the right slot.
 					 */
 					inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
 												 resultRelInfo->ri_RangeTableIndex);
 
-					result = table_tuple_lock(resultRelationDesc, tupleid,
-											  estate->es_snapshot,
-											  inputslot, estate->es_output_cid,
-											  updateCxt.lockmode, LockWaitBlock,
-											  TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
-											  &context->tmfd);
-
-					switch (result)
-					{
-						case TM_Ok:
-							Assert(context->tmfd.traversed);
-
-							/* Make sure ri_oldTupleSlot is initialized. */
-							if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
-								ExecInitUpdateProjection(context->mtstate,
-														 resultRelInfo);
-
-							/*
-							 * Save the locked tuple for further calculation of
-							 * the new tuple.
-							 */
-							oldSlot = resultRelInfo->ri_oldTupleSlot;
-							ExecCopySlot(oldSlot, inputslot);
-							ExecMaterializeSlot(oldSlot);
-
-							epqslot = EvalPlanQual(context->epqstate,
-												   resultRelationDesc,
-												   resultRelInfo->ri_RangeTableIndex,
-												   inputslot);
-							if (TupIsNull(epqslot))
-								/* Tuple not passing quals anymore, exiting... */
-								return NULL;
-
-							slot = ExecGetUpdateNewTuple(resultRelInfo,
-														 epqslot, oldSlot);
-							goto redo_act;
+					/* Make sure ri_oldTupleSlot is initialized. */
+					if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
+						ExecInitUpdateProjection(context->mtstate,
+												resultRelInfo);
 
-						case TM_Deleted:
-							/* tuple already deleted; nothing to do */
-							return NULL;
-
-						case TM_SelfModified:
-
-							/*
-							 * This can be reached when following an update
-							 * chain from a tuple updated by another session,
-							 * reaching a tuple that was already updated in
-							 * this transaction. If previously modified by
-							 * this command, ignore the redundant update,
-							 * otherwise error out.
-							 *
-							 * See also TM_SelfModified response to
-							 * table_tuple_update() above.
-							 */
-							if (context->tmfd.cmax != estate->es_output_cid)
-								ereport(ERROR,
-										(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
-										 errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
-										 errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
-							return NULL;
-
-						default:
-							/* see table_tuple_lock call in ExecDelete() */
-							elog(ERROR, "unexpected table_tuple_lock status: %u",
-								 result);
-							return NULL;
-					}
+					/*
+					 * Save the locked tuple for further calculation of
+					 * the new tuple.
+					 */
+					oldSlot = resultRelInfo->ri_oldTupleSlot;
+					ExecCopySlot(oldSlot, inputslot);
+					ExecMaterializeSlot(oldSlot);
+					Assert(context->tmfd.traversed);
+
+					epqslot = EvalPlanQual(context->epqstate,
+										   resultRelationDesc,
+										   resultRelInfo->ri_RangeTableIndex,
+										   inputslot);
+					if (TupIsNull(epqslot))
+						/* Tuple not passing quals anymore, exiting... */
+						return NULL;
+					slot = ExecGetUpdateNewTuple(resultRelInfo,
+												 epqslot, oldSlot);
+					goto redo_act;
 				}
 
 				break;
@@ -2726,7 +2653,7 @@ ExecOnConflictUpdate(ModifyTableContext *context,
 	*returning = ExecUpdate(context, resultRelInfo,
 							conflictTid, NULL,
 							resultRelInfo->ri_onConflict->oc_ProjSlot,
-							canSetTag);
+							canSetTag, true);
 
 	/*
 	 * Clear out existing tuple, as there might not be another conflict among
@@ -3897,7 +3824,7 @@ ExecModifyTable(PlanState *pstate)
 
 				/* Now apply the update. */
 				slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
-								  slot, node->canSetTag);
+								  slot, node->canSetTag, false);
 				break;
 
 			case CMD_DELETE:
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 8d74d1b7e30..c18372c7032 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -191,6 +191,32 @@ typedef struct HeapPageFreeze
 
 } HeapPageFreeze;
 
+/*
+ * A data structure used to pass previous efforts to find the target tuple
+ * into heapam_tuple_lock_context() and heap_lock_tuple().
+ */
+typedef struct
+{
+	/*
+	 * If a valid buffer is given, it must be an exclusively locked buffer
+	 * containing the target tuple.
+	 */
+	Buffer		buffer;
+
+	/*
+	 * If a valid buffer is given, it must be the visibility map buffer
+	 * corresponding to the page containing the target tuple.
+	 */
+	Buffer		vmbuffer;
+
+	/*
+	 * A flag indicating that we've previously obtained a tuple lock in
+	 * the target mode.
+	 */
+	bool		have_tuple_lock;
+} HeapLockContext;
+
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -243,17 +269,22 @@ extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
-							 struct TM_FailureData *tmfd, bool changingPart);
+							 struct TM_FailureData *tmfd, bool changingPart,
+							 Snapshot snapshot,
+							 LazyTupleTableSlot *lockedSlot);
 extern void heap_finish_speculative(Relation relation, ItemPointer tid);
 extern void heap_abort_speculative(Relation relation, ItemPointer tid);
 extern TM_Result heap_update(Relation relation, ItemPointer otid,
 							 HeapTuple newtup,
 							 CommandId cid, Snapshot crosscheck, bool wait,
-							 struct TM_FailureData *tmfd, LockTupleMode *lockmode);
+							 struct TM_FailureData *tmfd, LockTupleMode *lockmode,
+							 Snapshot snapshot,
+							 LazyTupleTableSlot *lockedSlot);
 extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 								 CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool follow_updates,
-								 Buffer *buffer, struct TM_FailureData *tmfd);
+								 HeapLockContext *context,
+								 struct TM_FailureData *tmfd);
 
 extern void heap_inplace_update(Relation relation, HeapTuple tuple);
 extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
@@ -328,4 +359,12 @@ extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data,
 extern void HeapCheckForSerializableConflictOut(bool visible, Relation relation, HeapTuple tuple,
 												Buffer buffer, Snapshot snapshot);
 
+extern TM_Result heapam_tuple_lock_context(Relation relation, ItemPointer tid,
+										   Snapshot snapshot,
+										   TupleTableSlot *slot,
+										   CommandId cid, LockTupleMode mode,
+										   LockWaitPolicy wait_policy,
+										   uint8 flags, TM_FailureData *tmfd,
+										   HeapLockContext *context);
+
 #endif							/* HEAPAM_H */
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 652e96f1b0b..6f1eed71307 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -514,7 +514,8 @@ typedef struct TableAmRoutine
 								 Snapshot crosscheck,
 								 bool wait,
 								 TM_FailureData *tmfd,
-								 bool changingPart);
+								 bool changingPart,
+								 LazyTupleTableSlot *lockedSlot);
 
 	/* see table_tuple_update() for reference about parameters */
 	TM_Result	(*tuple_update) (Relation rel,
@@ -526,7 +527,8 @@ typedef struct TableAmRoutine
 								 bool wait,
 								 TM_FailureData *tmfd,
 								 LockTupleMode *lockmode,
-								 bool *update_indexes);
+								 bool *update_indexes,
+								 LazyTupleTableSlot *lockedSlot);
 
 	/* see table_tuple_lock() for reference about parameters */
 	TM_Result	(*tuple_lock) (Relation rel,
@@ -1441,7 +1443,7 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 }
 
 /*
- * Delete a tuple.
+ * Delete a tuple (or lock the latest tuple version if lockedSlot is given).
  *
  * NB: do not call this directly unless prepared to deal with
  * concurrent-update conditions.  Use simple_table_tuple_delete instead.
@@ -1457,6 +1459,8 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
  *	tmfd - filled in failure cases (see below)
  *	changingPart - true iff the tuple is being moved to another partition
  *		table due to an update of the partition key. Otherwise, false.
+ *	lockedSlot - lazy slot to store the locked tuple in if the latest row
+ *		version must be locked on concurrent update. NULL if not needed.
  *
  * Normal, successful return value is TM_Ok, which means we did actually
  * delete it.  Failure return codes are TM_SelfModified, TM_Updated, and
@@ -1469,15 +1473,17 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 static inline TM_Result
 table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
 				   Snapshot snapshot, Snapshot crosscheck, bool wait,
-				   TM_FailureData *tmfd, bool changingPart)
+				   TM_FailureData *tmfd, bool changingPart,
+				   LazyTupleTableSlot *lockedSlot)
 {
 	return rel->rd_tableam->tuple_delete(rel, tid, cid,
 										 snapshot, crosscheck,
-										 wait, tmfd, changingPart);
+										 wait, tmfd, changingPart,
+										 lockedSlot);
 }
 
 /*
- * Update a tuple.
+ * Update a tuple (or lock the latest tuple version if lockedSlot is given).
  *
  * NB: do not call this directly unless you are prepared to deal with
  * concurrent-update conditions.  Use simple_table_tuple_update instead.
@@ -1495,7 +1501,9 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
  *	lockmode - filled with lock mode acquired on tuple
  *  update_indexes - in success cases this is set to true if new index entries
  *		are required for this tuple
- *
+ *	lockedSlot - lazy slot to store the locked tuple in if the latest row
+ *		version must be locked on concurrent update. NULL if not needed.
+ *
  * Normal, successful return value is TM_Ok, which means we did actually
  * update it.  Failure return codes are TM_SelfModified, TM_Updated, and
  * TM_BeingModified (the last only possible if wait == false).
@@ -1514,12 +1522,13 @@ static inline TM_Result
 table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
 				   CommandId cid, Snapshot snapshot, Snapshot crosscheck,
 				   bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
-				   bool *update_indexes)
+				   bool *update_indexes, LazyTupleTableSlot *lockedSlot)
 {
 	return rel->rd_tableam->tuple_update(rel, otid, slot,
 										 cid, snapshot, crosscheck,
 										 wait, tmfd,
-										 lockmode, update_indexes);
+										 lockmode, update_indexes,
+										 lockedSlot);
 }
 
 /*
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index 882be39f029..61b7b16ef75 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -300,6 +300,55 @@ typedef struct MinimalTupleTableSlot
 #define TupIsNull(slot) \
 	((slot) == NULL || TTS_EMPTY(slot))
 
+/*----------
+ * LazyTupleTableSlot -- a lazy version of TupleTableSlot.
+ *
+ * Sometimes a caller needs to pass a slot to a function even though the
+ * slot will most likely remain unused.  Preallocating such a slot would
+ * waste resources in the majority of cases.  A lazy slot resolves this
+ * problem: it is essentially a promise to allocate the slot once it is
+ * actually needed.  When the callee needs the slot, it can obtain it
+ * using the LAZY_TTS_EVAL(lazySlot) macro.
+ */
+typedef struct
+{
+	TupleTableSlot *slot;		/* cached slot or NULL if not yet allocated */
+	TupleTableSlot *(*getSlot) (void *arg); /* callback for slot allocation */
+	void	   *getSlotArg;		/* argument for the callback above */
+} LazyTupleTableSlot;
+
+/*
+ * A constructor for the lazy slot.
+ */
+#define MAKE_LAZY_TTS(lazySlot, callback, arg) \
+	do { \
+		(lazySlot)->slot = NULL; \
+		(lazySlot)->getSlot = callback; \
+		(lazySlot)->getSlotArg = arg; \
+	} while (false)
+
+/*
+ * Macro for lazy slot evaluation.  A NULL lazy slot evaluates to a NULL
+ * slot; otherwise the cached slot is used if present, else the callback.
+ */
+#define LAZY_TTS_EVAL(lazySlot) \
+	((lazySlot) ? \
+		((lazySlot)->slot ? \
+			(lazySlot)->slot : \
+			((lazySlot)->slot = (lazySlot)->getSlot((lazySlot)->getSlotArg))) : \
+		NULL)
+
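+/*
+ * Example usage (names are illustrative): the caller packages an
+ * allocation callback, and the callee materializes the slot only on the
+ * code path that actually needs it.
+ *
+ *		LazyTupleTableSlot lazySlot;
+ *
+ *		MAKE_LAZY_TTS(&lazySlot, GetMySlot, &myArg);
+ *		slot = LAZY_TTS_EVAL(&lazySlot);	-- allocates on first use
+ */
+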
 /* in executor/execTuples.c */
 extern TupleTableSlot *MakeTupleTableSlot(TupleDesc tupleDesc,
 										  const TupleTableSlotOps *tts_ops);
-- 
2.37.1 (Apple Git-137.1)

