On Mon, Dec 21, 2020 at 1:17 PM Justin Pryzby <[email protected]> wrote:
> On Fri, Dec 18, 2020 at 11:54:39AM -0600, Justin Pryzby wrote:
> > On Fri, Dec 18, 2020 at 07:39:14AM +0530, Bharath Rupireddy wrote:
> > > On Fri, Dec 18, 2020 at 2:14 AM Justin Pryzby <[email protected]> wrote:
> > > > Are you thinking that TableInsertState would eventually have additional
> > > > attributes which would apply to other tableams, but not to heap?  Is
> > > > heap_insert_begin() really specific to heap?  It's allocating and
> > > > populating a structure based on its arguments, but those same arguments
> > > > would be passed to every other AM's insert_begin routine, too.  Do you
> > > > need a more flexible data structure, something that would also
> > > > accommodate extensions?  I'm thinking of reloptions as a loose analogy.
> > >
> > > I could not think of other tableam attributes now. But +1 to have that
> > > kind of flexible structure for TableInsertState. So, it can have
> > > tableam type and attributes within the union for each type.
> >
> > Right now you have heap_insert_begin(), and I asked if it was really
> > heap-specific.  Right now, it populates a struct based on a static list
> > of arguments, which are what heap uses.
> >
> > If you were to implement a burp_insert_begin(), how would it differ from
> > heap's?  With the current API, they'd (have to) be the same, which means
> > either that it should apply to all AMs (or have a "default"
> > implementation that can be overridden by an AM), or that this API
> > assumes that other AMs will want to do exactly what heap does, and fails
> > to allow other AMs to implement optimizations for bulk inserts as
> > claimed.
> >
> > I don't think using a "union" solves the problem, since it can only
> > accommodate core AMs, and not extensions, so I suggested something like
> > reloptions, which have a "namespace" prefix (and core has toast.*, like
> > ALTER TABLE t SET toast.autovacuum_enabled).
>
> I think you'd want to handle things like:
>
>  - a compressed AM wants to specify a threshold for a tuple's
>    *compressed* size (maybe in addition to the uncompressed size);
>  - a "columnar" AM wants to specify a threshold size for a column,
>    rather than for each tuple;
>
> I'm not proposing to handle those specific parameters, but rather
> pointing out that your implementation doesn't allow handling AM-specific
> considerations, which I think was the goal.
>
> The TableInsertState structure would need to store those, and then the
> AM's multi_insert_v2 routine would need to make use of them.
>
> It feels a bit like we'd introduce the idea of an "AM option", except
> that it wouldn't be user-facing (or maybe some of them would be?).
> Maybe I've misunderstood though, so other opinions are welcome.
Attaching a v2 patch for the new table AMs. This patch has the following
changes:

1) Made the TableInsertState structure generic by having a void pointer for
the multi insert state, and defined the heap-specific multi insert state
information in heapam.h. This way each AM can have its own multi insert state
structure and dereference the void pointer using that structure inside the
respective AM implementation.

2) In the earlier v1 patch, the bulk insert state allocation/deallocation was
moved to the AM level, but I see that there's nothing AM-specific about it,
so I think it should be independent of the AM. Hence I'm now doing that in
table_insert_begin() and table_insert_end(). Because of this, I had to move
the BulkInsert function declarations from heapam.h to tableam.h.

3) Corrected the typos and adjusted the indentation of the code.

Note that I have not yet made the multi_insert_v2 API optional as suggested
earlier. I will think more on this and update.

I'm not posting the updated 0002 to 0004 patches; I plan to do so after a
couple of reviews happen on the design of the APIs in 0001.

Thoughts?

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
From cab7baa6f5c0229816e09a887c0468a1ca4edccb Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Thu, 24 Dec 2020 05:18:13 +0530
Subject: [PATCH v2] New Table AMs for Multi and Single Inserts

This patch introduces new table access methods for multi and single
inserts, and implements them for the heap AM by rearranging the
existing code into the new APIs.

