Hello,

The attached patch add error handling for
Extra data

missing data

invalid oid

null oid and

row count mismatch

And the record that field on the above case write to the file with appended
error message in it and in case of unique violation or exclusion constraint
violation error the failed record write as it is because the case of the
error can not be identified specifically

The new syntax became :

COPY ... WITH ON CONFLICT LOG maximum_error, LOG FILE NAME '…';


Regards

Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 13a8b68d95..bf21abd8e0 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -364,6 +364,17 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>IGNORE_CONFLICTS</literal></term>
+    <listitem>
+     <para>
+      specifies ignore to error up to specified amount .
+      Instead write the error record to failed record file and 
+      precede to the next record
+     </para>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
  </refsect1>
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9bc67ce60f..ffa6aecbd5 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -43,6 +43,7 @@
 #include "port/pg_bswap.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -118,6 +119,7 @@ typedef struct CopyStateData
 	int			file_encoding;	/* file or remote side's character encoding */
 	bool		need_transcoding;	/* file encoding diff from server? */
 	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
+	FILE	   *failed_rec_file;
 
 	/* parameters from the COPY command */
 	Relation	rel;			/* relation to copy to or from */
@@ -147,6 +149,9 @@ typedef struct CopyStateData
 	bool		convert_selectively;	/* do selective binary conversion? */
 	List	   *convert_select; /* list of column names (can be NIL) */
 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
+	char	   *failed_rec_filename;
+	bool	   ignore_conflict;
+	int	   error_limit;	/* total # of error to log */
 
 	/* these are just for error messages, see CopyFromErrorCallback */
 	const char *cur_relname;	/* table name for error messages */
@@ -766,6 +771,21 @@ CopyLoadRawBuf(CopyState cstate)
 	return (inbytes > 0);
 }
 
+/*
+ * LogCopyError log error in to error log file
+ */
+static void
+LogCopyError(CopyState cstate, const char *str)
+{
+	appendBinaryStringInfo(&cstate->line_buf, str, strlen(str));
+#ifndef WIN32
+	appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+	appendBinaryStringInfo(&cstate->line_buf, "\r\n", strlen("\r\n"));
+#endif
+	fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file);
+	cstate->error_limit--;
+}
 
 /*
  *	 DoCopy executes the SQL COPY statement
@@ -1226,6 +1246,19 @@ ProcessCopyOptions(ParseState *pstate,
 								defel->defname),
 						 parser_errposition(pstate, defel->location)));
 		}
+		else if (strcmp(defel->defname, "ignore_conflicts") == 0)
+		{
+			List       *conflictOption;
+			if (cstate->ignore_conflict)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options"),
+						 parser_errposition(pstate, defel->location)));
+			cstate->ignore_conflict = true;
+			conflictOption = (List *) defel->arg;
+			cstate->error_limit = intVal(list_nth(conflictOption, 0));
+			cstate->failed_rec_filename = strVal(list_nth(conflictOption, 1));
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1749,6 +1782,11 @@ EndCopy(CopyState cstate)
 					(errcode_for_file_access(),
 					 errmsg("could not close file \"%s\": %m",
 							cstate->filename)));
+		if (cstate->failed_rec_filename != NULL && FreeFile(cstate->failed_rec_file))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close file \"%s\": %m",
+							cstate->failed_rec_filename)));
 	}
 
 	MemoryContextDelete(cstate->copycontext);
@@ -2461,6 +2499,8 @@ CopyFrom(CopyState cstate)
 		hi_options |= HEAP_INSERT_FROZEN;
 	}
 
+	if (!cstate->ignore_conflict)
+		cstate->error_limit = 0;
 	/*
 	 * We need a ResultRelInfo so we can use the regular executor's
 	 * index-entry-making machinery.  (There used to be a huge amount of code
@@ -2579,6 +2619,10 @@ CopyFrom(CopyState cstate)
 		 */
 		insertMethod = CIM_SINGLE;
 	}
