On Wed, Apr 22, 2026 at 10:11 AM jian he <[email protected]> wrote:
>
> Hi, V8 is attached.
>
> I've refactored a significant portion of the code. Now the new syntax is:
>
> COPY FROM (ON_ERROR TABLE, ERROR_TABLE err_tbl);
>
> We now produce a ModifyTableState and ResultRelInfo node, form the slot, and
> then use table_tuple_insert() + ExecInsertIndexTuples() to insert the error
> metadata into the ERROR_TABLE.
>
> This is very similar to the normal ExecInsert() path. Since the ERROR_TABLE is a
> user-defined table, we enforce a lot of restrictions;
> in CopyFromErrorTableCheck, we have comments like:
> +

Actually, we can export ExecInsert, construct a ModifyTableContext,
and let ExecInsert do the rest of the work.
This would be neater.

v9-0001: Export ExecInsert, ExecSetupTransitionCaptureState,
fireBSTriggers, and fireASTriggers.
v9-0002: Introduce a (ModifyTableContext *) field in CopyFromStateData.
Initialize all the necessary subordinate structures, such as
ModifyTableState, EState, and ModifyTableState->resultRelInfo.  Set
cstate->escontext->details_wanted to true to ensure that all input conversion
error metadata is captured in cstate->escontext.  Use this metadata to construct
a TupleTableSlot, and then delegate the remaining processing to ExecInsert.
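
To illustrate one piece of that metadata: ErrorData.sqlerrcode is a packed int,
and the patch converts it to the text stored in the errorcode column by calling
unpack_sql_state().  Below is a standalone sketch of that encoding, modeled on
the PGSIXBIT/PGUNSIXBIT macros in elog.h.  The names SIXBIT, UNSIXBIT,
pack_sql_state and unpack_sql_state_into are made up for this illustration and
are not part of the patch.

```c
#include <assert.h>
#include <string.h>

/* Illustrative copies of the sixbit encoding used for SQLSTATE codes. */
#define SIXBIT(ch)    (((ch) - '0') & 0x3F)
#define UNSIXBIT(val) (((val) & 0x3F) + '0')

/* Pack a five-character SQLSTATE such as "22P02" into an int (hypothetical helper). */
static int
pack_sql_state(const char *code)
{
	int			packed = 0;

	assert(strlen(code) == 5);
	for (int i = 0; i < 5; i++)
		packed |= SIXBIT(code[i]) << (6 * i);
	return packed;
}

/* Unpack into a caller-supplied buffer of at least 6 bytes (hypothetical helper). */
static void
unpack_sql_state_into(int packed, char buf[6])
{
	for (int i = 0; i < 5; i++)
	{
		buf[i] = (char) UNSIXBIT(packed);
		packed >>= 6;
	}
	buf[5] = '\0';
}
```

Unlike this sketch, the real unpack_sql_state() returns a pointer to a static
buffer, which is why the patch copies the result into a text datum right away.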

Before calling ExecInsert() to insert error metadata into the ERROR_TABLE, we
must first populate a ModifyTableState, similar to how it is done in
ExecInitModifyTable.

ERROR_TABLE currently cannot be a partitioned table, so there is no need to call
ExecSetupPartitionTupleRouting. ERROR_TABLE cannot be a foreign table, so the
foreign-table-related initialization in ExecInitModifyTable is not needed.
ERROR_TABLE does not support Row-Level Security (RLS), so we do not need to
handle ModifyTable->withCheckOptionLists. For the INSERT code path, we also do
not need to worry about EvalPlanQual.

Comparing side by side with ExecInitModifyTable, it is sufficient to populate
the necessary information for the ERROR_TABLE using the following functions:
ExecInitRangeTable, ExecInitResultRelation, ExecOpenIndices, and
CheckValidResultRel.
This allows the ERROR_TABLE to safely use ExecInsert().



--
jian
https://www.enterprisedb.com/
From cc0873cefec93b41cffe22410ec349dfc001450f Mon Sep 17 00:00:00 2001
From: jian he <[email protected]>
Date: Wed, 6 May 2026 09:56:38 +0800
Subject: [PATCH v9 2/2] COPY FROM (on_error table, error_table errtbl)

The syntax is {on_error table, error_table error_saving_tbl}.
The error_saving_tbl must be a typed table, based on the built-in type
pg_catalog.copy_error_saving.  A preliminary lock check is also performed on the
error-saving table to ensure that inserts into it will not block.

When an error occurs, we record the error metadata and insert it into the
error_saving_tbl, then proceed to the next row.  Although the error_saving_tbl
may not capture information for every invalid column in each row, it retains the
raw_field_value, which can be used for further investigation.

The definition of the built-in type pg_catalog.copy_error_saving is:
CREATE TYPE copy_error_saving AS
(
userid    oid,
copy_tbl  oid,
filename  text COLLATE "C",
lineno    bigint,
line      text COLLATE "C",
colname   text COLLATE "C",
raw_field_value text COLLATE "C",
err_message     text COLLATE "C",
err_detail      text COLLATE "C",
errorcode       text COLLATE "C"
);

It's declared in src/backend/catalog/system_functions.sql.
Changing this definition later could cause upgrade issues, so we should make
sure that it is unlikely to change in the future.

TODO: Should we also add a field (starttime timestamptz) to copy_error_saving to
indicate the time when this error record was inserted?

Since we use ExecInsert to insert into error_saving_tbl, FOR EACH ROW triggers
on error_saving_tbl fire for each insert, while FOR EACH STATEMENT triggers
fire only once.

reference: https://postgr.es/m/CACJufxHi53OpGYPAe6SdCb4m=-+H8L+7LDbUWvTiJp=v4yy...@mail.gmail.com
reference: https://postgr.es/m/752672.1699474336%40sss.pgh.pa.us
discussion: https://postgr.es/m/CACJufxH_OJpVra=0c4ow8fbxhj7hemcvatnepa5vaursena...@mail.gmail.com
commitfest entry: https://commitfest.postgresql.org/patch/4817
---
 doc/src/sgml/datatype.sgml               | 100 ++++++++
 doc/src/sgml/ref/copy.sgml               |  39 +++-
 src/backend/catalog/system_functions.sql |  13 ++
 src/backend/commands/copy.c              |  28 +++
 src/backend/commands/copyfrom.c          | 282 ++++++++++++++++++++++-
 src/backend/commands/copyfromparse.c     |  98 ++++++++
 src/backend/parser/gram.y                |   1 +
 src/include/commands/copy.h              |   2 +
 src/include/commands/copyfrom_internal.h |   9 +
 src/test/regress/expected/copy2.out      | 168 ++++++++++++++
 src/test/regress/sql/copy2.sql           | 138 +++++++++++
 11 files changed, 867 insertions(+), 11 deletions(-)

diff --git a/doc/src/sgml/datatype.sgml b/doc/src/sgml/datatype.sgml
index d8d91678e86..370e99c72ea 100644
--- a/doc/src/sgml/datatype.sgml
+++ b/doc/src/sgml/datatype.sgml
@@ -5116,6 +5116,106 @@ WHERE ...
    </para>
   </sect1>
 
+  <sect1 id="datatype-copy_error_saving">
+   <title><type>copy_error_saving</type> Type</title>
+   <indexterm zone="datatype-copy_error_saving">
+    <primary>copy_error_saving</primary>
+   </indexterm>
+
+   <para>
+    The built-in composite type <type>copy_error_saving</type> is used by the
+    <link linkend="sql-copy"><command>COPY FROM</command></link> command.
+    It contains the following fields, which are used to store information when <command>COPY FROM</command>
+    encounters an error converting a column's input value to its data type.
+  </para>
+
+   <para>
+<informaltable>
+    <tgroup cols="3">
+     <thead>
+      <row>
+       <entry>Column name</entry>
+       <entry>Data type</entry>
+       <entry>Description</entry>
+      </row>
+     </thead>
+
+      <tbody>
+       <row>
+       <entry> <literal>userid</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The user who ran the <command>COPY FROM</command> operation.
+       References <link linkend="catalog-pg-authid"><structname>pg_authid</structname></link>.<structfield>oid</structfield>;
+       however, there is no hard dependency on the catalog <literal>pg_authid</literal>.
+       If the corresponding row in <literal>pg_authid</literal> is deleted, this value becomes stale.
+    </entry>
+       </row>
+
+       <row>
+       <entry> <literal>copy_tbl</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The destination table of the <command>COPY FROM</command> operation.
+        References <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>oid</structfield>;
+        however, there is no hard dependency on the catalog <literal>pg_class</literal>.
+        If the corresponding row in <literal>pg_class</literal> is deleted, this value becomes stale.
+        </entry>
+       </row>
+
+       <row>
+       <entry> <literal>filename</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The path name of the <command>COPY FROM</command> input</entry>
+       </row>
+
+       <row>
+       <entry> <literal>lineno</literal> </entry>
+       <entry><type>bigint</type></entry>
+       <entry>Line number where the error occurred, counting from 1</entry>
+       </row>
+
+       <row>
+       <entry> <literal>line</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the line where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>colname</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>raw_field_value</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_message</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The primary error message, see <link linkend="error-message-reporting">ereport</link></entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_detail</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The optional detail message</entry>
+       </row>
+
+       <row>
+       <entry> <literal>errorcode</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The SQLSTATE error identifier code for the error condition</entry>
+       </row>
+
+      </tbody>
+     </tgroup>
+   </informaltable>
+     </para>
+  </sect1>
+
+
   <sect1 id="datatype-pg-lsn">
    <title><type>pg_lsn</type> Type</title>
 
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 4706c9a4410..73c3583dd7c 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -46,6 +46,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     ON_ERROR <replaceable class="parameter">error_action</replaceable>
     REJECT_LIMIT <replaceable class="parameter">maxerror</replaceable>
+    ERROR_TABLE <replaceable class="parameter">error_saving_table</replaceable>
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
     LOG_VERBOSITY <replaceable class="parameter">verbosity</replaceable>
 </synopsis>
@@ -450,11 +451,13 @@ COPY (SELECT j FROM (VALUES ('null'::json), (NULL::json)) v(j))
       <literal>stop</literal> means fail the command, while
       <literal>ignore</literal> means discard the input row and continue with the next one,
       and <literal>set_null</literal> means replace the field containing the invalid
-      input value with a null value and continue to the next field.
+      input value with a null value and continue to the next field,
+      and <literal>table</literal> means save error information to <replaceable class="parameter">error_saving_table</replaceable>
+      and continue with the next row.
       The default is <literal>stop</literal>.
      </para>
      <para>
-      The <literal>ignore</literal> and <literal>set_null</literal>
+      The <literal>ignore</literal>, <literal>set_null</literal> and <literal>table</literal>
       options are applicable only for <command>COPY FROM</command>
       when the <literal>FORMAT</literal> is <literal>text</literal> or <literal>csv</literal>.
      </para>
@@ -463,16 +466,24 @@ COPY (SELECT j FROM (VALUES ('null'::json), (NULL::json)) v(j))
       <literal>set_null</literal>, a <literal>NOTICE</literal> message is emitted at the end of the
       <command>COPY FROM</command> command containing the count of rows that were ignored or
       changed, if at least one row was affected.
+      If the <literal>ON_ERROR</literal> option is set to <literal>table</literal>,
+      a <literal>NOTICE</literal> message containing the number of rows inserted into
+      <replaceable class="parameter">error_saving_table</replaceable> is
+      emitted at the end of the <command>COPY FROM</command> command.
      </para>
      <para>
       When <literal>LOG_VERBOSITY</literal> option is set to <literal>verbose</literal>,
-      for <literal>ignore</literal> option, a <literal>NOTICE</literal> message
+      for <literal>ignore</literal> or <literal>table</literal> option, a <literal>NOTICE</literal> message
       containing the line of the input file and the column name whose input
       conversion has failed is emitted for each discarded row;
       for <literal>set_null</literal> option, a <literal>NOTICE</literal>
       message containing the line of the input file and the column name where
       value was replaced with <literal>NULL</literal> for each input conversion
-      failure.
+      failure;
+      for <literal>table</literal> option, a <literal>NOTICE</literal>
+      message containing the line of the input file and the column name whose input
+      conversion has failed is emitted for each row inserted into
+      <replaceable class="parameter">error_saving_table</replaceable>.
       When it is set to <literal>silent</literal>, no message is emitted
       regarding input conversion failed rows.
      </para>
@@ -497,6 +508,22 @@ COPY (SELECT j FROM (VALUES ('null'::json), (NULL::json)) v(j))
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-copy-params-error-table">
+    <term><literal>ERROR_TABLE</literal></term>
+    <listitem>
+      <para>
+        Save error context details to table <replaceable class="parameter">error_saving_table</replaceable>.
+        This option is allowed only in <command>COPY FROM</command>, and only when
+        <literal>ON_ERROR</literal> is set to <literal>table</literal>.
+        The user running <command>COPY FROM</command> must have <literal>INSERT</literal> privilege on all columns
+        of the <replaceable class="parameter">error_saving_table</replaceable>.
+        The <replaceable class="parameter">error_saving_table</replaceable> must
+        be a typed table based on the built-in composite type <link
+        linkend="datatype-copy_error_saving"><type>copy_error_saving</type></link>.
+      </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-copy-params-encoding">
     <term><literal>ENCODING</literal></term>
     <listitem>
@@ -522,8 +549,8 @@ COPY (SELECT j FROM (VALUES ('null'::json), (NULL::json)) v(j))
      </para>
      <para>
       This is currently used in <command>COPY FROM</command> command when
-      <literal>ON_ERROR</literal> option is set to <literal>ignore</literal>
-      or <literal>set_null</literal>.
+      <literal>ON_ERROR</literal> option is set to <literal>ignore</literal>,
+      <literal>set_null</literal> or <literal>table</literal>.
       </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index c3c0a6e84ed..62a4a6d98aa 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -366,3 +366,16 @@ CREATE OR REPLACE FUNCTION ts_debug(document text,
 BEGIN ATOMIC
     SELECT * FROM ts_debug(get_current_ts_config(), $1);
 END;
+
+CREATE TYPE copy_error_saving AS(
+    userid    oid,
+    copy_tbl  oid,
+    filename  text COLLATE "C",
+    lineno    bigint,
+    line      text COLLATE "C",
+    colname   text COLLATE "C",
+    raw_field_value text COLLATE "C",
+    err_message     text COLLATE "C",
+    err_detail      text COLLATE "C",
+    errorcode       text COLLATE "C"
+);
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 003b70852bb..0839ab7bbe3 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -494,6 +494,8 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
 		return COPY_ON_ERROR_IGNORE;
 	if (pg_strcasecmp(sval, "set_null") == 0)
 		return COPY_ON_ERROR_SET_NULL;
+	if (pg_strcasecmp(sval, "table") == 0)
+		return COPY_ON_ERROR_TABLE;
 
 	ereport(ERROR,
 			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -587,6 +589,7 @@ ProcessCopyOptions(ParseState *pstate,
 	bool		freeze_specified = false;
 	bool		header_specified = false;
 	bool		on_error_specified = false;
+	bool		error_rel_specified = false;
 	bool		log_verbosity_specified = false;
 	bool		reject_limit_specified = false;
 	bool		force_array_specified = false;
@@ -774,6 +777,13 @@ ProcessCopyOptions(ParseState *pstate,
 			reject_limit_specified = true;
 			opts_out->reject_limit = defGetCopyRejectLimitOption(defel);
 		}
+		else if (strcmp(defel->defname, "error_table") == 0)
+		{
+			if (error_rel_specified)
+				errorConflictingDefElem(defel, pstate);
+			error_rel_specified = true;
+			opts_out->error_table = defGetString(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -782,6 +792,24 @@ ProcessCopyOptions(ParseState *pstate,
 					 parser_errposition(pstate, defel->location)));
 	}
 
+	if (opts_out->on_error == COPY_ON_ERROR_TABLE)
+	{
+		if (opts_out->error_table == NULL)
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("cannot set option %s to \"%s\" when \"%s\" is not specified", "ON_ERROR", "TABLE", "ERROR_TABLE"),
+					errhint("\"%s\" option is required", "ERROR_TABLE"));
+
+		if (opts_out->reject_limit)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot set option %s when %s is specified as \"%s\"", "REJECT_LIMIT", "ON_ERROR", "TABLE"));
+	}
+	else if (opts_out->error_table != NULL)
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("COPY %s can only be used when option %s is set to \"%s\"", "ERROR_TABLE", "ON_ERROR", "TABLE"));
+
 	/*
 	 * Check for incompatible options (must do these three before inserting
 	 * defaults)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 64ac3063c61..5f29dcf5bac 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -29,6 +29,7 @@
 #include "access/tupconvert.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_namespace.h"
 #include "commands/copyapi.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
@@ -42,16 +43,20 @@
 #include "miscadmin.h"
 #include "nodes/miscnodes.h"
 #include "optimizer/optimizer.h"
+#include "parser/parse_relation.h"
 #include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
+#include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/portal.h"
 #include "utils/rel.h"
+#include "utils/regproc.h"
 #include "utils/snapmgr.h"
 #include "utils/typcache.h"
+#include "utils/syscache.h"
 
 /*
  * No more than this many tuples per CopyMultiInsertBuffer
@@ -120,6 +125,10 @@ static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
 								 FmgrInfo *finfo, Oid *typioparam);
 static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
 static void CopyFromBinaryEnd(CopyFromState cstate);
+static void RangeVarCallbackForCopyErrorTable(const RangeVar *rv, Oid relid, Oid oldrelid,
+											  void *arg);
+static void CopyFromErrorTableInit(CopyFromState cstate);
+static void CopyFromErrorTablePermissionCheck(ParseState *pstate, Relation rel);
 
 
 /*
@@ -1151,22 +1160,26 @@ CopyFrom(CopyFromState cstate)
 		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
 			break;
 
-		if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+		if ((cstate->opts.on_error == COPY_ON_ERROR_IGNORE ||
+			 cstate->opts.on_error == COPY_ON_ERROR_TABLE) &&
 			cstate->escontext->error_occurred)
 		{
 			/*
 			 * Soft error occurred, skip this tuple and just make
-			 * ErrorSaveContext ready for the next NextCopyFrom. Since we
-			 * don't set details_wanted and error_data is not to be filled,
-			 * just resetting error_occurred is enough.
+			 * ErrorSaveContext ready for the next NextCopyFrom.
 			 */
 			cstate->escontext->error_occurred = false;
 
+			/* Reset ErrorSaveContext->error_data */
+			if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+				memset(cstate->escontext->error_data, 0, sizeof(ErrorData));
+
 			/* Report that this tuple was skipped by the ON_ERROR clause */
 			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
 										 cstate->num_errors);
 
-			if (cstate->opts.reject_limit > 0 &&
+			if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+				cstate->opts.reject_limit > 0 &&
 				cstate->num_errors > cstate->opts.reject_limit)
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
@@ -1482,6 +1495,24 @@ CopyFrom(CopyFromState cstate)
 								  cstate->num_errors));
 	}
 
+	if (cstate->opts.on_error == COPY_ON_ERROR_TABLE && cstate->num_errors > 0)
+	{
+		ModifyTableState *on_error_mtstate = cstate->mtcontext->mtstate;
+
+		fireASTriggers(on_error_mtstate);
+		AfterTriggerEndQuery(on_error_mtstate->ps.state);
+
+		on_error_mtstate->mt_done = true;
+
+		if (cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
+			ereport(NOTICE,
+					errmsg_plural("%" PRIu64 " row was saved to table \"%s\" due to data type incompatibility",
+								  "%" PRIu64 " rows were saved to table \"%s\" due to data type incompatibility",
+								  cstate->num_errors,
+								  cstate->num_errors,
+								  RelationGetRelationName(cstate->error_rel)));
+	}
+
 	if (bistate != NULL)
 		FreeBulkInsertState(bistate);
 
@@ -1515,6 +1546,15 @@ CopyFrom(CopyFromState cstate)
 
 	FreeExecutorState(estate);
 
+	/* Release resources associated with ERROR_TABLE */
+	if (cstate->error_rel)
+	{
+		ExecResetTupleTable(cstate->mtcontext->mtstate->ps.state->es_tupleTable, false);
+		ExecCloseResultRelations(cstate->mtcontext->mtstate->ps.state);
+		ExecCloseRangeTableRelations(cstate->mtcontext->mtstate->ps.state);
+		FreeExecutorState(cstate->mtcontext->mtstate->ps.state);
+	}
+
 	return processed;
 }
 
@@ -1630,6 +1670,14 @@ BeginCopyFrom(ParseState *pstate,
 		if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE ||
 			cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
 			cstate->escontext->details_wanted = false;
+
+		/*
+		 * For ON_ERROR = TABLE, we must set details_wanted to true. This
+		 * ensures that ErrorData is populated when the next error occurs,
+		 * allowing us to capture error metadata.
+		 */
+		else if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+			cstate->escontext->details_wanted = true;
 	}
 	else
 		cstate->escontext = NULL;
@@ -1657,6 +1705,86 @@ BeginCopyFrom(ParseState *pstate,
 			cstate->domain_with_constraint[i] = DomainHasConstraints(att->atttypid, NULL);
 		}
 	}
+	else if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+	{
+		/* Set up COPY FROM (ON_ERROR TABLE) */
+		RangeVar   *relvar;
+		List	   *relname_list;
+		HeapTuple	tp;
+		Oid			err_relOid,
+					typoid;
+		Oid			reloftype = InvalidOid;
+
+		Assert(cstate->opts.error_table != NULL);
+
+		relname_list = stringToQualifiedNameList(cstate->opts.error_table, NULL);
+		relvar = makeRangeVarFromNameList(relname_list);
+
+		/*
+		 * We may insert tuples into ERROR_TABLE later. To avoid a deadlock or
+		 * long hang during COPY, verify that the table is not already locked;
+		 * otherwise, report a lock conflict error.
+		 */
+		err_relOid = RangeVarGetRelidExtended(relvar,
+											  RowExclusiveLock,
+											  RVR_NOWAIT,
+											  RangeVarCallbackForCopyErrorTable,
+											  NULL);
+
+		if (RelationGetRelid(cstate->rel) == err_relOid)
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("cannot use relation \"%s\" for COPY error saving while copying data to it",
+						   cstate->opts.error_table));
+
+		cstate->error_rel = table_open(err_relOid, NoLock);
+
+		/*
+		 * The error-saving table must be a plain table. It cannot have
+		 * rewrite rules or any enabled Row-Level Security (RLS) policies.
+		 */
+		if (cstate->error_rel->rd_rel->relrowsecurity || cstate->error_rel->rd_rules)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot use relation \"%s\" for COPY error saving",
+						   RelationGetRelationName(cstate->error_rel)),
+					cstate->error_rel->rd_rel->relrowsecurity
+					? errdetail("The error saving table cannot have row-level security policies.")
+					: errdetail("The error saving table cannot have rules."));
+
+		typoid = GetSysCacheOid2(TYPENAMENSP, Anum_pg_type_oid,
+								 PointerGetDatum("copy_error_saving"),
+								 ObjectIdGetDatum(PG_CATALOG_NAMESPACE));
+
+		tp = SearchSysCache1(RELOID, ObjectIdGetDatum(err_relOid));
+		if (!HeapTupleIsValid(tp))
+			elog(ERROR, "cache lookup failed for relation %u", err_relOid);
+		else
+		{
+			Form_pg_class reltup = (Form_pg_class) GETSTRUCT(tp);
+
+			reloftype = reltup->reloftype;
+
+			if (reloftype != typoid)
+				ereport(ERROR,
+						errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						errmsg("cannot use relation \"%s\" for COPY error saving",
+							   RelationGetRelationName(cstate->error_rel)),
+						OidIsValid(reloftype)
+						? errdetail("Relation \"%s\" is a typed table based on type \"%s\".",
+									RelationGetRelationName(cstate->error_rel),
+									format_type_be_qualified(reloftype))
+						: 0,
+						errhint("The COPY error saving table must be a typed table based on type \"%s\".",
+								format_type_be_qualified(typoid)));
+
+			ReleaseSysCache(tp);
+		}
+
+		CopyFromErrorTableInit(cstate);
+
+		table_close(cstate->error_rel, NoLock);
+	}
 
 	/* Convert FORCE_NULL name list to per-column flags, check validity */
 	cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
@@ -1998,3 +2126,147 @@ ClosePipeFromProgram(CopyFromState cstate)
 				 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
 	}
 }
+
+/*
+ * Perform permission and related checks for ERROR_TABLE, and initialize
+ * cstate->mtcontext.
+ */
+static void
+CopyFromErrorTableInit(CopyFromState cstate)
+{
+	Relation	relation = cstate->error_rel;
+	ResultRelInfo *resultRelInfo = makeNode(ResultRelInfo);
+	ModifyTableState *mtstate = makeNode(ModifyTableState);
+	ParseState *pstate = make_parsestate(NULL);
+	EState	   *estate = CreateExecutorState();
+	ModifyTableContext *mtcontext = palloc0_object(ModifyTableContext);
+	ModifyTable *node = makeNode(ModifyTable);
+
+	node->operation = CMD_INSERT;
+	node->canSetTag = false;
+	node->rootRelation = 0;
+	node->resultRelations = list_make1_int(1);
+
+	Assert(cstate->opts.on_error == COPY_ON_ERROR_TABLE);
+
+	/* Do the error saving table permission check */
+	CopyFromErrorTablePermissionCheck(pstate, relation);
+
+	/*
+	 * We need a ResultRelInfo so we can use the regular executor's
+	 * index-entry-making machinery.
+	 */
+	ExecInitRangeTable(estate, pstate->p_rtable, pstate->p_rteperminfos,
+					   bms_make_singleton(1));
+
+	ExecInitResultRelation(estate, resultRelInfo, 1);
+
+	ExecOpenIndices(resultRelInfo, false);
+
+	/* Verify the named relation is a valid target for INSERT */
+	CheckValidResultRel(resultRelInfo, CMD_INSERT, ONCONFLICT_NONE, NIL);
+
+	/*
+	 * Populate ModifyTableState for inserting records into the error saving table.
+	 */
+	mtstate->ps.plan = (Plan *) node;
+	mtstate->ps.state = estate;
+	mtstate->operation = CMD_INSERT;
+	mtstate->mt_nrels = 1;
+	mtstate->resultRelInfo = resultRelInfo;
+	mtstate->rootResultRelInfo = resultRelInfo;
+	mtstate->fireBSTriggers = true;
+
+	mtcontext->mtstate = mtstate;
+	mtcontext->estate = estate;
+	cstate->mtcontext = mtcontext;
+}
+
+/*
+ * Callback to RangeVarGetRelidExtended().
+ *
+ * Checks the following:
+ *	- the relation specified is a table.
+ *	- the table is not a system table.
+ *
+ * If any of these checks fails then an error is raised.
+ */
+static void
+RangeVarCallbackForCopyErrorTable(const RangeVar *rv, Oid relid, Oid oldrelid,
+								  void *arg)
+{
+	HeapTuple	tuple;
+	Form_pg_class classform;
+	char		relkind;
+
+	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+	if (!HeapTupleIsValid(tuple))
+		return;
+
+	classform = (Form_pg_class) GETSTRUCT(tuple);
+	relkind = classform->relkind;
+
+	/* No system table modifications unless explicitly allowed */
+	if (!allowSystemTableMods && IsSystemClass(relid, classform))
+		ereport(ERROR,
+				errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				errmsg("permission denied: \"%s\" is a system catalog",
+					   rv->relname));
+
+	/* The error saving table must be a regular relation */
+	if (relkind != RELKIND_RELATION)
+		ereport(ERROR,
+				errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				errmsg("cannot use relation \"%s\" for COPY error saving",
+					   rv->relname),
+				errdetail_relkind_not_supported(relkind));
+
+	ReleaseSysCache(tuple);
+}
+
+/*
+ * COPY (ON_ERROR TABLE) logs COPY FROM error details to the ERROR_TABLE.
+ * Therefore, the current user must have INSERT privileges on all columns of the
+ * ERROR_TABLE.
+ */
+static void
+CopyFromErrorTablePermissionCheck(ParseState *pstate, Relation rel)
+{
+	LOCKMODE	lockmode = RowExclusiveLock;
+	ParseNamespaceItem *nsitem;
+	RTEPermissionInfo *perminfo;
+	TupleDesc	tupDesc;
+	AclResult	aclresult;
+
+	/* Must have INSERT privilege on the table */
+	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_INSERT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(RelationGetRelid(rel))),
+					   RelationGetRelationName(rel));
+
+	nsitem = addRangeTableEntryForRelation(pstate, rel, lockmode,
+										   NULL, false, false);
+	perminfo = nsitem->p_perminfo;
+	perminfo->requiredPerms = ACL_INSERT;
+
+	tupDesc = RelationGetDescr(rel);
+
+	/* Must have INSERT privilege on each column of the table */
+	for (int i = 0; i < tupDesc->natts; i++)
+	{
+		Bitmapset **bms;
+		int			attno;
+
+		CompactAttribute *attr = TupleDescCompactAttr(tupDesc, i);
+
+		if (attr->attisdropped)
+			continue;
+
+		attno = i + 1 - FirstLowInvalidHeapAttributeNumber;
+		bms = &perminfo->insertedCols;
+
+		*bms = bms_add_member(*bms, attno);
+	}
+
+	ExecCheckPermissions(pstate->p_rtable, list_make1(perminfo), true);
+}
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 65fd5a0ab4f..8e870309339 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -1101,6 +1101,89 @@ CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
 					cstate->num_errors++;
 				}
 			}
+			else if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+			{
+				char	   *err_code;
+				Datum	   *newvalues;
+				bool	   *newnulls;
+				ModifyTableState *mtstate = cstate->mtcontext->mtstate;
+				EState	   *estate = mtstate->ps.state;
+
+				/* Prepare to build the result tuple */
+				TupleTableSlot *myslot = ExecGetReturningSlot(estate,
+															  mtstate->resultRelInfo);
+
+				ExecClearTuple(myslot);
+
+				newvalues = myslot->tts_values;
+				newnulls = myslot->tts_isnull;
+
+				Assert(RelationGetDescr(cstate->error_rel)->natts == 10);
+
+				for (int i = 0; i < RelationGetDescr(cstate->error_rel)->natts; i++)
+					newnulls[i] = false;
+
+				newvalues[0] = ObjectIdGetDatum(GetUserId());
+				newvalues[1] = ObjectIdGetDatum(cstate->rel->rd_rel->oid);
+				newvalues[2] = CStringGetTextDatum(cstate->filename ? cstate->filename : "STDIN");
+				newvalues[3] = Int64GetDatum((int64) cstate->cur_lineno);
+				newvalues[4] = CStringGetTextDatum(cstate->line_buf.data);
+				newvalues[5] = CStringGetTextDatum(cstate->cur_attname);
+
+				if (string)
+					newvalues[6] = CStringGetTextDatum(string);
+				else
+				{
+					newvalues[6] = (Datum) 0;
+					newnulls[6] = true;
+				}
+
+				if (cstate->escontext->error_data->message)
+					newvalues[7] =
+						CStringGetTextDatum(cstate->escontext->error_data->message);
+				else
+				{
+					newvalues[7] = (Datum) 0;
+					newnulls[7] = true;
+				}
+
+				if (cstate->escontext->error_data->detail)
+					newvalues[8] =
+						CStringGetTextDatum(cstate->escontext->error_data->detail);
+				else
+				{
+					newvalues[8] = (Datum) 0;
+					newnulls[8] = true;
+				}
+
+				err_code = unpack_sql_state(cstate->escontext->error_data->sqlerrcode);
+				newvalues[9] = CStringGetTextDatum(err_code);
+
+				/* Build the virtual tuple. */
+				ExecStoreVirtualTuple(myslot);
+
+				/*
+				 * On first call, fire BEFORE STATEMENT triggers before
+				 * proceeding. We will only fire BEFORE STATEMENT on
+				 * ERROR_TABLE once.
+				 */
+				if (mtstate->fireBSTriggers)
+				{
+					AfterTriggerBeginQuery();
+					ExecSetupTransitionCaptureState(mtstate, estate);
+					fireBSTriggers(mtstate);
+					mtstate->fireBSTriggers = false;
+				}
+
+				ExecInsert(cstate->mtcontext,
+						   cstate->mtcontext->mtstate->resultRelInfo,
+						   myslot,
+						   false,
+						   NULL,
+						   NULL);
+
+				cstate->num_errors++;
+			}
 
 			if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
 			{
@@ -1130,6 +1213,13 @@ CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
 									   cstate->cur_lineno,
 									   cstate->cur_attname,
 									   attval));
+					else if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+						ereport(NOTICE,
+								errmsg("saving error information to table \"%s\" row due to data type incompatibility at line %" PRIu64 " for column \"%s\": \"%s\"",
+									   RelationGetRelationName(cstate->error_rel),
+									   cstate->cur_lineno,
+									   cstate->cur_attname,
+									   attval));
 					pfree(attval);
 				}
 				else
@@ -1139,6 +1229,12 @@ CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
 								errmsg("skipping row due to data type incompatibility at line %" PRIu64 " for column \"%s\": null input",
 									   cstate->cur_lineno,
 									   cstate->cur_attname));
+					else if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+						ereport(NOTICE,
+								errmsg("saving error information to table \"%s\" row due to data type incompatibility at line %" PRIu64 " for column \"%s\": null input",
+									   RelationGetRelationName(cstate->error_rel),
+									   cstate->cur_lineno,
+									   cstate->cur_attname));
 				}
 				/* reset relname_only */
 				cstate->relname_only = false;
@@ -1148,6 +1244,8 @@ CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
 				return true;
 			else if (cstate->opts.on_error == COPY_ON_ERROR_SET_NULL)
 				continue;
+			else if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+				return true;
 		}
 
 		cstate->cur_attname = NULL;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index ff4e1388c55..2854f2a884f 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -3755,6 +3755,7 @@ copy_generic_opt_arg:
 			| NumericOnly					{ $$ = (Node *) $1; }
 			| '*'							{ $$ = (Node *) makeNode(A_Star); }
 			| DEFAULT                       { $$ = (Node *) makeString("default"); }
+			| TABLE                         { $$ = (Node *) makeString("table"); }
 			| '(' copy_generic_opt_arg_list ')'		{ $$ = (Node *) $2; }
 			| /* EMPTY */					{ $$ = NULL; }
 		;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index abecfe51098..3745342d0f9 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -36,6 +36,7 @@ typedef enum CopyOnErrorChoice
 	COPY_ON_ERROR_STOP = 0,		/* immediately throw errors, default */
 	COPY_ON_ERROR_IGNORE,		/* ignore errors */
 	COPY_ON_ERROR_SET_NULL,		/* set error field to null */
+	COPY_ON_ERROR_TABLE,		/* save input conversion errors info to table */
 } CopyOnErrorChoice;
 
 /*
@@ -96,6 +97,7 @@ typedef struct CopyFormatOptions
 	CopyOnErrorChoice on_error; /* what to do when error happened */
 	CopyLogVerbosityChoice log_verbosity;	/* verbosity of logged messages */
 	int64		reject_limit;	/* maximum tolerable number of errors */
+	char	   *error_table;	/* name of the table that error info is saved to */
 	List	   *convert_select; /* list of column names (can be NIL) */
 } CopyFormatOptions;
 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 9d3e244ee55..2f4aa296b0e 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -16,6 +16,7 @@
 
 #include "commands/copy.h"
 #include "commands/trigger.h"
+#include "executor/nodeModifyTable.h"
 #include "nodes/miscnodes.h"
 
 /*
@@ -73,6 +74,7 @@ typedef struct CopyFromStateData
 
 	/* parameters from the COPY command */
 	Relation	rel;			/* relation to copy from */
+	Relation	error_rel;		/* relation that COPY FROM saves error info to */
 	List	   *attnumlist;		/* integer list of attnums to copy */
 	char	   *filename;		/* filename, or NULL for STDIN */
 	bool		is_program;		/* is 'filename' a program to popen? */
@@ -189,6 +191,13 @@ typedef struct CopyFromStateData
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 
 	uint64		bytes_processed;	/* number of bytes processed so far */
+
+	/*
+	 * INSERT operation context for inserting COPY FROM input conversion
+	 * failure error information to error_table. Populated only when ON_ERROR
+	 * TABLE is specified.
+	 */
+	ModifyTableContext *mtcontext;
 } CopyFromStateData;
 
 extern void ReceiveCopyBegin(CopyFromState cstate);
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 7600e5239d2..85f11c9c1e9 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -884,7 +884,175 @@ ERROR:  skipped more than REJECT_LIMIT (3) rows due to data type incompatibility
 CONTEXT:  COPY check_ign_err, line 5, column n: ""
 COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
 NOTICE:  4 rows were skipped due to data type incompatibility
+-- Tests for ON_ERROR TABLE, saving error information to a table
+-- Direct modification of system catalog composite type 'copy_error_saving' is
+-- not permitted.
+ALTER TYPE copy_error_saving ADD ATTRIBUTE b text;
+ERROR:  permission denied: "copy_error_saving" is a system catalog
+ALTER TYPE copy_error_saving DROP ATTRIBUTE userid;
+ERROR:  permission denied: "copy_error_saving" is a system catalog
+ALTER TYPE copy_error_saving RENAME ATTRIBUTE userid to userid1;
+ERROR:  permission denied: "copy_error_saving" is a system catalog
+ALTER TYPE copy_error_saving ALTER ATTRIBUTE userid SET DATA TYPE OID8;
+ERROR:  permission denied: "copy_error_saving" is a system catalog
+CREATE TABLE t_on_error_table(a jsonb, b int, c int, d dcheck_ign_err2);
+CREATE TABLE err_tbl OF copy_error_saving;
+CREATE TABLE err_tbl1 OF copy_error_saving;
+CREATE TABLE err_tbl2 OF copy_error_saving PARTITION BY RANGE (lineno);
+CREATE UNIQUE INDEX err_tbl_idx ON err_tbl(colname);
+CREATE TEMP TABLE err_tbl3 AS SELECT * FROM err_tbl;
+CREATE TYPE t_copy_typ AS (a int, b int, c int);
+CREATE TABLE t_copy_tbl1 OF t_copy_typ;
+CREATE TEMP VIEW t_copy_v1 AS SELECT * FROM t_on_error_table;
+-- all of the following should fail
+COPY t_on_error_table FROM STDIN WITH (FORMAT BINARY, ON_ERROR TABLE, ERROR_TABLE err_tbl);
+ERROR:  only ON_ERROR STOP is allowed in BINARY mode
+COPY t_on_error_table FROM STDIN WITH (ON_ERROR TABLE, ERROR_TABLE t_on_error_table);
+ERROR:  cannot use relation "t_on_error_table" for COPY error saving while copying data to it
+COPY t_on_error_table(a) FROM STDIN WITH (ON_ERROR TABLE, ERROR_TABLE t_copy_tbl1);
+ERROR:  cannot use relation "t_copy_tbl1" for COPY error saving
+DETAIL:  Relation "t_copy_tbl1" is a typed table based on type "public.t_copy_typ".
+HINT:  The COPY error saving table must be a typed table based on type "pg_catalog.copy_error_saving".
+COPY t_on_error_table(a) FROM STDIN WITH (ON_ERROR TABLE, ERROR_TABLE t_copy_v1);
+ERROR:  cannot use relation "t_copy_v1" for COPY error saving
+DETAIL:  This operation is not supported for views.
+COPY t_on_error_table(a,b) FROM STDIN WITH (ON_ERROR TABLE, reject_limit 10, ERROR_TABLE err_tbl);
+ERROR:  cannot set option REJECT_LIMIT when ON_ERROR is specified as "TABLE"
+COPY t_on_error_table(a,b) FROM STDIN WITH (ON_ERROR TABLE, ERROR_TABLE not_exists);
+ERROR:  relation "not_exists" does not exist
+COPY t_on_error_table FROM STDIN WITH (ON_ERROR TABLE);
+ERROR:  cannot set option ON_ERROR to "TABLE" when "ERROR_TABLE" is not specified
+HINT:  "ERROR_TABLE" option is required
+COPY t_on_error_table FROM STDIN WITH (ERROR_TABLE err_tbl);
+ERROR:  COPY ERROR_TABLE can only be used when option ON_ERROR is set to "TABLE"
+COPY t_on_error_table TO STDIN WITH (ON_ERROR TABLE);
+ERROR:  COPY ON_ERROR cannot be used with COPY TO
+LINE 1: COPY t_on_error_table TO STDIN WITH (ON_ERROR TABLE);
+                                             ^
+-- all of the above should fail
+-- The error saving table must be a plain table and it must not have rules, nor
+-- row-level security policies.
+COPY t_on_error_table(a,b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl2); -- error
+ERROR:  cannot use relation "err_tbl2" for COPY error saving
+DETAIL:  This operation is not supported for partitioned tables.
+CREATE RULE regtest_test_rule AS ON INSERT TO err_tbl1 DO ALSO NOTHING;
+COPY t_on_error_table(a,b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl1); -- error
+ERROR:  cannot use relation "err_tbl1" for COPY error saving
+DETAIL:  The error saving table cannot have rules.
+DROP RULE regtest_test_rule ON err_tbl1;
+CREATE POLICY p1 ON err_tbl1 FOR SELECT USING (true);
+ALTER TABLE err_tbl1 ENABLE ROW LEVEL SECURITY;
+ALTER TABLE err_tbl1 FORCE ROW LEVEL SECURITY;
+COPY t_on_error_table(a,b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl1); -- error
+ERROR:  cannot use relation "err_tbl1" for COPY error saving
+DETAIL:  The error saving table cannot have row-level security policies.
+DROP POLICY IF EXISTS p1 ON err_tbl1;
+ALTER TABLE err_tbl1 DISABLE ROW LEVEL SECURITY;
+ALTER TABLE err_tbl ADD CONSTRAINT cc2 CHECK(lineno > 0);
+ALTER TABLE err_tbl ADD CONSTRAINT cc3 NOT NULL userid;
+COPY t_on_error_table(a, b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl1); -- ok
+-- ok, constraints on table err_tbl1 will be vertified
+COPY t_on_error_table(b, a) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl1);
+NOTICE:  1 row was saved to table "err_tbl1" due to data type incompatibility
+ALTER TABLE err_tbl DROP CONSTRAINT cc2;
+ALTER TABLE err_tbl DROP CONSTRAINT cc3;
+TRUNCATE err_tbl;
+-- fail, copied data have extra columns
+COPY t_on_error_table(a,b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl);
+ERROR:  extra data after last expected column
+CONTEXT:  COPY t_on_error_table, line 1: "1,2,3,4,5"
+-- fail, copied data have less columns
+COPY t_on_error_table(a,b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl);
+ERROR:  extra data after last expected column
+CONTEXT:  COPY t_on_error_table, line 1: "1,2,3"
+-- permission check
+BEGIN;
+CREATE USER regress_user30;
+GRANT INSERT(userid, copy_tbl, filename, lineno, line, colname, raw_field_value, err_message, err_detail)
+	ON TABLE err_tbl TO regress_user30;
+GRANT INSERT ON TABLE t_on_error_table TO regress_user30;
+GRANT SELECT ON TABLE err_tbl TO regress_user30;
+SAVEPOINT s1;
+SET ROLE regress_user30;
+COPY t_on_error_table FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl); -- error, not enough privilege
+ERROR:  permission denied for table err_tbl
+ROLLBACK TO SAVEPOINT s1;
+RESET ROLE;
+GRANT INSERT on TABLE err_tbl to regress_user30;
+GRANT INSERT(errorcode) ON TABLE err_tbl TO regress_user30;
+SET ROLE regress_user30;
+COPY t_on_error_table FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl, LOG_VERBOSITY verbose); -- ok
+NOTICE:  saving error information to table "err_tbl" row due to data type incompatibility at line 1 for column "a": "a"
+NOTICE:  1 row was saved to table "err_tbl" due to data type incompatibility
+SELECT copy_tbl::regclass, filename, lineno, line, colname, raw_field_value, err_message, err_detail, errorcode
+FROM err_tbl;
+     copy_tbl     | filename | lineno |  line   | colname | raw_field_value |            err_message             |      err_detail       | errorcode 
+------------------+----------+--------+---------+---------+-----------------+------------------------------------+-----------------------+-----------
+ t_on_error_table | STDIN    |      1 | a,b,3,4 | a       | a               | invalid input syntax for type json | Token "a" is invalid. | 22P02
+(1 row)
+
+-- error, due to unique constraint violation on table err_tbl
+COPY t_on_error_table FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl);
+ERROR:  duplicate key value violates unique constraint "err_tbl_idx"
+DETAIL:  Key (colname)=(a) already exists.
+CONTEXT:  COPY t_on_error_table, line 1, column a: "a"
+ROLLBACK;
+DROP INDEX err_tbl_idx;
+CREATE FUNCTION trig_copy_error_saving_insert()
+RETURNS TRIGGER LANGUAGE plpgsql AS
+$$
+BEGIN
+  RAISE NOTICE 'trigger name: %: TG_OP: % WHEN: % TG_LEVEL: %', TG_NAME, TG_WHEN, TG_OP, TG_LEVEL;
+  RAISE NOTICE 'NEW raw_field_value: %, err_message: %', NEW.raw_field_value, NEW.err_message;
+  RETURN NEW;
+END;
+$$;
+CREATE TRIGGER err_tbl_row_trig
+  BEFORE INSERT ON err_tbl
+  FOR EACH ROW EXECUTE PROCEDURE trig_copy_error_saving_insert();
+CREATE TRIGGER err_tbl_stmt_trig
+  BEFORE INSERT ON err_tbl
+  FOR EACH STATEMENT EXECUTE PROCEDURE trig_copy_error_saving_insert();
+CREATE TRIGGER err_tbl_after_stmt_trig
+  AFTER INSERT ON err_tbl
+  FOR EACH STATEMENT EXECUTE PROCEDURE trig_copy_error_saving_insert();
+COPY t_on_error_table FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl);
+NOTICE:  trigger name: err_tbl_stmt_trig: TG_OP: BEFORE WHEN: INSERT TG_LEVEL: STATEMENT
+NOTICE:  NEW raw_field_value: <NULL>, err_message: <NULL>
+NOTICE:  trigger name: err_tbl_row_trig: TG_OP: BEFORE WHEN: INSERT TG_LEVEL: ROW
+NOTICE:  NEW raw_field_value: a, err_message: invalid input syntax for type integer: "a"
+NOTICE:  trigger name: err_tbl_row_trig: TG_OP: BEFORE WHEN: INSERT TG_LEVEL: ROW
+NOTICE:  NEW raw_field_value: <NULL>, err_message: domain dcheck_ign_err2 does not allow null values
+NOTICE:  trigger name: err_tbl_row_trig: TG_OP: BEFORE WHEN: INSERT TG_LEVEL: ROW
+NOTICE:  NEW raw_field_value: _junk, err_message: invalid input syntax for type integer: "_junk"
+NOTICE:  trigger name: err_tbl_row_trig: TG_OP: BEFORE WHEN: INSERT TG_LEVEL: ROW
+NOTICE:  NEW raw_field_value: cola, err_message: invalid input syntax for type json
+NOTICE:  trigger name: err_tbl_row_trig: TG_OP: BEFORE WHEN: INSERT TG_LEVEL: ROW
+NOTICE:  NEW raw_field_value: 4238679732489879879, err_message: value "4238679732489879879" is out of range for type integer
+NOTICE:  trigger name: err_tbl_after_stmt_trig: TG_OP: AFTER WHEN: INSERT TG_LEVEL: STATEMENT
+NOTICE:  NEW raw_field_value: <NULL>, err_message: <NULL>
+NOTICE:  5 rows were saved to table "err_tbl" due to data type incompatibility
+SELECT copy_tbl::regclass, filename, lineno, line, colname ,raw_field_value, err_message, err_detail, errorcode
+FROM err_tbl;
+     copy_tbl     | filename | lineno |            line            | colname |   raw_field_value   |                         err_message                          |        err_detail        | errorcode 
+------------------+----------+--------+----------------------------+---------+---------------------+--------------------------------------------------------------+--------------------------+-----------
+ t_on_error_table | STDIN    |      1 | 1,2,a,1                    | c       | a                   | invalid input syntax for type integer: "a"                   |                          | 22P02
+ t_on_error_table | STDIN    |      2 | 1,2,3,\N                   | d       |                     | domain dcheck_ign_err2 does not allow null values            |                          | 23502
+ t_on_error_table | STDIN    |      3 | 1,_junk,test,11            | b       | _junk               | invalid input syntax for type integer: "_junk"               |                          | 22P02
+ t_on_error_table | STDIN    |      4 | cola,colb,colc,12          | a       | cola                | invalid input syntax for type json                           | Token "cola" is invalid. | 22P02
+ t_on_error_table | STDIN    |      6 | 1,11,4238679732489879879,2 | c       | 4238679732489879879 | value "4238679732489879879" is out of range for type integer |                          | 22003
+(5 rows)
+
 -- clean up
+DROP TABLE err_tbl;
+DROP TABLE err_tbl1;
+DROP TABLE err_tbl2;
+DROP TABLE err_tbl3;
+DROP VIEW t_copy_v1;
+DROP TABLE t_on_error_table;
+DROP TABLE t_copy_tbl1;
+DROP TYPE t_copy_typ;
+DROP FUNCTION trig_copy_error_saving_insert();
 DROP TABLE forcetest;
 DROP TABLE vistest;
 DROP FUNCTION truncate_in_subxact();
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index e0810109473..122e598ae7c 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -636,7 +636,145 @@ a	{7}	7
 10	{10}	10
 \.
 
+-- Tests for ON_ERROR TABLE, saving error information to a table
+
+-- Direct modification of system catalog composite type 'copy_error_saving' is
+-- not permitted.
+ALTER TYPE copy_error_saving ADD ATTRIBUTE b text;
+ALTER TYPE copy_error_saving DROP ATTRIBUTE userid;
+ALTER TYPE copy_error_saving RENAME ATTRIBUTE userid to userid1;
+ALTER TYPE copy_error_saving ALTER ATTRIBUTE userid SET DATA TYPE OID8;
+
+CREATE TABLE t_on_error_table(a jsonb, b int, c int, d dcheck_ign_err2);
+CREATE TABLE err_tbl OF copy_error_saving;
+CREATE TABLE err_tbl1 OF copy_error_saving;
+CREATE TABLE err_tbl2 OF copy_error_saving PARTITION BY RANGE (lineno);
+CREATE UNIQUE INDEX err_tbl_idx ON err_tbl(colname);
+CREATE TEMP TABLE err_tbl3 AS SELECT * FROM err_tbl;
+CREATE TYPE t_copy_typ AS (a int, b int, c int);
+CREATE TABLE t_copy_tbl1 OF t_copy_typ;
+CREATE TEMP VIEW t_copy_v1 AS SELECT * FROM t_on_error_table;
+
+-- all of the following should fail
+COPY t_on_error_table FROM STDIN WITH (FORMAT BINARY, ON_ERROR TABLE, ERROR_TABLE err_tbl);
+COPY t_on_error_table FROM STDIN WITH (ON_ERROR TABLE, ERROR_TABLE t_on_error_table);
+COPY t_on_error_table(a) FROM STDIN WITH (ON_ERROR TABLE, ERROR_TABLE t_copy_tbl1);
+COPY t_on_error_table(a) FROM STDIN WITH (ON_ERROR TABLE, ERROR_TABLE t_copy_v1);
+COPY t_on_error_table(a,b) FROM STDIN WITH (ON_ERROR TABLE, reject_limit 10, ERROR_TABLE err_tbl);
+COPY t_on_error_table(a,b) FROM STDIN WITH (ON_ERROR TABLE, ERROR_TABLE not_exists);
+COPY t_on_error_table FROM STDIN WITH (ON_ERROR TABLE);
+COPY t_on_error_table FROM STDIN WITH (ERROR_TABLE err_tbl);
+COPY t_on_error_table TO STDIN WITH (ON_ERROR TABLE);
+-- all of the above should fail
+
+-- The error saving table must be a plain table and it must not have rules, nor
+-- row-level security policies.
+COPY t_on_error_table(a,b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl2); -- error
+CREATE RULE regtest_test_rule AS ON INSERT TO err_tbl1 DO ALSO NOTHING;
+COPY t_on_error_table(a,b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl1); -- error
+DROP RULE regtest_test_rule ON err_tbl1;
+
+CREATE POLICY p1 ON err_tbl1 FOR SELECT USING (true);
+ALTER TABLE err_tbl1 ENABLE ROW LEVEL SECURITY;
+ALTER TABLE err_tbl1 FORCE ROW LEVEL SECURITY;
+COPY t_on_error_table(a,b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl1); -- error
+DROP POLICY IF EXISTS p1 ON err_tbl1;
+ALTER TABLE err_tbl1 DISABLE ROW LEVEL SECURITY;
+
+ALTER TABLE err_tbl ADD CONSTRAINT cc2 CHECK(lineno > 0);
+ALTER TABLE err_tbl ADD CONSTRAINT cc3 NOT NULL userid;
+COPY t_on_error_table(a, b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl1); -- ok
+\.
+-- ok, constraints on table err_tbl1 will be vertified
+COPY t_on_error_table(b, a) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl1);
+a,b
+\.
+ALTER TABLE err_tbl DROP CONSTRAINT cc2;
+ALTER TABLE err_tbl DROP CONSTRAINT cc3;
+TRUNCATE err_tbl;
+
+-- fail, copied data have extra columns
+COPY t_on_error_table(a,b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl);
+1,2,3,4,5
+\.
+
+-- fail, copied data have less columns
+COPY t_on_error_table(a,b) FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl);
+1,2,3
+\.
+
+-- permission check
+BEGIN;
+CREATE USER regress_user30;
+GRANT INSERT(userid, copy_tbl, filename, lineno, line, colname, raw_field_value, err_message, err_detail)
+	ON TABLE err_tbl TO regress_user30;
+GRANT INSERT ON TABLE t_on_error_table TO regress_user30;
+GRANT SELECT ON TABLE err_tbl TO regress_user30;
+SAVEPOINT s1;
+
+SET ROLE regress_user30;
+COPY t_on_error_table FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl); -- error, not enough privilege
+ROLLBACK TO SAVEPOINT s1;
+
+RESET ROLE;
+GRANT INSERT on TABLE err_tbl to regress_user30;
+GRANT INSERT(errorcode) ON TABLE err_tbl TO regress_user30;
+SET ROLE regress_user30;
+COPY t_on_error_table FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl, LOG_VERBOSITY verbose); -- ok
+a,b,3,4
+\.
+
+SELECT copy_tbl::regclass, filename, lineno, line, colname, raw_field_value, err_message, err_detail, errorcode
+FROM err_tbl;
+
+-- error, due to unique constraint violation on table err_tbl
+COPY t_on_error_table FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl);
+a,b,3,4
+\.
+ROLLBACK;
+DROP INDEX err_tbl_idx;
+
+CREATE FUNCTION trig_copy_error_saving_insert()
+RETURNS TRIGGER LANGUAGE plpgsql AS
+$$
+BEGIN
+  RAISE NOTICE 'trigger name: %: TG_OP: % WHEN: % TG_LEVEL: %', TG_NAME, TG_WHEN, TG_OP, TG_LEVEL;
+  RAISE NOTICE 'NEW raw_field_value: %, err_message: %', NEW.raw_field_value, NEW.err_message;
+  RETURN NEW;
+END;
+$$;
+
+CREATE TRIGGER err_tbl_row_trig
+  BEFORE INSERT ON err_tbl
+  FOR EACH ROW EXECUTE PROCEDURE trig_copy_error_saving_insert();
+CREATE TRIGGER err_tbl_stmt_trig
+  BEFORE INSERT ON err_tbl
+  FOR EACH STATEMENT EXECUTE PROCEDURE trig_copy_error_saving_insert();
+CREATE TRIGGER err_tbl_after_stmt_trig
+  AFTER INSERT ON err_tbl
+  FOR EACH STATEMENT EXECUTE PROCEDURE trig_copy_error_saving_insert();
+
+COPY t_on_error_table FROM STDIN WITH (DELIMITER ',', ON_ERROR TABLE, ERROR_TABLE err_tbl);
+1,2,a,1
+1,2,3,\N
+1,_junk,test,11
+cola,colb,colc,12
+4,5,6,1111
+1,11,4238679732489879879,2
+\.
+SELECT copy_tbl::regclass, filename, lineno, line, colname ,raw_field_value, err_message, err_detail, errorcode
+FROM err_tbl;
+
 -- clean up
+DROP TABLE err_tbl;
+DROP TABLE err_tbl1;
+DROP TABLE err_tbl2;
+DROP TABLE err_tbl3;
+DROP VIEW t_copy_v1;
+DROP TABLE t_on_error_table;
+DROP TABLE t_copy_tbl1;
+DROP TYPE t_copy_typ;
+DROP FUNCTION trig_copy_error_saving_insert();
 DROP TABLE forcetest;
 DROP TABLE vistest;
 DROP FUNCTION truncate_in_subxact();
-- 
2.34.1
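
Note on the errorcode column: the patch above fills it with
unpack_sql_state(cstate->escontext->error_data->sqlerrcode), which is why the
expected output shows five-character codes such as 22P02 and 22003.  As far
as I understand it, PostgreSQL packs the five SQLSTATE characters into an int
using six bits per character, with the first character in the lowest bits.
The following is a minimal standalone sketch of that round trip.  The toy_*
names are hypothetical and exist only for this illustration; this is not the
backend code.

```c
#include <assert.h>
#include <string.h>

/* Map one SQLSTATE character to a six-bit value (offset from '0'). */
static int
toy_sixbit(char ch)
{
	return (ch - '0') & 0x3F;
}

/* Pack five SQLSTATE characters; code[0] ends up in the lowest six bits. */
int
toy_make_sqlstate(const char *code)
{
	int			packed = 0;

	assert(code != NULL && strlen(code) == 5);
	for (int i = 4; i >= 0; i--)
		packed = (packed << 6) | toy_sixbit(code[i]);
	return packed;
}

/* Reverse of toy_make_sqlstate; out must have room for 6 bytes. */
void
toy_unpack_sqlstate(int packed, char *out)
{
	for (int i = 0; i < 5; i++)
	{
		out[i] = (char) ((packed & 0x3F) + '0');
		packed >>= 6;
	}
	out[5] = '\0';
}
```

Storing the unpacked text rather than the packed integer keeps the
ERROR_TABLE readable and lets it be compared directly against the SQLSTATE
codes listed in the documentation.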

From 43404ffc048cc68d35ee040e307e909ee60d600f Mon Sep 17 00:00:00 2001
From: jian he <[email protected]>
Date: Wed, 6 May 2026 09:36:40 +0800
Subject: [PATCH v9 1/2] Export ExecInsert and related functions

The ExecInsert function encapsulates the core logic of the insertion pipeline,
including partition routing, BEFORE ROW triggers, INSTEAD OF triggers, AFTER
ROW triggers, and more.

By exporting ExecInsert, the COPY FROM command can use exactly the same
execution path as a regular INSERT.  Because ExecInsert takes a
ModifyTableContext, that struct definition moves to nodeModifyTable.h.

Also export ExecSetupTransitionCaptureState, fireBSTriggers, and fireASTriggers
so that a later patch can use them.

reference: https://postgr.es/m/CACJufxHi53OpGYPAe6SdCb4m=-+H8L+7LDbUWvTiJp=v4yy...@mail.gmail.com
reference: https://postgr.es/m/752672.1699474336%40sss.pgh.pa.us
discussion: https://postgr.es/m/CACJufxH_OJpVra=0c4ow8fbxhj7hemcvatnepa5vaursena...@mail.gmail.com
commitfest entry: https://commitfest.postgresql.org/patch/4817
---
 src/backend/executor/nodeModifyTable.c | 49 +++-----------------------
 src/include/executor/nodeModifyTable.h | 48 +++++++++++++++++++++++++
 2 files changed, 52 insertions(+), 45 deletions(-)
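
Note for reviewers: the call order that a caller of the newly exported
functions is expected to follow can be sketched with a toy model.  Everything
below is a hypothetical stand-in (ToyErrorSink and the toy_* functions are not
PostgreSQL APIs); it only mirrors the ordering visible in the copy2.out
trigger test, where the BEFORE STATEMENT trigger fires once, then one row
trigger per saved error, then the AFTER STATEMENT trigger.  Firing the AFTER
STATEMENT step only when at least one row was saved is an assumption of this
sketch, not something the patch states.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical per-COPY state; stands in for the ModifyTableState bits. */
typedef struct ToyErrorSink
{
	bool		fire_bs_pending;	/* BEFORE STATEMENT step not done yet */
	int			rows_saved;			/* error rows inserted so far */
	char		log[64];			/* event trace: 'B', 'R', 'A' */
	size_t		log_len;
} ToyErrorSink;

/* Append one event character to the trace, keeping it NUL-terminated. */
static void
toy_log(ToyErrorSink *sink, char ev)
{
	if (sink->log_len + 1 < sizeof(sink->log))
	{
		sink->log[sink->log_len++] = ev;
		sink->log[sink->log_len] = '\0';
	}
}

void
toy_sink_init(ToyErrorSink *sink)
{
	assert(sink != NULL);
	memset(sink, 0, sizeof(*sink));
	sink->fire_bs_pending = true;
}

/* Called once per failed input row: statement-level setup happens lazily. */
void
toy_save_error_row(ToyErrorSink *sink)
{
	assert(sink != NULL);
	if (sink->fire_bs_pending)
	{
		toy_log(sink, 'B');		/* stands in for fireBSTriggers() */
		sink->fire_bs_pending = false;
	}
	toy_log(sink, 'R');			/* stands in for ExecInsert() */
	sink->rows_saved++;
}

/* Called at end of COPY; assumed to run only if a row was ever saved. */
void
toy_sink_finish(ToyErrorSink *sink)
{
	assert(sink != NULL);
	if (!sink->fire_bs_pending)
		toy_log(sink, 'A');		/* stands in for fireASTriggers() */
}
```

With two failed rows the trace reads "BRRA"; with none it stays empty.  The
lazy BEFORE STATEMENT step corresponds to the mtstate->fireBSTriggers check in
the 0002 patch.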

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 4cb057ca4f9..8d7f8073166 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -83,44 +83,6 @@ typedef struct MTTargetRelLookup
 	int			relationIndex;	/* rel's index in resultRelInfo[] array */
 } MTTargetRelLookup;
 
-/*
- * Context struct for a ModifyTable operation, containing basic execution
- * state and some output variables populated by ExecUpdateAct() and
- * ExecDeleteAct() to report the result of their actions to callers.
- */
-typedef struct ModifyTableContext
-{
-	/* Operation state */
-	ModifyTableState *mtstate;
-	EPQState   *epqstate;
-	EState	   *estate;
-
-	/*
-	 * Slot containing tuple obtained from ModifyTable's subplan.  Used to
-	 * access "junk" columns that are not going to be stored.
-	 */
-	TupleTableSlot *planSlot;
-
-	/*
-	 * Information about the changes that were made concurrently to a tuple
-	 * being updated or deleted
-	 */
-	TM_FailureData tmfd;
-
-	/*
-	 * The tuple deleted when doing a cross-partition UPDATE with a RETURNING
-	 * clause that refers to OLD columns (converted to the root's tuple
-	 * descriptor).
-	 */
-	TupleTableSlot *cpDeletedSlot;
-
-	/*
-	 * The tuple projected by the INSERT's RETURNING clause, when doing a
-	 * cross-partition UPDATE
-	 */
-	TupleTableSlot *cpUpdateReturningSlot;
-} ModifyTableContext;
-
 /*
  * Context struct containing output data specific to UPDATE operations.
  */
@@ -195,9 +157,6 @@ static TupleTableSlot *ExecMergeMatched(ModifyTableContext *context,
 static TupleTableSlot *ExecMergeNotMatched(ModifyTableContext *context,
 										   ResultRelInfo *resultRelInfo,
 										   bool canSetTag);
-static void ExecSetupTransitionCaptureState(ModifyTableState *mtstate, EState *estate);
-static void fireBSTriggers(ModifyTableState *node);
-static void fireASTriggers(ModifyTableState *node);
 
 
 /*
@@ -867,7 +826,7 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo,
  *		save the previous value to avoid losing track of it.
  * ----------------------------------------------------------------
  */
-static TupleTableSlot *
+TupleTableSlot *
 ExecInsert(ModifyTableContext *context,
 		   ResultRelInfo *resultRelInfo,
 		   TupleTableSlot *slot,
@@ -4443,7 +4402,7 @@ ExecInitMergeTupleSlots(ModifyTableState *mtstate,
 /*
  * Process BEFORE EACH STATEMENT triggers
  */
-static void
+void
 fireBSTriggers(ModifyTableState *node)
 {
 	ModifyTable *plan = (ModifyTable *) node->ps.plan;
@@ -4480,7 +4439,7 @@ fireBSTriggers(ModifyTableState *node)
 /*
  * Process AFTER EACH STATEMENT triggers
  */
-static void
+void
 fireASTriggers(ModifyTableState *node)
 {
 	ModifyTable *plan = (ModifyTable *) node->ps.plan;
@@ -4525,7 +4484,7 @@ fireASTriggers(ModifyTableState *node)
  * Set up the state needed for collecting transition tuples for AFTER
  * triggers.
  */
-static void
+void
 ExecSetupTransitionCaptureState(ModifyTableState *mtstate, EState *estate)
 {
 	ModifyTable *plan = (ModifyTable *) mtstate->ps.plan;
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index f6070e1cdf3..e8a86ec3c75 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -13,8 +13,47 @@
 #ifndef NODEMODIFYTABLE_H
 #define NODEMODIFYTABLE_H
 
+#include "access/tableam.h"
 #include "nodes/execnodes.h"
 
+/*
+ * Context struct for a ModifyTable operation, containing basic execution
+ * state and some output variables populated by ExecUpdateAct() and
+ * ExecDeleteAct() to report the result of their actions to callers.
+ */
+typedef struct ModifyTableContext
+{
+	/* Operation state */
+	ModifyTableState *mtstate;
+	EPQState   *epqstate;
+	EState	   *estate;
+
+	/*
+	 * Slot containing tuple obtained from ModifyTable's subplan.  Used to
+	 * access "junk" columns that are not going to be stored.
+	 */
+	TupleTableSlot *planSlot;
+
+	/*
+	 * Information about the changes that were made concurrently to a tuple
+	 * being updated or deleted
+	 */
+	TM_FailureData tmfd;
+
+	/*
+	 * The tuple deleted when doing a cross-partition UPDATE with a RETURNING
+	 * clause that refers to OLD columns (converted to the root's tuple
+	 * descriptor).
+	 */
+	TupleTableSlot *cpDeletedSlot;
+
+	/*
+	 * The tuple projected by the INSERT's RETURNING clause, when doing a
+	 * cross-partition UPDATE
+	 */
+	TupleTableSlot *cpUpdateReturningSlot;
+} ModifyTableContext;
+
 extern void ExecInitGenerated(ResultRelInfo *resultRelInfo,
 							  EState *estate,
 							  CmdType cmdtype);
@@ -24,10 +63,19 @@ extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo,
 									   CmdType cmdtype);
 
 extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags);
+extern TupleTableSlot *ExecInsert(ModifyTableContext *context,
+								  ResultRelInfo *resultRelInfo,
+								  TupleTableSlot *slot,
+								  bool canSetTag,
+								  TupleTableSlot **inserted_tuple,
+								  ResultRelInfo **insert_destrel);
 extern void ExecEndModifyTable(ModifyTableState *node);
 extern void ExecReScanModifyTable(ModifyTableState *node);
 
 extern void ExecInitMergeTupleSlots(ModifyTableState *mtstate,
 									ResultRelInfo *resultRelInfo);
 
+extern void fireBSTriggers(ModifyTableState *node);
+extern void ExecSetupTransitionCaptureState(ModifyTableState *mtstate, EState *estate);
+extern void fireASTriggers(ModifyTableState *node);
 #endif							/* NODEMODIFYTABLE_H */
-- 
2.34.1