The main design goal of these new APIs is to give table AM developers
the flexibility to implement multi insert logic that depends on the
underlying storage engine. Currently we apply the same multi insert
logic (when and how to flush the buffered tuples, how to calculate
tuple sizes, and so on) to every storage engine, without taking the
engine's capabilities into account. The new APIs also let us avoid
duplicating multi insert code across callers (the existing COPY, and
the upcoming CTAS, CREATE/REFRESH MATERIALIZED VIEW and INSERT
SELECTs), and let us move bulk insert state allocation and
deallocation inside the APIs.
---
 src/backend/access/heap/heapam.c         | 206 +++++++++++++++++++++++
 src/backend/access/heap/heapam_handler.c |   5 +
 src/backend/access/table/tableamapi.c    |   7 +
 src/backend/executor/execTuples.c        |  83 ++++++++-
 src/include/access/heapam.h              |  53 +++++-
 src/include/access/tableam.h             |  93 ++++++++++
 src/include/executor/tuptable.h          |   1 +
 7 files changed, 442 insertions(+), 6 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a9583f3103..baa0f3032e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -66,6 +66,7 @@
 #include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -2371,6 +2372,211 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * heap_insert_begin - allocate and initialize TableInsertState
+ *
+ * For single inserts:
+ * 1) Specify is_multi as false; the multi insert state will be NULL.
+ *
+ * For multi inserts:
+ * 1) Specify is_multi as true; the multi insert state will be allocated and
+ *    initialized.
+ *
+ * The other input parameters, i.e. relation, command id and options, are
+ * common to both single and multi inserts.
+ */
+TableInsertState *
+heap_insert_begin(Relation rel, CommandId cid, int options, bool is_multi)
+{
+	TableInsertState *state;
+
+	state = palloc0(sizeof(TableInsertState));
+	state->rel = rel;
+	state->cid = cid;
+	state->options = options;
+	/* The parameters below are not used for single inserts. */
+	state->mistate = NULL;
+	state->clear_mi_slots = false;
+	state->flushed = false;
+
+	if (is_multi)
+	{
+		HeapMultiInsertState *mistate;
+
+		mistate = palloc0(sizeof(HeapMultiInsertState));
+		mistate->slots =
+			palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+		mistate->max_slots = MAX_BUFFERED_TUPLES;
+		mistate->max_size = MAX_BUFFERED_BYTES;
+		mistate->cur_slots = 0;
+		mistate->cur_size = 0;
+
+		/*
+		 * Create a temporary memory context that we can reset once per multi
+		 * insert batch.
+		 */
+		mistate->context = AllocSetContextCreate(CurrentMemoryContext,
+												 "heap_multi_insert",
+												 ALLOCSET_DEFAULT_SIZES);
+		state->mistate = mistate;
+		state->clear_mi_slots = true;
+		state->flushed = false;
+	}
+
+	return state;
+}
+
+/*
+ * heap_insert_v2 - insert a single tuple into a heap
+ *
+ * Insert the tuple from the slot into the table. This is like heap_insert();
+ * the only difference is that the parameters are carried in the table insert
+ * state structure.
+ */
+void
+heap_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	bool		shouldFree = true;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+
+	/* Update the tuple with the table oid */
+	slot->tts_tableOid = RelationGetRelid(state->rel);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	/* Perform the insertion, and copy the resulting ItemPointer */
+	heap_insert(state->rel, tuple, state->cid, state->options, state->bistate);
+	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+	if (shouldFree)
+		pfree(tuple);
+}
+
+/*
+ * heap_multi_insert_v2 - insert multiple tuples into a heap
+ *
+ * Compute the size of the tuple, store it in the buffered slots, and flush
+ * the buffered tuples into the table when the buffer fills up.
+ *
+ * A flush happens:
+ * 1) either when all the slots are filled up,
+ * 2) or when the total tuple size of the currently buffered slots reaches
+ *    max_size.
+ */
+void
+heap_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	TupleTableSlot *batchslot;
+	HeapMultiInsertState *mistate = (HeapMultiInsertState *) state->mistate;
+	Size		sz;
+
+	Assert(mistate && mistate->slots);
+
+	if (mistate->slots[mistate->cur_slots] == NULL)
+		mistate->slots[mistate->cur_slots] =
+			table_slot_create(state->rel, NULL);
+
+	batchslot = mistate->slots[mistate->cur_slots];
+
+	ExecCopySlot(batchslot, slot);
+
+	/* Reset the flush state if previously set. */
+	if (state->flushed)
+		state->flushed = false;
+
+	/*
+	 * Calculate the tuple size after the original slot has been copied,
+	 * because the copied slot type, and hence the tuple size, may change.
+	 */
+	sz = GetTupleSize(batchslot, mistate->max_size);
+
+	Assert(sz > 0);
+
+	mistate->cur_slots++;
+	mistate->cur_size += sz;
+
+	if (mistate->cur_slots >= mistate->max_slots ||
+		mistate->cur_size >= mistate->max_size)
+		heap_multi_insert_flush(state);
+}
+
+/*
+ * heap_multi_insert_flush - flush the tuples from the buffered slots, if any
+ *
+ * Flush the buffered tuples, indicate to the caller that a flush happened,
+ * clear the slots if they are not needed outside, and reset the counters.
+ */
+void
+heap_multi_insert_flush(TableInsertState *state)
+{
+	HeapMultiInsertState *mistate = (HeapMultiInsertState *) state->mistate;
+	MemoryContext oldcontext;
+
+	Assert(mistate && mistate->slots && mistate->cur_slots >= 0 &&
+		   mistate->context);
+
+	if (mistate->cur_slots == 0)
+	{
+		state->flushed = false;
+		return;
+	}
+
+	oldcontext = MemoryContextSwitchTo(mistate->context);
+
+	heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+					  state->cid, state->options, state->bistate);
+
+	MemoryContextReset(mistate->context);
+	MemoryContextSwitchTo(oldcontext);
+
+	/*
+	 * Do not always clear the slots here: callers may want them for index
+	 * insertions or AFTER ROW trigger execution, in which case it is up to
+	 * the caller to clear the tuples before the next insert batch.
+	 */
+	if (state->clear_mi_slots)
+	{
+		int			i;
+
+		for (i = 0; i < mistate->cur_slots; i++)
+			ExecClearTuple(mistate->slots[i]);
+	}
+
+	mistate->cur_slots = 0;
+	mistate->cur_size = 0;
+	state->flushed = true;
+}
+
+/*
+ * heap_insert_end - clean up the TableInsertState
+ *
+ * For multi inserts, make sure to flush all the remaining buffers with
+ * heap_multi_insert_flush before calling this function. The buffered slots
+ * are dropped, the short-lived memory context is deleted and the mistate is
+ * freed, and finally the TableInsertState itself is freed.
+ */
+void
+heap_insert_end(TableInsertState *state)
+{
+	if (state->mistate)
+	{
+		HeapMultiInsertState *mistate = (HeapMultiInsertState *) state->mistate;
+		int			i;
+
+		/* Ensure that the buffers have been flushed already. */
+		Assert(mistate->slots && mistate->cur_slots == 0 &&
+			   mistate->context);
+
+		for (i = 0; i < mistate->max_slots && mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+		MemoryContextDelete(mistate->context);
+
+		pfree(mistate->slots);
+		pfree(mistate);
+	}
+
+	pfree(state);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 3eea215b85..eb3da12d9c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2554,6 +2554,11 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+	.tuple_insert_begin = heap_insert_begin,
+	.tuple_insert_v2 = heap_insert_v2,
+	.multi_insert_v2 = heap_multi_insert_v2,
+	.multi_insert_flush = heap_multi_insert_flush,
+	.tuple_insert_end = heap_insert_end,
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 58de0743ba..6bec0659e4 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -78,6 +78,13 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->tuple_complete_speculative != NULL);
 	Assert(routine->multi_insert != NULL);
+
+	Assert(routine->tuple_insert_begin != NULL);
+	Assert(routine->tuple_insert_v2 != NULL);
+	Assert(routine->multi_insert_v2 != NULL);
+	Assert(routine->multi_insert_flush != NULL);
+	Assert(routine->tuple_insert_end != NULL);
+
 	Assert(routine->tuple_delete != NULL);
 	Assert(routine->tuple_update != NULL);
 	Assert(routine->tuple_lock != NULL);
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 4c90ac5236..fa6f494ab6 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -159,7 +159,11 @@ tts_virtual_materialize(TupleTableSlot *slot)
 	if (TTS_SHOULDFREE(slot))
 		return;
 
-	/* compute size of memory required */
+	/*
+	 * Compute the size of the memory required. This size calculation code is
+	 * also used in GetTupleSize(), so any changes or fixes made here must be
+	 * made there as well.
+	 */
 	for (int natt = 0; natt < desc->natts; natt++)
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, natt);
@@ -1239,6 +1243,83 @@ ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
 	pfree(slot);
 }
 
