From 2bcbe0ffc5df81b38e6c6f0093eb486669c7a3b2 Mon Sep 17 00:00:00 2001
From: kommih <haribabuk@fast.au.fujitsu.com>
Date: Fri, 3 Aug 2018 11:07:52 +1000
Subject: [PATCH 1/2] COPY's multi_insert path deal with bunch of slots

Support of passing slots instead of tuples when doing
multi insert.
---
 src/backend/access/heap/heapam.c |  31 ++++++----
 src/backend/commands/copy.c      | 100 +++++++++++++++----------------
 src/include/access/heapam.h      |   3 +-
 src/include/access/tableam.h     |   6 +-
 4 files changed, 74 insertions(+), 66 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 40c1a5432d..7d0d1dc234 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2648,7 +2648,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
  * temporary context before calling this, if that's a problem.
  */
 void
-heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+heap_multi_insert(Relation relation, TupleTableSlot **slots, int nslots,
 				  CommandId cid, int options, BulkInsertState bistate)
 {
 	TransactionId xid = GetCurrentTransactionId();
@@ -2666,11 +2666,18 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
 												   HEAP_DEFAULT_FILLFACTOR);
 
-	/* Toast and set header data in all the tuples */
-	heaptuples = palloc(ntuples * sizeof(HeapTuple));
-	for (i = 0; i < ntuples; i++)
-		heaptuples[i] = heap_prepare_insert(relation, tuples[i],
+	/* Toast and set header data in all the slots */
+	heaptuples = palloc(nslots * sizeof(HeapTuple));
+	for (i = 0; i < nslots; i++)
+	{
+		heaptuples[i] = heap_prepare_insert(relation, ExecGetHeapTupleFromSlot(slots[i]),
 											xid, cid, options);
+		if (slots[i]->tts_tupleOid != InvalidOid)
+			HeapTupleSetOid(heaptuples[i], slots[i]->tts_tupleOid);
+
+		if (slots[i]->tts_tableOid != InvalidOid)
+			heaptuples[i]->t_tableOid = slots[i]->tts_tableOid;
+	}
 
 	/*
 	 * Allocate some memory to use for constructing the WAL record. Using
@@ -2706,7 +2713,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
 
 	ndone = 0;
-	while (ndone < ntuples)
+	while (ndone < nslots)
 	{
 		Buffer		buffer;
 		Buffer		vmbuffer = InvalidBuffer;
@@ -2732,7 +2739,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 		 * Put that on the page, and then as many other tuples as fit.
 		 */
 		RelationPutHeapTuple(relation, buffer, heaptuples[ndone], false);
-		for (nthispage = 1; ndone + nthispage < ntuples; nthispage++)
+		for (nthispage = 1; ndone + nthispage < nslots; nthispage++)
 		{
 			HeapTuple	heaptup = heaptuples[ndone + nthispage];
 
@@ -2841,7 +2848,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			 * emitted by this call to heap_multi_insert(). Needed for logical
 			 * decoding so it knows when to cleanup temporary data.
 			 */
-			if (ndone + nthispage == ntuples)
+			if (ndone + nthispage == nslots)
 				xlrec->flags |= XLH_INSERT_LAST_IN_MULTI;
 
 			if (init)
@@ -2904,7 +2911,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	 */
 	if (IsCatalogRelation(relation))
 	{
-		for (i = 0; i < ntuples; i++)
+		for (i = 0; i < nslots; i++)
 			CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
 	}
 
@@ -2913,10 +2920,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	 * nothing for untoasted tuples (tuples[i] == heaptuples[i)], but it's
 	 * probably faster to always copy than check.
 	 */
-	for (i = 0; i < ntuples; i++)
-		tuples[i]->t_self = heaptuples[i]->t_self;
+	for (i = 0; i < nslots; i++)
+		slots[i]->tts_tid = heaptuples[i]->t_self;
 
-	pgstat_count_heap_insert(relation, ntuples);
+	pgstat_count_heap_insert(relation, nslots);
 }
 
 /*
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index d49734ddab..62ee8cfea7 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -306,9 +306,9 @@ static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
 			 Datum *values, bool *nulls);
 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
 					CommandId mycid, int hi_options,
-					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
+					ResultRelInfo *resultRelInfo,
 					BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
+					int nslots, TupleTableSlot **slots,
 					uint64 firstBufferedLineNo);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
@@ -2310,7 +2310,6 @@ CopyFrom(CopyState cstate)
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ModifyTableState *mtstate;
 	ExprContext *econtext;
-	TupleTableSlot *myslot;
 	MemoryContext oldcontext = CurrentMemoryContext;
 
 	ErrorContextCallback errcallback;
@@ -2319,12 +2318,11 @@ CopyFrom(CopyState cstate)
 	void       *bistate;
 	uint64		processed = 0;
 	bool		useHeapMultiInsert;
-	int			nBufferedTuples = 0;
+	int			nslots = 0;
 	int			prev_leaf_part_index = -1;
 
-#define MAX_BUFFERED_TUPLES 1000
-	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
-	Size		bufferedTuplesSize = 0;
+#define MAX_BUFFERED_SLOTS 1000
+	TupleTableSlot  **slots = NULL;	/* initialize to silence warning */
 	uint64		firstBufferedLineNo = 0;
 
 	Assert(cstate->rel);
@@ -2467,10 +2465,6 @@ CopyFrom(CopyState cstate)
 	estate->es_result_relation_info = resultRelInfo;
 	estate->es_range_table = cstate->range_table;
 
-	/* Set up a tuple slot too */
-	myslot = ExecInitExtraTupleSlot(estate, tupDesc,
-									TTS_TYPE_HEAPTUPLE);
-
 	/*
 	 * Set up a ModifyTableState so we can let FDW(s) init themselves for
 	 * foreign-table result relation(s).
@@ -2541,7 +2535,7 @@ CopyFrom(CopyState cstate)
 	else
 	{
 		useHeapMultiInsert = true;
-		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+		slots = palloc(MAX_BUFFERED_SLOTS * sizeof(TupleTableSlot *));
 	}
 
 	/*
@@ -2569,11 +2563,17 @@ CopyFrom(CopyState cstate)
 		TupleTableSlot *slot;
 		bool		skip_tuple;
 		Oid			loaded_oid = InvalidOid;
+		int			natts = resultRelInfo->ri_RelationDesc->rd_att->natts;
+		int			cnt;
 
 		CHECK_FOR_INTERRUPTS();
 
-		if (nBufferedTuples == 0)
+		if (nslots == 0)
 		{
+			/* Reset Tupletable slots if any */
+			ExecResetTupleTable(estate->es_tupleTable, false);
+			estate->es_tupleTable = NIL;
+
 			/*
 			 * Reset the per-tuple exprcontext. We can only do this if the
 			 * tuple buffer is empty. (Calling the context the per-tuple
@@ -2588,25 +2588,32 @@ CopyFrom(CopyState cstate)
 		if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
 			break;
 
-		/* And now we can form the input tuple. */
-		tuple = heap_form_tuple(tupDesc, values, nulls);
+		slot = ExecInitExtraTupleSlot(estate,
+						RelationGetDescr(resultRelInfo->ri_RelationDesc),
+						useHeapMultiInsert ? TTS_TYPE_VIRTUAL : TTS_TYPE_HEAPTUPLE);
+
+		/* Directly store the values/nulls array in the slot */
+		memcpy(slot->tts_isnull, nulls, sizeof(bool) * natts);
+		for (cnt = 0; cnt < natts; cnt++)
+		{
+			if (!slot->tts_isnull[cnt])
+				slot->tts_values[cnt] = values[cnt];
+		}
+
+		ExecStoreVirtualTuple(slot);
 
 		if (loaded_oid != InvalidOid)
-			HeapTupleSetOid(tuple, loaded_oid);
+			slot->tts_tupleOid = loaded_oid;
 
 		/*
 		 * Constraints might reference the tableoid column, so initialize
 		 * t_tableOid before evaluating them.
 		 */
-		tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+		slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 
 		/* Triggers and stuff need to be invoked in query context. */
 		MemoryContextSwitchTo(oldcontext);
 
-		/* Place tuple in tuple slot --- but slot shouldn't free it */
-		slot = myslot;
-		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
-
 		/* Determine the partition to heap_insert the tuple into */
 		if (cstate->partition_tuple_routing)
 		{
@@ -2659,6 +2666,9 @@ CopyFrom(CopyState cstate)
 			 */
 			estate->es_result_relation_info = resultRelInfo;
 
+			/* FIXME: Get the HeapTuple from slot */
+			tuple = ExecGetHeapTupleFromSlot(slot);
+
 			/*
 			 * If we're capturing transition tuples, we might need to convert
 			 * from the partition rowtype to parent rowtype.
@@ -2699,6 +2709,7 @@ CopyFrom(CopyState cstate)
 											  &slot);
 
 			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+			slot->tts_tableOid = tuple->t_tableOid;
 		}
 
 		skip_tuple = false;
@@ -2709,8 +2720,6 @@ CopyFrom(CopyState cstate)
 		{
 			if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
 				skip_tuple = true;	/* "do nothing" */
-			else				/* trigger might have changed tuple */
-				tuple = ExecGetHeapTupleFromSlot(slot);
 		}
 
 		if (!skip_tuple)
@@ -2746,10 +2755,9 @@ CopyFrom(CopyState cstate)
 				if (useHeapMultiInsert)
 				{
 					/* Add this tuple to the tuple buffer */
-					if (nBufferedTuples == 0)
+					if (nslots == 0)
 						firstBufferedLineNo = cstate->cur_lineno;
-					bufferedTuples[nBufferedTuples++] = tuple;
-					bufferedTuplesSize += tuple->t_len;
+					slots[nslots++] = slot;
 
 					/*
 					 * If the buffer filled up, flush it.  Also flush if the
@@ -2757,15 +2765,13 @@ CopyFrom(CopyState cstate)
 					 * large, to avoid using large amounts of memory for the
 					 * buffer when the tuples are exceptionally wide.
 					 */
-					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-						bufferedTuplesSize > 65535)
+					if (nslots == MAX_BUFFERED_SLOTS)
 					{
 						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-											resultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
+											resultRelInfo, bistate,
+											nslots, slots,
 											firstBufferedLineNo);
-						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
+						nslots = 0;
 					}
 				}
 				else
