From ced8b4ee585dacbfc7c996b9c6d5b789a94a2cbc Mon Sep 17 00:00:00 2001
From: "dgrowley@gmail.com" <dgrowley@gmail.com>
Date: Tue, 24 Jul 2018 12:30:56 +1200
Subject: [PATCH v3] Allow multi-inserts during COPY into a partitioned table

CopyFrom allows multi-inserts to be used for non-partitioned tables, but
this was disabled for partitioned tables.  The reason for this appeared to
be due to the fact that the tuple may not belong to the same partition as
the previous tuple did.  Not allowing multi-inserts here greatly slowed
down imports into partitioned tables.  These could take twice as long as a
copy to an equivalent non-partitioned table.  It seems wise to do something
about this, so this commit allows the multi-inserts by flushing the so-far
inserted tuples to the partition when the next tuple does not belong to the
same partition, or when the buffer fills.  This improves performance when
the next tuple in the stream commonly belongs to the same partition as the
previous tuple.

In cases where the target partition changes on every tuple using
multi-inserts slightly slows the performance.  To get around this we
track the average size of the batches that have been inserted and
adapively enable or disable multi-inserts based on the size of the
batch.  Some testing was done and the regression only seems to exist
when the average size of the insert batch is close to 1, so let's just
enable multi-inserts when the average size is at least 1.3.  More
performance testing might reveal a better number for, this, but since the
slowdown was only 1-2% it does not seem critical enough to spend too much
time calculating it.  In any case it may depend on other factors rather
than just the size of the batch.

Allowing multi-inserts for partitions required a bit of work around the
per-tuple memory contexts as we must flush the tuples when the next tuple
does not belong the same partition.  In which case there is no good time to
reset the per-tuple context, as we've already built the new tuple by this
time.  In order to work around this we maintain two per-tuple contexts and
just switch between them every time the partition changes and reset the old
one.  This does mean that the first of each batch of tuples is not
allocated in the same memory context as the others, but that does not
matter since we only reset the context once the previous batch has been
inserted.
---
 src/backend/commands/copy.c | 330 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 256 insertions(+), 74 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3a66cb5025..ec4c651b11 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -47,6 +47,7 @@
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
 #include "utils/portal.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
@@ -79,6 +80,16 @@ typedef enum EolType
 	EOL_CRNL
 } EolType;
 
+/*
+ * Represents the heap insert method to be used during COPY to.
+ */
+typedef enum CopyInsertMethod
+{
+	CIM_SINGLE,					/* use heap_insert or fdw routine */
+	CIM_MULTI,					/* always use heap_multi_insert */
+	CIM_MULTI_CONDITIONAL		/* use heap_multi_insert only if valid */
+} CopyInsertMethod;
+
 /*
  * This struct contains all the state variables used throughout a COPY
  * operation. For simplicity, we use the same struct for all variants of COPY,
@@ -2305,26 +2316,35 @@ CopyFrom(CopyState cstate)
 	Datum	   *values;
 	bool	   *nulls;
 	ResultRelInfo *resultRelInfo;
-	ResultRelInfo *saved_resultRelInfo = NULL;
+	ResultRelInfo *target_resultRelInfo;
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ModifyTableState *mtstate;
 	ExprContext *econtext;
 	TupleTableSlot *myslot;
 	MemoryContext oldcontext = CurrentMemoryContext;
 
+	PartitionTupleRouting *proute = NULL;
+	ExprContext *secondaryExprContext = NULL;
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			hi_options = 0; /* start with default heap_insert options */
 	BulkInsertState bistate;
+	CopyInsertMethod insertMethod;
 	uint64		processed = 0;
-	bool		useHeapMultiInsert;
 	int			nBufferedTuples = 0;
 	int			prev_leaf_part_index = -1;
+	bool		has_before_insert_row_trig;
+	bool		has_instead_insert_row_trig;
+	bool		leafpart_use_multi_insert = false;
 
 #define MAX_BUFFERED_TUPLES 1000
+#define RECHECK_MULTI_INSERT_THRESHOLD 1000
 	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
 	Size		bufferedTuplesSize = 0;
 	uint64		firstBufferedLineNo = 0;
