On Wed Nov 19, 2025 at 8:32 PM -03, Masahiko Sawada wrote:
> I think one key point in the patch is whether or not it's okay to
> switch using COPY based on the actual number of tuples inserted. While
> it should be okay from the performance perspective, it might be an
> issue that the remote query shown in EXPLAIN (without ANALYZE) might
> be different from the actual query sent. If there is a way to
> distinguish the batch insertion between INSERT and COPY in
> postgres_fdw, it might be a good idea to use COPY command for the
> remote query only when the COPY FROM comes.
>
Yeah, I agree that this EXPLAIN inconsistency is an issue, and for now it
doesn't seem easy to fix. That being said, I've taken a step back and
tried to reduce the scope of the patch to implement this idea of using
COPY as the remote SQL when the user is executing a COPY FROM on a foreign
table.

My initial idea was to use COPY as the remote SQL whenever a user
executes a COPY FROM on a foreign table, but this can cause breaking
changes because the table on the foreign server may have triggers for
INSERTs, and switching to use only COPY FROM as the remote SQL would
break these cases. We have some test cases for this scenario.

So in this new version I introduced two new foreign table and server
options:

  - use_copy_for_batch_insert: Enables the use of COPY when
    appropriate.
  - copy_for_batch_insert_threshold: The number of rows necessary to
    switch to the COPY command instead of an INSERT.

I think the threshold option is necessary because it can be configured
to a different value than the batch_size option, based on the user's
needs. The default value is 1, so once "use_copy_for_batch_insert" is
set to true, COPY will start to be used. Note that
"use_copy_for_batch_insert" is set to false by default.

Speaking of the implementation, CopyFrom() calls the BeginForeignInsert()
FDW routine. The postgres_fdw implementation of this routine creates the
PgFdwModifyState that is used by execute_foreign_modify(), so I thought it
would be a good idea to read the COPY-related table options in this
function and save them in the PgFdwModifyState struct. When
execute_foreign_modify() is executed, we can just access the previously
stored options and check whether COPY can actually be used based on the
number of tuples being batch inserted.
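
To make the check concrete, here is a minimal standalone sketch of the
decision, not the patch code itself. CopyOptions and should_use_copy()
are hypothetical names used only for illustration; the fields mirror the
ones the patch adds to PgFdwModifyState, plus the existing has_returning
flag that the patch also consults.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical, simplified stand-in for the COPY-related state that the
 * patch keeps in PgFdwModifyState. Not the real struct.
 */
typedef struct CopyOptions
{
    bool use_copy_for_batch_insert;       /* option enabled? */
    int  copy_for_batch_insert_threshold; /* min rows per batch for COPY */
    bool has_returning;                   /* query has a RETURNING clause? */
} CopyOptions;

/*
 * Return true when a batch of num_slots rows would be sent with COPY:
 * the operation must be an insert, the option must be enabled, there
 * must be no RETURNING clause, and the batch must reach the threshold.
 */
static bool
should_use_copy(const CopyOptions *opts, bool is_insert, int num_slots)
{
    assert(opts != NULL);

    return is_insert &&
        opts->use_copy_for_batch_insert &&
        !opts->has_returning &&
        num_slots >= opts->copy_for_batch_insert_threshold;
}
```

With the option enabled and a threshold of 50, a batch of 100 rows would
take the COPY path, while a batch of 40 rows, or any batch from a query
with RETURNING, would stay on the INSERT path.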

The BeginForeignInsert() routine is also called when inserting tuples
into table partitions, so saving the COPY options at this stage makes it
possible to use the COPY command to speed up batch inserts into
partitions that are postgres_fdw tables. I'm not sure whether we should
limit the patch scope to COPY FROM on a foreign table, but I don't see
any issue with using COPY to speed up batch inserts into postgres_fdw
table partitions too, since we don't expose the remote SQL being used in
this case, and the benchmarks show a good performance improvement.

I've implemented this idea in the attached v7, and here are some
benchmarks that I've run.

Scenario: COPY FROM <fdw_table>
    use_copy_for_batch_insert = false
    rows being inserted = 100
    batch_size = 100
    copy_for_batch_insert_threshold = 50
    tps = 6500.133253

Scenario: COPY FROM <fdw_table>
    use_copy_for_batch_insert = true
    rows being inserted = 100
    batch_size = 100
    copy_for_batch_insert_threshold = 50
    tps = 13116.474292

Scenario: COPY FROM <fdw_table>
    use_copy_for_batch_insert = false
    rows being inserted = 140
    batch_size = 100
    copy_for_batch_insert_threshold = 50
    tps = 4654.865032

Scenario: COPY FROM <fdw_table>
    use_copy_for_batch_insert = true
    rows being inserted = 140
    batch_size = 100
    copy_for_batch_insert_threshold = 50
    tps = 7441.694325

-------------------------------------

Scenario: INSERT INTO <partitioned_table>
    use_copy_for_batch_insert = false
    rows being inserted per partition = 100
    number of partitions: 3
    tps = 3176.872369

Scenario: INSERT INTO <partitioned_table>
    use_copy_for_batch_insert = true
    rows being inserted per partition = 100
    number of partitions: 3
    tps = 6993.544958

-------------------------------------

Note that with "copy_for_batch_insert_threshold = 50" and "rows being
inserted = 140", the behaviour is to use COPY for the first batch
iteration of 100 rows and then fall back to INSERT for the remaining 40
rows.
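
The mixed COPY/INSERT behaviour can be worked through with a small
standalone sketch. BatchPlan and plan_batches() are hypothetical names
used only for illustration; the sketch assumes that
use_copy_for_batch_insert is enabled, that there is no RETURNING clause,
and that rows are flushed in batches of at most batch_size.

```c
#include <assert.h>

/* Hypothetical summary of how the rows are split across commands. */
typedef struct BatchPlan
{
    int copy_batches;   /* batches sent with COPY */
    int copy_rows;      /* rows sent with COPY */
    int insert_batches; /* batches sent with INSERT */
    int insert_rows;    /* rows sent with INSERT */
} BatchPlan;

/*
 * Split total_rows into batches of at most batch_size rows and decide,
 * per batch, whether it reaches the COPY threshold.
 */
static BatchPlan
plan_batches(int total_rows, int batch_size, int threshold)
{
    BatchPlan plan = {0, 0, 0, 0};

    assert(batch_size > 0);

    while (total_rows > 0)
    {
        int n = (total_rows < batch_size) ? total_rows : batch_size;

        if (n >= threshold)
        {
            plan.copy_batches++;
            plan.copy_rows += n;
        }
        else
        {
            plan.insert_batches++;
            plan.insert_rows += n;
        }
        total_rows -= n;
    }
    return plan;
}
```

For plan_batches(140, 100, 50) this gives one COPY batch of 100 rows and
one INSERT batch of 40 rows, which matches the 140-row scenario above.
For plan_batches(100, 100, 50) everything goes through a single COPY.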

Summary of v7 changes:

  - Introduce the "use_copy_for_batch_insert" foreign server/table
    option to enable the usage of the COPY command.
  - Introduce the "copy_for_batch_insert_threshold" option to use the
    COPY command if the number of rows being inserted is >= the
    configured value. Default is 1.
  - The COPY command can only be used if the user is executing a COPY
    FROM on a postgres_fdw table or an INSERT into a partitioned table
    that has postgres_fdw tables as partitions.
  - COPY and INSERT can both be used in the same execution if there are
    not enough rows remaining (based on copy_for_batch_insert_threshold)
    after the first batch execution.

--
Matheus Alcantara
EDB: http://www.enterprisedb.com
From 17bbb0b129adc96f595e2eb4641b4b601f9fbf6b Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Wed, 26 Nov 2025 16:34:46 -0300
Subject: [PATCH v7] postgres_fdw: speed up batch inserts using COPY

Previously, when the user executed a COPY into a foreign table, the
statement was translated into an INSERT statement to be executed on the
foreign server. This commit introduces a new foreign table/server option
"use_copy_for_batch_insert" to enable the usage of the COPY command
instead of an INSERT. Another option, "copy_for_batch_insert_threshold",
was also added to switch to the COPY command when the number of rows
being inserted is >= the configured value.

This logic was implemented in postgresBeginForeignInsert(), which is the
implementation of the BeginForeignInsert() FDW routine. As this function
is also called when inserting tuples into partitions, COPY can also be
used to speed up batch inserts for table partitions that are
postgres_fdw tables.
---
contrib/postgres_fdw/deparse.c | 30 +++
.../postgres_fdw/expected/postgres_fdw.out | 13 +
contrib/postgres_fdw/option.c | 13 +-
contrib/postgres_fdw/postgres_fdw.c | 237 +++++++++++++++++-
contrib/postgres_fdw/postgres_fdw.h | 1 +
contrib/postgres_fdw/sql/postgres_fdw.sql | 13 +
6 files changed, 304 insertions(+), 3 deletions(-)
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index f2fb0051843..1cdf1d8cc8d 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2236,6 +2236,36 @@ rebuildInsertSql(StringInfo buf, Relation rel,
appendStringInfoString(buf, orig_query + values_end_len);
}
+/*
+ * Build a COPY FROM STDIN statement using the TEXT format
+ */
+void
+deparseCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+ TupleDesc tupdesc = RelationGetDescr(rel);
+ bool first = true;
+
+ appendStringInfo(buf, "COPY ");
+ deparseRelation(buf, rel);
+ appendStringInfo(buf, "(");
+
+ foreach_int(attnum, target_attrs)
+ {
+ Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+
+ if (attr->attgenerated)
+ continue;
+
+ if (!first)
+ appendStringInfoString(buf, ", ");
+
+ first = false;
+
+ appendStringInfoString(buf,
quote_identifier(NameStr(attr->attname)));
+ }
+ appendStringInfoString(buf, ") FROM STDIN");
+}
+
/*
* deparse remote UPDATE statement
*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out
b/contrib/postgres_fdw/expected/postgres_fdw.out
index cd28126049d..9d06d9b6eb0 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -9524,6 +9524,19 @@ select * from rem2;
2 | bar
(2 rows)
+delete from rem2;
+-- Test COPY with can_use_copy = true
+alter foreign table rem2 options (add use_copy_for_batch_insert 'true',
copy_for_batch_insert_threshold '2');
+-- Insert 3 rows so that the third row fallback to normal INSERT statement path
+copy rem2 from stdin;
+select * from rem2;
+ f1 | f2
+----+-----
+ 1 | foo
+ 2 | bar
+ 3 | baz
+(3 rows)
+
delete from rem2;
-- Test check constraints
alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 04788b7e8b3..d56b6cc142d 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -157,7 +157,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
(void) ExtractExtensionList(defGetString(def), true);
}
else if (strcmp(def->defname, "fetch_size") == 0 ||
- strcmp(def->defname, "batch_size") == 0)
+ strcmp(def->defname, "batch_size") == 0 ||
+ strcmp(def->defname,
"copy_for_batch_insert_threshold") == 0)
{
char *value;
int int_val;
@@ -263,6 +264,16 @@ InitPgFdwOptions(void)
/* batch_size is available on both server and table */
{"batch_size", ForeignServerRelationId, false},
{"batch_size", ForeignTableRelationId, false},
+ /* use_copy_for_batch_insert is available on both server and
table */
+ {"use_copy_for_batch_insert", ForeignServerRelationId, false},
+ {"use_copy_for_batch_insert", ForeignTableRelationId, false},
+
+ /*
+ * copy_for_batch_insert_threshold is available on both server
and
+ * table
+ */
+ {"copy_for_batch_insert_threshold", ForeignServerRelationId,
false},
+ {"copy_for_batch_insert_threshold", ForeignTableRelationId,
false},
/* async_capable is available on both server and table */
{"async_capable", ForeignServerRelationId, false},
{"async_capable", ForeignTableRelationId, false},
diff --git a/contrib/postgres_fdw/postgres_fdw.c
b/contrib/postgres_fdw/postgres_fdw.c
index 06b52c65300..02c6312a655 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT(
/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
+/* Buffer size to send COPY IN data*/
+#define COPYBUFSIZ 8192
+
/*
* Indexes of FDW-private information stored in fdw_private lists.
*
@@ -198,6 +201,12 @@ typedef struct PgFdwModifyState
bool has_returning; /* is there a RETURNING clause? */
List *retrieved_attrs; /* attr numbers retrieved by RETURNING
*/
+ /* COPY usage stuff */
+ bool use_copy_for_batch_insert; /* COPY command is
enabled to use? */
+ int copy_for_batch_insert_threshold; /* # of
rows to switch to
+
* use COPY */
+ bool usingcopy; /* is COPY being used ? */
+
/* info about parameters for prepared statement */
AttrNumber ctidAttno; /* attnum of input resjunk ctid
column */
int p_nums; /* number of parameters
to transmit */
@@ -545,6 +554,11 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
const
PgFdwRelationInfo *fpinfo_o,
const
PgFdwRelationInfo *fpinfo_i);
static int get_batch_size_option(Relation rel);
+static bool get_use_copy_for_batch_insert(Relation rel);
+static int get_copy_for_batch_insert_threshold(Relation rel);
+static TupleTableSlot **execute_foreign_insert_using_copy(PgFdwModifyState
*fmstate,
+
TupleTableSlot **slots,
+
int *numSlots);
/*
@@ -2265,6 +2279,10 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
retrieved_attrs != NIL,
retrieved_attrs);
+ fmstate->use_copy_for_batch_insert = get_use_copy_for_batch_insert(rel);
+ if (fmstate->use_copy_for_batch_insert)
+ fmstate->copy_for_batch_insert_threshold =
get_copy_for_batch_insert_threshold(rel);
+
/*
* If the given resultRelInfo already has PgFdwModifyState set, it means
* the foreign table is an UPDATE subplan result rel; in which case,
store
@@ -4066,6 +4084,50 @@ create_foreign_modify(EState *estate,
return fmstate;
}
+/*
+ * Write target attribute values from fmstate into buf buffer to be sent as
+ * COPY FROM STDIN data
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+ PgFdwModifyState *fmstate,
+ TupleTableSlot *slot)
+{
+ TupleDesc tupdesc = RelationGetDescr(fmstate->rel);
+ bool first = true;
+ int i = 0;
+
+ foreach_int(attnum, fmstate->target_attrs)
+ {
+ CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum -
1);
+ Datum datum;
+ bool isnull;
+
+ /* Ignore generated columns; they are set to DEFAULT */
+ if (attr->attgenerated)
+ continue;
+
+ if (!first)
+ appendStringInfoCharMacro(buf, '\t');
+ first = false;
+
+ datum = slot_getattr(slot, attnum, &isnull);
+
+ if (isnull)
+ appendStringInfoString(buf, "\\N");
+ else
+ {
+ const char *value =
OutputFunctionCall(&fmstate->p_flinfo[i],
+
datum);
+
+ appendStringInfoString(buf, value);
+ }
+ i++;
+ }
+
+ appendStringInfoCharMacro(buf, '\n');
+}
+
/*
* execute_foreign_modify
* Perform foreign-table modification as required, and fetch
RETURNING
@@ -4097,11 +4159,34 @@ execute_foreign_modify(EState *estate,
if (fmstate->conn_state->pendingAreq)
process_pending_request(fmstate->conn_state->pendingAreq);
+ /*
+ * Check if the COPY command can be used to speed up inserts. The COPY
+ * command can not be used if the original query has a RETURNING clause.
+ */
+ if (operation == CMD_INSERT &&
+ fmstate->use_copy_for_batch_insert &&
+ !fmstate->has_returning &&
+ *numSlots >= fmstate->copy_for_batch_insert_threshold)
+ {
+
+ /* Build the COPY command if it's not already built */
+ if (!fmstate->usingcopy)
+ {
+ pfree(fmstate->query);
+ initStringInfo(&sql);
+ deparseCopySql(&sql, fmstate->rel,
fmstate->target_attrs);
+ fmstate->query = sql.data;
+ fmstate->usingcopy = true;
+ }
+ return execute_foreign_insert_using_copy(fmstate, slots,
numSlots);
+ }
+
/*
* If the existing query was deparsed and prepared for a different
number
- * of rows, rebuild it for the proper number.
+ * of rows or if COPY was being used on the previous execution, rebuild
+ * the INSERT statement and use the proper number.
*/
- if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
+ if ((operation == CMD_INSERT && fmstate->num_slots != *numSlots) ||
fmstate->usingcopy)
{
/* Destroy the prepared statement created previously */
if (fmstate->p_name)
@@ -7886,3 +7971,151 @@ get_batch_size_option(Relation rel)
return batch_size;
}
+
+/*
+ * Determine whether using the COPY command to execute an INSERT into a
+ * foreign table is enabled. The option specified for a table has precedence.
+ */
+static bool
+get_use_copy_for_batch_insert(Relation rel)
+{
+ Oid foreigntableid = RelationGetRelid(rel);
+ List *options = NIL;
+ ListCell *lc;
+ ForeignTable *table;
+ ForeignServer *server;
+ bool can_use_copy = false;
+
+ /*
+ * Load options for table and server. We append server options after
table
+ * options, because table options take precedence.
+ */
+ table = GetForeignTable(foreigntableid);
+ server = GetForeignServer(table->serverid);
+
+ options = list_concat(options, table->options);
+ options = list_concat(options, server->options);
+
+ /* See if either table or server specifies use_copy_for_batch_insert. */
+ foreach(lc, options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "use_copy_for_batch_insert") == 0)
+ {
+ (void) parse_bool(defGetString(def), &can_use_copy);
+ break;
+ }
+ }
+ return can_use_copy;
+}
+
+static int
+get_copy_for_batch_insert_threshold(Relation rel)
+{
+ Oid foreigntableid = RelationGetRelid(rel);
+ List *options = NIL;
+ ListCell *lc;
+ ForeignTable *table;
+ ForeignServer *server;
+
+ /*
+ * We use 1 as default, which means that COPY will be used once
+ * "use_copy_for_batch_insert" is set to true.
+ */
+ int copy_for_batch_insert_threshold = 1;
+
+ /*
+ * Load options for table and server. We append server options after
table
+ * options, because table options take precedence.
+ */
+ table = GetForeignTable(foreigntableid);
+ server = GetForeignServer(table->serverid);
+
+ options = list_concat(options, table->options);
+ options = list_concat(options, server->options);
+
+ /* See if either table or server specifies copy_for_batch_insert_threshold. */
+ foreach(lc, options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "copy_for_batch_insert_threshold") ==
0)
+ {
+ (void) parse_int(defGetString(def),
&copy_for_batch_insert_threshold, 0, NULL);
+ break;
+ }
+ }
+ return copy_for_batch_insert_threshold;
+}
+
+/* Execute an insert into a foreign table using the COPY command */
+static TupleTableSlot **
+execute_foreign_insert_using_copy(PgFdwModifyState *fmstate,
+
TupleTableSlot **slots,
+ int *numSlots)
+{
+ PGresult *res;
+ StringInfoData copy_data;
+ int n_rows;
+ int i;
+
+ /* Send COPY command */
+ if (!PQsendQuery(fmstate->conn, fmstate->query))
+ pgfdw_report_error(NULL, fmstate->conn, fmstate->query);
+
+ /* get the COPY result */
+ res = pgfdw_get_result(fmstate->conn);
+ if (PQresultStatus(res) != PGRES_COPY_IN)
+ pgfdw_report_error(res, fmstate->conn, fmstate->query);
+
+ /* Convert the TupleTableSlot data into a TEXT-formatted line */
+ initStringInfo(&copy_data);
+ for (i = 0; i < *numSlots; i++)
+ {
+ convert_slot_to_copy_text(&copy_data, fmstate, slots[i]);
+
+ /*
+ * Send the buffered COPY data once the buffer reaches the limit, to
+ * avoid large memory usage.
+ */
+ if (copy_data.len >= COPYBUFSIZ)
+ {
+ if (PQputCopyData(fmstate->conn, copy_data.data,
copy_data.len) <= 0)
+ pgfdw_report_error(NULL, fmstate->conn,
fmstate->query);
+ resetStringInfo(&copy_data);
+ }
+ }
+
+ /* Send the remaining COPY data */
+ if (copy_data.len > 0)
+ {
+ if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len)
<= 0)
+ pgfdw_report_error(NULL, fmstate->conn, fmstate->query);
+ }
+
+ /* End the COPY operation */
+ if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn))
+ pgfdw_report_error(NULL, fmstate->conn, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
+ */
+ res = pgfdw_get_result(fmstate->conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(res, fmstate->conn, fmstate->query);
+
+ n_rows = atoi(PQcmdTuples(res));
+
+ /* And clean up */
+ PQclear(res);
+
+ MemoryContextReset(fmstate->temp_cxt);
+
+ *numSlots = n_rows;
+
+ /*
+ * Return NULL if nothing was inserted on the remote end
+ */
+ return (n_rows > 0) ? slots : NULL;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h
b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..aa54d6bba53 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
char *orig_query, List
*target_attrs,
int values_end_len,
int num_params,
int num_rows);
+extern void deparseCopySql(StringInfo buf, Relation rel, List *target_attrs);
extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
Index rtindex,
Relation rel,
List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql
b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 9a8f9e28135..fac00c55553 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2807,6 +2807,19 @@ select * from rem2;
delete from rem2;
+-- Test COPY with can_use_copy = true
+alter foreign table rem2 options (add use_copy_for_batch_insert 'true',
copy_for_batch_insert_threshold '2');
+
+-- Insert 3 rows so that the third row fallback to normal INSERT statement path
+copy rem2 from stdin;
+1 foo
+2 bar
+3 baz
+\.
+select * from rem2;
+
+delete from rem2;
+
-- Test check constraints
alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
--
2.51.2