@@ -2783,15 +2789,12 @@ CopyFrom(CopyState cstate)
 						if (slot == NULL)	/* "do nothing" */
 							goto next_tuple;
 
-						/* FDW might have changed tuple */
-						tuple = ExecGetHeapTupleFromSlot(slot);
-
 						/*
 						 * AFTER ROW Triggers might reference the tableoid
 						 * column, so initialize t_tableOid before evaluating
 						 * them.
 						 */
-						tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
 					else
 					{
@@ -2834,10 +2837,10 @@ next_tuple:
 	}
 
 	/* Flush any remaining buffered tuples */
-	if (nBufferedTuples > 0)
+	if (nslots > 0)
 		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-							resultRelInfo, myslot, bistate,
-							nBufferedTuples, bufferedTuples,
+							resultRelInfo, bistate,
+							nslots, slots,
 							firstBufferedLineNo);
 
 	/* Done, clean up */
@@ -2900,8 +2903,7 @@ next_tuple:
 static void
 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 					int hi_options, ResultRelInfo *resultRelInfo,
-					TupleTableSlot *myslot, BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
+					BulkInsertState bistate, int nslots, TupleTableSlot **slots,
 					uint64 firstBufferedLineNo)
 {
 	MemoryContext oldcontext;
@@ -2921,8 +2923,8 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	table_multi_insert(cstate->rel,
-					   bufferedTuples,
-					   nBufferedTuples,
+					   slots,
+					   nslots,
 					   mycid,
 					   hi_options,
 					   bistate);
@@ -2934,16 +2936,15 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 */
 	if (resultRelInfo->ri_NumIndices > 0)
 	{
-		for (i = 0; i < nBufferedTuples; i++)
+		for (i = 0; i < nslots; i++)
 		{
 			List	   *recheckIndexes;
 
 			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
 			recheckIndexes =
-				ExecInsertIndexTuples(myslot, estate, false, NULL, NIL);
+				ExecInsertIndexTuples(slots[i], estate, false, NULL, NIL);
 			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
+								 slots[i],
 								 recheckIndexes, cstate->transition_capture);
 			list_free(recheckIndexes);
 		}
@@ -2957,12 +2958,11 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
 			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
 	{
-		for (i = 0; i < nBufferedTuples; i++)
+		for (i = 0; i < nslots; i++)
 		{
 			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
 			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
+								 slots[i],
 								 NIL, cstate->transition_capture);
 		}
 	}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 5f89b5b174..a36a0c49e9 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -16,6 +16,7 @@
 
 #include "access/sdir.h"
 #include "access/skey.h"
