On 2019-04-03 06:41:49 +1300, David Rowley wrote:
> However, I've ended up not doing it that way as the patch requires
> more than just an array of TupleTableSlots to be stored in the
> ResultRelInfo, it'll pretty much need all of what I have in
> CopyMultiInsertBuffer, which includes line numbers for error
> reporting, a BulkInsertState and a counter to track how many of the
> slots are used.  I had thoughts that we could just tag the whole
> CopyMultiInsertBuffer in ResultRelInfo, but that requires moving it
> somewhere a bit more generic. Another thought would be that we have
> something like "void *extra;" in ResultRelInfo that we can just borrow
> for copy.c.  It may be interesting to try this out to see if it saves
> much in the way of performance.

Hm, we could just forwad declare struct CopyMultiInsertBuffer in a
header, and only define it in copy.c. That doesn't sound insane to me.


> I've got the patch into a working state now and wrote a bunch of
> comments. I've not really done a thorough read back of the code again.
> I normally wait until the light is coming from a different angle
> before doing that, but there does not seem to be much time to wait for
> that in this case, so here's v2.  Performance seems to be about the
> same as what I reported yesterday.

Cool.


> +/*
> + * Multi-Insert handling code for COPY FROM.  Inserts are performed in
> + * batches of up to MAX_BUFFERED_TUPLES.

This should probably be moved to the top of the file.


> + * When COPY FROM is used on a partitioned table, we attempt to maintain
> + * only MAX_PARTITION_BUFFERS buffers at once.  Buffers that are completely
> + * unused in each batch are removed so that we end up not keeping buffers for
> + * partitions that we might not insert into again.
> + */
> +#define MAX_BUFFERED_TUPLES          1000
> +#define MAX_BUFFERED_BYTES           65535
> +#define MAX_PARTITION_BUFFERS        15

> +/*
> + * CopyMultiInsertBuffer
> + *           Stores multi-insert data related to a single relation in 
> CopyFrom.
> + */
> +typedef struct

Please don't create anonymous structs - they can't be forward declared,
and some debugging tools handle them worse than named structs. And it
makes naming CopyMultiInsertBuffer in the comment less necessary.


> +{
> +     Oid                     relid;                  /* Relation ID this 
> insert data is for */
> +     TupleTableSlot **slots;         /* Array of MAX_BUFFERED_TUPLES to store
> +                                                              * tuples */
> +     uint64     *linenos;            /* Line # of tuple in copy stream */

It could make sense to allocate those two together, or even as part of
the CopyMultiInsertBuffer itself, to reduce allocator overhead.


> +     else
> +     {
> +             CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) 
> palloc(sizeof(CopyMultiInsertBuffer));
> +
> +             /* Non-partitioned table.  Just setup the buffer for the table. 
> */
> +             buffer->relid = RelationGetRelid(rri->ri_RelationDesc);
> +             buffer->slots = palloc0(MAX_BUFFERED_TUPLES * 
> sizeof(TupleTableSlot *));
> +             buffer->linenos = palloc(MAX_BUFFERED_TUPLES * sizeof(uint64));
> +             buffer->resultRelInfo = rri;
> +             buffer->bistate = GetBulkInsertState();
> +             buffer->nused = 0;
> +             miinfo->multiInsertBufferTab = NULL;
> +             miinfo->buffer = buffer;
> +             miinfo->nbuffers = 1;
> +     }

Can this be moved into a helper function?


> +     /*
> +      * heap_multi_insert leaks memory, so switch to short-lived memory 
> context
> +      * before calling it.
> +      */

s/heap_multi_insert/table_multi_insert/



> +     /*
> +      * If there are any indexes, update them for all the inserted tuples, 
> and
> +      * run AFTER ROW INSERT triggers.
> +      */
> +     if (resultRelInfo->ri_NumIndices > 0)
> +     {
> +             for (i = 0; i < nBufferedTuples; i++)
> +             {
> +                     List       *recheckIndexes;
> +
> +                     cstate->cur_lineno = buffer->linenos[i];
> +                     recheckIndexes =
> +                             ExecInsertIndexTuples(buffer->slots[i], estate, 
> false, NULL,
> +                                                                       NIL);
> +                     ExecARInsertTriggers(estate, resultRelInfo,
> +                                                              
> buffer->slots[i],
> +                                                              
> recheckIndexes, cstate->transition_capture);
> +                     list_free(recheckIndexes);
> +             }
> +     }
> +
> +     /*
> +      * There's no indexes, but see if we need to run AFTER ROW INSERT 
> triggers
> +      * anyway.
> +      */
> +     else if (resultRelInfo->ri_TrigDesc != NULL &&
> +                      (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
> +                       resultRelInfo->ri_TrigDesc->trig_insert_new_table))
> +     {
> +             for (i = 0; i < nBufferedTuples; i++)
> +             {
> +                     cstate->cur_lineno = buffer->linenos[i];
> +                     ExecARInsertTriggers(estate, resultRelInfo,
> +                                                              
> bufferedSlots[i],
> +                                                              NIL, 
> cstate->transition_capture);
> +             }
> +     }

> +     for (i = 0; i < nBufferedTuples; i++)
> +             ExecClearTuple(bufferedSlots[i]);

I wonder about combining these loops somehow. But it's probably ok.


