I've spent some more time on this patch cleaning up some things and
trying to simplify some things.

I've renamed "copy_for_batch_insert_threshold" to
"batch_with_copy_threshold" and removed the boolean option
"use_copy_for_batch_insert", so now to enable the COPY usage for batch
inserts it only need to set batch_with_copy_threshold to a number
greater than 0.

Also the COPY can only be used if batching is also enabled (batch_size >
1) and it will only be used for the COPY FROM on a foreign table and for
inserts into table partitions that are also foreign tables.

--
Matheus Alcantara
EDB: http://www.enterprisedb.com

From dead99e8a2db663df8676f1caca1d834e19ca076 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Wed, 26 Nov 2025 16:34:46 -0300
Subject: [PATCH v8] postgres_fdw: speed up batch inserts using COPY

This commit include a new foreign table/server option
"batch_with_copy_threshold" that enable the usage of the COPY command to
speed up batch inserts when a COPY FROM or an insert into a table
partition that is a foreign table is executed. In both cases the
BeginForeignInsert fdw routine is called, so this new option is
retrieved only on this routine. For the other cases that use the
ForeignModify routines still use the INSERT as a remote SQL.

Note that the COPY will only be used for batch inserts and only if the
current number of rows being inserted on the batch operation is >=
batch_with_copy_threshold. If batch_size=100, batch_with_copy_threshold=50
and number of rows being inserted is 120 the first 100 rows will be
inserted using the COPY command and the remaining 20 rows will be
inserted using INSERT statement because it did not reach the copy
threshold.
---
 contrib/postgres_fdw/deparse.c                |  35 +++
 .../postgres_fdw/expected/postgres_fdw.out    |  26 +++
 contrib/postgres_fdw/option.c                 |   6 +-
 contrib/postgres_fdw/postgres_fdw.c           | 210 +++++++++++++++++-
 contrib/postgres_fdw/postgres_fdw.h           |   1 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  23 ++
 6 files changed, 298 insertions(+), 3 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index f2fb0051843..54e821f6bf5 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2236,6 +2236,41 @@ rebuildInsertSql(StringInfo buf, Relation rel,
        appendStringInfoString(buf, orig_query + values_end_len);
 }
 