+/*
+ * GetTupleSize - compute the tuple size given a table slot
+ *
+ * For heap tuple, buffer tuple and minimal tuple slot types, return the
+ * actual tuple size that exists. For a virtual tuple the size has to be
+ * calculated, as the slot does not carry the tuple size. If the computed
+ * size exceeds the given maxsize for a virtual tuple, this function exits
+ * early, avoiding unnecessary further calculation.
+ *
+ * Important notes:
+ * 1) The size calculation code for virtual slots is taken from
+ *    tts_virtual_materialize(), so any changes or fixes made here must be
+ *    made there as well.
+ * 2) Currently, GetTupleSize() handles the existing heap, buffer, minimal
+ *    and virtual slots. Remember to add the related code here in case any
+ *    new slot type is introduced.
+ */
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+	Size		sz = 0;
+	HeapTuple	tuple = NULL;
+
+	if (TTS_IS_HEAPTUPLE(slot))
+		tuple = ((HeapTupleTableSlot *) slot)->tuple;
+	else if (TTS_IS_BUFFERTUPLE(slot))
+		tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+	else if (TTS_IS_MINIMALTUPLE(slot))
+		tuple = ((MinimalTupleTableSlot *) slot)->tuple;
+	else if (TTS_IS_VIRTUAL(slot))
+	{
+		/*
+		 * The size calculation code used here is from
+		 * tts_virtual_materialize(); any changes or fixes made here must be
+		 * made there as well.
+		 */
+		TupleDesc	desc = slot->tts_tupleDescriptor;
+
+		for (int natt = 0; natt < desc->natts; natt++)
+		{
+			Form_pg_attribute att = TupleDescAttr(desc, natt);
+			Datum		val;
+
+			if (att->attbyval)
+			{
+				/*
+				 * Count the fixed length of a by-value datum and move on to
+				 * the next column, so that it is not counted a second time
+				 * by att_addlength_datum() below.
+				 */
+				sz += att->attlen;
+				continue;
+			}
+
+			if (slot->tts_isnull[natt])
+				continue;
+
+			val = slot->tts_values[natt];
+
+			if (att->attlen == -1 &&
+				VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+			{
+				sz = att_align_nominal(sz, att->attalign);
+				sz += EOH_get_flat_size(DatumGetEOHP(val));
+			}
+			else
+			{
+				sz = att_align_nominal(sz, att->attalign);
+				sz = att_addlength_datum(sz, att->attlen, val);
+			}
+
+			/*
+			 * There is no point in proceeding further if the computed size
+			 * has already crossed the maxsize limit we are looking for.
+			 */
+			if (maxsize != 0 && sz >= maxsize)
+				break;
+		}
+	}
+
+	if (tuple != NULL && !TTS_IS_VIRTUAL(slot))
+		sz = tuple->t_len;
+
+	return sz;
+}
+
 /* ----------------------------------------------------------------
  *	  tuple table slot accessor functions
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 54b2eb7378..c981b4758d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -36,11 +36,26 @@
 #define HEAP_INSERT_NO_LOGICAL	TABLE_INSERT_NO_LOGICAL
 #define HEAP_INSERT_SPECULATIVE 0x0010
 
-typedef struct BulkInsertStateData *BulkInsertState;
 struct TupleTableSlot;
 
 #define MaxLockTupleMode	LockTupleExclusive
 
+/*
+ * No more than this many tuples per single multi insert batch.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in the multi insert buffer. Increasing this can cause quadratic
+ * growth in memory requirements during copies into partitioned tables with
+ * a large number of partitions.
+ */
+#define MAX_BUFFERED_TUPLES		1000
+
+/*
+ * Flush the multi insert buffers if there are >= this many bytes, as counted
+ * by the size of the tuples buffered.
+ */
+#define MAX_BUFFERED_BYTES		65535
+
 /*
  * Descriptor for heap table scans.
  */
