From 30a73fc6fc3f75748351a63d35f19202a47c2756 Mon Sep 17 00:00:00 2001
From: "dgrowley@gmail.com" <dgrowley@gmail.com>
Date: Thu, 21 Jun 2018 20:59:35 +1200
Subject: [PATCH v2] Allow multi-inserts during COPY into a partitioned table

CopyFrom allows multi-inserts to be used for non-partitioned tables, but
this was disabled for partitioned tables.  The reason for this appeared to
be due to the fact that the tuple may not belong to the same partition as
the previous tuple did.  Not allowing multi-inserts here greatly slowed
down imports into partitioned tables.  These could take twice as long as a
copy to an equivalent non-partitioned table.  It seems wise to do something
about this, so this commit allows the multi-inserts by flushing the so-far
inserted tuples to the partition when the next tuple does not belong to the
same partition, or when the buffer fills.  This improves performance when
the next tuple in the stream commonly belongs to the same partition as the
previous tuple.

In cases where the target partition changes on every tuple using
multi-inserts slightly slows the performance.  To get around this we
track the average size of the batches that have been inserted and
adapively enable or disable multi-inserts based on the size of the
batch.  Some testing was done and the regression only seems to exist
when the average size of the insert batch is close to 1, so let's just
enable multi-inserts when the average size is at least 1.3.  More
performance testing might reveal a better number for, this, but since the
slowdown was only 1-2% it does not seem critical enough to spend too much
time calculating it.  In any case it may depend on other factors rather
than just the size of the batch.

Allowing multi-inserts for partitions required a bit of work around the
per-tuple memory contexts as we must flush the tuples when the next tuple
does not belong the same partition.  In which case there is no good time to
reset the per-tuple context, as we've already built the new tuple by this
time.  In order to work around this we maintain two per-tuple contexts and
just switch between them every time the partition changes and reset the old
one.  This does mean that the first of each batch of tuples is not
allocated in the same memory context as the others, but that does not
matter since we only reset the context once the previous batch has been
inserted.
---
 src/backend/commands/copy.c | 334 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 260 insertions(+), 74 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3a66cb5025..6dda559300 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -47,6 +47,7 @@
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
 #include "utils/portal.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
@@ -79,6 +80,16 @@ typedef enum EolType
 	EOL_CRNL
 } EolType;
 
+/*
+ * Represents the heap insert method to be used during COPY to.
+ */
+typedef enum CopyInsertMethod
+{
+	CIM_SINGLE,					/* use heap_insert or fdw routine */
+	CIM_MULTI,					/* always use heap_multi_insert */
+	CIM_MULTI_CONDITIONAL		/* use heap_multi_insert only if valid */
+} CopyInsertMethod;
+
 /*
  * This struct contains all the state variables used throughout a COPY
  * operation. For simplicity, we use the same struct for all variants of COPY,
@@ -2305,26 +2316,35 @@ CopyFrom(CopyState cstate)
 	Datum	   *values;
 	bool	   *nulls;
 	ResultRelInfo *resultRelInfo;
-	ResultRelInfo *saved_resultRelInfo = NULL;
+	ResultRelInfo *target_resultRelInfo;
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ModifyTableState *mtstate;
 	ExprContext *econtext;
 	TupleTableSlot *myslot;
 	MemoryContext oldcontext = CurrentMemoryContext;
 
+	PartitionTupleRouting *proute = NULL;
+	ExprContext *secondaryExprContext = NULL;
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			hi_options = 0; /* start with default heap_insert options */
 	BulkInsertState bistate;
+	CopyInsertMethod insertMethod;
 	uint64		processed = 0;
-	bool		useHeapMultiInsert;
 	int			nBufferedTuples = 0;
 	int			prev_leaf_part_index = -1;
+	bool		has_before_insert_row_trig;
+	bool		has_instead_insert_row_trig;
+	bool		leafpart_use_multi_insert = false;
 
 #define MAX_BUFFERED_TUPLES 1000
+#define RECHECK_MULTI_INSERT_THRESHOLD 1000
 	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
 	Size		bufferedTuplesSize = 0;
 	uint64		firstBufferedLineNo = 0;
+	uint64		lastPartitionSampleLineNo = 0;
+	uint64		nPartitionChanges = 0;
+	double		avgTuplesPerPartChange = 0;
 
 	Assert(cstate->rel);
 
