On 25.09.2011 16:03, Dean Rasheed wrote:
On 25 September 2011 09:43, Kohei KaiGai<kai...@kaigai.gr.jp> wrote:
Hi Heikki,
I checked your patch, then I have a comment and two questions here.
2011/9/14 Heikki Linnakangas<heikki.linnakan...@enterprisedb.com>:
Attached is a new version of the patch. It is now complete, including WAL
replay code.
Hi,
I had a quick look at the patch as well and spotted an oversight: the
multi-insert code branch fails to invoke AFTER ROW triggers.
Thanks! Here's an updated version of the patch, fixing that, and all the
other issues pointed out thus far.
I extracted the code that sets oid and tuple headers, and invokes the
toaster, into a new function that's shared by heap_insert() and
heap_multi_insert(). Tom objected to merging heap_insert() and
heap_multi_insert() into one complicated function, and I think he was
right on that, but sharing this code to prepare a tuple still makes
sense. IMHO it makes heap_insert() slightly more readable too.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 24,29 ****
--- 24,30 ----
* heap_getnext - retrieve next tuple in scan
* heap_fetch - retrieve tuple with given tid
* heap_insert - insert tuple into a relation
+ * heap_multi_insert - insert multiple tuples into a relation
* heap_delete - delete a tuple from a relation
* heap_update - replace a tuple in a relation with another tuple
* heap_markpos - mark scan position
***************
*** 79,84 **** static HeapScanDesc heap_beginscan_internal(Relation relation,
--- 80,87 ----
int nkeys, ScanKey key,
bool allow_strat, bool allow_sync,
bool is_bitmapscan);
+ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
+ TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
ItemPointerData from, Buffer newbuf, HeapTuple newtup,
bool all_visible_cleared, bool new_all_visible_cleared);
***************
*** 1866,1920 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
Buffer vmbuffer = InvalidBuffer;
bool all_visible_cleared = false;
- if (relation->rd_rel->relhasoids)
- {
- #ifdef NOT_USED
- /* this is redundant with an Assert in HeapTupleSetOid */
- Assert(tup->t_data->t_infomask & HEAP_HASOID);
- #endif
-
- /*
- * If the object id of this tuple has already been assigned, trust the
- * caller. There are a couple of ways this can happen. At initial db
- * creation, the backend program sets oids for tuples. When we define
- * an index, we set the oid. Finally, in the future, we may allow
- * users to set their own object ids in order to support a persistent
- * object store (objects need to contain pointers to one another).
- */
- if (!OidIsValid(HeapTupleGetOid(tup)))
- HeapTupleSetOid(tup, GetNewOid(relation));
- }
- else
- {
- /* check there is not space for an OID */
- Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
- }
-
- tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
- tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
- tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
- HeapTupleHeaderSetXmin(tup->t_data, xid);
- HeapTupleHeaderSetCmin(tup->t_data, cid);
- HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */
- tup->t_tableOid = RelationGetRelid(relation);
-
/*
! * If the new tuple is too big for storage or contains already toasted
! * out-of-line attributes from some other relation, invoke the toaster.
*
* Note: below this point, heaptup is the data we actually intend to store
* into the relation; tup is the caller's original untoasted data.
*/
! if (relation->rd_rel->relkind != RELKIND_RELATION)
! {
! /* toast table entries should never be recursively toasted */
! Assert(!HeapTupleHasExternal(tup));
! heaptup = tup;
! }
! else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
! heaptup = toast_insert_or_update(relation, tup, NULL, options);
! else
! heaptup = tup;
/*
* We're about to do the actual insert -- but check for conflict first,
--- 1869,1882 ----
Buffer vmbuffer = InvalidBuffer;
bool all_visible_cleared = false;
/*
! * Fill in tuple header fields, assign an OID, and toast the tuple if
! * necessary.
*
* Note: below this point, heaptup is the data we actually intend to store
* into the relation; tup is the caller's original untoasted data.
*/
! heaptup = heap_prepare_insert(relation, tup, xid, cid, options);
/*
* We're about to do the actual insert -- but check for conflict first,
***************
*** 2035,2041 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
*/
CacheInvalidateHeapTuple(relation, heaptup, NULL);
! pgstat_count_heap_insert(relation);
/*
* If heaptup is a private copy, release it. Don't forget to copy t_self
--- 1997,2003 ----
*/
CacheInvalidateHeapTuple(relation, heaptup, NULL);
! pgstat_count_heap_insert(relation, 1);
/*
* If heaptup is a private copy, release it. Don't forget to copy t_self
***************
*** 2051,2056 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 2013,2297 ----
}
/*
+ * Subroutine for heap_insert() and heap_multi_insert(). Prepares a tuple for
+ * insertion: sets the tuple header fields (xmin, cmin, xmax and table OID),
+ * assigns an OID if the relation has OIDs, and toasts the tuple if necessary.
+ * Returns the toasted version of the tuple if it was toasted, or the original
+ * tuple if not. Note that in either case, the header fields are also set in
+ * the original tuple.
+ */
+ static HeapTuple
+ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
+ CommandId cid, int options)
+ {
+ if (relation->rd_rel->relhasoids)
+ {
+ #ifdef NOT_USED
+ /* this is redundant with an Assert in HeapTupleSetOid */
+ Assert(tup->t_data->t_infomask & HEAP_HASOID);
+ #endif
+
+ /*
+ * If the object id of this tuple has already been assigned, trust the
+ * caller. There are a couple of ways this can happen. At initial db
+ * creation, the backend program sets oids for tuples. When we define
+ * an index, we set the oid. Finally, in the future, we may allow
+ * users to set their own object ids in order to support a persistent
+ * object store (objects need to contain pointers to one another).
+ */
+ if (!OidIsValid(HeapTupleGetOid(tup)))
+ HeapTupleSetOid(tup, GetNewOid(relation));
+ }
+ else
+ {
+ /* check there is not space for an OID */
+ Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
+ }
+
+ tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
+ tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
+ tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
+ HeapTupleHeaderSetXmin(tup->t_data, xid);
+ HeapTupleHeaderSetCmin(tup->t_data, cid);
+ HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */
+ tup->t_tableOid = RelationGetRelid(relation);
+
+ /*
+ * If the new tuple is too big for storage or contains already toasted
+ * out-of-line attributes from some other relation, invoke the toaster.
+ */
+ if (relation->rd_rel->relkind != RELKIND_RELATION)
+ {
+ /* toast table entries should never be recursively toasted */
+ Assert(!HeapTupleHasExternal(tup));
+ return tup;
+ }
+ else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
+ return toast_insert_or_update(relation, tup, NULL, options);
+ else
+ return tup;
+ }
+
+ /*
+ * heap_multi_insert - insert multiple tuples into a heap
+ *
+ * Like heap_insert(), but inserts the 'ntuples' tuples in 'tuples' in one
+ * operation. That's faster than calling heap_insert() in a loop, because when
+ * multiple tuples can be inserted on a single page, we can write just a single
+ * WAL record covering all of them, and only need to lock/unlock the page once.
+ *
+ * Note: this leaks memory into the current memory context. You can create a
+ * temporary context before calling this, if that's a problem.
+ */
+ void
+ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+ CommandId cid, int options, BulkInsertState bistate)
+ {
+ TransactionId xid = GetCurrentTransactionId();
+ HeapTuple *heaptuples;
+ Buffer buffer;
+ int i;
+ int ndone;
+ char *scratch = NULL;
+ Page page;
+ bool needwal;
+
+ needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
+
+ /* Toast and set header data in all the tuples */
+ heaptuples = palloc(ntuples * sizeof(HeapTuple));
+ for (i = 0; i < ntuples; i++)
+ heaptuples[i] = heap_prepare_insert(relation, tuples[i],
+ xid, cid, options);
+
+ /*
+ * Allocate some memory to use for constructing the WAL record. Using
+ * palloc() within a critical section is not safe, so we allocate this
+ * beforehand.
+ */
+ if (needwal)
+ scratch = palloc(BLCKSZ);
+
+ /*
+ * We're about to do the actual inserts -- but check for conflict first,
+ * to avoid possibly having to roll back work we've just done.
+ *
+ * For a heap insert, we only need to check for table-level SSI locks.
+ * Our new tuple can't possibly conflict with existing tuple locks, and
+ * heap page locks are only consolidated versions of tuple locks; they do
+ * not lock "gaps" as index page locks do. So we don't need to identify
+ * a buffer before making the call.
+ */
+ CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
+
+ ndone = 0;
+ while (ndone < ntuples)
+ {
+ int nthispage;
+ Buffer vmbuffer = InvalidBuffer; /* per page: the pin is dropped at the end of each iteration */
+ bool all_visible_cleared = false; /* per page: must not carry over into the next WAL record */
+
+ /*
+ * Find buffer where at least the next tuple will fit. If the page
+ * is all-visible, this will also pin the requisite visibility map
+ * page.
+ */
+ buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len,
+ InvalidBuffer, options, bistate,
+ &vmbuffer, NULL);
+ page = BufferGetPage(buffer);
+
+ if (PageIsAllVisible(page))
+ {
+ all_visible_cleared = true;
+ PageClearAllVisible(page);
+ visibilitymap_clear(relation,
+ BufferGetBlockNumber(buffer),
+ vmbuffer);
+ }
+
+ /* NO EREPORT(ERROR) from here till changes are logged */
+ START_CRIT_SECTION();
+
+ /* Put as many tuples as fit on this page */
+ for (nthispage = 0; ndone + nthispage < ntuples; nthispage++)
+ {
+ HeapTuple heaptup = heaptuples[ndone + nthispage];
+
+ if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len))
+ break;
+
+ RelationPutHeapTuple(relation, buffer, heaptup);
+ }
+
+ /*
+ * XXX Should we set PageSetPrunable on this page ? See heap_insert()
+ */
+
+ MarkBufferDirty(buffer);
+
+ /* XLOG stuff */
+ if (needwal)
+ {
+ XLogRecPtr recptr;
+ xl_heap_multi_insert *xlrec;
+ XLogRecData rdata[2];
+ uint8 info = XLOG_HEAP2_MULTI_INSERT;
+ char *tupledata;
+ int totaldatalen;
+ char *scratchptr = scratch;
+ bool init;
+
+ /*
+ * If the page was previously empty, we can reinit the page
+ * instead of restoring the whole thing.
+ */
+ init = (ItemPointerGetOffsetNumber(&(heaptuples[ndone]->t_self)) == FirstOffsetNumber &&
+ PageGetMaxOffsetNumber(page) == FirstOffsetNumber + nthispage - 1);
+
+ /* allocate xl_heap_multi_insert struct from the scratch area */
+ xlrec = (xl_heap_multi_insert *) scratchptr;
+ scratchptr += SizeOfHeapMultiInsert;
+
+ /*
+ * Allocate offsets array. Unless we're reinitializing the page,
+ * in that case the tuples are stored in order starting at
+ * FirstOffsetNumber and we don't need to store the offsets
+ * explicitly.
+ */
+ if (!init)
+ scratchptr += nthispage * sizeof(OffsetNumber);
+
+ /* the rest of the scratch space is used for tuple data */
+ tupledata = scratchptr;
+
+ xlrec->all_visible_cleared = all_visible_cleared;
+ xlrec->node = relation->rd_node;
+ xlrec->blkno = BufferGetBlockNumber(buffer);
+ xlrec->ntuples = nthispage;
+
+ /*
+ * Write out an xl_multi_insert_tuple and the tuple data itself
+ * for each tuple.
+ */
+ for (i = 0; i < nthispage; i++)
+ {
+ HeapTuple heaptup = heaptuples[ndone + i];
+ xl_multi_insert_tuple *tuphdr;
+ int datalen;
+
+ if (!init)
+ xlrec->offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self);
+ /* xl_multi_insert_tuple needs two-byte alignment. */
+ tuphdr = (xl_multi_insert_tuple *) SHORTALIGN(scratchptr);
+ scratchptr = ((char *) tuphdr) + SizeOfMultiInsertTuple;
+
+ tuphdr->t_infomask2 = heaptup->t_data->t_infomask2;
+ tuphdr->t_infomask = heaptup->t_data->t_infomask;
+ tuphdr->t_hoff = heaptup->t_data->t_hoff;
+
+ /* write bitmap [+ padding] [+ oid] + data */
+ datalen = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+ memcpy(scratchptr,
+ (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+ datalen);
+ tuphdr->datalen = datalen;
+ scratchptr += datalen;
+ }
+ totaldatalen = scratchptr - tupledata;
+ Assert((scratchptr - scratch) < BLCKSZ);
+
+ rdata[0].data = (char *) xlrec;
+ rdata[0].len = tupledata - scratch;
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &rdata[1];
+
+ rdata[1].data = tupledata;
+ rdata[1].len = totaldatalen;
+ rdata[1].buffer = buffer;
+ rdata[1].buffer_std = true;
+ rdata[1].next = NULL;
+
+ /*
+ * If we're going to reinitialize the whole page using the WAL
+ * record, hide buffer reference from XLogInsert.
+ */
+ if (init)
+ {
+ rdata[1].buffer = InvalidBuffer;
+ info |= XLOG_HEAP_INIT_PAGE;
+ }
+
+ recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
+
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+
+ END_CRIT_SECTION();
+
+ UnlockReleaseBuffer(buffer);
+ if (vmbuffer != InvalidBuffer)
+ ReleaseBuffer(vmbuffer);
+
+ ndone += nthispage;
+ }
+
+ /*
+ * If tuples are cachable, mark them for invalidation from the caches in
+ * case we abort. Note it is OK to do this after releasing the buffer,
+ * because the heaptuples data structure is all in local memory, not in
+ * the shared buffer.
+ */
+ if (IsSystemRelation(relation))
+ {
+ for (i = 0; i < ntuples; i++)
+ CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
+ }
+
+ pgstat_count_heap_insert(relation, ntuples);
+ }
+
+ /*
* simple_heap_insert - insert a tuple
*
* Currently, this routine differs from heap_insert only in supplying
***************
*** 4738,4743 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
--- 4979,5122 ----
}
/*
+ * Handles MULTI_INSERT record type: re-inserts every tuple of the record into its heap page.
+ */
+ static void
+ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
+ {
+ char *recdata = XLogRecGetData(record);
+ xl_heap_multi_insert *xlrec;
+ Buffer buffer;
+ Page page;
+ struct
+ {
+ HeapTupleHeaderData hdr;
+ char data[MaxHeapTupleSize];
+ } tbuf;
+ HeapTupleHeader htup;
+ uint32 newlen;
+ Size freespace;
+ BlockNumber blkno;
+ int i;
+ bool isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+
+ xlrec = (xl_heap_multi_insert *) recdata;
+ recdata += SizeOfHeapMultiInsert;
+
+ /*
+ * If we're reinitializing the page, the tuples are stored in order from
+ * FirstOffsetNumber. Otherwise there's an array of offsets in the WAL
+ * record.
+ */
+ if (!isinit)
+ recdata += sizeof(OffsetNumber) * xlrec->ntuples;
+
+ blkno = xlrec->blkno;
+
+ /*
+ * The visibility map may need to be fixed even if the heap page is
+ * already up-to-date.
+ */
+ if (xlrec->all_visible_cleared)
+ {
+ Relation reln = CreateFakeRelcacheEntry(xlrec->node);
+ Buffer vmbuffer = InvalidBuffer;
+
+ visibilitymap_pin(reln, blkno, &vmbuffer);
+ visibilitymap_clear(reln, blkno, vmbuffer);
+ ReleaseBuffer(vmbuffer);
+ FreeFakeRelcacheEntry(reln);
+ }
+
+ if (record->xl_info & XLR_BKP_BLOCK_1)
+ return;
+
+ if (isinit)
+ {
+ buffer = XLogReadBuffer(xlrec->node, blkno, true);
+ Assert(BufferIsValid(buffer));
+ page = (Page) BufferGetPage(buffer);
+
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ }
+ else
+ {
+ buffer = XLogReadBuffer(xlrec->node, blkno, false);
+ if (!BufferIsValid(buffer))
+ return;
+ page = (Page) BufferGetPage(buffer);
+
+ if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
+ {
+ UnlockReleaseBuffer(buffer);
+ return;
+ }
+ }
+
+ for (i = 0; i < xlrec->ntuples; i++)
+ {
+ OffsetNumber offnum;
+ xl_multi_insert_tuple *xlhdr;
+
+ if (isinit)
+ offnum = FirstOffsetNumber + i;
+ else
+ offnum = xlrec->offsets[i];
+ if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+ elog(PANIC, "heap_multi_insert_redo: invalid max offset number");
+
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata);
+ recdata = ((char *) xlhdr) + SizeOfMultiInsertTuple; /* continue from the aligned header, so the alignment padding is skipped too */
+
+ newlen = xlhdr->datalen;
+ Assert(newlen <= MaxHeapTupleSize);
+ htup = &tbuf.hdr;
+ MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
+ /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+ memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
+ (char *) recdata,
+ newlen);
+ recdata += newlen;
+
+ newlen += offsetof(HeapTupleHeaderData, t_bits);
+ htup->t_infomask2 = xlhdr->t_infomask2;
+ htup->t_infomask = xlhdr->t_infomask;
+ htup->t_hoff = xlhdr->t_hoff;
+ HeapTupleHeaderSetXmin(htup, record->xl_xid);
+ HeapTupleHeaderSetCmin(htup, FirstCommandId);
+ ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
+ ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+
+ offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+ if (offnum == InvalidOffsetNumber)
+ elog(PANIC, "heap_multi_insert_redo: failed to add tuple");
+ }
+
+ freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+
+ if (xlrec->all_visible_cleared)
+ PageClearAllVisible(page);
+
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
+
+ /*
+ * If the page is running low on free space, update the FSM as well.
+ * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+ * better than that without knowing the fill-factor for the table.
+ *
+ * XXX: We don't get here if the page was restored from full page image.
+ * We don't bother to update the FSM in that case, it doesn't need to be
+ * totally accurate anyway.
+ */
+ if (freespace < BLCKSZ / 5)
+ XLogRecordPageWithFreeSpace(xlrec->node, blkno, freespace);
+ }
+
+ /*
* Handles UPDATE and HOT_UPDATE
*/
static void
***************
*** 5126,5131 **** heap2_redo(XLogRecPtr lsn, XLogRecord *record)
--- 5505,5513 ----
case XLOG_HEAP2_VISIBLE:
heap_xlog_visible(lsn, record);
break;
+ case XLOG_HEAP2_MULTI_INSERT:
+ heap_xlog_multi_insert(lsn, record);
+ break;
default:
elog(PANIC, "heap2_redo: unknown op code %u", info);
}
***************
*** 5263,5268 **** heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
--- 5645,5662 ----
xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode, xlrec->block);
}
+ else if (info == XLOG_HEAP2_MULTI_INSERT)
+ {
+ xl_heap_multi_insert *xlrec = (xl_heap_multi_insert *) rec;
+
+ if (xl_info & XLOG_HEAP_INIT_PAGE)
+ appendStringInfo(buf, "multi-insert (init): ");
+ else
+ appendStringInfo(buf, "multi-insert: ");
+ appendStringInfo(buf, "rel %u/%u/%u; blk %u; %d tuples",
+ xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
+ xlrec->blkno, xlrec->ntuples);
+ }
else
appendStringInfo(buf, "UNKNOWN");
}
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 33,38 ****
--- 33,39 ----
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
+ #include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
***************
*** 149,154 **** typedef struct CopyStateData
--- 150,156 ----
Oid *typioparams; /* array of element types for in_functions */
int *defmap; /* array of default att numbers */
ExprState **defexprs; /* array of default att expressions */
+ bool volatile_defexprs; /* is any of defexprs volatile? */
/*
* These variables are used to reduce overhead in textual COPY FROM.
***************
*** 277,282 **** static uint64 CopyTo(CopyState cstate);
--- 279,289 ----
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
Datum *values, bool *nulls);
static uint64 CopyFrom(CopyState cstate);
+ static void WriteCopyFromBatch(CopyState cstate, EState *estate,
+ CommandId mycid, int hi_options,
+ ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
+ BulkInsertState bistate,
+ int nBufferedTuples, HeapTuple *bufferedTuples);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
static int CopyReadAttributesText(CopyState cstate);
***************
*** 1842,1852 **** CopyFrom(CopyState cstate)
--- 1849,1865 ----
ExprContext *econtext;
TupleTableSlot *myslot;
MemoryContext oldcontext = CurrentMemoryContext;
+
ErrorContextCallback errcontext;
CommandId mycid = GetCurrentCommandId(true);
int hi_options = 0; /* start with default heap_insert options */
BulkInsertState bistate;
uint64 processed = 0;
+ bool useHeapMultiInsert;
+ int nBufferedTuples = 0;
+ #define MAX_BUFFERED_TUPLES 1000
+ HeapTuple *bufferedTuples;
+ Size bufferedTuplesSize = 0;
Assert(cstate->rel);
***************
*** 1941,1946 **** CopyFrom(CopyState cstate)
--- 1954,1981 ----
/* Triggers might need a slot as well */
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
+ /*
+ * It's more efficient to prepare a bunch of tuples for insertion, and
+ * insert them in one heap_multi_insert() call, than to call heap_insert()
+ * separately for every tuple. However, we can't do that if there are
+ * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
+ * expressions. Such triggers or expressions might query the table we're
+ * inserting to, and act differently if the tuples that have already been
+ * processed and prepared for insertion are not there.
+ */
+ if ((resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+ cstate->volatile_defexprs)
+ {
+ useHeapMultiInsert = false;
+ }
+ else
+ {
+ useHeapMultiInsert = true;
+ bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+ }
+
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
***************
*** 1972,1979 **** CopyFrom(CopyState cstate)
CHECK_FOR_INTERRUPTS();
! /* Reset the per-tuple exprcontext */
! ResetPerTupleExprContext(estate);
/* Switch into its memory context */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
--- 2007,2021 ----
CHECK_FOR_INTERRUPTS();
! if (nBufferedTuples == 0)
! {
! /*
! * Reset the per-tuple exprcontext. We can only do this if the
! * tuple buffer is empty (calling the context the per-tuple memory
! * context is a bit of a misnomer now).
! */
! ResetPerTupleExprContext(estate);
! }
/* Switch into its memory context */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
***************
*** 2010,2033 **** CopyFrom(CopyState cstate)
if (!skip_tuple)
{
- List *recheckIndexes = NIL;
-
/* Check the constraints of the tuple */
if (cstate->rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
! /* OK, store the tuple and create index entries for it */
! heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
! if (resultRelInfo->ri_NumIndices > 0)
! recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
! estate);
! /* AFTER ROW INSERT Triggers */
! ExecARInsertTriggers(estate, resultRelInfo, tuple,
! recheckIndexes);
! list_free(recheckIndexes);
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger;
--- 2052,2100 ----
if (!skip_tuple)
{
/* Check the constraints of the tuple */
if (cstate->rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
! if (useHeapMultiInsert)
! {
! /* Add this tuple to the tuple buffer */
! bufferedTuples[nBufferedTuples++] = tuple;
! bufferedTuplesSize += tuple->t_len;
! /*
! * If the buffer filled up, flush it. Also flush if the total
! * size of all the tuples in the buffer becomes large, to
! * avoid using large amounts of memory for the buffers when
! * the tuples are exceptionally wide.
! */
! if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
! bufferedTuplesSize > 65535)
! {
! WriteCopyFromBatch(cstate, estate, mycid, hi_options,
! resultRelInfo, myslot, bistate,
! nBufferedTuples, bufferedTuples);
! nBufferedTuples = 0;
! bufferedTuplesSize = 0;
! }
! }
! else
! {
! List *recheckIndexes = NIL;
! /* OK, store the tuple and create index entries for it */
! heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
! if (resultRelInfo->ri_NumIndices > 0)
! recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
! estate);
!
! /* AFTER ROW INSERT Triggers */
! ExecARInsertTriggers(estate, resultRelInfo, tuple,
! recheckIndexes);
!
! list_free(recheckIndexes);
! }
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger;
***************
*** 2038,2043 **** CopyFrom(CopyState cstate)
--- 2105,2116 ----
}
}
+ /* Flush any remaining buffered tuples */
+ if (nBufferedTuples > 0)
+ WriteCopyFromBatch(cstate, estate, mycid, hi_options,
+ resultRelInfo, myslot, bistate,
+ nBufferedTuples, bufferedTuples);
+
/* Done, clean up */
error_context_stack = errcontext.previous;
***************
*** 2071,2076 **** CopyFrom(CopyState cstate)
--- 2144,2210 ----
}
/*
+ * A subroutine of CopyFrom: writes the 'nBufferedTuples' tuples buffered in
+ * 'bufferedTuples' to the heap with a single heap_multi_insert() call, then
+ * inserts their index entries and runs AFTER ROW INSERT triggers.
+ */
+ static void
+ WriteCopyFromBatch(CopyState cstate, EState *estate, CommandId mycid,
+ int hi_options, ResultRelInfo *resultRelInfo,
+ TupleTableSlot *myslot, BulkInsertState bistate,
+ int nBufferedTuples, HeapTuple *bufferedTuples)
+ {
+ MemoryContext oldcontext;
+ int i;
+
+ /*
+ * heap_multi_insert leaks memory, so switch to the per-tuple memory
+ * context before calling it.
+ */
+ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ heap_multi_insert(cstate->rel,
+ bufferedTuples,
+ nBufferedTuples,
+ mycid,
+ hi_options,
+ bistate);
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * If there are any indexes, update them for all the inserted tuples,
+ * and run AFTER ROW INSERT triggers.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ for (i = 0; i < nBufferedTuples; i++)
+ {
+ List *recheckIndexes;
+
+ ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
+ recheckIndexes =
+ ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
+ estate);
+ ExecARInsertTriggers(estate, resultRelInfo,
+ bufferedTuples[i],
+ recheckIndexes);
+ list_free(recheckIndexes);
+ }
+ }
+ /*
+ * There are no indexes, but see if we need to run AFTER ROW INSERT
+ * triggers anyway.
+ */
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ resultRelInfo->ri_TrigDesc->trig_insert_after_row)
+ {
+ for (i = 0; i < nBufferedTuples; i++)
+ ExecARInsertTriggers(estate, resultRelInfo,
+ bufferedTuples[i],
+ NIL);
+ }
+ }
+
+ /*
* Setup to read tuples from a file for COPY FROM.
*
* 'rel': Used as a template for the tuples
***************
*** 2099,2104 **** BeginCopyFrom(Relation rel,
--- 2233,2239 ----
int *defmap;
ExprState **defexprs;
MemoryContext oldcontext;
+ bool volatile_defexprs;
cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
***************
*** 2122,2127 **** BeginCopyFrom(Relation rel,
--- 2257,2263 ----
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
num_defaults = 0;
+ volatile_defexprs = false;
/*
* Pick up the required catalog information for each attribute in the
***************
*** 2163,2168 **** BeginCopyFrom(Relation rel,
--- 2299,2307 ----
expression_planner((Expr *) defexpr), NULL);
defmap[num_defaults] = attnum - 1;
num_defaults++;
+
+ if (!volatile_defexprs)
+ volatile_defexprs = contain_volatile_functions(defexpr);
}
}
}
***************
*** 2172,2177 **** BeginCopyFrom(Relation rel,
--- 2311,2317 ----
cstate->typioparams = typioparams;
cstate->defmap = defmap;
cstate->defexprs = defexprs;
+ cstate->volatile_defexprs = volatile_defexprs;
cstate->num_defaults = num_defaults;
if (pipe)
*** a/src/backend/postmaster/pgstat.c
--- b/src/backend/postmaster/pgstat.c
***************
*** 1677,1686 **** add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
}
/*
! * pgstat_count_heap_insert - count a tuple insertion
*/
void
! pgstat_count_heap_insert(Relation rel)
{
PgStat_TableStatus *pgstat_info = rel->pgstat_info;
--- 1677,1686 ----
}
/*
! * pgstat_count_heap_insert - count the insertion of n tuples
*/
void
! pgstat_count_heap_insert(Relation rel, int n)
{
PgStat_TableStatus *pgstat_info = rel->pgstat_info;
***************
*** 1693,1699 **** pgstat_count_heap_insert(Relation rel)
pgstat_info->trans->nest_level != nest_level)
add_tabstat_xact_level(pgstat_info, nest_level);
! pgstat_info->trans->tuples_inserted++;
}
}
--- 1693,1699 ----
pgstat_info->trans->nest_level != nest_level)
add_tabstat_xact_level(pgstat_info, nest_level);
! pgstat_info->trans->tuples_inserted += n;
}
}
*** a/src/include/access/heapam.h
--- b/src/include/access/heapam.h
***************
*** 97,102 **** extern void FreeBulkInsertState(BulkInsertState);
--- 97,104 ----
extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
int options, BulkInsertState bistate);
+ extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+ CommandId cid, int options, BulkInsertState bistate);
extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
ItemPointer ctid, TransactionId *update_xmax,
CommandId cid, Snapshot crosscheck, bool wait);
*** a/src/include/access/htup.h
--- b/src/include/access/htup.h
***************
*** 608,613 **** typedef HeapTupleData *HeapTuple;
--- 608,614 ----
/* 0x20 is free, was XLOG_HEAP2_CLEAN_MOVE */
#define XLOG_HEAP2_CLEANUP_INFO 0x30
#define XLOG_HEAP2_VISIBLE 0x40
+ #define XLOG_HEAP2_MULTI_INSERT 0x50
/*
* All what we need to find changed tuple
***************
*** 661,666 **** typedef struct xl_heap_insert
--- 662,697 ----
#define SizeOfHeapInsert (offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
+ /*
+ * This is what we need to know about a multi-insert. The record consists of
+ * an xl_heap_multi_insert header, followed by a two-byte-aligned
+ * xl_multi_insert_tuple and the tuple data for each tuple. The 'offsets'
+ * array is omitted if the whole page is reinitialized (XLOG_HEAP_INIT_PAGE).
+ */
+ typedef struct xl_heap_multi_insert
+ {
+ RelFileNode node; /* relation the tuples were inserted into */
+ BlockNumber blkno; /* heap block that received the tuples */
+ bool all_visible_cleared; /* was the page's all-visible flag cleared? */
+ uint16 ntuples; /* number of tuples in this record */
+ OffsetNumber offsets[1]; /* line pointer offset of each tuple */
+
+ /* TUPLE DATA (xl_multi_insert_tuples) FOLLOW AT END OF STRUCT */
+ } xl_heap_multi_insert;
+
+ #define SizeOfHeapMultiInsert offsetof(xl_heap_multi_insert, offsets)
+
+ typedef struct xl_multi_insert_tuple
+ {
+ uint16 datalen; /* size of tuple data that follows */
+ uint16 t_infomask2; /* copied from the tuple header */
+ uint16 t_infomask; /* copied from the tuple header */
+ uint8 t_hoff; /* copied from the tuple header */
+ /* TUPLE DATA FOLLOWS AT END OF STRUCT */
+ } xl_multi_insert_tuple;
+
+ #define SizeOfMultiInsertTuple (offsetof(xl_multi_insert_tuple, t_hoff) + sizeof(uint8))
+
/* This is what we need to know about update|hot_update */
typedef struct xl_heap_update
{
*** a/src/include/pgstat.h
--- b/src/include/pgstat.h
***************
*** 766,772 **** extern void pgstat_initstats(Relation rel);
(rel)->pgstat_info->t_counts.t_blocks_hit++; \
} while (0)
! extern void pgstat_count_heap_insert(Relation rel);
extern void pgstat_count_heap_update(Relation rel, bool hot);
extern void pgstat_count_heap_delete(Relation rel);
extern void pgstat_update_heap_dead_tuples(Relation rel, int delta);
--- 766,772 ----
(rel)->pgstat_info->t_counts.t_blocks_hit++; \
} while (0)
! extern void pgstat_count_heap_insert(Relation rel, int n);
extern void pgstat_count_heap_update(Relation rel, bool hot);
extern void pgstat_count_heap_delete(Relation rel);
extern void pgstat_update_heap_dead_tuples(Relation rel, int delta);
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers