Hi,
With the current file_fdw, if data conversion fails for even one line,
none of the file's contents can be read:
=# \! cat data/test.data
1,a
2,b
a,c
=# create foreign table f_fdw_test_1 (i int, t text) server f_fdw
options (filename 'test.data', format 'csv');
CREATE FOREIGN TABLE
=# table f_fdw_test_1;
ERROR: invalid input syntax for type integer: "a"
CONTEXT: COPY f_fdw_test_1, line 3, column i: "a"
Since v17 will support the ON_ERROR option, which tolerates data
conversion errors in COPY FROM, as well as the LOG_VERBOSITY option[1],
how about supporting them in file_fdw too?
This idea comes from Fujii-san[2], and I think it'd be useful when
reading slightly dirty data.
The attached PoC patch works as follows:
=# create foreign table f_fdw_test_2 (i int, t text) server f_fdw
options (filename 'test.data', format 'csv', on_error 'ignore');
CREATE FOREIGN TABLE
=# table f_fdw_test_2;
NOTICE: 1 row was skipped due to data type incompatibility
i | t
---+---
1 | a
2 | b
(2 rows)
=# create foreign table f_fdw_test_3 (i int, t text) server f_fdw
options (filename 'test.data', format 'csv', on_error 'ignore',
log_verbosity 'verbose');
CREATE FOREIGN TABLE
=# table f_fdw_test_3 ;
NOTICE: skipping row due to data type incompatibility at line 3 for
column i: "a"
NOTICE: 1 row was skipped due to data type incompatibility
i | t
---+---
1 | a
2 | b
(2 rows)
I'm going to continue developing the patch (e.g., add documentation and
measure performance degradation) if people also think this feature is
worth adding.
What do you think?
[1] https://www.postgresql.org/docs/devel/sql-copy.html
[2] https://x.com/fujii_masao/status/1808178032219509041
--
Regards,
--
Atsushi Torikoshi
NTT DATA Group Corporation
From b6ec598bfdd64833e0bffc889a11addc5d677b51 Mon Sep 17 00:00:00 2001
From: Atsushi Torikoshi <[email protected]>
Date: Fri, 5 Jul 2024 00:07:26 +0900
Subject: [PATCH v1] PoC patch for adding on_error and log_verbosity options to
file_fdw
---
contrib/file_fdw/expected/file_fdw.out | 20 +++++++++
contrib/file_fdw/file_fdw.c | 58 +++++++++++++++++++++++---
contrib/file_fdw/sql/file_fdw.sql | 6 +++
3 files changed, 78 insertions(+), 6 deletions(-)
diff --git a/contrib/file_fdw/expected/file_fdw.out b/contrib/file_fdw/expected/file_fdw.out
index 86c148a86b..1af79af20f 100644
--- a/contrib/file_fdw/expected/file_fdw.out
+++ b/contrib/file_fdw/expected/file_fdw.out
@@ -206,6 +206,26 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
SELECT * FROM agg_bad; -- ERROR
ERROR: invalid input syntax for type real: "aaa"
CONTEXT: COPY agg_bad, line 3, column b: "aaa"
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+NOTICE: 1 row was skipped due to data type incompatibility
+ a | b
+-----+--------
+ 100 | 99.097
+ 42 | 324.78
+(2 rows)
+
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'verbose');
+SELECT * FROM agg_bad;
+NOTICE: skipping row due to data type incompatibility at line 3 for column b: "aaa"
+NOTICE: 1 row was skipped due to data type incompatibility
+ a | b
+-----+--------
+ 100 | 99.097
+ 42 | 324.78
+(2 rows)
+
-- misc query tests
\t on
SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 249d82d3a0..86fb655df1 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -22,8 +22,10 @@
#include "catalog/pg_authid.h"
#include "catalog/pg_foreign_table.h"
#include "commands/copy.h"
+#include "commands/copyfrom_internal.h"
#include "commands/defrem.h"
#include "commands/explain.h"
+#include "commands/progress.h"
#include "commands/vacuum.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
@@ -34,6 +36,7 @@
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
#include "utils/acl.h"
+#include "utils/backend_progress.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/sampling.h"
@@ -74,6 +77,8 @@ static const struct FileFdwOption valid_options[] = {
{"null", ForeignTableRelationId},
{"default", ForeignTableRelationId},
{"encoding", ForeignTableRelationId},
+ {"on_error", ForeignTableRelationId},
+ {"log_verbosity", ForeignTableRelationId},
{"force_not_null", AttributeRelationId},
{"force_null", AttributeRelationId},
@@ -724,12 +729,13 @@ fileIterateForeignScan(ForeignScanState *node)
ExprContext *econtext;
MemoryContext oldcontext;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
- bool found;
+ CopyFromState cstate = festate->cstate;
+ int64 skipped = 0;
ErrorContextCallback errcallback;
/* Set up callback to identify error line number. */
errcallback.callback = CopyFromErrorCallback;
- errcallback.arg = (void *) festate->cstate;
+ errcallback.arg = (void *) cstate;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
@@ -750,10 +756,40 @@ fileIterateForeignScan(ForeignScanState *node)
* switch in case we are doing that.
*/
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- found = NextCopyFrom(festate->cstate, econtext,
- slot->tts_values, slot->tts_isnull);
- if (found)
+
+ for(;;)
+ {
+ if (!NextCopyFrom(cstate, econtext,
+ slot->tts_values, slot->tts_isnull))
+ break;
+
+ if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
+ cstate->escontext->error_occurred)
+ {
+ /*
+ * Soft error occurred, skip this tuple and deal with error
+ * information according to ON_ERROR.
+ */
+ if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
+
+ /*
+ * Just make ErrorSaveContext ready for the next NextCopyFrom.
+ * Since we don't set details_wanted and error_data is not to
+ * be filled, just resetting error_occurred is enough.
+ */
+ cstate->escontext->error_occurred = false;
+
+ /* Report that this tuple was skipped by the ON_ERROR clause */
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
+ ++skipped);
+
+ /* Repeat NextCopyFrom() until no soft error occurs */
+ continue;
+ }
+
ExecStoreVirtualTuple(slot);
+ break;
+ }
/* Switch back to original memory context */
MemoryContextSwitchTo(oldcontext);
@@ -795,7 +831,17 @@ fileEndForeignScan(ForeignScanState *node)
FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
/* if festate is NULL, we are in EXPLAIN; nothing to do */
- if (festate)
+ if (!festate)
+ return;
+
+ if (festate->cstate->opts.on_error != COPY_ON_ERROR_STOP &&
+ festate->cstate->num_errors > 0)
+ ereport(NOTICE,
+ errmsg_plural("%llu row was skipped due to data type incompatibility",
+ "%llu rows were skipped due to data type incompatibility",
+ (unsigned long long) festate->cstate->num_errors,
+ (unsigned long long) festate->cstate->num_errors));
+
EndCopyFrom(festate->cstate);
}
diff --git a/contrib/file_fdw/sql/file_fdw.sql b/contrib/file_fdw/sql/file_fdw.sql
index f0548e14e1..5eae01d0f2 100644
--- a/contrib/file_fdw/sql/file_fdw.sql
+++ b/contrib/file_fdw/sql/file_fdw.sql
@@ -150,6 +150,12 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
-- error context report tests
SELECT * FROM agg_bad; -- ERROR
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'verbose');
+SELECT * FROM agg_bad;
+
-- misc query tests
\t on
SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
base-commit: 2ef575c7803a55101352c0f6cb8f745af063a66c
--
2.39.2