@@ -2455,6 +2475,7 @@ CopyFrom(CopyState cstate)
 					  1,		/* dummy rangetable index */
 					  NULL,
 					  0);
+	target_resultRelInfo = resultRelInfo;
 
 	/* Verify the named relation is a valid target for INSERT */
 	CheckValidResultRel(resultRelInfo, CMD_INSERT);
@@ -2504,8 +2525,6 @@ CopyFrom(CopyState cstate)
 	 */
 	if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionTupleRouting *proute;
-
 		proute = cstate->partition_tuple_routing =
 			ExecSetupPartitionTupleRouting(NULL, cstate->rel);
 
@@ -2522,28 +2541,92 @@ CopyFrom(CopyState cstate)
 	/*
 	 * It's more efficient to prepare a bunch of tuples for insertion, and
 	 * insert them in one heap_multi_insert() call, than call heap_insert()
-	 * separately for every tuple. However, we can't do that if there are
-	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
-	 * expressions. Such triggers or expressions might query the table we're
-	 * inserting to, and act differently if the tuples that have already been
-	 * processed and prepared for insertion are not there.  We also can't do
-	 * it if the table is foreign or partitioned.
+	 * separately for every tuple. However, there are a number of reasons
+	 * why we might not be able to do this.  These are explained below.
 	 */
-	if ((resultRelInfo->ri_TrigDesc != NULL &&
+	if (resultRelInfo->ri_TrigDesc != NULL &&
 		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
-		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
-		resultRelInfo->ri_FdwRoutine != NULL ||
-		cstate->partition_tuple_routing != NULL ||
-		cstate->volatile_defexprs)
+		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
 	{
-		useHeapMultiInsert = false;
+		/*
+		 * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
+		 * triggers on the table. Such triggers might query the table we're
+		 * inserting into and act differently if the tuples that have already
+		 * been processed any prepared for insertion are not there.
+		 */
+		insertMethod = CIM_SINGLE;
+	}
+	else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
+		 resultRelInfo->ri_TrigDesc->trig_insert_new_table)
+	{
+		/*
+		 * For partitioned tables we can't support multi-inserts when there
+		 * are any statement level insert triggers.  (It might be possible to
+		 * allow partitioned tables with such triggers in the future, but for
+		 * now CopyFromInsertBatch expects that any BR insert and statement
+		 * level insert triggers are on the same relation.
+		 */
+		insertMethod = CIM_SINGLE;
+	}
+	else if (resultRelInfo->ri_FdwRoutine != NULL ||
+			 cstate->volatile_defexprs)
+	{
+		/*
+		 * Can't support multi-inserts to foreign tables or if there are
+		 * any volatile default expressions in the table.  Similarly to
+		 * the trigger case above, such expressions may query the table
+		 * we're inserting into.
+		 *
+		 * Note: It does not matter if any partitions have any volatile
+		 * default expressions as we use the defaults from the target of the
+		 * COPY command.
+		 */
+		insertMethod = CIM_SINGLE;
 	}
 	else
 	{
-		useHeapMultiInsert = true;
+		/*
+		 * For partitioned tables, we may still be able to perform bulk
+		 * inserts for sets of consecutive tuples which belong to the same
+		 * partition.  However, the possibility of this depends on which
+		 * types of triggers exist on the partition.  We must disable bulk
+		 * inserts if the partition is a foreign table or it has any BR insert
+		 * or insert instead triggers (same as we checked above for the parent
+		 * table).  Since the partition's resultRelInfos are initialized only
+		 * when we actually need to insert the first tuple into them, we must
+		 * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
+		 * flag that we must later determine if we can use bulk-inserts for
+		 * the partition being inserted into.
+		 *
+		 * Normally, when performing bulk inserts we just flush the insert
+		 * buffer whenever it becomes full, but for the partitioned table
+		 * case, we flush it whenever the current tuple does not belong to
+		 * the same partition as the previous tuple, and since we flush the
+		 * previous partition's buffer once the new tuple has already been
+		 * built, we're unable to reset the estate since we'd free the memory
+		 * in which the new tuple is stored.  To work around this we maintain
+		 * a secondary expression context and alternate between these when the
+		 * partition changes.  This does mean we do store the first new tuple
+		 * in a different context than subsequent tuples, but that does not
+		 * matter, providing we don't free anything while it's still needed.
+		 */
+		if (proute)
+		{
+			insertMethod = CIM_MULTI_CONDITIONAL;
+			secondaryExprContext = CreateExprContext(estate);
+		}
+		else
+			insertMethod = CIM_MULTI;
+
 		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
 	}
 
+	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+		resultRelInfo->ri_TrigDesc->trig_insert_before_row);
+
+	has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+		resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
+
 	/*
 	 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
 	 * should do this for COPY, since it's not really an "INSERT" statement as
@@ -2598,7 +2681,7 @@ CopyFrom(CopyState cstate)
 		 * Constraints might reference the tableoid column, so initialize
 		 * t_tableOid before evaluating them.
 		 */