@@ -93,6 +108,29 @@ typedef enum
 	HEAPTUPLE_DELETE_IN_PROGRESS	/* deleting xact is still in progress */
 } HTSV_Result;
 
+/* Holds the multi insert state for the heap access method. */
+typedef struct HeapMultiInsertState
+{
+	/* Short-lived memory context switched to before flushing. */
+	MemoryContext context;
+	/* Array of buffered slots. */
+	TupleTableSlot **slots;
+	/* Maximum number of slots that can be buffered. */
+	int32		max_slots;
+	/* Number of slots that are currently buffered. */
+	int32		cur_slots;
+
+	/*
+	 * Maximum size (in bytes) of all the tuples that a single batch of
+	 * buffered slots can hold.
+	 */
+	int64		max_size;
+
+	/*
+	 * Total tuple size (in bytes) of the slots that are currently buffered.
+	 * The buffered slots are flushed when cur_size >= max_size.
+	 */
+	int64		cur_size;
+} HeapMultiInsertState;
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -130,15 +168,20 @@ extern bool heap_hot_search_buffer(ItemPointer tid, Relation relation,
 
 extern void heap_get_latest_tid(TableScanDesc scan, ItemPointer tid);
 
-extern BulkInsertState GetBulkInsertState(void);
-extern void FreeBulkInsertState(BulkInsertState);
-extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
-
 extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 						int options, BulkInsertState bistate);
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableInsertState *heap_insert_begin(Relation rel, CommandId cid,
+										   int options, bool is_multi);
+extern void heap_insert_v2(TableInsertState *state, TupleTableSlot *slot);
+extern void heap_multi_insert_v2(TableInsertState *state,
+								 TupleTableSlot *slot);
+extern void heap_multi_insert_flush(TableInsertState *state);
+extern void heap_insert_end(TableInsertState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..f3205a520d 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -128,6 +128,38 @@ typedef struct TM_FailureData
 	bool		traversed;
 } TM_FailureData;
 