+/*
+ *  Build a COPY FROM STDIN statement using the TEXT format
+ */
+void
+deparseCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+       TupleDesc       tupdesc = RelationGetDescr(rel);
+       bool            first = true;
+       int                     nattrs = list_length(target_attrs);
+
+       appendStringInfo(buf, "COPY ");
+       deparseRelation(buf, rel);
+       if (nattrs > 0)
+               appendStringInfo(buf, "(");
+
+       foreach_int(attnum, target_attrs)
+       {
+               Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+
+               if (attr->attgenerated)
+                       continue;
+
+               if (!first)
+                       appendStringInfoString(buf, ", ");
+
+               first = false;
+
+               appendStringInfoString(buf, 
quote_identifier(NameStr(attr->attname)));
+       }
+       if (nattrs > 0)
+               appendStringInfoString(buf, ") FROM STDIN");
+       else
+               appendStringInfoString(buf, " FROM STDIN");
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out 
b/contrib/postgres_fdw/expected/postgres_fdw.out
index 48e3185b227..ffba243dece 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -9215,6 +9215,19 @@ with result as (insert into itrtest values (1, 'test1'), 
(2, 'test2') returning
 
 drop trigger loct1_br_insert_trigger on loct1;
 drop trigger loct2_br_insert_trigger on loct2;
+-- Test batch insert using COPY with batch_with_copy_threshold
+delete from itrtest;
+alter server loopback options (add batch_with_copy_threshold '2', batch_size 
'3');
+insert into itrtest values (1, 'test1'), (2, 'test2'), (2, 'test3');
+select * from itrtest;
+ a |   b   
+---+-------
+ 1 | test1
+ 2 | test2
+ 2 | test3
+(3 rows)
+
+alter server loopback options (drop batch_with_copy_threshold, drop 
batch_size);
 drop table itrtest;
 drop table loct1;
 drop table loct2;
@@ -9524,6 +9537,19 @@ select * from rem2;
   2 | bar
 (2 rows)
 
+delete from rem2;
+-- Test COPY with batch_with_copy_threshold
+alter foreign table rem2 options (add batch_with_copy_threshold '2');
+-- Insert 3 rows so that the third row fallback to normal INSERT statement path
+copy rem2 from stdin;
+select * from rem2;
+ f1 | f2  
+----+-----
+  1 | foo
+  2 | bar
+  3 | baz
+(3 rows)
+
 delete from rem2;
 -- Test check constraints
 alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 04788b7e8b3..d2696206e75 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -157,7 +157,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
                        (void) ExtractExtensionList(defGetString(def), true);
                }
                else if (strcmp(def->defname, "fetch_size") == 0 ||
-                                strcmp(def->defname, "batch_size") == 0)
+                                strcmp(def->defname, "batch_size") == 0 ||
+                                strcmp(def->defname, 
"batch_with_copy_threshold") == 0)
                {
                        char       *value;
                        int                     int_val;
@@ -263,6 +264,9 @@ InitPgFdwOptions(void)
                /* batch_size is available on both server and table */
                {"batch_size", ForeignServerRelationId, false},
                {"batch_size", ForeignTableRelationId, false},
+               /* batch_with_copy_threshold is available on both server and 
table */
+               {"batch_with_copy_threshold", ForeignServerRelationId, false},
+               {"batch_with_copy_threshold", ForeignTableRelationId, false},
                /* async_capable is available on both server and table */
                {"async_capable", ForeignServerRelationId, false},
                {"async_capable", ForeignTableRelationId, false},
diff --git a/contrib/postgres_fdw/postgres_fdw.c 
b/contrib/postgres_fdw/postgres_fdw.c
index 06b52c65300..7896760d51a 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT(
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Buffer size to send COPY IN data*/
+#define COPYBUFSIZ 8192
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -198,6 +201,10 @@ typedef struct PgFdwModifyState
        bool            has_returning;  /* is there a RETURNING clause? */
        List       *retrieved_attrs;    /* attr numbers retrieved by RETURNING 
*/
 
+       /* COPY usage stuff */
+       int                     batch_with_copy_threshold;      /* value of FDW 
option */
+       char       *cmd_copy;           /* COPY statement */
+
        /* info about parameters for prepared statement */
        AttrNumber      ctidAttno;              /* attnum of input resjunk ctid 
column */
        int                     p_nums;                 /* number of parameters 
to transmit */
@@ -545,6 +552,10 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
                                                          const 
PgFdwRelationInfo *fpinfo_o,
                                                          const 
PgFdwRelationInfo *fpinfo_i);
 static int     get_batch_size_option(Relation rel);
+static int     get_batch_with_copy_threshold(Relation rel);
+static TupleTableSlot **execute_foreign_modify_using_copy(PgFdwModifyState 
*fmstate,
+                                                                               
                                  TupleTableSlot **slots,
+                                                                               
                                  int *numSlots);
 
 
 /*
@@ -2013,8 +2024,30 @@ postgresExecForeignBatchInsert(EState *estate,
         */
        if (fmstate->aux_fmstate)
                resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
-       rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
-                                                                  slots, 
planSlots, numSlots);
+
+       /*
+        * Check if "batch_with_copy_threshold" is enable (> 0) and if the COPY
+        * can be used based on the number of rows being inserted on this batch.
+        * The original query also should not have a RETURNING clause.
+        */
+       if (fmstate->batch_with_copy_threshold > 0 &&
+               fmstate->batch_with_copy_threshold <= *numSlots &&
+               !fmstate->has_returning)
+       {
+               if (fmstate->cmd_copy == NULL)
+               {
+                       StringInfoData sql;
+
+                       initStringInfo(&sql);
+                       deparseCopySql(&sql, fmstate->rel, 
fmstate->target_attrs);
+                       fmstate->cmd_copy = sql.data;
+               }
+
+               rslot = execute_foreign_modify_using_copy(fmstate, slots, 
numSlots);
+       }
+       else
+               rslot = execute_foreign_modify(estate, resultRelInfo, 
CMD_INSERT,
+                                                                          
slots, planSlots, numSlots);
        /* Revert that change */
        if (fmstate->aux_fmstate)
                resultRelInfo->ri_FdwState = fmstate;
