From 59bd7de19762241fa53eed6f64510f022345b14b Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Tue, 8 Dec 2020 13:01:32 +0530
Subject: [PATCH v1] COPY With New Multi and Single Insert Table AM

This patch adds new single and multi insert table access method to
COPY code.
---
 src/backend/commands/copyfrom.c | 483 +++++++++++---------------------
 1 file changed, 163 insertions(+), 320 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 1b14e9a6eb..8376af32f5 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -45,10 +45,10 @@
 #include "utils/snapmgr.h"
 
 /*
- * No more than this many tuples per CopyMultiInsertBuffer
+ * No more than this many tuples per multi insert buffer
  *
  * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
+ * multi insert buffer items stored in CopyMultiInsertInfo's
  * multiInsertBuffers list.  Increasing this can cause quadratic growth in
  * memory requirements during copies into partitioned tables with a large
  * number of partitions.
@@ -67,31 +67,11 @@
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
-	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel */
-	int			nused;			/* number of 'slots' containing tuples */
-	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
-												 * stream */
+	TableInsertState *istate;
+	/* Line # of tuple in copy stream. */
+	uint64		linenos[MAX_BUFFERED_TUPLES];
 } CopyMultiInsertBuffer;
 
-/*
- * Stores one or many CopyMultiInsertBuffers and details about the size and
- * number of tuples which are stored in them.  This allows multiple buffers to
- * exist at once when COPYing into a partitioned table.
- */
-typedef struct CopyMultiInsertInfo
-{
-	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
-	int			bufferedTuples; /* number of tuples buffered over all buffers */
-	int			bufferedBytes;	/* number of bytes from all buffered tuples */
-	CopyFromState	cstate;			/* Copy state for this CopyMultiInsertInfo */
-	EState	   *estate;			/* Executor state used for COPY */
-	CommandId	mycid;			/* Command Id used for COPY */
-	int			ti_options;		/* table insert options */
-} CopyMultiInsertInfo;
-
-
 /* non-export function prototypes */
 static char *limit_printout_length(const char *str);
 
@@ -204,227 +184,130 @@ limit_printout_length(const char *str)
 	return res;
 }
 
-/*
- * Allocate memory and initialize a new CopyMultiInsertBuffer for this
- * ResultRelInfo.
- */
-static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+static void
+InitCopyMultiInsertBufferInfo(List **mirri, ResultRelInfo *rri,
+							  CommandId mycid, int ti_options)
 {
 	CopyMultiInsertBuffer *buffer;
+	TriggerDesc *trigdesc = rri->ri_TrigDesc;
 
-	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
-	buffer->resultRelInfo = rri;
-	buffer->bistate = GetBulkInsertState();
-	buffer->nused = 0;
-
-	return buffer;
-}
+	buffer = (CopyMultiInsertBuffer *) palloc0(sizeof(CopyMultiInsertBuffer));
 
-/*
- * Make a new buffer for this ResultRelInfo.
- */
-static inline void
-CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer;
+	buffer->istate = table_insert_begin(rri->ri_RelationDesc,
+										mycid,
+										ti_options,
+										true,
+										true,
+										MAX_BUFFERED_TUPLES,
+										MAX_BUFFERED_BYTES);
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	if (rri->ri_NumIndices ||
+		(trigdesc && (trigdesc->trig_insert_after_row ||
+		trigdesc->trig_insert_new_table)))
+		buffer->istate->mistate->clear_slots = false;
 
-	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
-	/* Record that we're tracking this buffer */
-	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-}
-
-/*
- * Initialize an already allocated CopyMultiInsertInfo.
- *
- * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
- * for that table.
- */
-static void
-CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						CopyFromState cstate, EState *estate, CommandId mycid,
-						int ti_options)
-{
-	miinfo->multiInsertBuffers = NIL;
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
-	miinfo->cstate = cstate;
-	miinfo->estate = estate;
-	miinfo->mycid = mycid;
-	miinfo->ti_options = ti_options;
 
-	/*
-	 * Only setup the buffer when not dealing with a partitioned table.
-	 * Buffers for partitioned tables will just be setup when we need to send
-	 * tuples their way for the first time.
-	 */
-	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+	*mirri = lappend(*mirri, rri);
 }
 
