From 6803736e5695ab0ef06d263e9ba260db02d3b80c Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 1 Aug 2023 09:38:47 +0000
Subject: [PATCH v7] New table AMs for single and multi inserts

---
 src/backend/access/heap/heapam.c         | 180 +++++++++++++++++++++++
 src/backend/access/heap/heapam_handler.c |   6 +
 src/include/access/heapam.h              |  45 ++++++
 src/include/access/tableam.h             | 107 ++++++++++++++
 4 files changed, 338 insertions(+)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 7ed72abe59..ba4347026a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -68,6 +68,7 @@
 #include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -75,6 +76,7 @@
 
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 									 TransactionId xid, CommandId cid, int options);
+static void heap_multi_insert_flush(TableInsertState *state);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
 								  Buffer newbuf, HeapTuple oldtup,
 								  HeapTuple newtup, HeapTuple old_key_tuple,
@@ -2443,6 +2445,184 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Initialize state required for an insert a single tuple or multiple tuples
+ * into a heap.
+ */
+TableInsertState *
+heap_insert_begin(Relation rel, CommandId cid, int table_am_flags,
+				  int table_insert_flags)
+{
+	TableInsertState *tistate;
+
+	tistate = (TableInsertState *) palloc0(sizeof(TableInsertState));
+	tistate->rel = rel;
+	tistate->cid = cid;
+	tistate->table_am_flags = table_am_flags;
+	tistate->table_insert_flags = table_insert_flags;
+
+	if ((table_am_flags & TABLEAM_USE_MULTI_INSERTS) != 0 ||
+		(table_am_flags & TABLEAM_USE_BULKWRITE_BUFFER_ACCESS_STRATEGY))
+	{
+		tistate->table_am_data =
+			(HeapInsertState *) palloc0(sizeof(HeapInsertState));
+	}
+
+	if ((table_am_flags & TABLEAM_USE_MULTI_INSERTS) != 0)
+	{
+		((HeapInsertState *) tistate->table_am_data)->mistate =
+			(HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState));
+
+		((HeapInsertState *) tistate->table_am_data)->mistate->slots =
+				palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+
+		((HeapInsertState *) tistate->table_am_data)->mistate->context =
+				AllocSetContextCreate(CurrentMemoryContext,
+									  "heap_multi_insert_v2 memory context",
+									  ALLOCSET_DEFAULT_SIZES);
+	}
+
+	if ((table_am_flags & TABLEAM_USE_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0)
+		((HeapInsertState *) tistate->table_am_data)->bistate = GetBulkInsertState();
+
+	return tistate;
+}
+
+/*
+ * Insert a single tuple into a heap.
+ */
+void
+heap_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	bool		shouldFree = true;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+	BulkInsertState bistate = NULL;
+
+	/* Update tuple with table oid */
+	slot->tts_tableOid = RelationGetRelid(state->rel);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	if (state->table_am_data != NULL &&
+		((HeapInsertState *) state->table_am_data)->bistate != NULL)
+	{
+		bistate = ((HeapInsertState *) state->table_am_data)->bistate;
+	}
+
+	/* Perform insertion, and copy the resulting ItemPointer */
+	heap_insert(state->rel, tuple, state->cid, state->table_insert_flags,
+				bistate);
+	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+	if (shouldFree)
+		pfree(tuple);
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	TupleTableSlot  *dstslot;
+	HeapMultiInsertState *mistate;
+
+	Assert(state->table_am_data != NULL &&
+		   ((HeapInsertState *) state->table_am_data)->mistate != NULL);
+
+	mistate = ((HeapInsertState *) state->table_am_data)->mistate;
+	dstslot = mistate->slots[mistate->cur_slots];
+
+	if (dstslot == NULL)
+	{
+		dstslot = table_slot_create(state->rel, NULL);
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	ExecClearTuple(dstslot);
+	ExecCopySlot(dstslot, slot);
+	mistate->cur_slots++;
+
+	/*
+	 * When passed-in slot is already materialized, memory allocated in slot's
+	 * memory context is a close approximation for us to track the required
+	 * space for the tuple in slot.
+	 *
+	 * For non-materialized slots, the flushing decision happens solely on the
+	 * number of tuples stored in the buffer.
+	 */
+	if (TTS_SHOULDFREE(slot))
+		mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false);
+
+	if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+		mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)
+		heap_multi_insert_flush(state);
+}
+
+/*
+ * Clean up state used to insert a single or multiple tuples into a heap.
+ */
+void
+heap_insert_end(TableInsertState *state)
+{
+	if (state->table_am_data != NULL &&
+		((HeapInsertState *) state->table_am_data)->mistate != NULL)
+	{
+		HeapMultiInsertState *mistate =
+			((HeapInsertState *) state->table_am_data)->mistate;
+
+		/* Insert remaining tuples from multi-insert buffers */
+		if (mistate->cur_slots > 0 || mistate->cur_size > 0)
+			heap_multi_insert_flush(state);
+
+		MemoryContextDelete(mistate->context);
+
+		for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+		pfree(mistate);
+		((HeapInsertState *) state->table_am_data)->mistate = NULL;
+	}
+
+	if (state->table_am_data != NULL &&
+		((HeapInsertState *) state->table_am_data)->bistate != NULL)
+	{
+		FreeBulkInsertState(((HeapInsertState *) state->table_am_data)->bistate);
+	}
+
+	pfree(state->table_am_data);
+	state->table_am_data = NULL;
+	pfree(state);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+static void
+heap_multi_insert_flush(TableInsertState *state)
+{
+	HeapMultiInsertState *mistate;
+	BulkInsertState bistate = NULL;
+	MemoryContext oldcontext;
+
+	mistate = ((HeapInsertState *) state->table_am_data)->mistate;
+
+	if (state->table_am_data != NULL &&
+		((HeapInsertState *) state->table_am_data)->bistate != NULL)
+	{
+		bistate = ((HeapInsertState *) state->table_am_data)->bistate;
+	}
+
+	oldcontext = MemoryContextSwitchTo(mistate->context);
+	heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+					  state->cid, state->table_insert_flags, bistate);
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(mistate->context);
+
+	mistate->cur_slots = 0;
+	mistate->cur_size = 0;
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 5a17112c91..6f144d88dd 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2568,6 +2568,12 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_insert_begin = heap_insert_begin,
+	.tuple_insert_v2 = heap_insert_v2,
+	.tuple_multi_insert_v2 = heap_multi_insert_v2,
+	.tuple_insert_end = heap_insert_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index faf5026519..a1ea26cbd6 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -191,6 +191,40 @@ typedef struct HeapPageFreeze
 
 } HeapPageFreeze;
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer. For instance, increasing this can cause
+ * quadratic growth in memory requirements during copies into partitioned
+ * tables with a large number of partitions.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS		1000
+
+/* Maximum size of all tuples that multi-insert buffers can hold */
+#define HEAP_MAX_BUFFERED_BYTES		65535
+
+typedef struct HeapMultiInsertState
+{
+	/* Memory context to use for flushing multi-insert buffers */
+	MemoryContext	context;
+
+	/* Array of buffered slots */
+	TupleTableSlot	**slots;
+
+	/* Number of slots that multi-insert buffers currently hold */
+	int		cur_slots;
+
+	/* Size of all tuples that multi-insert buffers currently hold */
+	Size	cur_size;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData	*bistate;
+	HeapMultiInsertState	*mistate;
+} HeapInsertState;
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -241,6 +275,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableInsertState* heap_insert_begin(Relation rel,
+										   CommandId cid,
+										   int table_am_flags,
+										   int table_insert_flags);
+extern void heap_insert_v2(TableInsertState *state,
+						   TupleTableSlot *slot);
+extern void heap_multi_insert_v2(TableInsertState *state,
+								 TupleTableSlot *slot);
+extern void heap_insert_end(TableInsertState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 230bc39cc0..5ea3eeee8a 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -247,6 +247,35 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+/* Use multi (buffer multiple tuples and insert them at once) inserts */
+#define TABLEAM_USE_MULTI_INSERTS 0x000001
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TABLEAM_USE_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002
+
+
+/* Holds table insert state. */
+typedef struct TableInsertState
+{
+	/* Table AM-agnostic data starts here */
+	Relation	rel;	/* Target relation */
+
+	/*
+	 * Command ID for this insertion. If required, change this for each pass of
+	 * insert functions.
+	 */
+	CommandId	cid;
+
+	/* Table AM options (TABLEAM_XXX macros) */
+	int	table_am_flags;
+
+	/* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */
+	int		table_insert_flags;
+
+	/* Table AM specific data starts here */
+	void	*table_am_data;
+} TableInsertState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -522,6 +551,19 @@ typedef struct TableAmRoutine
 	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
 								 CommandId cid, int options, struct BulkInsertStateData *bistate);
 
