On 2017/01/06 20:23, Amit Langote wrote:
> On 2017/01/05 3:26, Robert Haas wrote:
>> It's unclear to me why we need to do 0002. It doesn't seem like it
>> should be necessary, it doesn't seem like a good idea, and the commit
>> message you proposed is uninformative.
>
> If a single BulkInsertState object is passed to
> heap_insert()/heap_multi_insert() for different heaps corresponding to
> different partitions (from one input tuple to next), tuples might end up
> going into wrong heaps (like demonstrated in one of the reports [1]). A
> simple solution is to disable bulk-insert in case of partitioned tables.
>
> But my patch (or its motivations) was slightly wrongheaded, wherein I
> conflated multi-insert stuff and bulk-insert considerations. I revised
> 0002 to not do that.

Ragnar Ouchterlony pointed out [1] on pgsql-bugs that 0002 wasn't correct.
Attaching updated 0002 along with rebased 0001 and 0003.

Thanks,
Amit

[1]
https://www.postgresql.org/message-id/732dfc84-25f5-413c-1eee-0bfa7a370093%40agama.tv

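To illustrate the hazard described in the quoted text, here is a toy model in plain C. It is only a sketch: ToyPage, ToyHeap, ToyBulkState, toy_insert() and toy_second_insert_target() are hypothetical names, not PostgreSQL code, and the model assumes that a bulk-insert state remembers the page used by the previous insert without recording which heap that page belongs to.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration; these are not PostgreSQL types. */
typedef struct ToyPage
{
	int			owner_heap;		/* id of the heap this page belongs to */
	int			ntuples;		/* tuples stored on the page */
} ToyPage;

typedef struct ToyHeap
{
	ToyPage		page;			/* one page per heap keeps the model small */
} ToyHeap;

typedef struct ToyBulkState
{
	ToyPage    *current_page;	/* page cached by the previous insert */
} ToyBulkState;

/* Insert one tuple and return the id of the heap that actually received it. */
static int
toy_insert(ToyHeap *heap, ToyBulkState *bistate)
{
	ToyPage    *target;

	assert(heap != NULL);
	target = &heap->page;
	if (bistate != NULL)
	{
		if (bistate->current_page != NULL)
			target = bistate->current_page; /* reused without checking the heap */
		else
			bistate->current_page = target;
	}
	target->ntuples++;
	return target->owner_heap;
}

/*
 * Insert into heap 1, then into heap 2, and report where the second tuple
 * landed.  mode 0: no bulk state; mode 1: one shared state; mode 2: one
 * state per heap.
 */
static int
toy_second_insert_target(int mode)
{
	ToyHeap		h1 = {{1, 0}};
	ToyHeap		h2 = {{2, 0}};
	ToyBulkState shared = {NULL};
	ToyBulkState own1 = {NULL};
	ToyBulkState own2 = {NULL};

	toy_insert(&h1, mode == 0 ? NULL : (mode == 1 ? &shared : &own1));
	return toy_insert(&h2, mode == 0 ? NULL : (mode == 1 ? &shared : &own2));
}
```

In this model the second tuple reaches heap 2 when no state or a per-heap state is used, and lands in heap 1 when a single state is shared between the two heaps.
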
From f2a64348021c7dba1f96d0c8b4e3e253f635b019 Mon Sep 17 00:00:00 2001
From: amit <amitlangot...@gmail.com>
Date: Wed, 28 Dec 2016 10:10:26 +0900
Subject: [PATCH 1/3] Set ecxt_scantuple correctly for tuple-routing

In 2ac3ef7a01df859c62d0a02333b646d65eaec5ff, we changed things so that
it's possible for a different TupleTableSlot to be used for partitioned
tables at successively lower levels. If we do end up changing the slot
from the original, we must update ecxt_scantuple to point to the new one
for partition key of the tuple to be computed correctly.

Also update the regression tests so that the code manipulating
ecxt_scantuple is covered.

Reported by: Rajkumar Raghuwanshi
Patch by: Amit Langote
Reports: https://www.postgresql.org/message-id/CAKcux6%3Dm1qyqB2k6cjniuMMrYXb75O-MB4qGQMu8zg-iGGLjDw%40mail.gmail.com
---
 src/backend/catalog/partition.c      | 29 ++++++++++++++++++++++-------
 src/backend/executor/execMain.c      |  2 --
 src/test/regress/expected/insert.out |  2 +-
 src/test/regress/sql/insert.sql      |  2 +-
 4 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index f54e1bdf3f..0de1cf245a 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -1643,7 +1643,10 @@ get_partition_for_tuple(PartitionDispatch *pd,
 	bool		isnull[PARTITION_MAX_KEYS];
 	int			cur_offset,
 				cur_index;
-	int			i;
+	int			i,
+				result;
+	ExprContext *ecxt = GetPerTupleExprContext(estate);
+	TupleTableSlot *ecxt_scantuple_old = ecxt->ecxt_scantuple;
 
 	/* start with the root partitioned table */
 	parent = pd[0];
@@ -1672,7 +1675,14 @@ get_partition_for_tuple(PartitionDispatch *pd,
 			slot = myslot;
 		}
 
-		/* Extract partition key from tuple */
+		/*
+		 * Extract partition key from tuple; FormPartitionKeyDatum() expects
+		 * ecxt_scantuple to point to the correct tuple slot (which might be
+		 * different from the slot we received from the caller if the
+		 * partitioned table of the current level has different tuple
+		 * descriptor from its parent).
+		 */
+		ecxt->ecxt_scantuple = slot;
 		FormPartitionKeyDatum(parent, slot, estate, values, isnull);
 
 		if (key->strategy == PARTITION_STRATEGY_RANGE)
@@ -1727,16 +1737,21 @@ get_partition_for_tuple(PartitionDispatch *pd,
 		 */
 		if (cur_index < 0)
 		{
+			result = -1;
 			*failed_at = RelationGetRelid(parent->reldesc);
-			return -1;
+			break;
 		}
-		else if (parent->indexes[cur_index] < 0)
-			parent = pd[-parent->indexes[cur_index]];
-		else
+		else if (parent->indexes[cur_index] >= 0)
+		{
+			result = parent->indexes[cur_index];
 			break;
+		}
+		else
+			parent = pd[-parent->indexes[cur_index]];
 	}
 
-	return parent->indexes[cur_index];
+	ecxt->ecxt_scantuple = ecxt_scantuple_old;
+	return result;
 }
 
 /*
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index ff277d300a..6a9bc8372f 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -3167,9 +3167,7 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
 {
 	int			result;
 	Oid			failed_at;
-	ExprContext *econtext = GetPerTupleExprContext(estate);
 
-	econtext->ecxt_scantuple = slot;
 	result = get_partition_for_tuple(pd, slot, estate, &failed_at);
 	if (result < 0)
 	{
diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out
index ca3134c34c..1c7b8047ee 100644
--- a/src/test/regress/expected/insert.out
+++ b/src/test/regress/expected/insert.out
@@ -302,7 +302,7 @@ drop cascades to table part_ee_ff1
 drop cascades to table part_ee_ff2
 -- more tests for certain multi-level partitioning scenarios
 create table p (a int, b int) partition by range (a, b);
-create table p1 (b int, a int not null) partition by range (b);
+create table p1 (b int not null, a int not null) partition by range ((b+0));
 create table p11 (like p1);
 alter table p11 drop a;
 alter table p11 add a int;
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index 09c9879da1..c25dc14575 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -173,7 +173,7 @@ drop table list_parted cascade;
 
 -- more tests for certain multi-level partitioning scenarios
 create table p (a int, b int) partition by range (a, b);
-create table p1 (b int, a int not null) partition by range (b);
+create table p1 (b int not null, a int not null) partition by range ((b+0));
 create table p11 (like p1);
 alter table p11 drop a;
 alter table p11 add a int;
-- 
2.11.0

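The save/set/restore pattern that 0001 applies to ecxt_scantuple can be sketched outside the backend as follows. This is a simplified model under stated assumptions: ToySlot, ToyExprContext, ToyDispatch, toy_form_key(), toy_get_partition() and toy_route_demo() are hypothetical names, the "key" is a single integer, and copying a slot stands in for tuple conversion. The point it shows is that the key is always read through the context, so the context must point at the slot of the current level, and that a single exit path restores the caller's value.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins; none of these are PostgreSQL types. */
typedef struct ToySlot
{
	int			key;
} ToySlot;

typedef struct ToyExprContext
{
	ToySlot    *scantuple;		/* slot that key expressions read from */
} ToyExprContext;

#define TOY_NKEYS 2

typedef struct ToyDispatch
{
	ToySlot    *own_slot;		/* non-NULL if this level needs its own slot */
	int			indexes[TOY_NKEYS]; /* >= 0: leaf; < 0: negated next level */
} ToyDispatch;

/* The key is computed from whatever slot the context currently points to. */
static int
toy_form_key(const ToyExprContext *ecxt)
{
	return ecxt->scantuple->key;
}

static int
toy_get_partition(ToyDispatch *pd, ToySlot *slot, ToyExprContext *ecxt)
{
	ToySlot    *saved = ecxt->scantuple;	/* remember the caller's slot */
	ToyDispatch *parent = &pd[0];
	int			result;

	for (;;)
	{
		int			key;

		if (parent->own_slot != NULL)
		{
			*parent->own_slot = *slot;	/* stands in for tuple conversion */
			slot = parent->own_slot;
		}

		ecxt->scantuple = slot; /* must match the slot used at this level */
		key = toy_form_key(ecxt);

		if (key < 0 || key >= TOY_NKEYS)
		{
			result = -1;		/* no partition accepts this key */
			break;
		}
		else if (parent->indexes[key] >= 0)
		{
			result = parent->indexes[key];
			break;
		}
		else
			parent = &pd[-parent->indexes[key]];
	}

	ecxt->scantuple = saved;	/* single exit path restores the context */
	return result;
}

/* Route one key through a two-level layout; report whether ecxt was restored. */
static int
toy_route_demo(int key, int *restored)
{
	ToySlot		sub_slot = {0};
	ToySlot		caller_slot = {key};
	ToySlot		input = {key};
	ToyDispatch pd[2] = {
		{NULL, {0, -1}},		/* root: key 0 -> leaf 0, key 1 -> level 1 */
		{&sub_slot, {1, 2}}		/* level 1: key 0 -> leaf 1, key 1 -> leaf 2 */
	};
	ToyExprContext ecxt = {&caller_slot};
	int			result = toy_get_partition(pd, &input, &ecxt);

	*restored = (ecxt.scantuple == &caller_slot);
	return result;
}
```

Replacing the early return with a break, as the patch does, is what makes the single restore point possible.
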
From 6ac55787450862284556484acebb7346bedf751b Mon Sep 17 00:00:00 2001
From: amit <amitlangot...@gmail.com>
Date: Wed, 28 Dec 2016 10:28:37 +0900
Subject: [PATCH 2/3] No multi-insert and bulk-insert when COPYing into partitioned tables
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

We might consider alleviating this restriction by allocating a
BulkInsertState object and managing tuple-buffering per partition.

Reported by: 高增琦, Venkata B Nagothi, Ragnar Ouchterlony
Patch by: Amit Langote (with pointers from 高增琦)
Reports: https://www.postgresql.org/message-id/CAFmBtr32FDOqofo8yG-4mjzL1HnYHxXK5S9OGFJ%3D%3DcJpgEW4vA%40mail.gmail.com
         https://www.postgresql.org/message-id/CAEyp7J9WiX0L3DoiNcRrY-9iyw%3DqP%2Bj%3DDLsAnNFF1xT2J1ggfQ%40mail.gmail.com
         https://www.postgresql.org/message-id/16d73804-c9cd-14c5-463e-5caad563ff77%40agama.tv
---
 src/backend/commands/copy.c | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f56b2ac49b..9624c93f6b 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2296,7 +2296,7 @@ CopyFrom(CopyState cstate)
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			hi_options = 0; /* start with default heap_insert options */
-	BulkInsertState bistate;
+	BulkInsertState bistate = NULL;
 	uint64		processed = 0;
 	bool		useHeapMultiInsert;
 	int			nBufferedTuples = 0;
@@ -2455,8 +2455,8 @@ CopyFrom(CopyState cstate)
 	if ((resultRelInfo->ri_TrigDesc != NULL &&
 		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
 		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
-		cstate->partition_dispatch_info != NULL ||
-		cstate->volatile_defexprs)
+		cstate->volatile_defexprs ||
+		cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
 		useHeapMultiInsert = false;
 	}
@@ -2480,7 +2480,14 @@ CopyFrom(CopyState cstate)
 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
 
-	bistate = GetBulkInsertState();
+	/*
+	 * FIXME: We don't engage the bulk-insert mode for partitioned tables,
+	 * because the the heap relation is most likely change from one row to
+	 * next due to tuple-routing.
+	 */
+	if (cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		bistate = GetBulkInsertState();
+
 	econtext = GetPerTupleExprContext(estate);
 
 	/* Set up callback to identify error line number */
@@ -2701,7 +2708,8 @@ CopyFrom(CopyState cstate)
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	FreeBulkInsertState(bistate);
+	if (bistate != NULL)
+		FreeBulkInsertState(bistate);
 
 	MemoryContextSwitchTo(oldcontext);
 
-- 
2.11.0

From d9ad05ea0197b0d79e281cf2c4a366d596085077 Mon Sep 17 00:00:00 2001
From: amit <amitlangot...@gmail.com>
Date: Fri, 6 Jan 2017 10:28:04 +0900
Subject: [PATCH 3/3] Support bulk-insert mode for partitioned tables

Currently, the heap layer (hio.c) supports a bulk-insert mode, which is
used by certain callers in copy.c, createas.c, etc. Callers must pass a
BulkInsertState object down to heapam routines like heap_insert() or
heap_multi_insert() along with the input row(s) to engage this mode.

A single BulkInsertState object is good only for a given heap relation.
In case of a partitioned table, successive input rows may be mapped to
different partitions, and hence to different heap relations. We must use
a separate BulkInsertState object for each partition and switch to the
corresponding object every time a given partition is selected.

Also, if we are able to use multi-insert mode in CopyFrom() and hence
will buffer tuples, we must maintain separate buffer spaces and buffered
tuple counts for every partition. However, the maximum limits on the
number of buffered tuples and the buffered tuple size (across partitions)
are still the old compile-time constants, not scaled based on, say, the
number of partitions. It might be possible to raise those limits so that
enough tuples are buffered per partition in the worst case, where input
tuples are randomly ordered.
---
 src/backend/commands/copy.c | 168 ++++++++++++++++++++++++++++----------------
 1 file changed, 109 insertions(+), 59 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9624c93f6b..c4397b4f78 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2296,15 +2296,20 @@ CopyFrom(CopyState cstate)
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			hi_options = 0; /* start with default heap_insert options */
-	BulkInsertState bistate = NULL;
 	uint64		processed = 0;
-	bool		useHeapMultiInsert;
-	int			nBufferedTuples = 0;
 
-#define MAX_BUFFERED_TUPLES 1000
-	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
-	Size		bufferedTuplesSize = 0;
-	int			firstBufferedLineNo = 0;
+#define MAX_BUFFERED_TUPLES 1000
+#define MAX_BUFFERED_TUPLES_SIZE 65535
+	int			num_heaps;
+	bool	   *useHeapMultiInsert = NULL;
+	BulkInsertState *bistate = NULL;
+	HeapTuple **bufferedTuples = NULL;	/* initialize to silence warning */
+	Size	   *bufferedTuplesSize = NULL;
+	int		   *firstBufferedLineNo = NULL;
+	int		   *nBufferedTuples = NULL;
+	Size		bufferedTuplesSize_total = 0;
+	int			nBufferedTuples_total = 0;
+	int			i;
 
 	Assert(cstate->rel);
 
@@ -2449,21 +2454,44 @@ CopyFrom(CopyState cstate)
 	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
 	 * expressions. Such triggers or expressions might query the table we're
 	 * inserting to, and act differently if the tuples that have already been
-	 * processed and prepared for insertion are not there. We also can't do
-	 * it if the table is partitioned.
+	 * processed and prepared for insertion are not there.
+	 *
+	 * In case of a regular table there is only one heap, whereas in case of
+	 * a partitioned table, there are as many heaps as there are partitions.
+	 * We must manage buffered tuples separately for each heap.
 	 */
-	if ((resultRelInfo->ri_TrigDesc != NULL &&
-		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
-		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
-		cstate->volatile_defexprs ||
-		cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-	{
-		useHeapMultiInsert = false;
-	}
-	else
+	num_heaps = cstate->num_partitions > 0 ? cstate->num_partitions : 1;
+
+	bufferedTuples = (HeapTuple **) palloc0(num_heaps * sizeof(HeapTuple *));
+	useHeapMultiInsert = (bool *) palloc(num_heaps * sizeof(bool));
+	nBufferedTuples = (int *) palloc0(num_heaps * sizeof(int));
+	bufferedTuplesSize = (Size *) palloc0(num_heaps * sizeof(Size));
+	firstBufferedLineNo = (int *) palloc0(num_heaps * sizeof(int));
+
+	/* Also, maintain separate bulk-insert state for every heap */
+	bistate = (BulkInsertState *) palloc(num_heaps * sizeof(BulkInsertState));
+
+	for (i = 0; i < num_heaps; i++)
 	{
-		useHeapMultiInsert = true;
-		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+		/*
+		 * In case of a partitioned table, we check the individual partition's
+		 * TriggerDesc and not the root parent table's.
+		 */
+		ResultRelInfo *cur_rel = cstate->partitions ? cstate->partitions + i
+													: resultRelInfo;
+
+		if ((cur_rel->ri_TrigDesc != NULL &&
+			 (cur_rel->ri_TrigDesc->trig_insert_before_row ||
+			  cur_rel->ri_TrigDesc->trig_insert_instead_row)) ||
+			cstate->volatile_defexprs)
+			useHeapMultiInsert[i] = false;
+		else
+			useHeapMultiInsert[i] = true;
+
+		if (useHeapMultiInsert[i])
+			bufferedTuples[i] = palloc(MAX_BUFFERED_TUPLES *
+									   sizeof(HeapTuple));
+		bistate[i] = GetBulkInsertState();
 	}
 
 	/* Prepare to catch AFTER triggers. */
@@ -2480,14 +2508,6 @@ CopyFrom(CopyState cstate)
 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
 
-	/*
-	 * FIXME: We don't engage the bulk-insert mode for partitioned tables,
-	 * because the the heap relation is most likely change from one row to
-	 * next due to tuple-routing.
-	 */
-	if (cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		bistate = GetBulkInsertState();
-
 	econtext = GetPerTupleExprContext(estate);
 
 	/* Set up callback to identify error line number */
@@ -2502,15 +2522,16 @@ CopyFrom(CopyState cstate)
 				   *oldslot;
 		bool		skip_tuple;
 		Oid			loaded_oid = InvalidOid;
+		int			cur_heap = 0;
 
 		CHECK_FOR_INTERRUPTS();
 
-		if (nBufferedTuples == 0)
+		if (nBufferedTuples_total == 0)
 		{
 			/*
 			 * Reset the per-tuple exprcontext. We can only do this if the
-			 * tuple buffer is empty. (Calling the context the per-tuple
-			 * memory context is a bit of a misnomer now.)
+			 * tuple buffers are all empty. (Calling the context the
+			 * per-tuple memory context is a bit of a misnomer now.)
 			 */
 			ResetPerTupleExprContext(estate);
 		}
@@ -2561,6 +2582,7 @@ CopyFrom(CopyState cstate)
 												   estate);
 			Assert(leaf_part_index >= 0 &&
 				   leaf_part_index < cstate->num_partitions);
+			cur_heap = leaf_part_index;
 
 			/*
 			 * Save the old ResultRelInfo and switch to the one corresponding
@@ -2589,7 +2611,13 @@ CopyFrom(CopyState cstate)
 			{
 				Relation	partrel = resultRelInfo->ri_RelationDesc;
 
+				/*
+				 * Allocate memory for the converted tuple in the per-tuple
+				 * context just like the original tuple.
+				 */
+				MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 				tuple = do_convert_tuple(tuple, map);
+				MemoryContextSwitchTo(oldcontext);
 
 				/*
 				 * We must use the partition's tuple descriptor from this
@@ -2599,7 +2627,7 @@ CopyFrom(CopyState cstate)
 				slot = cstate->partition_tuple_slot;
 				Assert(slot != NULL);
 				ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
-				ExecStoreTuple(tuple, slot, InvalidBuffer, true);
+				ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 			}
 
 			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
@@ -2634,29 +2662,47 @@ CopyFrom(CopyState cstate)
 					resultRelInfo->ri_PartitionCheck)
 					ExecConstraints(resultRelInfo, slot, oldslot, estate);
 
-				if (useHeapMultiInsert)
+				if (useHeapMultiInsert[cur_heap])
 				{
-					/* Add this tuple to the tuple buffer */
-					if (nBufferedTuples == 0)
-						firstBufferedLineNo = cstate->cur_lineno;
-					bufferedTuples[nBufferedTuples++] = tuple;
-					bufferedTuplesSize += tuple->t_len;
+					/* Add this tuple to the corresponding tuple buffer */
+					if (nBufferedTuples[cur_heap] == 0)
+						firstBufferedLineNo[cur_heap] = cstate->cur_lineno;
+					bufferedTuples[cur_heap][nBufferedTuples[cur_heap]++] =
+																		tuple;
+					bufferedTuplesSize[cur_heap] += tuple->t_len;
+
+					/* Count the current tuple toward the totals */
+					nBufferedTuples_total++;
+					bufferedTuplesSize_total += tuple->t_len;
 
 					/*
-					 * If the buffer filled up, flush it. Also flush if the
-					 * total size of all the tuples in the buffer becomes
-					 * large, to avoid using large amounts of memory for the
-					 * buffer when the tuples are exceptionally wide.
+					 * If enough tuples are buffered, flush them from the
+					 * individual buffers. Also flush if the total size of
+					 * all the buffered tuples becomes large, to avoid using
+					 * large amounts of buffer memory when the tuples are
+					 * exceptionally wide.
 					 */
-					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-						bufferedTuplesSize > 65535)
+					if (nBufferedTuples_total == MAX_BUFFERED_TUPLES ||
+						bufferedTuplesSize_total > MAX_BUFFERED_TUPLES_SIZE)
 					{
-						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-											resultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
-											firstBufferedLineNo);
-						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
+						for (i = 0; i < num_heaps; i++)
+						{
+							ResultRelInfo *cur_rel = cstate->partitions
+													? cstate->partitions + i
+													: resultRelInfo;
+
+							CopyFromInsertBatch(cstate, estate, mycid,
+												hi_options, cur_rel,
+												myslot, bistate[i],
+												nBufferedTuples[i],
+												bufferedTuples[i],
+												firstBufferedLineNo[i]);
+							nBufferedTuples[i] = 0;
+							bufferedTuplesSize[i] = 0;
+						}
+
+						nBufferedTuples_total = 0;
+						bufferedTuplesSize_total = 0;
 					}
 				}
 				else
@@ -2665,7 +2711,7 @@ CopyFrom(CopyState cstate)
 
 					/* OK, store the tuple and create index entries for it */
 					heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
-								hi_options, bistate);
+								hi_options, bistate[cur_heap]);
 
 					if (resultRelInfo->ri_NumIndices > 0)
 						recheckIndexes = ExecInsertIndexTuples(slot,
@@ -2699,18 +2745,22 @@ CopyFrom(CopyState cstate)
 	}
 
 	/* Flush any remaining buffered tuples */
-	if (nBufferedTuples > 0)
+	for (i = 0; i < num_heaps; i++)
+	{
+		ResultRelInfo *cur_rel = cstate->partitions ? cstate->partitions + i
+													: resultRelInfo;
+
 		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-							resultRelInfo, myslot, bistate,
-							nBufferedTuples, bufferedTuples,
-							firstBufferedLineNo);
+							cur_rel, myslot, bistate[i],
+							nBufferedTuples[i], bufferedTuples[i],
+							firstBufferedLineNo[i]);
+
+		FreeBulkInsertState(bistate[i]);
+	}
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	if (bistate != NULL)
-		FreeBulkInsertState(bistate);
-
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
@@ -2803,7 +2853,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 * before calling it.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(cstate->rel,
+	heap_multi_insert(resultRelInfo->ri_RelationDesc,
 					  bufferedTuples,
 					  nBufferedTuples,
 					  mycid,
-- 
2.11.0

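The buffering scheme described in the 0003 commit message (one buffer per heap, limits enforced on totals across all heaps, everything flushed together) can be sketched in isolation as below. This is a minimal model, not the CopyFrom() code: ToyBuffers, toy_flush_all() and toy_buffer_tuple() are hypothetical names, the limits are tiny illustrative constants rather than the real ones, and "flushing" merely counts tuples. The sketch advances the totals by one tuple and by that tuple's length per call, and skips heaps with nothing buffered when flushing; both are choices made for this sketch.

```c
#include <assert.h>
#include <stddef.h>

/* Illustrative limits only; deliberately much smaller than any real values. */
#define TOY_MAX_BUFFERED_TUPLES 4
#define TOY_MAX_BUFFERED_BYTES	64
#define TOY_NUM_HEAPS			3

/* Hypothetical bookkeeping: per-heap counters plus totals across heaps. */
typedef struct ToyBuffers
{
	int			ntuples[TOY_NUM_HEAPS];
	size_t		nbytes[TOY_NUM_HEAPS];
	int			ntuples_total;
	size_t		nbytes_total;
	int			flushed[TOY_NUM_HEAPS]; /* tuples written out per heap */
	int			nflushes;		/* number of flush rounds so far */
} ToyBuffers;

/* Write out every non-empty per-heap buffer and reset all counters. */
static void
toy_flush_all(ToyBuffers *b)
{
	int			i;

	for (i = 0; i < TOY_NUM_HEAPS; i++)
	{
		if (b->ntuples[i] == 0)
			continue;			/* nothing buffered for this heap */
		b->flushed[i] += b->ntuples[i]; /* stands in for a batched insert */
		b->ntuples[i] = 0;
		b->nbytes[i] = 0;
	}
	b->ntuples_total = 0;
	b->nbytes_total = 0;
	b->nflushes++;
}

/* Buffer one tuple of the given length for the given heap. */
static void
toy_buffer_tuple(ToyBuffers *b, int heap, size_t len)
{
	assert(heap >= 0 && heap < TOY_NUM_HEAPS);

	b->ntuples[heap]++;
	b->nbytes[heap] += len;

	/* Count only the current tuple toward the totals */
	b->ntuples_total++;
	b->nbytes_total += len;

	if (b->ntuples_total >= TOY_MAX_BUFFERED_TUPLES ||
		b->nbytes_total > TOY_MAX_BUFFERED_BYTES)
		toy_flush_all(b);
}
```

Because the limits apply to the totals, randomly ordered input spread over many heaps yields small per-heap batches, which is the concern the commit message raises about not scaling the limits with the number of partitions.
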
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers