From e3df01b93e1e1301244a06c34d5ba4483d0554b8 Mon Sep 17 00:00:00 2001
From: "dgrowley@gmail.com" <dgrowley@gmail.com>
Date: Thu, 21 Jun 2018 20:59:35 +1200
Subject: [PATCH v1] Allow multi-inserts during COPY into a partitioned table

CopyFrom allows multi-inserts to be used for non-partitioned tables, but
this was disabled for partitioned tables.  The reason for this appeared to
be due to the fact that the tuple may not belong to the same partition as
the previous tuple did.  Not allowing multi-inserts here greatly slowed
down imports into partitioned tables.  These could take twice as long as a
copy to an equivalent non-partitioned table.  It seems wise to do something
about this, so this commit allows the multi-inserts by flushing the so-far
inserted tuples to the partition when the next tuple does not belong to the
same partition, or when the buffer fills.  This improves performance when
the next tuple in the stream commonly belongs to the same partition as the
previous tuple.

This required a bit of work around the per-tuple memory contexts as we must
flush the tuples when the next tuple does not belong the same partition.
In which case there is no good time to reset the per-tuple context, as
we've already built the new tuple by this time.  In order to work around
this we maintain two per-tuple contexts and just switch between them every
time the partition changes and reset the old one.  This does mean that the
first of each batch of tuples is not allocated in the same memory context
as the others, but that does not matter since we only reset the context
once the previous batch has been inserted.
---
 src/backend/commands/copy.c | 225 +++++++++++++++++++++++++++++++++++---------
 1 file changed, 182 insertions(+), 43 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3a66cb5025..6143a2ad69 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -47,6 +47,7 @@
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
 #include "utils/portal.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
@@ -79,6 +80,16 @@ typedef enum EolType
 	EOL_CRNL
 } EolType;
 
+/*
+ * Represents the heap insert method to be used during COPY to.
+ */
+typedef enum CopyInsertMethod
+{
+	CIM_SINGLE,					/* use heap_insert or fdw routine */
+	CIM_MULTI,					/* always use heap_multi_insert */
+	CIM_MULTI_PARTITION			/* use heap_multi_insert only if valid */
+} CopyInsertMethod;
+
 /*
  * This struct contains all the state variables used throughout a COPY
  * operation. For simplicity, we use the same struct for all variants of COPY,
@@ -2312,14 +2323,19 @@ CopyFrom(CopyState cstate)
 	TupleTableSlot *myslot;
 	MemoryContext oldcontext = CurrentMemoryContext;
 
+	PartitionTupleRouting *proute = NULL;
+	ExprContext *secondaryExprContext = NULL;
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			hi_options = 0; /* start with default heap_insert options */
 	BulkInsertState bistate;
+	CopyInsertMethod insertMethod;
 	uint64		processed = 0;
-	bool		useHeapMultiInsert;
 	int			nBufferedTuples = 0;
 	int			prev_leaf_part_index = -1;
+	bool		has_before_insert_row_trig;
+	bool		has_instead_insert_row_trig;
+	bool		part_can_multi_insert = false;
 
 #define MAX_BUFFERED_TUPLES 1000
 	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
@@ -2504,8 +2520,6 @@ CopyFrom(CopyState cstate)
 	 */
 	if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionTupleRouting *proute;
-
 		proute = cstate->partition_tuple_routing =
 			ExecSetupPartitionTupleRouting(NULL, cstate->rel);
 
@@ -2527,23 +2541,69 @@ CopyFrom(CopyState cstate)
 	 * expressions. Such triggers or expressions might query the table we're
 	 * inserting to, and act differently if the tuples that have already been
 	 * processed and prepared for insertion are not there.  We also can't do
