Trent Shipley <trent_ship...@qwest.net> writes:
> On Friday 2007-12-14 16:22, Tom Lane wrote:
>> Neil Conway <ne...@samurai.com> writes:
>> > By modifying COPY: COPY IGNORE ERRORS or some such would instruct COPY
>> > to drop (and log) rows that contain malformed data. That is, rows with
>> > too many or too few columns, rows that result in constraint violations,
>> > and rows containing columns where the data type's input function raises
>> > an error. The last case is the only thing that would be a bit tricky to
>> > implement, I think: you could use PG_TRY() around the InputFunctionCall,
>> > but I guess you'd need a subtransaction to ensure that you reset your
>> > state correctly after catching an error.
>>
>> Yeah. It's the subtransaction per row that's daunting --- not only the
>> cycles spent for that, but the ensuing limitation to 4G rows imported
>> per COPY.
>
> You could extend the COPY FROM syntax with a COMMIT EVERY n clause. This
> would help with the 4G subtransaction limit. The cost to the ETL process is
> that a simple rollback would not be guaranteed to send the process back to
> its initial state. There are easy ways to deal with the rollback issue though.
>
> A {NO} RETRY {USING algorithm} clause might be useful. If the NO RETRY
> option is selected then the COPY FROM can run without subtransactions and in
> excess of the 4G per transaction limit. NO RETRY should be the default since
> it preserves the legacy behavior of COPY FROM.
>
> You could have an EXCEPTIONS TO {filename|STDERR} clause. I would not give
> the option of sending exceptions to a table since they are presumably
> malformed, otherwise they would not be exceptions. (Users should re-process
> exception files if they want an if good then table a else exception to
> table b ...)
>
> EXCEPTIONS TO and NO RETRY would be mutually exclusive.
>
>
>> If we could somehow only do a subtransaction per failure, things would
>> be much better, but I don't see how.
Hello,

Attached is a proof of concept patch for this TODO item. There are no docs
yet, I just wanted to know if the approach is sane.

The added syntax is like the following:

  COPY [table] FROM [file/program/stdin] EXCEPTIONS TO [file or stdout]

The way it's done is by abusing Copy Both mode, and from my limited testing
that seems to just work. The error trapping itself is done using
PG_TRY/PG_CATCH and can only catch formatting or before-insert trigger
errors; no attempt is made to recover from a failed unique constraint, etc.

Example in action:

postgres=# \d test_copy2
  Table "public.test_copy2"
 Column |  Type   | Modifiers
--------+---------+-----------
 id     | integer |
 val    | integer |

postgres=# copy test_copy2 from program 'seq 3' exceptions to stdout;
1
NOTICE:  missing data for column "val"
CONTEXT:  COPY test_copy2, line 1: "1"
2
NOTICE:  missing data for column "val"
CONTEXT:  COPY test_copy2, line 2: "2"
3
NOTICE:  missing data for column "val"
CONTEXT:  COPY test_copy2, line 3: "3"
NOTICE:  total exceptions ignored: 3

postgres=# \d test_copy1
  Table "public.test_copy1"
 Column |  Type   | Modifiers
--------+---------+-----------
 id     | integer | not null

postgres=# set client_min_messages to warning;
SET
postgres=# copy test_copy1 from program 'ls /proc' exceptions to stdout;
...
vmstat
zoneinfo
postgres=#

Limited performance testing shows no significant difference between the
error-catching and plain code paths. For example, timing

  copy test_copy1 from program 'seq 1000000' [exceptions to stdout]

shows similar numbers with or without the added "exceptions to" clause.

Now that I'm sending this I wonder if the original comment about the need
for a subtransaction around every loaded line still holds. Any example of
what would not be properly rolled back by just PG_TRY?

Happy hacking!
--
Alex
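For readers who want the control flow without the backend around it, here is a rough standalone sketch of the skip-and-log loop described above. It is not code from the patch: setjmp/longjmp stands in for PG_TRY/PG_CATCH, and raise_error, parse_row and load_rows are made-up names. It also leaves out everything a real backend has to restore after catching an error (memory context, error state, anything a subtransaction would undo), so it says nothing about whether PG_TRY alone is sufficient.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdio.h>

/* Hypothetical stand-ins for the backend's exception stack and error data. */
static jmp_buf *error_stack = NULL;
static const char *error_message = NULL;

/* Plays the role of ereport(ERROR, ...): jump to the innermost handler. */
static void
raise_error(const char *msg)
{
	assert(error_stack != NULL);	/* only legal inside a guarded block */
	error_message = msg;
	longjmp(*error_stack, 1);
}

/* Parses "id val" and raises on malformed input, like a type input function. */
static void
parse_row(const char *line, int *id, int *val)
{
	int			nfields = sscanf(line, "%d %d", id, val);

	if (nfields < 1)
		raise_error("invalid input syntax for integer");
	if (nfields < 2)
		raise_error("missing data for column \"val\"");
}

/*
 * Loads rows one by one.  A row that raises is written to exc_out and
 * counted in *exceptions instead of aborting the whole load.  Returns the
 * number of rows accepted.
 */
int
load_rows(const char *const *lines, int nlines, FILE *exc_out, int *exceptions)
{
	/* volatile so the values are reliable after a longjmp */
	volatile int loaded = 0;
	volatile int i;

	assert(exc_out != NULL);
	*exceptions = 0;

	for (i = 0; i < nlines; i++)
	{
		jmp_buf		local_buf;
		jmp_buf    *saved_stack = error_stack;
		int			id,
					val;

		if (setjmp(local_buf) == 0)
		{
			/* guarded section, comparable to the PG_TRY() block */
			error_stack = &local_buf;
			parse_row(lines[i], &id, &val);
			error_stack = saved_stack;
			loaded++;			/* a real loader would insert the tuple here */
		}
		else
		{
			/* handler, comparable to the PG_CATCH() block */
			error_stack = saved_stack;
			(*exceptions)++;
			fprintf(exc_out, "%s\n", lines[i]);
			fprintf(stderr, "NOTICE:  %s\n", error_message);
		}
	}
	return loaded;
}
```

Calling load_rows with an array of input lines and stdout as exc_out echoes each rejected line to stdout and reports the reason on stderr, loosely mirroring the EXCEPTIONS TO STDOUT session shown earlier.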
>From 50f7ab0a503a0d61776add8a138abf2622fc6c35 Mon Sep 17 00:00:00 2001 From: Alex Shulgin <a...@commandprompt.com> Date: Fri, 19 Dec 2014 18:21:31 +0300 Subject: [PATCH] POC: COPY FROM ... EXCEPTIONS TO --- contrib/file_fdw/file_fdw.c | 4 +- src/backend/commands/copy.c | 251 +++++++++++++++++++++++++++++--- src/backend/parser/gram.y | 26 +++- src/bin/psql/common.c | 14 +- src/bin/psql/copy.c | 119 ++++++++++++++- src/bin/psql/settings.h | 1 + src/bin/psql/startup.c | 1 + src/bin/psql/tab-complete.c | 12 +- src/include/commands/copy.h | 3 +- src/include/nodes/parsenodes.h | 1 + src/include/parser/kwlist.h | 1 + src/interfaces/ecpg/preproc/ecpg.addons | 2 +- 12 files changed, 396 insertions(+), 39 deletions(-) diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c new file mode 100644 index 5a4d5aa..0df02f7 *** a/contrib/file_fdw/file_fdw.c --- b/contrib/file_fdw/file_fdw.c *************** fileBeginForeignScan(ForeignScanState *n *** 624,629 **** --- 624,630 ---- cstate = BeginCopyFrom(node->ss.ss_currentRelation, filename, false, + NULL, NIL, options); *************** fileReScanForeignScan(ForeignScanState * *** 697,702 **** --- 698,704 ---- festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation, festate->filename, false, + NULL, NIL, festate->options); } *************** file_acquire_sample_rows(Relation onerel *** 1030,1036 **** /* * Create CopyState from FDW options. */ ! cstate = BeginCopyFrom(onerel, filename, false, NIL, options); /* * Use per-tuple memory context to prevent leak of memory used to read --- 1032,1038 ---- /* * Create CopyState from FDW options. */ ! 
cstate = BeginCopyFrom(onerel, filename, false, NULL, NIL, options); /* * Use per-tuple memory context to prevent leak of memory used to read diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c new file mode 100644 index 08abe14..4f59c63 *** a/src/backend/commands/copy.c --- b/src/backend/commands/copy.c *************** typedef enum EolType *** 96,102 **** typedef struct CopyStateData { /* low-level state data */ ! CopyDest copy_dest; /* type of copy source/destination */ FILE *copy_file; /* used if copy_dest == COPY_FILE */ StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for * dest == COPY_NEW_FE in COPY FROM */ --- 96,103 ---- typedef struct CopyStateData { /* low-level state data */ ! CopyDest copy_src; /* type of copy source */ ! CopyDest copy_dest; /* type of copy destination */ FILE *copy_file; /* used if copy_dest == COPY_FILE */ StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for * dest == COPY_NEW_FE in COPY FROM */ *************** typedef struct CopyStateData *** 105,110 **** --- 106,114 ---- int file_encoding; /* file or remote side's character encoding */ bool need_transcoding; /* file encoding diff from server? */ bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + bool ignore_exceptions; /* should we trap and ignore exceptions? */ + FILE *exc_file; /* file stream to write erroring lines to */ + uint64 exceptions; /* total number of exceptions ignored */ /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ *************** typedef struct CopyStateData *** 112,117 **** --- 116,122 ---- List *attnumlist; /* integer list of attnums to copy */ char *filename; /* filename, or NULL for STDIN/STDOUT */ bool is_program; /* is 'filename' a program to popen? */ + char *exc_filename; /* filename for exceptions or NULL for STDOUT */ bool binary; /* binary format? */ bool oids; /* include OIDs? */ bool freeze; /* freeze rows on loading? 
*/ *************** SendCopyBegin(CopyState cstate) *** 347,352 **** --- 352,366 ---- int16 format = (cstate->binary ? 1 : 0); int i; + /* + * Check if we might need to stream exceptions to the frontend. If + * so, this must be a "COPY FROM file/program EXCEPTIONS TO STDOUT". + * + * We need to create the frontend message buffer now. + */ + if (cstate->ignore_exceptions) + cstate->fe_msgbuf = makeStringInfo(); + pq_beginmessage(&buf, 'H'); pq_sendbyte(&buf, format); /* overall format */ pq_sendint(&buf, natts, 2); *************** ReceiveCopyBegin(CopyState cstate) *** 388,404 **** { /* new way */ StringInfoData buf; int natts = list_length(cstate->attnumlist); int16 format = (cstate->binary ? 1 : 0); int i; ! pq_beginmessage(&buf, 'G'); pq_sendbyte(&buf, format); /* overall format */ pq_sendint(&buf, natts, 2); for (i = 0; i < natts; i++) pq_sendint(&buf, format, 2); /* per-column formats */ pq_endmessage(&buf); ! cstate->copy_dest = COPY_NEW_FE; cstate->fe_msgbuf = makeStringInfo(); } else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) --- 402,428 ---- { /* new way */ StringInfoData buf; + char msgid = 'G'; /* receiving from client only */ int natts = list_length(cstate->attnumlist); int16 format = (cstate->binary ? 1 : 0); int i; ! /* ! * Check if we also need to pipe exceptions back to the frontend. ! */ ! if (cstate->ignore_exceptions && cstate->exc_filename == NULL) ! { ! msgid = 'W'; /* copying in both directions */ ! cstate->copy_dest = COPY_NEW_FE; ! } ! ! pq_beginmessage(&buf, msgid); pq_sendbyte(&buf, format); /* overall format */ pq_sendint(&buf, natts, 2); for (i = 0; i < natts; i++) pq_sendint(&buf, format, 2); /* per-column formats */ pq_endmessage(&buf); ! 
cstate->copy_src = COPY_NEW_FE; cstate->fe_msgbuf = makeStringInfo(); } else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) *************** ReceiveCopyBegin(CopyState cstate) *** 409,415 **** (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('G'); ! cstate->copy_dest = COPY_OLD_FE; } else { --- 433,439 ---- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('G'); ! cstate->copy_src = COPY_OLD_FE; } else { *************** ReceiveCopyBegin(CopyState cstate) *** 419,425 **** (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('D'); ! cstate->copy_dest = COPY_OLD_FE; } /* We *must* flush here to ensure FE knows it can send. */ pq_flush(); --- 443,449 ---- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('D'); ! cstate->copy_src = COPY_OLD_FE; } /* We *must* flush here to ensure FE knows it can send. */ pq_flush(); *************** CopySendChar(CopyState cstate, char c) *** 472,486 **** appendStringInfoCharMacro(cstate->fe_msgbuf, c); } static void CopySendEndOfRow(CopyState cstate) { StringInfo fe_msgbuf = cstate->fe_msgbuf; switch (cstate->copy_dest) { case COPY_FILE: ! if (!cstate->binary) { /* Default line termination depends on platform */ #ifndef WIN32 --- 496,560 ---- appendStringInfoCharMacro(cstate->fe_msgbuf, c); } + /* + * This should be called from PG_CATCH() after switching to appropriate + * MemoryContext. + */ + static void + CopySendException(CopyState cstate) + { + ErrorData *error; + + ++cstate->exceptions; + + /* + * When reading from the frontend, we reuse the current line held in the + * message buffer to send the exception line back, otherwise we need to + * copy the line over from the line buffer. 
+ */ + if (cstate->copy_src == COPY_FILE) + CopySendData(cstate, cstate->line_buf.data, cstate->line_buf.len); + + /* this flushes the message buffer */ + CopySendEndOfRow(cstate); + + error = CopyErrorData(); + FlushErrorState(); + + /* report error as a harmless notice */ + ereport(NOTICE, + (errmsg("%s", error->message))); + FreeErrorData(error); + } + static void CopySendEndOfRow(CopyState cstate) { StringInfo fe_msgbuf = cstate->fe_msgbuf; + FILE *file; + bool should_add_newline; + + /* determine where are we writing to */ + if (cstate->ignore_exceptions) + { + file = cstate->exc_file; + /* + * We should only add a newline if we're not sending the frontend what + * it has just sent us and in any case we shouldn't do this for binary + * copy. + */ + should_add_newline = (cstate->copy_src == COPY_FILE && !cstate->binary); + } + else + { + file = cstate->copy_file; + should_add_newline = !(cstate->binary); + } switch (cstate->copy_dest) { case COPY_FILE: ! if (should_add_newline) { /* Default line termination depends on platform */ #ifndef WIN32 *************** CopySendEndOfRow(CopyState cstate) *** 490,498 **** #endif } ! if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, ! cstate->copy_file) != 1 || ! ferror(cstate->copy_file)) { if (cstate->is_program) { --- 564,571 ---- #endif } ! if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, file) != 1 || ! ferror(file)) { if (cstate->is_program) { *************** CopySendEndOfRow(CopyState cstate) *** 525,531 **** break; case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ ! if (!cstate->binary) CopySendChar(cstate, '\n'); if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len)) --- 598,604 ---- break; case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ ! 
if (should_add_newline) CopySendChar(cstate, '\n'); if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len)) *************** CopySendEndOfRow(CopyState cstate) *** 538,544 **** break; case COPY_NEW_FE: /* The FE/BE protocol uses \n as newline for all platforms */ ! if (!cstate->binary) CopySendChar(cstate, '\n'); /* Dump the accumulated row as one CopyData message */ --- 611,617 ---- break; case COPY_NEW_FE: /* The FE/BE protocol uses \n as newline for all platforms */ ! if (should_add_newline) CopySendChar(cstate, '\n'); /* Dump the accumulated row as one CopyData message */ *************** CopySendEndOfRow(CopyState cstate) *** 546,552 **** break; } ! resetStringInfo(fe_msgbuf); } /* --- 619,630 ---- break; } ! /* ! * Avoid resetting the buffer we reused to send the exception line back to ! * the frontend. ! */ ! if (!cstate->ignore_exceptions || cstate->copy_src == COPY_FILE) ! resetStringInfo(fe_msgbuf); } /* *************** CopyGetData(CopyState cstate, void *data *** 567,573 **** { int bytesread = 0; ! switch (cstate->copy_dest) { case COPY_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); --- 645,651 ---- { int bytesread = 0; ! switch (cstate->copy_src) { case COPY_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); *************** DoCopy(const CopyStmt *stmt, const char *** 919,930 **** PreventCommandIfReadOnly("COPY FROM"); cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program, ! stmt->attlist, stmt->options); *processed = CopyFrom(cstate); /* copy from file to database */ EndCopyFrom(cstate); } else { cstate = BeginCopyTo(rel, query, queryString, relid, stmt->filename, stmt->is_program, stmt->attlist, stmt->options); --- 997,1018 ---- PreventCommandIfReadOnly("COPY FROM"); cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program, ! 
stmt->exc_filename, stmt->attlist, stmt->options); *processed = CopyFrom(cstate); /* copy from file to database */ + if (cstate->exceptions) + ereport(NOTICE, + (errmsg("total exceptions ignored: " UINT64_FORMAT, + cstate->exceptions))); EndCopyFrom(cstate); } else { + if (stmt->exc_filename != NULL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("EXCEPTIONS TO not allowed with COPY ... TO"), + errhint("see COPY ... FROM"))); + cstate = BeginCopyTo(rel, query, queryString, relid, stmt->filename, stmt->is_program, stmt->attlist, stmt->options); *************** BeginCopy(bool is_from, *** 1561,1566 **** --- 1649,1655 ---- /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); + cstate->copy_src = COPY_FILE; /* default */ cstate->copy_dest = COPY_FILE; /* default */ MemoryContextSwitchTo(oldcontext); *************** EndCopy(CopyState cstate) *** 1608,1613 **** --- 1697,1708 ---- cstate->filename))); } + if (cstate->exc_filename != NULL && FreeFile(cstate->exc_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + cstate->exc_filename))); + MemoryContextDelete(cstate->copycontext); pfree(cstate); } *************** CopyFrom(CopyState cstate) *** 2331,2336 **** --- 2426,2432 ---- { TupleTableSlot *slot; bool skip_tuple; + bool depleted; Oid loaded_oid = InvalidOid; CHECK_FOR_INTERRUPTS(); *************** CopyFrom(CopyState cstate) *** 2348,2356 **** /* Switch into its memory context */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); ! if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid)) break; /* And now we can form the input tuple. */ tuple = heap_form_tuple(tupDesc, values, nulls); --- 2444,2475 ---- /* Switch into its memory context */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); ! skip_tuple = false; ! depleted = false; ! ! PG_TRY(); ! { ! if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid)) ! 
/* can't break right here due to PG_TRY using do/while(0) */ ! depleted = true; ! } ! PG_CATCH(); ! { ! if (!cstate->ignore_exceptions) ! PG_RE_THROW(); ! ! skip_tuple = true; ! MemoryContextSwitchTo(oldcontext); ! CopySendException(cstate); ! } ! PG_END_TRY(); ! ! if (depleted) break; + if (skip_tuple) + continue; + /* And now we can form the input tuple. */ tuple = heap_form_tuple(tupDesc, values, nulls); *************** CopyFrom(CopyState cstate) *** 2376,2382 **** if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_before_row) { ! slot = ExecBRInsertTriggers(estate, resultRelInfo, slot); if (slot == NULL) /* "do nothing" */ skip_tuple = true; --- 2495,2514 ---- if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_before_row) { ! PG_TRY(); ! { ! slot = ExecBRInsertTriggers(estate, resultRelInfo, slot); ! } ! PG_CATCH(); ! { ! if (!cstate->ignore_exceptions) ! PG_RE_THROW(); ! ! slot = NULL; ! MemoryContextSwitchTo(oldcontext); ! CopySendException(cstate); ! } ! PG_END_TRY(); if (slot == NULL) /* "do nothing" */ skip_tuple = true; *************** CopyFrom(CopyState cstate) *** 2384,2395 **** tuple = ExecMaterializeSlot(slot); } ! if (!skip_tuple) { ! /* Check the constraints of the tuple */ ! if (cstate->rel->rd_att->constr) ExecConstraints(resultRelInfo, slot, estate); if (useHeapMultiInsert) { /* Add this tuple to the tuple buffer */ --- 2516,2545 ---- tuple = ExecMaterializeSlot(slot); } ! if (skip_tuple) ! continue; ! ! /* Check the constraints of the tuple */ ! if (cstate->rel->rd_att->constr) { ! PG_TRY(); ! 
{ ExecConstraints(resultRelInfo, slot, estate); + } + PG_CATCH(); + { + if (!cstate->ignore_exceptions) + PG_RE_THROW(); + skip_tuple = true; + MemoryContextSwitchTo(oldcontext); + CopySendException(cstate); + } + PG_END_TRY(); + } + + if (!skip_tuple) + { if (useHeapMultiInsert) { /* Add this tuple to the tuple buffer */ *************** CopyState *** 2573,2583 **** --- 2723,2736 ---- BeginCopyFrom(Relation rel, const char *filename, bool is_program, + const char *exc_filename, List *attnamelist, List *options) { CopyState cstate; bool pipe = (filename == NULL); + bool ignore_exceptions = (exc_filename != NULL); + bool exc_pipe = (exc_filename != NULL && *exc_filename == 0); TupleDesc tupDesc; Form_pg_attribute *attr; AttrNumber num_phys_attrs, *************** BeginCopyFrom(Relation rel, *** 2686,2691 **** --- 2839,2900 ---- cstate->volatile_defexprs = volatile_defexprs; cstate->num_defaults = num_defaults; cstate->is_program = is_program; + cstate->ignore_exceptions = ignore_exceptions; + + if (ignore_exceptions) + { + if (exc_pipe) + { + if (whereToSendOutput == DestRemote) + { + if (!pipe) + SendCopyBegin(cstate); + else + ; /* handled by ReceiveCopyBegin() call below */ + } + else + { + cstate->exc_file = stdout; + } + } + else + { + if (!is_absolute_path(exc_filename)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("relative path not allowed for EXCEPTIONS TO file"))); + + if (!pipe) + { + struct stat stbuf_exc; + struct stat stbuf_from; + + /* check if both FROM and EXCEPTIONS TO are the same file */ + if (stat(exc_filename, &stbuf_exc) == 0 && + stat(filename, &stbuf_from) == 0 && + stbuf_exc.st_dev == stbuf_from.st_dev && + stbuf_exc.st_ino == stbuf_from.st_ino) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("cannot specify the same file for COPY FROM and EXCEPTIONS TO"))); + } + /* + * We won't send or receive data via frontend, but we still + * need to the buffer for CopySendException() to work with. 
+ */ + cstate->fe_msgbuf = makeStringInfo(); + } + + cstate->exc_filename = pstrdup(exc_filename); + cstate->exc_file = AllocateFile(cstate->exc_filename, PG_BINARY_W); + if (cstate->exc_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for writing: %m", + cstate->exc_filename))); + } + } if (pipe) { *************** NextCopyFrom(CopyState cstate, ExprConte *** 3019,3025 **** */ char dummy; ! if (cstate->copy_dest != COPY_OLD_FE && CopyGetData(cstate, &dummy, 1, 1) > 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), --- 3228,3234 ---- */ char dummy; ! if (cstate->copy_src != COPY_OLD_FE && CopyGetData(cstate, &dummy, 1, 1) > 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), *************** CopyReadLine(CopyState cstate) *** 3133,3139 **** * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ ! if (cstate->copy_dest == COPY_NEW_FE) { do { --- 3342,3348 ---- * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ ! if (cstate->copy_src == COPY_NEW_FE) { do { diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y new file mode 100644 index 6431601..47f2be2 *** a/src/backend/parser/gram.y --- b/src/backend/parser/gram.y *************** static Node *makeRecursiveViewSelect(cha *** 307,313 **** %type <defelt> event_trigger_when_item %type <chr> enable_trigger ! %type <str> copy_file_name database_name access_method_clause access_method attr_name name cursor_name file_name index_name opt_index_name cluster_index_specification --- 307,313 ---- %type <defelt> event_trigger_when_item %type <chr> enable_trigger ! 
%type <str> copy_file_name opt_copy_exceptions database_name access_method_clause access_method attr_name name cursor_name file_name index_name opt_index_name cluster_index_specification *************** static Node *makeRecursiveViewSelect(cha *** 561,567 **** DEFERRABLE DEFERRED DEFINER DELETE_P DELIMITER DELIMITERS DESC DICTIONARY DISABLE_P DISCARD DISTINCT DO DOCUMENT_P DOMAIN_P DOUBLE_P DROP ! EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ESCAPE EVENT EXCEPT EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXTENSION EXTERNAL EXTRACT --- 561,568 ---- DEFERRABLE DEFERRED DEFINER DELETE_P DELIMITER DELIMITERS DESC DICTIONARY DISABLE_P DISCARD DISTINCT DO DOCUMENT_P DOMAIN_P DOUBLE_P DROP ! EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ESCAPE EVENT ! EXCEPT EXCEPTIONS EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXTENSION EXTERNAL EXTRACT *************** ClosePortalStmt: *** 2515,2521 **** /***************************************************************************** * * QUERY : ! * COPY relname [(columnList)] FROM/TO file [WITH] [(options)] * COPY ( SELECT ... ) TO file [WITH] [(options)] * * where 'file' can be one of: --- 2516,2523 ---- /***************************************************************************** * * QUERY : ! * COPY relname [(columnList)] FROM/TO file [EXCEPTIONS TO file] ! * [WITH] [(options)] * COPY ( SELECT ... ) TO file [WITH] [(options)] * * where 'file' can be one of: *************** ClosePortalStmt: *** 2534,2540 **** *****************************************************************************/ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids ! copy_from opt_program copy_file_name copy_delimiter opt_with copy_options { CopyStmt *n = makeNode(CopyStmt); n->relation = $3; --- 2536,2543 ---- *****************************************************************************/ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids ! 
copy_from opt_program copy_file_name opt_copy_exceptions copy_delimiter ! opt_with copy_options { CopyStmt *n = makeNode(CopyStmt); n->relation = $3; *************** CopyStmt: COPY opt_binary qualified_name *** 2543,2548 **** --- 2546,2552 ---- n->is_from = $6; n->is_program = $7; n->filename = $8; + n->exc_filename = $9; if (n->is_program && n->filename == NULL) ereport(ERROR, *************** CopyStmt: COPY opt_binary qualified_name *** 2556,2565 **** n->options = lappend(n->options, $2); if ($5) n->options = lappend(n->options, $5); ! if ($9) ! n->options = lappend(n->options, $9); ! if ($11) ! n->options = list_concat(n->options, $11); $$ = (Node *)n; } | COPY select_with_parens TO opt_program copy_file_name opt_with copy_options --- 2560,2569 ---- n->options = lappend(n->options, $2); if ($5) n->options = lappend(n->options, $5); ! if ($10) ! n->options = lappend(n->options, $10); ! if ($12) ! n->options = list_concat(n->options, $12); $$ = (Node *)n; } | COPY select_with_parens TO opt_program copy_file_name opt_with copy_options *************** copy_file_name: *** 2604,2609 **** --- 2608,2618 ---- | STDOUT { $$ = NULL; } ; + opt_copy_exceptions: + EXCEPTIONS TO copy_file_name { $$ = ($3 ? 
$3 : ""); } + | /* EMPTY */ { $$ = NULL; } + ; + copy_options: copy_opt_list { $$ = $1; } | '(' copy_generic_opt_list ')' { $$ = $2; } ; *************** unreserved_keyword: *** 13142,13147 **** --- 13151,13157 ---- | ENUM_P | ESCAPE | EVENT + | EXCEPTIONS | EXCLUDE | EXCLUDING | EXCLUSIVE diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c new file mode 100644 index 66d80b5..d308286 *** a/src/bin/psql/common.c --- b/src/bin/psql/common.c *************** AcceptResult(const PGresult *result) *** 388,393 **** --- 388,394 ---- case PGRES_EMPTY_QUERY: case PGRES_COPY_IN: case PGRES_COPY_OUT: + case PGRES_COPY_BOTH: /* Fine, do nothing */ OK = true; break; *************** ProcessResult(PGresult **results) *** 751,756 **** --- 752,758 ---- case PGRES_COPY_OUT: case PGRES_COPY_IN: + case PGRES_COPY_BOTH: is_copy = true; break; *************** ProcessResult(PGresult **results) *** 777,784 **** SetCancelConn(); if (result_status == PGRES_COPY_OUT) { ! if (!copystream) copystream = pset.queryFout; success = handleCopyOut(pset.db, copystream, &copy_result) && success; --- 779,793 ---- SetCancelConn(); if (result_status == PGRES_COPY_OUT) { ! /* ! * If we have the stream for exceptions, then this must be the ! * capture phase: use it. ! */ ! if (pset.copyExcStream) ! copystream = pset.copyExcStream; ! else if (!copystream) copystream = pset.queryFout; + success = handleCopyOut(pset.db, copystream, &copy_result) && success; *************** ProcessResult(PGresult **results) *** 794,800 **** copy_result = NULL; } } ! 
else /* PGRES_COPY_IN or PGRES_COPY_BOTH */ { if (!copystream) copystream = pset.cur_cmd_source; *************** PrintQueryResults(PGresult *results) *** 913,918 **** --- 922,928 ---- case PGRES_COPY_OUT: case PGRES_COPY_IN: + case PGRES_COPY_BOTH: /* nothing to do here */ success = true; break; diff --git a/src/bin/psql/copy.c b/src/bin/psql/copy.c new file mode 100644 index 010a593..cc36071 *** a/src/bin/psql/copy.c --- b/src/bin/psql/copy.c *************** struct copy_options *** 58,63 **** --- 58,66 ---- bool program; /* is 'file' a program to popen? */ bool psql_inout; /* true = use psql stdin/stdout */ bool from; /* true = FROM, false = TO */ + bool exceptions; /* has EXCEPTIONS TO */ + char *exc_file; /* NULL = stdin/stdout */ + bool exc_psql_inout; /* true = use psql stdout for exceptions */ }; *************** free_copy_options(struct copy_options * *** 69,74 **** --- 72,78 ---- free(ptr->before_tofrom); free(ptr->after_tofrom); free(ptr->file); + free(ptr->exc_file); free(ptr); } *************** parse_slash_copy(const char *args) *** 240,250 **** expand_tilde(&result->file); } /* Collect the rest of the line (COPY options) */ token = strtokx(NULL, "", NULL, NULL, 0, false, false, pset.encoding); if (token) ! result->after_tofrom = pg_strdup(token); return result; --- 244,302 ---- expand_tilde(&result->file); } + result->after_tofrom = pg_strdup(""); /* initialize for appending */ + + /* check for COPY FROM ... 
EXCEPTIONS TO */ + if (result->from) + { + token = strtokx(NULL, whitespace, NULL, NULL, + 0, false, false, pset.encoding); + if (token) + { + if (pg_strcasecmp(token, "exceptions") == 0) + { + result->exceptions = true; + + token = strtokx(NULL, whitespace, NULL, NULL, + 0, false, false, pset.encoding); + if (!token || pg_strcasecmp(token, "to") != 0) + goto error; + + token = strtokx(NULL, whitespace, ";", "'", + 0, false, false, pset.encoding); + if (!token) + goto error; + + if (pg_strcasecmp(token, "stdin") == 0 || + pg_strcasecmp(token, "stdout") == 0) + { + result->exc_file = NULL; + } + else if (pg_strcasecmp(token, "pstdin") == 0 || + pg_strcasecmp(token, "pstdout") == 0) + { + result->exc_psql_inout = true; + result->exc_file = NULL; + } + else + { + strip_quotes(token, '\'', 0, pset.encoding); + result->exc_file = pg_strdup(token); + expand_tilde(&result->exc_file); + } + } + else + { + xstrcat(&result->after_tofrom, token); + } + } + } + /* Collect the rest of the line (COPY options) */ token = strtokx(NULL, "", NULL, NULL, 0, false, false, pset.encoding); if (token) ! xstrcat(&result->after_tofrom, token); return result; *************** do_copy(const char *args) *** 269,274 **** --- 321,327 ---- { PQExpBufferData query; FILE *copystream; + FILE *excstream; struct copy_options *options; bool success; *************** do_copy(const char *args) *** 278,287 **** if (!options) return false; ! /* prepare to read or write the target file */ if (options->file && !options->program) canonicalize_path(options->file); if (options->from) { if (options->file) --- 331,346 ---- if (!options) return false; ! 
/* prepare to read or write the target file(s) */ if (options->file && !options->program) canonicalize_path(options->file); + if (options->exc_file) + canonicalize_path(options->exc_file); + + copystream = NULL; + excstream = NULL; + if (options->from) { if (options->file) *************** do_copy(const char *args) *** 294,305 **** --- 353,394 ---- copystream = popen(options->file, PG_BINARY_R); } else + { + if (options->exc_file) + { + struct stat stbuf_exc; + struct stat stbuf_from; + + /* check if both FROM and EXCEPTIONS TO are the same file */ + if (stat(options->exc_file, &stbuf_exc) == 0 && + stat(options->file, &stbuf_from) == 0 && + stbuf_exc.st_dev == stbuf_from.st_dev && + stbuf_exc.st_ino == stbuf_from.st_ino) + { + psql_error("COPY FROM and EXCEPTIONS TO cannot point to the same file\n"); + free_copy_options(options); + return false; + } + } copystream = fopen(options->file, PG_BINARY_R); + } } else if (!options->psql_inout) copystream = pset.cur_cmd_source; else copystream = stdin; + + if (options->exceptions) + { + if (options->exc_file) + { + excstream = fopen(options->exc_file, PG_BINARY_W); + } + else if (!options->exc_psql_inout) + excstream = pset.queryFout; + else + excstream = stdout; + } } else { *************** do_copy(const char *args) *** 332,337 **** --- 421,438 ---- else psql_error("%s: %s\n", options->file, strerror(errno)); + if (options->exc_file && excstream) + fclose(excstream); + free_copy_options(options); + return false; + } + + if (options->exceptions && !excstream) + { + psql_error("%s: %s\n", + options->exc_file, strerror(errno)); + if (options->file) + fclose(copystream); free_copy_options(options); return false; } *************** do_copy(const char *args) *** 353,358 **** --- 454,461 ---- if (result < 0 || S_ISDIR(st.st_mode)) { fclose(copystream); + if (options->exc_file && excstream) + fclose(excstream); free_copy_options(options); return false; } *************** do_copy(const char *args) *** 366,378 **** --- 469,485 ---- 
  		appendPQExpBufferStr(&query, " FROM STDIN ");
  	else
  		appendPQExpBufferStr(&query, " TO STDOUT ");
+ 	if (options->exceptions)
+ 		appendPQExpBufferStr(&query, " EXCEPTIONS TO STDOUT ");
  	if (options->after_tofrom)
  		appendPQExpBufferStr(&query, options->after_tofrom);
  
  	/* run it like a user command, but with copystream as data source/sink */
  	pset.copyStream = copystream;
+ 	pset.copyExcStream = excstream;
  	success = SendQuery(query.data);
  	pset.copyStream = NULL;
+ 	pset.copyExcStream = NULL;
  	termPQExpBuffer(&query);
  
  	if (options->file != NULL)
*************** do_copy(const char *args)
*** 410,415 ****
--- 517,530 ----
  			}
  		}
  	}
+ 	if (options->exc_file != NULL)
+ 	{
+ 		if (fclose(excstream) != 0)
+ 		{
+ 			psql_error("%s: %s\n", options->exc_file, strerror(errno));
+ 			success = false;
+ 		}
+ 	}
  	free_copy_options(options);
  	return success;
  }
diff --git a/src/bin/psql/settings.h b/src/bin/psql/settings.h
new file mode 100644
index ef24a4e..36c9300
*** a/src/bin/psql/settings.h
--- b/src/bin/psql/settings.h
*************** typedef struct _psqlSettings
*** 72,77 ****
--- 72,78 ----
  	bool		queryFoutPipe;	/* queryFout is from a popen() */
  
  	FILE	   *copyStream;		/* Stream to read/write for \copy command */
+ 	FILE	   *copyExcStream;	/* Stream to read exceptions for \copy command */
  
  	printQueryOpt popt;
  
diff --git a/src/bin/psql/startup.c b/src/bin/psql/startup.c
new file mode 100644
index 11a159a..f1f65df
*** a/src/bin/psql/startup.c
--- b/src/bin/psql/startup.c
*************** main(int argc, char *argv[])
*** 122,127 ****
--- 122,128 ----
  	pset.queryFout = stdout;
  	pset.queryFoutPipe = false;
  	pset.copyStream = NULL;
+ 	pset.copyExcStream = NULL;
  	pset.cur_cmd_source = stdin;
  	pset.cur_cmd_interactive = false;
  
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
new file mode 100644
index 82c926d..361cd80
*** a/src/bin/psql/tab-complete.c
--- b/src/bin/psql/tab-complete.c
*************** psql_completion(const char *text, int st
*** 2169,2175 ****
  		completion_charp = "";
  		matches = completion_matches(text, complete_from_files);
  	}
  
- 	/* Handle COPY|BINARY <sth> FROM|TO filename */
  	else if ((pg_strcasecmp(prev4_wd, "COPY") == 0 ||
  			  pg_strcasecmp(prev4_wd, "\\copy") == 0 ||
--- 2169,2174 ----
*************** psql_completion(const char *text, int st
*** 2178,2187 ****
  			  pg_strcasecmp(prev2_wd, "TO") == 0))
  	{
  		static const char *const list_COPY[] =
! 		{"BINARY", "OIDS", "DELIMITER", "NULL", "CSV", "ENCODING", NULL};
!
  		COMPLETE_WITH_LIST(list_COPY);
  	}
  
  	/* Handle COPY|BINARY <sth> FROM|TO filename CSV */
  	else if (pg_strcasecmp(prev_wd, "CSV") == 0 &&
--- 2177,2193 ----
  			  pg_strcasecmp(prev2_wd, "TO") == 0))
  	{
  		static const char *const list_COPY[] =
! 		{"BINARY", "OIDS", "DELIMITER", "NULL", "CSV", "ENCODING", "EXCEPTIONS TO", NULL};
  		COMPLETE_WITH_LIST(list_COPY);
  	}
+ 	/* If we have [COPY...] FROM <sth> EXCEPTIONS TO, complete with filename */
+ 	else if (pg_strcasecmp(prev4_wd, "FROM") == 0 &&
+ 			 pg_strcasecmp(prev2_wd, "EXCEPTIONS") == 0 &&
+ 			 pg_strcasecmp(prev_wd, "TO") == 0)
+ 	{
+ 		completion_charp = "";
+ 		matches = completion_matches(text, complete_from_files);
+ 	}
  
  	/* Handle COPY|BINARY <sth> FROM|TO filename CSV */
  	else if (pg_strcasecmp(prev_wd, "CSV") == 0 &&
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
new file mode 100644
index ba0f1b3..7137dfc
*** a/src/include/commands/copy.h
--- b/src/include/commands/copy.h
*************** extern Oid DoCopy(const CopyStmt *stmt,
*** 26,32 ****
  
  extern void ProcessCopyOptions(CopyState cstate, bool is_from, List *options);
  extern CopyState BeginCopyFrom(Relation rel, const char *filename,
! 			  bool is_program, List *attnamelist, List *options);
  extern void EndCopyFrom(CopyState cstate);
  extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
  			 Datum *values, bool *nulls, Oid *tupleOid);
--- 26,33 ----
  
  extern void ProcessCopyOptions(CopyState cstate, bool is_from, List *options);
  extern CopyState BeginCopyFrom(Relation rel, const char *filename,
! 			  bool is_program, const char *exc_filename,
! 			  List *attnamelist, List *options);
  extern void EndCopyFrom(CopyState cstate);
  extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
  			 Datum *values, bool *nulls, Oid *tupleOid);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
new file mode 100644
index 9141c30..93f50cb
*** a/src/include/nodes/parsenodes.h
--- b/src/include/nodes/parsenodes.h
*************** typedef struct CopyStmt
*** 1513,1518 ****
--- 1513,1519 ----
  	bool		is_from;		/* TO or FROM */
  	bool		is_program;		/* is 'filename' a program to popen? */
  	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
+ 	char	   *exc_filename;	/* filename for exceptions or NULL, empty string for STDOUT */
  	List	   *options;		/* List of DefElem nodes */
  } CopyStmt;
  
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
new file mode 100644
index e14dc9a..c44f02b
*** a/src/include/parser/kwlist.h
--- b/src/include/parser/kwlist.h
*************** PG_KEYWORD("enum", ENUM_P, UNRESERVED_KE
*** 143,148 ****
--- 143,149 ----
  PG_KEYWORD("escape", ESCAPE, UNRESERVED_KEYWORD)
  PG_KEYWORD("event", EVENT, UNRESERVED_KEYWORD)
  PG_KEYWORD("except", EXCEPT, RESERVED_KEYWORD)
+ PG_KEYWORD("exceptions", EXCEPTIONS, UNRESERVED_KEYWORD)
  PG_KEYWORD("exclude", EXCLUDE, UNRESERVED_KEYWORD)
  PG_KEYWORD("excluding", EXCLUDING, UNRESERVED_KEYWORD)
  PG_KEYWORD("exclusive", EXCLUSIVE, UNRESERVED_KEYWORD)
diff --git a/src/interfaces/ecpg/preproc/ecpg.addons b/src/interfaces/ecpg/preproc/ecpg.addons
new file mode 100644
index b3b36cf..0a415e6
*** a/src/interfaces/ecpg/preproc/ecpg.addons
--- b/src/interfaces/ecpg/preproc/ecpg.addons
*************** ECPG: where_or_current_clauseWHERECURREN
*** 192,198 ****
  		char *cursor_marker = $4[0] == ':' ? mm_strdup("$0") : $4;
  		$$ = cat_str(2,mm_strdup("where current of"), cursor_marker);
  	}
! ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromopt_programcopy_file_namecopy_delimiteropt_withcopy_options addon
  			if (strcmp($6, "from") == 0 &&
  			   (strcmp($7, "stdin") == 0 || strcmp($7, "stdout") == 0))
  				mmerror(PARSE_ERROR, ET_WARNING, "COPY FROM STDIN is not implemented");
--- 192,198 ----
  		char *cursor_marker = $4[0] == ':' ? mm_strdup("$0") : $4;
  		$$ = cat_str(2,mm_strdup("where current of"), cursor_marker);
  	}
! ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromopt_programcopy_file_nameopt_copy_exceptionscopy_delimiteropt_withcopy_options addon
  			if (strcmp($6, "from") == 0 &&
  			   (strcmp($7, "stdin") == 0 || strcmp($7, "stdout") == 0))
  				mmerror(PARSE_ERROR, ET_WARNING, "COPY FROM STDIN is not implemented");
-- 
2.1.0
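
For anyone who wants to poke at the same-file check from the psql part of the patch in isolation, here is a minimal standalone sketch. It assumes POSIX stat(); the helper name same_file() is hypothetical and does not appear in the patch. Like the check in do_copy(), it compares the st_dev/st_ino pair of both paths and treats a path that cannot be stat'ed as "not the same file".

```c
#include <stdbool.h>
#include <sys/stat.h>

/*
 * Hypothetical helper, not part of the patch: returns true only when both
 * paths can be stat'ed and refer to the same device/inode pair.
 */
static bool
same_file(const char *path_a, const char *path_b)
{
	struct stat st_a;
	struct stat st_b;

	/* A path that cannot be stat'ed (e.g. not created yet) never matches. */
	if (stat(path_a, &st_a) != 0 || stat(path_b, &st_b) != 0)
		return false;

	/* Same device and same inode means the same underlying file. */
	return st_a.st_dev == st_b.st_dev && st_a.st_ino == st_b.st_ino;
}
```

Because stat() follows symlinks and hard links share an inode, this also catches aliases of the input file that a plain string comparison of the two paths would miss. It is still only a best-effort guard: the check happens before either file is opened, so it cannot rule out a file being swapped in between.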