On Wed, Apr 3, 2024 at 2:32 PM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> I too prefer the latter so that the caller doesn't have to have two
> paths. The new API can just transparently fallback to single inserts.
> I've implemented that in the attached v17 patch. I also tested the
> default APIs manually, but I'll see if I can add some tests to it the
> default API.

Fixed a compiler warning found via CF bot. Please find the attached
v18 patches. I'm sorry for the noise.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From ff1278b77e0d6ac6a49f0826602bd948e78c7a91 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 3 Apr 2024 11:56:14 +0000
Subject: [PATCH v18 1/2] Introduce new table modify access methods

---
 src/backend/access/heap/heapam.c         | 189 ++++++++++++++++++++++-
 src/backend/access/heap/heapam_handler.c |   5 +
 src/backend/access/table/tableam.c       |  86 +++++++++++
 src/backend/access/table/tableamapi.c    |   8 +
 src/include/access/heapam.h              |  41 +++++
 src/include/access/tableam.h             | 108 +++++++++++++
 src/tools/pgindent/typedefs.list         |   3 +
 7 files changed, 439 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index b661d9811e..69f8c597d8 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -64,6 +64,7 @@
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -107,7 +108,8 @@ static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 										bool *copy);
-
+static void heap_modify_buffer_flush(TableModifyState *state);
+static void heap_modify_insert_end(TableModifyState *state);
 
 /*
  * Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2441,6 +2443,191 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	*insert_indexes = true;
 }
 
+/*
+ * Initialize heap modify state.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel, int modify_flags, CommandId cid,
+				  int options)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(CurrentMemoryContext,
+									"heap_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+	state = palloc0(sizeof(TableModifyState));
+	state->rel = rel;
+	state->modify_flags = modify_flags;
+	state->mctx = context;
+	state->cid = cid;
+	state->options = options;
+	state->insert_indexes = false;
+	state->modify_end_cb = NULL;	/* To be installed lazily */
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+						  TupleTableSlot *slot)
+{
+	TupleTableSlot *dstslot;
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mctx);
+
+	/* First time through, initialize heap insert state */
+	if (state->data == NULL)
+	{
+		istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
+		istate->bistate = NULL;
+		istate->mistate = NULL;
+		state->data = istate;
+
+		if ((state->modify_flags & TM_FLAG_MULTI_INSERTS) != 0)
+		{
+			mistate = (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState));
+			mistate->slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+			istate->mistate = mistate;
+		}
+
+		if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			istate->bistate = GetBulkInsertState();
+
+		state->modify_end_cb = heap_modify_insert_end;
+	}
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+	Assert(istate->bistate != NULL);
+
+	dstslot = mistate->slots[mistate->cur_slots];
+	if (dstslot == NULL)
+	{
+		/*
+		 * We use virtual tuple slots buffered slots for leveraging the
+		 * optimization it provides to minimize physical data copying. The
+		 * virtual slot gets materialized when we copy (via below
+		 * ExecCopySlot) the tuples from the source slot which can be of any
+		 * type. This way, it is ensured that the tuple storage doesn't depend
+		 * on external memory, because all the datums that aren't passed by
+		 * value are copied into the slot's memory context.
+		 */
+		dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+									 &TTSOpsVirtual);
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	ExecClearTuple(dstslot);
+	ExecCopySlot(dstslot, slot);
+
+	mistate->cur_slots++;
+
+	/*
+	 * Memory allocated for the whole tuple is in slot's memory context, so
+	 * use it keep track of the total space occupied by all buffered tuples.
+	 */
+	if (TTS_SHOULDFREE(dstslot))
+		mistate->cur_size += MemoryContextMemAllocated(dstslot->tts_mcxt, false);
+
+	if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+		mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)
+		heap_modify_buffer_flush(state);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+static void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+	Assert(istate->bistate != NULL);
+
+	if (mistate->cur_slots == 0)
+		return;
+
+	oldcontext = MemoryContextSwitchTo(state->mctx);
+
+	heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+					  state->cid, state->options, istate->bistate,
+					  &state->insert_indexes);
+
+	mistate->cur_slots = 0;
+	mistate->cur_size = 0;
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Heap insert specific callback used for performing work at the end like
+ * flushing buffered tuples if any, cleaning up the insert state and buffered
+ * slots.
+ */
+static void
+heap_modify_insert_end(TableModifyState *state)
+{
+	HeapInsertState *istate;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+
+	if (istate->mistate != NULL)
+	{
+		HeapMultiInsertState *mistate = istate->mistate;
+
+		heap_modify_buffer_flush(state);
+
+		Assert(mistate->cur_slots == 0 &&
+			   mistate->cur_size == 0);
+
+		for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(mistate->slots[i]);
+	}
+
+	if (istate->bistate != NULL)
+		FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean heap modify state.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+	if (state->modify_end_cb != NULL)
+		state->modify_end_cb(state);
+
+	MemoryContextDelete(state->mctx);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index c86000d245..f3aa29851d 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2638,6 +2638,11 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_modify_begin = heap_modify_begin,
+	.tuple_modify_buffer_insert = heap_modify_buffer_insert,
+	.tuple_modify_end = heap_modify_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 805d222ceb..4c7b5433ec 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -21,6 +21,7 @@
 
 #include <math.h>
 
+#include "access/heapam.h"		/* just for BulkInsertState */
 #include "access/syncscan.h"
 #include "access/tableam.h"
 #include "access/xact.h"
@@ -29,6 +30,7 @@
 #include "storage/bufmgr.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
+#include "utils/memutils.h"
 
 /*
  * Constants to control the behavior of block allocation to parallel workers
@@ -48,6 +50,7 @@
 char	   *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
 bool		synchronize_seqscans = true;
 
+static void default_table_modify_insert_end(TableModifyState *state);
 
 /* ----------------------------------------------------------------------------
  * Slot functions.
@@ -772,3 +775,86 @@ table_block_relation_estimate_size(Relation rel, int32 *attr_widths,
 	else
 		*allvisfrac = (double) relallvisible / curpages;
 }
+
+/*
+ * Initialize default table modify state.
+ */
+TableModifyState *
+default_table_modify_begin(Relation rel, int modify_flags, CommandId cid,
+						   int options)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(CurrentMemoryContext,
+									"default_table_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+	state = palloc0(sizeof(TableModifyState));
+	state->rel = rel;
+	state->modify_flags = modify_flags;
+	state->mctx = context;
+	state->cid = cid;
+	state->options = options;
+	state->insert_indexes = false;
+	state->modify_end_cb = NULL;	/* To be installed lazily */
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * Default table modify implementation for inserts.
+ */
+void
+default_table_modify_buffer_insert(TableModifyState *state,
+								   TupleTableSlot *slot)
+{
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mctx);
+
+	/* First time through, initialize default table modify state */
+	if (state->data == NULL)
+	{
+		if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			state->data = (BulkInsertState) GetBulkInsertState();
+
+		state->modify_end_cb = default_table_modify_insert_end;
+	}
+
+	/* Fallback to table AM single insert routine */
+	table_tuple_insert(state->rel,
+					   slot,
+					   state->cid,
+					   state->options,
+					   (BulkInsertState) state->data,
+					   &state->insert_indexes);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Default table modify insert specific callback used for performing work at
+ * the end like cleaning up the bulk insert state.
+ */
+static void
+default_table_modify_insert_end(TableModifyState *state)
+{
+	if (state->data != NULL)
+		FreeBulkInsertState((BulkInsertState) state->data);
+}
+
+/*
+ * Clean default table modify state.
+ */
+void
+default_table_modify_end(TableModifyState *state)
+{
+	if (state->modify_end_cb != NULL)
+		state->modify_end_cb(state);
+
+	MemoryContextDelete(state->mctx);
+}
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 55b8caeadf..9c095b93e7 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -95,6 +95,14 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->scan_sample_next_block != NULL);
 	Assert(routine->scan_sample_next_tuple != NULL);
 
+	/* optional, but either all of them are defined or none. */
+	Assert((routine->tuple_modify_begin == NULL &&
+			routine->tuple_modify_buffer_insert == NULL &&
+			routine->tuple_modify_end == NULL) ||
+		   (routine->tuple_modify_begin != NULL &&
+			routine->tuple_modify_buffer_insert != NULL &&
+			routine->tuple_modify_end != NULL));
+
 	return routine;
 }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index b632fe953c..b35ba5509b 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -236,6 +236,36 @@ htsv_get_valid_status(int status)
 	return (HTSV_Result) status;
 }
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS		1000
+
+/* Maximum size of all tuples that multi-insert buffers can hold */
+#define HEAP_MAX_BUFFERED_BYTES		65535
+
+typedef struct HeapMultiInsertState
+{
+	/* Array of buffered slots */
+	TupleTableSlot **slots;
+
+	/* Number of buffered slots currently held */
+	int			cur_slots;
+
+	/* Approximate size of all tuples currently held in buffered slots */
+	Size		cur_size;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData *bistate;
+	HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -286,6 +316,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate, bool *insert_indexes);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+										   int modify_flags,
+										   CommandId cid,
+										   int options);
+
+extern void heap_modify_buffer_insert(TableModifyState *state,
+									  TupleTableSlot *slot);
+
+extern void heap_modify_end(TableModifyState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, int options,
 							 struct TM_FailureData *tmfd, bool changingPart,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 2c1a540155..fef9202022 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -248,6 +248,35 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+/* Table modify flags */
+
+/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */
+#define TM_FLAG_MULTI_INSERTS	0x000001
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TM_FLAG_BAS_BULKWRITE	0x000002
+
+struct TableModifyState;
+
+/* Table AM specific callback that gets called in table_modify_end() */
+typedef void (*TableModifyEndCP) (struct TableModifyState *state);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+	Relation	rel;
+	int			modify_flags;
+	MemoryContext mctx;
+	CommandId	cid;
+	int			options;
+	bool		insert_indexes;
+
+	/* Table AM specific data starts here */
+	void	   *data;
+
+	TableModifyEndCP modify_end_cb;
+} TableModifyState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -584,6 +613,18 @@ typedef struct TableAmRoutine
 	void		(*finish_bulk_insert) (Relation rel, int options);
 
 