-/*
- * Returns true if the buffers are full
- */
-static inline bool
-CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
-{
-	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
-		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
-		return true;
-	return false;
-}
-
-/*
- * Returns true if we have no buffered tuples
- */
-static inline bool
-CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
-{
-	return miinfo->bufferedTuples == 0;
-}
-
-/*
- * Write the tuples stored in 'buffer' out to the table.
- */
-static inline void
-CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
-						   CopyMultiInsertBuffer *buffer)
+static void
+HandleAfterRowEvents(ResultRelInfo *rri, EState *estate,
+					 CopyFromState cstate, int32 cur_slots)
 {
-	MemoryContext oldcontext;
-	int			i;
-	uint64		save_cur_lineno;
-	CopyFromState	cstate = miinfo->cstate;
-	EState	   *estate = miinfo->estate;
-	CommandId	mycid = miinfo->mycid;
-	int			ti_options = miinfo->ti_options;
+	int i;
+	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+	TableInsertState *istate = buffer->istate;
+	uint64		save_cur_lineno = cstate->cur_lineno;
 	bool		line_buf_valid = cstate->line_buf_valid;
-	int			nused = buffer->nused;
-	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
-	TupleTableSlot **slots = buffer->slots;
 
-	/*
-	 * Print error context information correctly, if one of the operations
-	 * below fail.
-	 */
 	cstate->line_buf_valid = false;
-	save_cur_lineno = cstate->cur_lineno;
-
-	/*
-	 * table_multi_insert may leak memory, so switch to short-lived memory
-	 * context before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	table_multi_insert(resultRelInfo->ri_RelationDesc,
-					   slots,
-					   nused,
-					   mycid,
-					   ti_options,
-					   buffer->bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	for (i = 0; i < nused; i++)
+	for (i = 0; i < cur_slots; i++)
 	{
 		/*
-		 * If there are any indexes, update them for all the inserted tuples,
-		 * and run AFTER ROW INSERT triggers.
-		 */
-		if (resultRelInfo->ri_NumIndices > 0)
+		* If there are any indexes, update them for all the inserted tuples,
+		* and run AFTER ROW INSERT triggers.
+		*/
+		if (rri->ri_NumIndices > 0)
 		{
-			List	   *recheckIndexes;
+			List       *recheckIndexes;
 
 			cstate->cur_lineno = buffer->linenos[i];
 			recheckIndexes =
-				ExecInsertIndexTuples(resultRelInfo,
-									  buffer->slots[i], estate, false, NULL,
-									  NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], recheckIndexes,
+					ExecInsertIndexTuples(rri,
+										  istate->mistate->slots[i], estate,
+										  false,
+										  NULL,
+										  NULL);
+
+			ExecARInsertTriggers(estate,
+								 rri,
+								 istate->mistate->slots[i],
+								 recheckIndexes,
 								 cstate->transition_capture);
+
 			list_free(recheckIndexes);
 		}
 
 		/*
-		 * There's no indexes, but see if we need to run AFTER ROW INSERT
-		 * triggers anyway.
-		 */
-		else if (resultRelInfo->ri_TrigDesc != NULL &&
-				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		* There's no indexes, but see if we need to run AFTER ROW INSERT
+		* triggers anyway.
+		*/
+		else if (rri->ri_TrigDesc != NULL &&
+				(rri->ri_TrigDesc->trig_insert_after_row ||
+				 rri->ri_TrigDesc->trig_insert_new_table))
 		{
 			cstate->cur_lineno = buffer->linenos[i];
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], NIL, cstate->transition_capture);
+			ExecARInsertTriggers(estate,
+								 rri,
+								 istate->mistate->slots[i],
+								 NULL,
+								 cstate->transition_capture);
 		}
 
-		ExecClearTuple(slots[i]);
+		ExecClearTuple(istate->mistate->slots[i]);
 	}
 
-	/* Mark that all slots are free */
-	buffer->nused = 0;
-
 	/* reset cur_lineno and line_buf_valid to what they were */
 	cstate->line_buf_valid = line_buf_valid;
 	cstate->cur_lineno = save_cur_lineno;
 }
 