+/* Holds the table insert state. */
+typedef struct TableInsertState
+{
+	Relation	rel;
+	/* Bulk insert state, if requested; otherwise NULL. */
+	struct BulkInsertStateData *bistate;
+	CommandId	cid;
+	int			options;
+	/* Multi insert state, if requested; otherwise NULL. */
+	void	   *mistate;
+
+	/*
+	 * Valid only for multi inserts, i.e. when mistate is non-NULL. Controls
+	 * whether the multi insert AM clears the buffered slots after each
+	 * flush. If the relation has indexes or AFTER ROW triggers, the buffered
+	 * slots are needed outside the multi insert AM; in that case set this to
+	 * false and clear them in the caller using ExecClearTuple() before the
+	 * next insert batch. If true, which is the default, the multi insert AM
+	 * clears the slots.
+	 *
+	 * It is a good idea to set this flag based on whether the table has any
+	 * indexes or AFTER ROW triggers, at the beginning of the multi insert
+	 * operation, right after calling the insert begin AM.
+	 */
+	bool		clear_mi_slots;
+
+	/*
+	 * Valid only for multi inserts, i.e. when mistate is non-NULL. Initially
+	 * false; set to true by the multi insert AM whenever it flushes the
+	 * buffered slots. The caller can use this flag to insert into indexes or
+	 * execute AFTER ROW triggers and so on, if any.
+	 */
+	bool		flushed;
+} TableInsertState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM	0x0002
@@ -376,6 +408,17 @@ typedef struct TableAmRoutine
 	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
 								 CommandId cid, int options, struct BulkInsertStateData *bistate);
 
