On Thu, Nov 29, 2018 at 3:15 PM Dmitry Dolgov <9erthali...@gmail.com> wrote:
>
> Unfortunately, the patch conflict-handling-onCopy-from-v2.patch has some
> conflicts now, could you rebase it?
>

Thank you for letting me know. Attached is the patch rebased against current master.

Regards,
Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index 411941ed31..33015451a5 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -353,6 +353,28 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable </listitem> </varlistentry> + <varlistentry> + <term><literal>on_conflict_log</literal></term> + <listitem> + <para> + Specifies to log error record up to specified amount. + Instead write the record to log file and + precede to the next record + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>log_file_name</literal></term> + <listitem> + <para> + The path name of the log file. It must be an absolute + path. Windows users might need to use an <literal>E''</literal> string and + double any backslashes used in the path name. + </para> + </listitem> + </varlistentry> + </variablelist> </refsect1> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 4311e16007..b4b707c3f6 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -44,6 +44,7 @@ #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" +#include "storage/lmgr.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -121,6 +122,7 @@ typedef struct CopyStateData int file_encoding; /* file or remote side's character encoding */ bool need_transcoding; /* file encoding diff from server? */ bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + FILE *failed_rec_file; /* used if ignore_conflict is true */ /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ @@ -149,6 +151,9 @@ typedef struct CopyStateData bool convert_selectively; /* do selective binary conversion? 
*/ List *convert_select; /* list of column names (can be NIL) */ bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + char *failed_rec_filename; /* failed record filename */ + bool ignore_conflict; + int error_limit; /* total # of error to log */ /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ @@ -769,6 +774,21 @@ CopyLoadRawBuf(CopyState cstate) return (inbytes > 0); } +/* + * LogCopyError log error in to failed record file + */ +static void +LogCopyError(CopyState cstate, const char *str) +{ + appendBinaryStringInfo(&cstate->line_buf, str, strlen(str)); +#ifndef WIN32 + appendStringInfoCharMacro(&cstate->line_buf, '\n'); +#else + appendBinaryStringInfo(&cstate->line_buf, "\r\n", strlen("\r\n")); +#endif + fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file); + cstate->error_limit--; +} /* * DoCopy executes the SQL COPY statement @@ -1223,6 +1243,32 @@ ProcessCopyOptions(ParseState *pstate, defel->defname), parser_errposition(pstate, defel->location))); } + else if (strcmp(defel->defname, "on_conflict_log") == 0) + { + if (cstate->ignore_conflict) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + + cstate->ignore_conflict = true; + cstate->error_limit =defGetInt64(defel); + if (cstate->error_limit < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("argument to option \"%s\" must be positive number", + defel->defname), + parser_errposition(pstate, defel->location))); + } + else if (strcmp(defel->defname, "log_file_name") == 0) + { + if (cstate->failed_rec_filename) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + cstate->failed_rec_filename =defGetString(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1245,6 
+1291,21 @@ ProcessCopyOptions(ParseState *pstate, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify NULL in BINARY mode"))); + if (!cstate->error_limit && cstate->failed_rec_filename) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot specify log file name without on conflict log option"))); + + if (cstate->error_limit && !cstate->failed_rec_filename) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot specify on conflict log without log file name option"))); + + if (cstate->error_limit && !cstate->is_copy_from) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot specify on conflict log on COPY TO"))); + /* Set defaults for omitted options */ if (!cstate->delim) cstate->delim = cstate->csv_mode ? "," : "\t"; @@ -1745,6 +1806,11 @@ EndCopy(CopyState cstate) (errcode_for_file_access(), errmsg("could not close file \"%s\": %m", cstate->filename))); + if (cstate->failed_rec_filename != NULL && FreeFile(cstate->failed_rec_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + cstate->failed_rec_filename))); } MemoryContextDelete(cstate->copycontext); @@ -2461,6 +2527,8 @@ CopyFrom(CopyState cstate) hi_options |= HEAP_INSERT_FROZEN; } + if (!cstate->ignore_conflict) + cstate->error_limit = 0; /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. 
(There used to be a huge amount of code @@ -2575,6 +2643,10 @@ CopyFrom(CopyState cstate) */ insertMethod = CIM_SINGLE; } + else if (cstate->ignore_conflict) + { + insertMethod = CIM_SINGLE; + } else { /* @@ -2946,12 +3018,59 @@ CopyFrom(CopyState cstate) */ tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } + else if (cstate->ignore_conflict && cstate->error_limit > 0) + { + bool specConflict; + uint32 specToken; + specConflict = false; + + specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId()); + HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken); + + /* insert the tuple, with the speculative token */ + heap_insert(resultRelInfo->ri_RelationDesc, tuple, + estate->es_output_cid, + HEAP_INSERT_SPECULATIVE, + NULL); + + /* insert index entries for tuple */ + recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), + estate, true, &specConflict, + NIL); + + /* adjust the tuple's state accordingly */ + if (!specConflict) + { + heap_finish_speculative(resultRelInfo->ri_RelationDesc, tuple); + processed++; + } + else + { + heap_abort_speculative(resultRelInfo->ri_RelationDesc, tuple); +#ifndef WIN32 + appendStringInfoCharMacro(&cstate->line_buf, '\n'); +#else + appendBinaryStringInfo(&cstate->cstate->line_buf, "\r\n", strlen("\r\n")); +#endif + fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file); + cstate->error_limit--; + + } + + /* + * Wake up anyone waiting for our decision. They will re-check + * the tuple, see that it's no longer speculative, and wait on our + * XID as if this was a regularly inserted tuple all along. 
+ */ + SpeculativeInsertionLockRelease(GetCurrentTransactionId()); + + } else heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid, hi_options, bistate); /* And create index entries for it */ - if (resultRelInfo->ri_NumIndices > 0) + if (resultRelInfo->ri_NumIndices > 0 && cstate->error_limit == 0) recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), estate, @@ -2972,7 +3091,8 @@ CopyFrom(CopyState cstate) * or FDW; this is the same definition used by nodeModifyTable.c * for counting tuples inserted by an INSERT command. */ - processed++; + if(!cstate->ignore_conflict) + processed++; } } @@ -3260,6 +3380,48 @@ BeginCopyFrom(ParseState *pstate, cstate->num_defaults = num_defaults; cstate->is_program = is_program; + if (cstate->failed_rec_filename) + { + mode_t oumask; /* Pre-existing umask value */ + struct stat st; + /* + * Prevent write to relative path ... too easy to shoot oneself in + * the foot by overwriting a database file ... + */ + if (!is_absolute_path(cstate->failed_rec_filename)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("relative path not allowed for failed record file"))); + oumask = umask(S_IWGRP | S_IWOTH); + PG_TRY(); + { + cstate->failed_rec_file = AllocateFile(cstate->failed_rec_filename, PG_BINARY_W); + } + PG_CATCH(); + { + umask(oumask); + PG_RE_THROW(); + } + PG_END_TRY(); + umask(oumask); + if (cstate->failed_rec_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for writing: %m", + cstate->failed_rec_filename))); + + if (fstat(fileno(cstate->failed_rec_file), &st)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + cstate->failed_rec_filename))); + + if (S_ISDIR(st.st_mode)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", cstate->failed_rec_filename))); + } + if (data_source_cb) { cstate->copy_dest = COPY_CALLBACK; @@ -3458,7 +3620,7 @@ NextCopyFrom(CopyState cstate, ExprContext 
*econtext, /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - +next_line: if (!cstate->binary) { char **field_strings; @@ -3473,9 +3635,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* check for overflowing fields */ if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + { + if (cstate->ignore_conflict && cstate->error_limit > 0) + { + LogCopyError(cstate, " extra data after last expected column"); + goto next_line; + }else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + } fieldno = 0; @@ -3487,10 +3656,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, Form_pg_attribute att = TupleDescAttr(tupDesc, m); if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); + { + if (cstate->ignore_conflict && cstate->error_limit > 0) + { + appendStringInfo(&cstate->line_buf, " missing data for column %s", + NameStr(att->attname)); + LogCopyError(cstate, " "); + goto next_line; + }else + + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + } string = field_strings[fieldno++]; if (cstate->convert_select_flags && @@ -3577,10 +3756,19 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, } if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + { + if (cstate->ignore_conflict && cstate->error_limit > 0) + { + appendStringInfo(&cstate->line_buf, "row field count is %d, expected %d", + (int) fld_count, attr_count); + LogCopyError(cstate, " "); + goto next_line; + }else + ereport(ERROR, + 
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + } i = 0; foreach(cur, cstate->attnumlist) diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 2c2208ffb7..ecfa5f9874 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -632,7 +632,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXTENSION EXTERNAL EXTRACT - FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR + FALSE_P FAMILY FETCH FILE_P FILTER FIRST_P FLOAT_P FOLLOWING FOR FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS @@ -650,7 +650,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); LABEL LANGUAGE LARGE_P LAST_P LATERAL_P LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL - LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED + LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOG_P LOGGED MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE @@ -3093,6 +3093,14 @@ copy_opt_item: { $$ = makeDefElem("encoding", (Node *)makeString($2), @1); } + | ON CONFLICT LOG_P Iconst + { + $$ = makeDefElem("on_conflict_log", (Node *)makeInteger($4), @1); + } + | LOG_P FILE_P NAME_P Sconst + { + $$ = makeDefElem("log_file_name", (Node *)makeString($4), @1); + } ; /* The following exist for backward compatibility with very old versions */ @@ -15052,6 +15060,7 @@ unreserved_keyword: | EXTENSION | EXTERNAL | FAMILY + | FILE_P | FILTER | FIRST_P | FOLLOWING @@ -15100,6 +15109,7 @@ unreserved_keyword: | LOCATION | LOCK_P | LOCKED + | LOG_P | LOGGED | MAPPING | MATCH diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 23db40147b..442562b0fe 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -162,6 +162,7 @@ PG_KEYWORD("extract", 
EXTRACT, COL_NAME_KEYWORD) PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD) PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD) PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD) +PG_KEYWORD("file", FILE_P, UNRESERVED_KEYWORD) PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD) PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD) PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD) @@ -242,6 +243,7 @@ PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD) PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD) PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD) PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD) +PG_KEYWORD("log", LOG_P, UNRESERVED_KEYWORD) PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD) PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD) PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)