+	uint64		lastPartitionSampleLineNo = 0;
+	uint64		nPartitionChanges = 0;
+	double		avgTuplesPerPartChange = 0;
 
 	Assert(cstate->rel);
 
@@ -2455,6 +2475,7 @@ CopyFrom(CopyState cstate)
 					  1,		/* dummy rangetable index */
 					  NULL,
 					  0);
+	target_resultRelInfo = resultRelInfo;
 
 	/* Verify the named relation is a valid target for INSERT */
 	CheckValidResultRel(resultRelInfo, CMD_INSERT);
@@ -2504,8 +2525,6 @@ CopyFrom(CopyState cstate)
 	 */
 	if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionTupleRouting *proute;
-
 		proute = cstate->partition_tuple_routing =
 			ExecSetupPartitionTupleRouting(NULL, cstate->rel);
 
@@ -2522,28 +2541,92 @@ CopyFrom(CopyState cstate)
 	/*
 	 * It's more efficient to prepare a bunch of tuples for insertion, and
 	 * insert them in one heap_multi_insert() call, than call heap_insert()
-	 * separately for every tuple. However, we can't do that if there are
-	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
-	 * expressions. Such triggers or expressions might query the table we're
-	 * inserting to, and act differently if the tuples that have already been
-	 * processed and prepared for insertion are not there.  We also can't do
-	 * it if the table is foreign or partitioned.
+	 * separately for every tuple. However, there are a number of reasons why
+	 * we might not be able to do this.  These are explained below.
 	 */