-		tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+		tuple->t_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
 
 		/* Triggers and stuff need to be invoked in query context. */
 		MemoryContextSwitchTo(oldcontext);
@@ -2608,11 +2691,9 @@ CopyFrom(CopyState cstate)
 		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 
 		/* Determine the partition to heap_insert the tuple into */
-		if (cstate->partition_tuple_routing)
+		if (proute)
 		{
 			int			leaf_part_index;
-			PartitionTupleRouting *proute = cstate->partition_tuple_routing;
-
 			/*
 			 * Away we go ... If we end up not finding a partition after all,
 			 * ExecFindPartition() does not return and errors out instead.
@@ -2621,38 +2702,135 @@ CopyFrom(CopyState cstate)
 			 * will get us the ResultRelInfo and TupleConversionMap for the
 			 * partition, respectively.
 			 */
-			leaf_part_index = ExecFindPartition(resultRelInfo,
+			leaf_part_index = ExecFindPartition(target_resultRelInfo,
 												proute->partition_dispatch_info,
 												slot,
 												estate);
 			Assert(leaf_part_index >= 0 &&
 				   leaf_part_index < proute->num_partitions);
 
-			/*
-			 * If this tuple is mapped to a partition that is not same as the
-			 * previous one, we'd better make the bulk insert mechanism gets a
-			 * new buffer.
-			 */
 			if (prev_leaf_part_index != leaf_part_index)
 			{
+				/*
+				 * When performing bulk-inserts into partitioned tables we
+				 * must insert the tuples seen so far to the heap whenever the
+				 * partition changes.  This might seem wasteful in cases where
+				 * the partition changes on each tuple, but in cases where
+				 * we're bulk loading into a single partition or the data
+				 * being loaded is ordered in partition order then performance
+				 * gains can easily be seen.
+				 */
+				if (nBufferedTuples > 0)
+				{
+					ExprContext *swapcontext;
+					ResultRelInfo *presultRelInfo;
+
+					presultRelInfo = proute->partitions[prev_leaf_part_index];
+
+					CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+										presultRelInfo, myslot, bistate,
+										nBufferedTuples, bufferedTuples,
+										firstBufferedLineNo);
+					nBufferedTuples = 0;
+					bufferedTuplesSize = 0;
+
+					Assert(secondaryExprContext);
+
+					/*
+					 * Normally we reset the per-tuple context whenever the
+					 * bufferedTuples array is empty at the beginning of the
+					 * loop, however, it is possible since we flush the buffer
+					 * here that the buffer is never empty at the start of the
+					 * loop.  To prevent the per-tuple context from never
+					 * being reset we maintain a second context and alternate
+					 * between them when the partition changes.  We can now
+					 * reset secondaryExprContext as this is no longer needed,
+					 * since we just flushed any tuples stored in it.  We also
+					 * now switch over to the other context.  This does mean
+					 * that the first tuple in the buffer won't be in the same
+					 * context as the others, but that does not matter since
+					 * we only reset it after the flush.
+					 */
+					ReScanExprContext(secondaryExprContext);
+
+					swapcontext = secondaryExprContext;
+					secondaryExprContext = estate->es_per_tuple_exprcontext;
+					estate->es_per_tuple_exprcontext = swapcontext;
+				}
+
+				/*
+				 * Overwrite resultRelInfo with the corresponding partition's
+				 * one.
+				 */
+				resultRelInfo = proute->partitions[leaf_part_index];
+				if (resultRelInfo == NULL)
+				{
+					resultRelInfo = ExecInitPartitionInfo(mtstate,
+														  target_resultRelInfo,
+														  proute, estate,
+														  leaf_part_index);
+					proute->partitions[leaf_part_index] = resultRelInfo;
+					Assert(resultRelInfo != NULL);
+				}
+
+				/* Determine which triggers exist on this partition */
+				has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+					resultRelInfo->ri_TrigDesc->trig_insert_before_row);
+
+				has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+					resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
+
+				/* Check if we can multi-insert into this partition */
+				if (insertMethod == CIM_MULTI_CONDITIONAL)
+				{
+					nPartitionChanges++;
+
+					/*
+					 * Here we adaptively enable multi-inserts based on the
+					 * average number of tuples per recent multi-insert batch.
+					 * We recalculate the average every
+					 * RECHECK_MULTI_INSERT_THRESHOLD instead of taking the
+					 * average over the whole copy.  This allows us to enable
+					 * multi-inserts when we get periods in the copy stream
+					 * that have tuples commonly belonging to the same
+					 * partition, and disable when the partition is changing
+					 * too often.
+					 */
+					if (lastPartitionSampleLineNo <= (cstate->cur_lineno -
+											RECHECK_MULTI_INSERT_THRESHOLD))
+					{
+						avgTuplesPerPartChange =
+							(cstate->cur_lineno - lastPartitionSampleLineNo) /
+							(double) nPartitionChanges;
+
+						lastPartitionSampleLineNo = cstate->cur_lineno;
+						nPartitionChanges = 0;
+					}
+
+					/*
+					 * Tests have shown that using multi-inserts when the
+					 * partition changes on every tuple slightly decreases
+					 * the performance, however, there are benefits even when
+					 * only some batches have just 2 tuples, so let's enable
+					 * multi-inserts even when the average is quite low.
+					 */
+					leafpart_use_multi_insert = !has_before_insert_row_trig &&
+											!has_instead_insert_row_trig &&
+										resultRelInfo->ri_FdwRoutine == NULL &&
+										avgTuplesPerPartChange >= 1.3;
+				}
+				else
+					leafpart_use_multi_insert = false;
+
+				/*
+				 * We'd better make the bulk insert mechanism gets a new
+				 * buffer when the partition being inserted into changes.
+				 */
 				ReleaseBulkInsertStatePin(bistate);
 				prev_leaf_part_index = leaf_part_index;
 			}
