Hello hackers, A few commitfests ago there was some effort to add error handling to COPY FROM [1], and I see from that thread that we already have the infrastructure for handling unique violation and exclusion constraint violation errors. I think that part is independently useful too. Attached is a patch to do that.
In order to prevent extreme conditions, the patch also adds a new GUC variable called copy_maximum_error_limit that controls the number of errors to swallow before starting to raise an error, and a new failed-record-file option for COPY, to which each failed record is written so the user can examine it. With the new option, COPY FROM can be specified like: COPY table_name [ ( column_name [, ...] ) ] FROM { 'filename' | PROGRAM 'command' | STDIN } [ ON CONFLICT IGNORE 'failed_record_filename' ] [ [ WITH ] ( option [, ...] ) ] [1]. https://www.postgresql.org/message-id/flat/7179f2fd-49ce-4093-ae14-1b26c5dfb...@gmail.com Comments? Regards Surafel
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c index 2cf09aecf6..ba63624eac 100644 --- a/contrib/file_fdw/file_fdw.c +++ b/contrib/file_fdw/file_fdw.c @@ -676,6 +676,8 @@ fileBeginForeignScan(ForeignScanState *node, int eflags) cstate = BeginCopyFrom(NULL, node->ss.ss_currentRelation, filename, + NULL, + false, is_program, NULL, NIL, @@ -752,6 +754,8 @@ fileReScanForeignScan(ForeignScanState *node) festate->cstate = BeginCopyFrom(NULL, node->ss.ss_currentRelation, festate->filename, + NULL, + false, festate->is_program, NULL, NIL, @@ -1117,7 +1121,7 @@ file_acquire_sample_rows(Relation onerel, int elevel, /* * Create CopyState from FDW options. */ - cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL, + cstate = BeginCopyFrom(NULL, onerel, filename, NULL, false, is_program, NULL, NIL, options); /* diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index bee4afbe4e..edd96f4711 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -7129,6 +7129,19 @@ SET XML OPTION { DOCUMENT | CONTENT }; </listitem> </varlistentry> + <varlistentry id="guc-copy-maximum-error-limit" xreflabel="copy_maximum_error_limit"> + <term><varname>copy_maximum_error_limit</varname> (<type>integer</type>) + <indexterm> + <primary><varname>copy_maximum_error_limit</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Specifies the maximum number of ignored unique or exclusion constraint violation error + on COPY FROM before starting to error.The default is 100. 
+ </para> + </listitem> + </varlistentry> </variablelist> </sect2> <sect2 id="runtime-config-client-format"> diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index 13a8b68d95..f3639b3b48 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -24,6 +24,7 @@ PostgreSQL documentation <synopsis> COPY <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ] FROM { '<replaceable class="parameter">filename</replaceable>' | PROGRAM '<replaceable class="parameter">command</replaceable>' | STDIN } + [ON CONFLICT IGNORE '<replaceable class="parameter">failed_record_filename</replaceable>'] [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ] COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ] | ( <replaceable class="parameter">query</replaceable> ) } @@ -167,6 +168,28 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable </listitem> </varlistentry> + <varlistentry> + <term><literal>ON CONFLICT IGNORE</literal></term> + <listitem> + <para> + specifies ignore to error on a raising unique or exclusion constraint violation + up to configured amount .Instead write the error record to failed record file and + precede to the next record + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><replaceable class="parameter">failed_record_filename</replaceable></term> + <listitem> + <para> + The path name of the failed record file. failed record file name can be an absolute or + relative path. Windows users might need to use an E'' string and double any backslashes + used in the path name. 
+ </para> + </listitem> + </varlistentry> + <varlistentry> <term><literal>STDOUT</literal></term> <listitem> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 9bc67ce60f..b91335d783 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -43,6 +43,7 @@ #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" +#include "storage/lmgr.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -118,6 +119,7 @@ typedef struct CopyStateData int file_encoding; /* file or remote side's character encoding */ bool need_transcoding; /* file encoding diff from server? */ bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + FILE *failed_rec_file; /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ @@ -147,6 +149,8 @@ typedef struct CopyStateData bool convert_selectively; /* do selective binary conversion? */ List *convert_select; /* list of column names (can be NIL) */ bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + char *failed_rec_filename; + bool ignore_conflict; /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ @@ -995,7 +999,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, PreventCommandIfReadOnly("COPY FROM"); PreventCommandIfParallelMode("COPY FROM"); - cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, + cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->failed_rec_filename, + stmt->ignore_error, stmt->is_program, NULL, stmt->attlist, stmt->options); *processed = CopyFrom(cstate); /* copy from file to database */ EndCopyFrom(cstate); @@ -1749,6 +1754,11 @@ EndCopy(CopyState cstate) (errcode_for_file_access(), errmsg("could not close file \"%s\": %m", cstate->filename))); + if (cstate->failed_rec_filename != NULL && FreeFile(cstate->failed_rec_file)) + ereport(ERROR, + 
(errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + cstate->failed_rec_filename))); } MemoryContextDelete(cstate->copycontext); @@ -2342,6 +2352,7 @@ CopyFrom(CopyState cstate) uint64 lastPartitionSampleLineNo = 0; uint64 nPartitionChanges = 0; double avgTuplesPerPartChange = 0; + int ignore_limit = 0; Assert(cstate->rel); @@ -2460,7 +2471,8 @@ CopyFrom(CopyState cstate) hi_options |= HEAP_INSERT_FROZEN; } - + if (cstate->ignore_conflict) + ignore_limit = copy_maximum_error_limit; /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. (There used to be a huge amount of code @@ -2579,6 +2591,10 @@ CopyFrom(CopyState cstate) */ insertMethod = CIM_SINGLE; } + else if (cstate->ignore_conflict) + { + insertMethod = CIM_SINGLE; + } else { /* @@ -2968,12 +2984,59 @@ CopyFrom(CopyState cstate) */ tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } + else if (cstate->ignore_conflict && ignore_limit > 0) + { + bool specConflict; + uint32 specToken; + specConflict = false; + + specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId()); + HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken); + + /* insert the tuple, with the speculative token */ + heap_insert(resultRelInfo->ri_RelationDesc, tuple, + estate->es_output_cid, + HEAP_INSERT_SPECULATIVE, + NULL); + + /* insert index entries for tuple */ + recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), + estate, true, &specConflict, + NIL); + + /* adjust the tuple's state accordingly */ + if (!specConflict) + { + heap_finish_speculative(resultRelInfo->ri_RelationDesc, tuple); + processed++; + } + else + { + heap_abort_speculative(resultRelInfo->ri_RelationDesc, tuple); +#ifndef WIN32 + appendStringInfoCharMacro(&cstate->line_buf, '\n'); +#else + appendBinaryStringInfo(&cstate->cstate->line_buf, "\r\n", strlen("\r\n")); +#endif + fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file); + 
ignore_limit--; + + } + + /* + * Wake up anyone waiting for our decision. They will re-check + * the tuple, see that it's no longer speculative, and wait on our + * XID as if this was a regularly inserted tuple all along. + */ + SpeculativeInsertionLockRelease(GetCurrentTransactionId()); + + } else heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid, hi_options, bistate); /* And create index entries for it */ - if (resultRelInfo->ri_NumIndices > 0) + if (resultRelInfo->ri_NumIndices > 0 && ignore_limit == 0) recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), estate, @@ -2994,7 +3057,8 @@ CopyFrom(CopyState cstate) * or FDW; this is the same definition used by nodeModifyTable.c * for counting tuples inserted by an INSERT command. */ - processed++; + if(!cstate->ignore_conflict) + processed++; } } @@ -3166,6 +3230,8 @@ CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, + const char *failedfilename, + bool ignore_error, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, @@ -3285,6 +3351,50 @@ BeginCopyFrom(ParseState *pstate, cstate->volatile_defexprs = volatile_defexprs; cstate->num_defaults = num_defaults; cstate->is_program = is_program; + cstate->ignore_conflict = ignore_error; + + if (failedfilename) + { + mode_t oumask; /* Pre-existing umask value */ + struct stat st; + cstate->failed_rec_filename = pstrdup(failedfilename); + /* + * Prevent write to relative path ... too easy to shoot oneself in + * the foot by overwriting a database file ... 
+ */ + if (!is_absolute_path(failedfilename)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("relative path not allowed for failed record file"))); + oumask = umask(S_IWGRP | S_IWOTH); + PG_TRY(); + { + cstate->failed_rec_file = AllocateFile(cstate->failed_rec_filename, PG_BINARY_W); + } + PG_CATCH(); + { + umask(oumask); + PG_RE_THROW(); + } + PG_END_TRY(); + umask(oumask); + if (cstate->failed_rec_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for writing: %m", + cstate->failed_rec_filename))); + + if (fstat(fileno(cstate->failed_rec_file), &st)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + cstate->failed_rec_filename))); + + if (S_ISDIR(st.st_mode)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", cstate->failed_rec_filename))); + } if (data_source_cb) { diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 7c8220cf65..7120913cd8 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -3312,6 +3312,8 @@ _copyCopyStmt(const CopyStmt *from) COPY_SCALAR_FIELD(is_program); COPY_STRING_FIELD(filename); COPY_NODE_FIELD(options); + COPY_SCALAR_FIELD(ignore_error); + COPY_STRING_FIELD(failed_rec_filename); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 378f2facb8..f1cbee2074 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -1221,6 +1221,8 @@ _equalCopyStmt(const CopyStmt *a, const CopyStmt *b) COMPARE_SCALAR_FIELD(is_program); COMPARE_STRING_FIELD(filename); COMPARE_NODE_FIELD(options); + COMPARE_SCALAR_FIELD(ignore_error); + COMPARE_STRING_FIELD(failed_rec_filename); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 87f5e95827..d8ff7fbef7 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -335,7 +335,7 @@ static Node 
*makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <defelt> event_trigger_when_item %type <chr> enable_trigger -%type <str> copy_file_name +%type <str> copy_file_name failed_rec_filename database_name access_method_clause access_method attr_name name cursor_name file_name index_name opt_index_name cluster_index_specification @@ -442,7 +442,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <boolean> opt_freeze opt_analyze opt_default opt_recheck %type <defelt> opt_binary opt_oids copy_delimiter -%type <boolean> copy_from opt_program +%type <boolean> copy_from opt_program ignore_conflict %type <ival> opt_column event cursor_options opt_hold opt_set_data %type <objtype> drop_type_any_name drop_type_name drop_type_name_on_any_name @@ -639,7 +639,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); HANDLER HAVING HEADER_P HOLD HOUR_P - IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE + IDENTITY_P IF_P IGNORE ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION @@ -2970,7 +2970,7 @@ ClosePortalStmt: *****************************************************************************/ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids - copy_from opt_program copy_file_name copy_delimiter opt_with copy_options + copy_from opt_program copy_file_name ignore_conflict failed_rec_filename copy_delimiter opt_with copy_options { CopyStmt *n = makeNode(CopyStmt); n->relation = $3; @@ -2979,6 +2979,8 @@ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids n->is_from = $6; n->is_program = $7; n->filename = $8; + n->ignore_error = $9; + n->failed_rec_filename = $10; if (n->is_program && n->filename == NULL) ereport(ERROR, @@ -2986,16 +2988,32 @@ 
CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids errmsg("STDIN/STDOUT not allowed with PROGRAM"), parser_errposition(@8))); + if (!n->is_from && n->ignore_error) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ON CONFLICT IGNORE not allowed in COPY TO"), + parser_errposition(@9))); + if (n->ignore_error && n->failed_rec_filename == NULL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("Failed record file name must be specified"))); + if (!n->ignore_error && n->failed_rec_filename != NULL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("Failed record file can not be specified without ON CONFLICT IGNORE"), + + parser_errposition(@10))); + n->options = NIL; /* Concatenate user-supplied flags */ if ($2) n->options = lappend(n->options, $2); if ($5) n->options = lappend(n->options, $5); - if ($9) - n->options = lappend(n->options, $9); if ($11) - n->options = list_concat(n->options, $11); + n->options = lappend(n->options, $11); + if ($13) + n->options = list_concat(n->options, $13); $$ = (Node *)n; } | COPY '(' PreparableStmt ')' TO opt_program copy_file_name opt_with copy_options @@ -3040,6 +3058,16 @@ copy_file_name: | STDOUT { $$ = NULL; } ; +ignore_conflict: + ON CONFLICT IGNORE { $$ = true; } + | /* EMPTY */ { $$ = false; } + ; + +failed_rec_filename: + Sconst { $$ = $1; } + | /* EMPTY */ { $$ = NULL; } + ; + copy_options: copy_opt_list { $$ = $1; } | '(' copy_generic_opt_list ')' { $$ = $2; } ; @@ -15103,6 +15131,7 @@ unreserved_keyword: | HOUR_P | IDENTITY_P | IF_P + | IGNORE | IMMEDIATE | IMMUTABLE | IMPLICIT_P diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index acc6498567..55b9ce0139 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -798,7 +798,7 @@ copy_table(Relation rel) addRangeTableEntryForRelation(pstate, rel, NULL, false, false); attnamelist = make_copy_attnamelist(relmapentry); - 
cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL); + cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, false, copy_read_data, attnamelist, NIL); /* Do the copy */ (void) CopyFrom(cstate); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index c5ba149996..c714f5906f 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -482,6 +482,7 @@ char *application_name; int tcp_keepalives_idle; int tcp_keepalives_interval; int tcp_keepalives_count; +int copy_maximum_error_limit; /* * SSL renegotiation was been removed in PostgreSQL 9.5, but we tolerate it @@ -3064,6 +3065,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"copy_maximum_error_limit", PGC_USERSET, CLIENT_CONN_STATEMENT, + gettext_noop("Maximum number of ignored unique or exclusion constraint violation error on COPY FROM."), + NULL, + INT_MAX + }, + &copy_maximum_error_limit, + 100, 0, INT_MAX, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index f393e7e73d..ee976d3cda 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -19,6 +19,7 @@ #include "parser/parse_node.h" #include "tcop/dest.h" +extern int copy_maximum_error_limit; /* CopyStateData is private in commands/copy.c */ typedef struct CopyStateData *CopyState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); @@ -28,8 +29,9 @@ extern void DoCopy(ParseState *state, const CopyStmt *stmt, uint64 *processed); extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options); -extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, - bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options); +extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, 
const char *faild_rec_filename, + bool is_program, bool ignore_error, copy_data_source_cb data_source_cb, List *attnamelist, + List *options); extern void EndCopyFrom(CopyState cstate); extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext, Datum *values, bool *nulls, Oid *tupleOid); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 07ab1a3dde..abef66385d 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1959,6 +1959,8 @@ typedef struct CopyStmt bool is_program; /* is 'filename' a program to popen? */ char *filename; /* filename, or NULL for STDIN/STDOUT */ List *options; /* List of DefElem nodes */ + bool ignore_error; + char *failed_rec_filename; } CopyStmt; /* ---------------------- diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 23db40147b..9f0944d206 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -190,6 +190,7 @@ PG_KEYWORD("hold", HOLD, UNRESERVED_KEYWORD) PG_KEYWORD("hour", HOUR_P, UNRESERVED_KEYWORD) PG_KEYWORD("identity", IDENTITY_P, UNRESERVED_KEYWORD) PG_KEYWORD("if", IF_P, UNRESERVED_KEYWORD) +PG_KEYWORD("ignore", IGNORE, UNRESERVED_KEYWORD) PG_KEYWORD("ilike", ILIKE, TYPE_FUNC_NAME_KEYWORD) PG_KEYWORD("immediate", IMMEDIATE, UNRESERVED_KEYWORD) PG_KEYWORD("immutable", IMMUTABLE, UNRESERVED_KEYWORD) diff --git a/src/interfaces/ecpg/preproc/ecpg.addons b/src/interfaces/ecpg/preproc/ecpg.addons index ca3efadc48..45e311c7e6 100644 --- a/src/interfaces/ecpg/preproc/ecpg.addons +++ b/src/interfaces/ecpg/preproc/ecpg.addons @@ -192,7 +192,7 @@ ECPG: where_or_current_clauseWHERECURRENT_POFcursor_name block char *cursor_marker = $4[0] == ':' ? 
mm_strdup("$0") : $4; $$ = cat_str(2,mm_strdup("where current of"), cursor_marker); } -ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromopt_programcopy_file_namecopy_delimiteropt_withcopy_options addon +ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromopt_programcopy_file_nameignore_conflictfailed_rec_filenamecopy_delimiteropt_withcopy_options addon if (strcmp($6, "from") == 0 && (strcmp($7, "stdin") == 0 || strcmp($7, "stdout") == 0)) mmerror(PARSE_ERROR, ET_WARNING, "COPY FROM STDIN is not implemented");