Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2023-11-14 Thread Damir Belyalov
>  Here is a very straw-man-level sketch of what I think might work.
>  The option to COPY FROM looks something like
>
>   ERRORS TO other_table_name (item [, item [, ...]])
>

I tried to implement the patch using a table and came across a number of
questions.

Which kind of table should we use for this feature: a system catalog table, a
table stored as a file, or a new regular table?

In all of these cases, security and user rights management issues arise.
It is better for other users not to see error lines from another user. It
is also not clear how access rights to this table would be inherited and
granted.
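
To make the access rights question concrete, here is a hypothetical sketch
(table name, columns, and the policy are all made up) of what a regular
error table with per-user visibility could look like:

```
-- hypothetical error table, if a regular table were chosen
CREATE TABLE copy_errors (
    username  text DEFAULT current_user,  -- who ran the COPY
    relname   text,                       -- target table
    lineno    bigint,                     -- line number in the source file
    rawline   text,                       -- the rejected input line
    message   text                        -- the error message
);

-- one possible way to hide other users' error lines
ALTER TABLE copy_errors ENABLE ROW LEVEL SECURITY;
CREATE POLICY copy_errors_self ON copy_errors
    USING (username = current_user);
```

Even with row-level security it is unclear who should own such a table,
who may truncate it, and how rights would propagate for new users.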


--
Regards,
Damir Belyalov
Postgres Professional


Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2023-11-08 Thread Damir Belyalov
Hello everyone!
Thanks for coming back to this patch.


I had already thought about storing errors in a table / separate file /
logfile, and it seems to me that the best way is to output errors to the
logfile. For the user it is more convenient to look for errors in the place
where they are usually generated - the logfile - and if they want to
intercept them, they can easily do so with a few commands.
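
For example, a session could look roughly like this (a sketch; the WARNING
wording follows the attached patch, and the file path is made up):

```
COPY t FROM '/tmp/data.csv' WITH (ignore_datatype_errors);
-- WARNING:  3 rows were skipped due to data type incompatibility
-- COPY 97
-- the per-row details go to the server logfile, where the user can
-- grep for them as usual
```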


The analogues of this feature in other DBMSs usually have additional files
for storing errors, but those features have too many options (see attached
files).
I also think that the best way is to simplify this feature for the first
version and avoid redundant adjustments such as additional files and other
options.
IMHO, for more complicated table-loading operations pgloader
exists: https://github.com/dimitri/pgloader


Links to analogues of COPY IGNORE_DATATYPE_ERRORS:
https://dev.mysql.com/doc/refman/8.0/en/load-data.html
https://docs.aws.amazon.com/redshift/latest/dg/r_COPY_command_examples.html
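
For comparison, rough sketches of how those analogues are invoked (both
statements are abbreviated; see the links above for the full option lists):

```
-- MySQL: the IGNORE keyword skips erroneous rows instead of aborting
LOAD DATA INFILE 'data.txt' IGNORE INTO TABLE t;

-- Redshift: MAXERROR tolerates up to the given number of bad rows
COPY t FROM 's3://bucket/data' IAM_ROLE 'arn:...' MAXERROR 10;
```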

Regards,
Damir Belyalov
Postgres Professional


Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2023-09-15 Thread Damir Belyalov
> Since the v5 patch failed to apply anymore, I updated the patch.
Thank you for updating the patch. I did a small review of it and corrected
some formatting.


> - COPY with a datatype error that can't be handled as a soft error
>
> I didn't know a proper way to test this, but I found that the data type
> widget's input function widget_in() is defined to raise a hard error in
> regress.c; the attached patch adds a test using it.
>

This test seems a bit odd because of the "widget" type. A hard error is
already thrown by the previous test with missing data. It would also be
interesting to list all the cases in which a hard error can be thrown.
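
For instance, this is how I understand the soft/hard distinction (a sketch;
the table is made up and the WARNING wording follows the patch):

```
CREATE TABLE check_ign_err (n int, m int);

-- a datatype error is soft: the row is skipped
COPY check_ign_err FROM stdin WITH (ignore_datatype_errors);
1	1
a	2
\.
-- WARNING:  1 rows were skipped due to data type incompatibility

-- a missing column is still a hard error and aborts the COPY
COPY check_ign_err FROM stdin WITH (ignore_datatype_errors);
3
\.
-- ERROR:  missing data for column "m"
```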

Regards,
Damir Belyalov
Postgres Professional
From 0e1193e00bb5ee810a015a2baaf7c79e395a54c7 Mon Sep 17 00:00:00 2001
From: Damir Belyalov 
Date: Fri, 15 Sep 2023 11:14:57 +0300
Subject: [PATCH] ignore errors

---
 doc/src/sgml/ref/copy.sgml   | 13 +
 src/backend/commands/copy.c  | 13 +
 src/backend/commands/copyfrom.c  | 37 
 src/backend/commands/copyfromparse.c | 20 ++---
 src/bin/psql/tab-complete.c  |  3 +-
 src/include/commands/copy.h  |  1 +
 src/include/commands/copyfrom_internal.h |  3 ++
 src/test/regress/expected/copy2.out  | 28 ++
 src/test/regress/sql/copy2.sql   | 26 +
 9 files changed, 139 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 4d614a0225..d5cdbb4025 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -43,6 +43,7 @@ COPY { table_name [ ( column_name [, ...] ) | * }
 FORCE_NOT_NULL ( column_name [, ...] )
 FORCE_NULL ( column_name [, ...] )
+IGNORE_DATATYPE_ERRORS [ boolean ]
 ENCODING 'encoding_name'
 
  
@@ -370,6 +371,18 @@ COPY { table_name [ ( 

 
+   
+IGNORE_DATATYPE_ERRORS
+
+ 
+  Drops rows that contain malformed data while copying. These are rows
+  with columns where the data type's input-function raises an error.
+  This option is not allowed when using binary format.  Note that this
+  is only supported in current COPY syntax.
+ 
+
+   
+

 ENCODING
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f14fae3308..beb73f5357 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -419,6 +419,7 @@ ProcessCopyOptions(ParseState *pstate,
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
+	bool		ignore_datatype_errors_specified = false;
 	ListCell   *option;
 
 	/* Support external use for option sanity checking */
@@ -458,6 +459,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_datatype_errors") == 0)
+		{
+			if (ignore_datatype_errors_specified)
+errorConflictingDefElem(defel, pstate);
+			ignore_datatype_errors_specified = true;
+			opts_out->ignore_datatype_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
@@ -594,6 +602,11 @@ ProcessCopyOptions(ParseState *pstate,
 				(errcode(ERRCODE_SYNTAX_ERROR),
 				 errmsg("cannot specify DEFAULT in BINARY mode")));
 
+	if (opts_out->binary && opts_out->ignore_datatype_errors)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify IGNORE_DATATYPE_ERRORS in BINARY mode")));
+
 	/* Set defaults for omitted options */
 	if (!opts_out->delim)
 		opts_out->delim = opts_out->csv_mode ? "," : "\t";
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 70871ed819..b18aea6376 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -752,6 +752,14 @@ CopyFrom(CopyFromState cstate)
 		ti_options |= TABLE_INSERT_FROZEN;
 	}
 
+	/* Set up soft error handler for IGNORE_DATATYPE_ERRORS */
+	if (cstate->opts.ignore_datatype_errors)
+	{
+		ErrorSaveContext escontext = {T_ErrorSaveContext};
+		escontext.details_wanted = true;
+		cstate->escontext = escontext;
+	}
+
 	/*
 	 * We need a ResultRelInfo so we can use the regular executor's
 	 * index-entry-making machinery.  (There used to be a huge amount of code
@@ -987,7 +995,36 @@ CopyFrom(CopyFromState cstate)
 
 		/* Directly store the values/nulls array in the slot */
 		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+		{
+			if (cstate->opts.ignore_datatype_errors &&
+				cstate->ignored_errors_count > 0)
+				ereport(WARNING,
+						errmsg("%zd rows were skipped due to data type incompatibility",
+							   cstate->ignored_errors_count));
 			break;
+		}
+
+		/* Soft error occurred, skip this tuple

Re: Redundant Unique plan node for table with a unique index

2023-09-14 Thread Damir Belyalov
Thank you for the feedback and the thread [1].

Regards,
Damir Belyalov
Postgres Professional


Redundant Unique plan node for table with a unique index

2023-09-13 Thread Damir Belyalov
Hello!

There is a table with a unique index on it, and we have a query that
searches for DISTINCT values on the columns of that unique index. Example:

create table a (n int);
insert into a (n) select x from generate_series(1, 14) as g(x);
create unique index on a (n);
explain select distinct n from a;
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Unique  (cost=0.42..6478.42 rows=14 width=4)
   ->  Index Only Scan using a_n_idx on a  (cost=0.42..6128.42 rows=14 width=4)
(2 rows)


We can see that the Unique node is redundant in this case, so I implemented
a simple patch that removes the Unique node from the plan.
After the patch:


explain select distinct n from a;
                      QUERY PLAN
------------------------------------------------------
 Seq Scan on a  (cost=0.00..2020.00 rows=14 width=4)
(1 row)


The patch is rather simple and doesn't consider queries with joins. The
criterion for removing the Unique node is that the set of Vars in the
DISTINCT clause contains the columns of a unique index on the same table.
Another example:
CREATE TABLE a (n int, m int);
CREATE UNIQUE INDEX ON a (n);
SELECT DISTINCT n, m FROM a;
The Unique node should be deleted because n is contained in (n, m).
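
With the patch, the Unique node disappears here as well (a sketch of the
expected plan; the costs are illustrative):

```
explain select distinct n, m from a;
                      QUERY PLAN
------------------------------------------------------
 Seq Scan on a  (cost=0.00..2020.00 rows=14 width=8)
(1 row)
```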


The patch doesn't consider these cases:
1. DISTINCT ON [expr], because this case can need grouping.
2. Subqueries, because this case can need grouping:
   CREATE TABLE a (n int);
   CREATE UNIQUE INDEX ON a (n);
   SELECT DISTINCT g FROM (SELECT * FROM a) as g;
3. Joins, because they demand more complicated code. Example:
   SELECT DISTINCT a.n1 FROM a JOIN b ON a.n1 = b.n1;
   where a.n1 and b.n1 have unique indexes and the join qual is on those
   index columns, or a has a unique index on n1 and b is "unique for a"
   on the join qual.


I am wondering whether there are opportunities for further development of
this patch, in particular for the JOIN cases (see the sketch after this
paragraph). For several levels of JOINs we need to understand which set of
columns is unique for every joinrel in the query. In general terms I
identified two cases when a joinrel "preserves" a unique index from a
table: when the tables are joined on unique index columns, and when one
table has a unique index and is "unique for" (has at most one matching
tuple in) the other table.


Regards,
Damir Belyalov
Postgres Professional
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 44efb1f4ebc..8f9912f48ad 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -1266,6 +1266,47 @@ preprocess_phv_expression(PlannerInfo *root, Expr *expr)
 	return (Expr *) preprocess_expression(root, (Node *) expr, EXPRKIND_PHV);
 }
 
+static bool
+is_distinct_redundant(Query *parse, RelOptInfo *current_rel, PathTarget *sort_input_target)
+{
+	List	 *distinct_vars = NIL;
+	ListCell *lc1, *lc2;
+
+	/* Doesn't process DISTINCT ON because it can need grouping */
+	if (parse->hasDistinctOn)
+		return false;
+
+	/* Fetch Vars from DISTINCT clause */
+	foreach(lc1, sort_input_target->exprs)
+	{
+		Var *distinct_expr = (Var *) lfirst(lc1);
+
+		if (IsA(distinct_expr, Var))
+			distinct_vars = list_append_unique_int(distinct_vars, distinct_expr->varattno);
+		else
+			/* Doesn't process this case because it can need grouping */
+			return false;
+	}
+
+	foreach(lc2, current_rel->indexlist)
+	{
+		IndexOptInfo *indexinfo = (IndexOptInfo *) lfirst(lc2);
+		List		 *unique_indexkeys = NIL;
+		int			  i;
+
+		if (indexinfo->unique)
+		{
+			for (i = 0; i < indexinfo->ncolumns; i++)
+				unique_indexkeys = list_append_unique_int(unique_indexkeys, indexinfo->indexkeys[i]);
+
+			if (list_difference_int(unique_indexkeys, distinct_vars) == NIL)
+				return true;
+		}
+	}
+
+	return false;
+}
+
 /*
  * grouping_planner
  *	  Perform planning steps related to grouping, aggregation, etc.
@@ -1694,8 +1735,9 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
 		 */
 		if (parse->distinctClause)
 		{
-			current_rel = create_distinct_paths(root,
-												current_rel);
+			if (!is_distinct_redundant(parse, current_rel, sort_input_target))
+				current_rel = create_distinct_paths(root,
+													current_rel);
 		}
 	}			/* end of if (setOperations) */
 
diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out
index 9b8638f286a..49223c5be10 100644
--- a/src/test/regress/expected/join.out
+++ b/src/test/regress/expected/join.out
@@ -5645,18 +5645,15 @@ select d.* from d left join (select * from b group by b.id, b.c_id) s
 explain (costs off)
 select d.* from d left join (select distinct * from b) s
   on d.a = s.id;
-  QUERY PLAN  
---
+ QUERY PLAN 
+
  Merge Right Join
  

Re: Output affected rows in EXPLAIN

2023-09-07 Thread Damir Belyalov
> This creates a bug, not fixes one.  It's intentional that "insert into a"
> is shown as returning zero rows, because that's what it did.  If you'd
> written "insert ... returning", you'd have gotten a different result:
>
Maybe I didn't understand you correctly, but I didn't touch the number of
affected rows in the EXPLAIN output.
It's just a simple patch that adds one tag line of output after the
commands EXPLAIN INSERT, EXPLAIN UPDATE and EXPLAIN DELETE.
It was done because the plain INSERT/UPDATE/DELETE commands print such a
tag after execution: "UPDATE 7" or "INSERT 0 4".
EXPLAIN (ANALYZE) INSERT/UPDATE/DELETE does the same thing as these
commands but doesn't output this tag, so I added it.


The patch is fixed: there is no affected-rows count in the command tag for
queries like:
postgres=# explain (analyze) select * from t;
                                      QUERY PLAN
---------------------------------------------------------------------------------------
 Seq Scan on t  (cost=0.00..35.50 rows=2550 width=4) (actual
time=0.064..0.075 rows=5 loops=1)
 Planning Time: 1.639 ms
 Execution Time: 0.215 ms
(3 rows)

EXPLAIN


What about EXPLAIN INSERT/UPDATE/DELETE queries without ANALYZE?
They now output a tag with 0 affected (inserted) rows at the end:
"INSERT 0 0" or "UPDATE 0". Example:
explain update a set n = 2;
                     QUERY PLAN
-----------------------------------------------------
 Update on a  (cost=0.00..35.50 rows=0 width=0)
   ->  Seq Scan on a  (cost=0.00..35.50 rows=2550 width=10)
(2 rows)

UPDATE 0

Regards,
Damir Belyalov
Postgres Professional
From c6cbc6fa9ddf24f29bc19ff115224dd76e351db1 Mon Sep 17 00:00:00 2001
From: Damir Belyalov 
Date: Tue, 5 Sep 2023 15:04:01 +0300
Subject: [PATCH 1/2] Output affected rows in EXPLAIN.

---
 src/backend/commands/explain.c | 10 +-
 src/backend/tcop/cmdtag.c  |  2 +-
 src/backend/tcop/pquery.c  |  8 +++-
 src/backend/tcop/utility.c | 27 ++-
 src/bin/psql/common.c  |  5 +++--
 src/include/commands/explain.h |  3 ++-
 src/include/tcop/cmdtag.h  |  1 +
 src/include/tcop/cmdtaglist.h  |  3 +++
 src/interfaces/libpq/fe-exec.c |  4 +++-
 9 files changed, 55 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 8570b14f62..453e545ba5 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -162,7 +162,7 @@ static void escape_yaml(StringInfo buf, const char *str);
  */
 void
 ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
-			 ParamListInfo params, DestReceiver *dest)
+			 ParamListInfo params, DestReceiver *dest, uint64 *processed)
 {
 	ExplainState *es = NewExplainState();
 	TupOutputState *tstate;
@@ -173,6 +173,9 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
 	bool		timing_set = false;
 	bool		summary_set = false;
 
+	if (processed)
+		*processed = 0;
+
 	/* Parse options list. */
 	foreach(lc, stmt->options)
 	{
@@ -311,6 +314,9 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
 	end_tup_output(tstate);
 
 	pfree(es->str->data);
+
+	if (processed)
+		*processed = es->es_processed;
 }
 
 /*
@@ -649,6 +655,8 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 	 */
 	INSTR_TIME_SET_CURRENT(starttime);
 
+	es->es_processed += queryDesc->estate->es_processed;
+
 	ExecutorEnd(queryDesc);
 
 	FreeQueryDesc(queryDesc);
diff --git a/src/backend/tcop/cmdtag.c b/src/backend/tcop/cmdtag.c
index 4bd713a0b4..9e6fdbd8af 100644
--- a/src/backend/tcop/cmdtag.c
+++ b/src/backend/tcop/cmdtag.c
@@ -146,7 +146,7 @@ BuildQueryCompletionString(char *buff, const QueryCompletion *qc,
 	 */
 	if (command_tag_display_rowcount(tag) && !nameonly)
 	{
-		if (tag == CMDTAG_INSERT)
+		if (tag == CMDTAG_INSERT || tag == CMDTAG_EXPLAIN_INSERT)
 		{
 			*bufp++ = ' ';
 			*bufp++ = '0';
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 5565f200c3..ba0b33cc67 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -775,7 +775,13 @@ PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
 if (qc && portal->qc.commandTag != CMDTAG_UNKNOWN)
 {
 	CopyQueryCompletion(qc, &portal->qc);
-	qc->nprocessed = nprocessed;
+	if (portal->qc.commandTag == CMDTAG_EXPLAIN ||
+		portal->qc.commandTag == CMDTAG_EXPLAIN_INSERT ||
+		portal->qc.commandTag == CMDTAG_EXPLAIN_UPDATE ||
+		portal->qc.commandTag == CMDTAG_EXPLAIN_DELETE)
+		qc->nprocessed = portal->qc.nprocessed;
+	else
+		qc->nprocessed = nprocessed;
 }
 
 /* Mark portal not active */
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index e3ccf6c7f7..8975d046f9 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -867,7 +867,32 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 

Output affected rows in EXPLAIN

2023-09-06 Thread Damir Belyalov
I created a patch that outputs the number of rows affected by
INSERT/UPDATE/DELETE in EXPLAIN.
Although the commands in an EXPLAIN ANALYZE query are executed as usual,
EXPLAIN doesn't output the affected-rows tag the way these commands do.
The patch fixes this problem.

Examples:
explain analyze insert into a values (1);
                                         QUERY PLAN
------------------------------------------------------------------------------------------
 Insert on a  (cost=0.00..0.01 rows=0 width=0) (actual time=0.076..0.077
rows=0 loops=1)
   ->  Result  (cost=0.00..0.01 rows=1 width=4) (actual time=0.002..0.002
rows=1 loops=1)
 Planning Time: 0.025 ms
 Execution Time: 0.412 ms
(4 rows)

INSERT 0 1

explain analyze update a set n = 2;
                                         QUERY PLAN
------------------------------------------------------------------------------------------
 Update on a  (cost=0.00..35.50 rows=0 width=0) (actual time=0.059..0.060
rows=0 loops=1)
   ->  Seq Scan on a  (cost=0.00..35.50 rows=2550 width=10) (actual
time=0.012..0.013 rows=7 loops=1)
 Planning Time: 0.142 ms
 Execution Time: 0.666 ms
(4 rows)

UPDATE 7

explain analyze delete from a where n = 1;
                                        QUERY PLAN
-----------------------------------------------------------------------------------------
 Delete on a  (cost=0.00..41.88 rows=0 width=0) (actual time=0.147..0.147
rows=0 loops=1)
   ->  Seq Scan on a  (cost=0.00..41.88 rows=13 width=6) (actual
time=0.120..0.123 rows=7 loops=1)
 Filter: (n = 1)
 Planning Time: 1.073 ms
 Execution Time: 0.178 ms
(5 rows)

DELETE 7

EXPLAIN queries without ANALYZE don't affect any rows, so the output
number is 0.

explain update a set n = 2;
                     QUERY PLAN
-----------------------------------------------------
 Update on a  (cost=0.00..35.50 rows=0 width=0)
   ->  Seq Scan on a  (cost=0.00..35.50 rows=2550 width=10)
(2 rows)

UPDATE 0

Maybe there is no need to add this tag when EXPLAIN is run without
ANALYZE; that is open for discussion.
Also, I haven't fixed the regression tests yet.

Regards,
Damir Belyalov
Postgres Professional
From c6cbc6fa9ddf24f29bc19ff115224dd76e351db1 Mon Sep 17 00:00:00 2001
From: Damir Belyalov 
Date: Tue, 5 Sep 2023 15:04:01 +0300
Subject: [PATCH] Output affected rows in EXPLAIN.

---
 src/backend/commands/explain.c | 10 +-
 src/backend/tcop/cmdtag.c  |  2 +-
 src/backend/tcop/pquery.c  |  8 +++-
 src/backend/tcop/utility.c | 27 ++-
 src/bin/psql/common.c  |  5 +++--
 src/include/commands/explain.h |  3 ++-
 src/include/tcop/cmdtag.h  |  1 +
 src/include/tcop/cmdtaglist.h  |  3 +++
 src/interfaces/libpq/fe-exec.c |  4 +++-
 9 files changed, 55 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 8570b14f62..453e545ba5 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -162,7 +162,7 @@ static void escape_yaml(StringInfo buf, const char *str);
  */
 void
 ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
-			 ParamListInfo params, DestReceiver *dest)
+			 ParamListInfo params, DestReceiver *dest, uint64 *processed)
 {
 	ExplainState *es = NewExplainState();
 	TupOutputState *tstate;
@@ -173,6 +173,9 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
 	bool		timing_set = false;
 	bool		summary_set = false;
 
+	if (processed)
+		*processed = 0;
+
 	/* Parse options list. */
 	foreach(lc, stmt->options)
 	{
@@ -311,6 +314,9 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
 	end_tup_output(tstate);
 
 	pfree(es->str->data);
+
+	if (processed)
+		*processed = es->es_processed;
 }
 
 /*
@@ -649,6 +655,8 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 	 */
 	INSTR_TIME_SET_CURRENT(starttime);
 
+	es->es_processed += queryDesc->estate->es_processed;
+
 	ExecutorEnd(queryDesc);
 
 	FreeQueryDesc(queryDesc);
diff --git a/src/backend/tcop/cmdtag.c b/src/backend/tcop/cmdtag.c
index 4bd713a0b4..9e6fdbd8af 100644
--- a/src/backend/tcop/cmdtag.c
+++ b/src/backend/tcop/cmdtag.c
@@ -146,7 +146,7 @@ BuildQueryCompletionString(char *buff, const QueryCompletion *qc,
 	 */
 	if (command_tag_display_rowcount(tag) && !nameonly)
 	{
-		if (tag == CMDTAG_INSERT)
+		if (tag == CMDTAG_INSERT || tag == CMDTAG_EXPLAIN_INSERT)
 		{
 			*bufp++ = ' ';
 			*bufp++ = '0';
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 5565f200c3..ba0b33cc67 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -775,7 +775,13 @@ PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
 if (qc && portal->qc.commandTag != CMDTAG_UNKNOWN)
 {
 	CopyQueryCompletion(qc, &portal->qc);
-	qc->nprocessed = nprocessed;
+	if (portal->qc.commandTag == CMDTAG_EXPLAIN ||
+		portal->qc.commandTag == CMDTAG_EXPLAIN_INSERT ||
+	

Re: [feature]COPY FROM enable FORCE_NULL/FORCE_NOT_NULL on all columns

2023-07-07 Thread Damir Belyalov
Hello!

The patch does not apply to the current version of Postgres; it needs to
be updated.
I tested your patch: everything looks simple and works well.

There is a suggestion to simplify the code: instead of using

if (cstate->opts.force_notnull_all)
{
	int		i;

	for (i = 0; i < num_phys_attrs; i++)
		cstate->opts.force_notnull_flags[i] = true;
}

you can use MemSet():

if (cstate->opts.force_notnull_all)
	MemSet(cstate->opts.force_notnull_flags, true,
		   num_phys_attrs * sizeof(bool));

(MemSet works here because sizeof(bool) is 1, so every flag byte is set to
true.) The same applies to the force_null case.

Regards,
Damir Belyalov,
Postgres Professional


Re: Implement missing join selectivity estimation for range types

2023-07-07 Thread Damir Belyalov
Hello!

Thank you for the patch and the very interesting article.
The patch doesn't apply to the current Postgres version. Could you please
update it?

Regards,
Damir Belyalov,
Postgres Professional


Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2023-03-28 Thread Damir Belyalov
>
> I might misunderstand something, but I believe the v5 patch uses
> copy_generic_opt_list and it does not add IGNORE_DATATYPE_ERRORS as a
> keyword.
> It modifies neither kwlist.h nor gram.y.
>

Sorry, didn't notice that. I think everything is alright now.

Regards,
Damir Belyalov
Postgres Professional


Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2023-03-27 Thread Damir Belyalov
Hi!

I made the specified changes, and my patch turned out the same as yours.
The performance measurements were the same too.

The only thing left to figure out is how to avoid adding
IGNORE_DATATYPE_ERRORS as a keyword. See how this is done for options such
as FORCE_NOT_NULL, FORCE_NULL, FORCE_QUOTE: they are not in kwlist.h and
are not keywords in gram.y.
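
That is, the option only needs to be recognized in ProcessCopyOptions();
in the grammar it arrives through the generic option list, so a statement
like the following parses without any new keyword (a sketch; the table and
columns are made up):

```
COPY t FROM '/tmp/data.csv'
    WITH (FORMAT csv, FORCE_NULL (a, b), IGNORE_DATATYPE_ERRORS true);
```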

Regards,
Damir Belyalov
Postgres Professional


Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2023-03-07 Thread Damir Belyalov
>
> FWIW, Greenplum has a similar construct (but which also logs the errors
> in the db) where data type errors are skipped as long as the number of
> errors doesn't exceed a reject limit.  If the reject limit is reached
> then the COPY fails:
> >
> >   LOG ERRORS [ SEGMENT REJECT LIMIT count [ ROWS | PERCENT ]]
> >
> IIRC the gist of this was to catch when the user copies the wrong input
> data or plain has a broken file.  Rather than finding out after copying
> n rows which are likely to be garbage, the process can be restarted.
>

I think this is a matter for discussion. The same question applies here:
"Where should we log errors - to separate files or to the system
logfile?". IMO it's better to log a short, detailed error message to the
system logfile and not output errors to the terminal.


This version of the patch has a compiler error in the error message:
>
Yes, corrected it. I changed "ignored_errors" to int64 because "processed"
(used for counting COPY rows) is int64.


> I felt just logging "Error: %ld" would make people wonder about the
> meaning of the %ld. Logging something like "Error: %ld data type errors
> were found" might be clearer.

Thanks. For more clarity, I changed the message to: "Errors were found: %lld".
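
With that change, a session with the attached patch would look roughly
like this (a sketch; the row counts are made up):

```
COPY t FROM '/tmp/data.csv' WITH (ignore_datatype_errors);
-- WARNING:  Errors were found: 2
-- COPY 98
```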

Regards, Damir Belyalov
Postgres Professional
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index c25b52d0cb..706b929947 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { table_name [ ( format_name
 FREEZE [ boolean ]
+IGNORE_DATATYPE_ERRORS [ boolean ]
 DELIMITER 'delimiter_character'
 NULL 'null_string'
 HEADER [ boolean | MATCH ]
@@ -233,6 +234,17 @@ COPY { table_name [ ( 

 
+   
+IGNORE_DATATYPE_ERRORS
+
+ 
+  Drops rows that contain malformed data while copying. These are rows
+  with columns where the data type's input-function raises an error.
+  Outputs warnings about rows with incorrect data to system logfile.
+ 
+
+   
+

 DELIMITER
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index e34f583ea7..0334894014 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -410,6 +410,7 @@ ProcessCopyOptions(ParseState *pstate,
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
+	bool		ignore_datatype_errors_specified = false;
 	ListCell   *option;
 
 	/* Support external use for option sanity checking */
@@ -449,6 +450,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_datatype_errors") == 0)
+		{
+			if (ignore_datatype_errors_specified)
+errorConflictingDefElem(defel, pstate);
+			ignore_datatype_errors_specified = true;
+			opts_out->ignore_datatype_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 29cd1cf4a6..facfc44def 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -949,10 +949,14 @@ CopyFrom(CopyFromState cstate)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
+	if (cstate->opts.ignore_datatype_errors)
+		cstate->ignored_errors = 0;
+
 	for (;;)
 	{
 		TupleTableSlot *myslot;
 		bool		skip_tuple;
+		ErrorSaveContext escontext = {T_ErrorSaveContext};
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -985,9 +989,26 @@ CopyFrom(CopyFromState cstate)
 
 		ExecClearTuple(myslot);
 
+		if (cstate->opts.ignore_datatype_errors)
+		{
+			escontext.details_wanted = true;
+			cstate->escontext = escontext;
+		}
+
 		/* Directly store the values/nulls array in the slot */
 		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+		{
+			if (cstate->opts.ignore_datatype_errors && cstate->ignored_errors > 0)
+ereport(WARNING, errmsg("Errors were found: %lld", (long long) cstate->ignored_errors));
 			break;
+		}
+
+		/* Soft error occurred, skip this tuple */
+		if (cstate->escontext.error_occurred)
+		{
+			ExecClearTuple(myslot);
+			continue;
+		}
 
 		ExecStoreVirtualTuple(myslot);
 
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 91b564c2bc..9c36b0dc8b 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -70,6 +70,7 @@
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "nodes/miscnodes.h"
 #include "pgstat.h"
 #include "port/pg_bswap.h"
 #include "utils/builtins.h"
@@ -938,10 +939,23 @@ NextCopyFrom(CopyFromState csta

Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2023-02-28 Thread Damir Belyalov
Hello

Tested the patch on all cases: CIM_SINGLE, CIM_MULTI, CIM_MULTI_CONDITION.
As expected, it works.
Also added a description to copy.sgml and did a review of the patch.

I added an 'ignored_errors' integer counter that is output after the COPY
finishes.
All errors are logged to the system logfile with full detailed context;
maybe it's better to log only the error message.



Regards, Damir Belyalov
Postgres Professional
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index c25b52d0cb..706b929947 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { table_name [ ( format_name
 FREEZE [ boolean ]
+IGNORE_DATATYPE_ERRORS [ boolean ]
 DELIMITER 'delimiter_character'
 NULL 'null_string'
 HEADER [ boolean | MATCH ]
@@ -233,6 +234,17 @@ COPY { table_name [ ( 

 
+   
+IGNORE_DATATYPE_ERRORS
+
+ 
+  Drops rows that contain malformed data while copying. These are rows
+  with columns where the data type's input-function raises an error.
+  Outputs warnings about rows with incorrect data to system logfile.
+ 
+
+   
+

 DELIMITER
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index e34f583ea7..0334894014 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -410,6 +410,7 @@ ProcessCopyOptions(ParseState *pstate,
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
+	bool		ignore_datatype_errors_specified = false;
 	ListCell   *option;
 
 	/* Support external use for option sanity checking */
@@ -449,6 +450,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_datatype_errors") == 0)
+		{
+			if (ignore_datatype_errors_specified)
+errorConflictingDefElem(defel, pstate);
+			ignore_datatype_errors_specified = true;
+			opts_out->ignore_datatype_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index af52faca6d..ecaa750568 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -955,10 +955,14 @@ CopyFrom(CopyFromState cstate)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
+	if (cstate->opts.ignore_datatype_errors)
+		cstate->ignored_errors = 0;
+
 	for (;;)
 	{
 		TupleTableSlot *myslot;
 		bool		skip_tuple;
+		ErrorSaveContext escontext = {T_ErrorSaveContext};
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -991,9 +995,26 @@ CopyFrom(CopyFromState cstate)
 
 		ExecClearTuple(myslot);
 
+		if (cstate->opts.ignore_datatype_errors)
+		{
+			escontext.details_wanted = true;
+			cstate->escontext = escontext;
+		}
+
 		/* Directly store the values/nulls array in the slot */
 		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+		{
+			if (cstate->opts.ignore_datatype_errors && cstate->ignored_errors > 0)
+ereport(WARNING, errmsg("Errors: %ld", cstate->ignored_errors));
 			break;
+		}
+
+		/* Soft error occurred, skip this tuple */
+		if (cstate->escontext.error_occurred)
+		{
+			ExecClearTuple(myslot);
+			continue;
+		}
 
 		ExecStoreVirtualTuple(myslot);
 
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 91b564c2bc..9c36b0dc8b 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -70,6 +70,7 @@
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "nodes/miscnodes.h"
 #include "pgstat.h"
 #include "port/pg_bswap.h"
 #include "utils/builtins.h"
@@ -938,10 +939,23 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 
 			cstate->cur_attname = NameStr(att->attname);
 			cstate->cur_attval = string;
-			values[m] = InputFunctionCall(&in_functions[m],
-										  string,
-										  typioparams[m],
-										  att->atttypmod);
+
+			/* If IGNORE_DATATYPE_ERRORS is enabled skip rows with datatype errors */
+			if (!InputFunctionCallSafe(&in_functions[m],
+									   string,
+									   typioparams[m],
+									   att->atttypmod,
+									   (Node *) &cstate->escontext,
+									   &values[m]))
+			{
+				cstate->ignored_errors++;
+
+				ereport(LOG,
+						errmsg("%s", cstate->escontext.error_data->message));
+
+				return true;
+			}
+
 			if (string != NULL)
 				nulls[m] = false;
 			cstate->cur_attname = NULL;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a0138382a1..d79d293c0d 100644
--- a/src/backend/parser/gram.y
++

Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2023-02-05 Thread Damir Belyalov
Hi, Andres!

Thank you for reviewing.


> I don't think this is the right approach. Creating a subtransaction for
> each row will cause substantial performance issues.
>

Subtransactions aren't created for each row. One subtransaction covers a
block of 1000 rows (SAFE_BUFFER_SIZE), and this value can be changed. There
is also a constraint, MAX_SAFE_BUFFER_BYTES, on the number of bytes in
safe_buffer:
 while (sfcstate->saved_tuples < SAFE_BUFFER_SIZE &&
        sfcstate->safeBufferBytes < MAX_SAFE_BUFFER_BYTES)



> We now can call data type input functions without throwing errors, see
> InputFunctionCallSafe(). Use that to avoid throwing an error instead of
> catching it.
>
InputFunctionCallSafe() is good for detecting errors from input functions,
but there are errors from NextCopyFrom() that cannot be detected with
InputFunctionCallSafe(), e.g. "wrong number of columns in row" (see the
example below). Do you suggest processing input-function errors separately
from other errors? Right now all errors are processed in one "switch"
statement in PG_CATCH, so this change could complicate the code.
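
For example, this kind of error comes from the row parser, not from an
input function (a sketch; the table is made up):

```
CREATE TABLE t (a int, b int);
COPY t FROM stdin;
1	2	3
\.
-- ERROR:  extra data after last expected column
```

InputFunctionCallSafe() never sees such a row, so some other mechanism
(e.g. a subtransaction) is still needed to survive it.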



Regards,
Damir Belyalov
Postgres Professional


Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2023-02-03 Thread Damir Belyalov
Hi, Danil and Nikita!
Thank you for reviewing.

> Why is there no static keyword in the definition of the SafeCopying()
> function, while it is present in the function declaration?
>
Corrected this.

> 675: MemoryContext cxt =
> MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
> 676:
> 677: valid_row = NextCopyFrom(cstate, econtext, myslot->tts_values,
> myslot->tts_isnull);
> 678: tuple_is_valid = valid_row;
> 679:
> 680: if (valid_row)
> 681: sfcstate->safeBufferBytes += cstate->line_buf.len;
> 682:
> 683: CurrentMemoryContext = cxt;
>
> Why are you using a direct assignment to CurrentMemoryContext instead of
> using the MemoryContextSwitchTo function in the SafeCopying() routine?
>

"CurrentMemoryContext = cxt" is the same as "MemoryContextSwitchTo(cxt)",
you can see it in the implementation of MemoryContextSwitchTo(). Also
correct this.


> When you initialize the cstate->sfcstate structure, you create a
> cstate->sfcstate->safe_cxt memory context that inherits from oldcontext.
> Was it intended to use cstate->copycontext as the parent context here?
>

Good remark; corrected this.



Thanks to Nikita Malakhov for the advice to create a file with the errors.
But I decided to log errors in the system logfile and not print them to
the terminal. The error output in the logfile is rather simple - only the
error context is logged (maybe it's better to log all the error
information?).
There are two reasons why logging errors in the logfile is better than
logging them in a separate file (e.g. PGDATA/copy_ignore_errors.txt). The
user is used to looking for errors in the logfile. Creating another file
raises questions like: what file name should be used? do we need file
rotation? where should this file be created? (we can't create it in PGDATA
because of memory constraints)



Regards,
Damir Belyalov
Postgres Professional
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index c25b52d0cb..50151aec54 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { table_name [ ( format_name
 FREEZE [ boolean ]
+IGNORE_ERRORS [ boolean ]
 DELIMITER 'delimiter_character'
 NULL 'null_string'
 HEADER [ boolean | MATCH ]
@@ -233,6 +234,18 @@ COPY { table_name [ ( 

 
+   
+IGNORE_ERRORS
+
+ 
+  Drops rows that contain malformed data while copying. These are rows
+  containing syntax errors in data, rows with too many or too few columns,
+  rows containing columns where the data type's input function raises an error.
+  Logs errors to system logfile and outputs the total number of errors.
+ 
+
+   
+

 DELIMITER
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index e34f583ea7..e741ce3e5a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -407,6 +407,7 @@ ProcessCopyOptions(ParseState *pstate,
    bool is_from,
    List *options)
 {
+	bool		ignore_errors_specified = false;
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
@@ -449,6 +450,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_errors") == 0)
+		{
+			if (ignore_errors_specified)
+errorConflictingDefElem(defel, pstate);
+			ignore_errors_specified = true;
+			opts_out->ignore_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index af52faca6d..657fa44e0b 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -107,6 +107,9 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+static bool SafeCopying(CopyFromState cstate, ExprContext *econtext,
+		TupleTableSlot *myslot);
+
 /*
  * error context callback for COPY FROM
  *
@@ -625,6 +628,173 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	miinfo->bufferedBytes += tuplen;
 }
 
+/*
+ * Safely reads source data, converts to a tuple and fills tuple buffer.
+ * Skips some data in the case of failed conversion if data source for
+ * a next tuple can be surely read without a danger.
+ */
+static bool
+SafeCopying(CopyFromState cstate, ExprContext *econtext, TupleTableSlot *myslot)
+{
+	SafeCopyFromState  *sfcstate = cstate->sfcstate;
+	bool valid_row = true;
+
+	/* Standard COPY if IGNORE_ERRORS is disabled */
+	if (!cstate->sfcstate)
+		/* Directly stores the values/nulls array in the slot */
+		return NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull);
+
+	if (sfcstate->replayed_tuples < sfcstate->saved_tuples)
+	{
+		Assert(sfcstate->saved_tuple

Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2022-11-02 Thread Damir Belyalov
Updated the patch:
- Optimized and simplified logic of IGNORE_ERRORS
- Changed variable names to more understandable ones
- Added an analogue of MAX_BUFFERED_BYTES for safe buffer


Regards,
Damir Belyalov
Postgres Professional

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index c25b52d0cb..22c992e6f6 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { table_name [ ( format_name
 FREEZE [ boolean ]
+IGNORE_ERRORS [ boolean ]
 DELIMITER 'delimiter_character'
 NULL 'null_string'
 HEADER [ boolean | MATCH ]
@@ -233,6 +234,19 @@ COPY { table_name [ ( 

 
+   
+IGNORE_ERRORS
+
+ 
+  Drops rows that contain malformed data while copying. These are rows
+  containing syntax errors in data, rows with too many or too few columns,
+  rows containing columns where the data type's input function raises an error.
+  Outputs warnings about rows with incorrect data (the number of warnings
+  is not more than 100) and the total number of errors.
+ 
+
+   
+

 DELIMITER
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index db4c9dbc23..d04753a4c8 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -406,6 +406,7 @@ ProcessCopyOptions(ParseState *pstate,
    bool is_from,
    List *options)
 {
+	bool		ignore_errors_specified = false;
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
@@ -448,6 +449,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_errors") == 0)
+		{
+			if (ignore_errors_specified)
+errorConflictingDefElem(defel, pstate);
+			ignore_errors_specified = true;
+			opts_out->ignore_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index a079c70152..846eac022d 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -107,6 +107,9 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+static bool SafeCopying(CopyFromState cstate, ExprContext *econtext,
+			 TupleTableSlot *myslot);
+
 /*
  * error context callback for COPY FROM
  *
@@ -625,6 +628,175 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	miinfo->bufferedBytes += tuplen;
 }
 
+/*
+ * Safely reads source data, converts to a tuple and fills tuple buffer.
+ * Skips some data in the case of failed conversion if data source for
+ * a next tuple can be surely read without a danger.
+ */
+bool
+SafeCopying(CopyFromState cstate, ExprContext *econtext, TupleTableSlot *myslot)
+{
+	SafeCopyFromState  *sfcstate = cstate->sfcstate;
+	bool valid_row = true;
+
+	/* Standard COPY if IGNORE_ERRORS is disabled */
+	if (!cstate->sfcstate)
+		/* Directly stores the values/nulls array in the slot */
+		return NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull);
+
+	if (sfcstate->replayed_tuples < sfcstate->saved_tuples)
+	{
+		Assert(sfcstate->saved_tuples > 0);
+
+		/* Prepare to replay the tuple */
+		heap_deform_tuple(sfcstate->safe_buffer[sfcstate->replayed_tuples++], RelationGetDescr(cstate->rel),
+		  myslot->tts_values, myslot->tts_isnull);
+		return true;
+	}
+	else
+	{
+		/* All tuples from buffer were replayed, clean it up */
+		MemoryContextReset(sfcstate->safe_cxt);
+
+		sfcstate->saved_tuples = sfcstate->replayed_tuples = 0;
+		sfcstate->safeBufferBytes = 0;
+	}
+
+	BeginInternalSubTransaction(NULL);
+	CurrentResourceOwner = sfcstate->oldowner;
+
+	while (sfcstate->saved_tuples < SAFE_BUFFER_SIZE &&
+		   sfcstate->safeBufferBytes < MAX_SAFE_BUFFER_BYTES)
+	{
+		bool	tuple_is_valid = true;
+
+		PG_TRY();
+		{
+			MemoryContext cxt = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+
+			valid_row = NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull);
+			tuple_is_valid = valid_row;
+
+			if (valid_row)
+				sfcstate->safeBufferBytes += cstate->line_buf.len;
+
+			CurrentMemoryContext = cxt;
+		}
+		PG_CATCH();
+		{
+			MemoryContext ecxt = MemoryContextSwitchTo(sfcstate->oldcontext);
+			ErrorData 	 *errdata = CopyErrorData();
+
+			tuple_is_valid = false;
+
+			Assert(IsSubTransaction());
+
+			RollbackAndReleaseCurrentSubTransaction();
+			CurrentResourceOwner = sfcstate->oldowner;
+
+			switch (errdata->sqlerrcode)
+			{
+				/* Ignore data exceptions */
+				case ERRCODE_CHARACTER_NOT_IN_REPERTOIRE:
+				case ERRCODE_DATA_EXCEPTION:
+				case ERRCODE_ARRAY_ELEMENT_ERROR:
+				case ERRCODE_DATETIME_VALUE_OUT_OF_RANGE:
+

Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2022-10-17 Thread Damir Belyalov
Updated the patch due to conflicts when applying to master.

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index c25b52d0cb..22c992e6f6 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { table_name [ ( format_name
 FREEZE [ boolean ]
+IGNORE_ERRORS [ boolean ]
 DELIMITER 'delimiter_character'
 NULL 'null_string'
 HEADER [ boolean | MATCH ]
@@ -233,6 +234,19 @@ COPY { table_name [ ( 

 
+   
+IGNORE_ERRORS
+
+ 
+  Drops rows that contain malformed data while copying. These are rows
+  containing syntax errors in data, rows with too many or too few columns,
+  rows containing columns where the data type's input function raises an error.
+  Outputs warnings about rows with incorrect data (the number of warnings
+  is not more than 100) and the total number of errors.
+ 
+
+   
+

 DELIMITER
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index db4c9dbc23..d04753a4c8 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -406,6 +406,7 @@ ProcessCopyOptions(ParseState *pstate,
    bool is_from,
    List *options)
 {
+	bool		ignore_errors_specified = false;
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
@@ -448,6 +449,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_errors") == 0)
+		{
+			if (ignore_errors_specified)
+errorConflictingDefElem(defel, pstate);
+			ignore_errors_specified = true;
+			opts_out->ignore_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index a079c70152..fa169d2cf4 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -107,6 +107,9 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+static bool FillReplayBuffer(CopyFromState cstate, ExprContext *econtext,
+			 TupleTableSlot *myslot);
+
 /*
  * error context callback for COPY FROM
  *
@@ -625,6 +628,177 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	miinfo->bufferedBytes += tuplen;
 }
 
+/*
+ * Fills replay_buffer for safe copying, enables by IGNORE_ERRORS option.
+ */
+bool
+FillReplayBuffer(CopyFromState cstate, ExprContext *econtext, TupleTableSlot *myslot)
+{
+	SafeCopyFromState  *sfcstate = cstate->sfcstate;
+	bool valid_row = true;
+
+	if (!sfcstate->replay_is_active)
+	{
+		BeginInternalSubTransaction(NULL);
+		CurrentResourceOwner = sfcstate->oldowner;
+
+		while (sfcstate->saved_tuples < REPLAY_BUFFER_SIZE)
+		{
+			bool tuple_is_valid = false;
+
+			PG_TRY();
+			{
+				MemoryContext cxt = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+
+				valid_row = NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull);
+
+				if (valid_row)
+					tuple_is_valid = true;
+
+				CurrentMemoryContext = cxt;
+			}
+			PG_CATCH();
+			{
+				MemoryContext ecxt = MemoryContextSwitchTo(sfcstate->oldcontext);
+				ErrorData *errdata = CopyErrorData();
+
+				Assert(IsSubTransaction());
+				RollbackAndReleaseCurrentSubTransaction();
+				CurrentResourceOwner = sfcstate->oldowner;
+
+				switch (errdata->sqlerrcode)
+				{
+					/* Ignore data exceptions */
+					case ERRCODE_CHARACTER_NOT_IN_REPERTOIRE:
+					case ERRCODE_DATA_EXCEPTION:
+					case ERRCODE_ARRAY_ELEMENT_ERROR:
+					case ERRCODE_DATETIME_VALUE_OUT_OF_RANGE:
+					case ERRCODE_INTERVAL_FIELD_OVERFLOW:
+					case ERRCODE_INVALID_CHARACTER_VALUE_FOR_CAST:
+					case ERRCODE_INVALID_DATETIME_FORMAT:
+					case ERRCODE_INVALID_ESCAPE_CHARACTER:
+					case ERRCODE_INVALID_ESCAPE_SEQUENCE:
+					case ERRCODE_NONSTANDARD_USE_OF_ESCAPE_CHARACTER:
+					case ERRCODE_INVALID_PARAMETER_VALUE:
+					case ERRCODE_INVALID_TABLESAMPLE_ARGUMENT:
+					case ERRCODE_INVALID_TIME_ZONE_DISPLACEMENT_VALUE:
+					case ERRCODE_NULL_VALUE_NOT_ALLOWED:
+					case ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE:
+					case ERRCODE_SEQUENCE_GENERATOR_LIMIT_EXCEEDED:
+					case ERRCODE_STRING_DATA_LENGTH_MISMATCH:
+					case ERRCODE_STRING_DATA_RIGHT_TRUNCATION:
+					case ERRCODE_INVALID_TEXT_REPRESENTATION:
+					case ERRCODE_INVALID_BINARY_REPRESENTATION:
+					case ERRCODE_BAD_COPY_FILE_FORMAT:
+					case ERRCODE_UNTRANSLATABLE_CHARACTER:
+					case ERRCODE_DUPLICATE_JSON_OBJECT_KEY_VALUE:
+					case ERRCODE_INVALID_ARGUMENT_FOR_SQL_JSON_DATETIME_FUNCTION:
+					case ERRCODE_INVALID_JSON_TEXT:
+					case ERRCODE_INVALID_SQL_JSON_SUBSCRIPT:
+					case ERRCODE_MORE_THAN_ONE_SQL_JSON_ITEM:
+					case ERRCODE_NO_SQL_JSON_ITEM:
+					case ERRCODE_NON_NUMERIC_SQL_JSON_ITEM:
+					case ERRCODE_NON_UNIQUE_KEYS_IN_A_JSON_OBJECT:
+	case 

Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2022-09-29 Thread Damir Belyalov
>
> Do you mean you stop dealing with errors concerned with constraints and
> triggers and we should review 0006-COPY_IGNORE_ERRORS?
>
Yes, this patch is simpler and I think it's worth adding it first.


> Hmm, I applied the v6 patch, and when I canceled COPY by sending SIGINT
> (Ctrl+C), I faced the same situation as below.
> I tested it on CentOS 8.4.
>
Thank you for pointing out this error. It really needs to be taken into
account. In the previous 0006 patch, there were two stages in one
subtransaction: filling the buffer and 'replay mode' (reading from the
buffer). Since only NextCopyFrom() is included in PG_TRY(), and the
ERRCODE_QUERY_CANCELED error can occur anywhere, it is impossible to catch
it and roll back the subtransaction.

I changed the 0006 patch to fix this error; now only the replay-buffer
filling is done in the subtransaction.

Patch 0005 (which processed constraints) needs to be finalized, because it
requires subtransactions to roll back constraints and triggers; therefore
it is not possible to fix it yet. One option is to put the for(;;) loop
inside PG_TRY. That would solve the problem, but the code would become too
complicated.
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index c25b52d0cb..22c992e6f6 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { table_name [ ( format_name
 FREEZE [ boolean ]
+IGNORE_ERRORS [ boolean ]
 DELIMITER 'delimiter_character'
 NULL 'null_string'
 HEADER [ boolean | MATCH ]
@@ -233,6 +234,19 @@ COPY { table_name [ ( 

 
+   
+IGNORE_ERRORS
+
+ 
+  Drops rows that contain malformed data while copying. These are rows
+  containing syntax errors in data, rows with too many or too few columns,
+  rows containing columns where the data type's input function raises an error.
+  Outputs warnings about rows with incorrect data (the number of warnings
+  is not more than 100) and the total number of errors.
+ 
+
+   
+

 DELIMITER
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476a..f41b25f26a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -406,6 +406,7 @@ ProcessCopyOptions(ParseState *pstate,
    bool is_from,
    List *options)
 {
+	bool		ignore_errors_specified = false;
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
@@ -448,6 +449,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_errors") == 0)
+		{
+			if (ignore_errors_specified)
+errorConflictingDefElem(defel, pstate);
+			ignore_errors_specified = true;
+			opts_out->ignore_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index e8bb168aea..caa3375758 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -106,6 +106,9 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+static bool FillReplayBuffer(CopyFromState cstate, ExprContext *econtext,
+			 TupleTableSlot *myslot);
+
 /*
  * error context callback for COPY FROM
  *
@@ -521,6 +524,177 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	miinfo->bufferedBytes += tuplen;
 }
 
+/*
+ * Fills replay_buffer for safe copying, enables by IGNORE_ERRORS option.
+ */
+bool
+FillReplayBuffer(CopyFromState cstate, ExprContext *econtext, TupleTableSlot *myslot)
+{
+	SafeCopyFromState  *sfcstate = cstate->sfcstate;
+	bool valid_row = true;
+
+	if (!sfcstate->replay_is_active)
+	{
+		BeginInternalSubTransaction(NULL);
+		CurrentResourceOwner = sfcstate->oldowner;
+
+		while (sfcstate->saved_tuples < REPLAY_BUFFER_SIZE)
+		{
+			bool tuple_is_valid = false;
+
+			PG_TRY();
+			{
+				MemoryContext cxt = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+
+				valid_row = NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull);
+
+				if (valid_row)
+					tuple_is_valid = true;
+
+				CurrentMemoryContext = cxt;
+			}
+			PG_CATCH();
+			{
+				MemoryContext ecxt = MemoryContextSwitchTo(sfcstate->oldcontext);
+				ErrorData *errdata = CopyErrorData();
+
+				Assert(IsSubTransaction());
+				RollbackAndReleaseCurrentSubTransaction();
+				CurrentResourceOwner = sfcstate->oldowner;
+
+				switch (errdata->sqlerrcode)
+				{
+					/* Ignore data exceptions */
+					case ERRCODE_CHARACTER_NOT_IN_REPERTOIRE:
+					case ERRCODE_DATA_EXCEPTION:
+					case ERRCODE_ARRAY_ELEMENT_ERROR:
+					case ERRCODE_DATETIME_VALUE_OUT_OF_RANGE:
+					case ERRCODE_INTERVAL_FIELD_OVERFLOW:
+					case ERRCODE_INVALID_CHARACTER_VALUE_FOR_CAST:
+					case ERRCODE_INVALID_DATETIME_FORMAT:
+	case 

Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2022-09-21 Thread Damir Belyalov
Thank you for reviewing.
In the previous patch there was an error when processing constraints. The
patch was fixed, but the code grew and became more complicated
(0005-COPY_IGNORE_ERRORS). I also simplified the logic of
safeNextCopyFrom().
You asked why we need subtransactions; the answer is in this patch. When
processing a row that does not satisfy constraints or INSTEAD OF triggers,
it is necessary to roll back the subtransaction and return the table to
its original state.
Because of this complexity, I had to abandon constraint and trigger
processing and handle only errors that occur when reading the file.
Attaching the simplified patch (0006-COPY_IGNORE_ERRORS).
I checked these patches against all COPY regression tests, and they worked
correctly.


> BTW in v4 patch, data are loaded into the buffer one by one, and when
> the buffer fills up, the data in the buffer are 'replayed' also one by
> one, right?
> Wouldn't this have more overhead than a normal COPY?
>
The data is loaded into the buffer one by one, but in "replay mode"
ignore_errors works like standard COPY: tuples are added to the slot, and
depending on the insert method (CIM_SINGLE or CIM_MULTI) they are replayed
in the corresponding way.
For the 0006 patch you can imagine that we divide the for(;;) loop into
two parts: the first part adds tuples to the buffer, and the second part
inserts the tuples into the table. These parts don't intersect with each
other and are executed sequentially.
The main idea of the replay_buffer is that it is needed for the CIM_SINGLE
case: if you implement the CIM_SINGLE case, you can see that tuples read
before an error occurs are not added to the table (see the sketch below).
The logic of the 0005 patch is similar, with some differences.
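
In terms of observable behaviour the goal is the following (a sketch using
the IGNORE_ERRORS syntax of this patch; the data is made up):

```
CREATE TABLE t (n int);
COPY t FROM stdin WITH (ignore_errors);
1
bad
2
\.
-- a WARNING about the skipped row; afterwards SELECT shows rows 1 and 2,
-- i.e. with CIM_SINGLE the valid tuples read before the error are kept
```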

> As a test, I COPYed slightly larger data with and without the
> ignore_errors option.
> There might be other reasons, but I found a performance difference.

I tried to reduce the performance difference by cleaning up replay_buffer
with a reset of a new memory context dedicated to it (replay_cxt).
```
Before:
Without ignore_errors:
COPY 1000
Time: 15538,579 ms (00:15,539)
With ignore_errors:
COPY 1000
Time: 21289,121 ms (00:21,289)

After:
Without ignore_errors:
COPY 1000
Time: 15318,922 ms (00:15,319)
With ignore_errors:
COPY 1000
Time: 19868,175 ms (00:19,868)
```

> - Put in the documentation that the warnings will not be output for more
> than 101 cases.
>
Yeah, I pointed it out in the doc.


> I applied v4 patch and when canceled the COPY, there was a case I found
> myself left in a transaction.
> Should this situation be prevented from occurring?
>
> ```
> =# copy test from '/tmp/1000.data' with (ignore_errors );
>
> ^CCancel request sent
> ERROR:  canceling statement due to user request
>
> =# truncate test;
> ERROR:  current transaction is aborted, commands ignored until end of
> transaction block
> ```
>
I tried to reproduce your error and could not. The result was the same as
with ordinary COPY FROM.
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index c25b52d0cb..c99adabcc9 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { table_name [ ( format_name
 FREEZE [ boolean ]
+IGNORE_ERRORS [ boolean ]
 DELIMITER 'delimiter_character'
 NULL 'null_string'
 HEADER [ boolean | MATCH ]
@@ -233,6 +234,20 @@ COPY { table_name [ ( 

 
+   
+IGNORE_ERRORS
+
+ 
+  Drops rows that contain malformed data while copying. These are rows
+  containing syntax errors in data, rows with too many or too few columns,
+  rows that result in constraint violations, rows containing columns where
+  the data type's input function raises an error.
+  Outputs warnings about rows with incorrect data (the number of warnings
+  is not more than 100) and the total number of errors.
+ 
+
+   
+

 DELIMITER
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476a..f41b25f26a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -406,6 +406,7 @@ ProcessCopyOptions(ParseState *pstate,
    bool is_from,
    List *options)
 {
+	bool		ignore_errors_specified = false;
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
@@ -448,6 +449,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_errors") == 0)
+		{
+			if (ignore_errors_specified)
+errorConflictingDefElem(defel, pstate);
+			ignore_errors_specified = true;
+			opts_out->ignore_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index e8bb168aea..6474d47d29 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -106,6 +106,12 @@ static char 

Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2022-08-24 Thread Damir Belyalov
>
From 09befdad45a6b1ae70d6c5abc90d1c2296e56ee1 Mon Sep 17 00:00:00 2001
From: Damir Belyalov 
Date: Fri, 15 Oct 2021 11:55:18 +0300
Subject: [PATCH] COPY_IGNORE_ERRORS with GUC for replay_buffer size

---
 doc/src/sgml/config.sgml  |  17 ++
 doc/src/sgml/ref/copy.sgml|  19 ++
 src/backend/commands/copy.c   |   8 +
 src/backend/commands/copyfrom.c   | 114 +++-
 src/backend/commands/copyfromparse.c  | 169 ++
 src/backend/parser/gram.y |   8 +-
 src/backend/utils/misc/guc.c  |  11 ++
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/bin/psql/tab-complete.c   |   3 +-
 src/include/commands/copy.h   |   6 +
 src/include/commands/copyfrom_internal.h  |  19 ++
 src/include/parser/kwlist.h   |   1 +
 src/test/regress/expected/copy2.out   | 130 ++
 src/test/regress/sql/copy2.sql| 116 
 14 files changed, 617 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 37fd80388c..69373b8d8c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1961,6 +1961,23 @@ include_dir 'conf.d'
   
  
 
+ 
+  replay_buffer_size (integer)
+  
+   replay_buffer_size configuration parameter
+  
+  
+  
+   
+Specifies the size of the buffer for the COPY FROM IGNORE_ERRORS option. This
+buffer is created when a subtransaction begins and accumulates tuples until an
+error occurs; then the stored tuples are replayed. The buffer size is also the
+size of the subtransaction, so on large tables it should be increased to avoid
+exceeding the maximum number of subtransactions.
+   
+  
+ 
+
  
   max_stack_depth (integer)
   
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 8aae711b3b..7ff6f6dea7 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { table_name [ ( format_name
 FREEZE [ boolean ]
+IGNORE_ERRORS [ boolean ]
 DELIMITER 'delimiter_character'
 NULL 'null_string'
 HEADER [ boolean | MATCH ]
@@ -233,6 +234,24 @@ COPY { table_name [ ( 

 
+   
+IGNORE_ERRORS
+
+ 
+  Drop rows that contain malformed data while copying. These are rows
+  containing syntax errors in data, rows with too many or too few columns,
+  rows that result in constraint violations, rows containing columns where
+  the data type's input function raises an error.
+  Outputs warnings about rows with incorrect data (the number of warnings
+  is not more than 100) and the total number of errors.
+  The option is implemented with subtransactions and a buffer created in
+  them, which accumulates tuples until an error occurs.
+  The size of the buffer is the size of the subtransaction block.
+  It is controlled by the GUC parameter "replay_buffer_size" and equals
+  1000 by default.
+ 
+
+   
+

 DELIMITER
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3ac731803b..fead1aba46 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -402,6 +402,7 @@ ProcessCopyOptions(ParseState *pstate,
 {
 	bool		format_specified = false;
 	bool		freeze_specified = false;
+	bool		ignore_errors_specified = false;
 	bool		header_specified = false;
 	ListCell   *option;
 
@@ -442,6 +443,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_errors") == 0)
+		{
+			if (ignore_errors_specified)
+errorConflictingDefElem(defel, pstate);
+			ignore_errors_specified = true;
+			opts_out->ignore_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index a976008b3d..7e997d15c6 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -73,6 +73,11 @@
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
 
+/*
+ * GUC parameters
+ */
+int		replay_buffer_size;
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
@@ -100,12 +105,13 @@ typedef struct CopyMultiInsertInfo
 	int			ti_options;		/* table insert options */
 } CopyMultiInsertInfo;
 
-
 /* non-export function prototypes */
 static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+static void safeExecConstraints(CopyFromState cstate, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, EState *estate);
+
 /*
  * error cont

Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2022-08-24 Thread Damir Belyalov
>
> > +   /* Buffer was filled, commit subtransaction and prepare
> to replay */
> > +   ReleaseCurrentSubTransaction();
> What is actually being committed by this ReleaseCurrentSubTransaction()?
> It seems to me that just safecstate->replay_buffer is fulfilled before
> this commit.
>
All tuples are collected in replay_buffer, which is created in the
"oldcontext" of CopyFrom() (not in the context of a subtransaction). That's
why it is not cleaned up when you use
RollbackAndReleaseCurrentSubTransaction(). Subtransactions are needed for
better safety: there is no error while copying from a file into the
replay_buffer, but an error may occur at the next stage, when a tuple is
added to the table. There may also be other errors that are not obvious at
first glance.
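
To illustrate the two stages, here is a sketch (hypothetical table and
data; it assumes a patch version that also catches constraint violations,
and the WARNING text is only indicative): a row can parse cleanly into
replay_buffer and still fail later, at insertion, e.g. on a CHECK
constraint:

```
CREATE TABLE t (i int CHECK (i > 0));

COPY t FROM STDIN WITH (IGNORE_ERRORS);
1
-5
2
\.
WARNING:  new row for relation "t" violates check constraint "t_i_check"

SELECT * FROM t;
 i
---
 1
 2
(2 rows)
```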

I feel it might be better to have it as a parameter rather than fixed at
> 1000.
>
Yes, I thought about it too, so I created another version with a GUC
parameter, replay_buffer_size (attached). But I think users won't need to
change replay_buffer_size. Also, replay_buffer does the same thing as the
MultiInsert buffer, and the MultiInsert buffer is defined by a constant
equal to 1000.
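
For completeness, a usage sketch of the GUC (table name and file path are
made up):

```
-- 1000 is the default in this patch version; raise it for large loads to
-- avoid running into the maximum number of subtransactions.
SET replay_buffer_size = 10000;
COPY big_table FROM '/tmp/big.data' WITH (IGNORE_ERRORS);
```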

NextCopyFrom() is in copyfromparse.c while safeNextCopyFrom() is in
> copyfrom.c.
> Since safeNextCopyFrom() is analog of NextCopyFrom() as commented, would
> it be natural to put them in the same file?
>
Sure, corrected it.


> > 188 +   safecstate->errors++;
> > 189 +   if (safecstate->errors <= 100)
> > 190 +   ereport(WARNING,
> > 191 +   (errcode(errdata->sqlerrcode),
> > 192 +   errmsg("%s", errdata->context)));
>
> It would be better to state in the manual that errors exceeding 100 are
> not displayed.
> Or, it might be acceptable to output all errors that exceed 100.
>
It'd be too complicated to create a new parameter just to control how many
errors are shown. I think 100 is an optimal limit.
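
So with the cap, a load that hits, say, 150 bad rows out of 1000 would warn
only for the first 100 of them and then report the total; roughly (file
name made up, and the summary line follows this patch's current wording):

```
COPY t FROM '/tmp/bad.data' WITH (IGNORE_ERRORS);
WARNING:  COPY t, line 3, column i: "x"
... (99 more per-row warnings, 100 in total) ...
WARNING:  FIND 150 ERRORS
COPY 850
```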


> > +typedef struct SafeCopyFromState
> > +{
> > +#defineREPLAY_BUFFER_SIZE 1000
> > +   HeapTuple   replay_buffer[REPLAY_BUFFER_SIZE]; /* accumulates
> > tuples for replaying it after an error */
> > +   int saved_tuples;   /* # of tuples in
> > replay_buffer */
> > +   int replayed_tuples;/* # of tuples was
> > replayed from buffer */
> > +   int errors; /* total # of errors */
> > +   boolreplay_is_active;
> > +   boolbegin_subtransaction;
> > +   boolprocessed_remaining_tuples; /* for case of
> > replaying last tuples */
> > +   boolskip_row;
>
> It would be helpful to add comments about skip_row, etc.
>
Corrected it.


> WARNING:  FIND 0 ERRORS
> When there were no errors, this WARNING seems not necessary.
>
Corrected it.

Added to this patch: processing of other errors and constraints, plus tests
for them.
I had to create a separate function safeExecConstraints() just for
processing constraints, because ExecConstraints() runs after NextCopyFrom()
and is not inside PG_TRY. This complicated the code a bit.
Maybe a good approach would be to create a new function SafeCopyFrom() and
do all the "safe copying" in PG_TRY, but it would almost duplicate the
CopyFrom() code.
Or maybe create a function just for the for(;;) loop, but then we have the
same code duplication, plus a lot of variables (created at the beginning of
CopyFrom()) that would have to be passed to this function.


Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2022-08-15 Thread Damir Belyalov
>
>
> Thank you for feedback.
I improved my patch recently and tested it on different sizes of
MAX_BUFFERED_TUPLES and REPLAY_BUFFER_SIZE.

> I loaded 1 rows which contained 1 wrong row.
> I expected I could see  rows after COPY, but just saw 999 rows.
I also reproduced your case, and it worked correctly.

> BTW I may be overlooking it, but have you submit this proposal to the
next CommitFest?
Good idea. Haven't done it yet.

Regards,
Damir
Postgres Professional
From fa6b99c129eb890b25f006bb7891a247c8a431a7 Mon Sep 17 00:00:00 2001
From: Damir Belyalov 
Date: Fri, 15 Oct 2021 11:55:18 +0300
Subject: [PATCH] COPY_IGNORE_ERRORS without GUC with function
 safeNextCopyFrom() with struct SafeCopyFromState with refactoring

---
 doc/src/sgml/ref/copy.sgml   |  13 ++
 src/backend/commands/copy.c  |   8 ++
 src/backend/commands/copyfrom.c  | 162 ++-
 src/backend/parser/gram.y|   8 +-
 src/bin/psql/tab-complete.c  |   3 +-
 src/include/commands/copy.h  |   1 +
 src/include/commands/copyfrom_internal.h |  21 +++
 src/include/parser/kwlist.h  |   1 +
 src/test/regress/expected/copy2.out  | 123 +
 src/test/regress/sql/copy2.sql   | 110 +++
 10 files changed, 445 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 8aae711b3b..7d20b1649e 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { table_name [ ( format_name
 FREEZE [ boolean ]
+IGNORE_ERRORS [ boolean ]
 DELIMITER 'delimiter_character'
 NULL 'null_string'
 HEADER [ boolean | MATCH ]
@@ -233,6 +234,18 @@ COPY { table_name [ ( 

 
+   
+IGNORE_ERRORS
+
+ 
+  Drop rows that contain malformed data while copying. That is, rows
+  containing syntax errors in data, rows with too many or too few columns,
+  rows that result in constraint violations, rows containing columns where
+  the data type's input function raises an error.
+ 
+
+   
+

 DELIMITER
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3ac731803b..fead1aba46 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -402,6 +402,7 @@ ProcessCopyOptions(ParseState *pstate,
 {
 	bool		format_specified = false;
 	bool		freeze_specified = false;
+	bool		ignore_errors_specified = false;
 	bool		header_specified = false;
 	ListCell   *option;
 
@@ -442,6 +443,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_errors") == 0)
+		{
+			if (ignore_errors_specified)
+errorConflictingDefElem(defel, pstate);
+			ignore_errors_specified = true;
+			opts_out->ignore_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index a976008b3d..285c491ddd 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -106,6 +106,9 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+static bool safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext,
+	 Datum *values, bool *nulls);
+
 /*
  * error context callback for COPY FROM
  *
@@ -521,6 +524,125 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	miinfo->bufferedBytes += tuplen;
 }
 
+/*
+ * Analog of NextCopyFrom() but ignore rows with errors while copying.
+ */
+static bool
+safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+	SafeCopyFromState *safecstate = cstate->safecstate;
+	bool valid_row = true;
+
+	safecstate->skip_row = false;
+
+	PG_TRY();
+	{
+		if (!safecstate->replay_is_active)
+		{
+			if (safecstate->begin_subtransaction)
+			{
+BeginInternalSubTransaction(NULL);
+CurrentResourceOwner = safecstate->oldowner;
+
+safecstate->begin_subtransaction = false;
+			}
+
+			if (safecstate->saved_tuples < REPLAY_BUFFER_SIZE)
+			{
+valid_row = NextCopyFrom(cstate, econtext, values, nulls);
+if (valid_row)
+{
+	/* Fill replay_buffer in oldcontext */
+	MemoryContextSwitchTo(safecstate->oldcontext);
+	safecstate->replay_buffer[safecstate->saved_tuples++] = heap_form_tuple(RelationGetDescr(cstate->rel), values, nulls);
+
+	safecstate->skip_row = true;
+}
+else if (!safecstate->processed_remaining_tuples)
+{
+	ReleaseCurrentSubTransaction();
+	CurrentResourceOwner = safecstate->oldowner;
+	if (safecstate->replayed_tuples < safecstate->saved_tuples)
+	{
+		/* Prepare to replay remaining tuples if 

Fwd: Merging statistics from children instead of re-sampling everything

2022-08-15 Thread Damir Belyalov
>
> 3) stadistinct - This is quite problematic. We only have the per-child
> estimates, and it's not clear if there's any overlap. For now I've just
> summed it up, because that's safer / similar to what we do for gather
> merge paths etc. Maybe we could improve this by estimating the overlap
> somehow (e.g. from MCV lists / histograms). But honestly, I doubt the
> estimates based on tiny sample of each child are any better. I suppose
> we could introduce a column option, determining how to combine ndistinct
> (similar to how we can override n_distinct itself).
>
> 4) MCV - It's trivial to build a new "parent" MCV list, although it may
> be too large (in which case we cut it at statistics target, and copy the
> remaining bits to the histogram)
>

I think there is one approach to solving the problem of computing MCV and
distinct statistics.
To do this, you need to compute the density of the sample distribution and
store it, for example, in some slot.
Then, when merging statistics, we sum the densities of all partitions as
functions and obtain the new density.
From the new density you can determine which values are most common and
which are distinct.
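
As a sketch (my notation, not from any patch): if partition i contributes
n_i sampled rows with estimated density f_i, the merged density could be
the sample-size-weighted mixture

```
f_merged(x) = sum_i (n_i / N) * f_i(x),    where N = sum_i n_i
```

and the MCV candidates are then the values where f_merged carries the most
mass.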

To calculate the partition densities, you can use "Kernel Density
Estimation" -
https://www.statsmodels.org/dev/examples/notebooks/generated/kernel_density.html

The approach may be very inaccurate and difficult to implement, but it
solves the problem.

Regards,
Damir Belyalov
Postgres Professional


Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2022-07-19 Thread Damir Belyalov
quot;a"
WARNING:  COPY check_ign_err, line 5, column m: "b"
WARNING:  COPY check_ign_err, line 6, column n: ""
SELECT * FROM check_ign_err;
1 1 1
2 2 2 2
3 3
a 4 4
5 b b

7 7 7
\.
 n | m | k
---+---+---
 1 | 1 | 1
 7 | 7 | 7
(2 rows)

Thanks for the feedback.
Regards, Damir
From 6bf2168cd962b3cce666a2cabf082f558eec848c Mon Sep 17 00:00:00 2001
From: Damir Belyalov 
Date: Fri, 15 Oct 2021 11:55:18 +0300
Subject: [PATCH] COPY IGNORE_ERRORS

---
 doc/src/sgml/ref/copy.sgml  |  13 +++
 src/backend/commands/copy.c |   8 ++
 src/backend/commands/copyfrom.c | 138 +++-
 src/backend/parser/gram.y   |   8 +-
 src/bin/psql/tab-complete.c |   3 +-
 src/include/commands/copy.h |   1 +
 src/include/parser/kwlist.h |   1 +
 src/test/regress/expected/copy2.out | 118 
 src/test/regress/sql/copy2.sql  | 110 ++
 9 files changed, 395 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 8aae711b3b..7d20b1649e 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { table_name [ ( format_name
 FREEZE [ boolean ]
+IGNORE_ERRORS [ boolean ]
 DELIMITER 'delimiter_character'
 NULL 'null_string'
 HEADER [ boolean | MATCH ]
@@ -233,6 +234,18 @@ COPY { table_name [ ( 

 
+   
+IGNORE_ERRORS
+
+ 
+  Drop rows that contain malformed data while copying. That is, rows
+  containing syntax errors in data, rows with too many or too few columns,
+  rows that result in constraint violations, rows containing columns where
+  the data type's input function raises an error.
+ 
+
+   
+

 DELIMITER
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3ac731803b..fead1aba46 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -402,6 +402,7 @@ ProcessCopyOptions(ParseState *pstate,
 {
 	bool		format_specified = false;
 	bool		freeze_specified = false;
+	bool		ignore_errors_specified = false;
 	bool		header_specified = false;
 	ListCell   *option;
 
@@ -442,6 +443,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_errors") == 0)
+		{
+			if (ignore_errors_specified)
+errorConflictingDefElem(defel, pstate);
+			ignore_errors_specified = true;
+			opts_out->ignore_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index a976008b3d..b994697b9d 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -535,6 +535,7 @@ CopyFrom(CopyFromState cstate)
 	ExprContext *econtext;
 	TupleTableSlot *singleslot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
+	ResourceOwner oldowner = CurrentResourceOwner;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
@@ -549,6 +550,17 @@ CopyFrom(CopyFromState cstate)
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
 
+	/* variables for copy from ignore_errors option */
+#define			REPLAY_BUFFER_SIZE 3
+	HeapTuple		replay_buffer[REPLAY_BUFFER_SIZE];
+	HeapTuple 		replay_tuple;
+	int 			saved_tuples = 0;
+	intreplayed_tuples = 0;
+	bool			replay_is_active = false;
+	bool			begin_subtransaction = true;
+	boolfind_error = false;
+	bool			last_replaying = false;
+
 	Assert(cstate->rel);
 	Assert(list_length(cstate->range_table) == 1);
 
@@ -855,9 +867,129 @@ CopyFrom(CopyFromState cstate)
 
 		ExecClearTuple(myslot);
 
-		/* Directly store the values/nulls array in the slot */
-		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
-			break;
+		/*
+		 * If option IGNORE_ERRORS is enabled, COPY skip rows with errors.
+		 * NextCopyFrom() directly store the values/nulls array in the slot.
+		 */
+		if (cstate->opts.ignore_errors)
+		{
+			bool valid_row = true;
+			bool skip_row = false;
+
+			PG_TRY();
+			{
+if (!replay_is_active)
+{
+	if (begin_subtransaction)
+	{
+		BeginInternalSubTransaction(NULL);
+		CurrentResourceOwner = oldowner;
+	}
+
+	if (saved_tuples < REPLAY_BUFFER_SIZE)
+	{
+		valid_row = NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull);
+		if (valid_row)
+		{
+			if (insertMethod == CIM_SINGLE)
+			{
+MemoryContextSwitchTo(oldcontext);
+
+replay_tuple = heap_form_tuple(RelationGetDescr(cstate->rel), myslot->tts_values, myslot->tts_isnull);
+replay_buffer[saved_tuples++] = replay_tuple;
+
+if (find_error)
+	skip_row = true;
+			}
+
+			begin_subtransaction = false;
+		}
+		

Re: POC PATCH: copy from ... exceptions to: (was Re: VLDB Features)

2021-12-18 Thread Damir Belyalov
--+---
>>>  id | integer |
>>>  val| integer |
>>>
>>> postgres=# copy test_copy2 from program 'seq 3' exceptions to stdout;
>>> 1
>>> NOTICE:  missing data for column "val"
>>> CONTEXT:  COPY test_copy2, line 1: "1"
>>> 2
>>> NOTICE:  missing data for column "val"
>>> CONTEXT:  COPY test_copy2, line 2: "2"
>>> 3
>>> NOTICE:  missing data for column "val"
>>> CONTEXT:  COPY test_copy2, line 3: "3"
>>> NOTICE:  total exceptions ignored: 3
>>>
>>> postgres=# \d test_copy1
>>>   Table "public.test_copy1"
>>>  Column |  Type   | Modifiers
>>> +-+---
>>>  id | integer | not null
>>>
>>> postgres=# set client_min_messages to warning;
>>> SET
>>> postgres=# copy test_copy1 from program 'ls /proc' exceptions to stdout;
>>> ...
>>> vmstat
>>> zoneinfo
>>> postgres=#
>>>
>>> Limited performance testing shows no significant difference between
>>> error-catching and plain code path.  For example, timing
>>>
>>>   copy test_copy1 from program 'seq 100' [exceptions to stdout]
>>>
>>> shows similar numbers with or without the added "exceptions to" clause.
>>>
>>> Now that I'm sending this I wonder if the original comment about the
>>> need for subtransaction around every loaded line still holds.  Any
>>> example of what would be not properly rolled back by just PG_TRY?
>>>
>>
>> this method is unsafe .. exception handlers doesn't free memory usually -
>> there is risk of memory leaks, source leaks
>>
>> you can enforce same performance with block subtransactions - when you
>> use subtransaction for 1000 rows, then impact of subtransactions is minimal
>>
>> when block fails, then you can use row level subtransaction - it works
>> well when you expect almost correct data.
>>
>
> Two years ago I wrote a extension that did it - but I have not time to
> finish it and push to upstream.
>
> Regards
>
> Pavel
>
>
>>
>> Regards
>>
>> Pavel
>>
>>
>>>
>>> Happy hacking!
>>> --
>>> Alex
>>>
>>>
>>>
>>> --
>>> Sent via pgsql-hackers mailing list (pgsql-hack...@postgresql.org)
>>> To make changes to your subscription:
>>> http://www.postgresql.org/mailpref/pgsql-hackers
>>>
>>>
>>
>
> --
> Sent via pgsql-hackers mailing list (pgsql-hack...@postgresql.org)
> To make changes to your subscription:
> http://www.postgresql.org/mailpref/pgsql-hackers
>
From b2dbe11f103655bf845aabc3d7af3697d1441b43 Mon Sep 17 00:00:00 2001
From: Damir Belyalov 
Date: Fri, 15 Oct 2021 11:55:18 +0300
Subject: [PATCH] COPY IGNORE_ERRORS option

---
 doc/src/sgml/ref/copy.sgml   |  13 +
 src/backend/commands/copy.c  |   8 +
 src/backend/commands/copyfrom.c  |  72 -
 src/backend/commands/copyfromparse.c |  13 +-
 src/backend/parser/gram.y|   8 +-
 src/backend/utils/.gitignore |   1 -
 src/backend/utils/errcodes.h | 354 +++
 src/bin/psql/tab-complete.c  |   3 +-
 src/include/commands/copy.h  |   1 +
 src/include/commands/copyfrom_internal.h |   1 +
 src/include/parser/kwlist.h  |   1 +
 src/test/regress/expected/copy2.out  |  32 ++
 src/test/regress/sql/copy2.sql   |  26 ++
 13 files changed, 525 insertions(+), 8 deletions(-)
 create mode 100644 src/backend/utils/errcodes.h

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 4624c8f4c9..5ca8ff876d 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { table_name [ ( format_name
 FREEZE [ boolean ]
+IGNORE_ERRORS [ boolean ]
 DELIMITER 'delimiter_character'
 NULL 'null_string'
 HEADER [ boolean ]
@@ -233,6 +234,18 @@ COPY { table_name [ ( 

 
+   
+IGNORE_ERRORS
+
+ 
+  Drop rows that contain malformed data while copying. That is, rows
+  containing syntax errors in data, rows with too many or too few columns,
+  rows that result in constraint violations, rows containing columns where
+  the data type's input function raises an error.
+ 
+
+   
+

 DELIMITER
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 53f4853141..9ddc8c3d96 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -337,6 +337,7 @@ ProcessCopyOptions(ParseState *pstate,