-	if ((resultRelInfo->ri_TrigDesc != NULL &&
-		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
-		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
-		resultRelInfo->ri_FdwRoutine != NULL ||
-		cstate->partition_tuple_routing != NULL ||
-		cstate->volatile_defexprs)
+	if (resultRelInfo->ri_TrigDesc != NULL &&
+		(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+		 resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
+	{
+		/*
+		 * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
+		 * triggers on the table. Such triggers might query the table we're
+		 * inserting into and act differently if the tuples that have already
+		 * been processed and prepared for insertion are not there.
+		 */
+		insertMethod = CIM_SINGLE;
+	}
+	else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
+			 resultRelInfo->ri_TrigDesc->trig_insert_new_table)
+	{
+		/*
+		 * For partitioned tables we can't support multi-inserts when there
+		 * are any statement level insert triggers.  (It might be possible to
+		 * allow partitioned tables with such triggers in the future, but for
+		 * now CopyFromInsertBatch expects that any before row insert and
+		 * statement level insert triggers are on the same relation.
+		 */
+		insertMethod = CIM_SINGLE;
+	}
+	else if (resultRelInfo->ri_FdwRoutine != NULL ||
+			 cstate->volatile_defexprs)
 	{
-		useHeapMultiInsert = false;
+		/*
+		 * Can't support multi-inserts to foreign tables or if there are any
+		 * volatile default expressions in the table.  Similarly to the
+		 * trigger case above, such expressions may query the table we're
+		 * inserting into.
+		 *
+		 * Note: It does not matter if any partitions have any volatile
+		 * default expressions as we use the defaults from the target of the
+		 * COPY command.
+		 */
+		insertMethod = CIM_SINGLE;
 	}
 	else
 	{
-		useHeapMultiInsert = true;
+		/*
+		 * For partitioned tables, we may still be able to perform bulk
+		 * inserts for sets of consecutive tuples which belong to the same
+		 * partition.  However, the possibility of this depends on which types
+		 * of triggers exist on the partition.  We must disable bulk inserts
+		 * if the partition is a foreign table or it has any before row insert
+		 * or insert instead triggers (same as we checked above for the parent
+		 * table).  Since the partition's resultRelInfos are initialized only
+		 * when we actually need to insert the first tuple into them, we must
+		 * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
+		 * flag that we must later determine if we can use bulk-inserts for
+		 * the partition being inserted into.
+		 *
+		 * Normally, when performing bulk inserts we just flush the insert
+		 * buffer whenever it becomes full, but for the partitioned table
+		 * case, we flush it whenever the current tuple does not belong to the
+		 * same partition as the previous tuple, and since we flush the
+		 * previous partition's buffer once the new tuple has already been
+		 * built, we're unable to reset the estate since we'd free the memory
+		 * in which the new tuple is stored.  To work around this we maintain
+		 * a secondary expression context and alternate between these when the
+		 * partition changes.  This does mean we do store the first new tuple
+		 * in a different context than subsequent tuples, but that does not
+		 * matter, providing we don't free anything while it's still needed.
+		 */
+		if (proute)
+		{
+			insertMethod = CIM_MULTI_CONDITIONAL;
+			secondaryExprContext = CreateExprContext(estate);
+		}
+		else
+			insertMethod = CIM_MULTI;
+
 		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
 	}
 
+	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+								  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
+
+	has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+								   resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
+
 	/*
 	 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
 	 * should do this for COPY, since it's not really an "INSERT" statement as
@@ -2598,7 +2681,7 @@ CopyFrom(CopyState cstate)
 		 * Constraints might reference the tableoid column, so initialize
 		 * t_tableOid before evaluating them.
 		 */
-		tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+		tuple->t_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
 
 		/* Triggers and stuff need to be invoked in query context. */
 		MemoryContextSwitchTo(oldcontext);
@@ -2608,10 +2691,9 @@ CopyFrom(CopyState cstate)
 		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 
 		/* Determine the partition to heap_insert the tuple into */
-		if (cstate->partition_tuple_routing)
+		if (proute)
 		{
 			int			leaf_part_index;
-			PartitionTupleRouting *proute = cstate->partition_tuple_routing;
 
 			/*
 			 * Away we go ... If we end up not finding a partition after all,
@@ -2621,39 +2703,131 @@ CopyFrom(CopyState cstate)
 			 * will get us the ResultRelInfo and TupleConversionMap for the
 			 * partition, respectively.
 			 */
-			leaf_part_index = ExecFindPartition(resultRelInfo,
+			leaf_part_index = ExecFindPartition(target_resultRelInfo,
 												proute->partition_dispatch_info,
 												slot,
 												estate);
 			Assert(leaf_part_index >= 0 &&
 				   leaf_part_index < proute->num_partitions);
 
-			/*
-			 * If this tuple is mapped to a partition that is not same as the
-			 * previous one, we'd better make the bulk insert mechanism gets a
-			 * new buffer.
-			 */
 			if (prev_leaf_part_index != leaf_part_index)
 			{
+				/* Check if we can multi-insert into this partition */
+				if (insertMethod == CIM_MULTI_CONDITIONAL)
+				{
+					/*
+					 * When performing bulk-inserts into partitioned tables we
+					 * must insert the tuples seen so far to the heap whenever
+					 * the partition changes.
+					 */
+					if (nBufferedTuples > 0)
+					{
+						ExprContext *swapcontext;
+						ResultRelInfo *presultRelInfo;
+
+						presultRelInfo = proute->partitions[prev_leaf_part_index];
+
+						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+											presultRelInfo, myslot, bistate,
+											nBufferedTuples, bufferedTuples,
+											firstBufferedLineNo);
+						nBufferedTuples = 0;
+						bufferedTuplesSize = 0;
+
+						Assert(secondaryExprContext);
+
+						/*
+						 * Normally we reset the per-tuple context whenever
+						 * the bufferedTuples array is empty at the beginning
+						 * of the loop, however, it is possible since we flush
+						 * the buffer here that the buffer is never empty at
+						 * the start of the loop.  To prevent the per-tuple
+						 * context from never being reset we maintain a second
+						 * context and alternate between them when the
+						 * partition changes.  We can now reset
+						 * secondaryExprContext as this is no longer needed,
+						 * since we just flushed any tuples stored in it.  We
+						 * also now switch over to the other context.  This
+						 * does mean that the first tuple in the buffer won't
+						 * be in the same context as the others, but that does
+						 * not matter since we only reset it after the flush.
+						 */
+						ReScanExprContext(secondaryExprContext);
+
+						swapcontext = secondaryExprContext;
+						secondaryExprContext = estate->es_per_tuple_exprcontext;
+						estate->es_per_tuple_exprcontext = swapcontext;
+					}
+
+					nPartitionChanges++;
+
+					/*
+					 * Here we adaptively enable multi-inserts based on the
+					 * average number of tuples from recent multi-insert
+					 * batches.  We recalculate the average every
+					 * RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
+					 * the average over the whole copy.  This allows us to
+					 * enable multi-inserts when we get periods in the copy
+					 * stream that have tuples commonly belonging to the same
+					 * partition, and disable when the partition is changing
+					 * too often.
+					 */
+					if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno -
+															   RECHECK_MULTI_INSERT_THRESHOLD)))
+					{
+						avgTuplesPerPartChange =
+							(cstate->cur_lineno - lastPartitionSampleLineNo) /
+							(double) nPartitionChanges;
+
+						lastPartitionSampleLineNo = cstate->cur_lineno;
+						nPartitionChanges = 0;
+					}
+
+					/*
+					 * Tests have shown that using multi-inserts when the
+					 * partition changes on every tuple slightly decreases the
+					 * performance, however, there are benefits even when only
+					 * some batches have just 2 tuples, so let's enable
+					 * multi-inserts even when the average is quite low.
+					 */
+					leafpart_use_multi_insert = avgTuplesPerPartChange >= 1.3 &&
+						!has_before_insert_row_trig &&
+						!has_instead_insert_row_trig &&
+						resultRelInfo->ri_FdwRoutine == NULL;
+				}
+				else
+					leafpart_use_multi_insert = false;
+
+				/*
+				 * Overwrite resultRelInfo with the corresponding partition's
+				 * one.
+				 */
+				resultRelInfo = proute->partitions[leaf_part_index];
+				if (unlikely(resultRelInfo == NULL))
+				{
+					resultRelInfo = ExecInitPartitionInfo(mtstate,
+														  target_resultRelInfo,
+														  proute, estate,
+														  leaf_part_index);
+					proute->partitions[leaf_part_index] = resultRelInfo;
+					Assert(resultRelInfo != NULL);
+				}
+
+				/* Determine which triggers exist on this partition */
+				has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+											  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
+
+				has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+											   resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
+
+				/*
+				 * We'd better make the bulk insert mechanism gets a new
+				 * buffer when the partition being inserted into changes.
+				 */
 				ReleaseBulkInsertStatePin(bistate);
 				prev_leaf_part_index = leaf_part_index;
 			}
 