+	else if (cstate->ignore_conflict)
+	{
+		insertMethod = CIM_SINGLE;
+	}
 	else
 	{
 		/*
@@ -2968,12 +3012,59 @@ CopyFrom(CopyState cstate)
 						 */
 						tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
+					else if (cstate->ignore_conflict && cstate->error_limit > 0)
+					{
+						bool		specConflict;
+						uint32		specToken;
+						specConflict = false;
+
+						specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+						HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken);
+
+						/* insert the tuple, with the speculative token */
+						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
+									estate->es_output_cid,
+									HEAP_INSERT_SPECULATIVE,
+									NULL);
+
+						/* insert index entries for tuple */
+						recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+													   estate, true, &specConflict,
+													   NIL);
+
+						/* adjust the tuple's state accordingly */
+						if (!specConflict)
+						{
+							heap_finish_speculative(resultRelInfo->ri_RelationDesc, tuple);
+							processed++;
+						}
+						else
+						{
+							heap_abort_speculative(resultRelInfo->ri_RelationDesc, tuple);
+#ifndef WIN32
+							appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+							appendBinaryStringInfo(&cstate->cstate->line_buf, "\r\n", strlen("\r\n"));
+#endif
+							fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file);
+							cstate->error_limit--;
+
+						}
+
+				/*
+				 * Wake up anyone waiting for our decision.  They will re-check
+				 * the tuple, see that it's no longer speculative, and wait on our
+				 * XID as if this was a regularly inserted tuple all along.
+				 */
+						SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+
+				}
 					else
 						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
 									mycid, hi_options, bistate);
 
 					/* And create index entries for it */
-					if (resultRelInfo->ri_NumIndices > 0)
+					if (resultRelInfo->ri_NumIndices > 0 && cstate->error_limit == 0)
 						recheckIndexes = ExecInsertIndexTuples(slot,
 															   &(tuple->t_self),
 															   estate,
@@ -2994,7 +3085,8 @@ CopyFrom(CopyState cstate)
 			 * or FDW; this is the same definition used by nodeModifyTable.c
 			 * for counting tuples inserted by an INSERT command.
 			 */
-			processed++;
+				if(!cstate->ignore_conflict)
+				processed++;
 		}
 	}
 
@@ -3286,6 +3378,48 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->num_defaults = num_defaults;
 	cstate->is_program = is_program;
 
+	if (cstate->failed_rec_filename)
+	{
+		mode_t		oumask; /* Pre-existing umask value */
+		struct stat st;
+			/*
+			 * Prevent write to relative path ... too easy to shoot oneself in
+			 * the foot by overwriting a database file ...
+			 */
+			if (!is_absolute_path(cstate->failed_rec_filename))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_NAME),
+						 errmsg("relative path not allowed for failed record file")));
+			oumask = umask(S_IWGRP | S_IWOTH);
+			PG_TRY();
+			{
+				cstate->failed_rec_file = AllocateFile(cstate->failed_rec_filename, PG_BINARY_W);
+			}
+			PG_CATCH();
+			{
+				umask(oumask);
+				PG_RE_THROW();
+			}
+			PG_END_TRY();
+			umask(oumask);
+			if (cstate->failed_rec_file == NULL)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not open file \"%s\" for writing: %m",
+								cstate->failed_rec_filename)));
+
+			if (fstat(fileno(cstate->failed_rec_file), &st))
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not stat file \"%s\": %m",
+								cstate->failed_rec_filename)));
+
+			if (S_ISDIR(st.st_mode))
+				ereport(ERROR,
+						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						 errmsg("\"%s\" is a directory", cstate->failed_rec_filename)));
+		}
+
 	if (data_source_cb)
 	{
 		cstate->copy_dest = COPY_CALLBACK;
@@ -3498,7 +3632,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 	/* Initialize all values for row to NULL */
 	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
 	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-
+next_line:
 	if (!cstate->binary)
 	{
 		char	  **field_strings;
@@ -3513,9 +3647,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 
 		/* check for overflowing fields */
 		if (nfields > 0 && fldct > nfields)
+		{
+			if (cstate->ignore_conflict && cstate->error_limit > 0)
+			{
+				LogCopyError(cstate, " extra data after last expected column");
+				goto next_line;
+			}else
 			ereport(ERROR,
 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 					 errmsg("extra data after last expected column")));
+		}
 
 		fieldno = 0;
 
@@ -3523,15 +3664,29 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 		if (file_has_oids)
 		{
 			if (fieldno >= fldct)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for OID column")));
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					LogCopyError(cstate, " missing data for OID column");
+					goto next_line;
+				}else
+					ereport(ERROR,
+							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+							 errmsg("missing data for OID column")));
+			}
 			string = field_strings[fieldno++];
 
 			if (string == NULL)
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					LogCopyError(cstate, " null OID in COPY data");
+					goto next_line;
+				}else
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("null OID in COPY data")));
+			}
 			else if (cstate->oids && tupleOid != NULL)
 			{
 				cstate->cur_attname = "oid";
@@ -3539,9 +3694,17 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 				*tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
 																 CStringGetDatum(string)));
 				if (*tupleOid == InvalidOid)