+#include "executor/tuptable.h"
 #include "nodes/lockoptions.h"
 #include "nodes/primnodes.h"
 #include "storage/bufpage.h"
@@ -168,7 +169,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
 extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			int options, BulkInsertState bistate);
-extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+extern void heap_multi_insert(Relation relation, TupleTableSlot **slots, int nslots,
 				  CommandId cid, int options, BulkInsertState bistate);
 extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index df60ba3316..9912a171fb 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -103,7 +103,7 @@ typedef HTSU_Result (*TupleLock_function) (Relation relation,
 										   uint8 flags,
 										   HeapUpdateFailureData *hufd);
 
-typedef void (*MultiInsert_function) (Relation relation, HeapTuple *tuples, int ntuples,
+typedef void (*MultiInsert_function) (Relation relation, TupleTableSlot **slots, int nslots,
 									  CommandId cid, int options, BulkInsertState bistate);
 
 typedef void (*TupleGetLatestTid_function) (Relation relation,
@@ -611,10 +611,10 @@ table_fetch_follow_check(Relation rel,
  *	table_multi_insert	- insert multiple tuple into a table
  */
 static inline void
-table_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+table_multi_insert(Relation relation, TupleTableSlot **slots, int nslots,
 					 CommandId cid, int options, BulkInsertState bistate)
 {
-	relation->rd_tableamroutine->multi_insert(relation, tuples, ntuples,
+	relation->rd_tableamroutine->multi_insert(relation, slots, nslots,
 										   cid, options, bistate);
 }
 
-- 
2.18.0.windows.1

