On 13.08.2011 17:33, Tom Lane wrote:
Heikki Linnakangas<heikki.linnakan...@enterprisedb.com>  writes:
The patch is WIP, mainly because I didn't write the WAL replay routines
yet, but please let me know if you see any issues.

Why do you need new WAL replay routines?  Can't you just use the existing
XLOG_HEAP_NEWPAGE support?

By any large, I think we should be avoiding special-purpose WAL entries
as much as possible.

I tried that, but most of the reduction in WAL-size melts away with that. And if the page you're copying to is not empty, logging the whole page is even more expensive. You'd need to fall back to retail inserts in that case which complicates the logic.

Also, I strongly object to turning regular heap_insert into a wrapper
around some other more complicated operation.  That will likely have
bad consequences for the performance of every other operation.

Ok. I doubt it makes any noticeable difference for performance, but having done that, it's more readable, too, to duplicate the code.

Attached is a new version of the patch. It is now complete, including WAL replay code.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 06db65d..b479613 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -24,6 +24,7 @@
  *		heap_getnext	- retrieve next tuple in scan
  *		heap_fetch		- retrieve tuple with given tid
  *		heap_insert		- insert tuple into a relation
+ *		heap_multi_insert - insert multiple tuples into a relation
  *		heap_delete		- delete a tuple from a relation
  *		heap_update		- replace a tuple in a relation with another tuple
  *		heap_markpos	- mark scan position
@@ -2030,7 +2031,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 	 */
 	CacheInvalidateHeapTuple(relation, heaptup, NULL);
 