+				{
+					if (cstate->ignore_conflict && cstate->error_limit > 0)
+					{
+						LogCopyError(cstate, " invalid OID in COPY data");
+						goto next_line;
+					}else
+
 					ereport(ERROR,
 							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 							 errmsg("invalid OID in COPY data")));
+				}
 				cstate->cur_attname = NULL;
 				cstate->cur_attval = NULL;
 			}
@@ -3555,10 +3718,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
 
 			if (fieldno >= fldct)
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					appendStringInfo(&cstate->line_buf, " missing data for column %s",
+								NameStr(att->attname));
+					LogCopyError(cstate, " ");
+					goto next_line;
+				}else
+
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("missing data for column \"%s\"",
 								NameStr(att->attname))));
+			}
 			string = field_strings[fieldno++];
 
 			if (cstate->convert_select_flags &&
@@ -3645,10 +3818,19 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 		}
 
 		if (fld_count != attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("row field count is %d, expected %d",
-							(int) fld_count, attr_count)));
+		{
+			if (cstate->ignore_conflict && cstate->error_limit > 0)
+			{
+				appendStringInfo(&cstate->line_buf, "row field count is %d, expected %d",
+						(int) fld_count, attr_count);
+				LogCopyError(cstate, " ");
+				goto next_line;
+			}else
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("row field count is %d, expected %d",
+								(int) fld_count, attr_count)));
+		}
 
 		if (file_has_oids)
 		{
@@ -3663,9 +3845,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 														 -1,
 														 &isnull));
 			if (isnull || loaded_oid == InvalidOid)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("invalid OID in COPY data")));
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					LogCopyError(cstate, " invalid OID in COPY data");
+					goto next_line;
+				}else
+					ereport(ERROR,
+							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+							 errmsg("invalid OID in COPY data")));
+			}
 			cstate->cur_attname = NULL;
 			if (cstate->oids && tupleOid != NULL)
 				*tupleOid = loaded_oid;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 87f5e95827..c1084f71bc 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -632,7 +632,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
 	EXTENSION EXTERNAL EXTRACT
 
-	FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+	FALSE_P FAMILY FETCH FILE_P FILTER FIRST_P FLOAT_P FOLLOWING FOR
 	FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
 
 	GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -650,7 +650,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOG_P LOGGED
 
 	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
 
@@ -3107,6 +3107,10 @@ copy_opt_item:
 				{
 					$$ = makeDefElem("encoding", (Node *)makeString($2), @1);
 				}
+			| ON CONFLICT LOG_P Iconst ',' LOG_P FILE_P NAME_P Sconst
+				{
+					$$ = makeDefElem("ignore_conflicts", (Node *)list_make2(makeInteger($4), makeString($9)), @1);
+				}
 		;
 
 /* The following exist for backward compatibility with very old versions */
@@ -15086,6 +15090,7 @@ unreserved_keyword:
 			| EXTENSION
 			| EXTERNAL
 			| FAMILY
+			| FILE_P
 			| FILTER
 			| FIRST_P
 			| FOLLOWING
@@ -15134,6 +15139,7 @@ unreserved_keyword:
 			| LOCATION
 			| LOCK_P
 			| LOCKED
+			| LOG_P
 			| LOGGED
 			| MAPPING
 			| MATCH
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 23db40147b..442562b0fe 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -162,6 +162,7 @@ PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
 PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
 PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
 PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
+PG_KEYWORD("file", FILE_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
 PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD)
@@ -242,6 +243,7 @@ PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD)
 PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
+PG_KEYWORD("log", LOG_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)

Reply via email to