On 2017/01/06 20:23, Amit Langote wrote:
> On 2017/01/05 3:26, Robert Haas wrote:
>> It's unclear to me why we need to do 0002.  It doesn't seem like it
>> should be necessary, it doesn't seem like a good idea, and the commit
>> message you proposed is uninformative.
>
> If a single BulkInsertState object is passed to
> heap_insert()/heap_multi_insert() for different heaps corresponding to
> different partitions (from one input tuple to the next), tuples might end
> up going into the wrong heaps (as demonstrated in one of the reports [1]).
> A simple solution is to disable bulk-insert in case of partitioned tables.
>
> But my patch (or its motivation) was slightly wrongheaded, in that I
> conflated multi-insert and bulk-insert considerations.  I revised 0002
> to not do that.

Ragnar Ouchterlony pointed out [1] on pgsql-bugs that 0002 wasn't correct.
Attaching updated 0002 along with rebased 0001 and 0003.

Thanks,
Amit

[1]
https://www.postgresql.org/message-id/732dfc84-25f5-413c-1eee-0bfa7a370093%40agama.tv
>From f2a64348021c7dba1f96d0c8b4e3e253f635b019 Mon Sep 17 00:00:00 2001
From: amit <amitlangot...@gmail.com>
Date: Wed, 28 Dec 2016 10:10:26 +0900
Subject: [PATCH 1/3] Set ecxt_scantuple correctly for tuple-routing

In 2ac3ef7a01df859c62d0a02333b646d65eaec5ff, we changed things so that
a different TupleTableSlot may be used for partitioned tables at
successively lower levels.  If we do end up changing the slot from the
original, we must update ecxt_scantuple to point to the new one so that
the partition key of the tuple is computed correctly.

Also update the regression tests so that the code manipulating
ecxt_scantuple is covered.

Reported by: Rajkumar Raghuwanshi
Patch by: Amit Langote
Reports: https://www.postgresql.org/message-id/CAKcux6%3Dm1qyqB2k6cjniuMMrYXb75O-MB4qGQMu8zg-iGGLjDw%40mail.gmail.com
---
 src/backend/catalog/partition.c      | 29 ++++++++++++++++++++++-------
 src/backend/executor/execMain.c      |  2 --
 src/test/regress/expected/insert.out |  2 +-
 src/test/regress/sql/insert.sql      |  2 +-
 4 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index f54e1bdf3f..0de1cf245a 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -1643,7 +1643,10 @@ get_partition_for_tuple(PartitionDispatch *pd,
 	bool		isnull[PARTITION_MAX_KEYS];
 	int			cur_offset,
 				cur_index;
-	int			i;
+	int			i,
+				result;
+	ExprContext *ecxt = GetPerTupleExprContext(estate);
+	TupleTableSlot *ecxt_scantuple_old = ecxt->ecxt_scantuple;
 
 	/* start with the root partitioned table */
 	parent = pd[0];
@@ -1672,7 +1675,14 @@ get_partition_for_tuple(PartitionDispatch *pd,
 			slot = myslot;
 		}
 
-		/* Extract partition key from tuple */
+		/*
+		 * Extract partition key from tuple; FormPartitionKeyDatum() expects
+		 * ecxt_scantuple to point to the correct tuple slot (which might be
+		 * different from the slot we received from the caller if the
+		 * partitioned table of the current level has different tuple
+		 * descriptor from its parent).
+		 */
+		ecxt->ecxt_scantuple = slot;
 		FormPartitionKeyDatum(parent, slot, estate, values, isnull);
 
 		if (key->strategy == PARTITION_STRATEGY_RANGE)
@@ -1727,16 +1737,21 @@ get_partition_for_tuple(PartitionDispatch *pd,
 		 */
 		if (cur_index < 0)
 		{
+			result = -1;
 			*failed_at = RelationGetRelid(parent->reldesc);
-			return -1;
+			break;
 		}
-		else if (parent->indexes[cur_index] < 0)
-			parent = pd[-parent->indexes[cur_index]];
-		else
+		else if (parent->indexes[cur_index] >= 0)
+		{
+			result = parent->indexes[cur_index];
 			break;
+		}
+		else
+			parent = pd[-parent->indexes[cur_index]];
 	}
 
-	return parent->indexes[cur_index];
+	ecxt->ecxt_scantuple = ecxt_scantuple_old;
+	return result;
 }
 
 /*
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index ff277d300a..6a9bc8372f 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -3167,9 +3167,7 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
 {
 	int		result;
 	Oid		failed_at;
-	ExprContext *econtext = GetPerTupleExprContext(estate);
 
-	econtext->ecxt_scantuple = slot;
 	result = get_partition_for_tuple(pd, slot, estate, &failed_at);
 	if (result < 0)
 	{
diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out
index ca3134c34c..1c7b8047ee 100644
--- a/src/test/regress/expected/insert.out
+++ b/src/test/regress/expected/insert.out
@@ -302,7 +302,7 @@ drop cascades to table part_ee_ff1
 drop cascades to table part_ee_ff2
 -- more tests for certain multi-level partitioning scenarios
 create table p (a int, b int) partition by range (a, b);
-create table p1 (b int, a int not null) partition by range (b);
+create table p1 (b int not null, a int not null) partition by range ((b+0));
 create table p11 (like p1);
 alter table p11 drop a;
 alter table p11 add a int;
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index 09c9879da1..c25dc14575 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -173,7 +173,7 @@ drop table list_parted cascade;
 
 -- more tests for certain multi-level partitioning scenarios
 create table p (a int, b int) partition by range (a, b);
-create table p1 (b int, a int not null) partition by range (b);
+create table p1 (b int not null, a int not null) partition by range ((b+0));
 create table p11 (like p1);
 alter table p11 drop a;
 alter table p11 add a int;
-- 
2.11.0

>From 6ac55787450862284556484acebb7346bedf751b Mon Sep 17 00:00:00 2001
From: amit <amitlangot...@gmail.com>
Date: Wed, 28 Dec 2016 10:28:37 +0900
Subject: [PATCH 2/3] No multi-insert and bulk-insert when COPYing into
 partitioned tables
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

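A BulkInsertState object is good only for a single heap relation.  When
COPYing into a partitioned table, tuple-routing may map successive input
rows to different partitions, so passing the same BulkInsertState object
to heap_insert()/heap_multi_insert() for different heaps can cause tuples
to end up in the wrong heap.  Therefore, do not use bulk-insert (or
multi-insert) mode when the target is a partitioned table.

We might consider alleviating this restriction by allocating a
BulkInsertState object and managing tuple-buffering per partition.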

Reported by: 高增琦, Venkata B Nagothi, Ragnar Ouchterlony
Patch by: Amit Langote (with pointers from 高增琦)
Reports: https://www.postgresql.org/message-id/CAFmBtr32FDOqofo8yG-4mjzL1HnYHxXK5S9OGFJ%3D%3DcJpgEW4vA%40mail.gmail.com
         https://www.postgresql.org/message-id/CAEyp7J9WiX0L3DoiNcRrY-9iyw%3DqP%2Bj%3DDLsAnNFF1xT2J1ggfQ%40mail.gmail.com
         https://www.postgresql.org/message-id/16d73804-c9cd-14c5-463e-5caad563ff77%40agama.tv
---
 src/backend/commands/copy.c | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f56b2ac49b..9624c93f6b 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2296,7 +2296,7 @@ CopyFrom(CopyState cstate)
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			hi_options = 0; /* start with default heap_insert options */
-	BulkInsertState bistate;
+	BulkInsertState bistate = NULL;
 	uint64		processed = 0;
 	bool		useHeapMultiInsert;
 	int			nBufferedTuples = 0;
@@ -2455,8 +2455,8 @@ CopyFrom(CopyState cstate)
 	if ((resultRelInfo->ri_TrigDesc != NULL &&
 		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
 		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
-		cstate->partition_dispatch_info != NULL ||
-		cstate->volatile_defexprs)
+		cstate->volatile_defexprs ||
+		cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
 		useHeapMultiInsert = false;
 	}
@@ -2480,7 +2480,14 @@ CopyFrom(CopyState cstate)
 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
 
-	bistate = GetBulkInsertState();
+	/*
+	 * FIXME: We don't engage the bulk-insert mode for partitioned tables,
+	 * because the the heap relation is most likely change from one row to
+	 * next due to tuple-routing.
+	 */
+	if (cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		bistate = GetBulkInsertState();
+
 	econtext = GetPerTupleExprContext(estate);
 
 	/* Set up callback to identify error line number */
@@ -2701,7 +2708,8 @@ CopyFrom(CopyState cstate)
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	FreeBulkInsertState(bistate);
+	if (bistate != NULL)
+		FreeBulkInsertState(bistate);
 
 	MemoryContextSwitchTo(oldcontext);
 
-- 
2.11.0

>From d9ad05ea0197b0d79e281cf2c4a366d596085077 Mon Sep 17 00:00:00 2001
From: amit <amitlangot...@gmail.com>
Date: Fri, 6 Jan 2017 10:28:04 +0900
Subject: [PATCH 3/3] Support bulk-insert mode for partitioned tables

The heap layer (hio.c) supports a bulk-insert mode, which is
currently used by certain callers in copy.c, createas.c, etc.
Callers must pass a BulkInsertState object down to heapam routines
like heap_insert() or heap_multi_insert() along with the input row(s)
to engage this mode.

A single BulkInsertState object is good only for a given heap relation.
In case of a partitioned table, successive input rows may be mapped to
different partitions, and hence to different heap relations.  We must
use a separate BulkInsertState object for each partition and switch to
the corresponding object every time a given partition is selected.

Also, if we are able to use multi-insert mode in CopyFrom() and hence
will buffer tuples, we must maintain separate buffer spaces and
buffered-tuple counts for every partition.  However, the maximum limits
on the number of buffered tuples and on the buffered tuple size (summed
across partitions) are still the old compile-time constants; they are
not scaled based on, say, the number of partitions.  It might be
worthwhile to raise those limits so that enough tuples are buffered per
partition even in the worst case, where input tuples are randomly
ordered.
---
 src/backend/commands/copy.c | 168 ++++++++++++++++++++++++++++----------------
 1 file changed, 109 insertions(+), 59 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9624c93f6b..c4397b4f78 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2296,15 +2296,20 @@ CopyFrom(CopyState cstate)
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			hi_options = 0; /* start with default heap_insert options */
-	BulkInsertState bistate = NULL;
 	uint64		processed = 0;
-	bool		useHeapMultiInsert;
-	int			nBufferedTuples = 0;
 
-#define MAX_BUFFERED_TUPLES 1000
-	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
-	Size		bufferedTuplesSize = 0;
-	int			firstBufferedLineNo = 0;
+#define MAX_BUFFERED_TUPLES			1000
+#define MAX_BUFFERED_TUPLES_SIZE	65535
+	int		num_heaps;
+	bool   *useHeapMultiInsert = NULL;
+	BulkInsertState *bistate = NULL;
+	HeapTuple **bufferedTuples = NULL;	/* initialize to silence warning */
+	Size	   *bufferedTuplesSize = NULL;
+	int		   *firstBufferedLineNo = NULL;
+	int		   *nBufferedTuples = NULL;
+	Size		bufferedTuplesSize_total = 0;
+	int			nBufferedTuples_total = 0;
+	int			i;
 
 	Assert(cstate->rel);
 
@@ -2449,21 +2454,44 @@ CopyFrom(CopyState cstate)
 	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
 	 * expressions. Such triggers or expressions might query the table we're
 	 * inserting to, and act differently if the tuples that have already been
-	 * processed and prepared for insertion are not there.  We also can't do
-	 * it if the table is partitioned.
+	 * processed and prepared for insertion are not there.
+	 *
+	 * In case of a regular table there is only one heap, whereas in case of
+	 * a partitioned table, there are as many heaps as there are partitions.
+	 * We must manage buffered tuples separately for each heap.
 	 */
-	if ((resultRelInfo->ri_TrigDesc != NULL &&
-		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
-		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
-		cstate->volatile_defexprs ||
-		cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-	{
-		useHeapMultiInsert = false;
-	}
-	else
+	num_heaps = cstate->num_partitions > 0 ? cstate->num_partitions : 1;
+
+	bufferedTuples = (HeapTuple **) palloc0(num_heaps * sizeof(HeapTuple *));
+	useHeapMultiInsert = (bool *) palloc(num_heaps * sizeof(bool));
+	nBufferedTuples = (int *) palloc0(num_heaps * sizeof(int));
+	bufferedTuplesSize = (Size *) palloc0(num_heaps * sizeof(Size));
+	firstBufferedLineNo = (int *) palloc0(num_heaps * sizeof(int));
+
+	/* Also, maintain separate bulk-insert state for every heap */
+	bistate = (BulkInsertState *) palloc(num_heaps * sizeof(BulkInsertState));
+
+	for (i = 0; i < num_heaps; i++)
 	{
-		useHeapMultiInsert = true;
-		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+		/*
+		 * In case of a partitioned table, we check the individual partition's
+		 * TriggerDesc and not the root parent table's.
+		 */
+		ResultRelInfo *cur_rel = cstate->partitions ? cstate->partitions + i
+													: resultRelInfo;
+
+		if ((cur_rel->ri_TrigDesc != NULL &&
+			(cur_rel->ri_TrigDesc->trig_insert_before_row ||
+			 cur_rel->ri_TrigDesc->trig_insert_instead_row)) ||
+			cstate->volatile_defexprs)
+			useHeapMultiInsert[i] = false;
+		else
+			useHeapMultiInsert[i] = true;
+
+		if (useHeapMultiInsert[i])
+			bufferedTuples[i] = palloc(MAX_BUFFERED_TUPLES *
+									   sizeof(HeapTuple));
+		bistate[i] = GetBulkInsertState();
 	}
 
 	/* Prepare to catch AFTER triggers. */
@@ -2480,14 +2508,6 @@ CopyFrom(CopyState cstate)
 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
 
-	/*
-	 * FIXME: We don't engage the bulk-insert mode for partitioned tables,
-	 * because the the heap relation is most likely change from one row to
-	 * next due to tuple-routing.
-	 */
-	if (cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		bistate = GetBulkInsertState();
-
 	econtext = GetPerTupleExprContext(estate);
 
 	/* Set up callback to identify error line number */
@@ -2502,15 +2522,16 @@ CopyFrom(CopyState cstate)
 					   *oldslot;
 		bool		skip_tuple;
 		Oid			loaded_oid = InvalidOid;
+		int			cur_heap = 0;
 
 		CHECK_FOR_INTERRUPTS();
 
-		if (nBufferedTuples == 0)
+		if (nBufferedTuples_total == 0)
 		{
 			/*
 			 * Reset the per-tuple exprcontext. We can only do this if the
-			 * tuple buffer is empty. (Calling the context the per-tuple
-			 * memory context is a bit of a misnomer now.)
+			 * tuple buffers are all empty. (Calling the context the
+			 * per-tuple memory context is a bit of a misnomer now.)
 			 */
 			ResetPerTupleExprContext(estate);
 		}
@@ -2561,6 +2582,7 @@ CopyFrom(CopyState cstate)
 												estate);
 			Assert(leaf_part_index >= 0 &&
 				   leaf_part_index < cstate->num_partitions);
+			cur_heap = leaf_part_index;
 
 			/*
 			 * Save the old ResultRelInfo and switch to the one corresponding
@@ -2589,7 +2611,13 @@ CopyFrom(CopyState cstate)
 			{
 				Relation	partrel = resultRelInfo->ri_RelationDesc;
 
+				/*
+				 * Allocate memory for the converted tuple in the per-tuple
+				 * context just like the original tuple.
+				 */
+				MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 				tuple = do_convert_tuple(tuple, map);
+				MemoryContextSwitchTo(oldcontext);
 
 				/*
 				 * We must use the partition's tuple descriptor from this
@@ -2599,7 +2627,7 @@ CopyFrom(CopyState cstate)
 				slot = cstate->partition_tuple_slot;
 				Assert(slot != NULL);
 				ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
-				ExecStoreTuple(tuple, slot, InvalidBuffer, true);
+				ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 			}
 
 			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
@@ -2634,29 +2662,47 @@ CopyFrom(CopyState cstate)
 					resultRelInfo->ri_PartitionCheck)
 					ExecConstraints(resultRelInfo, slot, oldslot, estate);
 
-				if (useHeapMultiInsert)
+				if (useHeapMultiInsert[cur_heap])
 				{
-					/* Add this tuple to the tuple buffer */
-					if (nBufferedTuples == 0)
-						firstBufferedLineNo = cstate->cur_lineno;
-					bufferedTuples[nBufferedTuples++] = tuple;
-					bufferedTuplesSize += tuple->t_len;
+					/* Add this tuple to the corresponding tuple buffer */
+					if (nBufferedTuples[cur_heap] == 0)
+						firstBufferedLineNo[cur_heap] = cstate->cur_lineno;
+					bufferedTuples[cur_heap][nBufferedTuples[cur_heap]++] =
+																	tuple;
+					bufferedTuplesSize[cur_heap] += tuple->t_len;
+
+					/* Count the current tuple toward the totals */
+					nBufferedTuples_total += nBufferedTuples[cur_heap];
+					bufferedTuplesSize_total += bufferedTuplesSize[cur_heap];
 
 					/*
-					 * If the buffer filled up, flush it.  Also flush if the
-					 * total size of all the tuples in the buffer becomes
-					 * large, to avoid using large amounts of memory for the
-					 * buffer when the tuples are exceptionally wide.
+					 * If enough tuples are buffered, flush them from the
+					 * individual buffers. Also flush if the total size of
+					 * all the buffered tuples becomes large, to avoid using
+					 * large amounts of buffer memory when the tuples are
+					 * exceptionally wide.
 					 */
-					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-						bufferedTuplesSize > 65535)
+					if (nBufferedTuples_total == MAX_BUFFERED_TUPLES ||
+						bufferedTuplesSize_total > MAX_BUFFERED_TUPLES_SIZE)
 					{
-						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-											resultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
-											firstBufferedLineNo);
-						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
+						for (i = 0; i < num_heaps; i++)
+						{
+							ResultRelInfo *cur_rel = cstate->partitions
+												   ? cstate->partitions + i
+												   : resultRelInfo;
+
+							CopyFromInsertBatch(cstate, estate, mycid,
+												hi_options, cur_rel,
+												myslot, bistate[i],
+												nBufferedTuples[i],
+												bufferedTuples[i],
+												firstBufferedLineNo[i]);
+							nBufferedTuples[i] = 0;
+							bufferedTuplesSize[i] = 0;
+						}
+
+						nBufferedTuples_total = 0;
+						bufferedTuplesSize_total = 0;
 					}
 				}
 				else
@@ -2665,7 +2711,7 @@ CopyFrom(CopyState cstate)
 
 					/* OK, store the tuple and create index entries for it */
 					heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
-								hi_options, bistate);
+								hi_options, bistate[cur_heap]);
 
 					if (resultRelInfo->ri_NumIndices > 0)
 						recheckIndexes = ExecInsertIndexTuples(slot,
@@ -2699,18 +2745,22 @@ CopyFrom(CopyState cstate)
 	}
 
 	/* Flush any remaining buffered tuples */
-	if (nBufferedTuples > 0)
+	for (i = 0; i < num_heaps; i++)
+	{
+		ResultRelInfo *cur_rel = cstate->partitions ? cstate->partitions + i
+													: resultRelInfo;
+
 		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-							resultRelInfo, myslot, bistate,
-							nBufferedTuples, bufferedTuples,
-							firstBufferedLineNo);
+							cur_rel, myslot, bistate[i],
+							nBufferedTuples[i], bufferedTuples[i],
+							firstBufferedLineNo[i]);
+
+		FreeBulkInsertState(bistate[i]);
+	}
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	if (bistate != NULL)
-		FreeBulkInsertState(bistate);
-
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
@@ -2803,7 +2853,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 * before calling it.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(cstate->rel,
+	heap_multi_insert(resultRelInfo->ri_RelationDesc,
 					  bufferedTuples,
 					  nBufferedTuples,
 					  mycid,
-- 
2.11.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to