On Tue, Feb 19, 2019 at 3:47 PM Andres Freund <and...@anarazel.de> wrote:
>
> Err, what? Again, that requires super user permissions (in contrast to
> copy from/to stdin/out). Backends run as the user postgres runs under
>

Okay, I see it now and have modified the patch similarly.

Regards,
Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index 254d3ab8eb..5ee70d62bf 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -380,6 +380,28 @@ WHERE <replaceable class="parameter">condition</replaceable> </listitem> </varlistentry> + <varlistentry> + <term><literal>on_conflict_log</literal></term> + <listitem> + <para> + Specifies the maximum number of error records to log. + Instead of raising an error, the record is written to the log file and + processing proceeds to the next record. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>log_file_name</literal></term> + <listitem> + <para> + The path name of the log file. It must be an absolute + path. Windows users might need to use an <literal>E''</literal> string and + double any backslashes used in the path name. + </para> + </listitem> + </varlistentry> + + </variablelist> </refsect1> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index dbb06397e6..2a2c3d98b4 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -46,6 +46,7 @@ #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" +#include "storage/lmgr.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -123,6 +124,7 @@ typedef struct CopyStateData int file_encoding; /* file or remote side's character encoding */ bool need_transcoding; /* file encoding diff from server? */ bool encoding_embeds_ascii; /* ASCII can be non-first byte? 
*/ + FILE *failed_rec_file; /* used if ignore_conflict is true */ /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ @@ -152,6 +154,9 @@ typedef struct CopyStateData List *convert_select; /* list of column names (can be NIL) */ bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ Node *whereClause; /* WHERE condition (or NULL) */ + char *failed_rec_filename; /* failed record filename */ + bool ignore_conflict; + int error_limit; /* total # of error to log */ /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ @@ -773,6 +778,21 @@ CopyLoadRawBuf(CopyState cstate) return (inbytes > 0); } +/* + * LogCopyError log error in to failed record file + */ +static void +LogCopyError(CopyState cstate, const char *str) +{ + appendBinaryStringInfo(&cstate->line_buf, str, strlen(str)); +#ifndef WIN32 + appendStringInfoCharMacro(&cstate->line_buf, '\n'); +#else + appendBinaryStringInfo(&cstate->line_buf, "\r\n", strlen("\r\n")); +#endif + fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file); + cstate->error_limit--; +} /* * DoCopy executes the SQL COPY statement @@ -836,6 +856,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, errmsg("must be superuser or a member of the pg_write_server_files role to COPY to a file"), errhint("Anyone can COPY to stdout or from stdin. 
" "psql's \\copy command also works for anyone."))); + } } @@ -1249,6 +1270,36 @@ ProcessCopyOptions(ParseState *pstate, defel->defname), parser_errposition(pstate, defel->location))); } + else if (strcmp(defel->defname, "on_conflict_log") == 0) + { + if (cstate->ignore_conflict) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + + cstate->ignore_conflict = true; + cstate->error_limit =defGetInt64(defel); + if (cstate->error_limit < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("argument to option \"%s\" must be positive number", + defel->defname), + parser_errposition(pstate, defel->location))); + } + else if (strcmp(defel->defname, "log_file_name") == 0) + { + if (cstate->failed_rec_filename) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_WRITE_SERVER_FILES)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser or a member of the pg_write_server_files role to log error"))); + cstate->failed_rec_filename =defGetString(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1271,6 +1322,21 @@ ProcessCopyOptions(ParseState *pstate, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify NULL in BINARY mode"))); + if (!cstate->error_limit && cstate->failed_rec_filename) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot specify log file name without on conflict log option"))); + + if (cstate->error_limit && !cstate->failed_rec_filename) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot specify on conflict log without log file name option"))); + + if (cstate->error_limit && !cstate->is_copy_from) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot specify on conflict log on COPY TO"))); + /* Set 
defaults for omitted options */ if (!cstate->delim) cstate->delim = cstate->csv_mode ? "," : "\t"; @@ -1771,6 +1837,11 @@ EndCopy(CopyState cstate) (errcode_for_file_access(), errmsg("could not close file \"%s\": %m", cstate->filename))); + if (cstate->failed_rec_filename != NULL && FreeFile(cstate->failed_rec_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + cstate->failed_rec_filename))); } MemoryContextDelete(cstate->copycontext); @@ -2492,6 +2563,8 @@ CopyFrom(CopyState cstate) hi_options |= HEAP_INSERT_FROZEN; } + if (!cstate->ignore_conflict) + cstate->error_limit = 0; /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. (There used to be a huge amount of code @@ -2619,6 +2692,10 @@ CopyFrom(CopyState cstate) */ insertMethod = CIM_SINGLE; } + else if (cstate->ignore_conflict) + { + insertMethod = CIM_SINGLE; + } else { /* @@ -3000,12 +3077,59 @@ CopyFrom(CopyState cstate) */ tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } + else if (cstate->ignore_conflict && cstate->error_limit > 0) + { + bool specConflict; + uint32 specToken; + specConflict = false; + + specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId()); + HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken); + + /* insert the tuple, with the speculative token */ + heap_insert(resultRelInfo->ri_RelationDesc, tuple, + estate->es_output_cid, + HEAP_INSERT_SPECULATIVE, + NULL); + + /* insert index entries for tuple */ + recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), + estate, true, &specConflict, + NIL); + + /* adjust the tuple's state accordingly */ + if (!specConflict) + { + heap_finish_speculative(resultRelInfo->ri_RelationDesc, tuple); + processed++; + } + else + { + heap_abort_speculative(resultRelInfo->ri_RelationDesc, tuple); +#ifndef WIN32 + appendStringInfoCharMacro(&cstate->line_buf, '\n'); +#else + 
appendBinaryStringInfo(&cstate->cstate->line_buf, "\r\n", strlen("\r\n")); +#endif + fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file); + cstate->error_limit--; + + } + + /* + * Wake up anyone waiting for our decision. They will re-check + * the tuple, see that it's no longer speculative, and wait on our + * XID as if this was a regularly inserted tuple all along. + */ + SpeculativeInsertionLockRelease(GetCurrentTransactionId()); + + } else heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid, hi_options, bistate); /* And create index entries for it */ - if (resultRelInfo->ri_NumIndices > 0) + if (resultRelInfo->ri_NumIndices > 0 && cstate->error_limit == 0) recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), estate, @@ -3026,7 +3150,8 @@ CopyFrom(CopyState cstate) * or FDW; this is the same definition used by nodeModifyTable.c * for counting tuples inserted by an INSERT command. */ - processed++; + if(!cstate->ignore_conflict) + processed++; } } @@ -3316,6 +3441,48 @@ BeginCopyFrom(ParseState *pstate, cstate->num_defaults = num_defaults; cstate->is_program = is_program; + if (cstate->failed_rec_filename) + { + mode_t oumask; /* Pre-existing umask value */ + struct stat st; + /* + * Prevent write to relative path ... too easy to shoot oneself in + * the foot by overwriting a database file ... 
+ */ + if (!is_absolute_path(cstate->failed_rec_filename)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("relative path not allowed for failed record file"))); + oumask = umask(S_IWGRP | S_IWOTH); + PG_TRY(); + { + cstate->failed_rec_file = AllocateFile(cstate->failed_rec_filename, PG_BINARY_W); + } + PG_CATCH(); + { + umask(oumask); + PG_RE_THROW(); + } + PG_END_TRY(); + umask(oumask); + if (cstate->failed_rec_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for writing: %m", + cstate->failed_rec_filename))); + + if (fstat(fileno(cstate->failed_rec_file), &st)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + cstate->failed_rec_filename))); + + if (S_ISDIR(st.st_mode)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", cstate->failed_rec_filename))); + } + if (data_source_cb) { cstate->copy_dest = COPY_CALLBACK; @@ -3514,7 +3681,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - +next_line: if (!cstate->binary) { char **field_strings; @@ -3529,9 +3696,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* check for overflowing fields */ if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + { + if (cstate->ignore_conflict && cstate->error_limit > 0) + { + LogCopyError(cstate, " extra data after last expected column"); + goto next_line; + }else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + } fieldno = 0; @@ -3543,10 +3717,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, Form_pg_attribute att = TupleDescAttr(tupDesc, m); if (fieldno >= fldct) - ereport(ERROR, - 
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); + { + if (cstate->ignore_conflict && cstate->error_limit > 0) + { + appendStringInfo(&cstate->line_buf, " missing data for column %s", + NameStr(att->attname)); + LogCopyError(cstate, " "); + goto next_line; + }else + + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + } string = field_strings[fieldno++]; if (cstate->convert_select_flags && @@ -3633,10 +3817,19 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, } if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + { + if (cstate->ignore_conflict && cstate->error_limit > 0) + { + appendStringInfo(&cstate->line_buf, "row field count is %d, expected %d", + (int) fld_count, attr_count); + LogCopyError(cstate, " "); + goto next_line; + }else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + } i = 0; foreach(cur, cstate->attnumlist) diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index c1faf4152c..bf21ba408e 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -631,7 +631,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXTENSION EXTERNAL EXTRACT - FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR + FALSE_P FAMILY FETCH FILE_P FILTER FIRST_P FLOAT_P FOLLOWING FOR FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS @@ -649,7 +649,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); LABEL LANGUAGE LARGE_P LAST_P LATERAL_P LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL - 
LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED + LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOG_P LOGGED MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE @@ -3047,6 +3047,14 @@ copy_opt_item: { $$ = makeDefElem("encoding", (Node *)makeString($2), @1); } + | ON CONFLICT LOG_P Iconst + { + $$ = makeDefElem("on_conflict_log", (Node *)makeInteger($4), @1); + } + | LOG_P FILE_P NAME_P Sconst + { + $$ = makeDefElem("log_file_name", (Node *)makeString($4), @1); + } ; /* The following exist for backward compatibility with very old versions */ @@ -15004,6 +15012,7 @@ unreserved_keyword: | EXTENSION | EXTERNAL | FAMILY + | FILE_P | FILTER | FIRST_P | FOLLOWING @@ -15052,6 +15061,7 @@ unreserved_keyword: | LOCATION | LOCK_P | LOCKED + | LOG_P | LOGGED | MAPPING | MATCH diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index adeb834ce8..3b20f6d16a 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -161,6 +161,7 @@ PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD) PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD) PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD) PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD) +PG_KEYWORD("file", FILE_P, UNRESERVED_KEYWORD) PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD) PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD) PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD) @@ -241,6 +242,7 @@ PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD) PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD) PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD) PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD) +PG_KEYWORD("log", LOG_P, UNRESERVED_KEYWORD) PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD) PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD) PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)