@@ -2265,6 +2298,16 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
                                                                        
retrieved_attrs != NIL,
                                                                        
retrieved_attrs);
 
+
+       /*
+        * Set batch_with_copy_threshold from foreign server/table options. We 
do
+        * this outside of create_foreign_modify() because we only want to use
+        * COPY as a remote SQL when a COPY FROM on a foreign table is executed 
or
+        * an insert is being performed on a table partition. In both cases the
+        * BeginForeignInsert fdw routine is called.
+        */
+       fmstate->batch_with_copy_threshold = get_batch_with_copy_threshold(rel);
+
        /*
         * If the given resultRelInfo already has PgFdwModifyState set, it means
         * the foreign table is an UPDATE subplan result rel; in which case, 
store
@@ -4066,6 +4109,50 @@ create_foreign_modify(EState *estate,
        return fmstate;
 }
 
+/*
+ *  Write target attribute values from fmstate into buf buffer to be sent as
+ *  COPY FROM STDIN data
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+                                                 PgFdwModifyState *fmstate,
+                                                 TupleTableSlot *slot)
+{
+       TupleDesc       tupdesc = RelationGetDescr(fmstate->rel);
+       bool            first = true;
+       int                     i = 0;
+
+       foreach_int(attnum, fmstate->target_attrs)
+       {
+               CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 
1);
+               Datum           datum;
+               bool            isnull;
+
+               /* Ignore generated columns; they are set to DEFAULT */
+               if (attr->attgenerated)
+                       continue;
+
+               if (!first)
+                       appendStringInfoCharMacro(buf, '\t');
+               first = false;
+
+               datum = slot_getattr(slot, attnum, &isnull);
+
+               if (isnull)
+                       appendStringInfoString(buf, "\\N");
+               else
+               {
+                       const char *value = 
OutputFunctionCall(&fmstate->p_flinfo[i],
+                                                                               
                   datum);
+
+                       appendStringInfoString(buf, value);
+               }
+               i++;
+       }
+
+       appendStringInfoCharMacro(buf, '\n');
+}
+
 /*
  * execute_foreign_modify
  *             Perform foreign-table modification as required, and fetch 
RETURNING
@@ -7886,3 +7973,122 @@ get_batch_size_option(Relation rel)
 
        return batch_size;
 }
+
+/*
+ * Determine COPY usage threshold for batching inserts for a given foreign
+ * table. The option specified for a table has precedence.
+ */
+static int
+get_batch_with_copy_threshold(Relation rel)
+{
+       Oid                     foreigntableid = RelationGetRelid(rel);
+       List       *options = NIL;
+       ListCell   *lc;
+       ForeignTable *table;
+       ForeignServer *server;
+
+       /*
+        * We use 0 as default, which means that COPY will not be used by 
default
+        * for batching insert.
+        */
+       int                     copy_for_batch_insert_threshold = 0;
+
+       /*
+        * Load options for table and server. We append server options after 
table
+        * options, because table options take precedence.
+        */
+       table = GetForeignTable(foreigntableid);
+       server = GetForeignServer(table->serverid);
+
+       options = list_concat(options, table->options);
+       options = list_concat(options, server->options);
+
+       /* See if either table or server specifies enable_batch_with_copy. */
+       foreach(lc, options)
+       {
+               DefElem    *def = (DefElem *) lfirst(lc);
+
+               if (strcmp(def->defname, "batch_with_copy_threshold") == 0)
+               {
+                       (void) parse_int(defGetString(def), 
&copy_for_batch_insert_threshold, 0, NULL);
+                       break;
+               }
+       }
+       return copy_for_batch_insert_threshold;
+}
+
+/*
+ * execute_foreign_modify_using_copy
+ *             Perform foreign-table modification using the COPY command.
+ */
+static TupleTableSlot **
+execute_foreign_modify_using_copy(PgFdwModifyState *fmstate,
+                                                                 
TupleTableSlot **slots,
+                                                                 int *numSlots)
+{
+       PGresult   *res;
+       StringInfoData copy_data;
+       int                     n_rows;
+       int                     i;
+
+       Assert(fmstate->cmd_copy != NULL);
+
+       /* Send COPY command */
+       if (!PQsendQuery(fmstate->conn, fmstate->cmd_copy))
+               pgfdw_report_error(NULL, fmstate->conn, fmstate->cmd_copy);
+
+       /* get the COPY result */
+       res = pgfdw_get_result(fmstate->conn);
+       if (PQresultStatus(res) != PGRES_COPY_IN)
+               pgfdw_report_error(res, fmstate->conn, fmstate->cmd_copy);
+
+       /* Convert the TupleTableSlot data into a TEXT-formatted line */
+       initStringInfo(&copy_data);
+       for (i = 0; i < *numSlots; i++)
+       {
+               convert_slot_to_copy_text(&copy_data, fmstate, slots[i]);
+
+               /*
+                * Send initial COPY data if the buffer reach the limit to 
avoid large
+                * memory usage.
+                */
+               if (copy_data.len >= COPYBUFSIZ)
+               {
+                       if (PQputCopyData(fmstate->conn, copy_data.data, 
copy_data.len) <= 0)
+                               pgfdw_report_error(NULL, fmstate->conn, 
fmstate->cmd_copy);
+                       resetStringInfo(&copy_data);
+               }
+       }
+
+       /* Send the remaining COPY data */
+       if (copy_data.len > 0)
+       {
+               if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) 
<= 0)
+                       pgfdw_report_error(NULL, fmstate->conn, 
fmstate->cmd_copy);
+       }
+
+       /* End the COPY operation */
+       if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn))
+               pgfdw_report_error(NULL, fmstate->conn, fmstate->cmd_copy);
+
+       /*
+        * Get the result, and check for success.
+        */
+       res = pgfdw_get_result(fmstate->conn);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pgfdw_report_error(res, fmstate->conn, fmstate->cmd_copy);
+
+       n_rows = atoi(PQcmdTuples(res));
+
+       /* And clean up */
+       PQclear(res);
+
+       MemoryContextReset(fmstate->temp_cxt);
+
+       *numSlots = n_rows;
+
+       /*
+        * Return NULL if nothing was inserted on the remote end
+        */
+       return (n_rows > 0) ? slots : NULL;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h 
b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..aa54d6bba53 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
                                                         char *orig_query, List 
