On 25.09.2011 16:03, Dean Rasheed wrote:
> On 25 September 2011 09:43, Kohei KaiGai <kai...@kaigai.gr.jp> wrote:
>> Hi Heikki,
>>
>> I checked your patch, then I have a comment and two questions here.
>>
>> 2011/9/14 Heikki Linnakangas <heikki.linnakan...@enterprisedb.com>:
>>>
>>> Attached is a new version of the patch. It is now complete, including WAL
>>> replay code.
>
> Hi,
>
> I had a quick look at the patch as well and spotted an oversight: the
> multi-insert code branch fails to invoke AFTER ROW triggers.

Thanks! Here's an updated version of the patch, fixing that and all the other issues pointed out so far.

I extracted the code that sets the OID and tuple header fields and invokes the toaster into a new function that's shared by heap_insert() and heap_multi_insert(). Tom objected to merging heap_insert() and heap_multi_insert() into one complicated function, and I think he was right on that, but sharing this code to prepare a tuple still makes sense. IMHO it makes heap_insert() slightly more readable too.
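
Just to illustrate the caller side: the pattern the copy.c changes boil down to is roughly the following (a sketch only, not part of the patch; insert_batched and MAX_BATCH are names made up here, and index updates, triggers and error handling are left out):

/* Sketch only -- not part of the patch. */
#include "postgres.h"

#include "access/heapam.h"
#include "utils/rel.h"

#define MAX_BATCH 1000			/* made-up batch size, for illustration */

static void
insert_batched(Relation rel, HeapTuple *tuples, int ntuples,
			   CommandId cid, int options, BulkInsertState bistate)
{
	int			done = 0;

	while (done < ntuples)
	{
		int			n = Min(ntuples - done, MAX_BATCH);

		/*
		 * One call covers the whole batch: tuples that land on the same page
		 * share a single WAL record and a single lock/unlock of the page.
		 */
		heap_multi_insert(rel, &tuples[done], n, cid, options, bistate);
		done += n;
	}
}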

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 24,29 ****
--- 24,30 ----
   *		heap_getnext	- retrieve next tuple in scan
   *		heap_fetch		- retrieve tuple with given tid
   *		heap_insert		- insert tuple into a relation
+  *		heap_multi_insert - insert multiple tuples into a relation
   *		heap_delete		- delete a tuple from a relation
   *		heap_update		- replace a tuple in a relation with another tuple
   *		heap_markpos	- mark scan position
***************
*** 79,84 **** static HeapScanDesc heap_beginscan_internal(Relation relation,
--- 80,87 ----
  						int nkeys, ScanKey key,
  						bool allow_strat, bool allow_sync,
  						bool is_bitmapscan);
+ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
+ 					TransactionId xid, CommandId cid, int options);
  static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
  				ItemPointerData from, Buffer newbuf, HeapTuple newtup,
  				bool all_visible_cleared, bool new_all_visible_cleared);
***************
*** 1866,1920 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
  	Buffer		vmbuffer = InvalidBuffer;
  	bool		all_visible_cleared = false;
  
- 	if (relation->rd_rel->relhasoids)
- 	{
- #ifdef NOT_USED
- 		/* this is redundant with an Assert in HeapTupleSetOid */
- 		Assert(tup->t_data->t_infomask & HEAP_HASOID);
- #endif
- 
- 		/*
- 		 * If the object id of this tuple has already been assigned, trust the
- 		 * caller.	There are a couple of ways this can happen.  At initial db
- 		 * creation, the backend program sets oids for tuples. When we define
- 		 * an index, we set the oid.  Finally, in the future, we may allow
- 		 * users to set their own object ids in order to support a persistent
- 		 * object store (objects need to contain pointers to one another).
- 		 */
- 		if (!OidIsValid(HeapTupleGetOid(tup)))
- 			HeapTupleSetOid(tup, GetNewOid(relation));
- 	}
- 	else
- 	{
- 		/* check there is not space for an OID */
- 		Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
- 	}
- 
- 	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
- 	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
- 	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
- 	HeapTupleHeaderSetXmin(tup->t_data, xid);
- 	HeapTupleHeaderSetCmin(tup->t_data, cid);
- 	HeapTupleHeaderSetXmax(tup->t_data, 0);		/* for cleanliness */
- 	tup->t_tableOid = RelationGetRelid(relation);
- 
  	/*
! 	 * If the new tuple is too big for storage or contains already toasted
! 	 * out-of-line attributes from some other relation, invoke the toaster.
  	 *
  	 * Note: below this point, heaptup is the data we actually intend to store
  	 * into the relation; tup is the caller's original untoasted data.
  	 */
! 	if (relation->rd_rel->relkind != RELKIND_RELATION)
! 	{
! 		/* toast table entries should never be recursively toasted */
! 		Assert(!HeapTupleHasExternal(tup));
! 		heaptup = tup;
! 	}
! 	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
! 		heaptup = toast_insert_or_update(relation, tup, NULL, options);
! 	else
! 		heaptup = tup;
  
  	/*
  	 * We're about to do the actual insert -- but check for conflict first,
--- 1869,1882 ----
  	Buffer		vmbuffer = InvalidBuffer;
  	bool		all_visible_cleared = false;
  
  	/*
! 	 * Fill in tuple header fields, assign an OID, and toast the tuple if
! 	 * necessary.
  	 *
  	 * Note: below this point, heaptup is the data we actually intend to store
  	 * into the relation; tup is the caller's original untoasted data.
  	 */
! 	heaptup = heap_prepare_insert(relation, tup, xid, cid, options);
  
  	/*
  	 * We're about to do the actual insert -- but check for conflict first,
***************
*** 2035,2041 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
  	 */
  	CacheInvalidateHeapTuple(relation, heaptup, NULL);
  
! 	pgstat_count_heap_insert(relation);
  
  	/*
  	 * If heaptup is a private copy, release it.  Don't forget to copy t_self
--- 1997,2003 ----
  	 */
  	CacheInvalidateHeapTuple(relation, heaptup, NULL);
  
! 	pgstat_count_heap_insert(relation, 1);
  
  	/*
  	 * If heaptup is a private copy, release it.  Don't forget to copy t_self
***************
*** 2051,2056 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 2013,2297 ----
  }
  
  /*
+  * Subroutine for heap_insert() and heap_multi_insert(). Prepares a tuple
+  * for insertion: sets the tuple header fields, assigns an OID, and toasts
+  * the tuple if necessary.
+  * Returns a toasted version of the tuple if it was toasted, or the original
+  * tuple if not. Note that in any case, the header fields are also set in
+  * the original tuple.
+  */
+ static HeapTuple
+ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
+ 					CommandId cid, int options)
+ {
+ 	if (relation->rd_rel->relhasoids)
+ 	{
+ #ifdef NOT_USED
+ 		/* this is redundant with an Assert in HeapTupleSetOid */
+ 		Assert(tup->t_data->t_infomask & HEAP_HASOID);
+ #endif
+ 
+ 		/*
+ 		 * If the object id of this tuple has already been assigned, trust the
+ 		 * caller.	There are a couple of ways this can happen.  At initial db
+ 		 * creation, the backend program sets oids for tuples. When we define
+ 		 * an index, we set the oid.  Finally, in the future, we may allow
+ 		 * users to set their own object ids in order to support a persistent
+ 		 * object store (objects need to contain pointers to one another).
+ 		 */
+ 		if (!OidIsValid(HeapTupleGetOid(tup)))
+ 			HeapTupleSetOid(tup, GetNewOid(relation));
+ 	}
+ 	else
+ 	{
+ 		/* check there is not space for an OID */
+ 		Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
+ 	}
+ 
+ 	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
+ 	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
+ 	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
+ 	HeapTupleHeaderSetXmin(tup->t_data, xid);
+ 	HeapTupleHeaderSetCmin(tup->t_data, cid);
+ 	HeapTupleHeaderSetXmax(tup->t_data, 0);		/* for cleanliness */
+ 	tup->t_tableOid = RelationGetRelid(relation);
+ 
+ 	/*
+ 	 * If the new tuple is too big for storage or contains already toasted
+ 	 * out-of-line attributes from some other relation, invoke the toaster.
+ 	 */
+ 	if (relation->rd_rel->relkind != RELKIND_RELATION)
+ 	{
+ 		/* toast table entries should never be recursively toasted */
+ 		Assert(!HeapTupleHasExternal(tup));
+ 		return tup;
+ 	}
+ 	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
+ 		return toast_insert_or_update(relation, tup, NULL, options);
+ 	else
+ 		return tup;
+ }
+ 
+ /*
+  *	heap_multi_insert	- insert multiple tuples into a heap
+  *
+  * This is like heap_insert(), but inserts multiple tuples in one operation.
+  * That's faster than calling heap_insert() in a loop, because when multiple
+  * tuples can be inserted on a single page, we can write just a single WAL
+  * record covering all of them, and only need to lock/unlock the page once.
+  *
+  * Note: this leaks memory into the current memory context. You can create a
+  * temporary context before calling this, if that's a problem.
+  */
+ void
+ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+ 				  CommandId cid, int options, BulkInsertState bistate)
+ {
+ 	TransactionId xid = GetCurrentTransactionId();
+ 	HeapTuple  *heaptuples;
+ 	Buffer		buffer;
+ 	Buffer		vmbuffer = InvalidBuffer;
+ 	bool		all_visible_cleared = false;
+ 	int			i;
+ 	int			ndone;
+ 	char	   *scratch = NULL;
+ 	Page		page;
+ 	bool		needwal;
+ 
+ 	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
+ 
+ 	/* Toast and set header data in all the tuples */
+ 	heaptuples = palloc(ntuples * sizeof(HeapTuple));
+ 	for (i = 0; i < ntuples; i++)
+ 		heaptuples[i] = heap_prepare_insert(relation, tuples[i],
+ 											xid, cid, options);
+ 
+ 	/*
+ 	 * Allocate some memory to use for constructing the WAL record. Using
+ 	 * palloc() within a critical section is not safe, so we allocate this
+ 	 * beforehand.
+ 	 */
+ 	if (needwal)
+ 		scratch = palloc(BLCKSZ);
+ 
+ 	/*
+ 	 * We're about to do the actual inserts -- but check for conflict first,
+ 	 * to avoid possibly having to roll back work we've just done.
+ 	 *
+ 	 * For a heap insert, we only need to check for table-level SSI locks.
+ 	 * Our new tuple can't possibly conflict with existing tuple locks, and
+ 	 * heap page locks are only consolidated versions of tuple locks; they do
+ 	 * not lock "gaps" as index page locks do.  So we don't need to identify
+ 	 * a buffer before making the call.
+ 	 */
+ 	CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
+ 
+ 	ndone = 0;
+ 	while (ndone < ntuples)
+ 	{
+ 		int nthispage;
+ 
+ 		/*
+ 		 * Find buffer where at least the next tuple will fit.  If the page
+ 		 * is all-visible, this will also pin the requisite visibility map
+ 		 * page.
+ 		 */
+ 		buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len,
+ 										   InvalidBuffer, options, bistate,
+ 										   &vmbuffer, NULL);
+ 		page = BufferGetPage(buffer);
+ 
+ 		if (PageIsAllVisible(page))
+ 		{
+ 			all_visible_cleared = true;
+ 			PageClearAllVisible(page);
+ 			visibilitymap_clear(relation,
+ 								BufferGetBlockNumber(buffer),
+ 								vmbuffer);
+ 		}
+ 
+ 		/* NO EREPORT(ERROR) from here till changes are logged */
+ 		START_CRIT_SECTION();
+ 
+ 		/* Put as many tuples as fit on this page */
+ 		for (nthispage = 0; ndone + nthispage < ntuples; nthispage++)
+ 		{
+ 			HeapTuple	heaptup = heaptuples[ndone + nthispage];
+ 
+ 			if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len))
+ 				break;
+ 
+ 			RelationPutHeapTuple(relation, buffer, heaptup);
+ 		}
+ 
+ 		/*
+ 		 * XXX Should we set PageSetPrunable on this page ? See heap_insert()
+ 		 */
+ 
+ 		MarkBufferDirty(buffer);
+ 
+ 		/* XLOG stuff */
+ 		if (needwal)
+ 		{
+ 			XLogRecPtr	recptr;
+ 			xl_heap_multi_insert *xlrec;
+ 			XLogRecData rdata[2];
+ 			uint8		info = XLOG_HEAP2_MULTI_INSERT;
+ 			char	   *tupledata;
+ 			int			totaldatalen;
+ 			char	   *scratchptr = scratch;
+ 			bool		init;
+ 
+ 			/*
+ 			 * If the page was previously empty, we can reinit the page
+ 			 * instead of restoring the whole thing.
+ 			 */
+ 			init = (ItemPointerGetOffsetNumber(&(heaptuples[ndone]->t_self)) == FirstOffsetNumber &&
+ 					PageGetMaxOffsetNumber(page) == FirstOffsetNumber + nthispage - 1);
+ 
+ 			/* allocate xl_heap_multi_insert struct from the scratch area */
+ 			xlrec = (xl_heap_multi_insert *) scratchptr;
+ 			scratchptr += SizeOfHeapMultiInsert;
+ 
+ 			/*
+ 			 * Allocate the offsets array, unless we're reinitializing the
+ 			 * page, in which case the tuples are stored in order starting at
+ 			 * FirstOffsetNumber and we don't need to store the offsets
+ 			 * explicitly.
+ 			 */
+ 			if (!init)
+ 				scratchptr += nthispage * sizeof(OffsetNumber);
+ 
+ 			/* the rest of the scratch space is used for tuple data */
+ 			tupledata = scratchptr;
+ 
+ 			xlrec->all_visible_cleared = all_visible_cleared;
+ 			xlrec->node = relation->rd_node;
+ 			xlrec->blkno = BufferGetBlockNumber(buffer);
+ 			xlrec->ntuples = nthispage;
+ 
+ 			/*
+ 			 * Write out an xl_multi_insert_tuple and the tuple data itself
+ 			 * for each tuple.
+ 			 */
+ 			for (i = 0; i < nthispage; i++)
+ 			{
+ 				HeapTuple	heaptup = heaptuples[ndone + i];
+ 				xl_multi_insert_tuple *tuphdr;
+ 				int			datalen;
+ 
+ 				if (!init)
+ 					xlrec->offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self);
+ 				/* xl_multi_insert_tuple needs two-byte alignment. */
+ 				tuphdr = (xl_multi_insert_tuple *) SHORTALIGN(scratchptr);
+ 				scratchptr = ((char *) tuphdr) + SizeOfMultiInsertTuple;
+ 
+ 				tuphdr->t_infomask2 = heaptup->t_data->t_infomask2;
+ 				tuphdr->t_infomask = heaptup->t_data->t_infomask;
+ 				tuphdr->t_hoff = heaptup->t_data->t_hoff;
+ 
+ 				/* write bitmap [+ padding] [+ oid] + data */
+ 				datalen = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+ 				memcpy(scratchptr,
+ 					   (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+ 					   datalen);
+ 				tuphdr->datalen = datalen;
+ 				scratchptr += datalen;
+ 			}
+ 			totaldatalen = scratchptr - tupledata;
+ 			Assert((scratchptr - scratch) < BLCKSZ);
+ 
+ 			rdata[0].data = (char *) xlrec;
+ 			rdata[0].len = tupledata - scratch;
+ 			rdata[0].buffer = InvalidBuffer;
+ 			rdata[0].next = &rdata[1];
+ 
+ 			rdata[1].data = tupledata;
+ 			rdata[1].len = totaldatalen;
+ 			rdata[1].buffer = buffer;
+ 			rdata[1].buffer_std = true;
+ 			rdata[1].next = NULL;
+ 
+ 			/*
+ 			 * If we're going to reinitialize the whole page using the WAL
+ 			 * record, hide buffer reference from XLogInsert.
+ 			 */
+ 			if (init)
+ 			{
+ 				rdata[1].buffer = InvalidBuffer;
+ 				info |= XLOG_HEAP_INIT_PAGE;
+ 			}
+ 
+ 			recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
+ 
+ 			PageSetLSN(page, recptr);
+ 			PageSetTLI(page, ThisTimeLineID);
+ 		}
+ 
+ 		END_CRIT_SECTION();
+ 
+ 		UnlockReleaseBuffer(buffer);
+ 		if (vmbuffer != InvalidBuffer)
+ 			ReleaseBuffer(vmbuffer);
+ 
+ 		ndone += nthispage;
+ 	}
+ 
+ 	/*
+ 	 * If tuples are cachable, mark them for invalidation from the caches in
+ 	 * case we abort.  Note it is OK to do this after releasing the buffer,
+ 	 * because the heaptuples data structure is all in local memory, not in
+ 	 * the shared buffer.
+ 	 */
+ 	if (IsSystemRelation(relation))
+ 	{
+ 		for (i = 0; i < ntuples; i++)
+ 			CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
+ 	}
+ 
+ 	pgstat_count_heap_insert(relation, ntuples);
+ }
+ 
+ /*
   *	simple_heap_insert - insert a tuple
   *
   * Currently, this routine differs from heap_insert only in supplying
***************
*** 4738,4743 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
--- 4979,5122 ----
  }
  
  /*
+  * Handles MULTI_INSERT record type.
+  */
+ static void
+ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	char	   *recdata = XLogRecGetData(record);
+ 	xl_heap_multi_insert *xlrec;
+ 	Buffer		buffer;
+ 	Page		page;
+ 	struct
+ 	{
+ 		HeapTupleHeaderData hdr;
+ 		char		data[MaxHeapTupleSize];
+ 	}			tbuf;
+ 	HeapTupleHeader htup;
+ 	uint32		newlen;
+ 	Size		freespace;
+ 	BlockNumber blkno;
+ 	int			i;
+ 	bool		isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+ 
+ 	xlrec = (xl_heap_multi_insert *) recdata;
+ 	recdata += SizeOfHeapMultiInsert;
+ 
+ 	/*
+ 	 * If we're reinitializing the page, the tuples are stored in order from
+ 	 * FirstOffsetNumber. Otherwise there's an array of offsets in the WAL
+ 	 * record.
+ 	 */
+ 	if (!isinit)
+ 		recdata += sizeof(OffsetNumber) * xlrec->ntuples;
+ 
+ 	blkno = xlrec->blkno;
+ 
+ 	/*
+ 	 * The visibility map may need to be fixed even if the heap page is
+ 	 * already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation	reln = CreateFakeRelcacheEntry(xlrec->node);
+ 		Buffer		vmbuffer = InvalidBuffer;
+ 
+ 		visibilitymap_pin(reln, blkno, &vmbuffer);
+ 		visibilitymap_clear(reln, blkno, vmbuffer);
+ 		ReleaseBuffer(vmbuffer);
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
+ 	if (record->xl_info & XLR_BKP_BLOCK_1)
+ 		return;
+ 
+ 	if (isinit)
+ 	{
+ 		buffer = XLogReadBuffer(xlrec->node, blkno, true);
+ 		Assert(BufferIsValid(buffer));
+ 		page = (Page) BufferGetPage(buffer);
+ 
+ 		PageInit(page, BufferGetPageSize(buffer), 0);
+ 	}
+ 	else
+ 	{
+ 		buffer = XLogReadBuffer(xlrec->node, blkno, false);
+ 		if (!BufferIsValid(buffer))
+ 			return;
+ 		page = (Page) BufferGetPage(buffer);
+ 
+ 		if (XLByteLE(lsn, PageGetLSN(page)))	/* changes are applied */
+ 		{
+ 			UnlockReleaseBuffer(buffer);
+ 			return;
+ 		}
+ 	}
+ 
+ 	for (i = 0; i < xlrec->ntuples; i++)
+ 	{
+ 		OffsetNumber offnum;
+ 		xl_multi_insert_tuple *xlhdr;
+ 
+ 		if (isinit)
+ 			offnum = FirstOffsetNumber + i;
+ 		else
+ 			offnum = xlrec->offsets[i];
+ 		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+ 			elog(PANIC, "heap_multi_insert_redo: invalid max offset number");
+ 
+ 		xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata);
+ 		recdata += SizeOfMultiInsertTuple;
+ 
+ 		newlen = xlhdr->datalen;
+ 		Assert(newlen <= MaxHeapTupleSize);
+ 		htup = &tbuf.hdr;
+ 		MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
+ 		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+ 		memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
+ 			   (char *) recdata,
+ 			   newlen);
+ 		recdata += newlen;
+ 
+ 		newlen += offsetof(HeapTupleHeaderData, t_bits);
+ 		htup->t_infomask2 = xlhdr->t_infomask2;
+ 		htup->t_infomask = xlhdr->t_infomask;
+ 		htup->t_hoff = xlhdr->t_hoff;
+ 		HeapTupleHeaderSetXmin(htup, record->xl_xid);
+ 		HeapTupleHeaderSetCmin(htup, FirstCommandId);
+ 		ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
+ 		ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+ 
+ 		offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+ 		if (offnum == InvalidOffsetNumber)
+ 			elog(PANIC, "heap_multi_insert_redo: failed to add tuple");
+ 	}
+ 
+ 	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
+ 
+ 	PageSetLSN(page, lsn);
+ 	PageSetTLI(page, ThisTimeLineID);
+ 
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
+ 	MarkBufferDirty(buffer);
+ 	UnlockReleaseBuffer(buffer);
+ 
+ 	/*
+ 	 * If the page is running low on free space, update the FSM as well.
+ 	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+ 	 * better than that without knowing the fill-factor for the table.
+ 	 *
+ 	 * XXX: We don't get here if the page was restored from full page image.
+ 	 * We don't bother to update the FSM in that case, it doesn't need to be
+ 	 * totally accurate anyway.
+ 	 */
+ 	if (freespace < BLCKSZ / 5)
+ 		XLogRecordPageWithFreeSpace(xlrec->node, blkno, freespace);
+ }
+ 
+ /*
   * Handles UPDATE and HOT_UPDATE
   */
  static void
***************
*** 5126,5131 **** heap2_redo(XLogRecPtr lsn, XLogRecord *record)
--- 5505,5513 ----
  		case XLOG_HEAP2_VISIBLE:
  			heap_xlog_visible(lsn, record);
  			break;
+ 		case XLOG_HEAP2_MULTI_INSERT:
+ 			heap_xlog_multi_insert(lsn, record);
+ 			break;
  		default:
  			elog(PANIC, "heap2_redo: unknown op code %u", info);
  	}
***************
*** 5263,5268 **** heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
--- 5645,5662 ----
  						 xlrec->node.spcNode, xlrec->node.dbNode,
  						 xlrec->node.relNode, xlrec->block);
  	}
+ 	else if (info == XLOG_HEAP2_MULTI_INSERT)
+ 	{
+ 		xl_heap_multi_insert *xlrec = (xl_heap_multi_insert *) rec;
+ 
+ 		if (xl_info & XLOG_HEAP_INIT_PAGE)
+ 			appendStringInfo(buf, "multi-insert (init): ");
+ 		else
+ 			appendStringInfo(buf, "multi-insert: ");
+ 		appendStringInfo(buf, "rel %u/%u/%u; blk %u; %d tuples",
+ 						 xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
+ 						 xlrec->blkno, xlrec->ntuples);
+ 	}
  	else
  		appendStringInfo(buf, "UNKNOWN");
  }
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 33,38 ****
--- 33,39 ----
  #include "libpq/pqformat.h"
  #include "mb/pg_wchar.h"
  #include "miscadmin.h"
+ #include "optimizer/clauses.h"
  #include "optimizer/planner.h"
  #include "parser/parse_relation.h"
  #include "rewrite/rewriteHandler.h"
***************
*** 149,154 **** typedef struct CopyStateData
--- 150,156 ----
  	Oid		   *typioparams;	/* array of element types for in_functions */
  	int		   *defmap;			/* array of default att numbers */
  	ExprState **defexprs;		/* array of default att expressions */
+ 	bool		volatile_defexprs; /* is any of defexprs volatile? */
  
  	/*
  	 * These variables are used to reduce overhead in textual COPY FROM.
***************
*** 277,282 **** static uint64 CopyTo(CopyState cstate);
--- 279,289 ----
  static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
  			 Datum *values, bool *nulls);
  static uint64 CopyFrom(CopyState cstate);
+ static void WriteCopyFromBatch(CopyState cstate, EState *estate,
+ 				   CommandId mycid, int hi_options,
+ 				   ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
+ 				   BulkInsertState bistate,
+ 				   int nBufferedTuples, HeapTuple *bufferedTuples);
  static bool CopyReadLine(CopyState cstate);
  static bool CopyReadLineText(CopyState cstate);
  static int	CopyReadAttributesText(CopyState cstate);
***************
*** 1842,1852 **** CopyFrom(CopyState cstate)
--- 1849,1865 ----
  	ExprContext *econtext;
  	TupleTableSlot *myslot;
  	MemoryContext oldcontext = CurrentMemoryContext;
+ 
  	ErrorContextCallback errcontext;
  	CommandId	mycid = GetCurrentCommandId(true);
  	int			hi_options = 0; /* start with default heap_insert options */
  	BulkInsertState bistate;
  	uint64		processed = 0;
+ 	bool		useHeapMultiInsert;
+ 	int			nBufferedTuples = 0;
+ #define MAX_BUFFERED_TUPLES 1000
+ 	HeapTuple  *bufferedTuples;
+ 	Size		bufferedTuplesSize = 0;
  
  	Assert(cstate->rel);
  
***************
*** 1941,1946 **** CopyFrom(CopyState cstate)
--- 1954,1981 ----
  	/* Triggers might need a slot as well */
  	estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
  
+ 	/*
+ 	 * It's more efficient to prepare a bunch of tuples for insertion, and
+ 	 * insert them in one heap_multi_insert() call, than call heap_insert()
+ 	 * separately for every tuple. However, we can't do that if there are
+ 	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
+ 	 * expressions. Such triggers or expressions might query the table we're
+ 	 * inserting to, and act differently if the tuples that have already been
+ 	 * processed and prepared for insertion are not there.
+ 	 */
+ 	if ((resultRelInfo->ri_TrigDesc != NULL &&
+ 		(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ 		 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+ 		cstate->volatile_defexprs)
+ 	{
+ 		useHeapMultiInsert = false;
+ 	}
+ 	else
+ 	{
+ 		useHeapMultiInsert = true;
+ 		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+ 	}
+ 
  	/* Prepare to catch AFTER triggers. */
  	AfterTriggerBeginQuery();
  
***************
*** 1972,1979 **** CopyFrom(CopyState cstate)
  
  		CHECK_FOR_INTERRUPTS();
  
! 		/* Reset the per-tuple exprcontext */
! 		ResetPerTupleExprContext(estate);
  
  		/* Switch into its memory context */
  		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
--- 2007,2021 ----
  
  		CHECK_FOR_INTERRUPTS();
  
! 		if (nBufferedTuples == 0)
! 		{
! 			/*
! 			 * Reset the per-tuple exprcontext. We can only do this if the
! 			 * tuple buffer is empty. (Calling the context the per-tuple
! 			 * memory context is a bit of a misnomer now.)
! 			 */
! 			ResetPerTupleExprContext(estate);
! 		}
  
  		/* Switch into its memory context */
  		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
***************
*** 2010,2033 **** CopyFrom(CopyState cstate)
  
  		if (!skip_tuple)
  		{
- 			List	   *recheckIndexes = NIL;
- 
  			/* Check the constraints of the tuple */
  			if (cstate->rel->rd_att->constr)
  				ExecConstraints(resultRelInfo, slot, estate);
  
! 			/* OK, store the tuple and create index entries for it */
! 			heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
  
! 			if (resultRelInfo->ri_NumIndices > 0)
! 				recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
! 													   estate);
  
! 			/* AFTER ROW INSERT Triggers */
! 			ExecARInsertTriggers(estate, resultRelInfo, tuple,
! 								 recheckIndexes);
  
! 			list_free(recheckIndexes);
  
  			/*
  			 * We count only tuples not suppressed by a BEFORE INSERT trigger;
--- 2052,2100 ----
  
  		if (!skip_tuple)
  		{
  			/* Check the constraints of the tuple */
  			if (cstate->rel->rd_att->constr)
  				ExecConstraints(resultRelInfo, slot, estate);
  
! 			if (useHeapMultiInsert)
! 			{
! 				/* Add this tuple to the tuple buffer */
! 				bufferedTuples[nBufferedTuples++] = tuple;
! 				bufferedTuplesSize += tuple->t_len;
  
! 				/*
! 				 * If the buffer filled up, flush it. Also flush if the total
! 				 * size of all the tuples in the buffer becomes large, to
! 				 * avoid using large amounts of memory for the buffers when
! 				 * the tuples are exceptionally wide.
! 				 */
! 				if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
! 					bufferedTuplesSize > 65535)
! 				{
! 					WriteCopyFromBatch(cstate, estate, mycid, hi_options,
! 									   resultRelInfo, myslot, bistate,
! 									   nBufferedTuples, bufferedTuples);
! 					nBufferedTuples = 0;
! 					bufferedTuplesSize = 0;
! 				}
! 			}
! 			else
! 			{
! 				List	   *recheckIndexes = NIL;
  
! 				/* OK, store the tuple and create index entries for it */
! 				heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
  
! 				if (resultRelInfo->ri_NumIndices > 0)
! 					recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
! 														   estate);
! 
! 				/* AFTER ROW INSERT Triggers */
! 				ExecARInsertTriggers(estate, resultRelInfo, tuple,
! 									 recheckIndexes);
! 
! 				list_free(recheckIndexes);
! 			}
  
  			/*
  			 * We count only tuples not suppressed by a BEFORE INSERT trigger;
***************
*** 2038,2043 **** CopyFrom(CopyState cstate)
--- 2105,2116 ----
  		}
  	}
  
+ 	/* Flush any remaining buffered tuples */
+ 	if (nBufferedTuples > 0)
+ 		WriteCopyFromBatch(cstate, estate, mycid, hi_options,
+ 						   resultRelInfo, myslot, bistate,
+ 						   nBufferedTuples, bufferedTuples);
+ 
  	/* Done, clean up */
  	error_context_stack = errcontext.previous;
  
***************
*** 2071,2076 **** CopyFrom(CopyState cstate)
--- 2144,2210 ----
  }
  
  /*
+  * A subroutine of CopyFrom, to write the current batch of buffered heap
+  * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
+  * triggers.
+  */
+ static void
+ WriteCopyFromBatch(CopyState cstate, EState *estate, CommandId mycid,
+ 				   int hi_options, ResultRelInfo *resultRelInfo,
+ 				   TupleTableSlot *myslot, BulkInsertState bistate,
+ 				   int nBufferedTuples, HeapTuple *bufferedTuples)
+ {
+ 	MemoryContext oldcontext;
+ 	int			i;
+ 
+ 	/*
+ 	 * heap_multi_insert leaks memory, so switch to a short-lived memory
+ 	 * context before calling it.
+ 	 */
+ 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ 	heap_multi_insert(cstate->rel,
+ 					  bufferedTuples,
+ 					  nBufferedTuples,
+ 					  mycid,
+ 					  hi_options,
+ 					  bistate);
+ 	MemoryContextSwitchTo(oldcontext);
+ 
+ 	/*
+ 	 * If there are any indexes, update them for all the inserted tuples,
+ 	 * and run AFTER ROW INSERT triggers.
+ 	 */
+ 	if (resultRelInfo->ri_NumIndices > 0)
+ 	{
+ 		for (i = 0; i < nBufferedTuples; i++)
+ 		{
+ 			List *recheckIndexes;
+ 
+ 			ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
+ 			recheckIndexes =
+ 				ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
+ 									  estate);
+ 			ExecARInsertTriggers(estate, resultRelInfo,
+ 								 bufferedTuples[i],
+ 								 recheckIndexes);
+ 			list_free(recheckIndexes);
+ 		}
+ 	}
+ 	/*
+ 	 * There are no indexes, but see if we need to run AFTER ROW INSERT
+ 	 * triggers anyway.
+ 	 */
+ 	else if (resultRelInfo->ri_TrigDesc != NULL &&
+ 			 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
+ 	{
+ 		for (i = 0; i < nBufferedTuples; i++)
+ 			ExecARInsertTriggers(estate, resultRelInfo,
+ 								 bufferedTuples[i],
+ 								 NIL);
+ 	}
+ }
+ 
+ /*
   * Setup to read tuples from a file for COPY FROM.
   *
   * 'rel': Used as a template for the tuples
***************
*** 2099,2104 **** BeginCopyFrom(Relation rel,
--- 2233,2239 ----
  	int		   *defmap;
  	ExprState **defexprs;
  	MemoryContext oldcontext;
+ 	bool		volatile_defexprs;
  
  	cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
  	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
***************
*** 2122,2127 **** BeginCopyFrom(Relation rel,
--- 2257,2263 ----
  	attr = tupDesc->attrs;
  	num_phys_attrs = tupDesc->natts;
  	num_defaults = 0;
+ 	volatile_defexprs = false;
  
  	/*
  	 * Pick up the required catalog information for each attribute in the
***************
*** 2163,2168 **** BeginCopyFrom(Relation rel,
--- 2299,2307 ----
  								 expression_planner((Expr *) defexpr), NULL);
  				defmap[num_defaults] = attnum - 1;
  				num_defaults++;
+ 
+ 				if (!volatile_defexprs)
+ 					volatile_defexprs = contain_volatile_functions(defexpr);
  			}
  		}
  	}
***************
*** 2172,2177 **** BeginCopyFrom(Relation rel,
--- 2311,2317 ----
  	cstate->typioparams = typioparams;
  	cstate->defmap = defmap;
  	cstate->defexprs = defexprs;
+ 	cstate->volatile_defexprs = volatile_defexprs;
  	cstate->num_defaults = num_defaults;
  
  	if (pipe)
*** a/src/backend/postmaster/pgstat.c
--- b/src/backend/postmaster/pgstat.c
***************
*** 1677,1686 **** add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
  }
  
  /*
!  * pgstat_count_heap_insert - count a tuple insertion
   */
  void
! pgstat_count_heap_insert(Relation rel)
  {
  	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
  
--- 1677,1686 ----
  }
  
  /*
!  * pgstat_count_heap_insert - count the insertion of n tuples
   */
  void
! pgstat_count_heap_insert(Relation rel, int n)
  {
  	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
  
***************
*** 1693,1699 **** pgstat_count_heap_insert(Relation rel)
  			pgstat_info->trans->nest_level != nest_level)
  			add_tabstat_xact_level(pgstat_info, nest_level);
  
! 		pgstat_info->trans->tuples_inserted++;
  	}
  }
  
--- 1693,1699 ----
  			pgstat_info->trans->nest_level != nest_level)
  			add_tabstat_xact_level(pgstat_info, nest_level);
  
! 		pgstat_info->trans->tuples_inserted += n;
  	}
  }
  
*** a/src/include/access/heapam.h
--- b/src/include/access/heapam.h
***************
*** 97,102 **** extern void FreeBulkInsertState(BulkInsertState);
--- 97,104 ----
  
  extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
  			int options, BulkInsertState bistate);
+ extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+ 				  CommandId cid, int options, BulkInsertState bistate);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
  			CommandId cid, Snapshot crosscheck, bool wait);
*** a/src/include/access/htup.h
--- b/src/include/access/htup.h
***************
*** 608,613 **** typedef HeapTupleData *HeapTuple;
--- 608,614 ----
  /* 0x20 is free, was XLOG_HEAP2_CLEAN_MOVE */
  #define XLOG_HEAP2_CLEANUP_INFO 0x30
  #define XLOG_HEAP2_VISIBLE		0x40
+ #define XLOG_HEAP2_MULTI_INSERT	0x50
  
  /*
   * All what we need to find changed tuple
***************
*** 661,666 **** typedef struct xl_heap_insert
--- 662,697 ----
  
  #define SizeOfHeapInsert	(offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
  
+ /*
+  * This is what we need to know about a multi-insert. The record consists of
+  * an xl_heap_multi_insert header, followed by an xl_multi_insert_tuple and
+  * the tuple data for each tuple. The 'offsets' array is omitted if the whole
+  * page is reinitialized (XLOG_HEAP_INIT_PAGE).
+  */
+ typedef struct xl_heap_multi_insert
+ {
+ 	RelFileNode node;
+ 	BlockNumber	blkno;
+ 	bool		all_visible_cleared;
+ 	uint16		ntuples;
+ 	OffsetNumber offsets[1];
+ 
+ 	/* TUPLE DATA (xl_multi_insert_tuples) FOLLOW AT END OF STRUCT */
+ } xl_heap_multi_insert;
+ 
+ #define SizeOfHeapMultiInsert	offsetof(xl_heap_multi_insert, offsets)
+ 
+ typedef struct xl_multi_insert_tuple
+ {
+ 	uint16		datalen;				/* size of tuple data that follows */
+ 	uint16		t_infomask2;
+ 	uint16		t_infomask;
+ 	uint8		t_hoff;
+ 	/* TUPLE DATA FOLLOWS AT END OF STRUCT */
+ } xl_multi_insert_tuple;
+ 
+ #define SizeOfMultiInsertTuple	(offsetof(xl_multi_insert_tuple, t_hoff) + sizeof(uint8))
+ 
  /* This is what we need to know about update|hot_update */
  typedef struct xl_heap_update
  {
*** a/src/include/pgstat.h
--- b/src/include/pgstat.h
***************
*** 766,772 **** extern void pgstat_initstats(Relation rel);
  			(rel)->pgstat_info->t_counts.t_blocks_hit++;			\
  	} while (0)
  
! extern void pgstat_count_heap_insert(Relation rel);
  extern void pgstat_count_heap_update(Relation rel, bool hot);
  extern void pgstat_count_heap_delete(Relation rel);
  extern void pgstat_update_heap_dead_tuples(Relation rel, int delta);
--- 766,772 ----
  			(rel)->pgstat_info->t_counts.t_blocks_hit++;			\
  	} while (0)
  
! extern void pgstat_count_heap_insert(Relation rel, int n);
  extern void pgstat_count_heap_update(Relation rel, bool hot);
  extern void pgstat_count_heap_delete(Relation rel);
  extern void pgstat_update_heap_dead_tuples(Relation rel, int delta);