Hi,

On 2019-01-29 07:22:16 -0800, Andres Freund wrote:
> On January 29, 2019 6:25:59 AM PST, Tomas Vondra <tomas.von...@2ndquadrant.com> wrote:
> >On 1/29/19 8:18 AM, David Rowley wrote:
> >> ...
> >> Here are my performance tests with and without your change to the
> >> memory contexts (I missed where you posted your results).
> >> 
> >> $ cat bench.pl
> >> for (my $i=0; $i < 8912891; $i++) {
> >> print "1\n1\n2\n2\n";
> >> }
> >> 36a1281f86c: (with your change)
> >> 
> >> postgres=# copy listp from program $$perl ~/bench.pl$$ delimiter '|';
> >> COPY 35651564
> >> Time: 26825.142 ms (00:26.825)
> >> Time: 27202.117 ms (00:27.202)
> >> Time: 26266.705 ms (00:26.267)
> >> 
> >> 4be058fe9ec: (before your change)
> >> 
> >> postgres=# copy listp from program $$perl ~/bench.pl$$ delimiter '|';
> >> COPY 35651564
> >> Time: 25645.460 ms (00:25.645)
> >> Time: 25698.193 ms (00:25.698)
> >> Time: 25737.251 ms (00:25.737)
> >> 
> >
> >How do I reproduce this? I don't see this test in the thread you
> >linked, so I'm not sure how many partitions you were using, what's
> >the structure of the table etc.
> 
> I think I might have a patch addressing the problem incidentally. For
> pluggable storage I slotified copy.c, which also removes the first
> heap_form_tuple. Quite possibly nothing more is needed. I've removed
> the batch context altogether in yesterday's rebase; there was no need
> for it anymore.

Here's a version that applies on top of HEAD. It needs a bit more
cleanup: I'm not sure I like the addition of ri_batchInsertSlots to
store slots for each partition, there's some duplicated code around
slot array and slot creation, and the multi insert callback still
needs docs.

When measuring insertion into unlogged partitions (to remove the
overhead of WAL logging, which makes it harder to see the raw
performance difference), I see gains of a few percent for the
slotified version over the current code.  The same holds for
insertions with fewer partition switches.
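
To reproduce roughly what I measured - note that the schema here is an
assumption on my part, based on David's bench.pl only ever emitting
the values 1 and 2; the thread doesn't show the actual definition:

CREATE TABLE listp (a int) PARTITION BY LIST (a);
CREATE UNLOGGED TABLE listp1 PARTITION OF listp FOR VALUES IN (1);
CREATE UNLOGGED TABLE listp2 PARTITION OF listp FOR VALUES IN (2);

\timing on
COPY listp FROM PROGRAM 'perl ~/bench.pl';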

Sorry for posting this here - David, it'd be cool if you could take a
look anyway. You've played a lot with this code.

Btw, for v13, I hope that either I or somebody else cleans up CopyFrom()
a bit - it's gotten really hard to understand. I think we need to split
it into a few pieces.

Greetings,

Andres Freund
From 3e8e43232b08eac40bb5f634be158b8577fc844f Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Sun, 20 Jan 2019 13:28:26 -0800
Subject: [PATCH v23] tableam: multi_insert and slotify COPY.

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/heap/heapam.c         |  23 +-
 src/backend/access/heap/heapam_handler.c |   1 +
 src/backend/commands/copy.c              | 322 ++++++++++++-----------
 src/include/access/heapam.h              |   3 +-
 src/include/access/tableam.h             |  14 +
 src/include/nodes/execnodes.h            |   6 +
 6 files changed, 204 insertions(+), 165 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f3812dd5871..71d14789c98 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2106,7 +2106,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
  * temporary context before calling this, if that's a problem.
  */
 void
-heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate)
 {
 	TransactionId xid = GetCurrentTransactionId();
@@ -2127,11 +2127,18 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
 												   HEAP_DEFAULT_FILLFACTOR);
 
-	/* Toast and set header data in all the tuples */
+	/* Toast and set header data in all the slots */
 	heaptuples = palloc(ntuples * sizeof(HeapTuple));
 	for (i = 0; i < ntuples; i++)
-		heaptuples[i] = heap_prepare_insert(relation, tuples[i],
-											xid, cid, options);
+	{
+		HeapTuple tuple;
+
+		tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL);
+		slots[i]->tts_tableOid = RelationGetRelid(relation);
+		tuple->t_tableOid = slots[i]->tts_tableOid;
+		heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid,
+											options);
+	}
 
 	/*
 	 * We're about to do the actual inserts -- but check for conflict first,
@@ -2361,13 +2368,9 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
 	}
 
-	/*
-	 * Copy t_self fields back to the caller's original tuples. This does
-	 * nothing for untoasted tuples (tuples[i] == heaptuples[i)], but it's
-	 * probably faster to always copy than check.
-	 */
+	/* copy t_self fields back to the caller's slots */
 	for (i = 0; i < ntuples; i++)
-		tuples[i]->t_self = heaptuples[i]->t_self;
+		slots[i]->tts_tid = heaptuples[i]->t_self;
 
 	pgstat_count_heap_insert(relation, ntuples);
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 0e1a1fe7b6f..692b949d6fc 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -539,6 +539,7 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
+	.multi_insert = heap_multi_insert,
 	.tuple_lock = heapam_tuple_lock,
 
 	.tuple_fetch_row_version = heapam_fetch_row_version,
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 705df8900ba..5ecc4f42835 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -315,13 +315,12 @@ static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
 static void EndCopyTo(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
-static void CopyOneRowTo(CopyState cstate,
-			 Datum *values, bool *nulls);
+static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
 					CommandId mycid, int hi_options,
-					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
+					ResultRelInfo *resultRelInfo,
 					BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
+					int nBufferedTuples, TupleTableSlot **bufferedSlots,
 					uint64 firstBufferedLineNo);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
@@ -2072,33 +2071,27 @@ CopyTo(CopyState cstate)
 
 	if (cstate->rel)
 	{
-		Datum	   *values;
-		bool	   *nulls;
+		TupleTableSlot *slot;
 		TableScanDesc scandesc;
-		HeapTuple	tuple;
-
-		values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
-		nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
 
 		scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
+		slot = table_slot_create(cstate->rel, NULL);
 
 		processed = 0;
-		while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
+		while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
 		{
 			CHECK_FOR_INTERRUPTS();
 
-			/* Deconstruct the tuple ... faster than repeated heap_getattr */
-			heap_deform_tuple(tuple, tupDesc, values, nulls);
+			/* Deconstruct the tuple */
+			slot_getallattrs(slot);
 
 			/* Format and send the data */
-			CopyOneRowTo(cstate, values, nulls);
+			CopyOneRowTo(cstate, slot);
 			processed++;
 		}
 
+		ExecDropSingleTupleTableSlot(slot);
 		table_endscan(scandesc);
-
-		pfree(values);
-		pfree(nulls);
 	}
 	else
 	{
@@ -2124,7 +2117,7 @@ CopyTo(CopyState cstate)
  * Emit one row during CopyTo().
  */
 static void
-CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
+CopyOneRowTo(CopyState cstate, TupleTableSlot *slot)
 {
 	bool		need_delim = false;
 	FmgrInfo   *out_functions = cstate->out_functions;
@@ -2141,11 +2134,14 @@ CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
 		CopySendInt16(cstate, list_length(cstate->attnumlist));
 	}
 
+	/* Make sure the tuple is fully deconstructed */
+	slot_getallattrs(slot);
+
 	foreach(cur, cstate->attnumlist)
 	{
 		int			attnum = lfirst_int(cur);
-		Datum		value = values[attnum - 1];
-		bool		isnull = nulls[attnum - 1];
+		Datum		value = slot->tts_values[attnum - 1];
+		bool		isnull = slot->tts_isnull[attnum - 1];
 
 		if (!cstate->binary)
 		{
@@ -2310,19 +2306,14 @@ limit_printout_length(const char *str)
 uint64
 CopyFrom(CopyState cstate)
 {
-	HeapTuple	tuple;
-	TupleDesc	tupDesc;
-	Datum	   *values;
-	bool	   *nulls;
 	ResultRelInfo *resultRelInfo;
 	ResultRelInfo *target_resultRelInfo;
 	ResultRelInfo *prevResultRelInfo = NULL;
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ModifyTableState *mtstate;
 	ExprContext *econtext;
-	TupleTableSlot *myslot;
+	TupleTableSlot *singleslot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
-	MemoryContext batchcontext;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
@@ -2338,8 +2329,7 @@ CopyFrom(CopyState cstate)
 
 #define MAX_BUFFERED_TUPLES 1000
 #define RECHECK_MULTI_INSERT_THRESHOLD 1000
-	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
-	Size		bufferedTuplesSize = 0;
+	Size		bufferedInputSize = 0;
 	uint64		firstBufferedLineNo = 0;
 	uint64		lastPartitionSampleLineNo = 0;
 	uint64		nPartitionChanges = 0;
@@ -2381,8 +2371,6 @@ CopyFrom(CopyState cstate)
 							RelationGetRelationName(cstate->rel))));
 	}
 
-	tupDesc = RelationGetDescr(cstate->rel);
-
 	/*----------
 	 * Check to see if we can avoid writing WAL
 	 *
@@ -2517,10 +2505,6 @@ CopyFrom(CopyState cstate)
 
 	ExecInitRangeTable(estate, cstate->range_table);
 
-	/* Set up a tuple slot too */
-	myslot = ExecInitExtraTupleSlot(estate, tupDesc,
-									&TTSOpsHeapTuple);
-
 	/*
 	 * Set up a ModifyTableState so we can let FDW(s) init themselves for
 	 * foreign-table result relation(s).
@@ -2642,7 +2626,20 @@ CopyFrom(CopyState cstate)
 		else
 			insertMethod = CIM_MULTI;
 
-		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+		resultRelInfo->ri_batchInsertSlots =
+			palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
+	}
+
+	/*
+	 * If not using batch mode (which allocates slots as needed), set up a
+	 * tuple slot too. When inserting into a partitioned table, we also need
+	 * one, even if we might batch insert, to read the tuple in the root
+	 * partition's form.
+	 */
+	if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
+	{
+		singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
+									   &estate->es_tupleTable);
 	}
 
 	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
@@ -2659,9 +2656,6 @@ CopyFrom(CopyState cstate)
 	 */
 	ExecBSInsertTriggers(estate, resultRelInfo);
 
-	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
-	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
-
 	bistate = GetBulkInsertState();
 	econtext = GetPerTupleExprContext(estate);
 
@@ -2671,17 +2665,9 @@ CopyFrom(CopyState cstate)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
-	/*
-	 * Set up memory context for batches. For cases without batching we could
-	 * use the per-tuple context, but it's simpler to just use it every time.
-	 */
-	batchcontext = AllocSetContextCreate(CurrentMemoryContext,
-										 "batch context",
-										 ALLOCSET_DEFAULT_SIZES);
-
 	for (;;)
 	{
-		TupleTableSlot *slot;
+		TupleTableSlot *myslot;
 		bool		skip_tuple;
 
 		CHECK_FOR_INTERRUPTS();
@@ -2692,20 +2678,37 @@ CopyFrom(CopyState cstate)
 		 */
 		ResetPerTupleExprContext(estate);
 
+		if (insertMethod == CIM_SINGLE || proute)
+		{
+			myslot = singleslot;
+			Assert(myslot != NULL);
+		}
+		else
+		{
+			Assert(resultRelInfo == target_resultRelInfo);
+
+			if (resultRelInfo->ri_batchInsertSlots[nBufferedTuples] == NULL)
+			{
+				resultRelInfo->ri_batchInsertSlots[nBufferedTuples] =
+					table_slot_create(resultRelInfo->ri_RelationDesc,
+									  &estate->es_tupleTable);
+			}
+			myslot = resultRelInfo->ri_batchInsertSlots[nBufferedTuples];
+		}
+
 		/*
 		 * Switch to per-tuple context before calling NextCopyFrom, which does
 		 * evaluate default expressions etc. and requires per-tuple context.
 		 */
 		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 
-		if (!NextCopyFrom(cstate, econtext, values, nulls))
+		ExecClearTuple(myslot);
+
+		/* Directly store the values/nulls array in the slot */
+		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
 			break;
 
-		/* Switch into per-batch memory context before forming the tuple. */
-		MemoryContextSwitchTo(batchcontext);
-
-		/* And now we can form the input tuple. */
-		tuple = heap_form_tuple(tupDesc, values, nulls);
+		ExecStoreVirtualTuple(myslot);
 
 		/*
 		 * Constraints might reference the tableoid column, so (re-)initialize
@@ -2716,10 +2719,6 @@ CopyFrom(CopyState cstate)
 		/* Triggers and stuff need to be invoked in query context. */
 		MemoryContextSwitchTo(oldcontext);
 
-		/* Place tuple in tuple slot --- but slot shouldn't free it */
-		slot = myslot;
-		ExecStoreHeapTuple(tuple, slot, false);
-
 		if (cstate->whereClause)
 		{
 			econtext->ecxt_scantuple = myslot;
@@ -2738,7 +2737,7 @@ CopyFrom(CopyState cstate)
 			 * if the found partition is not suitable for INSERTs.
 			 */
 			resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
-											  proute, slot, estate);
+											  proute, myslot, estate);
 
 			if (prevResultRelInfo != resultRelInfo)
 			{
@@ -2752,38 +2751,15 @@ CopyFrom(CopyState cstate)
 					 */
 					if (nBufferedTuples > 0)
 					{
-						MemoryContext	oldcontext;
+						TupleTableSlot **slots =
+							prevResultRelInfo->ri_batchInsertSlots;
 
 						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-											prevResultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
+											prevResultRelInfo, bistate,
+											nBufferedTuples,
+											slots,
 											firstBufferedLineNo);
 						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
-
-						/*
-						 * The tuple is already allocated in the batch context, which
-						 * we want to reset.  So to keep the tuple we copy it into the
-						 * short-lived (per-tuple) context, reset the batch context
-						 * and then copy it back into the per-batch one.
-						 */
-						oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-						tuple = heap_copytuple(tuple);
-						MemoryContextSwitchTo(oldcontext);
-
-						/* cleanup the old batch */
-						MemoryContextReset(batchcontext);
-
-						/* copy the tuple back to the per-batch context */
-						oldcontext = MemoryContextSwitchTo(batchcontext);
-						tuple = heap_copytuple(tuple);
-						MemoryContextSwitchTo(oldcontext);
-
-						/*
-						 * Also push the tuple copy to the slot (resetting the context
-						 * invalidated the slot contents).
-						 */
-						ExecStoreHeapTuple(tuple, slot, false);
 					}
 
 					nPartitionChanges++;
@@ -2878,26 +2854,64 @@ CopyFrom(CopyState cstate)
 			 * rowtype.
 			 */
 			map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
-			if (map != NULL)
+			if (insertMethod == CIM_SINGLE ||
+				(insertMethod == CIM_MULTI_CONDITIONAL && !leafpart_use_multi_insert))
 			{
-				TupleTableSlot *new_slot;
-				MemoryContext oldcontext;
+				/* non batch insert */
 
-				new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
-				Assert(new_slot != NULL);
-
-				slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
+				if (map != NULL)
+				{
+					TupleTableSlot *new_slot;
 
+					new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
+					myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
+				}
+			}
+			else
+			{
 				/*
-				 * Get the tuple in the per-batch context, so that it will be
-				 * freed after each batch insert.
+				 * Batch insert into partitioned table.
 				 */
-				oldcontext = MemoryContextSwitchTo(batchcontext);
-				tuple = ExecCopySlotHeapTuple(slot);
-				MemoryContextSwitchTo(oldcontext);
+
+				TupleTableSlot **slots =
+					resultRelInfo->ri_batchInsertSlots;
+				TupleTableSlot *new_slot;
+
+				/* no other path available for partitioned table */
+				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
+
+				/* Ensure partition ResultRelInfo has a batchInsertSlots array */
+				if (!slots)
+				{
+					slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
+					resultRelInfo->ri_batchInsertSlots = slots;
+				}
+				if (slots[nBufferedTuples] == NULL)
+				{
+					slots[nBufferedTuples] =
+						table_slot_create(resultRelInfo->ri_RelationDesc,
+										  &estate->es_tupleTable);
+				}
+				new_slot = slots[nBufferedTuples];
+
+				if (map != NULL)
+					myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
+				else
+				{
+					/*
+					 * This looks more expensive than it is (Believe me, I
+					 * optimized it away. Twice). The input is in virtual
+					 * form, and we'll materialize the slot below - for most
+					 * slot types the copy performs the work materialization
+					 * would later require anyway.
+					 */
+					ExecCopySlot(new_slot, myslot);
+					myslot = new_slot;
+				}
 			}
 
-			slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+			/* ensure that triggers etc. see the right relation */
+			myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 		}
 
 		skip_tuple = false;
@@ -2905,7 +2919,7 @@ CopyFrom(CopyState cstate)
 		/* BEFORE ROW INSERT Triggers */
 		if (has_before_insert_row_trig)
 		{
-			if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+			if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
 				skip_tuple = true;	/* "do nothing" */
 		}
 
@@ -2918,7 +2932,7 @@ CopyFrom(CopyState cstate)
 			 */
 			if (has_instead_insert_row_trig)
 			{
-				ExecIRInsertTriggers(estate, resultRelInfo, slot);
+				ExecIRInsertTriggers(estate, resultRelInfo, myslot);
 			}
 			else
 			{
@@ -2928,7 +2942,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_FdwRoutine == NULL &&
 					resultRelInfo->ri_RelationDesc->rd_att->constr)
-					ExecConstraints(resultRelInfo, slot, estate);
+					ExecConstraints(resultRelInfo, myslot, estate);
 
 				/*
 				 * Also check the tuple against the partition constraint, if
@@ -2938,7 +2952,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_PartitionCheck &&
 					(proute == NULL || has_before_insert_row_trig))
-					ExecPartitionCheck(resultRelInfo, slot, estate, true);
+					ExecPartitionCheck(resultRelInfo, myslot, estate, true);
 
 				/*
 				 * Perform multi-inserts when enabled, or when loading a
@@ -2950,8 +2964,17 @@ CopyFrom(CopyState cstate)
 					/* Add this tuple to the tuple buffer */
 					if (nBufferedTuples == 0)
 						firstBufferedLineNo = cstate->cur_lineno;
-					bufferedTuples[nBufferedTuples++] = tuple;
-					bufferedTuplesSize += tuple->t_len;
+
+					/*
+					 * The slot's contents might point into the per-tuple
+					 * context. For batching they need to be longer lived.
+					 */
+					ExecMaterializeSlot(myslot);
+
+					Assert(resultRelInfo->ri_batchInsertSlots[nBufferedTuples] == myslot);
+
+					nBufferedTuples++;
+					bufferedInputSize += cstate->line_buf.len;
 
 					/*
 					 * If the buffer filled up, flush it.  Also flush if the
@@ -2960,17 +2983,15 @@ CopyFrom(CopyState cstate)
 					 * buffer when the tuples are exceptionally wide.
 					 */
 					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-						bufferedTuplesSize > 65535)
+						bufferedInputSize > 65535)
 					{
 						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-											resultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
+											resultRelInfo, bistate,
+											nBufferedTuples,
+											resultRelInfo->ri_batchInsertSlots,
 											firstBufferedLineNo);
 						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
-
-						/* free memory occupied by tuples from the batch */
-						MemoryContextReset(batchcontext);
+						bufferedInputSize = 0;
 					}
 				}
 				else
@@ -2980,12 +3001,12 @@ CopyFrom(CopyState cstate)
 					/* OK, store the tuple */
 					if (resultRelInfo->ri_FdwRoutine != NULL)
 					{
-						slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
-																			   resultRelInfo,
-																			   slot,
-																			   NULL);
+						myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
+																				 resultRelInfo,
+																				 myslot,
+																				 NULL);
 
-						if (slot == NULL)	/* "do nothing" */
+						if (myslot == NULL)	/* "do nothing" */
 							continue;	/* next tuple please */
 
 						/*
@@ -2993,27 +3014,26 @@ CopyFrom(CopyState cstate)
 						 * column, so (re-)initialize tts_tableOid before
 						 * evaluating them.
 						 */
-						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+						myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
 					else
 					{
-						tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
-						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
-									mycid, hi_options, bistate);
-						ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
-						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+						/* OK, store the tuple */
+						table_insert(resultRelInfo->ri_RelationDesc, myslot,
+									 mycid, hi_options, bistate);
 					}
 
+
 					/* And create index entries for it */
 					if (resultRelInfo->ri_NumIndices > 0)
-						recheckIndexes = ExecInsertIndexTuples(slot,
+						recheckIndexes = ExecInsertIndexTuples(myslot,
 															   estate,
 															   false,
 															   NULL,
 															   NIL);
 
 					/* AFTER ROW INSERT Triggers */
-					ExecARInsertTriggers(estate, resultRelInfo, slot,
+					ExecARInsertTriggers(estate, resultRelInfo, myslot,
 										 recheckIndexes, cstate->transition_capture);
 
 					list_free(recheckIndexes);
@@ -3035,26 +3055,26 @@ CopyFrom(CopyState cstate)
 		if (insertMethod == CIM_MULTI_CONDITIONAL)
 		{
 			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-								prevResultRelInfo, myslot, bistate,
-								nBufferedTuples, bufferedTuples,
+								prevResultRelInfo, bistate,
+								nBufferedTuples,
+								prevResultRelInfo->ri_batchInsertSlots,
 								firstBufferedLineNo);
 		}
 		else
 			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-								resultRelInfo, myslot, bistate,
-								nBufferedTuples, bufferedTuples,
+								resultRelInfo, bistate,
+								nBufferedTuples,
+								resultRelInfo->ri_batchInsertSlots,
 								firstBufferedLineNo);
 	}
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	FreeBulkInsertState(bistate);
+	ReleaseBulkInsertStatePin(bistate);
 
 	MemoryContextSwitchTo(oldcontext);
 
-	MemoryContextDelete(batchcontext);
-
 	/*
 	 * In the old protocol, tell pqcomm that we can process normal protocol
 	 * messages again.
@@ -3068,9 +3088,6 @@ CopyFrom(CopyState cstate)
 	/* Handle queued AFTER triggers */
 	AfterTriggerEndQuery(estate);
 
-	pfree(values);
-	pfree(nulls);
-
 	ExecResetTupleTable(estate->es_tupleTable, false);
 
 	/* Allow the FDW to shut down */
@@ -3108,8 +3125,7 @@ CopyFrom(CopyState cstate)
 static void
 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 					int hi_options, ResultRelInfo *resultRelInfo,
-					TupleTableSlot *myslot, BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
+					BulkInsertState bistate, int nBufferedTuples, TupleTableSlot **bufferedSlots,
 					uint64 firstBufferedLineNo)
 {
 	MemoryContext oldcontext;
@@ -3129,12 +3145,12 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 * before calling it.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(resultRelInfo->ri_RelationDesc,
-					  bufferedTuples,
-					  nBufferedTuples,
-					  mycid,
-					  hi_options,
-					  bistate);
+	table_multi_insert(resultRelInfo->ri_RelationDesc,
+					   bufferedSlots,
+					   nBufferedTuples,
+					   mycid,
+					   hi_options,
+					   bistate);
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
@@ -3148,12 +3164,11 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 			List	   *recheckIndexes;
 
 			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
 			recheckIndexes =
-				ExecInsertIndexTuples(myslot,
-									  estate, false, NULL, NIL);
+				ExecInsertIndexTuples(bufferedSlots[i], estate, false, NULL,
+									  NIL);
 			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
+								 bufferedSlots[i],
 								 recheckIndexes, cstate->transition_capture);
 			list_free(recheckIndexes);
 		}
@@ -3170,13 +3185,15 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 		for (i = 0; i < nBufferedTuples; i++)
 		{
 			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
 			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
+								 bufferedSlots[i],
 								 NIL, cstate->transition_capture);
 		}
 	}
 
+	for (i = 0; i < nBufferedTuples; i++)
+		ExecClearTuple(bufferedSlots[i]);
+
 	/* reset cur_lineno and line_buf_valid to what they were */
 	cstate->line_buf_valid = line_buf_valid;
 	cstate->cur_lineno = save_cur_lineno;
@@ -4966,11 +4983,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 	DR_copy    *myState = (DR_copy *) self;
 	CopyState	cstate = myState->cstate;
 
-	/* Make sure the tuple is fully deconstructed */
-	slot_getallattrs(slot);
-
-	/* And send the data */
-	CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
+	/* Send the data */
+	CopyOneRowTo(cstate, slot);
 	myState->processed++;
 
 	return true;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4c077755d54..ed0e2de144d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -36,6 +36,7 @@
 #define HEAP_INSERT_SPECULATIVE 0x0010
 
 typedef struct BulkInsertStateData *BulkInsertState;
+struct TupleTableSlot;
 
 #define MaxLockTupleMode	LockTupleExclusive
 
@@ -143,7 +144,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
 extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			int options, BulkInsertState bistate);
-extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 37890dc2f5c..2bd92cbd530 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -350,6 +350,9 @@ typedef struct TableAmRoutine
 								 LockTupleMode *lockmode,
 								 bool *update_indexes);
 
+	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
+								 CommandId cid, int options, struct BulkInsertStateData *bistate);
+
 	/* see table_insert() for reference about parameters */
 	TM_Result	(*tuple_lock) (Relation rel,
 							   ItemPointer tid,
@@ -875,6 +878,17 @@ table_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
 										 lockmode, update_indexes);
 }
 
+/*
+ *	table_multi_insert	- insert multiple tuples into a table
+ */
+static inline void
+table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
+				   CommandId cid, int options, struct BulkInsertStateData *bistate)
+{
+	rel->rd_tableam->multi_insert(rel, slots, nslots,
+								  cid, options, bistate);
+}
+
 /*
  * Lock a tuple in the specified mode.
  *
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 869c303e157..1311b854a99 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -476,6 +476,12 @@ typedef struct ResultRelInfo
 	/* relation descriptor for root partitioned table */
 	Relation	ri_PartitionRoot;
 
+	/*
+	 * When batch inserting into a table, it might be necessary to have one
+	 * slot per input row, up to the largest possible batch size.
+	 */
+	TupleTableSlot **ri_batchInsertSlots;
+
 	/* Additional information specific to partition tuple routing */
 	struct PartitionRoutingInfo *ri_PartitionInfo;
 } ResultRelInfo;
-- 
2.21.0.dirty