-	 * it if the table is foreign or partitioned.
+	 * bulk inserts with foreign tables or for partitioned tables with a
+	 * statement level insert trigger.  It might be possible to allow
+	 * partitioned tables with such triggers in the future, but for now
+	 * CopyFromInsertBatch expects that any BR insert and statement level
+	 * insert triggers are on the same relation.
+	 *
+	 * Note: It does not matter if any partitions have any volatile default
+	 * expression as we use the defaults from the target of the COPY command.
 	 */
 	if ((resultRelInfo->ri_TrigDesc != NULL &&
 		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
 		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+		(proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
+		 resultRelInfo->ri_TrigDesc->trig_insert_new_table) ||
 		resultRelInfo->ri_FdwRoutine != NULL ||
-		cstate->partition_tuple_routing != NULL ||
 		cstate->volatile_defexprs)
 	{
-		useHeapMultiInsert = false;
+		insertMethod = CIM_SINGLE;
 	}
 	else
 	{
-		useHeapMultiInsert = true;
+		/*
+		 * For partitioned tables, we may still be able to perform bulk
+		 * inserts for sets of consecutive tuples which belong to the same
+		 * partition.  However, the possibility of this depends on which
+		 * types of triggers exist on the partition.  We must disable bulk
+		 * inserts if the partition is a foreign table or it has any BR insert
+		 * or insert instead triggers (same as we checked above for the parent
+		 * table).  Since the partition's resultRelInfos are initialized only
+		 * when we actually need to insert the first tuple into them, we must
+		 * have the intermediate insert method of CIM_MULTI_PARTITION to flag
+		 * that we must later determine if we can use bulk-inserts for the
+		 * partition being inserted into.
+		 *
+		 * Normally, when performing bulk inserts we just flush the insert
+		 * buffer whenever it becomes full, but for the partitioned table
+		 * case, we flush it whenever the current tuple does not belong to
+		 * the same partition as the previous tuple, and since we flush the
+		 * previous partitions buffer once the new tuple has already been
+		 * built, we're unable to reset the estate since we'd free the memory
+		 * which the new tuple is stored.  To work around this we maintain a
+		 * secondary expression context and alternate between these when the
+		 * partition changes.  This does mean we do store the first new tuple
+		 * in a different context than subsequent tuples, but that does not
+		 * matter, providing we don't free anything while it's still needed.
+		 */
+		if (proute)
+		{
+			insertMethod = CIM_MULTI_PARTITION;
+			secondaryExprContext = CreateExprContext(estate);
+		}
+		else
+			insertMethod = CIM_MULTI;
+
 		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
 	}
 
+	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+		resultRelInfo->ri_TrigDesc->trig_insert_before_row);
+
+	has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+		resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
+
 	/*
 	 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
 	 * should do this for COPY, since it's not really an "INSERT" statement as
@@ -2608,11 +2668,9 @@ CopyFrom(CopyState cstate)
 		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 
 		/* Determine the partition to heap_insert the tuple into */
-		if (cstate->partition_tuple_routing)
+		if (proute)
 		{
 			int			leaf_part_index;
-			PartitionTupleRouting *proute = cstate->partition_tuple_routing;
-
 			/*
 			 * Away we go ... If we end up not finding a partition after all,
 			 * ExecFindPartition() does not return and errors out instead.
@@ -2628,31 +2686,94 @@ CopyFrom(CopyState cstate)
 			Assert(leaf_part_index >= 0 &&
 				   leaf_part_index < proute->num_partitions);
 
-			/*
-			 * If this tuple is mapped to a partition that is not same as the
-			 * previous one, we'd better make the bulk insert mechanism gets a
-			 * new buffer.
-			 */
+			saved_resultRelInfo = resultRelInfo;
+
 			if (prev_leaf_part_index != leaf_part_index)
 			{
+				/*
+				 * When performing bulk-inserts into partitioned tables we
+				 * must insert the tuples seen so far to the heap whenever the
+				 * partition changes.  This might seem wasteful in cases where
+				 * the partition changes on each tuple, but in cases where
+				 * we're bulk loading into a single partition or the data
+				 * being loaded is ordered in partition order then performance
+				 * gains can easily be seen.
+				 */
+				if (nBufferedTuples > 0)
+				{
+					ExprContext *swapcontext;
+					ResultRelInfo *presultRelInfo;
+
+					presultRelInfo = proute->partitions[prev_leaf_part_index];
+
+					CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+										presultRelInfo, myslot, bistate,
+										nBufferedTuples, bufferedTuples,
+										firstBufferedLineNo);
+					nBufferedTuples = 0;
+					bufferedTuplesSize = 0;
+
+					Assert(secondaryExprContext);
+
+					/*
+					 * Normally we reset the per-tuple context whenever the
+					 * bufferedTuples array is empty at the beginning of the
+					 * loop, however, it is possible since we flush the buffer
+					 * here that the buffer is never empty at the start of the
+					 * loop.  To prevent the per-tuple context from never
+					 * being reset we maintain a second context and alternate
+					 * between them when the partition changes.  We can now
+					 * reset secondaryExprContext as this is no longer needed,
+					 * since we just flushed any tuples stored in it.  We also
+					 * now switch over to the other context.  This does mean
+					 * that the first tuple in the buffer won't be in the same
+					 * context as the others, but that does not matter since
+					 * we only reset it after the flush.
+					 */
+					ReScanExprContext(secondaryExprContext);
+
+					swapcontext = secondaryExprContext;
+					secondaryExprContext = estate->es_per_tuple_exprcontext;
+					estate->es_per_tuple_exprcontext = swapcontext;
+				}
+
+				/*
+				 * Save the old ResultRelInfo and switch to the one
+				 * corresponding to the selected partition.
+				 */
+				resultRelInfo = proute->partitions[leaf_part_index];
+				if (resultRelInfo == NULL)
+				{
+					resultRelInfo = ExecInitPartitionInfo(mtstate,
+														  saved_resultRelInfo,
+														  proute, estate,
+														  leaf_part_index);
+					proute->partitions[leaf_part_index] = resultRelInfo;
+					Assert(resultRelInfo != NULL);
+				}
+
+				/* Determine which triggers exist on this partition */
+				has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+					resultRelInfo->ri_TrigDesc->trig_insert_before_row);
+
+				has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+					resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
+
+				/* Check if we can multi-insert into this partition */
+				part_can_multi_insert = insertMethod == CIM_MULTI_PARTITION &&
+					!has_before_insert_row_trig &&
+					!has_instead_insert_row_trig &&
+					!resultRelInfo->ri_FdwRoutine;
+
+				/*
+				 * We'd better make the bulk insert mechanism gets a new
+				 * buffer when the partition being inserted into changes.
+				 */
 				ReleaseBulkInsertStatePin(bistate);
 				prev_leaf_part_index = leaf_part_index;
 			}
