On Fri, Sep 20, 2019 at 4:16 PM Alexey Kondratov <a.kondra...@postgrespro.ru> wrote:
> > First of all, there is definitely a problem with grammar. In docs ERROR > is defined as option and > > COPY test FROM '/path/to/copy-test-simple.csv' ERROR -1; > > works just fine, but if modern 'WITH (...)' syntax is used: > > COPY test FROM '/path/to/copy-test-simple.csv' WITH (ERROR -1); > ERROR: option "error" not recognized > > while 'WITH (error_limit -1)' it works again. > > It happens, since COPY supports modern and very-very old syntax: > > * In the preferred syntax the options are comma-separated > * and use generic identifiers instead of keywords. The pre-9.0 > * syntax had a hard-wired, space-separated set of options. > > So I see several options here: > > 1) Everything is left as is, but then docs should be updated and > reflect, that error_limit is required for modern syntax. > > 2) However, why do we have to support old syntax here? I guess it exists > for backward compatibility only, but this is a completely new feature. > So maybe just 'WITH (error_limit 42)' will be enough? > > 3) You also may simply change internal option name from 'error_limit' to > 'error' or SQL keyword from 'ERROR' to 'ERROR_LIMIT'. > > I would prefer the second option. > agreed and Done > > > Next, you use DestRemoteSimple for returning conflicting tuples back: > > + dest = CreateDestReceiver(DestRemoteSimple); > + dest->rStartup(dest, (int) CMD_SELECT, tupDesc); > > However, printsimple supports very limited subset of built-in types, so > > CREATE TABLE large_test (id integer primary key, num1 bigint, num2 > double precision); > COPY large_test FROM '/path/to/copy-test.tsv'; > COPY large_test FROM '/path/to/copy-test.tsv' ERROR 3; > > fails with following error 'ERROR: unsupported type OID: 701', which > seems to be very confusing from the end user perspective. I've tried to > switch to DestRemote, but couldn't figure it out quickly. 
> > fixed > > Finally, I simply cannot get into this validation: > > + else if (strcmp(defel->defname, "error_limit") == 0) > + { > + if (cstate->ignore_error) > + ereport(ERROR, > + (errcode(ERRCODE_SYNTAX_ERROR), > + errmsg("conflicting or redundant options"), > + parser_errposition(pstate, defel->location))); > + cstate->error_limit = defGetInt64(defel); > + cstate->ignore_error = true; > + if (cstate->error_limit == -1) > + cstate->ignore_all_error = true; > + } > > If cstate->ignore_error is defined, then we have already processed > options list, since this is the only one place, where it's set. So we > should never get into this ereport, should we? > > Yes, the check is only needed for the modern syntax. Regards, Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index d9b7c4d0d4..ffcfe1e8d3 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable FORCE_NOT_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] ) FORCE_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ENCODING '<replaceable class="parameter">encoding_name</replaceable>' + ERROR_LIMIT '<replaceable class="parameter">limit_number</replaceable>' </synopsis> </refsynopsisdiv> @@ -355,6 +356,23 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable </listitem> </varlistentry> + <varlistentry> + <term><literal>ERROR_LIMIT</literal></term> + <listitem> + <para> + Specifies that up to <replaceable + class="parameter">limit_number</replaceable> error records are returned. + Specifying -1 returns all error records. + </para> + + <para> + Currently, only unique or exclusion constraint violations + and some record formatting errors are ignored. 
+ </para> + + </listitem> + </varlistentry> + <varlistentry> <term><literal>WHERE</literal></term> <listitem> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index e17d8c760f..c2314480b2 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -24,6 +24,7 @@ #include "access/tableam.h" #include "access/xact.h" #include "access/xlog.h" +#include "access/printtup.h" #include "catalog/dependency.h" #include "catalog/pg_authid.h" #include "catalog/pg_type.h" @@ -48,7 +49,9 @@ #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" +#include "storage/lmgr.h" #include "tcop/tcopprot.h" +#include "tcop/pquery.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -154,6 +157,7 @@ typedef struct CopyStateData List *convert_select; /* list of column names (can be NIL) */ bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ Node *whereClause; /* WHERE condition (or NULL) */ + int error_limit; /* total number of error to ignore */ /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ @@ -183,6 +187,9 @@ typedef struct CopyStateData bool volatile_defexprs; /* is any of defexprs volatile? */ List *range_table; ExprState *qualexpr; + bool ignore_error; /* is ignore error specified? */ + bool ignore_all_error; /* is error_limit -1 (ignore all error) + * specified? 
*/ TransitionCaptureState *transition_capture; @@ -1290,6 +1297,18 @@ ProcessCopyOptions(ParseState *pstate, defel->defname), parser_errposition(pstate, defel->location))); } + else if (strcmp(defel->defname, "error_limit") == 0) + { + if (cstate->ignore_error) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + cstate->error_limit = defGetInt64(defel); + cstate->ignore_error = true; + if (cstate->error_limit == -1) + cstate->ignore_all_error = true; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1440,6 +1459,10 @@ ProcessCopyOptions(ParseState *pstate, ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); + if (cstate->ignore_error && !cstate->is_copy_from) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ERROR LIMIT only available using COPY FROM"))); } /* @@ -2675,6 +2698,8 @@ CopyFrom(CopyState cstate) bool has_before_insert_row_trig; bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; + DestReceiver *dest = NULL; + Portal portal = NULL; Assert(cstate->rel); @@ -2838,7 +2863,20 @@ CopyFrom(CopyState cstate) /* Verify the named relation is a valid target for INSERT */ CheckValidResultRel(resultRelInfo, CMD_INSERT); - ExecOpenIndices(resultRelInfo, false); + if (cstate->ignore_error) + { + TupleDesc tupDesc; + + ExecOpenIndices(resultRelInfo, true); + tupDesc = RelationGetDescr(cstate->rel); + + portal = GetPortalByName(""); + dest = CreateDestReceiver(DestRemote); + SetRemoteDestReceiverParams(dest, portal); + dest->rStartup(dest, (int) CMD_SELECT, tupDesc); + } + else + ExecOpenIndices(resultRelInfo, false); estate->es_result_relations = resultRelInfo; estate->es_num_result_relations = 1; @@ -2943,6 +2981,13 @@ CopyFrom(CopyState cstate) */ insertMethod = CIM_SINGLE; } + else if (cstate->ignore_error) + { + /* + * Can't support speculative 
insertion in multi-inserts. + */ + insertMethod = CIM_SINGLE; + } else { /* @@ -3286,6 +3331,63 @@ CopyFrom(CopyState cstate) */ myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } + else if ((cstate->error_limit > 0 || cstate->ignore_all_error) && resultRelInfo->ri_NumIndices > 0) + { + /* Perform a speculative insertion. */ + uint32 specToken; + ItemPointerData conflictTid; + bool specConflict; + + /* + * Do a non-conclusive check for conflicts first. + */ + specConflict = false; + + if (!ExecCheckIndexConstraints(myslot, estate, &conflictTid, + NIL)) + { + (void) dest->receiveSlot(myslot, dest); + cstate->error_limit--; + continue; + } + + /* + * Acquire our speculative insertion lock". + */ + specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId()); + + /* insert the tuple, with the speculative token */ + table_tuple_insert_speculative(resultRelInfo->ri_RelationDesc, myslot, + estate->es_output_cid, + 0, + NULL, + specToken); + + /* insert index entries for tuple */ + recheckIndexes = ExecInsertIndexTuples(myslot, estate, true, + &specConflict, + NIL); + + /* adjust the tuple's state accordingly */ + table_tuple_complete_speculative(resultRelInfo->ri_RelationDesc, myslot, + specToken, !specConflict); + + /* + * Wake up anyone waiting for our decision. + */ + SpeculativeInsertionLockRelease(GetCurrentTransactionId()); + + /* + * If there was a conflict, return it and preceded to + * the next record if there are any. 
+ */ + if (specConflict) + { + (void) dest->receiveSlot(myslot, dest); + cstate->error_limit--; + continue; + } + } else { /* OK, store the tuple and create index entries for it */ @@ -3703,7 +3805,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - +next_line: if (!cstate->binary) { char **field_strings; @@ -3718,9 +3820,21 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* check for overflowing fields */ if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- extra data after last expected column", + cstate->line_buf.data))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + } fieldno = 0; @@ -3732,10 +3846,22 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, Form_pg_attribute att = TupleDescAttr(tupDesc, m); if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- missing data for column \"%s\"", + cstate->line_buf.data, NameStr(att->attname)))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + } string = field_strings[fieldno++]; if (cstate->convert_select_flags && @@ -3822,10 +3948,23 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, } 
if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- row field count is %d, expected %d", + cstate->line_buf.data, (int) fld_count, attr_count))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + } i = 0; foreach(cur, cstate->attnumlist) diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out index c53ed3ebf5..37a77dcaa2 100644 --- a/src/test/regress/expected/copy2.out +++ b/src/test/regress/expected/copy2.out @@ -55,6 +55,11 @@ LINE 1: COPY x TO stdout WHERE a = 1; ^ COPY x from stdin WHERE a = 50004; COPY x from stdin WHERE a > 60003; +COPY x from stdin WITH(ERROR_LIMIT 5); +WARNING: skipping "70001 22 32" --- missing data for column "d" +WARNING: skipping "70002 23 33 43 53 54" --- extra data after last expected column +WARNING: skipping "70003 24 34 44" --- missing data for column "e" + COPY x from stdin WHERE f > 60003; ERROR: column "f" does not exist LINE 1: COPY x from stdin WHERE f > 60003; @@ -102,12 +107,14 @@ SELECT * FROM x; 50004 | 25 | 35 | 45 | before trigger fired 60004 | 25 | 35 | 45 | before trigger fired 60005 | 26 | 36 | 46 | before trigger fired + 70004 | 25 | 35 | 45 | before trigger fired + 70005 | 26 | 36 | 46 | before trigger fired 1 | 1 | stuff | test_1 | after trigger fired 2 | 2 | stuff | test_2 | after trigger fired 3 | 3 | stuff | test_3 | after trigger fired 4 | 4 | stuff | test_4 | after trigger fired 5 | 5 | stuff | test_5 | after trigger fired -(28 rows) +(30 rows) -- check copy out COPY x TO stdout; @@ -134,6 +141,8 @@ COPY x TO stdout; 50004 25 35 45 before trigger fired 60004 25 35 
45 before trigger fired 60005 26 36 46 before trigger fired +70004 25 35 45 before trigger fired +70005 26 36 46 before trigger fired 1 1 stuff test_1 after trigger fired 2 2 stuff test_2 after trigger fired 3 3 stuff test_3 after trigger fired @@ -163,6 +172,8 @@ Delimiter before trigger fired 35 before trigger fired 35 before trigger fired 36 before trigger fired +35 before trigger fired +36 before trigger fired stuff after trigger fired stuff after trigger fired stuff after trigger fired @@ -192,6 +203,8 @@ I'm null before trigger fired 25 before trigger fired 25 before trigger fired 26 before trigger fired +25 before trigger fired +26 before trigger fired 1 after trigger fired 2 after trigger fired 3 after trigger fired diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql index 902f4fac19..2378f428fc 100644 --- a/src/test/regress/sql/copy2.sql +++ b/src/test/regress/sql/copy2.sql @@ -110,6 +110,14 @@ COPY x from stdin WHERE a > 60003; 60005 26 36 46 56 \. +COPY x from stdin WITH(ERROR_LIMIT 5); +70001 22 32 +70002 23 33 43 53 54 +70003 24 34 44 +70004 25 35 45 55 +70005 26 36 46 56 +\. + COPY x from stdin WHERE f > 60003; COPY x from stdin WHERE a = max(x.b);