-	pgstat_count_heap_insert(relation);
+	pgstat_count_heap_insert(relation, 1);
 
 	/*
 	 * If heaptup is a private copy, release it.  Don't forget to copy t_self
@@ -2046,6 +2047,276 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 }
 
 /*
+ * Subroutine for heap_multi_insert(). Prepares a tuple for insertion. This
+ * sets the tuple header fields, assigns an OID, and toasts the tuple if
+ * necessary. Returns a toasted version of the tuple if it was toasted, or
+ * the original tuple if not.
+ */
+static HeapTuple
+heap_prepare_insert(Relation relation, HeapTuple tup, CommandId cid,
+					int options)
+{
+	TransactionId xid = GetCurrentTransactionId();
+	HeapTuple heaptup;
+
+	if (relation->rd_rel->relhasoids)
+	{
+#ifdef NOT_USED
+		/* this is redundant with an Assert in HeapTupleSetOid */
+		Assert(tup->t_data->t_infomask & HEAP_HASOID);
+#endif
+
+		/*
+		 * If the object id of this tuple has already been assigned, trust the
+		 * caller.	There are a couple of ways this can happen.  At initial db
+		 * creation, the backend program sets oids for tuples. When we define
+		 * an index, we set the oid.  Finally, in the future, we may allow
+		 * users to set their own object ids in order to support a persistent
+		 * object store (objects need to contain pointers to one another).
+		 */
+		if (!OidIsValid(HeapTupleGetOid(tup)))
+			HeapTupleSetOid(tup, GetNewOid(relation));
+	}
+	else
+	{
+		/* check there is not space for an OID */
+		Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
+	}
+
+	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
+	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
+	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
+	HeapTupleHeaderSetXmin(tup->t_data, xid);
+	HeapTupleHeaderSetCmin(tup->t_data, cid);
+	HeapTupleHeaderSetXmax(tup->t_data, 0);		/* for cleanliness */
+	tup->t_tableOid = RelationGetRelid(relation);
+
+	/*
+	 * If the new tuple is too big for storage or contains already toasted
+	 * out-of-line attributes from some other relation, invoke the toaster.
+	 *
+	 * Note: below this point, heaptup is the data we actually intend to store
+	 * into the relation; tup is the caller's original untoasted data.
+	 */
+	if (relation->rd_rel->relkind != RELKIND_RELATION)
+	{
+		/* toast table entries should never be recursively toasted */
+		Assert(!HeapTupleHasExternal(tup));
+		heaptup = tup;
+	}
+	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
+		heaptup = toast_insert_or_update(relation, tup, NULL, options);
+	else
+		heaptup = tup;
+
+	return heaptup;
+}
+
+/*
+ *	heap_multi_insert	- insert multiple tuple into a heap
+ *
+ * This is like heap_insert(), but inserts multiple tuples in one operation.
+ * That's faster than calling heap_insert() in a loop, because when multiple
+ * tuples can be inserted on a single page, we can write just a single WAL
+ * record covering all of them, and only need to lock/unlock the page once.
+ *
+ * Note: this leaks memory into the current memory context. You can create a
+ * temporary context before calling this, if that's a problem.
+ */
+void
+heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+				  CommandId cid, int options, BulkInsertState bistate)
+{
+	HeapTuple  *heaptuples;
+	Buffer		buffer;
+	Buffer		vmbuffer = InvalidBuffer;
+	bool		all_visible_cleared = false;
+	int			i;
+	int			ndone;
+	char	   *scratch = NULL;
+	Page		page;
+	bool		needwal;
+
+	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
+
+	/* Toast and set header data in all the tuples */
+	heaptuples = palloc(ntuples * sizeof(HeapTuple));
+	for (i = 0; i < ntuples; i++)
+		heaptuples[i] = heap_prepare_insert(relation, tuples[i], cid, options);
+
+	/*
+	 * Allocate some memory to use for constructing the WAL record. Using
+	 * palloc() within a critical section is not safe, so we allocate this
+	 * beforehand.
+	 */
+	if (needwal)
+		scratch = palloc(BLCKSZ);
+
+	ndone = 0;
+	while (ndone < ntuples)
+	{
+		int nthispage;
+
+		/*
+		 * Find buffer to insert this tuple into.  If the page is all visible,
+		 * this will also pin the requisite visibility map page.
+		 */
+		buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len,
+										   InvalidBuffer, options, bistate,
+										   &vmbuffer, NULL);
+		page = BufferGetPage(buffer);
+
+		/*
+		 * We're about to do the actual insert -- check for conflict at the
+		 * relation or buffer level first, to avoid possibly having to roll back
+		 * work we've just done.
+		 */
+		CheckForSerializableConflictIn(relation, NULL, buffer);
+
+		if (PageIsAllVisible(page))
+		{
+			all_visible_cleared = true;
+			PageClearAllVisible(page);
+			visibilitymap_clear(relation,
+								BufferGetBlockNumber(buffer),
+								vmbuffer);
+		}
+
+		/* NO EREPORT(ERROR) from here till changes are logged */
+		START_CRIT_SECTION();
+
+		/* Put as many tuples as fit on this page */
+		for (nthispage = 0; ndone + nthispage < ntuples; nthispage++)
+		{
+			HeapTuple	heaptup = heaptuples[ndone + nthispage];
+
+			if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len))
+				break;
+
+			RelationPutHeapTuple(relation, buffer, heaptup);
+		}
+
+		/*
+		 * XXX Should we set PageSetPrunable on this page ? See heap_insert()
+		 */
+
+		MarkBufferDirty(buffer);
+
+		/* XLOG stuff */
+		if (needwal)
+		{
+			XLogRecPtr	recptr;
+			xl_heap_multi_insert *xlrec;
+			XLogRecData rdata[2];
+			uint8		info = XLOG_HEAP2_MULTI_INSERT;
+			char	   *tupledata;
+			int			totaldatalen;
+			char	   *scratchptr;
+			bool		init;
+
+			/*
+			 * If the page was previously empty, we can reinit the page
+			 * instead of restoring the whole thing.
+			 */
+			init = (ItemPointerGetOffsetNumber(&(heaptuples[ndone]->t_self)) == FirstOffsetNumber &&
+					PageGetMaxOffsetNumber(page) == FirstOffsetNumber + nthispage - 1);
+
+			/* allocate xl_heap_multi_insert struct from the scratch area */
+			scratchptr = scratch;
+			xlrec = (xl_heap_multi_insert *) scratchptr;
+			scratchptr += SizeOfHeapMultiInsert;
+
+			/* allocate offsets array, unless we're reinitializing the page */
+			if (!init)
+				scratchptr += nthispage * sizeof(OffsetNumber);
+
+			/* the rest of the scratch space is used for tuple data */
+			tupledata = scratchptr;
+
+			xlrec->all_visible_cleared = all_visible_cleared;
+			xlrec->node = relation->rd_node;
+			xlrec->blkno = BufferGetBlockNumber(buffer);
+			xlrec->ntuples = nthispage;
+
+			for (i = 0; i < nthispage; i++)
+			{
+				HeapTuple	heaptup = heaptuples[ndone + i];
+				xl_multi_insert_tuple *tuphdr;
+				int			datalen;
+
+				tuphdr = (xl_multi_insert_tuple *) SHORTALIGN(scratchptr);
+
+				scratchptr = ((char *) tuphdr) + SizeOfMultiInsertTuple;
+
+				if (!init)
+					xlrec->offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self);
+				tuphdr->t_infomask2 = heaptup->t_data->t_infomask2;
+				tuphdr->t_infomask = heaptup->t_data->t_infomask;
+				tuphdr->t_hoff = heaptup->t_data->t_hoff;
+
+				/* write bitmap [+ padding] [+ oid] + data */
+				datalen = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+				memcpy(scratchptr,
+					   (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+					   datalen);
+				tuphdr->datalen = datalen;
+				scratchptr += datalen;
+			}
+			totaldatalen = scratchptr - tupledata;
+			Assert((scratchptr - scratch) < BLCKSZ);
+
+			rdata[0].data = (char *) xlrec;
+			rdata[0].len = tupledata - scratch;
+			rdata[0].buffer = InvalidBuffer;
+			rdata[0].next = &rdata[1];
+
+			rdata[1].data = tupledata;
+			rdata[1].len = totaldatalen;
+			rdata[1].buffer = buffer;
+			rdata[1].buffer_std = true;
+			rdata[1].next = NULL;
+
+			/*
+			 * If we're going to reinitialize the whole page using the WAL
+			 * record, hide buffer reference from XLogInsert.
+			 */
+			if (init)
+			{
+				rdata[1].buffer = InvalidBuffer;
+				info |= XLOG_HEAP_INIT_PAGE;
+			}
+
+			recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
+
+			PageSetLSN(page, recptr);
+			PageSetTLI(page, ThisTimeLineID);
+		}
+
+		END_CRIT_SECTION();
+
+		UnlockReleaseBuffer(buffer);
+		if (vmbuffer != InvalidBuffer)
+			ReleaseBuffer(vmbuffer);
+
+		ndone += nthispage;
+	}
+
+	/*
+	 * If tuples are cachable, mark them for invalidation from the caches in
+	 * case we abort.  Note it is OK to do this after releasing the buffer,
+	 * because the heaptuples data structure is all in local memory, not in
+	 * the shared buffer.
+	 */
+	if (IsSystemRelation(relation))
+	{
+		for (i = 0; i < ntuples; i++)
+			CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
+	}
+
+	pgstat_count_heap_insert(relation, ntuples);
+}
+
+/*
  *	simple_heap_insert - insert a tuple
  *
  * Currently, this routine differs from heap_insert only in supplying
@@ -4730,6 +5001,144 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 }
 
 /*
+ * Handles MULTI_INSERT record type.
+ */
+static void
+heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
+{
+	char	   *recdata = XLogRecGetData(record);
+	xl_heap_multi_insert *xlrec;
+	Buffer		buffer;
+	Page		page;
+	struct
+	{
+		HeapTupleHeaderData hdr;
+		char		data[MaxHeapTupleSize];
+	}			tbuf;
+	HeapTupleHeader htup;
+	uint32		newlen;
+	Size		freespace;
+	BlockNumber blkno;
+	int			i;
+	bool		isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+
+	xlrec = (xl_heap_multi_insert *) recdata;
+	recdata += SizeOfHeapMultiInsert;
+
+	/*
+	 * If we're reinitializing the page, the tuples are stored in order from
+	 * FirstOffsetNumber. Otherwise there's an array of offsets in the WAL
+	 * record.
+	 */
+	if (!isinit)
+		recdata += sizeof(OffsetNumber) * xlrec->ntuples;
+
+	blkno = xlrec->blkno;
+
+	/*
+	 * The visibility map may need to be fixed even if the heap page is
+	 * already up-to-date.
+	 */
+	if (xlrec->all_visible_cleared)
+	{
+		Relation	reln = CreateFakeRelcacheEntry(xlrec->node);
+		Buffer		vmbuffer = InvalidBuffer;
+
+		visibilitymap_pin(reln, blkno, &vmbuffer);
+		visibilitymap_clear(reln, blkno, vmbuffer);
+		ReleaseBuffer(vmbuffer);
+		FreeFakeRelcacheEntry(reln);
+	}
+
+	if (record->xl_info & XLR_BKP_BLOCK_1)
+		return;
+
+	if (isinit)
+	{
+		buffer = XLogReadBuffer(xlrec->node, blkno, true);
+		Assert(BufferIsValid(buffer));
+		page = (Page) BufferGetPage(buffer);
+
+		PageInit(page, BufferGetPageSize(buffer), 0);
+	}
+	else
+	{
+		buffer = XLogReadBuffer(xlrec->node, blkno, false);
+		if (!BufferIsValid(buffer))
+			return;
+		page = (Page) BufferGetPage(buffer);
+
+		if (XLByteLE(lsn, PageGetLSN(page)))	/* changes are applied */
+		{
+			UnlockReleaseBuffer(buffer);
+			return;
+		}
+	}
+
+	for (i = 0; i < xlrec->ntuples; i++)
+	{
+		OffsetNumber offnum;
+		xl_multi_insert_tuple *xlhdr;
+
+		if (isinit)
+			offnum = FirstOffsetNumber + i;
+		else
+			offnum = xlrec->offsets[i];
+		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+			elog(PANIC, "heap_multi_insert_redo: invalid max offset number");
+
+		xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata);
+		recdata += SizeOfMultiInsertTuple;
+
+		newlen = xlhdr->datalen;
+		Assert(newlen <= MaxHeapTupleSize);
+		htup = &tbuf.hdr;
+		MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
+		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+		memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
+			   (char *) recdata,
+			   newlen);
+		recdata += newlen;
+
+		newlen += offsetof(HeapTupleHeaderData, t_bits);
+		htup->t_infomask2 = xlhdr->t_infomask2;
+		htup->t_infomask = xlhdr->t_infomask;
+		htup->t_hoff = xlhdr->t_hoff;
+		HeapTupleHeaderSetXmin(htup, record->xl_xid);
+		HeapTupleHeaderSetCmin(htup, FirstCommandId);
+		ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
+		ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+
+		offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+		if (offnum == InvalidOffsetNumber)
+			elog(PANIC, "heap_multi_insert_redo: failed to add tuple");
+	}
+
+	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
+
+	PageSetLSN(page, lsn);
+	PageSetTLI(page, ThisTimeLineID);
+
+	if (xlrec->all_visible_cleared)
+		PageClearAllVisible(page);
+
+	MarkBufferDirty(buffer);
+	UnlockReleaseBuffer(buffer);
+
+	/*
+	 * If the page is running low on free space, update the FSM as well.
+	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+	 * better than that without knowing the fill-factor for the table.
+	 *
+	 * XXX: We don't get here if the page was restored from full page image.
+	 * We don't bother to update the FSM in that case, it doesn't need to be
+	 * totally accurate anyway.
+	 */
+	if (freespace < BLCKSZ / 5)
+		XLogRecordPageWithFreeSpace(xlrec->node, blkno, freespace);
+}
+
+/*
  * Handles UPDATE and HOT_UPDATE
  */
 static void