+	/* ------------------------------------------------------------------------
+	 * Table Modify related functions.
+	 * ------------------------------------------------------------------------
+	 */
+	TableModifyState *(*tuple_modify_begin) (Relation rel,
+											 int modify_flags,
+											 CommandId cid,
+											 int options);
+	void		(*tuple_modify_buffer_insert) (TableModifyState *state,
+											   TupleTableSlot *slot);
+	void		(*tuple_modify_end) (TableModifyState *state);
+
 	/* ------------------------------------------------------------------------
 	 * DDL related functionality.
 	 * ------------------------------------------------------------------------
@@ -1604,6 +1645,73 @@ table_finish_bulk_insert(Relation rel, int options)
 		rel->rd_tableam->finish_bulk_insert(rel, options);
 }
 
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+extern TableModifyState *default_table_modify_begin(Relation rel, int modify_flags,
+													CommandId cid, int options);
+extern void	default_table_modify_buffer_insert(TableModifyState *state,
+											   TupleTableSlot *slot);
+extern void default_table_modify_end(TableModifyState *state);
+
+static inline TableModifyState *
+table_modify_begin(Relation rel, int modify_flags, CommandId cid, int options)
+{
+	if (rel->rd_tableam &&
+		rel->rd_tableam->tuple_modify_begin != NULL)
+	{
+		return rel->rd_tableam->tuple_modify_begin(rel, modify_flags,
+												   cid, options);
+	}
+	else if (rel->rd_tableam &&
+			 rel->rd_tableam->tuple_modify_begin == NULL)
+	{
+		/* Fallback to a default implementation */
+		return default_table_modify_begin(rel, modify_flags,
+										  cid, options);
+	}
+	else
+		Assert(false);
+
+	return NULL;
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_buffer_insert != NULL)
+	{
+		state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+	}
+	else if (state->rel->rd_tableam &&
+			 state->rel->rd_tableam->tuple_modify_buffer_insert == NULL)
+	{
+		/* Fallback to a default implementation */
+		default_table_modify_buffer_insert(state, slot);
+	}
+	else
+		Assert(false);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_end != NULL)
+	{
+		state->rel->rd_tableam->tuple_modify_end(state);
+	}
+	else if (state->rel->rd_tableam &&
+			 state->rel->rd_tableam->tuple_modify_end == NULL)
+	{
+		/* Fallback to a default implementation */
+		default_table_modify_end(state);
+	}
+	else
+		Assert(false);
+}
 
 /* ------------------------------------------------------------------------
  * DDL related functionality.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 2b01a3081e..edaa4d26f0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1123,6 +1123,8 @@ HeadlineJsonState
 HeadlineParsedText
 HeadlineWordEntry
 HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
 HeapPageFreeze
 HeapScanDesc
 HeapTuple
@@ -2814,6 +2816,7 @@ TableFuncScan
 TableFuncScanState
 TableInfo
 TableLikeClause
+TableModifyState
 TableSampleClause
 TableScanDesc
 TableScanDescData
-- 
2.34.1

From 7cbb0630ec2cfd278384676536577cf445c0a092 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 3 Apr 2024 11:56:31 +0000
Subject: [PATCH v18 2/2] Optimize CTAS, CMV, RMV with multi inserts

---
 src/backend/commands/createas.c | 27 +++++++++------------------
 src/backend/commands/matview.c  | 26 +++++++++-----------------
 2 files changed, 18 insertions(+), 35 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index afd3dace07..00c1271f93 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -53,9 +53,7 @@ typedef struct
 	/* These fields are filled by intorel_startup: */
 	Relation	rel;			/* relation to write to */
 	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -552,17 +550,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->rel = intoRelationDesc;
 	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
 
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
 	 * bulk inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