-/*
- * Drop used slots and free member for this buffer.
- *
- * The buffer must be flushed before cleanup.
- */
-static inline void
-CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
-							 CopyMultiInsertBuffer *buffer)
+static void
+CopyMultiInsertBufferTuple(ResultRelInfo *rri, TupleTableSlot *slot,
+					   CopyFromState cstate, EState *estate)
 {
-	int			i;
-
-	/* Ensure buffer was flushed */
-	Assert(buffer->nused == 0);
-
-	/* Remove back-link to ourself */
-	buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
-
-	FreeBulkInsertState(buffer->bistate);
+	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+	TableInsertState *istate = buffer->istate;
+	int32 cur_slots = istate->mistate->cur_slots;
 
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	buffer->linenos[istate->mistate->cur_slots] = cstate->cur_lineno;
+	istate->mistate->cur_tup_size = cstate->line_buf.len;
 
-	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
-							 miinfo->ti_options);
+	table_multi_insert_v2(buffer->istate, slot);
 
-	pfree(buffer);
+	if (istate->mistate->flushed)
+		HandleAfterRowEvents(rri, estate, cstate, cur_slots);
 }
 
-/*
- * Write out all stored tuples in all buffers out to the tables.
- *
- * Once flushed we also trim the tracked buffers list down to size by removing
- * the buffers created earliest first.
- *
- * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being
- * used.  When cleaning up old buffers we'll never remove the one for
- * 'curr_rri'.
- */
-static inline void
-CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+static void
+CopyMulitInsertFlushBuffers(List **mirri, ResultRelInfo *curr_rri,
+							CopyFromState cstate, EState *estate)
 {
 	ListCell   *lc;
 
-	foreach(lc, miinfo->multiInsertBuffers)
+	foreach(lc, *mirri)
 	{
-		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
+		ResultRelInfo *rri = lfirst(lc);
+		CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+		TableInsertState *istate = buffer->istate;
+		int32 cur_slots = istate->mistate->cur_slots;
 
-		CopyMultiInsertBufferFlush(miinfo, buffer);
-	}
+		table_multi_insert_flush(istate);
 
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
+		if (istate->mistate->flushed)
+			HandleAfterRowEvents(rri, estate, cstate, cur_slots);
+	}
 
 	/*
 	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
@@ -432,87 +315,62 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
 	 * likely that these older ones will be needed than the ones that were
 	 * just created.
 	 */
-	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
+	while (list_length(*mirri) > MAX_PARTITION_BUFFERS)
 	{
+		ResultRelInfo *rri;
 		CopyMultiInsertBuffer *buffer;
+		TableInsertState *istate;
+		int ti_options;
 
-		buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+		rri = (ResultRelInfo *) linitial(*mirri);
 
 		/*
 		 * We never want to remove the buffer that's currently being used, so
 		 * if we happen to find that then move it to the end of the list.
 		 */
-		if (buffer->resultRelInfo == curr_rri)
+		if (rri == curr_rri)
 		{
-			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-			buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+			*mirri = list_delete_first(*mirri);
+			*mirri = lappend(*mirri, rri);
+			rri = (ResultRelInfo *) linitial(*mirri);
 		}
 
-		CopyMultiInsertBufferCleanup(miinfo, buffer);
-		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-	}
-}
+		buffer = rri->ri_CopyMultiInsertBuffer;
+		istate = buffer->istate;
+		istate->mistate->clear_slots = true;
+		ti_options = istate->options;
 
-/*
- * Cleanup allocated buffers and free memory
- */
-static inline void
-CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
-{
-	ListCell   *lc;
+		table_insert_end(istate);
 
-	foreach(lc, miinfo->multiInsertBuffers)
-		CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
+		table_finish_bulk_insert(rri->ri_RelationDesc, ti_options);
 
-	list_free(miinfo->multiInsertBuffers);
+		*mirri = list_delete_first(*mirri);
+	}
 }
 
-/*
- * Get the next TupleTableSlot that the next tuple should be stored in.
- *
- * Callers must ensure that the buffer is not full.
- *
- * Note: 'miinfo' is unused but has been included for consistency with the
- * other functions in this area.
- */
-static inline TupleTableSlot *
-CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
-								ResultRelInfo *rri)
+static void
+CopyMulitInsertDropBuffers(List *mirri)
 {
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
-	int			nused = buffer->nused;
-
-	Assert(buffer != NULL);
-	Assert(nused < MAX_BUFFERED_TUPLES);
+	ListCell   *lc;
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
-}
+	foreach(lc, mirri)
+	{
+		int ti_options;
+		ResultRelInfo *rri = lfirst(lc);
+		CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+		TableInsertState *istate = buffer->istate;
 
-/*
- * Record the previously reserved TupleTableSlot that was reserved by
- * CopyMultiInsertInfoNextFreeSlot as being consumed.
- */
-static inline void
-CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						 TupleTableSlot *slot, int tuplen, uint64 lineno)
-{
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+		istate->mistate->clear_slots = true;
+		ti_options = istate->options;
 
-	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
+		table_insert_end(istate);
 
-	/* Store the line number so we can properly report any errors later */
-	buffer->linenos[buffer->nused] = lineno;
+		table_finish_bulk_insert(rri->ri_RelationDesc, ti_options);
 
-	/* Record this slot as being used */
-	buffer->nused++;
+		pfree(buffer);
+	}
 
-	/* Update how many tuples are stored and their size */
-	miinfo->bufferedTuples++;
-	miinfo->bufferedBytes += tuplen;
+	list_free(mirri);
 }
 
 /*
@@ -527,20 +385,20 @@ CopyFrom(CopyFromState cstate)
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ModifyTableState *mtstate;
 	ExprContext *econtext;
-	TupleTableSlot *singleslot = NULL;
+	TupleTableSlot *slot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			ti_options = 0; /* start with default options for insert */
-	BulkInsertState bistate = NULL;
 	CopyInsertMethod insertMethod;
-	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
 	uint64		processed = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
+	List	   *multi_insert_rris = NULL;
+	TableInsertState *istate = NULL;
 
 	Assert(cstate->rel);
 	Assert(list_length(cstate->range_table) == 1);
@@ -723,7 +581,7 @@ CopyFrom(CopyFromState cstate)
 		 * For partitioned tables we can't support multi-inserts when there
 		 * are any statement level insert triggers. It might be possible to
 		 * allow partitioned tables with such triggers in the future, but for
-		 * now, CopyMultiInsertInfoFlush expects that any before row insert
+		 * now, CopyMulitInsertFlushBuffers expects that any before row insert
 		 * and statement level insert triggers are on the same relation.
 		 */
 		insertMethod = CIM_SINGLE;
@@ -771,22 +629,22 @@ CopyFrom(CopyFromState cstate)
 		else
 			insertMethod = CIM_MULTI;
 
-		CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
-								estate, mycid, ti_options);
+		/*
+		* Only setup the buffer when not dealing with a partitioned table.
+		* Buffers for partitioned tables will just be setup when we need to
+		* send tuples their way for the first time.
+		*/
+		if (!proute)
+			InitCopyMultiInsertBufferInfo(&multi_insert_rris, resultRelInfo,
+										  mycid, ti_options);
 	}
 
 	/*
-	 * If not using batch mode (which allocates slots as needed) set up a
-	 * tuple slot too. When inserting into a partitioned table, we also need
-	 * one, even if we might batch insert, to read the tuple in the root
-	 * partition's form.
+	 * Set up a tuple slot to which the input data from copy stream is read
+	 * into and used for inserts into table.
 	 */
-	if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
-	{
-		singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
-									   &estate->es_tupleTable);
-		bistate = GetBulkInsertState();
-	}
+	slot = table_slot_create(resultRelInfo->ri_RelationDesc,
+									&estate->es_tupleTable);
 
 	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
 								  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
@@ -824,19 +682,8 @@ CopyFrom(CopyFromState cstate)
 		ResetPerTupleExprContext(estate);
 
 		/* select slot to (initially) load row into */