> +/*
> + * CopyMultiInsertBuffer_Cleanup
> + *           Drop used slots and free member for this buffer.  The buffer
> + *           must be flushed before cleanup.
> + */
> +static inline void
> +CopyMultiInsertBuffer_Cleanup(CopyMultiInsertBuffer *buffer)
> +{
> +     int                     i;
> +
> +     ReleaseBulkInsertStatePin(buffer->bistate);

Shouldn't this FreeBulkInsertState() rather than
ReleaseBulkInsertStatePin()?


> +
> +/*
> + * CopyMultiInsertBuffer_RemoveBuffer
> + *           Remove a buffer from being tracked by miinfo
> + */
> +static inline void
> +CopyMultiInsertBuffer_RemoveBuffer(CopyMultiInsertInfo *miinfo,
> +                                                                
> CopyMultiInsertBuffer *buffer)
> +{
> +     Oid                     relid = buffer->relid;
> +
> +     CopyMultiInsertBuffer_Cleanup(buffer);
> +
> +     hash_search(miinfo->multiInsertBufferTab, (void *) &relid, HASH_REMOVE,
> +                             NULL);
> +     miinfo->nbuffers--;

Aren't we leaking the CopyMultiInsertBuffer itself here?

> +}
> +
> +/*
> + * CopyMultiInsertInfo_Flush
> + *           Write out all stored tuples in all buffers out to the tables.
> + *
> + * To save us from ending up with buffers for 1000s of partitions we remove
> + * buffers belonging to partitions that we've seen no tuples for in this 
> batch
> + */
> +static inline void
> +CopyMultiInsertInfo_Flush(CopyMultiInsertInfo *miinfo, CopyState cstate,
> +                                               EState *estate, CommandId 
> mycid, int ti_options)
> +{
> +     CopyMultiInsertBuffer *buffer;
> +
> +     /*
> +      * If just storing buffers for a non-partitioned table, then just flush
> +      * that buffer.
> +      */
> +     if (miinfo->multiInsertBufferTab == NULL)
> +     {
> +             buffer = miinfo->buffer;
> +
> +             CopyMultiInsertInfo_FlushSingleBuffer(buffer, cstate, estate, 
> mycid,
> +                                                                             
>           ti_options);
> +     }
> +     else
> +     {
> +             HASH_SEQ_STATUS status;
> +
> +             /*
> +              * Otherwise make a pass over the hash table and flush all 
> buffers
> +              * that have any tuples stored in them.
> +              */
> +             hash_seq_init(&status, miinfo->multiInsertBufferTab);
> +
> +             while ((buffer = (CopyMultiInsertBuffer *) 
> hash_seq_search(&status)) != NULL)
> +             {
> +                     if (buffer->nused > 0)
> +                     {
> +                             /* Flush the buffer if it was used */
> +                             CopyMultiInsertInfo_FlushSingleBuffer(buffer, 
> cstate, estate,
> +                                                                             
>                           mycid, ti_options);
> +                     }
> +                     else
> +                     {
> +                             /*
> +                              * Otherwise just remove it.  If we saw no 
> tuples for it this
> +                              * batch, then likely its best to make way for 
> buffers for
> +                              * other partitions.
> +                              */
> +                             CopyMultiInsertBuffer_RemoveBuffer(miinfo, 
> buffer);
> +                     }
> +             }
> +     }
> +
> +     miinfo->bufferedTuples = 0;
> +     miinfo->bufferedBytes = 0;
> +}
> +
> +/*
> + * CopyMultiInsertInfo_Cleanup
> + *           Cleanup allocated buffers and free memory
> + */
> +static inline void
> +CopyMultiInsertInfo_Cleanup(CopyMultiInsertInfo *miinfo)
> +{
> +     if (miinfo->multiInsertBufferTab == NULL)
> +             CopyMultiInsertBuffer_Cleanup(miinfo->buffer);
> +     else
> +     {
> +             HASH_SEQ_STATUS status;
> +             CopyMultiInsertBuffer *buffer;
> +
> +             hash_seq_init(&status, miinfo->multiInsertBufferTab);
> +
> +             while ((buffer = (CopyMultiInsertBuffer *) 
> hash_seq_search(&status)) != NULL)
> +             {
> +                     Assert(buffer->nused == 0);
> +                     CopyMultiInsertBuffer_Cleanup(buffer);
> +             }
> +
> +             hash_destroy(miinfo->multiInsertBufferTab);
> +     }
> +}
> +
> +/*
> + * CopyMultiInsertInfo_NextFreeSlot
> + *           Get the next TupleTableSlot that the next tuple should be 
> stored in.
> + *
> + * Callers must ensure that the buffer is not full.
> + */
> +static inline TupleTableSlot *
> +CopyMultiInsertInfo_NextFreeSlot(CopyMultiInsertInfo *miinfo,
> +                                                              ResultRelInfo 
> *rri)
> +{
> +     CopyMultiInsertBuffer *buffer = miinfo->buffer;
> +     int                     nused = buffer->nused;
> +
> +     Assert(nused < MAX_BUFFERED_TUPLES);
> +
> +     if (buffer->slots[nused] == NULL)
> +             buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, 
> NULL);
> +     return buffer->slots[nused];
> +}
> +
> +/*
> + * CopyMultiInsertInfo_Store
> + *           Consume the previously reserved TupleTableSlot that was 
> reserved by
> + *           CopyMultiInsertInfo_NextFreeSlot.
> + */
> +static inline void
> +CopyMultiInsertInfo_Store(CopyMultiInsertInfo *miinfo, TupleTableSlot *slot,
> +                                               int tuplen, uint64 lineno)
> +{
> +     CopyMultiInsertBuffer *buffer = miinfo->buffer;
> +
> +     Assert(slot == buffer->slots[buffer->nused]);
> +
> +     /* Store the line number so we can properly report any errors later */
> +     buffer->linenos[buffer->nused] = lineno;
> +
> +     /* Record this slot as being used */
> +     buffer->nused++;
> +
> +     /* Update how many tuples are stored and their size */
> +     miinfo->bufferedTuples++;
> +     miinfo->bufferedBytes += tuplen;
> +}
> +
>  /*
>   * Copy FROM file to relation.
>   */
>  uint64
>  CopyFrom(CopyState cstate)
>  {
> -     HeapTuple       tuple;
> -     TupleDesc       tupDesc;
> -     Datum      *values;
> -     bool       *nulls;
>       ResultRelInfo *resultRelInfo;
>       ResultRelInfo *target_resultRelInfo;
>       ResultRelInfo *prevResultRelInfo = NULL;
>       EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
>       ModifyTableState *mtstate;
>       ExprContext *econtext;
> -     TupleTableSlot *myslot;
> +     TupleTableSlot *singleslot = NULL;
>       MemoryContext oldcontext = CurrentMemoryContext;
> -     MemoryContext batchcontext;
>  
>       PartitionTupleRouting *proute = NULL;
>       ErrorContextCallback errcallback;
>       CommandId       mycid = GetCurrentCommandId(true);
>       int                     ti_options = 0; /* start with default 
> table_insert options */
> -     BulkInsertState bistate;
> +     BulkInsertState bistate = NULL;
>       CopyInsertMethod insertMethod;
> +     CopyMultiInsertInfo multiInsertInfo;
>       uint64          processed = 0;
> -     int                     nBufferedTuples = 0;
>       bool            has_before_insert_row_trig;
>       bool            has_instead_insert_row_trig;
>       bool            leafpart_use_multi_insert = false;
>  
> -#define MAX_BUFFERED_TUPLES 1000
> -#define RECHECK_MULTI_INSERT_THRESHOLD 1000
> -     HeapTuple  *bufferedTuples = NULL;      /* initialize to silence 
> warning */
> -     Size            bufferedTuplesSize = 0;
> -     uint64          firstBufferedLineNo = 0;
> -     uint64          lastPartitionSampleLineNo = 0;
> -     uint64          nPartitionChanges = 0;
> -     double          avgTuplesPerPartChange = 0;
> -
>       Assert(cstate->rel);
>  
> +     memset(&multiInsertInfo, 0, sizeof(CopyMultiInsertInfo));
> +
>       /*
>        * The target must be a plain, foreign, or partitioned relation, or have
>        * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
> @@ -2382,8 +2767,6 @@ CopyFrom(CopyState cstate)
>                                                       
> RelationGetRelationName(cstate->rel))));
>       }
>  
> -     tupDesc = RelationGetDescr(cstate->rel);
> -
>       /*----------
>        * Check to see if we can avoid writing WAL
>        *
> @@ -2467,8 +2850,8 @@ CopyFrom(CopyState cstate)
>               if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
>               {
>                       ereport(ERROR,
> -                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> -                                     errmsg("cannot perform FREEZE on a 
> partitioned table")));
> +                                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> +                                      errmsg("cannot perform FREEZE on a 
> partitioned table")));
>               }
>  
>               /*
> @@ -2518,10 +2901,6 @@ CopyFrom(CopyState cstate)
>  
>       ExecInitRangeTable(estate, cstate->range_table);
>  
> -     /* Set up a tuple slot too */
> -     myslot = ExecInitExtraTupleSlot(estate, tupDesc,
> -                                                                     
> &TTSOpsHeapTuple);
> -
>       /*
>        * Set up a ModifyTableState so we can let FDW(s) init themselves for
>        * foreign-table result relation(s).
> @@ -2565,10 +2944,11 @@ CopyFrom(CopyState cstate)
>                                                                               
>                 &mtstate->ps);
>  
>       /*
> -      * It's more efficient to prepare a bunch of tuples for insertion, and
> -      * insert them in one heap_multi_insert() call, than call heap_insert()
> -      * separately for every tuple. However, there are a number of reasons 
> why
> -      * we might not be able to do this.  These are explained below.
> +      * It's generally more efficient to prepare a bunch of tuples for
> +      * insertion, and insert them in one table_multi_insert() call, than 
> call
> +      * table_insert() separately for every tuple. However, there are a 
> number
> +      * of reasons why we might not be able to do this.  These are explained
> +      * below.
>        */
>       if (resultRelInfo->ri_TrigDesc != NULL &&
>               (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
> @@ -2589,8 +2969,8 @@ CopyFrom(CopyState cstate)
>                * For partitioned tables we can't support multi-inserts when 
> there
>                * are any statement level insert triggers. It might be 
> possible to
>                * allow partitioned tables with such triggers in the future, 
> but for
> -              * now, CopyFromInsertBatch expects that any before row insert 
> and
> -              * statement level insert triggers are on the same relation.
> +              * now, CopyMultiInsertInfo_Flush expects that any before row 
> insert
> +              * and statement level insert triggers are on the same relation.
>                */
>               insertMethod = CIM_SINGLE;
>       }
> @@ -2622,8 +3002,7 @@ CopyFrom(CopyState cstate)
>       {
>               /*
>                * For partitioned tables, we may still be able to perform bulk
> -              * inserts for sets of consecutive tuples which belong to the 
> same
> -              * partition.  However, the possibility of this depends on 
> which types
> +              * inserts.  However, the possibility of this depends on which 
> types
>                * of triggers exist on the partition.  We must disable bulk 
> inserts
>                * if the partition is a foreign table or it has any before row 
> insert
>                * or insert instead triggers (same as we checked above for the 
> parent
> @@ -2632,18 +3011,27 @@ CopyFrom(CopyState cstate)
>                * have the intermediate insert method of CIM_MULTI_CONDITIONAL 
> to
>                * flag that we must later determine if we can use bulk-inserts 
> for
>                * the partition being inserted into.
> -              *
> -              * Normally, when performing bulk inserts we just flush the 
> insert
> -              * buffer whenever it becomes full, but for the partitioned 
> table
> -              * case, we flush it whenever the current tuple does not belong 
> to the
> -              * same partition as the previous tuple.
>                */
>               if (proute)
>                       insertMethod = CIM_MULTI_CONDITIONAL;
>               else
>                       insertMethod = CIM_MULTI;
>  
> -             bufferedTuples = palloc(MAX_BUFFERED_TUPLES * 
> sizeof(HeapTuple));
> +             CopyMultiInsertInfo_Init(&multiInsertInfo, resultRelInfo,
> +                                                              proute != 
> NULL);
> +     }
> +
> +     /*
> +      * If not using batch mode (which allocates slots as needed) set up a
> +      * tuple slot too. When inserting into a partitioned table, we also need
> +      * one, even if we might batch insert, to read the tuple in the root
> +      * partition's form.
> +      */
> +     if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
> +     {
> +             singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
> +                                                                        
> &estate->es_tupleTable);
> +             bistate = GetBulkInsertState();
>       }
>  
>       has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
> @@ -2660,10 +3048,6 @@ CopyFrom(CopyState cstate)
>        */
>       ExecBSInsertTriggers(estate, resultRelInfo);
>  
> -     values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
> -     nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
> -
> -     bistate = GetBulkInsertState();
>       econtext = GetPerTupleExprContext(estate);
>  
>       /* Set up callback to identify error line number */
> @@ -2672,17 +3056,9 @@ CopyFrom(CopyState cstate)
>       errcallback.previous = error_context_stack;
>       error_context_stack = &errcallback;
>  
> -     /*
> -      * Set up memory context for batches. For cases without batching we 
> could
> -      * use the per-tuple context, but it's simpler to just use it every 
> time.
> -      */
> -     batchcontext = AllocSetContextCreate(CurrentMemoryContext,
> -                                                                             
>  "batch context",
> -                                                                             
>  ALLOCSET_DEFAULT_SIZES);
> -
>       for (;;)
>       {
> -             TupleTableSlot *slot;
> +             TupleTableSlot *myslot;
>               bool            skip_tuple;
>  
>               CHECK_FOR_INTERRUPTS();
> @@ -2693,20 +3069,33 @@ CopyFrom(CopyState cstate)
>                */
>               ResetPerTupleExprContext(estate);
>  
> +             if (insertMethod == CIM_SINGLE || proute)
> +             {
> +                     myslot = singleslot;
> +                     Assert(myslot != NULL);
> +             }
> +             else
> +             {
> +                     Assert(resultRelInfo == target_resultRelInfo);
> +                     Assert(insertMethod == CIM_MULTI);
> +
> +                     myslot = 
> CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
> +                                                                             
>                           resultRelInfo);
> +             }
> +
>               /*
>                * Switch to per-tuple context before calling NextCopyFrom, 
> which does
>                * evaluate default expressions etc. and requires per-tuple 
> context.
>                */
>               MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
>  
> -             if (!NextCopyFrom(cstate, econtext, values, nulls))
> -                     break;
> +             ExecClearTuple(myslot);
>  
> -             /* Switch into per-batch memory context before forming the 
> tuple. */
> -             MemoryContextSwitchTo(batchcontext);
> +             /* Directly store the values/nulls array in the slot */
> +             if (!NextCopyFrom(cstate, econtext, myslot->tts_values, 
> myslot->tts_isnull))
> +                     break;
>  
> -             /* And now we can form the input tuple. */
> -             tuple = heap_form_tuple(tupDesc, values, nulls);
> +             ExecStoreVirtualTuple(myslot);
>  
>               /*
>                * Constraints might reference the tableoid column, so 
> (re-)initialize
> @@ -2717,18 +3106,15 @@ CopyFrom(CopyState cstate)
>               /* Triggers and stuff need to be invoked in query context. */
>               MemoryContextSwitchTo(oldcontext);
>  
> -             /* Place tuple in tuple slot --- but slot shouldn't free it */
> -             slot = myslot;
> -             ExecStoreHeapTuple(tuple, slot, false);
> -
>               if (cstate->whereClause)
>               {
>                       econtext->ecxt_scantuple = myslot;
> +                     /* Skip items that don't match the COPY's WHERE clause 
> */
>                       if (!ExecQual(cstate->qualexpr, econtext))
>                               continue;
>               }
>  
> -             /* Determine the partition to heap_insert the tuple into */
> +             /* Determine the partition to table_insert the tuple into */
>               if (proute)
>               {
>                       TupleConversionMap *map;
> @@ -2739,80 +3125,10 @@ CopyFrom(CopyState cstate)
>                        * if the found partition is not suitable for INSERTs.
>                        */
>                       resultRelInfo = ExecFindPartition(mtstate, 
> target_resultRelInfo,
> -                                                                             
>           proute, slot, estate);
> +                                                                             
>           proute, myslot, estate);
>  
>                       if (prevResultRelInfo != resultRelInfo)
>                       {
> -                             /* Check if we can multi-insert into this 
> partition */
> -                             if (insertMethod == CIM_MULTI_CONDITIONAL)
> -                             {
> -                                     /*
> -                                      * When performing bulk-inserts into 
> partitioned tables we
> -                                      * must insert the tuples seen so far 
> to the heap whenever
> -                                      * the partition changes.
> -                                      */
> -                                     if (nBufferedTuples > 0)
> -                                     {
> -                                             MemoryContext   oldcontext;
> -
> -                                             CopyFromInsertBatch(cstate, 
> estate, mycid, ti_options,
> -                                                                             
>         prevResultRelInfo, myslot, bistate,
> -                                                                             
>         nBufferedTuples, bufferedTuples,
> -                                                                             
>         firstBufferedLineNo);
> -                                             nBufferedTuples = 0;
> -                                             bufferedTuplesSize = 0;
> -
> -                                             /*
> -                                              * The tuple is already 
> allocated in the batch context, which
> -                                              * we want to reset.  So to 
> keep the tuple we copy it into the
> -                                              * short-lived (per-tuple) 
> context, reset the batch context
> -                                              * and then copy it back into 
> the per-batch one.
> -                                              */
> -                                             oldcontext = 
> MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> -                                             tuple = heap_copytuple(tuple);
> -                                             
> MemoryContextSwitchTo(oldcontext);
> -
> -                                             /* cleanup the old batch */
> -                                             
> MemoryContextReset(batchcontext);
> -
> -                                             /* copy the tuple back to the 
> per-batch context */
> -                                             oldcontext = 
> MemoryContextSwitchTo(batchcontext);
> -                                             tuple = heap_copytuple(tuple);
> -                                             
> MemoryContextSwitchTo(oldcontext);
> -
> -                                             /*
> -                                              * Also push the tuple copy to 
> the slot (resetting the context
> -                                              * invalidated the slot 
> contents).
> -                                              */
> -                                             ExecStoreHeapTuple(tuple, slot, 
> false);
> -                                     }
> -
> -                                     nPartitionChanges++;
> -
> -                                     /*
> -                                      * Here we adaptively enable 
> multi-inserts based on the
> -                                      * average number of tuples from recent 
> multi-insert
> -                                      * batches.  We recalculate the average 
> every
> -                                      * RECHECK_MULTI_INSERT_THRESHOLD 
> tuples instead of taking
> -                                      * the average over the whole copy.  
> This allows us to
> -                                      * enable multi-inserts when we get 
> periods in the copy
> -                                      * stream that have tuples commonly 
> belonging to the same
> -                                      * partition, and disable when the 
> partition is changing
> -                                      * too often.
> -                                      */
> -                                     if (unlikely(lastPartitionSampleLineNo 
> <= (cstate->cur_lineno -
> -                                                                             
>                                            RECHECK_MULTI_INSERT_THRESHOLD)
> -                                                              && 
> cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD))
> -                                     {
> -                                             avgTuplesPerPartChange =
> -                                                     (cstate->cur_lineno - 
> lastPartitionSampleLineNo) /
> -                                                     (double) 
> nPartitionChanges;
> -
> -                                             lastPartitionSampleLineNo = 
> cstate->cur_lineno;
> -                                             nPartitionChanges = 0;
> -                                     }
> -                             }
> -
>                               /* Determine which triggers exist on this 
> partition */
>                               has_before_insert_row_trig = 
> (resultRelInfo->ri_TrigDesc &&
>                                                                               
>           resultRelInfo->ri_TrigDesc->trig_insert_before_row);
> @@ -2821,22 +3137,19 @@ CopyFrom(CopyState cstate)
>                                                                               
>            resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
>  
>                               /*
> -                              * Tests have shown that using multi-inserts 
> when the
> -                              * partition changes on every tuple slightly 
> decreases the
> -                              * performance, however, there are benefits 
> even when only
> -                              * some batches have just 2 tuples, so let's 
> enable
> -                              * multi-inserts even when the average is quite 
> low.
> +                              * Enable multi-inserts when the partition has 
> BEFORE/INSTEAD
> +                              * OF triggers, or if the partition is a 
> foreign partition.
>                                */
>                               leafpart_use_multi_insert = insertMethod == 
> CIM_MULTI_CONDITIONAL &&
> -                                     avgTuplesPerPartChange >= 1.3 &&
>                                       !has_before_insert_row_trig &&
>                                       !has_instead_insert_row_trig &&
>                                       resultRelInfo->ri_FdwRoutine == NULL;
>  
> -                             /*
> -                              * We'd better make the bulk insert mechanism 
> gets a new
> -                              * buffer when the partition being inserted 
> into changes.
> -                              */
> +                             /* Set the multi-insert buffer to use for this 
> partition. */
> +                             if (leafpart_use_multi_insert)
> +                                     
> CopyMultiInsertInfo_SetCurrentBuffer(&multiInsertInfo,
> +                                                                             
>                                  resultRelInfo);
> +
>                               ReleaseBulkInsertStatePin(bistate);
>                               prevResultRelInfo = resultRelInfo;
>                       }
> @@ -2879,26 +3192,48 @@ CopyFrom(CopyState cstate)
>                        * rowtype.
>                        */
>                       map = 
> resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
> -                     if (map != NULL)
> +                     if (insertMethod == CIM_SINGLE || 
> !leafpart_use_multi_insert)
>                       {
> -                             TupleTableSlot *new_slot;
> -                             MemoryContext oldcontext;
> -
> -                             new_slot = 
> resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
> -                             Assert(new_slot != NULL);
> -
> -                             slot = execute_attr_map_slot(map->attrMap, 
> slot, new_slot);
> +                             /* non batch insert */
> +                             if (map != NULL)
> +                             {
> +                                     TupleTableSlot *new_slot;
>  
> +                                     new_slot = 
> resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
> +                                     myslot = 
> execute_attr_map_slot(map->attrMap, myslot, new_slot);
> +                             }
> +                     }
> +                     else
> +                     {
>                               /*
> -                              * Get the tuple in the per-batch context, so 
> that it will be
> -                              * freed after each batch insert.
> +                              * Batch insert into partitioned table.
>                                */
> -                             oldcontext = 
> MemoryContextSwitchTo(batchcontext);
> -                             tuple = ExecCopySlotHeapTuple(slot);
> -                             MemoryContextSwitchTo(oldcontext);
> +                             TupleTableSlot *nextslot;
> +
> +                             /* no other path available for partitioned 
> table */
> +                             Assert(insertMethod == CIM_MULTI_CONDITIONAL);
> +
> +                             nextslot = 
> CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
> +                                                                             
>                                         resultRelInfo);
> +
> +                             if (map != NULL)
> +                                     myslot = 
> execute_attr_map_slot(map->attrMap, myslot, nextslot);
> +                             else
> +                             {
> +                                     /*
> +                                      * This looks more expensive than it is 
> (Believe me, I
> +                                      * optimized it away. Twice). The input 
> is in virtual
> +                                      * form, and we'll materialize the slot 
> below - for most
> +                                      * slot types the copy performs the 
> work materialization
> +                                      * would later require anyway.
> +                                      */
> +                                     ExecCopySlot(nextslot, myslot);
> +                                     myslot = nextslot;
> +                             }
>                       }
>  
> -                     slot->tts_tableOid = 
> RelationGetRelid(resultRelInfo->ri_RelationDesc);
> +                     /* ensure that triggers etc see the right relation  */
> +                     myslot->tts_tableOid = 
> RelationGetRelid(resultRelInfo->ri_RelationDesc);
>               }
>  
>               skip_tuple = false;
> @@ -2906,7 +3241,7 @@ CopyFrom(CopyState cstate)
>               /* BEFORE ROW INSERT Triggers */
>               if (has_before_insert_row_trig)
>               {
> -                     if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
> +                     if (!ExecBRInsertTriggers(estate, resultRelInfo, 
> myslot))
>                               skip_tuple = true;      /* "do nothing" */
>               }
>  
> @@ -2919,7 +3254,7 @@ CopyFrom(CopyState cstate)
>                        */
>                       if (has_instead_insert_row_trig)
>                       {
> -                             ExecIRInsertTriggers(estate, resultRelInfo, 
> slot);
> +                             ExecIRInsertTriggers(estate, resultRelInfo, 
> myslot);
>                       }
>                       else
>                       {
> @@ -2931,12 +3266,7 @@ CopyFrom(CopyState cstate)
>                                */
>                               if 
> (resultRelInfo->ri_RelationDesc->rd_att->constr &&
>                                       
> resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
> -                             {
> -                                     ExecComputeStoredGenerated(estate, 
> slot);
> -                                     MemoryContextSwitchTo(batchcontext);
> -                                     tuple = ExecCopySlotHeapTuple(slot);
> -                                     MemoryContextSwitchTo(oldcontext);
> -                             }
> +                                     ExecComputeStoredGenerated(estate, 
> myslot);
>  
>                               /*
>                                * If the target is a plain table, check the 
> constraints of
> @@ -2944,7 +3274,7 @@ CopyFrom(CopyState cstate)
>                                */
>                               if (resultRelInfo->ri_FdwRoutine == NULL &&
>                                       
> resultRelInfo->ri_RelationDesc->rd_att->constr)
> -                                     ExecConstraints(resultRelInfo, slot, 
> estate);
> +                                     ExecConstraints(resultRelInfo, myslot, 
> estate);
>  
>                               /*
>                                * Also check the tuple against the partition 
> constraint, if
> @@ -2954,7 +3284,7 @@ CopyFrom(CopyState cstate)
>                                */
>                               if (resultRelInfo->ri_PartitionCheck &&
>                                       (proute == NULL || 
> has_before_insert_row_trig))
> -                                     ExecPartitionCheck(resultRelInfo, slot, 
> estate, true);
> +                                     ExecPartitionCheck(resultRelInfo, 
> myslot, estate, true);
>  
>                               /*
>                                * Perform multi-inserts when enabled, or when 
> loading a
> @@ -2963,31 +3293,21 @@ CopyFrom(CopyState cstate)
>                                */
>                               if (insertMethod == CIM_MULTI || 
> leafpart_use_multi_insert)
>                               {
> -                                     /* Add this tuple to the tuple buffer */
> -                                     if (nBufferedTuples == 0)
> -                                             firstBufferedLineNo = 
> cstate->cur_lineno;
> -                                     bufferedTuples[nBufferedTuples++] = 
> tuple;
> -                                     bufferedTuplesSize += tuple->t_len;
> -
>                                       /*
> -                                      * If the buffer filled up, flush it.  
> Also flush if the
> -                                      * total size of all the tuples in the 
> buffer becomes
> -                                      * large, to avoid using large amounts 
> of memory for the
> -                                      * buffer when the tuples are 
> exceptionally wide.
> +                                      * The slot previously might point into 
> the per-tuple
> +                                      * context. For batching it needs to be 
> longer lived.
>                                        */
> -                                     if (nBufferedTuples == 
> MAX_BUFFERED_TUPLES ||
> -                                             bufferedTuplesSize > 65535)
> -                                     {
> -                                             CopyFromInsertBatch(cstate, 
> estate, mycid, ti_options,
> -                                                                             
>         resultRelInfo, myslot, bistate,
> -                                                                             
>         nBufferedTuples, bufferedTuples,
> -                                                                             
>         firstBufferedLineNo);
> -                                             nBufferedTuples = 0;
> -                                             bufferedTuplesSize = 0;
> -
> -                                             /* free memory occupied by 
> tuples from the batch */
> -                                             
> MemoryContextReset(batchcontext);
> -                                     }
> +                                     ExecMaterializeSlot(myslot);
> +
> +                                     /* Add this tuple to the tuple buffer */
> +                                     
> CopyMultiInsertInfo_Store(&multiInsertInfo, myslot,
> +                                                                             
>           cstate->line_buf.len,
> +                                                                             
>           cstate->cur_lineno);
> +
> +                                     /* If the buffer filled up, flush it. */
> +                                     if 
> (CopyMultiInsertInfo_IsFull(&multiInsertInfo))
> +                                             
> CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate,
> +                                                                             
>                   estate, mycid, ti_options);
>                               }
>                               else
>                               {
> @@ -2996,12 +3316,12 @@ CopyFrom(CopyState cstate)
>                                       /* OK, store the tuple */
>                                       if (resultRelInfo->ri_FdwRoutine != 
> NULL)
>                                       {
> -                                             slot = 
> resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
> -                                                                             
>                                                                            
> resultRelInfo,
> -                                                                             
>                                                                            
> slot,
> -                                                                             
>                                                                            
> NULL);
> +                                             myslot = 
> resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
> +                                                                             
>                                                                               
>    resultRelInfo,
> +                                                                             
>                                                                               
>    myslot,
> +                                                                             
>                                                                               
>    NULL);
>  
> -                                             if (slot == NULL)       /* "do 
> nothing" */
> +                                             if (myslot == NULL) /* "do 
> nothing" */
>                                                       continue;       /* next 
> tuple please */
>  
>                                               /*
> @@ -3009,27 +3329,26 @@ CopyFrom(CopyState cstate)
>                                                * column, so (re-)initialize 
> tts_tableOid before
>                                                * evaluating them.
>                                                */
> -                                             slot->tts_tableOid = 
> RelationGetRelid(resultRelInfo->ri_RelationDesc);
> +                                             myslot->tts_tableOid = 
> RelationGetRelid(resultRelInfo->ri_RelationDesc);
>                                       }
>                                       else
>                                       {
> -                                             tuple = 
> ExecFetchSlotHeapTuple(slot, true, NULL);
> -                                             
> heap_insert(resultRelInfo->ri_RelationDesc, tuple,
> -                                                                     mycid, 
> ti_options, bistate);
> -                                             ItemPointerCopy(&tuple->t_self, 
> &slot->tts_tid);
> -                                             slot->tts_tableOid = 
> RelationGetRelid(resultRelInfo->ri_RelationDesc);
> +                                             /* OK, store the tuple and 
> create index entries for it */
> +                                             
> table_insert(resultRelInfo->ri_RelationDesc, myslot,
> +                                                                      mycid, 
> ti_options, bistate);
>                                       }
>  
> +
>                                       /* And create index entries for it */
>                                       if (resultRelInfo->ri_NumIndices > 0)
> -                                             recheckIndexes = 
> ExecInsertIndexTuples(slot,
> +                                             recheckIndexes = 
> ExecInsertIndexTuples(myslot,
>                                                                               
>                                            estate,
>                                                                               
>                                            false,
>                                                                               
>                                            NULL,
>                                                                               
>                                            NIL);
>  
>                                       /* AFTER ROW INSERT Triggers */
> -                                     ExecARInsertTriggers(estate, 
> resultRelInfo, slot,
> +                                     ExecARInsertTriggers(estate, 
> resultRelInfo, myslot,
>                                                                               
>  recheckIndexes, cstate->transition_capture);
>  
>                                       list_free(recheckIndexes);
> @@ -3045,32 +3364,25 @@ CopyFrom(CopyState cstate)
>               }
>       }
>  
> -     /* Flush any remaining buffered tuples */
> -     if (nBufferedTuples > 0)
> +     if (insertMethod != CIM_SINGLE)
>       {
> -             if (insertMethod == CIM_MULTI_CONDITIONAL)
> -             {
> -                     CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> -                                                             
> prevResultRelInfo, myslot, bistate,
> -                                                             
> nBufferedTuples, bufferedTuples,
> -                                                             
> firstBufferedLineNo);
> -             }
> -             else
> -                     CopyFromInsertBatch(cstate, estate, mycid, ti_options,
> -                                                             resultRelInfo, 
> myslot, bistate,
> -                                                             
> nBufferedTuples, bufferedTuples,
> -                                                             
> firstBufferedLineNo);
> +             /* Flush any remaining buffered tuples */
> +             if (!CopyMultiInsertInfo_IsEmpty(&multiInsertInfo))
> +                     CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate, 
> estate, mycid,
> +                                                                       
> ti_options);
> +
> +             /* Tear down the multi-insert data */
> +             CopyMultiInsertInfo_Cleanup(&multiInsertInfo);
>       }
>  
>       /* Done, clean up */
>       error_context_stack = errcallback.previous;
>  
> -     FreeBulkInsertState(bistate);
> +     if (bistate != NULL)
> +             ReleaseBulkInsertStatePin(bistate);
>  
>       MemoryContextSwitchTo(oldcontext);
>  
> -     MemoryContextDelete(batchcontext);
> -
>       /*
>        * In the old protocol, tell pqcomm that we can process normal protocol
>        * messages again.
> @@ -3084,9 +3396,6 @@ CopyFrom(CopyState cstate)
>       /* Handle queued AFTER triggers */
>       AfterTriggerEndQuery(estate);
>  
> -     pfree(values);
> -     pfree(nulls);
> -
>       ExecResetTupleTable(estate->es_tupleTable, false);
>  
>       /* Allow the FDW to shut down */
> @@ -3111,88 +3420,6 @@ CopyFrom(CopyState cstate)
>       return processed;
>  }
>  
> -/*
> - * A subroutine of CopyFrom, to write the current batch of buffered heap
> - * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
> - * triggers.
> - */
> -static void
> -CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
> -                                     int ti_options, ResultRelInfo 
> *resultRelInfo,
> -                                     TupleTableSlot *myslot, BulkInsertState 
> bistate,
> -                                     int nBufferedTuples, HeapTuple 
> *bufferedTuples,
> -                                     uint64 firstBufferedLineNo)
> -{
> -     MemoryContext oldcontext;
> -     int                     i;
> -     uint64          save_cur_lineno;
> -     bool            line_buf_valid = cstate->line_buf_valid;
> -
> -     /*
> -      * Print error context information correctly, if one of the operations
> -      * below fail.
> -      */
> -     cstate->line_buf_valid = false;
> -     save_cur_lineno = cstate->cur_lineno;
> -
> -     /*
> -      * heap_multi_insert leaks memory, so switch to short-lived memory 
> context
> -      * before calling it.
> -      */
> -     oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> -     heap_multi_insert(resultRelInfo->ri_RelationDesc,
> -                                       bufferedTuples,
> -                                       nBufferedTuples,
> -                                       mycid,
> -                                       ti_options,
> -                                       bistate);
> -     MemoryContextSwitchTo(oldcontext);
> -
> -     /*
> -      * If there are any indexes, update them for all the inserted tuples, 
> and
> -      * run AFTER ROW INSERT triggers.
> -      */
> -     if (resultRelInfo->ri_NumIndices > 0)
> -     {
> -             for (i = 0; i < nBufferedTuples; i++)
> -             {
> -                     List       *recheckIndexes;
> -
> -                     cstate->cur_lineno = firstBufferedLineNo + i;
> -                     ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
> -                     recheckIndexes =
> -                             ExecInsertIndexTuples(myslot,
> -                                                                       
> estate, false, NULL, NIL);
> -                     ExecARInsertTriggers(estate, resultRelInfo,
> -                                                              myslot,
> -                                                              
> recheckIndexes, cstate->transition_capture);
> -                     list_free(recheckIndexes);
> -             }
> -     }
> -
> -     /*
> -      * There's no indexes, but see if we need to run AFTER ROW INSERT 
> triggers
> -      * anyway.
> -      */
> -     else if (resultRelInfo->ri_TrigDesc != NULL &&
> -                      (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
> -                       resultRelInfo->ri_TrigDesc->trig_insert_new_table))
> -     {
> -             for (i = 0; i < nBufferedTuples; i++)
> -             {
> -                     cstate->cur_lineno = firstBufferedLineNo + i;
> -                     ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
> -                     ExecARInsertTriggers(estate, resultRelInfo,
> -                                                              myslot,
> -                                                              NIL, 
> cstate->transition_capture);
> -             }
> -     }
> -
> -     /* reset cur_lineno and line_buf_valid to what they were */
> -     cstate->line_buf_valid = line_buf_valid;
> -     cstate->cur_lineno = save_cur_lineno;
> -}
> -
>  /*
>   * Setup to read tuples from a file for COPY FROM.
>   *
> @@ -4990,11 +5217,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver 
> *self)
>       DR_copy    *myState = (DR_copy *) self;
>       CopyState       cstate = myState->cstate;
>  
> -     /* Make sure the tuple is fully deconstructed */
> -     slot_getallattrs(slot);
> -
> -     /* And send the data */
> -     CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
> +     /* Send the data */
> +     CopyOneRowTo(cstate, slot);
>       myState->processed++;
>  
>       return true;
> diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
> index 4c077755d5..ed0e2de144 100644
> --- a/src/include/access/heapam.h
> +++ b/src/include/access/heapam.h
> @@ -36,6 +36,7 @@
>  #define HEAP_INSERT_SPECULATIVE 0x0010
>  
>  typedef struct BulkInsertStateData *BulkInsertState;
> +struct TupleTableSlot;
>  
>  #define MaxLockTupleMode     LockTupleExclusive
>  
> @@ -143,7 +144,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState 
> bistate);
>  
>  extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
>                       int options, BulkInsertState bistate);
> -extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int 
> ntuples,
> +extern void heap_multi_insert(Relation relation, struct TupleTableSlot 
> **slots, int ntuples,
>                                 CommandId cid, int options, BulkInsertState 
> bistate);
>  extern TM_Result heap_delete(Relation relation, ItemPointer tid,
>                       CommandId cid, Snapshot crosscheck, bool wait,
> diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
> index 4efe178ed1..c2fdedc551 100644
> --- a/src/include/access/tableam.h
> +++ b/src/include/access/tableam.h
> @@ -328,6 +328,9 @@ typedef struct TableAmRoutine
>        * 
> ------------------------------------------------------------------------
>        */
>  
> +     void            (*multi_insert) (Relation rel, TupleTableSlot **slots, 
> int nslots,
> +                                                              CommandId cid, 
> int options, struct BulkInsertStateData *bistate);
> +
>       /* see table_insert() for reference about parameters */
>       void            (*tuple_insert) (Relation rel, TupleTableSlot *slot,
>                                                                CommandId cid, 
> int options,
> @@ -1157,6 +1160,17 @@ table_update(Relation rel, ItemPointer otid, 
> TupleTableSlot *slot,
>                                                                               
>  lockmode, update_indexes);
>  }
>  
> +/*
> + *   table_multi_insert      - insert multiple tuple into a table
> + */
> +static inline void
> +table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
> +                                CommandId cid, int options, struct 
> BulkInsertStateData *bistate)
> +{
> +     rel->rd_tableam->multi_insert(rel, slots, nslots,
> +                                                               cid, options, 
> bistate);
> +}
> +
>  /*
>   * Lock a tuple in the specified mode.
>   *

Greetings,

Andres Freund


Reply via email to