+	TableInsertState *(*tuple_insert_begin) (Relation rel,
+											 CommandId cid,
+											 int table_am_flags,
+											 int table_insert_flags);
+
+	void (*tuple_insert_v2) (TableInsertState *state,
+							 TupleTableSlot *slot);
+
+	void (*tuple_multi_insert_v2) (TableInsertState *state,
+								   TupleTableSlot *slot);
+
+	void (*tuple_insert_end) (TableInsertState *state);
+
 	/* see table_tuple_delete() for reference about parameters */
 	TM_Result	(*tuple_delete) (Relation rel,
 								 ItemPointer tid,
@@ -1456,6 +1498,71 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 								  cid, options, bistate);
 }
 
+static inline TableInsertState *
+table_insert_begin(Relation rel, CommandId cid, int table_am_flags,
+				   int table_insert_flags)
+{
+	/* XXX: Really it doesn't have to be an optional callback */
+	if (rel->rd_tableam && rel->rd_tableam->tuple_insert_begin)
+	{
+		return rel->rd_tableam->tuple_insert_begin(rel, cid, table_am_flags,
+												   table_insert_flags);
+	}
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("table_insert_begin access method is not implemented for relation \"%s\"",
+						RelationGetRelationName(rel)));
+}
+
+static inline void
+table_tuple_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	/* XXX: Really it doesn't have to be an optional callback */
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_insert_begin)
+	{
+		return state->rel->rd_tableam->tuple_insert_v2(state, slot);
+	}
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("table_tuple_insert_v2 access method is not implemented for relation \"%s\"",
+						RelationGetRelationName(state->rel)));
+}
+
+static inline void
+table_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	/* XXX: Really it doesn't have to be an optional callback */
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_insert_begin)
+	{
+		return state->rel->rd_tableam->tuple_multi_insert_v2(state, slot);
+	}
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("table_multi_insert_v2 access method is not implemented for relation \"%s\"",
+						RelationGetRelationName(state->rel)));
+}
+
+static inline void
+table_insert_end(TableInsertState *state)
+{
+	/* XXX: Really it doesn't have to be an optional callback */
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_insert_begin)
+	{
+		return state->rel->rd_tableam->tuple_insert_end(state);
+	}
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("table_insert_end access method is not implemented for relation \"%s\"",
+						RelationGetRelationName(state->rel)));
+}
+
 /*
  * Delete a tuple.
  *
-- 
2.34.1