-			/*
-			 * Save the old ResultRelInfo and switch to the one corresponding
-			 * to the selected partition.
-			 */
-			saved_resultRelInfo = resultRelInfo;
-			resultRelInfo = proute->partitions[leaf_part_index];
-			if (resultRelInfo == NULL)
-			{
-				resultRelInfo = ExecInitPartitionInfo(mtstate,
-													  saved_resultRelInfo,
-													  proute, estate,
-													  leaf_part_index);
-				Assert(resultRelInfo != NULL);
-			}
-
 			/*
 			 * For ExecInsertIndexTuples() to work on the partition's indexes
 			 */
@@ -2665,8 +2839,7 @@ CopyFrom(CopyState cstate)
 			 */
 			if (cstate->transition_capture != NULL)
 			{
-				if (resultRelInfo->ri_TrigDesc &&
-					resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+				if (has_before_insert_row_trig)
 				{
 					/*
 					 * If there are any BEFORE triggers on the partition,
@@ -2675,7 +2848,7 @@ CopyFrom(CopyState cstate)
 					 */
 					cstate->transition_capture->tcs_original_insert_tuple = NULL;
 					cstate->transition_capture->tcs_map =
-						TupConvMapForLeaf(proute, saved_resultRelInfo,
+						TupConvMapForLeaf(proute, target_resultRelInfo,
 										  leaf_part_index);
 				}
 				else
@@ -2704,8 +2877,7 @@ CopyFrom(CopyState cstate)
 		skip_tuple = false;
 
 		/* BEFORE ROW INSERT Triggers */
-		if (resultRelInfo->ri_TrigDesc &&
-			resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+		if (has_before_insert_row_trig)
 		{
 			slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
 
@@ -2717,8 +2889,7 @@ CopyFrom(CopyState cstate)
 
 		if (!skip_tuple)
 		{
-			if (resultRelInfo->ri_TrigDesc &&
-				resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
+			if (has_instead_insert_row_trig)
 			{
 				/* Pass the data to the INSTEAD ROW INSERT trigger */
 				ExecIRInsertTriggers(estate, resultRelInfo, slot);
@@ -2740,12 +2911,15 @@ CopyFrom(CopyState cstate)
 				 * partition.
 				 */
 				if (resultRelInfo->ri_PartitionCheck &&
-					(saved_resultRelInfo == NULL ||
-					 (resultRelInfo->ri_TrigDesc &&
-					  resultRelInfo->ri_TrigDesc->trig_insert_before_row)))
+					(proute == NULL || has_before_insert_row_trig))
 					ExecPartitionCheck(resultRelInfo, slot, estate, true);
 
-				if (useHeapMultiInsert)
+				/*
+				 * Perform multi-inserts when enabled, or when loading a
+				 * partitioned table that can support multi-inserts as
+				 * determined above.
+				 */
+				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
 				{
 					/* Add this tuple to the tuple buffer */
 					if (nBufferedTuples == 0)
@@ -2783,7 +2957,7 @@ CopyFrom(CopyState cstate)
 																			   NULL);
 
 						if (slot == NULL)	/* "do nothing" */
-							goto next_tuple;
+							continue;	/* next tuple please */
 
 						/* FDW might have changed tuple */
 						tuple = ExecMaterializeSlot(slot);
@@ -2823,22 +2997,28 @@ CopyFrom(CopyState cstate)
 			 */
 			processed++;
 		}
-
-next_tuple:
-		/* Restore the saved ResultRelInfo */
-		if (saved_resultRelInfo)
-		{
-			resultRelInfo = saved_resultRelInfo;
-			estate->es_result_relation_info = resultRelInfo;
-		}
 	}
 
 	/* Flush any remaining buffered tuples */
 	if (nBufferedTuples > 0)
-		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-							resultRelInfo, myslot, bistate,
-							nBufferedTuples, bufferedTuples,
-							firstBufferedLineNo);
+	{
+		if (insertMethod == CIM_MULTI_CONDITIONAL)
+		{
+			ResultRelInfo *presultRelInfo;
+
+			presultRelInfo = proute->partitions[prev_leaf_part_index];
+
+			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+								presultRelInfo, myslot, bistate,
+								nBufferedTuples, bufferedTuples,
+								firstBufferedLineNo);
+		}
+		else
+			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+								resultRelInfo, myslot, bistate,
+								nBufferedTuples, bufferedTuples,
+								firstBufferedLineNo);
+	}
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
@@ -2855,7 +3035,7 @@ next_tuple:
 		pq_endmsgread();
 
 	/* Execute AFTER STATEMENT insertion triggers */
-	ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
+	ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
 
 	/* Handle queued AFTER triggers */
 	AfterTriggerEndQuery(estate);
@@ -2866,12 +3046,12 @@ next_tuple:
 	ExecResetTupleTable(estate->es_tupleTable, false);
 
 	/* Allow the FDW to shut down */
-	if (resultRelInfo->ri_FdwRoutine != NULL &&
-		resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
-		resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
-													   resultRelInfo);
+	if (target_resultRelInfo->ri_FdwRoutine != NULL &&
+		target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
+		target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
+															  target_resultRelInfo);
 
-	ExecCloseIndices(resultRelInfo);
+	ExecCloseIndices(target_resultRelInfo);
 
 	/* Close all the partitioned tables, leaf partitions, and their indices */
 	if (cstate->partition_tuple_routing)
@@ -2907,6 +3087,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	MemoryContext oldcontext;
 	int			i;
 	uint64		save_cur_lineno;
+	bool		line_buf_valid = cstate->line_buf_valid;
 
 	/*
 	 * Print error context information correctly, if one of the operations
@@ -2920,7 +3101,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 * before calling it.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(cstate->rel,
+	heap_multi_insert(resultRelInfo->ri_RelationDesc,
 					  bufferedTuples,
 					  nBufferedTuples,
 					  mycid,
@@ -2967,7 +3148,8 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 		}
 	}
 
-	/* reset cur_lineno to where we were */
+	/* reset cur_lineno and line_buf_valid to what they were */
+	cstate->line_buf_valid = line_buf_valid;
 	cstate->cur_lineno = save_cur_lineno;
 }
 
-- 
2.17.1

