On Mon, Dec 11, 2023 at 10:05 PM Alena Rybakina <lena.riback...@yandex.ru> wrote: > > Hi! Thank you for your work. Your patch looks better! > Yes, thank you! It works fine, and I see that the regression tests have been > passed. 🙂 > However, when I ran 'copy from with save_error' operation with simple csv > files (copy_test.csv, copy_test1.csv) for tables test, test1 (how I created > it, I described below): > > postgres=# create table test (x int primary key, y int not null); > postgres=# create table test1 (x int, z int, CONSTRAINT fk_x > FOREIGN KEY(x) > REFERENCES test(x)); > > I did not find a table with saved errors after operation, although I received > a log about it: > > postgres=# \copy test from '/home/alena/copy_test.csv' DELIMITER ',' CSV > save_error > NOTICE: 2 rows were skipped because of error. skipped row saved to table > public.test_error > ERROR: duplicate key value violates unique constraint "test_pkey" > DETAIL: Key (x)=(2) already exists. > CONTEXT: COPY test, line 3 > > postgres=# select * from public.test_error; > ERROR: relation "public.test_error" does not exist > LINE 1: select * from public.test_error; > > postgres=# \copy test1 from '/home/alena/copy_test1.csv' DELIMITER ',' CSV > save_error > NOTICE: 2 rows were skipped because of error. skipped row saved to table > public.test1_error > ERROR: insert or update on table "test1" violates foreign key constraint > "fk_x" > DETAIL: Key (x)=(2) is not present in table "test". > > postgres=# select * from public.test1_error; > ERROR: relation "public.test1_error" does not exist > LINE 1: select * from public.test1_error; > > Two lines were written correctly in the csv files, therefore they should have > been added to the tables, but they were not added to the tables test and > test1. > > If I leave only the correct rows, everything works fine and the rows are > added to the tables. > > in copy_test.csv: > > 2,0 > > 1,1 > > in copy_test1.csv: > > 2,0 > > 2,1 > > 1,1 > > postgres=# \copy test from '/home/alena/copy_test.csv' DELIMITER ',' CSV > COPY 2 > postgres=# \copy test1 from '/home/alena/copy_test1.csv' DELIMITER ',' CSV > save_error > NOTICE: No error happened.Error holding table public.test1_error will be > droped > COPY 3 > > Maybe I'm launching it the wrong way. If so, let me know about it.
looks like the above is about constraints violation while copying. constraints violation while copying not in the scope of this patch. Since COPY FROM is very like the INSERT command, you do want all the valid constraints to check all the copied rows? but the notice raised by the patch is not right. So I place the drop error saving table or raise notice logic above `ExecResetTupleTable(estate->es_tupleTable, false)` in the function CopyFrom. > > I also notice interesting behavior if the table was previously created by the > user. When I was creating an error_table before the 'copy from' operation, > I received a message saying that it is impossible to create a table with the > same name (it is shown below) during the 'copy from' operation. > I think you should add information about this in the documentation, since > this seems to be normal behavior to me. > doc changed. you may check it.
From 3024bf3b727b728c58dfef41c62d7a93c083b887 Mon Sep 17 00:00:00 2001 From: pgaddict <jian.universal...@gmail.com> Date: Tue, 12 Dec 2023 20:58:45 +0800 Subject: [PATCH v11 1/1] Make COPY FROM more error tolerant Currently COPY FROM has 3 types of error while processing the source file. * extra data after last expected column * missing data for column \"%s\" * data type conversion error. Instead of throwing errors while copying, save_error specifier will save errors to a error saving table automatically. We check the error saving table definition by column name and column data type. if table already exists and meets the criteria then errors will save to that table. if the table does not exist, then create one. Only works for COPY FROM, non-BINARY mode. While copying, if error never happened, error saving table will be dropped at the ending of COPY FROM. If the error saving table exists, meaning at least once COPY FROM errors has happened, then all the future errors will be saved to that table. We save the error related meta info to error saving table using SPI, that is construct a query string, then execute the query. --- contrib/file_fdw/file_fdw.c | 4 +- doc/src/sgml/ref/copy.sgml | 100 +++++++++++++- src/backend/commands/copy.c | 12 ++ src/backend/commands/copyfrom.c | 146 +++++++++++++++++++- src/backend/commands/copyfromparse.c | 169 +++++++++++++++++++++-- src/backend/parser/gram.y | 8 +- src/bin/psql/tab-complete.c | 3 +- src/include/commands/copy.h | 3 +- src/include/commands/copyfrom_internal.h | 7 + src/include/parser/kwlist.h | 1 + src/test/regress/expected/copy2.out | 135 ++++++++++++++++++ src/test/regress/sql/copy2.sql | 108 +++++++++++++++ 12 files changed, 676 insertions(+), 20 deletions(-) diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c index 2189be8a..2d3eb34f 100644 --- a/contrib/file_fdw/file_fdw.c +++ b/contrib/file_fdw/file_fdw.c @@ -751,7 +751,7 @@ fileIterateForeignScan(ForeignScanState *node) */ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); found = NextCopyFrom(festate->cstate, econtext, - slot->tts_values, slot->tts_isnull); + slot->tts_values, slot->tts_isnull, NULL); if (found) ExecStoreVirtualTuple(slot); @@ -1183,7 +1183,7 @@ file_acquire_sample_rows(Relation onerel, int elevel, MemoryContextReset(tupcontext); MemoryContextSwitchTo(tupcontext); - found = NextCopyFrom(cstate, NULL, values, nulls); + found = NextCopyFrom(cstate, NULL, values, nulls, NULL); MemoryContextSwitchTo(oldcontext); diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index 18ecc69c..fb303b4f 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * } FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * } ENCODING '<replaceable class="parameter">encoding_name</replaceable>' + SAVE_ERROR [ <replaceable class="parameter">boolean</replaceable> ] </synopsis> </refsynopsisdiv> @@ -411,6 +412,18 @@ WHERE <replaceable class="parameter">condition</replaceable> </listitem> </varlistentry> + <varlistentry> + <term><literal>SAVE_ERROR</literal></term> + <listitem> + <para> + Specifies that any data conversion errors while copying will automatically saved in an Error Saving table and the <command>COPY FROM</command> operation will not be interrupted by conversion errors. + This option is not allowed when using <literal>binary</literal> format. Note that this + is only supported in current <command>COPY FROM</command> syntax. + If this option is omitted, any data type conversion errors will be raised immediately. + </para> + </listitem> + </varlistentry> + </variablelist> </refsect1> @@ -564,6 +577,7 @@ COPY <replaceable class="parameter">count</replaceable> amount to a considerable amount of wasted disk space if the failure happened well into a large copy operation. You might wish to invoke <command>VACUUM</command> to recover the wasted space. + To continue copying while skip conversion errors in a <command>COPY FROM</command>, you might wish to specify <literal>SAVE_ERROR</literal>. </para> <para> @@ -572,6 +586,16 @@ COPY <replaceable class="parameter">count</replaceable> null strings to null values and unquoted null strings to empty strings. </para> + <para> + If the <literal>SAVE_ERROR</literal> option is specified and conversion errors occurred while copying, then + <productname>PostgreSQL</productname> will first try to create a regular Error Saving table to save all the conversion errors related information. + The Error Saving table naming rule is the existing table name concatenated with <literal>_error</literal>. + If <productname>PostgreSQL</productname> cannot create the Error Saving table, <command>COPY FROM</command> operation stops, an error is raised. + All the future errors while copying to the same table will automatically saved to the same Error Saving table. + Conversion errors includes data type conversion failure, extra data or missing data in the source file. + Error Saving table detailed description listed in <xref linkend="copy-errorsave-table"/>. + </para> + </refsect1> <refsect1> @@ -588,7 +612,7 @@ COPY <replaceable class="parameter">count</replaceable> output function, or acceptable to the input function, of each attribute's data type. The specified null string is used in place of columns that are null. - <command>COPY FROM</command> will raise an error if any line of the + By default, if <literal>SAVE_ERROR</literal> not specified, <command>COPY FROM</command> will raise an error if any line of the input file contains more or fewer columns than are expected. </para> @@ -962,6 +986,80 @@ versions of <productname>PostgreSQL</productname>. check against somehow getting out of sync with the data. </para> </refsect3> + + <refsect3> + <title>Error Save Table </title> + <para> + If <literal>SAVE_ERROR</literal> specified, all the data type conversion errors while copying will automatically saved in an Error Saving table. + <xref linkend="copy-errorsave-table"/> shows the Error Saving table's column name, data type, and description. + </para> + + <table id="copy-errorsave-table"> + <title>Error Saving table description </title> + + <tgroup cols="3"> + <thead> + <row> + <entry>Column name</entry> + <entry>Data type</entry> + <entry>Description</entry> + </row> + </thead> + + <tbody> + <row> + <entry> <literal>filename</literal> </entry> + <entry><type>text</type></entry> + <entry>The path name of the input file</entry> + </row> + + <row> + <entry> <literal>lineno</literal> </entry> + <entry><type>bigint</type></entry> + <entry>Line number where the error occurred, counting from 1</entry> + </row> + + <row> + <entry> <literal>line</literal> </entry> + <entry><type>text</type></entry> + <entry>Raw content of the error occurred line</entry> + </row> + + <row> + <entry> <literal>field</literal> </entry> + <entry><type>text</type></entry> + <entry>Field name of the error occurred</entry> + </row> + + <row> + <entry> <literal>source</literal> </entry> + <entry><type>text</type></entry> + <entry>Raw content of the error occurred field</entry> + </row> + + <row> + <entry> <literal>err_message </literal> </entry> + <entry><type>text</type></entry> + <entry>The error message text </entry> + </row> + + <row> + <entry> <literal>err_detail</literal> </entry> + <entry><type>text</type></entry> + <entry>Detailed error message </entry> + </row> + + <row> + <entry> <literal>errorcode </literal> </entry> + <entry><type>text</type></entry> + <entry>The error code for the copying error</entry> + </row> + + </tbody> + </tgroup> + </table> + </refsect3> + </refsect2> </refsect1> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index cfad47b5..bc4af10a 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -419,6 +419,7 @@ ProcessCopyOptions(ParseState *pstate, bool format_specified = false; bool freeze_specified = false; bool header_specified = false; + bool save_error_specified = false; ListCell *option; /* Support external use for option sanity checking */ @@ -458,6 +459,13 @@ ProcessCopyOptions(ParseState *pstate, freeze_specified = true; opts_out->freeze = defGetBoolean(defel); } + else if (strcmp(defel->defname, "save_error") == 0) + { + if (save_error_specified) + errorConflictingDefElem(defel, pstate); + save_error_specified = true; + opts_out->save_error = defGetBoolean(defel); + } else if (strcmp(defel->defname, "delimiter") == 0) { if (opts_out->delim) @@ -598,6 +606,10 @@ ProcessCopyOptions(ParseState *pstate, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify DEFAULT in BINARY mode"))); + if (opts_out->binary && opts_out->save_error) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot specify SAVE_ERROR in BINARY mode"))); /* Set defaults for omitted options */ if (!opts_out->delim) opts_out->delim = opts_out->csv_mode ? "," : "\t"; diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index f4861652..236d711b 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -38,6 +38,7 @@ #include "executor/executor.h" #include "executor/nodeModifyTable.h" #include "executor/tuptable.h" +#include "executor/spi.h" #include "foreign/fdwapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -652,10 +653,12 @@ CopyFrom(CopyFromState cstate) bool has_before_insert_row_trig; bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; + StringInfo err_save_buf; Assert(cstate->rel); Assert(list_length(cstate->range_table) == 1); - + if (cstate->opts.save_error) + Assert(cstate->escontext); /* * The target must be a plain, foreign, or partitioned relation, or have * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only @@ -952,6 +955,7 @@ CopyFrom(CopyFromState cstate) errcallback.previous = error_context_stack; error_context_stack = &errcallback; + err_save_buf = makeStringInfo(); for (;;) { TupleTableSlot *myslot; @@ -989,9 +993,13 @@ CopyFrom(CopyFromState cstate) ExecClearTuple(myslot); /* Directly store the values/nulls array in the slot */ - if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) + if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull, err_save_buf)) break; + /* Soft error occured, skip this tuple. */ + if (cstate->opts.save_error && cstate->line_error_occured) + continue; + ExecStoreVirtualTuple(myslot); /* @@ -1297,6 +1305,48 @@ CopyFrom(CopyFromState cstate) ExecResetTupleTable(estate->es_tupleTable, false); + /* drop the error saving table or raise a notice */ + if (cstate->opts.save_error) + { + Assert(cstate->error_nsp && cstate->error_rel); + + if (cstate->error_rows_cnt > 0) + { + ereport(NOTICE, + errmsg("%llu rows were skipped because of conversion error." + " Skipped rows saved to table %s.%s", + (unsigned long long) cstate->error_rows_cnt, + cstate->error_nsp, cstate->error_rel)); + } + else + { + StringInfoData querybuf; + if (cstate->error_firsttime) + { + ereport(NOTICE, + errmsg("No conversion error happened. " + "Error Saving table %s.%s will be dropped", + cstate->error_nsp, cstate->error_rel)); + initStringInfo(&querybuf); + appendStringInfo(&querybuf, + "DROP TABLE IF EXISTS %s.%s CASCADE ", + cstate->error_nsp, cstate->error_rel); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + } + else + ereport(NOTICE, + errmsg("No error happened. " + "All previouly encountered conversion errors saved at %s.%s", + cstate->error_nsp, cstate->error_rel)); + } + } + /* Allow the FDW to shut down */ if (target_resultRelInfo->ri_FdwRoutine != NULL && target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) @@ -1444,6 +1494,98 @@ BeginCopyFrom(ParseState *pstate, } } + /* Set up soft error handler for SAVE_ERROR */ + if (cstate->opts.save_error) + { + char *err_nsp; + char error_rel[NAMEDATALEN]; + StringInfoData querybuf; + bool isnull; + bool error_table_ok; + + cstate->escontext = makeNode(ErrorSaveContext); + cstate->escontext->type = T_ErrorSaveContext; + cstate->escontext->details_wanted = true; + cstate->escontext->error_occurred = false; + + snprintf(error_rel, sizeof(error_rel), "%s", + RelationGetRelationName(cstate->rel)); + strlcat(error_rel,"_error", NAMEDATALEN); + err_nsp = get_namespace_name(RelationGetNamespace(cstate->rel)); + + initStringInfo(&querybuf); + /* + * + * Verify whether the err_nsp.error_rel table already exists, and if so, + * examine its column names and data types. + */ + appendStringInfo(&querybuf, + "SELECT (array_agg(pa.attname ORDER BY pa.attnum) " + "= '{ctid,filename,lineno,line,field,source,err_message,err_detail,errorcode}') " + "AND (ARRAY_AGG(pt.typname ORDER BY pa.attnum) " + "= '{tid,text,int8,text,text,text,text,text,text}') " + "FROM pg_catalog.pg_attribute pa " + "JOIN pg_catalog.pg_class pc ON pc.oid = pa.attrelid " + "JOIN pg_catalog.pg_type pt ON pt.oid = pa.atttypid " + "JOIN pg_catalog.pg_namespace pn " + "ON pn.oid = pc.relnamespace WHERE "); + + appendStringInfo(&querybuf, + "relname = $$%s$$ AND pn.nspname = $$%s$$ " + " AND pa.attnum >= -1 AND NOT attisdropped ", + error_rel, err_nsp); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + if (SPI_execute(querybuf.data, false, 0) != SPI_OK_SELECT) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + error_table_ok = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0], + SPI_tuptable->tupdesc, + 1, &isnull)); + + /* No err_nsp.error_rel table then create it for holding error. */ + if (isnull) + { + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "CREATE TABLE %s.%s (FILENAME TEXT, LINENO BIGINT, LINE TEXT, " + "FIELD TEXT, SOURCE TEXT, ERR_MESSAGE TEXT, " + "ERR_DETAIL TEXT, ERRORCODE TEXT)", + err_nsp,error_rel); + if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + cstate->error_firsttime = true; + } + else if (error_table_ok) + /* error save table already exists. Set error_firsttime to false */ + cstate->error_firsttime = false; + else if(!error_table_ok) + ereport(ERROR, + (errmsg("Error save table %s.%s already exists. " + "Cannot use it for COPY FROM error saving", + err_nsp, error_rel))); + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + + /* thses information is necessary, no error then drop err_sp.error_rel table*/ + cstate->error_rel = pstrdup(error_rel); + cstate->error_nsp = err_nsp; + } + else + { + /* set to NULL */ + cstate->error_rel = NULL; + cstate->error_nsp = NULL; + cstate->escontext = NULL; + } + + cstate->error_rows_cnt = 0; /* set the default to 0 */ + cstate->line_error_occured = false; /* default, assume conversion be ok. */ + /* Convert convert_selectively name list to per-column flags */ if (cstate->opts.convert_selectively) { diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index f5537345..aa168d3f 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -66,10 +66,12 @@ #include "commands/copyfrom_internal.h" #include "commands/progress.h" #include "executor/executor.h" +#include "executor/spi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "mb/pg_wchar.h" #include "miscadmin.h" +#include "nodes/miscnodes.h" #include "pgstat.h" #include "port/pg_bswap.h" #include "utils/builtins.h" @@ -852,7 +854,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) */ bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, - Datum *values, bool *nulls) + Datum *values, bool *nulls, StringInfo err_save_buf) { TupleDesc tupDesc; AttrNumber num_phys_attrs, @@ -885,11 +887,48 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) return false; + /* reset line_error_occured to false for next new line. */ + if (cstate->line_error_occured) + cstate->line_error_occured = false; + /* check for overflowing fields */ if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + { + if(cstate->opts.save_error) + { + char *errmsg_extra = "extra data after last expected column"; + + resetStringInfo(err_save_buf); + /* add line buf, etc for line have extra data to error save table*/ + appendStringInfo(err_save_buf, + "INSERT INTO %s.%s(filename, lineno,line, " + "err_message, errorcode) " + "SELECT $$%s$$, $$%llu$$::bigint, $$%s$$, $$%s$$, " + "$$%s$$", + cstate->error_nsp, cstate->error_rel, + cstate->filename ? cstate->filename : "STDIN", + (unsigned long long) cstate->cur_lineno, + cstate->line_buf.data, errmsg_extra, + unpack_sql_state(ERRCODE_BAD_COPY_FILE_FORMAT)); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + if (SPI_execute(err_save_buf->data, false, 0) != SPI_OK_INSERT) + elog(ERROR, "SPI_exec failed: %s", err_save_buf->data); + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + + cstate->line_error_occured = true; + cstate->error_rows_cnt++; + return true; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + } fieldno = 0; @@ -901,10 +940,46 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Form_pg_attribute att = TupleDescAttr(tupDesc, m); if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); + { + if(cstate->opts.save_error) + { + char errmsg[128]; + snprintf(errmsg, sizeof(errmsg), + "missing data for column \"%s\"", + NameStr(att->attname)); + + resetStringInfo(err_save_buf); + appendStringInfo(err_save_buf, + "INSERT INTO %s.%s(filename,lineno,line, field, " + "err_message, errorcode) " + "SELECT $$%s$$, $$%llu$$::bigint, $$%s$$, $$%s$$, " + "$$%s$$, $$%s$$ ", + cstate->error_nsp, cstate->error_rel, + cstate->filename ? cstate->filename : "STDIN", + (unsigned long long) cstate->cur_lineno, + cstate->line_buf.data, NameStr(att->attname), errmsg, + unpack_sql_state(ERRCODE_BAD_COPY_FILE_FORMAT)); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + if (SPI_execute(err_save_buf->data, false, 0) != SPI_OK_INSERT) + elog(ERROR, "SPI_exec failed: %s", err_save_buf->data); + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + + cstate->line_error_occured = true; + cstate->error_rows_cnt++; + return true; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + } + string = field_strings[fieldno++]; if (cstate->convert_select_flags && @@ -956,15 +1031,85 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); } else - values[m] = InputFunctionCall(&in_functions[m], - string, - typioparams[m], - att->atttypmod); + { + /* + * + * InputFunctionCall is more faster than InputFunctionCallSafe. + * + */ + if(!cstate->opts.save_error) + { + values[m] = InputFunctionCall(&in_functions[m], + string, + typioparams[m], + att->atttypmod); + } + else + { + if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + char errcode[12]; + char *err_detail; + snprintf(errcode, sizeof(errcode), "%s", + unpack_sql_state(cstate->escontext->error_data->sqlerrcode)); + + if (!cstate->escontext->error_data->detail) + err_detail = NULL; + else + err_detail = cstate->escontext->error_data->detail; + + resetStringInfo(err_save_buf); + appendStringInfo(err_save_buf, + "INSERT INTO %s.%s(filename, lineno,line,field, " + "source, err_message, errorcode,err_detail) " + "SELECT $$%s$$, $$%llu$$::bigint, $$%s$$, $$%s$$, " + "$$%s$$, $$%s$$, $$%s$$, ", + cstate->error_nsp, cstate->error_rel, + cstate->filename ? cstate->filename : "STDIN", + (unsigned long long) cstate->cur_lineno, + cstate->line_buf.data, cstate->cur_attname, string, + cstate->escontext->error_data->message, + errcode); + + if (!err_detail) + appendStringInfo(err_save_buf, "NULL::text"); + else + appendStringInfo(err_save_buf,"$$%s$$", err_detail); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + if (SPI_execute(err_save_buf->data, false, 0) != SPI_OK_INSERT) + elog(ERROR, "SPI_execute failed: %s", err_save_buf->data); + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + + /* line error occured, set it once per line */ + if (!cstate->line_error_occured) + cstate->line_error_occured = true; + /* reset ErrorSaveContext */ + cstate->escontext->error_occurred = false; + cstate->escontext->details_wanted = true; + memset(cstate->escontext->error_data,0, sizeof(ErrorData)); + } + } + } cstate->cur_attname = NULL; cstate->cur_attval = NULL; } + /* record error rows count. */ + if (cstate->line_error_occured) + { + cstate->error_rows_cnt++; + Assert(cstate->opts.save_error); + } Assert(fieldno == attr_count); } else diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index f16bbd3c..3a616ab5 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -755,7 +755,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); RESET RESTART RESTRICT RETURN RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP ROUTINE ROUTINES ROW ROWS RULE - SAVEPOINT SCALAR SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT + SAVEPOINT SAVE_ERROR SCALAR SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES SERIALIZABLE SERVER SESSION SESSION_USER SET SETS SETOF SHARE SHOW SIMILAR SIMPLE SKIP SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P @@ -3448,6 +3448,10 @@ copy_opt_item: { $$ = makeDefElem("encoding", (Node *) makeString($2), @1); } + | SAVE_ERROR + { + $$ = makeDefElem("save_error", (Node *) makeBoolean(true), @1); + } ; /* The following exist for backward compatibility with very old versions */ @@ -17346,6 +17350,7 @@ unreserved_keyword: | ROWS | RULE | SAVEPOINT + | SAVE_ERROR | SCALAR | SCHEMA | SCHEMAS @@ -17954,6 +17959,7 @@ bare_label_keyword: | ROWS | RULE | SAVEPOINT + | SAVE_ERROR | SCALAR | SCHEMA | SCHEMAS diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 04980118..e6a358e0 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -2890,7 +2890,8 @@ psql_completion(const char *text, int start, int end) else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(")) COMPLETE_WITH("FORMAT", "FREEZE", "DELIMITER", "NULL", "HEADER", "QUOTE", "ESCAPE", "FORCE_QUOTE", - "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING", "DEFAULT"); + "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING", "DEFAULT", + "SAVE_ERROR"); /* Complete COPY <sth> FROM|TO filename WITH (FORMAT */ else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(", "FORMAT")) diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index f2cca0b9..de47791a 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -43,6 +43,7 @@ typedef struct CopyFormatOptions bool binary; /* binary format? */ bool freeze; /* freeze rows on loading? */ bool csv_mode; /* Comma Separated Value format? */ + bool save_error; /* save error to a table? */ CopyHeaderChoice header_line; /* header line? */ char *null_print; /* NULL marker string (server encoding!) */ int null_print_len; /* length of same */ @@ -82,7 +83,7 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options); extern void EndCopyFrom(CopyFromState cstate); extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, - Datum *values, bool *nulls); + Datum *values, bool *nulls, StringInfo err_save_buf); extern bool NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields); extern void CopyFromErrorCallback(void *arg); diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 5ec41589..dd41fcaa 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -16,6 +16,7 @@ #include "commands/copy.h" #include "commands/trigger.h" +#include "nodes/miscnodes.h" /* * Represents the different source cases we need to worry about at @@ -94,6 +95,12 @@ typedef struct CopyFromStateData * default value */ FmgrInfo *in_functions; /* array of input functions for each attrs */ Oid *typioparams; /* array of element types for in_functions */ + ErrorSaveContext *escontext; /* soft error trapper during in_functions execution */ + uint64 error_rows_cnt; /* total number of rows that have errors */ + const char *error_rel; /* the error row save table name */ + const char *error_nsp; /* the error row table's namespace */ + bool line_error_occured; /* does this line conversion error happened */ + bool error_firsttime; /* first time create error save table */ int *defmap; /* array of default att numbers related to * missing att */ ExprState **defexprs; /* array of default att expressions for all diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 5984dcfa..d0988a4c 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -377,6 +377,7 @@ PG_KEYWORD("routines", ROUTINES, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("row", ROW, COL_NAME_KEYWORD, BARE_LABEL) PG_KEYWORD("rows", ROWS, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("rule", RULE, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("save_error", SAVE_ERROR, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("savepoint", SAVEPOINT, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("scalar", SCALAR, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("schema", SCHEMA, UNRESERVED_KEYWORD, BARE_LABEL) diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out index c4178b9c..aa1398d7 100644 --- a/src/test/regress/expected/copy2.out +++ b/src/test/regress/expected/copy2.out @@ -564,6 +564,116 @@ ERROR: conflicting or redundant options LINE 1: ... b, c) FROM STDIN WITH (FORMAT csv, FORCE_NULL *, FORCE_NULL... ^ ROLLBACK; +-- +-- tests for SAVE_ERROR option with force_not_null, force_null +\pset null NULL +CREATE TABLE save_error_csv( + a INT NOT NULL, + b TEXT NOT NULL, + c TEXT, + d TEXT +); +--- copy success, error save table will be dropped automatically. +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error); +NOTICE: No conversion error happened. Error Saving table public.save_error_csv_error will be dropped +--error TABLE should already droppped. +select count(*) as expected_zero from pg_class where relname = 'save_error_csv_error'; + expected_zero +--------------- + 0 +(1 row) + +--save_error not allowed in binary mode +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT binary); +ERROR: cannot specify SAVE_ERROR in BINARY mode +create table save_error_csv_error(); +--should fail. since table save_error_csv_error already exists. +--error save table naming logic = copy destination tablename + "_error" +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error); +ERROR: Error save table public.save_error_csv_error already exists. Cannot use it for COPY FROM error saving +DROP TABLE save_error_csv_error; +-- save error with extra data +COPY save_error_csv from stdin(save_error); +NOTICE: 1 rows were skipped because of conversion error. Skipped rows saved to table public.save_error_csv_error +-- save error with missing data for column +COPY save_error_csv from stdin(save_error); +NOTICE: 1 rows were skipped because of conversion error. Skipped rows saved to table public.save_error_csv_error +--with FORCE_NOT_NULL and FORCE_NULL. +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c)); +NOTICE: 2 rows were skipped because of conversion error. Skipped rows saved to table public.save_error_csv_error +SELECT *, b is null as b_null, b = '' as b_empty FROM save_error_csv; + a | b | c | d | b_null | b_empty +---+---+------+------+--------+--------- + 2 | | NULL | NULL | f | t +(1 row) + +SELECT * FROM save_error_csv_error; + filename | lineno | line | field | source | err_message | err_detail | errorcode +----------+--------+----------------------------------------------------+-------+--------+---------------------------------------------+------------+----------- + STDIN | 1 | 2002 232 40 50 60 70 80 | NULL | NULL | extra data after last expected column | NULL | 22P04 + STDIN | 1 | 2000 230 23 | d | NULL | missing data for column "d" | NULL | 22P04 + STDIN | 1 | z,,"" | a | z | invalid input syntax for type integer: "z" | NULL | 22P02 + STDIN | 2 | \0,, | a | \0 | invalid input syntax for type integer: "\0" | NULL | 22P02 +(4 rows) + +DROP TABLE save_error_csv, save_error_csv_error; +CREATE TABLE check_ign_err (n int, m int[], k bigint, l text); +COPY check_ign_err FROM STDIN WITH (save_error); +NOTICE: 8 rows were skipped because of conversion error. Skipped rows saved to table public.check_ign_err_error +--special case. will work,but the error TABLE should not DROP. +COPY check_ign_err FROM STDIN WITH (save_error, format csv, FORCE_NULL *); +NOTICE: No error happened. All previouly encountered conversion errors saved at public.check_ign_err_error +--expect error TABLE exists +SELECT * FROM check_ign_err_error; + filename | lineno | line | field | source | err_message | err_detail | errorcode +----------+--------+--------------------------------------------+-------+-------------------------+-----------------------------------------------------------------+---------------------------+----------- + STDIN | 2 | \n {1} 1 \- | n | +| invalid input syntax for type integer: " +| NULL | 22P02 + | | | | | " | | + STDIN | 3 | a {2} 2 \r | n | a | invalid input syntax for type integer: "a" | NULL | 22P02 + STDIN | 4 | 3 {\3} 3333333333 \n | m | {\x03} | invalid input syntax for type integer: "\x03" | NULL | 22P02 + STDIN | 5 | 0x11 {3,} 3333333333 \\. | m | {3,} | malformed array literal: "{3,}" | Unexpected "}" character. | 22P02 + STDIN | 6 | d {3,1/} 3333333333 \\0 | n | d | invalid input syntax for type integer: "d" | NULL | 22P02 + STDIN | 6 | d {3,1/} 3333333333 \\0 | m | {3,1/} | invalid input syntax for type integer: "1/" | NULL | 22P02 + STDIN | 7 | e {3,\1} -3323879289873933333333 \n | n | e | invalid input syntax for type integer: "e" | NULL | 22P02 + STDIN | 7 | e {3,\1} -3323879289873933333333 \n | m | {3,\x01} | invalid input syntax for type integer: "\x01" | NULL | 22P02 + STDIN | 7 | e {3,\1} -3323879289873933333333 \n | k | -3323879289873933333333 | value "-3323879289873933333333" is out of range for type bigint | NULL | 22003 + STDIN | 8 | f {3,1} 3323879289873933333333 \r | n | f | invalid input syntax for type integer: "f" | NULL | 22P02 + STDIN | 8 | f {3,1} 3323879289873933333333 \r | k | 3323879289873933333333 | value "3323879289873933333333" is out of range for type bigint | NULL | 22003 + STDIN | 9 | b {a, 4} 1.1 h | n | b | invalid input syntax for type integer: "b" | NULL | 22P02 + STDIN | 9 | b {a, 4} 1.1 h | m | {a, 4} | invalid input syntax for type integer: "a" | NULL | 22P02 + STDIN | 9 | b {a, 4} 1.1 h | k | 1.1 | invalid input syntax for type bigint: "1.1" | NULL | 22P02 +(14 rows) + +-- redundant options not allowed. +COPY check_ign_err FROM STDIN WITH (save_error, save_error off); +ERROR: conflicting or redundant options +LINE 1: COPY check_ign_err FROM STDIN WITH (save_error, save_error o... + ^ +DROP TABLE check_ign_err CASCADE; +DROP TABLE IF EXISTS check_ign_err_error CASCADE; +--(type textrange was already made in test_setup.sql) +--using textrange doing test +CREATE TABLE textrange_input(a textrange, b textrange, c textrange); +COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *); +NOTICE: 4 rows were skipped because of conversion error. Skipped rows saved to table public.textrange_input_error +SELECT * FROM textrange_input_error; + filename | lineno | line | field | source | err_message | err_detail | errorcode +----------+--------+----------------------------+-------+----------+-------------------------------------------------------------------+------------------------------------------+----------- + STDIN | 1 | ,-[a\","z),[a","-inf) | b | -[a\,z) | malformed range literal: "-[a\,z)" | Missing left parenthesis or bracket. | 22P02 + STDIN | 1 | ,-[a\","z),[a","-inf) | c | [a,-inf) | range lower bound must be less than or equal to range upper bound | NULL | 22000 + STDIN | 2 | (",a),(",",a),()",a) | a | (,a),( | malformed range literal: "(,a),(" | Junk after right parenthesis or bracket. | 22P02 + STDIN | 2 | (",a),(",",a),()",a) | b | ,a),() | malformed range literal: ",a),()" | Missing left parenthesis or bracket. | 22P02 + STDIN | 2 | (",a),(",",a),()",a) | c | a) | malformed range literal: "a)" | Missing left parenthesis or bracket. | 22P02 + STDIN | 3 | (a",")),(]","a),(a","]) | a | (a,)) | malformed range literal: "(a,))" | Junk after right parenthesis or bracket. | 22P02 + STDIN | 3 | (a",")),(]","a),(a","]) | b | (],a) | malformed range literal: "(],a)" | Missing comma after lower bound. | 22P02 + STDIN | 3 | (a",")),(]","a),(a","]) | c | (a,]) | malformed range literal: "(a,])" | Junk after right parenthesis or bracket. | 22P02 + STDIN | 4 | [z","a],[z","2],[(","",")] | a | [z,a] | range lower bound must be less than or equal to range upper bound | NULL | 22000 + STDIN | 4 | [z","a],[z","2],[(","",")] | b | [z,2] | range lower bound must be less than or equal to range upper bound | NULL | 22000 + STDIN | 4 | [z","a],[z","2],[(","",")] | c | [(,",)] | malformed range literal: "[(,",)]" | Unexpected end of input. | 22P02 +(11 rows) + +DROP TABLE textrange_input; +DROP TABLE textrange_input_error; \pset null '' -- test case with whole-row Var in a check constraint create table check_con_tbl (f1 int); @@ -822,3 +932,28 @@ truncate copy_default; -- DEFAULT cannot be used in COPY TO copy (select 1 as test) TO stdout with (default '\D'); ERROR: COPY DEFAULT only available using COPY FROM +-- DEFAULT WITH SAVE_ERROR. +create table copy_default_error_save ( + id integer, + text_value text not null default 'test', + ts_value timestamp without time zone not null default '2022-07-05' +); +copy copy_default_error_save from stdin with (save_error, default '\D'); +NOTICE: 3 rows were skipped because of conversion error. Skipped rows saved to table public.copy_default_error_save_error +select count(*) as expect_zero from copy_default_error_save; + expect_zero +------------- + 0 +(1 row) + +select * from copy_default_error_save_error; + filename | lineno | line | field | source | err_message | err_detail | errorcode +----------+--------+----------------------------------+----------+------------------+-------------------------------------------------------------+------------+----------- + STDIN | 1 | k value '2022-07-04' | id | k | invalid input syntax for type integer: "k" | | 22P02 + STDIN | 2 | z \D '2022-07-03ASKL' | id | z | invalid input syntax for type integer: "z" | | 22P02 + STDIN | 2 | z \D '2022-07-03ASKL' | ts_value | '2022-07-03ASKL' | invalid input syntax for type timestamp: "'2022-07-03ASKL'" | | 22007 + STDIN | 3 | s \D \D | id | s | invalid input syntax for type integer: "s" | | 22P02 +(4 rows) + +drop table copy_default_error_save_error,copy_default_error_save; +truncate copy_default; diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql index a5486f60..3f43ce75 100644 --- a/src/test/regress/sql/copy2.sql +++ b/src/test/regress/sql/copy2.sql @@ -374,6 +374,98 @@ BEGIN; COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NULL *, FORCE_NULL(b)); ROLLBACK; +-- +-- tests for SAVE_ERROR option with force_not_null, force_null +\pset null NULL +CREATE TABLE save_error_csv( + a INT NOT NULL, + b TEXT NOT NULL, + c TEXT, + d TEXT +); + +--- copy success, error save table will be dropped automatically. +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error); +\. + +--error TABLE should already droppped. +select count(*) as expected_zero from pg_class where relname = 'save_error_csv_error'; + +--save_error not allowed in binary mode +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT binary); +create table save_error_csv_error(); +--should fail. since table save_error_csv_error already exists. +--error save table naming logic = copy destination tablename + "_error" +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error); + +DROP TABLE save_error_csv_error; + +-- save error with extra data +COPY save_error_csv from stdin(save_error); +2002 232 40 50 60 70 80 +\. + +-- save error with missing data for column +COPY save_error_csv from stdin(save_error); +2000 230 23 +\. + +--with FORCE_NOT_NULL and FORCE_NULL. +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c)); +z,,"" +\0,, +2,, +\. + +SELECT *, b is null as b_null, b = '' as b_empty FROM save_error_csv; + +SELECT * FROM save_error_csv_error; + +DROP TABLE save_error_csv, save_error_csv_error; + + +CREATE TABLE check_ign_err (n int, m int[], k bigint, l text); +COPY check_ign_err FROM STDIN WITH (save_error); +1 {1} 1 1 +\n {1} 1 \- +a {2} 2 \r +3 {\3} 3333333333 \n +0x11 {3,} 3333333333 \\. +d {3,1/} 3333333333 \\0 +e {3,\1} -3323879289873933333333 \n +f {3,1} 3323879289873933333333 \r +b {a, 4} 1.1 h +5 {5} 5 \\ +\. + +--special case. will work,but the error TABLE should not DROP. +COPY check_ign_err FROM STDIN WITH (save_error, format csv, FORCE_NULL *); +,,, +\. + +--expect error TABLE exists +SELECT * FROM check_ign_err_error; + +-- redundant options not allowed. +COPY check_ign_err FROM STDIN WITH (save_error, save_error off); + +DROP TABLE check_ign_err CASCADE; +DROP TABLE IF EXISTS check_ign_err_error CASCADE; + +--(type textrange was already made in test_setup.sql) +--using textrange doing test +CREATE TABLE textrange_input(a textrange, b textrange, c textrange); +COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *); +,-[a\","z),[a","-inf) +(",a),(",",a),()",a) +(a",")),(]","a),(a","]) +[z","a],[z","2],[(","",")] +\. + +SELECT * FROM textrange_input_error; +DROP TABLE textrange_input; +DROP TABLE textrange_input_error; + \pset null '' -- test case with whole-row Var in a check constraint @@ -609,3 +701,19 @@ truncate copy_default; -- DEFAULT cannot be used in COPY TO copy (select 1 as test) TO stdout with (default '\D'); + +-- DEFAULT WITH SAVE_ERROR. +create table copy_default_error_save ( + id integer, + text_value text not null default 'test', + ts_value timestamp without time zone not null default '2022-07-05' +); +copy copy_default_error_save from stdin with (save_error, default '\D'); +k value '2022-07-04' +z \D '2022-07-03ASKL' +s \D \D +\. +select count(*) as expect_zero from copy_default_error_save; +select * from copy_default_error_save_error; +drop table copy_default_error_save_error,copy_default_error_save; +truncate copy_default; \ No newline at end of file -- 2.34.1