@@ -5118,6 +5527,9 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
 		case XLOG_HEAP2_VISIBLE:
 			heap_xlog_visible(lsn, record);
 			break;
+		case XLOG_HEAP2_MULTI_INSERT:
+			heap_xlog_multi_insert(lsn, record);
+			break;
 		default:
 			elog(PANIC, "heap2_redo: unknown op code %u", info);
 	}
@@ -5255,6 +5667,18 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
 						 xlrec->node.spcNode, xlrec->node.dbNode,
 						 xlrec->node.relNode, xlrec->block);
 	}
+	else if (info == XLOG_HEAP2_MULTI_INSERT)
+	{
+		xl_heap_multi_insert *xlrec = (xl_heap_multi_insert *) rec;
+
+		if (xl_info & XLOG_HEAP_INIT_PAGE)
+			appendStringInfo(buf, "multi-insert (init): ");
+		else
+			appendStringInfo(buf, "multi-insert: ");
+		appendStringInfo(buf, "rel %u/%u/%u; blk %u; %d tuples",
+						 xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
+						 xlrec->blkno, xlrec->ntuples);
+	}
 	else
 		appendStringInfo(buf, "UNKNOWN");
 }
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index a9c1980..0d75104 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -33,6 +33,7 @@
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "optimizer/clauses.h"
 #include "optimizer/planner.h"
 #include "parser/parse_relation.h"
 #include "rewrite/rewriteHandler.h"