+	TableInsertState *(*tuple_insert_begin) (Relation rel, CommandId cid,
+											 int options, bool is_multi);
+
+	void		(*tuple_insert_v2) (TableInsertState *state, TupleTableSlot *slot);
+
+	void		(*multi_insert_v2) (TableInsertState *state, TupleTableSlot *slot);
+
+	void		(*multi_insert_flush) (TableInsertState *state);
+
+	void		(*tuple_insert_end) (TableInsertState *state);
+
 	/* see table_tuple_delete() for reference about parameters */
 	TM_Result	(*tuple_delete) (Relation rel,
 								 ItemPointer tid,
@@ -723,6 +766,8 @@ typedef struct TableAmRoutine
 } TableAmRoutine;
 
+typedef struct BulkInsertStateData *BulkInsertState;
+
 /* ----------------------------------------------------------------------------
  * Slot functions.
  * ----------------------------------------------------------------------------
@@ -741,6 +786,10 @@ extern const TupleTableSlotOps *table_slot_callbacks(Relation rel);
  */
 extern TupleTableSlot *table_slot_create(Relation rel, List **reglist);
 
+/* Bulk insert state functions. */
+extern BulkInsertState GetBulkInsertState(void);
+extern void FreeBulkInsertState(BulkInsertState);
+extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
 /* ----------------------------------------------------------------------------
  * Table scan functions.
@@ -1237,6 +1286,50 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 					  cid, options, bistate);
 }
 
+static inline TableInsertState *
+table_insert_begin(Relation rel, CommandId cid, int options,
+				   bool alloc_bistate, bool is_multi)
+{
+	TableInsertState *state = rel->rd_tableam->tuple_insert_begin(rel, cid,
+																  options,
+																  is_multi);
+
+	/* Allocate the bulk insert state here, since it is AM-independent. */
+	if (alloc_bistate)
+		state->bistate = GetBulkInsertState();
+	else
+		state->bistate = NULL;
+
+	return state;
+}
+
+static inline void
+table_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	state->rel->rd_tableam->tuple_insert_v2(state, slot);
+}
+
+static inline void
+table_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	state->rel->rd_tableam->multi_insert_v2(state, slot);
+}
+
+static inline void
+table_multi_insert_flush(TableInsertState *state)
+{
+	state->rel->rd_tableam->multi_insert_flush(state);
+}
+
+static inline void
+table_insert_end(TableInsertState *state)
+{
+	/* Free the bulk insert state here, since it is AM-independent. */
+	if (state->bistate)
+		FreeBulkInsertState(state->bistate);
+
+	state->rel->rd_tableam->tuple_insert_end(state);
+}
+
 /*
  * Delete a tuple.
  *
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index f7df70b5ab..d7c284d8e3 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -330,6 +330,7 @@ extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
 								 int lastAttNum);
 extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
 
+extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize);
 
 #ifndef FRONTEND
-- 
2.25.1
