Hello,

The attached patch adds error handling for extra data, missing data, invalid OID, null OID and row count mismatch. A record that fails in any of these cases is written to the failed record file with the error message appended to it. In case of a unique violation or an exclusion constraint violation, the failed record is written as-is, because the cause of the error cannot be identified specifically. The new syntax is: COPY ... WITH ON CONFLICT LOG maximum_error, LOG FILE NAME '…';

Regards,
Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 13a8b68d95..bf21abd8e0 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -364,6 +364,31 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>ON CONFLICT LOG <replaceable class="parameter">max_errors</replaceable>, LOG FILE NAME '<replaceable class="parameter">filename</replaceable>'</literal></term>
+    <listitem>
+     <para>
+      Instead of aborting the command, write each input line that has extra
+      or missing columns or a missing, null or invalid OID, or that violates
+      a unique or exclusion constraint, to the file
+      <replaceable class="parameter">filename</replaceable> and continue with
+      the next line.  Up to <replaceable class="parameter">max_errors</replaceable>
+      lines are rejected this way; after that, such errors abort the
+      <command>COPY</command> as usual.  Other errors, such as invalid input
+      syntax for a column's data type, always abort the command.  Lines with
+      a format error are written with the error message appended; lines that
+      violate a constraint are written unchanged.
+     </para>
+     <para>
+      The file name must be an absolute path.  The file is written by the
+      database server, which requires superuser privileges or membership in
+      the <literal>pg_write_server_files</literal> role.  This option is
+      available only in <command>COPY FROM</command>, and not in
+      <literal>binary</literal> format.
+     </para>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
  </refsect1>
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9bc67ce60f..ffa6aecbd5 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -43,6 +43,7 @@
 #include "port/pg_bswap.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -118,6 +119,7 @@ typedef struct CopyStateData
 	int			file_encoding;	/* file or remote side's character encoding */
 	bool		need_transcoding;	/* file encoding diff from server? */
 	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
+	FILE	   *failed_rec_file;	/* ON CONFLICT LOG output file, or NULL */
 
 	/* parameters from the COPY command */
 	Relation	rel;			/* relation to copy to or from */
@@ -147,6 +149,9 @@ typedef struct CopyStateData
 	bool		convert_selectively;	/* do selective binary conversion? */
 	List	   *convert_select;	/* list of column names (can be NIL) */
 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
+	char	   *failed_rec_filename;	/* ON CONFLICT LOG file name, or NULL */
+	bool		ignore_conflict;	/* ON CONFLICT LOG specified? */
+	int			error_limit;	/* # of rows that may still be rejected */
 
 	/* these are just for error messages, see CopyFromErrorCallback */
 	const char *cur_relname;	/* table name for error messages */
@@ -766,6 +771,31 @@ CopyLoadRawBuf(CopyState cstate)
 	return (inbytes > 0);
 }
 
+/*
+ * LogCopyError
+ *
+ * Append "str" and a line terminator to the current input line in line_buf,
+ * write the result to the failed record file and use up one unit of the
+ * error budget.  line_buf is modified in place, so the caller must not need
+ * the current line afterwards.
+ */
+static void
+LogCopyError(CopyState cstate, const char *str)
+{
+	appendBinaryStringInfo(&cstate->line_buf, str, strlen(str));
+#ifndef WIN32
+	appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+	appendBinaryStringInfo(&cstate->line_buf, "\r\n", strlen("\r\n"));
+#endif
+	if (fwrite(cstate->line_buf.data, 1, cstate->line_buf.len,
+			   cstate->failed_rec_file) != (size_t) cstate->line_buf.len)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write to file \"%s\": %m",
+						cstate->failed_rec_filename)));
+	cstate->error_limit--;
+}
 
 /*
  * DoCopy executes the SQL COPY statement
@@ -1226,6 +1256,24 @@ ProcessCopyOptions(ParseState *pstate,
 						 defel->defname),
 				 parser_errposition(pstate, defel->location)));
 		}
+		else if (strcmp(defel->defname, "ignore_conflicts") == 0)
+		{
+			List	   *conflictOption = (List *) defel->arg;
+
+			if (cstate->ignore_conflict)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options"),
+						 parser_errposition(pstate, defel->location)));
+			if (!is_from)
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("ON CONFLICT LOG is only available using COPY FROM"),
+						 parser_errposition(pstate, defel->location)));
+			cstate->ignore_conflict = true;
+			cstate->error_limit = intVal(linitial(conflictOption));
+			cstate->failed_rec_filename = strVal(lsecond(conflictOption));
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1749,6 +1797,13 @@ EndCopy(CopyState cstate)
 					(errcode_for_file_access(),
 					 errmsg("could not close file \"%s\": %m",
 							cstate->filename)));
 	}
+
+	/* Close the failed record file whether or not COPY used a program */
+	if (cstate->failed_rec_file != NULL && FreeFile(cstate->failed_rec_file))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not close file \"%s\": %m",
+						cstate->failed_rec_filename)));
 
 	MemoryContextDelete(cstate->copycontext);