@@ -149,6 +150,7 @@ typedef struct CopyStateData
 	Oid		   *typioparams;	/* array of element types for in_functions */
 	int		   *defmap;			/* array of default att numbers */
 	ExprState **defexprs;		/* array of default att expressions */
+	bool		volatile_defexprs; /* is any of defexprs volatile? */
 
 	/*
 	 * These variables are used to reduce overhead in textual COPY FROM.
@@ -1842,11 +1844,17 @@ CopyFrom(CopyState cstate)
 	ExprContext *econtext;
 	TupleTableSlot *myslot;
 	MemoryContext oldcontext = CurrentMemoryContext;
+
 	ErrorContextCallback errcontext;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			hi_options = 0; /* start with default heap_insert options */
 	BulkInsertState bistate;
 	uint64		processed = 0;
+	bool		useHeapMultiInsert = true;
+	int			nBufferedTuples = 0;
+#define MAX_BUFFERED_TUPLES 1000
+	HeapTuple	bufferedTuples[MAX_BUFFERED_TUPLES];
+	Size		bufferedTuplesSize = 0;
 
 	Assert(cstate->rel);
 
@@ -1941,6 +1949,23 @@ CopyFrom(CopyState cstate)
 	/* Triggers might need a slot as well */
 	estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
 
+	/*
+	 * It's more efficient to prepare a bunch of tuples for insertion, and
+	 * insert them in one heap_multi_insert() call, than call heap_insert()
+	 * separately for every tuple. However, we can't do that if there are
+	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
+	 * expressions. Such triggers or expressions might query the table we're
+	 * inserting to, and act differently if the tuples that have already been
+	 * processed and prepared for insertion are not there.
+	 */
+	if ((resultRelInfo->ri_TrigDesc != NULL &&
+		(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+		 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+		cstate->volatile_defexprs)
+	{
+		useHeapMultiInsert = false;
+	}
+
 	/* Prepare to catch AFTER triggers. */
 	AfterTriggerBeginQuery();
 
@@ -1972,8 +1997,15 @@ CopyFrom(CopyState cstate)
 
 		CHECK_FOR_INTERRUPTS();
 
-		/* Reset the per-tuple exprcontext */
-		ResetPerTupleExprContext(estate);
+		if (nBufferedTuples == 0)
+		{
+			/*
+			 * Reset the per-tuple exprcontext. We can only do this if the
+			 * tuple buffer is empty (calling the context the per-tuple memory
+			 * context is a bit of a misnomer now
+			 */
+			ResetPerTupleExprContext(estate);
+		}
 
 		/* Switch into its memory context */
 		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@@ -2016,18 +2048,69 @@ CopyFrom(CopyState cstate)
 			if (cstate->rel->rd_att->constr)
 				ExecConstraints(resultRelInfo, slot, estate);
 
-			/* OK, store the tuple and create index entries for it */
-			heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+			if (useHeapMultiInsert)
+			{
+				/* Insert this tuple to the tuple buffer */
+				bufferedTuples[nBufferedTuples++] = tuple;
+				bufferedTuplesSize += tuple->t_len;
 
-			if (resultRelInfo->ri_NumIndices > 0)
-				recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
-													   estate);
+				/*
+				 * If the buffer filled up, flush it. Also flush if the total
+				 * size of all the tuples in the buffer becomes large, to
+				 * avoid using large amounts of memory for the buffers when
+				 * the tuples are exceptionally wide.
+				 */
+				if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+					bufferedTuplesSize > 65535)
+				{
+					/*
+					 * heap_multi_insert leaks memory, so switch into
+					 * short-lived memory context before calling it.
+					 */
+					MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+					heap_multi_insert(cstate->rel,
+									  bufferedTuples,
+									  nBufferedTuples,
+									  mycid,
+									  hi_options,
+									  bistate);
+					MemoryContextSwitchTo(oldcontext);
 
-			/* AFTER ROW INSERT Triggers */
-			ExecARInsertTriggers(estate, resultRelInfo, tuple,
-								 recheckIndexes);
+					/*
+					 * If there are any indexes, update the indexes for all the
+					 * inserted tuples.
+					 */
+					if (resultRelInfo->ri_NumIndices > 0)
+					{
+						int i;
+						for (i = 0; i < nBufferedTuples; i++)
+						{
+							ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
+							recheckIndexes = ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
+																   estate);
+							list_free(recheckIndexes);
+						}
+					}
+
+					nBufferedTuples = 0;
+					bufferedTuplesSize = 0;
+				}
+			}
+			else
+			{
+				/* OK, store the tuple and create index entries for it */
+				heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
 
-			list_free(recheckIndexes);
+				if (resultRelInfo->ri_NumIndices > 0)
+					recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+														   estate);
+
+				/* AFTER ROW INSERT Triggers */
+				ExecARInsertTriggers(estate, resultRelInfo, tuple,
+									 recheckIndexes);
+
+				list_free(recheckIndexes);
+			}
 
 			/*
 			 * We count only tuples not suppressed by a BEFORE INSERT trigger;
@@ -2038,6 +2121,34 @@ CopyFrom(CopyState cstate)
 		}
 	}
 
+	/* Flush any remaining buffered tuples */
+	if (nBufferedTuples > 0)
+	{
+		heap_multi_insert(cstate->rel,
+						  bufferedTuples,
+						  nBufferedTuples,
+						  mycid,
+						  hi_options,
+						  bistate);
+
+		/*
+		 * If there are any indexes, update the indexes for all the
+		 * inserted tuples.
+		 */
+		if (resultRelInfo->ri_NumIndices > 0)
+		{
+			int i;
+			for (i = 0; i < nBufferedTuples; i++)
+			{
+				List *recheckIndexes;
+				ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
+				recheckIndexes = ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
+													   estate);
+				list_free(recheckIndexes);
+			}
+		}
+	}
+
 	/* Done, clean up */
 	error_context_stack = errcontext.previous;
 
@@ -2099,6 +2210,7 @@ BeginCopyFrom(Relation rel,
 	int		   *defmap;
 	ExprState **defexprs;
 	MemoryContext oldcontext;
+	bool		volatile_defexprs;
 
 	cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
@@ -2122,6 +2234,7 @@ BeginCopyFrom(Relation rel,
 	attr = tupDesc->attrs;
 	num_phys_attrs = tupDesc->natts;
 	num_defaults = 0;
+	volatile_defexprs = false;
 
 	/*
 	 * Pick up the required catalog information for each attribute in the
@@ -2163,6 +2276,9 @@ BeginCopyFrom(Relation rel,
 								 expression_planner((Expr *) defexpr), NULL);
 				defmap[num_defaults] = attnum - 1;
 				num_defaults++;
+
+				if (!volatile_defexprs)
+					volatile_defexprs = contain_volatile_functions(defexpr);
 			}
 		}
 	}
@@ -2172,6 +2288,7 @@ BeginCopyFrom(Relation rel,
 	cstate->typioparams = typioparams;
 	cstate->defmap = defmap;
 	cstate->defexprs = defexprs;
+	cstate->volatile_defexprs = volatile_defexprs;
 	cstate->num_defaults = num_defaults;
 
 	if (pipe)
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index eb9adc8..4f1a6ab 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1672,10 +1672,10 @@ add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
 }
 
 /*
- * pgstat_count_heap_insert - count a tuple insertion
+ * pgstat_count_heap_insert - count a tuple insertion of n tuples
  */
 void
-pgstat_count_heap_insert(Relation rel)
+pgstat_count_heap_insert(Relation rel, int n)
 {
 	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
 
@@ -1688,7 +1688,7 @@ pgstat_count_heap_insert(Relation rel)
 			pgstat_info->trans->nest_level != nest_level)
 			add_tabstat_xact_level(pgstat_info, nest_level);
 
-		pgstat_info->trans->tuples_inserted++;
+		pgstat_info->trans->tuples_inserted += n;
 	}
 }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 776ea5c..dfbd276 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -97,6 +97,8 @@ extern void FreeBulkInsertState(BulkInsertState);
 
 extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			int options, BulkInsertState bistate);
+extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+				  CommandId cid, int options, BulkInsertState bistate);
 extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
 			ItemPointer ctid, TransactionId *update_xmax,
 			CommandId cid, Snapshot crosscheck, bool wait);
diff --git a/src/include/access/htup.h b/src/include/access/htup.h
index c025835..2e87f22 100644
--- a/src/include/access/htup.h
+++ b/src/include/access/htup.h
@@ -608,6 +608,7 @@ typedef HeapTupleData *HeapTuple;
 /* 0x20 is free, was XLOG_HEAP2_CLEAN_MOVE */
 #define XLOG_HEAP2_CLEANUP_INFO 0x30
 #define XLOG_HEAP2_VISIBLE		0x40
+#define XLOG_HEAP2_MULTI_INSERT	0x50
 
 /*
  * All what we need to find changed tuple
@@ -661,6 +662,36 @@ typedef struct xl_heap_insert
 
 #define SizeOfHeapInsert	(offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
 
+/*
+ * This is what we need to know about a multi-insert. The record consists of
+ * xl_heap_multi_insert header, followed by a xl_multi_insert_tuple and tuple
+ * data for each tuple. 'offsets' array is omitted if the whole page is
+ * reinitialized (XLOG_HEAP_INIT_PAGE)
+ */
+typedef struct xl_heap_multi_insert
+{
+	RelFileNode node;
+	BlockNumber	blkno;
+	bool		all_visible_cleared;
+	uint16		ntuples;
+	OffsetNumber offsets[1];
+
+	/* TUPLE DATA (xl_multi_insert_tuples) FOLLOW AT END OF STRUCT */
+} xl_heap_multi_insert;
+
+#define SizeOfHeapMultiInsert	offsetof(xl_heap_multi_insert, offsets)
+
+typedef struct xl_multi_insert_tuple
+{
+	uint16		datalen;				/* size of tuple data that follows */
+	uint16		t_infomask2;
+	uint16		t_infomask;
+	uint8		t_hoff;
+	/* TUPLE DATA FOLLOWS AT END OF STRUCT */
+} xl_multi_insert_tuple;
+
+#define SizeOfMultiInsertTuple	(offsetof(xl_multi_insert_tuple, t_hoff) + sizeof(uint8))
+
 /* This is what we need to know about update|hot_update */
 typedef struct xl_heap_update
 {
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 20c4d43..689b95f 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -764,7 +764,7 @@ extern void pgstat_initstats(Relation rel);
 			(rel)->pgstat_info->t_counts.t_blocks_hit++;			\
 	} while (0)
 
-extern void pgstat_count_heap_insert(Relation rel);
+extern void pgstat_count_heap_insert(Relation rel, int n);
 extern void pgstat_count_heap_update(Relation rel, bool hot);
 extern void pgstat_count_heap_delete(Relation rel);
 extern void pgstat_update_heap_dead_tuples(Relation rel, int delta);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to