*target_attrs,
                                                         int values_end_len, 
int num_params,
                                                         int num_rows);
+extern void deparseCopySql(StringInfo buf, Relation rel, List *target_attrs);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
                                                         Index rtindex, 
Relation rel,
                                                         List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql 
b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 9a8f9e28135..f973ef07d80 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2635,6 +2635,16 @@ with result as (insert into itrtest values (1, 'test1'), 
(2, 'test2') returning
 drop trigger loct1_br_insert_trigger on loct1;
 drop trigger loct2_br_insert_trigger on loct2;
 
+-- Test batch insert using COPY with batch_with_copy_threshold
+delete from itrtest;
+alter server loopback options (add batch_with_copy_threshold '2', batch_size 
'3');
+
+insert into itrtest values (1, 'test1'), (2, 'test2'), (2, 'test3');
+
+select * from itrtest;
+
+alter server loopback options (drop batch_with_copy_threshold, drop 
batch_size);
+
 drop table itrtest;
 drop table loct1;
 drop table loct2;
@@ -2807,6 +2817,19 @@ select * from rem2;
 
 delete from rem2;
 
+-- Test COPY with batch_with_copy_threshold
+alter foreign table rem2 options (add batch_with_copy_threshold '2');
+
+-- Insert 3 rows so that the third row fallback to normal INSERT statement path
+copy rem2 from stdin;
+1      foo
+2      bar
+3      baz
+\.
+select * from rem2;
+
+delete from rem2;
+
 -- Test check constraints
 alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
 alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
-- 
2.51.2

Reply via email to