@@ -2579,6 +2634,15 @@ CopyFrom(CopyState cstate)
 		 */
 		insertMethod = CIM_SINGLE;
 	}
+	else if (cstate->ignore_conflict)
+	{
+		/*
+		 * ON CONFLICT LOG detects constraint violations row by row using a
+		 * speculative insertion in the single-insert path, so buffered
+		 * multi-inserts cannot be used.
+		 */
+		insertMethod = CIM_SINGLE;
+	}
 	else
 	{
 		/*
@@ -2968,18 +3032,70 @@ CopyFrom(CopyState cstate)
 						 */
 						tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
+					else if (cstate->ignore_conflict && cstate->error_limit > 0)
+					{
+						bool		specConflict = false;
+						uint32		specToken;
+
+						/*
+						 * Insert the tuple speculatively, as INSERT ... ON
+						 * CONFLICT does, so that a unique or exclusion
+						 * constraint violation can be backed out instead of
+						 * raising an error.  Unlike INSERT ... ON CONFLICT,
+						 * there is no pre-check or retry: any conflict
+						 * reported by the index insertion rejects the row.
+						 */
+						specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+						HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken);
+
+						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
+									mycid, HEAP_INSERT_SPECULATIVE, NULL);
+
+						/* insert index entries; a conflict sets specConflict */
+						recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+															   estate, true, &specConflict,
+															   NIL);
+
+						/* adjust the tuple's state accordingly */
+						if (!specConflict)
+							heap_finish_speculative(resultRelInfo->ri_RelationDesc, tuple);
+						else
+							heap_abort_speculative(resultRelInfo->ri_RelationDesc, tuple);
+
+						/*
+						 * Wake up anyone waiting for our decision.  They will
+						 * re-check the tuple, see that it's no longer
+						 * speculative, and wait on our XID as if this was a
+						 * regularly inserted tuple all along.
+						 */
+						SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+
+						if (specConflict)
+						{
+							/*
+							 * The row was not stored: log it and go on to the
+							 * next input row without firing AFTER ROW INSERT
+							 * triggers or counting it as processed.
+							 */
+							list_free(recheckIndexes);
+							LogCopyError(cstate, "");
+							continue;
+						}
+					}
 					else
-						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
-									mycid, hi_options, bistate);
-
-					/* And create index entries for it */
-					if (resultRelInfo->ri_NumIndices > 0)
-						recheckIndexes = ExecInsertIndexTuples(slot,
-															   &(tuple->t_self),
-															   estate,
-															   false,
-															   NULL,
-															   NIL);
+					{
+						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
+									mycid, hi_options, bistate);
+
+						/* And create index entries for it */
+						if (resultRelInfo->ri_NumIndices > 0)
+							recheckIndexes = ExecInsertIndexTuples(slot,
+																   &(tuple->t_self),
+																   estate,
+																   false,
+																   NULL,
+																   NIL);
+					}
 
 					/* AFTER ROW INSERT Triggers */
 					ExecARInsertTriggers(estate, resultRelInfo, tuple,
@@ -3286,6 +3402,61 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->num_defaults = num_defaults;
 	cstate->is_program = is_program;
 
+	if (cstate->failed_rec_filename)
+	{
+		mode_t		oumask;		/* Pre-existing umask value */
+		struct stat st;
+
+		/* The failed record file is written by the server process itself */
+		if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_WRITE_SERVER_FILES))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("must be superuser or a member of the pg_write_server_files role to COPY with ON CONFLICT LOG")));
+
+		/* Rejected lines are logged from line_buf, which binary mode never fills */
+		if (cstate->binary)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("cannot specify ON CONFLICT LOG in BINARY mode")));
+
+		/*
+		 * Prevent write to relative path ... too easy to shoot oneself in
+		 * the foot by overwriting a database file ...
+		 */
+		if (!is_absolute_path(cstate->failed_rec_filename))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_NAME),
+					 errmsg("relative path not allowed for failed record file")));
+		oumask = umask(S_IWGRP | S_IWOTH);
+		PG_TRY();
+		{
+			cstate->failed_rec_file = AllocateFile(cstate->failed_rec_filename, PG_BINARY_W);
+		}
+		PG_CATCH();
+		{
+			umask(oumask);
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+		umask(oumask);
+		if (cstate->failed_rec_file == NULL)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\" for writing: %m",
+							cstate->failed_rec_filename)));
+
+		if (fstat(fileno(cstate->failed_rec_file), &st))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							cstate->failed_rec_filename)));
+
+		if (S_ISDIR(st.st_mode))
+			ereport(ERROR,
+					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					 errmsg("\"%s\" is a directory", cstate->failed_rec_filename)));
+	}
+
 	if (data_source_cb)
 	{
 		cstate->copy_dest = COPY_CALLBACK;
@@ -3498,7 +3669,12 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
-	/* Initialize all values for row to NULL */
+	/*
+	 * Initialize all values for row to NULL.  A line rejected by ON CONFLICT
+	 * LOG jumps back here, so nothing it stored in values[] or nulls[] can
+	 * leak into the row built from the next line.
+	 */
+next_line:
 	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
 	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
 
 	if (!cstate->binary)
 	{
 		char	  **field_strings;
@@ -3513,9 +3689,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 
 		/* check for overflowing fields */
 		if (nfields > 0 && fldct > nfields)
+		{
+			if (cstate->ignore_conflict && cstate->error_limit > 0)
+			{
+				LogCopyError(cstate, " extra data after last expected column");
+				goto next_line;
+			}
 			ereport(ERROR,
 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 					 errmsg("extra data after last expected column")));
+		}
 
 		fieldno = 0;
 
@@ -3523,15 +3706,29 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 		if (file_has_oids)
 		{
 			if (fieldno >= fldct)
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					LogCopyError(cstate, " missing data for OID column");
+					goto next_line;
+				}
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("missing data for OID column")));
+			}
 			string = field_strings[fieldno++];
 
 			if (string == NULL)
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					LogCopyError(cstate, " null OID in COPY data");
+					goto next_line;
+				}
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("null OID in COPY data")));
+			}
 			else if (cstate->oids && tupleOid != NULL)
 			{
 				cstate->cur_attname = "oid";
@@ -3539,9 +3736,19 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 				*tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
 																 CStringGetDatum(string)));
 				if (*tupleOid == InvalidOid)
+				{
+					if (cstate->ignore_conflict && cstate->error_limit > 0)
+					{
+						/* don't let the error context keep pointing at this line */
+						cstate->cur_attname = NULL;
+						cstate->cur_attval = NULL;
+						LogCopyError(cstate, " invalid OID in COPY data");
+						goto next_line;
+					}
 					ereport(ERROR,
 							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 							 errmsg("invalid OID in COPY data")));
+				}
 				cstate->cur_attname = NULL;
 				cstate->cur_attval = NULL;
 			}
@@ -3555,10 +3762,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
 
 			if (fieldno >= fldct)
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					appendStringInfo(&cstate->line_buf,
+									 " missing data for column \"%s\"",
+									 NameStr(att->attname));
+					LogCopyError(cstate, "");
+					goto next_line;
+				}
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("missing data for column \"%s\"",
 								NameStr(att->attname))));
+			}
 			string = field_strings[fieldno++];
 
 			if (cstate->convert_select_flags &&
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 87f5e95827..c1084f71bc 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -632,7 +632,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
 	EXTENSION EXTERNAL EXTRACT
 
-	FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+	FALSE_P FAMILY FETCH FILE_P FILTER FIRST_P FLOAT_P FOLLOWING FOR
 	FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
 
 	GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -650,7 +650,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P LEADING LEAKPROOF
 	LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOG_P LOGGED
 
 	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
 
@@ -3107,6 +3107,10 @@ copy_opt_item:
 				{
 					$$ = makeDefElem("encoding", (Node *)makeString($2), @1);
 				}
+			| ON CONFLICT LOG_P Iconst ',' LOG_P FILE_P NAME_P Sconst
+				{
+					$$ = makeDefElem("ignore_conflicts", (Node *)list_make2(makeInteger($4), makeString($9)), @1);
+				}
 		;
 
 /* The following exist for backward compatibility with very old versions */
@@ -15086,6 +15090,7 @@ unreserved_keyword:
 			| EXTENSION
 			| EXTERNAL
 			| FAMILY
+			| FILE_P
 			| FILTER
 			| FIRST_P
 			| FOLLOWING
@@ -15134,6 +15139,7 @@ unreserved_keyword:
 			| LOCATION
 			| LOCK_P
 			| LOCKED
+			| LOG_P
 			| LOGGED
 			| MAPPING
 			| MATCH
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 23db40147b..442562b0fe 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -162,6 +162,7 @@ PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
 PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
 PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
 PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
+PG_KEYWORD("file", FILE_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
 PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD)
@@ -242,6 +243,7 @@ PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD)
 PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
+PG_KEYWORD("log", LOG_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)