Hi. The attached patch should address most, if not all, of the issues you both raised.
As explained in [1], we can export ExecInsert and let it perform the main insertion work. For ExecInsert to handle the remaining tasks as well, we need to carefully manage the lifecycle of the constructed CopyFromStateData->ModifyTableContext (including ModifyTableContext->EState): populate it, use it, and then release it.

Since ExecInsert already contains the necessary infrastructure for INSERT ON CONFLICT DO NOTHING/SELECT, exporting it avoids duplicating that logic in src/backend/commands/copyfrom.c (which is what v1 of the patch did).

[1]: https://postgr.es/m/cacjufxh_nbpua+o5yr7xp4xdz+ihko2vfkddhrhbz+4-eut...@mail.gmail.com

The exclusion constraint issue is still not resolved, but overall v2 is better than v1, IMHO.

--
jian
https://www.enterprisedb.com/
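To make the populate/use/release ordering concrete, here is a minimal standalone sketch. It is not code from the patch: the Sketch* types and sketch_* functions are hypothetical stand-ins for ModifyTableContext, EState, and the init/insert/cleanup steps, and they only model the ordering, not any real executor behavior.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for EState; not PostgreSQL's real struct. */
typedef struct SketchEState
{
	bool		snapshot_registered;	/* models RegisterSnapshot() */
	bool		relations_open;			/* models opened result relations */
	long		processed;				/* models es_processed */
} SketchEState;

/* Hypothetical stand-in for ModifyTableContext. */
typedef struct SketchModifyContext
{
	SketchEState *estate;
} SketchModifyContext;

/* Populate: allocate the context and its executor state once, up front. */
SketchModifyContext *
sketch_context_init(void)
{
	SketchModifyContext *ctx = calloc(1, sizeof(*ctx));

	if (ctx == NULL)
		return NULL;

	ctx->estate = calloc(1, sizeof(*ctx->estate));
	if (ctx->estate == NULL)
	{
		free(ctx);
		return NULL;
	}

	ctx->estate->snapshot_registered = true;
	ctx->estate->relations_open = true;
	return ctx;
}

/* Use: store one conflicting row; returns the rows stored by this call. */
long
sketch_context_insert(SketchModifyContext *ctx)
{
	/* The context must have been populated and not yet released. */
	assert(ctx->estate->snapshot_registered && ctx->estate->relations_open);

	ctx->estate->processed = 0; /* reset the per-row counter before inserting */
	ctx->estate->processed += 1;
	return ctx->estate->processed;
}

/* Release: undo the setup in reverse order, then free the memory. */
void
sketch_context_release(SketchModifyContext *ctx)
{
	ctx->estate->relations_open = false;
	ctx->estate->snapshot_registered = false;
	free(ctx->estate);
	free(ctx);
}
```

A caller would invoke sketch_context_init() once before the row loop, sketch_context_insert() for each conflicting row while accumulating the returned counts, and sketch_context_release() once after the loop.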
From b050faaffc1549f79874cb6a04a9b3047dab721c Mon Sep 17 00:00:00 2001
From: jian he <[email protected]>
Date: Mon, 11 May 2026 10:40:17 +0800
Subject: [PATCH v2 2/2] COPY ON_CONFLICT TABLE

not sure how to deal with exclusion constraint

reference: https://web.archive.org/web/20240328094030/https://riggs.business/blog/f/postgresql-todo-2023
discussion: https://postgr.es/m/cacjufxg672yotdt87dbazf1c9scnzm7qsb+zu6vhc+j5qrj...@mail.gmail.com
commitfest entry: https://commitfest.postgresql.org/patch/6736/
---
 doc/src/sgml/monitoring.sgml             |   6 +-
 doc/src/sgml/ref/copy.sgml               |  90 ++++
 src/backend/commands/copy.c              |  59 +++
 src/backend/commands/copyfrom.c          | 527 ++++++++++++++++++++++-
 src/backend/commands/explain.c           |   3 +-
 src/backend/executor/nodeModifyTable.c   |  18 +-
 src/backend/parser/gram.y                |   1 +
 src/include/commands/copy.h              |   4 +
 src/include/commands/copyfrom_internal.h |  11 +
 src/include/executor/nodeModifyTable.h   |   3 +-
 src/include/nodes/nodes.h                |   1 +
 src/test/regress/expected/copy.out       |  16 +-
 src/test/regress/expected/copy2.out      | 154 +++++++
 src/test/regress/sql/copy.sql            |  18 +-
 src/test/regress/sql/copy2.sql           | 130 ++++++
 15 files changed, 1025 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 08d5b824552..0860da3d23b 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -6745,9 +6745,9 @@ FROM pg_stat_get_backend_idset() AS backendid;
        </para>
        <para>
         Number of tuples skipped because they contain malformed data.
-        This counter only advances when
-        <literal>ignore</literal> is specified to the <literal>ON_ERROR</literal>
-        option.
+        This counter advances when
+        <literal>ignore</literal> is specified to the <literal>ON_ERROR</literal> option
+        or <literal>table</literal> is specified to the <literal>ON_CONFLICT</literal> option.
       </para></entry>
      </row>
     </tbody>
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 4706c9a4410..7410248c0b4 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,8 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_QUOTE { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
+    ON_CONFLICT <replaceable class="parameter">conflict_action</replaceable>
+    CONFLICT_TABLE <replaceable class="parameter">conflict_table</replaceable>
     ON_ERROR <replaceable class="parameter">error_action</replaceable>
     REJECT_LIMIT <replaceable class="parameter">maxerror</replaceable>
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
@@ -440,6 +442,92 @@ COPY (SELECT j FROM (VALUES ('null'::json), (NULL::json)) v(j))
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-copy-params-on-conflict">
+    <term><literal>ON_CONFLICT</literal></term>
+    <listitem>
+     <para>
+      Specifies the behavior when a row violates a unique constraint.
+      A <replaceable class="parameter">conflict_action</replaceable> value of
+      <literal>stop</literal> means fail the command, while
+      <literal>table</literal> means save the conflicting input row to the table
+      <replaceable class="parameter">conflict_table</replaceable>
+      specified by <literal>CONFLICT_TABLE</literal> and continue with the next one.
+      The default is <literal>stop</literal>.
+     </para>
+     <para>
+      The <literal>table</literal>
+      option is applicable only for <command>COPY FROM</command>
+      when the <literal>FORMAT</literal> is <literal>text</literal> or <literal>csv</literal>.
+     </para>
+     <para>
+      If <literal>ON_CONFLICT</literal> is set to <literal>table</literal>, a
+      <literal>NOTICE</literal> message is emitted at the end of the command
+      reporting the number of rows that were saved to the table <replaceable class="parameter">conflict_table</replaceable>
+      due to unique constraint violations, provided that at least one row was affected.
+     </para>
+     <para>
+      When the <literal>LOG_VERBOSITY</literal> option is set to
+      <literal>verbose</literal>, a <literal>NOTICE</literal> message is emitted
+      for each row saved by <literal>ON_CONFLICT</literal>, containing the
+      input line that violated the unique constraint. When set to
+      <literal>silent</literal>, no messages are emitted regarding conflicting rows.
+     </para>
+     <para>
+      This uses the same mechanism as <link linkend="sql-on-conflict"><command>INSERT ... ON CONFLICT</command></link>.
+      However, exclusion constraints are not supported; only <literal>NOT DEFERRABLE</literal>
+      unique constraints are checked for violations.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry id="sql-copy-params-conflict-table">
+    <term><literal>CONFLICT_TABLE</literal></term>
+    <listitem>
+     <para>
+      Specifies a destination table (<replaceable class="parameter">conflict_table</replaceable>)
+      to store details regarding unique constraint violations encountered during
+      the <command>COPY FROM</command> operation. The target table must define
+      exactly four columns, though the specific column names are not restricted.
+      The required column order and data types are:
+
+      <informaltable>
+       <tgroup cols="2">
+        <thead>
+         <row>
+          <entry>Data Type</entry>
+          <entry>Description</entry>
+         </row>
+        </thead>
+        <tbody>
+         <row>
+          <entry><type>oid</type></entry>
+          <entry>
+           The OID of the destination table for the <command>COPY FROM</command> command.
+           This corresponds to <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>oid</structfield>.
+ Note that no formal dependency is maintained; if the referenced table is dropped, this value will persist as a stale reference. + </entry> + </row> + <row> + <entry><type>text</type></entry> + <entry>The file path of the <command>COPY FROM</command> input.</entry> + </row> + <row> + <entry><type>bigint</type></entry> + <entry>The line number within the input source where the unique constraint violation occurred (starting at 1).</entry> + </row> + <row> + <entry><type>text</type></entry> + <entry>The raw line text content of the record that caused the violation.</entry> + </row> + </tbody> + </tgroup> + </informaltable> + + </para> + </listitem> + </varlistentry> + + <varlistentry id="sql-copy-params-on-error"> <term><literal>ON_ERROR</literal></term> <listitem> @@ -493,6 +581,8 @@ COPY (SELECT j FROM (VALUES ('null'::json), (NULL::json)) v(j)) If not specified, <literal>ON_ERROR</literal>=<literal>ignore</literal> allows an unlimited number of errors, meaning <command>COPY</command> will skip all erroneous data. + Note: Rows ignored due to unique constraint violations via the + <literal>ON_CONFLICT</literal> option do not count toward this limit. </para> </listitem> </varlistentry> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 003b70852bb..7b0564f6507 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -561,6 +561,36 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate) return COPY_LOG_VERBOSITY_DEFAULT; /* keep compiler quiet */ } +/* + * Extract a OnConflictAction value from a DefElem. 
+ */ +static OnConflictAction +defGetCopyOnConflictChoice(DefElem *def, ParseState *pstate, bool is_from) +{ + char *sval; + + sval = defGetString(def); + + if (!is_from) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY %s cannot be used with %s", "ON_CONFLICT", "COPY TO"), + parser_errposition(pstate, def->location)); + + if (pg_strcasecmp(sval, "stop") == 0) + return ONCONFLICT_NONE; + else if (pg_strcasecmp(sval, "table") == 0) + return ONCONFLICT_TABLE; + + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + /*- translator: first %s is the name of a COPY option, e.g. ON_ERROR */ + errmsg("COPY %s \"%s\" not recognized", "ON_CONFLICT", sval), + parser_errposition(pstate, def->location)); + + return ONCONFLICT_NONE; /* keep compiler quiet */ +} + /* * Process the statement option list for COPY. * @@ -587,9 +617,11 @@ ProcessCopyOptions(ParseState *pstate, bool freeze_specified = false; bool header_specified = false; bool on_error_specified = false; + bool conflict_rel_specified = false; bool log_verbosity_specified = false; bool reject_limit_specified = false; bool force_array_specified = false; + bool on_conflict_specified = false; ListCell *option; /* Support external use for option sanity checking */ @@ -600,6 +632,8 @@ ProcessCopyOptions(ParseState *pstate, /* default format */ opts_out->format = COPY_FORMAT_TEXT; + opts_out->on_conflict = ONCONFLICT_NONE; + /* Extract options from the statement node tree */ foreach(option, options) { @@ -774,6 +808,21 @@ ProcessCopyOptions(ParseState *pstate, reject_limit_specified = true; opts_out->reject_limit = defGetCopyRejectLimitOption(defel); } + else if (strcmp(defel->defname, "on_conflict") == 0) + { + if (on_conflict_specified) + errorConflictingDefElem(defel, pstate); + on_conflict_specified = true; + opts_out->on_conflict = defGetCopyOnConflictChoice(defel, pstate, is_from); + } + else if (strcmp(defel->defname, "conflict_table") == 0) + { + if (conflict_rel_specified) + 
errorConflictingDefElem(defel, pstate); + conflict_rel_specified = true; + + opts_out->on_conflictRel = defGetString(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -782,6 +831,16 @@ ProcessCopyOptions(ParseState *pstate, parser_errposition(pstate, defel->location))); } + if ((opts_out->on_conflict != ONCONFLICT_TABLE) && conflict_rel_specified) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY %s requires %s option", "CONFLICT_TABLE", "ON_CONFLICT")); + + if ((opts_out->on_conflict == ONCONFLICT_TABLE) && !conflict_rel_specified) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY %s requires %s option", "ON_CONFLICT", "CONFLICT_TABLE")); + /* * Check for incompatible options (must do these three before inserting * defaults) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 64ac3063c61..4fa5d17a7d2 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -42,16 +42,21 @@ #include "miscadmin.h" #include "nodes/miscnodes.h" #include "optimizer/optimizer.h" +#include "parser/parse_relation.h" #include "pgstat.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" #include "tcop/tcopprot.h" +#include "utils/acl.h" +#include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/portal.h" +#include "utils/regproc.h" #include "utils/rel.h" #include "utils/snapmgr.h" #include "utils/typcache.h" +#include "utils/syscache.h" /* * No more than this many tuples per CopyMultiInsertBuffer @@ -120,6 +125,11 @@ static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, Oid *typioparam); static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc); static void CopyFromBinaryEnd(CopyFromState cstate); +static void CopyFromConflictTableCheck(CopyFromState cstate); +static void RangeVarCallbackForCopyConflictTable(const RangeVar *rv, Oid relid, Oid oldrelid, + void 
*arg); +static void CopyFromConflictTableInit(CopyFromState cstate); +static void CopyConflictTablePermissionCheck(ParseState *pstate, Relation rel); /* @@ -801,6 +811,21 @@ CopyFrom(CopyFromState cstate) bool has_before_insert_row_trig; bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; + ModifyTableContext mtcontext; /* Used only when ON_CONFLICT is specified */ + TupleTableSlot *conflictslot = NULL; + ModifyTable *node = makeNode(ModifyTable); + + node->operation = CMD_INSERT; + node->canSetTag = false; + node->rootRelation = 0; + node->resultRelations = list_make1_int(1); + node->onConflictAction = ONCONFLICT_NONE; + + if (cstate->opts.on_conflict == ONCONFLICT_TABLE) + { + node->onConflictAction = ONCONFLICT_NOTHING; + node->canSetTag = true; + } Assert(cstate->rel); Assert(list_length(cstate->range_table) == 1); @@ -808,6 +833,11 @@ CopyFrom(CopyFromState cstate) if (cstate->opts.on_error != COPY_ON_ERROR_STOP) Assert(cstate->escontext); + if (cstate->opts.on_conflict == ONCONFLICT_TABLE) + conflictslot = ExecInitExtraTupleSlot(estate, + RelationGetDescr(cstate->conflictRel), + &TTSOpsVirtual); + /* * The target must be a plain, foreign, or partitioned relation, or have * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only @@ -842,6 +872,18 @@ CopyFrom(CopyFromState cstate) RelationGetRelationName(cstate->rel)))); } + /* + * If COPY ON_CONFLICT is specified, the target relation must be either a + * plain table or a partitioned table. 
+ */ + if (cstate->opts.on_conflict == ONCONFLICT_TABLE && + cstate->rel->rd_rel->relkind != RELKIND_RELATION && + cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot perform COPY ON_CONFLCT on relation \"%s\"", RelationGetRelationName(cstate->rel)), + errdetail_relkind_not_supported(cstate->rel->rd_rel->relkind)); + /* * If the target file is new-in-transaction, we assume that checking FSM * for free space is a waste of time. This could possibly be wrong, but @@ -910,6 +952,14 @@ CopyFrom(CopyFromState cstate) ti_options |= TABLE_INSERT_FROZEN; } + /* + * Copy other important information into the EState, this aligned with + * standard_ExecutorStart + */ + estate->es_output_cid = mycid; + estate->es_snapshot = RegisterSnapshot(GetActiveSnapshot()); + estate->es_crosscheck_snapshot = InvalidSnapshot; + /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. (There used to be a huge amount of code @@ -923,16 +973,17 @@ CopyFrom(CopyFromState cstate) /* Verify the named relation is a valid target for INSERT */ CheckValidResultRel(resultRelInfo, CMD_INSERT, ONCONFLICT_NONE, NIL); - ExecOpenIndices(resultRelInfo, false); + ExecOpenIndices(resultRelInfo, cstate->opts.on_conflict != ONCONFLICT_NONE); /* * Set up a ModifyTableState so we can let FDW(s) init themselves for * foreign-table result relation(s). 
*/ mtstate = makeNode(ModifyTableState); - mtstate->ps.plan = NULL; + mtstate->ps.plan = (Plan *) node; mtstate->ps.state = estate; mtstate->operation = CMD_INSERT; + mtstate->canSetTag = node->canSetTag; mtstate->mt_nrels = 1; mtstate->resultRelInfo = resultRelInfo; mtstate->rootResultRelInfo = resultRelInfo; @@ -982,6 +1033,13 @@ CopyFrom(CopyFromState cstate) if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) proute = ExecSetupPartitionTupleRouting(estate, cstate->rel); + mtstate->mt_partition_tuple_routing = proute; + + if (cstate->opts.on_conflict == ONCONFLICT_TABLE) + CopyFromConflictTableInit(cstate); + else + cstate->mtcontext = NULL; + if (cstate->whereClause) cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause), &mtstate->ps); @@ -1052,6 +1110,19 @@ CopyFrom(CopyFromState cstate) */ insertMethod = CIM_SINGLE; } + else if (cstate->opts.on_conflict == ONCONFLICT_TABLE) + { + /* + * Cannot use multi-inserts when ON_CONFLICT option is specified as + * TABLE. + * + * We use ExecInsert() for each row individually because we need its + * native ON CONFLICT (DO NOTHING) handling to detect unique + * constraint violations on the COPY source table. and ExecInsert() is + * incompatible with COPY's bulk insert path. 
+ */ + insertMethod = CIM_SINGLE; + } else { /* @@ -1110,6 +1181,8 @@ CopyFrom(CopyFromState cstate) errcallback.arg = cstate; errcallback.previous = error_context_stack; error_context_stack = &errcallback; + mtcontext.mtstate = mtstate; + mtcontext.estate = estate; for (;;) { @@ -1164,7 +1237,7 @@ CopyFrom(CopyFromState cstate) /* Report that this tuple was skipped by the ON_ERROR clause */ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED, - cstate->num_errors); + (cstate->num_conflicts + cstate->num_errors)); if (cstate->opts.reject_limit > 0 && cstate->num_errors > cstate->opts.reject_limit) @@ -1204,6 +1277,110 @@ CopyFrom(CopyFromState cstate) } } + /* + * For COPY FROM(ON_CONFLICT TABLE), we use ExecInsert() to insert the + * input data into the destination table. The conflict_relOId + * indicates whether a unique constraint violation occurred for ON + * CONFLICT. If a conflict happened, we construct the conflict tuple + * and insert it into the CONFLICT_TABLE. + */ + if (cstate->opts.on_conflict == ONCONFLICT_TABLE) + { + Oid conflict_relOId = InvalidOid; + + Assert(IsA(mtcontext.mtstate->ps.plan, ModifyTable)); + Assert(((ModifyTable *) mtcontext.mtstate->ps.plan)->onConflictAction == ONCONFLICT_NOTHING); + + mtcontext.estate->es_processed = 0; + + ExecInsert(&mtcontext, resultRelInfo, myslot, mtstate->canSetTag, NULL, NULL, + &conflict_relOId); + + if (!OidIsValid(conflict_relOId)) + processed = processed + mtcontext.estate->es_processed; + else + { + int j = 0; + Datum *newvalues; + bool *nulls; + + ModifyTableState *conflict_mstate = cstate->mtcontext->mtstate; + TupleDesc tupdesc = RelationGetDescr(cstate->conflictRel); + + ExecClearTuple(conflictslot); + + newvalues = conflictslot->tts_values; + nulls = conflictslot->tts_isnull; + + for (int i = 0; i < tupdesc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, i); + + if (att->attisdropped) + { + newvalues[i] = (Datum) 0; + nulls[i] = true; + continue; + } + + j++; + nulls[i] = 
false; + + switch (j) + { + case 1: + newvalues[i] = ObjectIdGetDatum(conflict_relOId); + break; + + case 2: + newvalues[i] = CStringGetTextDatum(cstate->filename ? cstate->filename : "STDIN"); + break; + + case 3: + newvalues[i] = Int64GetDatum((int64) cstate->cur_lineno); + break; + + case 4: + newvalues[i] = CStringGetTextDatum(cstate->line_buf.data); + break; + + default: + elog(ERROR, "COPY ON CONFLICT table must have 4 attributes"); + break; + } + } + + /* Build the virtual tuple. */ + ExecStoreVirtualTuple(conflictslot); + + /* + * On first call, fire BEFORE STATEMENT triggers before + * proceeding. We will only fire BEFORE STATEMENT on + * CONFLICT_TABLE once. + */ + if (conflict_mstate->fireBSTriggers) + { + ExecBSInsertTriggers(conflict_mstate->ps.state, conflict_mstate->rootResultRelInfo); + + conflict_mstate->fireBSTriggers = false; + } + + conflict_mstate->ps.state->es_processed = 0; + ExecInsert(cstate->mtcontext, + conflict_mstate->resultRelInfo, + conflictslot, conflict_mstate->canSetTag, NULL, NULL, NULL); + + cstate->num_conflicts = + cstate->num_conflicts + conflict_mstate->ps.state->es_processed; + + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED, + (cstate->num_conflicts + cstate->num_errors)); + } + + continue; + } + + /* Determine the partition to insert the tuple into */ if (proute) { @@ -1513,8 +1690,61 @@ CopyFrom(CopyFromState cstate) ExecCloseResultRelations(estate); ExecCloseRangeTableRelations(estate); + /* do away with our snapshots */ + UnregisterSnapshot(estate->es_snapshot); + UnregisterSnapshot(estate->es_crosscheck_snapshot); + FreeExecutorState(estate); + /* + * This code path should be aligned with the resource release/destruction + * performed by ExecutorFinish and ExecutorEnd on the EState. 
+ */ + if (cstate->opts.on_conflict == ONCONFLICT_TABLE) + { + MemoryContext tmpcontext; + ModifyTableState *on_conflict_mtstate = cstate->mtcontext->mtstate; + + if (cstate->num_conflicts > 0) + { + if (cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT) + ereport(NOTICE, + errmsg_plural("%" PRIu64 " row was saved to conflict table \"%s\" due to unique constraint violation", + "%" PRIu64 " rows were saved to conflict table \"%s\" due to unique constraint violation", + cstate->num_conflicts, + cstate->num_conflicts, + RelationGetRelationName(cstate->conflictRel))); + + /* Execute AFTER STATEMENT insertion triggers */ + ExecASInsertTriggers(cstate->mtcontext->estate, + on_conflict_mtstate->rootResultRelInfo, + on_conflict_mtstate->mt_transition_capture); + } + + on_conflict_mtstate->mt_done = true; + + /* Close/release resources associated with copy conflict_table */ + tmpcontext = MemoryContextSwitchTo(cstate->mtcontext->estate->es_query_cxt); + + cstate->mtcontext->estate->es_finished = true; + + /* Handle queued AFTER triggers */ + AfterTriggerEndQuery(cstate->mtcontext->estate); + + ExecResetTupleTable(cstate->mtcontext->estate->es_tupleTable, false); + ExecCloseResultRelations(cstate->mtcontext->estate); + ExecCloseRangeTableRelations(cstate->mtcontext->estate); + + /* do away with our snapshots */ + UnregisterSnapshot(cstate->mtcontext->estate->es_snapshot); + UnregisterSnapshot(cstate->mtcontext->estate->es_crosscheck_snapshot); + + /* Must switch out of context before destroying it */ + MemoryContextSwitchTo(tmpcontext); + + FreeExecutorState(cstate->mtcontext->estate); + } + return processed; } @@ -1634,6 +1864,45 @@ BeginCopyFrom(ParseState *pstate, else cstate->escontext = NULL; + if (cstate->opts.on_conflict == ONCONFLICT_TABLE) + { + Oid conflictRelid; + RangeVar *relvar; + List *relname_list; + + Assert(cstate->opts.on_conflictRel != NULL); + + relname_list = stringToQualifiedNameList(cstate->opts.on_conflictRel, NULL); + relvar = 
makeRangeVarFromNameList(relname_list); + + /* + * Before inserting tuples into the CONFLICT_TABLE, we first check its + * lock status. If the table is already heavily locked, the subsequent + * COPY FROM (ON_CONFLICT TABLE) could hang waiting for the lock. To + * avoid this, we use RVR_NOWAIT and report an error immediately if + * the CONFLICT_TABLE cannot be locked. + */ + conflictRelid = RangeVarGetRelidExtended(relvar, + RowExclusiveLock, + RVR_NOWAIT, + RangeVarCallbackForCopyConflictTable, + NULL); + + if (RelationGetRelid(cstate->rel) == conflictRelid) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot use relation \"%s\" for COPY on_conflict error saving while copying data to it", + cstate->opts.on_conflictRel)); + + cstate->conflictRel = table_open(conflictRelid, NoLock); + + CopyFromConflictTableCheck(cstate); + + table_close(cstate->conflictRel, NoLock); + + /* We will do CONFLICT_TABLE permission check later */ + } + if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL) { int attr_count = list_length(cstate->attnumlist); @@ -1998,3 +2267,255 @@ ClosePipeFromProgram(CopyFromState cstate) errdetail_internal("%s", wait_result_to_str(pclose_rc)))); } } + +/* + * The conflict_table must be a plain table and is subject to the following + * restrictions: it cannot have generated columns, rules, or row-level security + * policies. + * + * The conflict_table must follow a specific schema: the first column is an OID + * (recording the COPY FROM source relation), the second is the COPY FILE path, + * the third is the line number, and the fourth contains the raw line content. 
+ */ +static void +CopyFromConflictTableCheck(CopyFromState cstate) +{ + int valid_col_count = 0; + char *errdetail_msg = NULL; + Relation relation = cstate->conflictRel; + TupleDesc tupDesc = RelationGetDescr(relation); + + if (tupDesc->constr && + (tupDesc->constr->has_generated_stored || tupDesc->constr->has_generated_virtual)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use relation \"%s\" for COPY on_conflict error saving", + RelationGetRelationName(relation)), + errdetail("The conflict_table cannot have generated columns.")); + + if (relation->rd_rules || relation->rd_rel->relrowsecurity) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use relation \"%s\" for COPY on_conflict error saving", + RelationGetRelationName(relation)), + relation->rd_rules ? errdetail("The conflict_table cannot have rules.") + : errdetail("The conflict_table cannot have row-level security policies.")); + + for (int i = 0; i < tupDesc->natts; i++) + { + Form_pg_attribute attr = TupleDescAttr(tupDesc, i); + + /* Skip columns marked as dropped */ + if (attr->attisdropped) + continue; + + valid_col_count++; + + /* Check types based on the effective column position */ + switch (valid_col_count) + { + case 1: + if (attr->atttypid != OIDOID) + errdetail_msg = _("The first column of the conflict_table must be type OID."); + break; + case 2: + if (attr->atttypid != TEXTOID) + errdetail_msg = _("The second column of the conflict_table must be type TEXT."); + break; + case 3: + if (attr->atttypid != INT8OID) + errdetail_msg = _("The third column of the conflict_table must be type BIGINT."); + break; + case 4: + if (attr->atttypid != TEXTOID) + errdetail_msg = _("The fourth column of the conflict_table must be type TEXT."); + break; + default: + errdetail_msg = _("The conflict_table must have exactly four columns."); + break; + } + } + + if (valid_col_count != 4) + errdetail_msg = _("The conflict_table is incomplete; exactly four 
columns are required."); + + if (errdetail_msg) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use relation \"%s\" for COPY on_conflict error saving", + RelationGetRelationName(relation)), + errdetail_internal("%s", errdetail_msg)); +} + +/* + * Initialize executor infrastructure needed to insert rows into the + * conflict table during COPY FROM (ON_CONFLICT TABLE) + * + * Performs permission checks, builds a ResultRelInfo with open indexes, + * sets up snapshots and trigger state, and populates cstate->mtcontext + * with a ready-to-use ModifyTableState. + */ +static void +CopyFromConflictTableInit(CopyFromState cstate) +{ + ModifyTableState *mtstate; + ModifyTable *node; + MemoryContext tmpcontext; + ParseState *pstate = make_parsestate(NULL); + EState *estate = CreateExecutorState(); + + cstate->mtcontext = palloc0_object(ModifyTableContext); + + tmpcontext = MemoryContextSwitchTo(estate->es_query_cxt); + + estate->es_output_cid = GetCurrentCommandId(true); + estate->es_snapshot = RegisterSnapshot(GetActiveSnapshot()); + estate->es_crosscheck_snapshot = RegisterSnapshot(InvalidSnapshot); + + /* Set up an AFTER-trigger statement context */ + AfterTriggerBeginQuery(); + + /* permission check for conflict_table */ + CopyConflictTablePermissionCheck(pstate, cstate->conflictRel); + + node = makeNode(ModifyTable); + node->operation = CMD_INSERT; + node->canSetTag = true; + node->rootRelation = 0; + node->resultRelations = list_make1_int(1); + node->onConflictAction = ONCONFLICT_NONE; + + /* + * We need a ResultRelInfo so we can use the regular executor's + * index-entry-making machinery. 
+ */ + ExecInitRangeTable(estate, pstate->p_rtable, pstate->p_rteperminfos, + bms_make_singleton(1)); + + /* Populate the ModifyTableState for inserting record to CONFLICT_TABLE */ + mtstate = makeNode(ModifyTableState); + mtstate->ps.plan = (Plan *) node; + mtstate->ps.state = estate; + + mtstate->operation = node->operation; + mtstate->canSetTag = node->canSetTag; + mtstate->mt_done = false; + + mtstate->mt_nrels = 1; + mtstate->resultRelInfo = palloc_array(ResultRelInfo, mtstate->mt_nrels); + + mtstate->rootResultRelInfo = mtstate->resultRelInfo; + ExecInitResultRelation(estate, mtstate->resultRelInfo, + linitial_int(node->resultRelations)); + + /* Verify the named relation is a valid target for INSERT */ + CheckValidResultRel(mtstate->resultRelInfo, node->operation, + node->onConflictAction, NIL); + + mtstate->fireBSTriggers = true; + MakeTransitionCaptureState(cstate->conflictRel->trigdesc, + RelationGetRelid(cstate->conflictRel), + CMD_INSERT); + + /* TODO: Support cstate->conflictRel when it is a partitioned table */ + + /* + * Open the table's indexes, if we have not done so already, so that we + * can add new index entries for the inserted tuple. + */ + if (cstate->conflictRel->rd_rel->relhasindex && + mtstate->resultRelInfo->ri_IndexRelationDescs == NULL) + ExecOpenIndices(mtstate->resultRelInfo, node->onConflictAction != ONCONFLICT_NONE); + + MemoryContextSwitchTo(tmpcontext); + + cstate->mtcontext->mtstate = mtstate; + cstate->mtcontext->estate = estate; +} + +/* + * COPY (ON_CONFLICT TABLE) log COPY FROM unqiue constraint violation details to + * the CONFLICT_TABLE. Obviously, the current user must have INSERT privileges + * on all columns of the CONFLICT_TABLE. 
+ */ +static void +CopyConflictTablePermissionCheck(ParseState *pstate, Relation rel) +{ + LOCKMODE lockmode = RowExclusiveLock; + ParseNamespaceItem *nsitem; + RTEPermissionInfo *perminfo; + TupleDesc tupDesc = RelationGetDescr(rel); + AclResult aclresult; + + /* Must have INSERT privilege on the table */ + aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_INSERT); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(RelationGetRelid(rel))), + RelationGetRelationName(rel)); + + nsitem = addRangeTableEntryForRelation(pstate, rel, lockmode, + NULL, false, false); + perminfo = nsitem->p_perminfo; + perminfo->requiredPerms = ACL_INSERT; + + /* Must have INSERT privilege on each column of the table */ + for (int i = 0; i < tupDesc->natts; i++) + { + Bitmapset **bms; + int attno; + + CompactAttribute *attr = TupleDescCompactAttr(tupDesc, i); + + if (attr->attisdropped) + continue; + + attno = i + 1 - FirstLowInvalidHeapAttributeNumber; + bms = &perminfo->insertedCols; + + *bms = bms_add_member(*bms, attno); + + } + ExecCheckPermissions(pstate->p_rtable, list_make1(perminfo), true); +} + +/* + * Callback to RangeVarGetRelidExtended(). + * + * Checks the following: + * - the relation specified is a table. + * - the table is not a system table. + * + * If any of these checks fails then an error is raised. + */ +static void +RangeVarCallbackForCopyConflictTable(const RangeVar *rv, Oid relid, Oid oldrelid, + void *arg) +{ + HeapTuple tuple; + Form_pg_class classform; + char relkind; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); + if (!HeapTupleIsValid(tuple)) + return; + + classform = (Form_pg_class) GETSTRUCT(tuple); + relkind = classform->relkind; + + /* No system table modifications unless explicitly allowed. 
*/ + if (!allowSystemTableMods && IsSystemClass(relid, classform)) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied: \"%s\" is a system catalog", + rv->relname)); + + /* The conflict error saving table must be a regular relation */ + if (relkind != RELKIND_RELATION) + ereport(ERROR, + errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot use relation \"%s\" for COPY on_conflict error saving", + rv->relname), + errdetail_relkind_not_supported(relkind)); + + ReleaseSysCache(tuple); +} diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 112c17b0d64..acefcb20498 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -4857,9 +4857,8 @@ show_modifytable_info(ModifyTableState *mtstate, List *ancestors, resolution = "NOTHING"; else if (node->onConflictAction == ONCONFLICT_UPDATE) resolution = "UPDATE"; - else + else if (node->onConflictAction == ONCONFLICT_SELECT) { - Assert(node->onConflictAction == ONCONFLICT_SELECT); switch (node->onConflictLockStrength) { case LCS_NONE: diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 908b18d7d4a..eb56ead4d0c 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -824,6 +824,10 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo, * *insert_destrel is the relation where it was inserted. * These are only set on success. * + * If conflict_relOId is not NULL, it also checks whether a unique constraint + * violation actually occurred for the ON CONFLICT DO clause and, if so, sets + * *conflict_relOId to the OID of that relation. + * * This may change the currently active tuple conversion map in * mtstate->mt_transition_capture, so the callers must take care to * save the previous value to avoid losing track of it. 
@@ -835,7 +839,8 @@ ExecInsert(ModifyTableContext *context,
 		   TupleTableSlot *slot,
 		   bool canSetTag,
 		   TupleTableSlot **inserted_tuple,
-		   ResultRelInfo **insert_destrel)
+		   ResultRelInfo **insert_destrel,
+		   Oid *conflict_relOId)
 {
 	ModifyTableState *mtstate = context->mtstate;
 	EState *estate = context->estate;
@@ -1119,6 +1124,9 @@ ExecInsert(ModifyTableContext *context,
 										 &conflictTid, &invalidItemPtr,
 										 arbiterIndexes))
 			{
+				if (conflict_relOId)
+					*conflict_relOId = RelationGetRelid(resultRelationDesc);
+
 				/* committed conflict tuple found */
 				if (onconflict == ONCONFLICT_UPDATE)
 				{
@@ -1580,7 +1588,7 @@ ExecForPortionOfLeftovers(ModifyTableContext *context,
 			AfterTriggerBeginQuery();
 			ExecSetupTransitionCaptureState(mtstate, estate);
 			fireBSTriggers(mtstate);
-			ExecInsert(context, resultRelInfo, leftoverSlot, false, NULL, NULL);
+			ExecInsert(context, resultRelInfo, leftoverSlot, false, NULL, NULL, NULL);
 			fireASTriggers(mtstate);
 			AfterTriggerEndQuery(estate);
 		}
@@ -2320,7 +2328,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
 	/* Tuple routing starts from the root table. */
 	context->cpUpdateReturningSlot =
 		ExecInsert(context, mtstate->rootResultRelInfo, slot, canSetTag,
-				   inserted_tuple, insert_destrel);
+				   inserted_tuple, insert_destrel, NULL);
 
 	/*
 	 * Reset the transition state that may possibly have been written by
@@ -4082,7 +4090,7 @@ ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 				mtstate->mt_merge_action = action;
 
 				rslot = ExecInsert(context, mtstate->rootResultRelInfo,
-								   newslot, canSetTag, NULL, NULL);
+								   newslot, canSetTag, NULL, NULL, NULL);
 				mtstate->mt_merge_inserted += 1;
 				break;
 			case CMD_NOTHING:
@@ -4913,7 +4921,7 @@ ExecModifyTable(PlanState *pstate)
 				ExecInitInsertProjection(node, resultRelInfo);
 				slot = ExecGetInsertNewTuple(resultRelInfo, context.planSlot);
 				slot = ExecInsert(&context, resultRelInfo, slot,
-								  node->canSetTag, NULL, NULL);
+								  node->canSetTag, NULL, NULL, NULL);
 				break;
 
 			case CMD_UPDATE:
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index ff4e1388c55..2854f2a884f 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -3755,6 +3755,7 @@ copy_generic_opt_arg:
 			| NumericOnly					{ $$ = (Node *) $1; }
 			| '*'							{ $$ = (Node *) makeNode(A_Star); }
 			| DEFAULT						{ $$ = (Node *) makeString("default"); }
+			| TABLE							{ $$ = (Node *) makeString("table"); }
 			| '(' copy_generic_opt_arg_list ')'		{ $$ = (Node *) $2; }
 			| /* EMPTY */					{ $$ = NULL; }
 		;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index abecfe51098..adb297f1f6c 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -94,9 +94,13 @@ typedef struct CopyFormatOptions
 	bool *force_null_flags; /* per-column CSV FN flags */
 	bool convert_selectively; /* do selective binary conversion? */
 	CopyOnErrorChoice on_error; /* what to do when error happened */
+	OnConflictAction on_conflict; /* what to do when unique conflict
+								   * happened */
 	CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */
 	int64 reject_limit; /* maximum tolerable number of errors */
 	List *convert_select; /* list of column names (can be NIL) */
+	char *on_conflictRel; /* on error, save error info to the table,
+						   * table name */
 } CopyFormatOptions;
 
 /* These are private in commands/copy[from|to].c */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 9d3e244ee55..f487b1cb6f5 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -16,6 +16,7 @@
 
 #include "commands/copy.h"
 #include "commands/trigger.h"
+#include "executor/nodeModifyTable.h"
 #include "nodes/miscnodes.h"
 
 /*
@@ -73,6 +74,7 @@ typedef struct CopyFromStateData
 
 	/* parameters from the COPY command */
 	Relation rel; /* relation to copy from */
+	Relation conflictRel; /* relation for copy from conflict saving */
 	List *attnumlist; /* integer list of attnums to copy */
 	char *filename; /* filename, or NULL for STDIN */
 	bool is_program; /* is 'filename' a program to popen? */
@@ -102,6 +104,8 @@ typedef struct CopyFromStateData
 								 * execution */
 	uint64 num_errors; /* total number of rows which contained soft
 						* errors */
+	uint64 num_conflicts; /* total number of rows skipped due to unique
+						   * constraint conflict */
 	int *defmap; /* array of default att numbers related to
 				  * missing att */
 	ExprState **defexprs; /* array of default att expressions for all
@@ -189,6 +193,13 @@ typedef struct CopyFromStateData
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 
 	uint64 bytes_processed; /* number of bytes processed so far */
+
+	/*
+	 * INSERT operation context for inserting COPY FROM unique constraint
+	 * violation failure information to conflict_table. This is set only when
+	 * COPY (ON_CONFLICT TABLE) is used; otherwise it remains NULL.
+	 */
+	ModifyTableContext *mtcontext;
 } CopyFromStateData;
 
 extern void ReceiveCopyBegin(CopyFromState cstate);
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 250bd64ad15..e595c3737d9 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -68,7 +68,8 @@ extern TupleTableSlot *ExecInsert(ModifyTableContext *context,
 								  TupleTableSlot *slot,
 								  bool canSetTag,
 								  TupleTableSlot **inserted_tuple,
-								  ResultRelInfo **insert_destrel);
+								  ResultRelInfo **insert_destrel,
+								  Oid *conflict_relOId);
 extern void ExecEndModifyTable(ModifyTableState *node);
 extern void ExecReScanModifyTable(ModifyTableState *node);
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index a2925ae4946..22a329cb810 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -429,6 +429,7 @@ typedef enum OnConflictAction
 	ONCONFLICT_NOTHING, /* ON CONFLICT ... DO NOTHING */
 	ONCONFLICT_UPDATE, /* ON CONFLICT ... DO UPDATE */
 	ONCONFLICT_SELECT, /* ON CONFLICT ... DO SELECT */
+	ONCONFLICT_TABLE, /* COPY (ON_CONFLICT TABLE) */
 } OnConflictAction;
 
 /*
diff --git a/src/test/regress/expected/copy.out b/src/test/regress/expected/copy.out
index 1714faab39c..e13fe171585 100644
--- a/src/test/regress/expected/copy.out
+++ b/src/test/regress/expected/copy.out
@@ -430,6 +430,14 @@ copy tab_progress_reporting from :'filename'
 	where (salary < 2000);
 INFO:  progress: {"type": "FILE", "command": "COPY FROM", "relname": "tab_progress_reporting", "tuples_skipped": 0, "has_bytes_total": true, "tuples_excluded": 1, "tuples_processed": 2, "has_bytes_processed": true}
 -- Generate COPY FROM report with PIPE, with some skipped tuples.
+create unique index tab_progress_reporting_idx1 on tab_progress_reporting(name);
+create temp table conflict_tbl(copy_tbl oid, filename text, lineno bigint, line text);
+copy tab_progress_reporting from stdin(on_conflict table, conflict_table 'conflict_tbl');
+INFO:  progress: {"type": "PIPE", "command": "COPY FROM", "relname": "tab_progress_reporting", "tuples_skipped": 3, "has_bytes_total": false, "tuples_excluded": 0, "tuples_processed": 0, "has_bytes_processed": true}
+NOTICE:  3 rows were saved to conflict table "conflict_tbl" due to unique constraint violation
+drop index tab_progress_reporting_idx1;
+drop table conflict_tbl;
+-- Generate COPY FROM report with PIPE, with some skipped tuples.
 copy tab_progress_reporting from stdin(on_error ignore);
 NOTICE:  2 rows were skipped due to data type incompatibility
 INFO:  progress: {"type": "PIPE", "command": "COPY FROM", "relname": "tab_progress_reporting", "tuples_skipped": 2, "has_bytes_total": false, "tuples_excluded": 0, "tuples_processed": 1, "has_bytes_processed": true}
@@ -554,11 +562,17 @@ SELECT tableoid::regclass, id % 2 = 0 is_even, count(*) from parted_si GROUP BY
 (2 rows)
 
 DROP TABLE parted_si;
--- ensure COPY FREEZE errors for foreign tables
+-- ensure COPY FREEZE/ON_CONFLICT errors for foreign tables
 begin;
+create temp table conflict_tbl(copy_tbl oid, filename text, lineno bigint, line text);
 create foreign data wrapper copytest_wrapper;
 create server copytest_server foreign data wrapper copytest_wrapper;
 create foreign table copytest_foreign_table (a int) server copytest_server;
+SAVEPOINT s1;
+copy copytest_foreign_table from stdin (ON_CONFLICT TABLE, CONFLICT_TABLE 'conflict_tbl');
+ERROR:  cannot perform COPY ON_CONFLCT on relation "copytest_foreign_table"
+DETAIL:  This operation is not supported for foreign tables.
+ROLLBACK TO SAVEPOINT s1;
 copy copytest_foreign_table from stdin (freeze);
 ERROR:  cannot perform COPY FREEZE on a foreign table
 rollback;
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 7600e5239d2..f22fb9c4651 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -884,7 +884,161 @@ ERROR:  skipped more than REJECT_LIMIT (3) rows due to data type incompatibility
 CONTEXT:  COPY check_ign_err, line 5, column n: ""
 COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
 NOTICE:  4 rows were skipped due to data type incompatibility
+CREATE DOMAIN d_text as TEXT;
+CREATE TABLE t_copy_tblp(c text, b int, a int) PARTITION BY RANGE(a);
+CREATE TABLE t_copy_tbl(a int, b int, c text);
+ALTER TABLE t_copy_tblp ATTACH PARTITION t_copy_tbl FOR VALUES FROM (MINVALUE) TO (100);
+CREATE TABLE t_copy_tbl1 PARTITION OF t_copy_tblp FOR VALUES FROM (100) TO (200);
+CREATE TABLE err_tbl1(copy_tbl oid, filename text, lineno bigint, line text generated always as ('hh') stored);
+CREATE POLICY p1 ON err_tbl1 FOR SELECT USING (true);
+ALTER TABLE err_tbl1 ENABLE ROW LEVEL SECURITY;
+ALTER TABLE err_tbl1 FORCE ROW LEVEL SECURITY;
+CREATE VIEW err_tblv AS SELECT * FROM err_tbl1;
+COPY t_copy_tbl FROM STDIN WITH (ON_CONFLICT TABLE, CONFLICT_TABLE err_tblv); -- error
+ERROR:  cannot use relation "err_tblv" for COPY on_conflict error saving
+DETAIL:  This operation is not supported for views.
+COPY t_copy_tbl FROM STDIN WITH (ON_CONFLICT TABLE); -- error
+ERROR:  COPY ON_CONFLICT requires CONFLICT_TABLE option
+COPY t_copy_tbl FROM STDOUT WITH (CONFLICT_TABLE err_tbl1); -- error
+ERROR:  COPY CONFLICT_TABLE requires ON_CONFLICT option
+COPY t_copy_tbl FROM STDOUT WITH (CONFLICT_TABLE 'err_tbl1'); -- error
+ERROR:  COPY CONFLICT_TABLE requires ON_CONFLICT option
+COPY t_copy_tbl TO STDOUT (ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+ERROR:  COPY ON_CONFLICT cannot be used with COPY TO
+LINE 1: COPY t_copy_tbl TO STDOUT (ON_CONFLICT TABLE, CONFLICT_TABLE...
+                                   ^
+-- The conflict error saving table must be a plain table and it cannot contain
+-- generated column, rules, or row-level security policies
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+ERROR:  cannot use relation "err_tbl1" for COPY on_conflict error saving
+DETAIL:  The conflict_table cannot have generated columns.
+ALTER TABLE err_tbl1 ALTER COLUMN line DROP EXPRESSION;
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+ERROR:  cannot use relation "err_tbl1" for COPY on_conflict error saving
+DETAIL:  The conflict_table cannot have row-level security policies.
+DROP POLICY IF EXISTS p1 ON err_tbl1;
+ALTER TABLE err_tbl1 DISABLE ROW LEVEL SECURITY;
+COPY instead_of_insert_tbl_view FROM STDIN (ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+ERROR:  cannot perform COPY ON_CONFLCT on relation "instead_of_insert_tbl_view"
+DETAIL:  This operation is not supported for views.
+COPY t_copy_tblp(a, c, b) FROM STDIN (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- ok
+-- COPY ON_CONFLICT TABLE cannot apply to deferred unique constraint
+ALTER TABLE t_copy_tbl ADD CONSTRAINT t_copy_tbl_unq1 UNIQUE (a) DEFERRABLE INITIALLY DEFERRED;
+BEGIN;
+COPY t_copy_tbl FROM STDIN (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1);
+ERROR:  ON CONFLICT does not support deferrable unique constraints/exclusion constraints as arbiters
+CONTEXT:  COPY t_copy_tbl, line 1: "1,2,3"
+ROLLBACK;
+ALTER TABLE t_copy_tbl DROP CONSTRAINT t_copy_tbl_unq1;
+ALTER TABLE err_tbl1 ADD CONSTRAINT cc CHECK (lineno > 0);
+ALTER TABLE err_tbl1 ADD CONSTRAINT nn NOT NULL copy_tbl;
+CREATE UNIQUE INDEX ON t_copy_tbl (b) WHERE a = 1;
+CREATE UNIQUE INDEX ON t_copy_tbl ((b+1));
+CREATE UNIQUE INDEX ON t_copy_tbl (c);
+COPY t_copy_tbl(b, a, c) FROM STDIN (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1, LOG_VERBOSITY verbose); -- ok
+NOTICE:  2 rows were saved to conflict table "err_tbl1" due to unique constraint violation
+SELECT tableoid::regclass, * FROM t_copy_tblp;
+  tableoid  | c | b | a 
+------------+---+---+---
+ t_copy_tbl | 3 | 2 | 1
+(1 row)
+
+SELECT copy_tbl::regclass, filename, lineno, line FROM err_tbl1;
+  copy_tbl  | filename | lineno |  line   
+------------+----------+--------+---------
+ t_copy_tbl | STDIN    |      1 | 2,1,aaa
+ t_copy_tbl | STDIN    |      2 | 2,1,XXX
+(2 rows)
+
+CREATE FUNCTION trig_copy_conflict_insert()
+RETURNS TRIGGER LANGUAGE plpgsql AS
+$$
+BEGIN
+    RAISE NOTICE 'trigger name: %, % % FOR EACH %', TG_NAME, TG_WHEN, TG_OP, TG_LEVEL;
+    RAISE NOTICE 'NEW lineno: %, line: %', NEW.lineno, NEW.line;
+    RETURN NEW;
+END;
+$$;
+CREATE TRIGGER t_copy_tbl_before_row_trig
+    BEFORE INSERT ON err_tbl1
+    FOR EACH ROW EXECUTE PROCEDURE trig_copy_conflict_insert();
+CREATE TRIGGER t_copy_tbl_after_row_trig
+    AFTER INSERT ON err_tbl1
+    FOR EACH ROW EXECUTE PROCEDURE trig_copy_conflict_insert();
+CREATE TRIGGER t_copy_tbl_before_stmt_trig
+    BEFORE INSERT ON err_tbl1
+    FOR EACH STATEMENT EXECUTE PROCEDURE trig_copy_conflict_insert();
+CREATE TRIGGER t_copy_tbl_after_stmt_trig
+    AFTER INSERT ON err_tbl1
+    FOR EACH STATEMENT EXECUTE PROCEDURE trig_copy_conflict_insert();
+CREATE UNIQUE INDEX ON t_copy_tblp (a);
+-- Since we are inserting data into CONFLICT_TABLE:
+-- FOR EACH STATEMENT triggers will be executed only once per INSERT statement
+-- FOR EACH ROW triggers will fire once for every row inserted into CONFLICT_TABLE
+BEGIN ISOLATION LEVEL REPEATABLE READ;
+INSERT INTO t_copy_tblp(b, a, c) VALUES (14,7,'xxxxxxxx');
+DELETE FROM t_copy_tblp WHERE b = 14 and a = 7 and c = 'xxxxxxxx';
+COPY t_copy_tblp(b, a, c) FROM STDIN (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1, LOG_VERBOSITY verbose);
+NOTICE:  trigger name: t_copy_tbl_before_stmt_trig, BEFORE INSERT FOR EACH STATEMENT
+NOTICE:  NEW lineno: <NULL>, line: <NULL>
+NOTICE:  trigger name: t_copy_tbl_before_row_trig, BEFORE INSERT FOR EACH ROW
+NOTICE:  NEW lineno: 2, line: 6,11,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+NOTICE:  trigger name: t_copy_tbl_before_row_trig, BEFORE INSERT FOR EACH ROW
+NOTICE:  NEW lineno: 4, line: 12,2,xxxxxxxx
+NOTICE:  trigger name: t_copy_tbl_before_row_trig, BEFORE INSERT FOR EACH ROW
+NOTICE:  NEW lineno: 5, line: 13,3,xxxxxxxx
+NOTICE:  trigger name: t_copy_tbl_before_row_trig, BEFORE INSERT FOR EACH ROW
+NOTICE:  NEW lineno: 7, line: 2,199,Z
+NOTICE:  trigger name: t_copy_tbl_after_row_trig, AFTER INSERT FOR EACH ROW
+NOTICE:  NEW lineno: 2, line: 6,11,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+NOTICE:  trigger name: t_copy_tbl_after_row_trig, AFTER INSERT FOR EACH ROW
+NOTICE:  NEW lineno: 4, line: 12,2,xxxxxxxx
+NOTICE:  trigger name: t_copy_tbl_after_row_trig, AFTER INSERT FOR EACH ROW
+NOTICE:  NEW lineno: 5, line: 13,3,xxxxxxxx
+NOTICE:  trigger name: t_copy_tbl_after_row_trig, AFTER INSERT FOR EACH ROW
+NOTICE:  NEW lineno: 7, line: 2,199,Z
+NOTICE:  4 rows were saved to conflict table "err_tbl1" due to unique constraint violation
+NOTICE:  trigger name: t_copy_tbl_after_stmt_trig, AFTER INSERT FOR EACH STATEMENT
+NOTICE:  NEW lineno: <NULL>, line: <NULL>
+COPY t_copy_tblp(b, a, c) FROM STDIN (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1, LOG_VERBOSITY verbose);
+NOTICE:  trigger name: t_copy_tbl_before_stmt_trig, BEFORE INSERT FOR EACH STATEMENT
+NOTICE:  NEW lineno: <NULL>, line: <NULL>
+NOTICE:  trigger name: t_copy_tbl_before_row_trig, BEFORE INSERT FOR EACH ROW
+NOTICE:  NEW lineno: 1, line: 199,199,Y
+NOTICE:  trigger name: t_copy_tbl_after_row_trig, AFTER INSERT FOR EACH ROW
+NOTICE:  NEW lineno: 1, line: 199,199,Y
+NOTICE:  1 row was saved to conflict table "err_tbl1" due to unique constraint violation
+NOTICE:  trigger name: t_copy_tbl_after_stmt_trig, AFTER INSERT FOR EACH STATEMENT
+NOTICE:  NEW lineno: <NULL>, line: <NULL>
+ALTER TABLE err_tbl1 DISABLE TRIGGER USER;
+COMMIT;
+CREATE TABLE err_tbl6 (
+    id1 int4range,
+    valid_at int4range,
+    CONSTRAINT err_tbl6_uq UNIQUE (id1, valid_at WITHOUT OVERLAPS)
+);
+COPY err_tbl6 FROM STDIN (ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+ERROR:  empty WITHOUT OVERLAPS value found in column "valid_at" in relation "err_tbl6"
+CONTEXT:  COPY err_tbl6, line 1: "[11,12)	empty"
+COPY err_tbl6 FROM STDIN (ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1);
+NOTICE:  1 row was saved to conflict table "err_tbl1" due to unique constraint violation
+SELECT copy_tbl::regclass, filename, lineno, line FROM err_tbl1;
+  copy_tbl   | filename | lineno |                                       line                                       
+-------------+----------+--------+----------------------------------------------------------------------------------
+ t_copy_tbl  | STDIN    |      1 | 2,1,aaa
+ t_copy_tbl  | STDIN    |      2 | 2,1,XXX
+ t_copy_tbl  | STDIN    |      2 | 6,11,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+ t_copy_tbl  | STDIN    |      4 | 12,2,xxxxxxxx
+ t_copy_tbl  | STDIN    |      5 | 13,3,xxxxxxxx
+ t_copy_tbl1 | STDIN    |      7 | 2,199,Z
+ t_copy_tbl1 | STDIN    |      1 | 199,199,Y
+ err_tbl6    | STDIN    |      2 | [1,10)  [1,12)
+(8 rows)
+
 -- clean up
+DROP VIEW err_tblv;
+DROP TABLE err_tbl1;
+DROP DOMAIN d_text;
 DROP TABLE forcetest;
 DROP TABLE vistest;
 DROP FUNCTION truncate_in_subxact();
diff --git a/src/test/regress/sql/copy.sql b/src/test/regress/sql/copy.sql
index eaad290b257..9605e532715 100644
--- a/src/test/regress/sql/copy.sql
+++ b/src/test/regress/sql/copy.sql
@@ -369,6 +369,17 @@ truncate tab_progress_reporting;
 copy tab_progress_reporting from :'filename'
 	where (salary < 2000);
 
+-- Generate COPY FROM report with PIPE, with some skipped tuples.
+create unique index tab_progress_reporting_idx1 on tab_progress_reporting(name);
+create temp table conflict_tbl(copy_tbl oid, filename text, lineno bigint, line text);
+copy tab_progress_reporting from stdin(on_conflict table, conflict_table 'conflict_tbl');
+sharon	25	(115,12)	1000	sam
+bill	20	(111,10)	1000	sharon
+bill	20	(111,10)	1000	sharon
+\.
+drop index tab_progress_reporting_idx1;
+drop table conflict_tbl;
+
 -- Generate COPY FROM report with PIPE, with some skipped tuples.
 copy tab_progress_reporting from stdin(on_error ignore);
 sharon	x	(15,12)	x	sam
@@ -503,11 +514,16 @@ SELECT tableoid::regclass, id % 2 = 0 is_even, count(*) from parted_si GROUP BY
 
 DROP TABLE parted_si;
 
--- ensure COPY FREEZE errors for foreign tables
+-- ensure COPY FREEZE/ON_CONFLICT errors for foreign tables
 begin;
+create temp table conflict_tbl(copy_tbl oid, filename text, lineno bigint, line text);
 create foreign data wrapper copytest_wrapper;
 create server copytest_server foreign data wrapper copytest_wrapper;
 create foreign table copytest_foreign_table (a int) server copytest_server;
+SAVEPOINT s1;
+copy copytest_foreign_table from stdin (ON_CONFLICT TABLE, CONFLICT_TABLE 'conflict_tbl');
+\.
+ROLLBACK TO SAVEPOINT s1;
 copy copytest_foreign_table from stdin (freeze);
 1
 \.
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index e0810109473..c3a1e5f1b59 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -636,7 +636,137 @@ a	{7}	7
 10	{10}	10
 \.
 
+CREATE DOMAIN d_text as TEXT;
+CREATE TABLE t_copy_tblp(c text, b int, a int) PARTITION BY RANGE(a);
+CREATE TABLE t_copy_tbl(a int, b int, c text);
+ALTER TABLE t_copy_tblp ATTACH PARTITION t_copy_tbl FOR VALUES FROM (MINVALUE) TO (100);
+CREATE TABLE t_copy_tbl1 PARTITION OF t_copy_tblp FOR VALUES FROM (100) TO (200);
+CREATE TABLE err_tbl1(copy_tbl oid, filename text, lineno bigint, line text generated always as ('hh') stored);
+CREATE POLICY p1 ON err_tbl1 FOR SELECT USING (true);
+ALTER TABLE err_tbl1 ENABLE ROW LEVEL SECURITY;
+ALTER TABLE err_tbl1 FORCE ROW LEVEL SECURITY;
+
+CREATE VIEW err_tblv AS SELECT * FROM err_tbl1;
+COPY t_copy_tbl FROM STDIN WITH (ON_CONFLICT TABLE, CONFLICT_TABLE err_tblv); -- error
+COPY t_copy_tbl FROM STDIN WITH (ON_CONFLICT TABLE); -- error
+COPY t_copy_tbl FROM STDOUT WITH (CONFLICT_TABLE err_tbl1); -- error
+COPY t_copy_tbl FROM STDOUT WITH (CONFLICT_TABLE 'err_tbl1'); -- error
+COPY t_copy_tbl TO STDOUT (ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+-- The conflict error saving table must be a plain table and it cannot contain
+-- generated column, rules, or row-level security policies
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+ALTER TABLE err_tbl1 ALTER COLUMN line DROP EXPRESSION;
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+DROP POLICY IF EXISTS p1 ON err_tbl1;
+ALTER TABLE err_tbl1 DISABLE ROW LEVEL SECURITY;
+COPY instead_of_insert_tbl_view FROM STDIN (ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+ALTER TABLE err_tbl1 ALTER COLUMN line SET DATA TYPE d_text;
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+ALTER TABLE err_tbl1 DROP COLUMN line;
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+ALTER TABLE err_tbl1 ADD COLUMN line text, ADD column extra int;
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+ALTER TABLE err_tbl1 DROP COLUMN extra;
+
+COPY t_copy_tbl FROM STDIN WITH (ON_CONFLICT STOP); -- ok
+\.
+
+COPY t_copy_tblp(a, c, b) FROM STDIN (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- ok
+1,3,2
+\.
+
+-- COPY ON_CONFLICT TABLE cannot apply to deferred unique constraint
+ALTER TABLE t_copy_tbl ADD CONSTRAINT t_copy_tbl_unq1 UNIQUE (a) DEFERRABLE INITIALLY DEFERRED;
+BEGIN;
+COPY t_copy_tbl FROM STDIN (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1);
+1,2,3
+\.
+ROLLBACK;
+ALTER TABLE t_copy_tbl DROP CONSTRAINT t_copy_tbl_unq1;
+
+ALTER TABLE err_tbl1 ADD CONSTRAINT cc CHECK (lineno > 0);
+ALTER TABLE err_tbl1 ADD CONSTRAINT nn NOT NULL copy_tbl;
+CREATE UNIQUE INDEX ON t_copy_tbl (b) WHERE a = 1;
+CREATE UNIQUE INDEX ON t_copy_tbl ((b+1));
+CREATE UNIQUE INDEX ON t_copy_tbl (c);
+
+COPY t_copy_tbl(b, a, c) FROM STDIN (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1, LOG_VERBOSITY verbose); -- ok
+2,1,aaa
+2,1,XXX
+\.
+
+SELECT tableoid::regclass, * FROM t_copy_tblp;
+SELECT copy_tbl::regclass, filename, lineno, line FROM err_tbl1;
+
+CREATE FUNCTION trig_copy_conflict_insert()
+RETURNS TRIGGER LANGUAGE plpgsql AS
+$$
+BEGIN
+    RAISE NOTICE 'trigger name: %, % % FOR EACH %', TG_NAME, TG_WHEN, TG_OP, TG_LEVEL;
+    RAISE NOTICE 'NEW lineno: %, line: %', NEW.lineno, NEW.line;
+    RETURN NEW;
+END;
+$$;
+
+CREATE TRIGGER t_copy_tbl_before_row_trig
+    BEFORE INSERT ON err_tbl1
+    FOR EACH ROW EXECUTE PROCEDURE trig_copy_conflict_insert();
+CREATE TRIGGER t_copy_tbl_after_row_trig
+    AFTER INSERT ON err_tbl1
+    FOR EACH ROW EXECUTE PROCEDURE trig_copy_conflict_insert();
+CREATE TRIGGER t_copy_tbl_before_stmt_trig
+    BEFORE INSERT ON err_tbl1
+    FOR EACH STATEMENT EXECUTE PROCEDURE trig_copy_conflict_insert();
+CREATE TRIGGER t_copy_tbl_after_stmt_trig
+    AFTER INSERT ON err_tbl1
+    FOR EACH STATEMENT EXECUTE PROCEDURE trig_copy_conflict_insert();
+
+CREATE UNIQUE INDEX ON t_copy_tblp (a);
+
+-- Since we are inserting data into CONFLICT_TABLE:
+-- FOR EACH STATEMENT triggers will be executed only once per INSERT statement
+-- FOR EACH ROW triggers will fire once for every row inserted into CONFLICT_TABLE
+BEGIN ISOLATION LEVEL REPEATABLE READ;
+INSERT INTO t_copy_tblp(b, a, c) VALUES (14,7,'xxxxxxxx');
+DELETE FROM t_copy_tblp WHERE b = 14 and a = 7 and c = 'xxxxxxxx';
+
+COPY t_copy_tblp(b, a, c) FROM STDIN (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1, LOG_VERBOSITY verbose);
+4,17,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+6,11,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+15,21,xxxxxxxx
+12,2,xxxxxxxx
+13,3,xxxxxxxx
+199,199,Y
+2,199,Z
+\.
+
+COPY t_copy_tblp(b, a, c) FROM STDIN (DELIMITER ',', ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1, LOG_VERBOSITY verbose);
+199,199,Y
+\.
+ALTER TABLE err_tbl1 DISABLE TRIGGER USER;
+COMMIT;
+
+CREATE TABLE err_tbl6 (
+    id1 int4range,
+    valid_at int4range,
+    CONSTRAINT err_tbl6_uq UNIQUE (id1, valid_at WITHOUT OVERLAPS)
+);
+
+COPY err_tbl6 FROM STDIN (ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1); -- error
+[11,12)	empty
+\.
+
+COPY err_tbl6 FROM STDIN (ON_CONFLICT TABLE, CONFLICT_TABLE err_tbl1);
+[1,10)	[1,2)
+[1,10)	[1,12)
+\.
+
+SELECT copy_tbl::regclass, filename, lineno, line FROM err_tbl1;
+
 -- clean up
+DROP VIEW err_tblv;
+DROP TABLE err_tbl1;
+DROP DOMAIN d_text;
 DROP TABLE forcetest;
 DROP TABLE vistest;
 DROP FUNCTION truncate_in_subxact();
-- 
2.34.1
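A reading aid for CopyConflictTablePermissionCheck() in patch 2/2: the loop stores `i + 1 - FirstLowInvalidHeapAttributeNumber` in insertedCols, i.e. the 1-based attribute number shifted so that negative system attribute numbers still map to non-negative bit positions. The sketch below only models that arithmetic. Everything in it is a stand-in and not part of the patch: DemoColumn, demo_attnum_to_bit(), demo_inserted_cols(), the value -7 for the offset constant (an assumption about the current PostgreSQL definition), and the fixed 64-bit word used in place of a real Bitmapset.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumption: stand-in for FirstLowInvalidHeapAttributeNumber. */
#define DEMO_FIRST_LOW_INVALID_ATTNO (-7)

/* Hypothetical, simplified column descriptor. */
typedef struct DemoColumn
{
	const char *name;
	bool		dropped;
} DemoColumn;

/* Shift an attribute number so that it can be used as a bit position. */
static int
demo_attnum_to_bit(int attnum)
{
	return attnum - DEMO_FIRST_LOW_INVALID_ATTNO;
}

/* Collect the bit positions of all columns that are not dropped. */
static uint64_t
demo_inserted_cols(const DemoColumn *cols, int natts)
{
	uint64_t	bits = 0;

	for (int i = 0; i < natts; i++)
	{
		int			bit;

		if (cols[i].dropped)
			continue;

		/* array index i is 0-based, attribute numbers are 1-based */
		bit = demo_attnum_to_bit(i + 1);
		assert(bit >= 0 && bit < 64);
		bits |= UINT64_C(1) << bit;
	}
	return bits;
}
```

Dropped columns are skipped in the same way the patch skips entries with attisdropped set, so they never contribute a permission bit.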
From d11b899f771b4145bc139df3da2c4c734f9c0e76 Mon Sep 17 00:00:00 2001
From: jian he <[email protected]>
Date: Sun, 10 May 2026 12:13:09 +0800
Subject: [PATCH v2 1/2] export ExecInsert

The ExecInsert function encapsulates the core logic of the insertion
pipeline, including partition routing and BEFORE ROW, INSTEAD OF, and
AFTER ROW triggers, among others.  Exporting ExecInsert lets the COPY FROM
command use the exact same execution path as standard inserts.

discussion: https://postgr.es/m/cacjufxg672yotdt87dbazf1c9scnzm7qsb+zu6vhc+j5qrj...@mail.gmail.com
commitfest entry: https://commitfest.postgresql.org/patch/6736/
---
 src/backend/executor/nodeModifyTable.c | 40 +----------------------
 src/include/executor/nodeModifyTable.h | 45 ++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 39 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 4cb057ca4f9..908b18d7d4a 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -83,44 +83,6 @@ typedef struct MTTargetRelLookup
 	int relationIndex; /* rel's index in resultRelInfo[] array */
 } MTTargetRelLookup;
 
-/*
- * Context struct for a ModifyTable operation, containing basic execution
- * state and some output variables populated by ExecUpdateAct() and
- * ExecDeleteAct() to report the result of their actions to callers.
- */
-typedef struct ModifyTableContext
-{
-	/* Operation state */
-	ModifyTableState *mtstate;
-	EPQState *epqstate;
-	EState *estate;
-
-	/*
-	 * Slot containing tuple obtained from ModifyTable's subplan. Used to
-	 * access "junk" columns that are not going to be stored.
-	 */
-	TupleTableSlot *planSlot;
-
-	/*
-	 * Information about the changes that were made concurrently to a tuple
-	 * being updated or deleted
-	 */
-	TM_FailureData tmfd;
-
-	/*
-	 * The tuple deleted when doing a cross-partition UPDATE with a RETURNING
-	 * clause that refers to OLD columns (converted to the root's tuple
-	 * descriptor).
-	 */
-	TupleTableSlot *cpDeletedSlot;
-
-	/*
-	 * The tuple projected by the INSERT's RETURNING clause, when doing a
-	 * cross-partition UPDATE
-	 */
-	TupleTableSlot *cpUpdateReturningSlot;
-} ModifyTableContext;
-
 /*
  * Context struct containing output data specific to UPDATE operations.
  */
@@ -867,7 +829,7 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo,
  *		save the previous value to avoid losing track of it.
  * ----------------------------------------------------------------
  */
-static TupleTableSlot *
+TupleTableSlot *
 ExecInsert(ModifyTableContext *context,
 		   ResultRelInfo *resultRelInfo,
 		   TupleTableSlot *slot,
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index f6070e1cdf3..250bd64ad15 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -13,8 +13,47 @@
 #ifndef NODEMODIFYTABLE_H
 #define NODEMODIFYTABLE_H
 
+#include "access/tableam.h"
 #include "nodes/execnodes.h"
 
+/*
+ * Context struct for a ModifyTable operation, containing basic execution
+ * state and some output variables populated by ExecUpdateAct() and
+ * ExecDeleteAct() to report the result of their actions to callers.
+ */
+typedef struct ModifyTableContext
+{
+	/* Operation state */
+	ModifyTableState *mtstate;
+	EPQState *epqstate;
+	EState *estate;
+
+	/*
+	 * Slot containing tuple obtained from ModifyTable's subplan. Used to
+	 * access "junk" columns that are not going to be stored.
+	 */
+	TupleTableSlot *planSlot;
+
+	/*
+	 * Information about the changes that were made concurrently to a tuple
+	 * being updated or deleted
+	 */
+	TM_FailureData tmfd;
+
+	/*
+	 * The tuple deleted when doing a cross-partition UPDATE with a RETURNING
+	 * clause that refers to OLD columns (converted to the root's tuple
+	 * descriptor).
+	 */
+	TupleTableSlot *cpDeletedSlot;
+
+	/*
+	 * The tuple projected by the INSERT's RETURNING clause, when doing a
+	 * cross-partition UPDATE
+	 */
+	TupleTableSlot *cpUpdateReturningSlot;
+} ModifyTableContext;
+
 extern void ExecInitGenerated(ResultRelInfo *resultRelInfo,
 							  EState *estate,
 							  CmdType cmdtype);
@@ -24,6 +63,12 @@ extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo,
 									   CmdType cmdtype);
 
 extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags);
+extern TupleTableSlot *ExecInsert(ModifyTableContext *context,
+								  ResultRelInfo *resultRelInfo,
+								  TupleTableSlot *slot,
+								  bool canSetTag,
+								  TupleTableSlot **inserted_tuple,
+								  ResultRelInfo **insert_destrel);
 extern void ExecEndModifyTable(ModifyTableState *node);
 extern void ExecReScanModifyTable(ModifyTableState *node);
 
-- 
2.34.1
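A note on the calling convention that patch 2/2 layers on top of the exported ExecInsert: the extra conflict_relOId argument is an optional out-parameter. Existing executor callers pass NULL, and a caller that wants to know which relation reported the conflict passes a pointer. Going only by the hunks in this mail, the pointer is written solely in the conflict branch, so the caller presumably has to initialise its variable first. The following standalone sketch models just that convention. DemoOid, DemoRel and demo_insert() are hypothetical stand-ins, not PostgreSQL APIs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int DemoOid;	/* stand-in for PostgreSQL's Oid */
#define DEMO_INVALID_OID ((DemoOid) 0)
#define DEMO_MAX_KEYS 8

/* Hypothetical relation holding a small set of unique keys. */
typedef struct DemoRel
{
	DemoOid		relid;
	int			keys[DEMO_MAX_KEYS];
	size_t		nkeys;
} DemoRel;

/*
 * Insert key into rel unless it is already present.  Returns true when the
 * key was inserted.  If conflict_relid is not NULL and a duplicate is found,
 * *conflict_relid is set to rel's id; otherwise it is left untouched.
 */
static bool
demo_insert(DemoRel *rel, int key, DemoOid *conflict_relid)
{
	for (size_t i = 0; i < rel->nkeys; i++)
	{
		if (rel->keys[i] == key)
		{
			if (conflict_relid)
				*conflict_relid = rel->relid;
			return false;
		}
	}

	assert(rel->nkeys < DEMO_MAX_KEYS);
	rel->keys[rel->nkeys++] = key;
	return true;
}
```

A caller that does not care about the conflicting relation passes NULL, as the ExecModifyTable, ExecMergeNotMatched, ExecCrossPartitionUpdate and ExecForPortionOfLeftovers call sites do in patch 2/2. A COPY-like caller would set its variable to the invalid OID, call the function, and treat any other value afterwards as "this row hit a unique conflict".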