-		if (insertMethod == CIM_SINGLE || proute)
-		{
-			myslot = singleslot;
-			Assert(myslot != NULL);
-		}
-		else
-		{
-			Assert(resultRelInfo == target_resultRelInfo);
-			Assert(insertMethod == CIM_MULTI);
-
-			myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
-													 resultRelInfo);
-		}
+		myslot = slot;
+		Assert(myslot != NULL);
 
 		/*
 		 * Switch to per-tuple context before calling NextCopyFrom, which does
@@ -904,21 +751,22 @@ CopyFrom(CopyFromState cstate)
 				if (leafpart_use_multi_insert)
 				{
 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
-						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
-													   resultRelInfo);
+						InitCopyMultiInsertBufferInfo(&multi_insert_rris,
+													  resultRelInfo, mycid,
+													  ti_options);
 				}
-				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
-						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+				else if (insertMethod == CIM_MULTI_CONDITIONAL)
 				{
 					/*
 					 * Flush pending inserts if this partition can't use
 					 * batching, so rows are visible to triggers etc.
 					 */
-					CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					CopyMulitInsertFlushBuffers(&multi_insert_rris,
+												resultRelInfo, cstate, estate);
 				}
 
-				if (bistate != NULL)
-					ReleaseBulkInsertStatePin(bistate);
+				if (istate && istate->bistate)
+					ReleaseBulkInsertStatePin(istate->bistate);
 				prevResultRelInfo = resultRelInfo;
 			}
 
@@ -960,8 +808,8 @@ CopyFrom(CopyFromState cstate)
 				/* no other path available for partitioned table */
 				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
 
-				batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
-															resultRelInfo);
+				batchslot = table_slot_create(resultRelInfo->ri_RelationDesc,
+											  &estate->es_tupleTable);
 
 				if (map != NULL)
 					myslot = execute_attr_map_slot(map->attrMap, myslot,
@@ -1033,24 +881,9 @@ CopyFrom(CopyFromState cstate)
 				/* Store the slot in the multi-insert buffer, when enabled. */
 				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
 				{
-					/*
-					 * The slot previously might point into the per-tuple
-					 * context. For batching it needs to be longer lived.
-					 */
-					ExecMaterializeSlot(myslot);
-
 					/* Add this tuple to the tuple buffer */
-					CopyMultiInsertInfoStore(&multiInsertInfo,
-											 resultRelInfo, myslot,
-											 cstate->line_buf.len,
-											 cstate->cur_lineno);
-
-					/*
-					 * If enough inserts have queued up, then flush all
-					 * buffers out to their tables.
-					 */
-					if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
-						CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					CopyMultiInsertBufferTuple(resultRelInfo, myslot, cstate,
+											   estate);
 				}
 				else
 				{
@@ -1076,9 +909,21 @@ CopyFrom(CopyFromState cstate)
 					}
 					else
 					{
+						if (!istate)
+						{
+							istate = table_insert_begin(resultRelInfo->ri_RelationDesc,
+														mycid,
+														ti_options,
+														true,
+														false,
+														-1,
+														-1);
+						}
+
+						istate->rel = resultRelInfo->ri_RelationDesc;
+
 						/* OK, store the tuple and create index entries for it */
-						table_tuple_insert(resultRelInfo->ri_RelationDesc,
-										   myslot, mycid, ti_options, bistate);
+						table_insert_v2(istate, myslot);
 
 						if (resultRelInfo->ri_NumIndices > 0)
 							recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
@@ -1108,16 +953,14 @@ CopyFrom(CopyFromState cstate)
 
 	/* Flush any remaining buffered tuples */
 	if (insertMethod != CIM_SINGLE)
-	{
-		if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
-			CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
-	}
+		CopyMulitInsertFlushBuffers(&multi_insert_rris, resultRelInfo,
+									cstate, estate);
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	if (bistate != NULL)
-		FreeBulkInsertState(bistate);
+	if (istate)
+		table_insert_end(istate);
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -1144,7 +987,7 @@ CopyFrom(CopyFromState cstate)
 
 	/* Tear down the multi-insert buffer data */
 	if (insertMethod != CIM_SINGLE)
-		CopyMultiInsertInfoCleanup(&multiInsertInfo);
+		CopyMulitInsertDropBuffers(multi_insert_rris);
 
 	/* Close all the partitioned tables, leaf partitions, and their indices */
 	if (proute)
-- 
2.25.1