-
-			/*
-			 * Save the old ResultRelInfo and switch to the one corresponding
-			 * to the selected partition.
-			 */
-			saved_resultRelInfo = resultRelInfo;
-			resultRelInfo = proute->partitions[leaf_part_index];
-			if (resultRelInfo == NULL)
-			{
-				resultRelInfo = ExecInitPartitionInfo(mtstate,
-													  saved_resultRelInfo,
-													  proute, estate,
-													  leaf_part_index);
-				Assert(resultRelInfo != NULL);
-			}
+			else
+				resultRelInfo = proute->partitions[leaf_part_index];
 
 			/*
 			 * For ExecInsertIndexTuples() to work on the partition's indexes
@@ -2665,8 +2843,7 @@ CopyFrom(CopyState cstate)
 			 */
 			if (cstate->transition_capture != NULL)
 			{
-				if (resultRelInfo->ri_TrigDesc &&
-					resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+				if (has_before_insert_row_trig)
 				{
 					/*
 					 * If there are any BEFORE triggers on the partition,
@@ -2675,7 +2852,7 @@ CopyFrom(CopyState cstate)
 					 */
 					cstate->transition_capture->tcs_original_insert_tuple = NULL;
 					cstate->transition_capture->tcs_map =
-						TupConvMapForLeaf(proute, saved_resultRelInfo,
+						TupConvMapForLeaf(proute, target_resultRelInfo,
 										  leaf_part_index);
 				}
 				else
@@ -2704,8 +2881,7 @@ CopyFrom(CopyState cstate)
 		skip_tuple = false;
 
 		/* BEFORE ROW INSERT Triggers */
-		if (resultRelInfo->ri_TrigDesc &&
-			resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+		if (has_before_insert_row_trig)
 		{
 			slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
 
@@ -2717,8 +2893,7 @@ CopyFrom(CopyState cstate)
 
 		if (!skip_tuple)
 		{
-			if (resultRelInfo->ri_TrigDesc &&
-				resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
+			if (has_instead_insert_row_trig)
 			{
 				/* Pass the data to the INSTEAD ROW INSERT trigger */
 				ExecIRInsertTriggers(estate, resultRelInfo, slot);
@@ -2740,12 +2915,15 @@ CopyFrom(CopyState cstate)
 				 * partition.
 				 */
 				if (resultRelInfo->ri_PartitionCheck &&
-					(saved_resultRelInfo == NULL ||
-					 (resultRelInfo->ri_TrigDesc &&
-					  resultRelInfo->ri_TrigDesc->trig_insert_before_row)))
+					(proute == NULL || has_before_insert_row_trig))
 					ExecPartitionCheck(resultRelInfo, slot, estate, true);
 
-				if (useHeapMultiInsert)
+				/*
+				 * Perform multi-inserts when enabled, or when loading a
+				 * partitioned table that can support multi-inserts as
+				 * determined above.
+				 */
+				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
 				{
 					/* Add this tuple to the tuple buffer */
 					if (nBufferedTuples == 0)
@@ -2783,7 +2961,7 @@ CopyFrom(CopyState cstate)
 																			   NULL);
 
 						if (slot == NULL)	/* "do nothing" */
-							goto next_tuple;
+							continue;		/* next tuple please */
 
 						/* FDW might have changed tuple */
 						tuple = ExecMaterializeSlot(slot);
@@ -2823,22 +3001,28 @@ CopyFrom(CopyState cstate)
 			 */
 			processed++;
 		}
-
-next_tuple:
-		/* Restore the saved ResultRelInfo */
-		if (saved_resultRelInfo)
-		{
-			resultRelInfo = saved_resultRelInfo;
-			estate->es_result_relation_info = resultRelInfo;
-		}
 	}
 
 	/* Flush any remaining buffered tuples */
 	if (nBufferedTuples > 0)
-		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-							resultRelInfo, myslot, bistate,
-							nBufferedTuples, bufferedTuples,
-							firstBufferedLineNo);
+	{
+		if (insertMethod == CIM_MULTI_CONDITIONAL)
+		{
+			ResultRelInfo *presultRelInfo;
+
+			presultRelInfo = proute->partitions[prev_leaf_part_index];
+
+			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+								presultRelInfo, myslot, bistate,
+								nBufferedTuples, bufferedTuples,
+								firstBufferedLineNo);
+		}
+		else
+			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+								resultRelInfo, myslot, bistate,
+								nBufferedTuples, bufferedTuples,
+								firstBufferedLineNo);
+	}
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
@@ -2855,7 +3039,7 @@ next_tuple:
 		pq_endmsgread();
 
 	/* Execute AFTER STATEMENT insertion triggers */
-	ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
+	ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
 
 	/* Handle queued AFTER triggers */
 	AfterTriggerEndQuery(estate);
@@ -2866,12 +3050,12 @@ next_tuple:
 	ExecResetTupleTable(estate->es_tupleTable, false);
 
 	/* Allow the FDW to shut down */
-	if (resultRelInfo->ri_FdwRoutine != NULL &&
-		resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
-		resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
-													   resultRelInfo);
+	if (target_resultRelInfo->ri_FdwRoutine != NULL &&
+		target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
+		target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
+													   target_resultRelInfo);
 
-	ExecCloseIndices(resultRelInfo);
+	ExecCloseIndices(target_resultRelInfo);
 
 	/* Close all the partitioned tables, leaf partitions, and their indices */
 	if (cstate->partition_tuple_routing)
@@ -2907,6 +3091,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	MemoryContext oldcontext;
 	int			i;
 	uint64		save_cur_lineno;
+	bool		line_buf_valid = cstate->line_buf_valid;
 
 	/*
 	 * Print error context information correctly, if one of the operations
@@ -2920,7 +3105,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 * before calling it.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(cstate->rel,
+	heap_multi_insert(resultRelInfo->ri_RelationDesc,
 					  bufferedTuples,
 					  nBufferedTuples,
 					  mycid,
@@ -2967,7 +3152,8 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 		}
 	}
 
-	/* reset cur_lineno to where we were */
+	/* reset cur_lineno and line_buf_valid to what they were */
+	cstate->line_buf_valid = line_buf_valid;
 	cstate->cur_lineno = save_cur_lineno;
 }
 
-- 
2.17.1