-
-			/*
-			 * Save the old ResultRelInfo and switch to the one corresponding
-			 * to the selected partition.
-			 */
-			saved_resultRelInfo = resultRelInfo;
-			resultRelInfo = proute->partitions[leaf_part_index];
-			if (resultRelInfo == NULL)
-			{
-				resultRelInfo = ExecInitPartitionInfo(mtstate,
-													  saved_resultRelInfo,
-													  proute, estate,
-													  leaf_part_index);
-				Assert(resultRelInfo != NULL);
-			}
+			else
+				resultRelInfo = proute->partitions[leaf_part_index];
 
 			/*
 			 * For ExecInsertIndexTuples() to work on the partition's indexes
@@ -2665,8 +2786,7 @@ CopyFrom(CopyState cstate)
 			 */
 			if (cstate->transition_capture != NULL)
 			{
-				if (resultRelInfo->ri_TrigDesc &&
-					resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+				if (has_before_insert_row_trig)
 				{
 					/*
 					 * If there are any BEFORE triggers on the partition,
@@ -2704,8 +2824,7 @@ CopyFrom(CopyState cstate)
 		skip_tuple = false;
 
 		/* BEFORE ROW INSERT Triggers */
-		if (resultRelInfo->ri_TrigDesc &&
-			resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+		if (has_before_insert_row_trig)
 		{
 			slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
 
@@ -2717,8 +2836,7 @@ CopyFrom(CopyState cstate)
 
 		if (!skip_tuple)
 		{
-			if (resultRelInfo->ri_TrigDesc &&
-				resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
+			if (has_instead_insert_row_trig)
 			{
 				/* Pass the data to the INSTEAD ROW INSERT trigger */
 				ExecIRInsertTriggers(estate, resultRelInfo, slot);
@@ -2745,7 +2863,12 @@ CopyFrom(CopyState cstate)
 					  resultRelInfo->ri_TrigDesc->trig_insert_before_row)))
 					ExecPartitionCheck(resultRelInfo, slot, estate, true);
 
-				if (useHeapMultiInsert)
+				/*
+				 * Perform multi-inserts when enabled, or when loading a
+				 * partitioned table that can support multi-inserts as
+				 * determined above.
+				 */
+				if (insertMethod == CIM_MULTI || part_can_multi_insert)
 				{
 					/* Add this tuple to the tuple buffer */
 					if (nBufferedTuples == 0)
@@ -2835,10 +2958,24 @@ next_tuple:
 
 	/* Flush any remaining buffered tuples */
 	if (nBufferedTuples > 0)
-		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-							resultRelInfo, myslot, bistate,
-							nBufferedTuples, bufferedTuples,
-							firstBufferedLineNo);
+	{
+		if (insertMethod == CIM_MULTI_PARTITION)
+		{
+			ResultRelInfo *presultRelInfo;
+
+			presultRelInfo = proute->partitions[prev_leaf_part_index];
+
+			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+								presultRelInfo, myslot, bistate,
+								nBufferedTuples, bufferedTuples,
+								firstBufferedLineNo);
+		}
+		else
+			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+								resultRelInfo, myslot, bistate,
+								nBufferedTuples, bufferedTuples,
+								firstBufferedLineNo);
+	}
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
@@ -2907,6 +3044,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	MemoryContext oldcontext;
 	int			i;
 	uint64		save_cur_lineno;
+	bool		line_buf_valid = cstate->line_buf_valid;
 
 	/*
 	 * Print error context information correctly, if one of the operations
@@ -2920,7 +3058,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 * before calling it.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(cstate->rel,
+	heap_multi_insert(resultRelInfo->ri_RelationDesc,
 					  bufferedTuples,
 					  nBufferedTuples,
 					  mycid,
@@ -2967,7 +3105,8 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 		}
 	}
 
-	/* reset cur_lineno to where we were */
+	/* reset cur_lineno and line_buf_valid to what they were */
+	cstate->line_buf_valid = line_buf_valid;
 	cstate->cur_lineno = save_cur_lineno;
 }
 
-- 
2.16.2.windows.1

