On Fri, Nov 15, 2019 at 6:24 PM Alexey Kondratov <[email protected]>
wrote:
> On 11.11.2019 16:00, Surafel Temesgen wrote:
> >
> >
> > Next, you use DestRemoteSimple for returning conflicting tuples back:
> >
> > + dest = CreateDestReceiver(DestRemoteSimple);
> > + dest->rStartup(dest, (int) CMD_SELECT, tupDesc);
> >
> > However, printsimple supports very limited subset of built-in
> > types, so
> >
> > CREATE TABLE large_test (id integer primary key, num1 bigint, num2
> > double precision);
> > COPY large_test FROM '/path/to/copy-test.tsv';
> > COPY large_test FROM '/path/to/copy-test.tsv' ERROR 3;
> >
> > fails with following error 'ERROR: unsupported type OID: 701', which
> > seems to be very confusing from the end user perspective. I've
> > tried to
> > switch to DestRemote, but couldn't figure it out quickly.
> >
> >
> > fixed
>
> Thanks, now it works with my tests.
>
> 1) Maybe it is fine, but now I do not like this part:
>
> + portal = GetPortalByName("");
> + dest = CreateDestReceiver(DestRemote);
> + SetRemoteDestReceiverParams(dest, portal);
> + dest->rStartup(dest, (int) CMD_SELECT, tupDesc);
>
> Here you implicitly use the fact that portal with a blank name is always
> created in exec_simple_query before we get to this point. Next, you
> create new DestReceiver and set it to this portal, but it is also
> already created and set in the exec_simple_query.
>
> Would it be better if you just explicitly pass ready DestReceiver to
> DoCopy (similarly to how it is done for T_ExecuteStmt / ExecuteQuery),
>
>
Good idea. Thank you.
>
> 2) My second concern is that you use three internal flags to track
> errors limit:
>
> + int error_limit; /* total number of error to ignore */
> + bool ignore_error; /* is ignore error specified? */
> + bool ignore_all_error; /* is error_limit -1 (ignore all
> error)
> + * specified? */
>
> Though it seems that we can just leave error_limit as a user-defined
> constant and track errors with something like errors_count. In that case
> you do not need auxiliary ignore_all_error flag. But probably it is a
> matter of personal choice.
>
>
Using bool flags saves us from using an integer type as a boolean, and it
preserves the fact that an error limit was specified even after the limit
has been decremented to zero. It also seems more straightforward to me
to treat ignore_all_error separately.
Attached is the patch that uses the already-created DestReceiver.
regards
Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index d9b7c4d0d4..ffcfe1e8d3 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
FORCE_NOT_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] )
FORCE_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] )
ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
+ ERROR_LIMIT '<replaceable class="parameter">limit_number</replaceable>'
</synopsis>
</refsynopsisdiv>
@@ -355,6 +356,23 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>ERROR_LIMIT</literal></term>
+ <listitem>
+ <para>
+ Specifies that error records are returned, up to <replaceable
+ class="parameter">limit_number</replaceable> of them.
+ Specifying -1 returns all error records.
+ </para>
+
+ <para>
+ Currently, only unique or exclusion constraint violations
+ and some record formatting errors are ignored.
+ </para>
+
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>WHERE</literal></term>
<listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index e17d8c760f..c911b3d0c2 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -24,6 +24,7 @@
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
+#include "access/printtup.h"
#include "catalog/dependency.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
@@ -48,7 +49,9 @@
#include "port/pg_bswap.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
+#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
+#include "tcop/pquery.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -154,6 +157,7 @@ typedef struct CopyStateData
List *convert_select; /* list of column names (can be NIL) */
bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
Node *whereClause; /* WHERE condition (or NULL) */
+ int error_limit; /* total number of error to ignore */
/* these are just for error messages, see CopyFromErrorCallback */
const char *cur_relname; /* table name for error messages */
@@ -183,6 +187,9 @@ typedef struct CopyStateData
bool volatile_defexprs; /* is any of defexprs volatile? */
List *range_table;
ExprState *qualexpr;
+ bool ignore_error; /* is ignore error specified? */
+ bool ignore_all_error; /* is error_limit -1 (ignore all error)
+ * specified? */
TransitionCaptureState *transition_capture;
@@ -837,7 +844,7 @@ CopyLoadRawBuf(CopyState cstate)
void
DoCopy(ParseState *pstate, const CopyStmt *stmt,
int stmt_location, int stmt_len,
- uint64 *processed)
+ uint64 *processed, DestReceiver *dest)
{
CopyState cstate;
bool is_from = stmt->is_from;
@@ -1068,7 +1075,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
NULL, stmt->attlist, stmt->options);
cstate->whereClause = whereClause;
- *processed = CopyFrom(cstate); /* copy from file to database */
+ *processed = CopyFrom(cstate, dest); /* copy from file to database */
EndCopyFrom(cstate);
}
else
@@ -1290,6 +1297,18 @@ ProcessCopyOptions(ParseState *pstate,
defel->defname),
parser_errposition(pstate, defel->location)));
}
+ else if (strcmp(defel->defname, "error_limit") == 0)
+ {
+ if (cstate->ignore_error)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ cstate->error_limit = defGetInt64(defel);
+ cstate->ignore_error = true;
+ if (cstate->error_limit == -1)
+ cstate->ignore_all_error = true;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1440,6 +1459,10 @@ ProcessCopyOptions(ParseState *pstate,
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CSV quote character must not appear in the NULL specification")));
+ if (cstate->ignore_error && !cstate->is_copy_from)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ERROR LIMIT only available using COPY FROM")));
}
/*
@@ -2653,7 +2676,7 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
* Copy FROM file to relation.
*/
uint64
-CopyFrom(CopyState cstate)
+CopyFrom(CopyState cstate, DestReceiver *dest)
{
ResultRelInfo *resultRelInfo;
ResultRelInfo *target_resultRelInfo;
@@ -2675,6 +2698,7 @@ CopyFrom(CopyState cstate)
bool has_before_insert_row_trig;
bool has_instead_insert_row_trig;
bool leafpart_use_multi_insert = false;
+ Portal portal = NULL;
Assert(cstate->rel);
@@ -2838,7 +2862,19 @@ CopyFrom(CopyState cstate)
/* Verify the named relation is a valid target for INSERT */
CheckValidResultRel(resultRelInfo, CMD_INSERT);
- ExecOpenIndices(resultRelInfo, false);
+ if (cstate->ignore_error)
+ {
+ TupleDesc tupDesc;
+
+ ExecOpenIndices(resultRelInfo, true);
+ tupDesc = RelationGetDescr(cstate->rel);
+
+ portal = GetPortalByName("");
+ SetRemoteDestReceiverParams(dest, portal);
+ dest->rStartup(dest, (int) CMD_SELECT, tupDesc);
+ }
+ else
+ ExecOpenIndices(resultRelInfo, false);
estate->es_result_relations = resultRelInfo;
estate->es_num_result_relations = 1;
@@ -2943,6 +2979,13 @@ CopyFrom(CopyState cstate)
*/
insertMethod = CIM_SINGLE;
}
+ else if (cstate->ignore_error)
+ {
+ /*
+ * Can't support speculative insertion in multi-inserts.
+ */
+ insertMethod = CIM_SINGLE;
+ }
else
{
/*
@@ -3286,6 +3329,63 @@ CopyFrom(CopyState cstate)
*/
myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
+ else if ((cstate->error_limit > 0 || cstate->ignore_all_error) && resultRelInfo->ri_NumIndices > 0)
+ {
+ /* Perform a speculative insertion. */
+ uint32 specToken;
+ ItemPointerData conflictTid;
+ bool specConflict;
+
+ /*
+ * Do a non-conclusive check for conflicts first.
+ */
+ specConflict = false;
+
+ if (!ExecCheckIndexConstraints(myslot, estate, &conflictTid,
+ NIL))
+ {
+ (void) dest->receiveSlot(myslot, dest);
+ cstate->error_limit--;
+ continue;
+ }
+
+ /*
+ * Acquire our speculative insertion lock.
+ */
+ specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+
+ /* insert the tuple, with the speculative token */
+ table_tuple_insert_speculative(resultRelInfo->ri_RelationDesc, myslot,
+ estate->es_output_cid,
+ 0,
+ NULL,
+ specToken);
+
+ /* insert index entries for tuple */
+ recheckIndexes = ExecInsertIndexTuples(myslot, estate, true,
+ &specConflict,
+ NIL);
+
+ /* adjust the tuple's state accordingly */
+ table_tuple_complete_speculative(resultRelInfo->ri_RelationDesc, myslot,
+ specToken, !specConflict);
+
+ /*
+ * Wake up anyone waiting for our decision.
+ */
+ SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+
+ /*
+ * If there was a conflict, return it and proceed to
+ * the next record if there are any.
+ */
+ if (specConflict)
+ {
+ (void) dest->receiveSlot(myslot, dest);
+ cstate->error_limit--;
+ continue;
+ }
+ }
else
{
/* OK, store the tuple and create index entries for it */
@@ -3703,7 +3803,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-
+next_line:
if (!cstate->binary)
{
char **field_strings;
@@ -3718,9 +3818,21 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
/* check for overflowing fields */
if (attr_count > 0 && fldct > attr_count)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("extra data after last expected column")));
+ {
+ if (cstate->error_limit > 0 || cstate->ignore_all_error)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("skipping \"%s\" --- extra data after last expected column",
+ cstate->line_buf.data)));
+ cstate->error_limit--;
+ goto next_line;
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("extra data after last expected column")));
+ }
fieldno = 0;
@@ -3732,10 +3844,22 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
Form_pg_attribute att = TupleDescAttr(tupDesc, m);
if (fieldno >= fldct)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("missing data for column \"%s\"",
- NameStr(att->attname))));
+ {
+ if (cstate->error_limit > 0 || cstate->ignore_all_error)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("skipping \"%s\" --- missing data for column \"%s\"",
+ cstate->line_buf.data, NameStr(att->attname))));
+ cstate->error_limit--;
+ goto next_line;
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("missing data for column \"%s\"",
+ NameStr(att->attname))));
+ }
string = field_strings[fieldno++];
if (cstate->convert_select_flags &&
@@ -3822,10 +3946,23 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
}
if (fld_count != attr_count)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("row field count is %d, expected %d",
- (int) fld_count, attr_count)));
+ {
+ if (cstate->error_limit > 0 || cstate->ignore_all_error)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("skipping \"%s\" --- row field count is %d, expected %d",
+ cstate->line_buf.data, (int) fld_count, attr_count)));
+ cstate->error_limit--;
+ goto next_line;
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("row field count is %d, expected %d",
+ (int) fld_count, attr_count)));
+
+ }
i = 0;
foreach(cur, cstate->attnumlist)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 7881079e96..521696be29 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -791,7 +791,7 @@ copy_table(Relation rel)
cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
/* Do the copy */
- (void) CopyFrom(cstate);
+ (void) CopyFrom(cstate, NULL);
logicalrep_rel_close(relmapentry, NoLock);
}
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index e984545780..cb7b0c80d2 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -550,7 +550,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
DoCopy(pstate, (CopyStmt *) parsetree,
pstmt->stmt_location, pstmt->stmt_len,
- &processed);
+ &processed, dest);
if (completionTag)
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processed);
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index bbe0105d77..16fc8e6a82 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -25,7 +25,7 @@ typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
extern void DoCopy(ParseState *state, const CopyStmt *stmt,
int stmt_location, int stmt_len,
- uint64 *processed);
+ uint64 *processed, DestReceiver *dest);
extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options);
extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename,
@@ -37,7 +37,7 @@ extern bool NextCopyFromRawFields(CopyState cstate,
char ***fields, int *nfields);
extern void CopyFromErrorCallback(void *arg);
-extern uint64 CopyFrom(CopyState cstate);
+extern uint64 CopyFrom(CopyState cstate, DestReceiver *dest);
extern DestReceiver *CreateCopyDestReceiver(void);
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index c53ed3ebf5..37a77dcaa2 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -55,6 +55,11 @@ LINE 1: COPY x TO stdout WHERE a = 1;
^
COPY x from stdin WHERE a = 50004;
COPY x from stdin WHERE a > 60003;
+COPY x from stdin WITH(ERROR_LIMIT 5);
+WARNING: skipping "70001 22 32" --- missing data for column "d"
+WARNING: skipping "70002 23 33 43 53 54" --- extra data after last expected column
+WARNING: skipping "70003 24 34 44" --- missing data for column "e"
+
COPY x from stdin WHERE f > 60003;
ERROR: column "f" does not exist
LINE 1: COPY x from stdin WHERE f > 60003;
@@ -102,12 +107,14 @@ SELECT * FROM x;
50004 | 25 | 35 | 45 | before trigger fired
60004 | 25 | 35 | 45 | before trigger fired
60005 | 26 | 36 | 46 | before trigger fired
+ 70004 | 25 | 35 | 45 | before trigger fired
+ 70005 | 26 | 36 | 46 | before trigger fired
1 | 1 | stuff | test_1 | after trigger fired
2 | 2 | stuff | test_2 | after trigger fired
3 | 3 | stuff | test_3 | after trigger fired
4 | 4 | stuff | test_4 | after trigger fired
5 | 5 | stuff | test_5 | after trigger fired
-(28 rows)
+(30 rows)
-- check copy out
COPY x TO stdout;
@@ -134,6 +141,8 @@ COPY x TO stdout;
50004 25 35 45 before trigger fired
60004 25 35 45 before trigger fired
60005 26 36 46 before trigger fired
+70004 25 35 45 before trigger fired
+70005 26 36 46 before trigger fired
1 1 stuff test_1 after trigger fired
2 2 stuff test_2 after trigger fired
3 3 stuff test_3 after trigger fired
@@ -163,6 +172,8 @@ Delimiter before trigger fired
35 before trigger fired
35 before trigger fired
36 before trigger fired
+35 before trigger fired
+36 before trigger fired
stuff after trigger fired
stuff after trigger fired
stuff after trigger fired
@@ -192,6 +203,8 @@ I'm null before trigger fired
25 before trigger fired
25 before trigger fired
26 before trigger fired
+25 before trigger fired
+26 before trigger fired
1 after trigger fired
2 after trigger fired
3 after trigger fired
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index 902f4fac19..2378f428fc 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -110,6 +110,14 @@ COPY x from stdin WHERE a > 60003;
60005 26 36 46 56
\.
+COPY x from stdin WITH(ERROR_LIMIT 5);
+70001 22 32
+70002 23 33 43 53 54
+70003 24 34 44
+70004 25 35 45 55
+70005 26 36 46 56
+\.
+
COPY x from stdin WHERE f > 60003;
COPY x from stdin WHERE a = max(x.b);