+		myState->mstate = table_modify_begin(intoRelationDesc,
+											 TM_FLAG_MULTI_INSERTS |
+											 TM_FLAG_BAS_BULKWRITE,
+											 GetCurrentCommandId(true),
+											 TABLE_INSERT_SKIP_FSM);
 	else
-		myState->bistate = NULL;
+		myState->mstate = NULL;
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -578,7 +578,6 @@ static bool
 intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 {
 	DR_intorel *myState = (DR_intorel *) self;
-	bool		insertIndexes;
 
 	/* Nothing to insert if WITH NO DATA is specified. */
 	if (!myState->into->skipData)
@@ -591,12 +590,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 		 * would not be cheap either. This also doesn't allow accessing per-AM
 		 * data (say a tuple's xmin), but since we don't do that here...
 		 */
-		table_tuple_insert(myState->rel,
-						   slot,
-						   myState->output_cid,
-						   myState->ti_options,
-						   myState->bistate,
-						   &insertIndexes);
+		table_modify_buffer_insert(myState->mstate, slot);
 	}
 
 	/* We know this is a newly created relation, so there are no indexes */
@@ -614,10 +608,7 @@ intorel_shutdown(DestReceiver *self)
 	IntoClause *into = myState->into;
 
 	if (!into->skipData)
-	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
-	}
+		table_modify_end(myState->mstate);
 
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 9ec13d0984..f03aa1cff3 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -48,9 +48,7 @@ typedef struct
 	Oid			transientoid;	/* OID of new heap into which to store */
 	/* These fields are filled by transientrel_startup: */
 	Relation	transientrel;	/* relation to write to */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -458,9 +456,12 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 * Fill private fields of myState for use by later routines
 	 */
 	myState->transientrel = transientrel;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	myState->mstate = table_modify_begin(transientrel,
+										 TM_FLAG_MULTI_INSERTS |
+										 TM_FLAG_BAS_BULKWRITE,
+										 GetCurrentCommandId(true),
+										 TABLE_INSERT_SKIP_FSM |
+										 TABLE_INSERT_FROZEN);
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -476,7 +477,6 @@ static bool
 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
-	bool		insertIndexes;
 
 	/*
 	 * Note that the input slot might not be of the type of the target
@@ -486,13 +486,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * cheap either. This also doesn't allow accessing per-AM data (say a
 	 * tuple's xmin), but since we don't do that here...
 	 */
-
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate,
-					   &insertIndexes);
+	table_modify_buffer_insert(myState->mstate, slot);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -507,9 +501,7 @@ transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
 
-	FreeBulkInsertState(myState->bistate);
-
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	table_modify_end(myState->mstate);
 
 	/* close transientrel, but keep lock until commit */
 	table_close(myState->transientrel, NoLock);
-- 
2.34.1

Reply via email to